mao-liu opened a new issue, #6563:
URL: https://github.com/apache/paimon/issues/6563

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/paimon/issues) 
and found nothing similar.
   
   
   ### Motivation
   
   Current concurrency controls in Paimon rely on an external Hive or JDBC metastore to provide locks that mitigate the risk of [snapshot conflicts](https://paimon.apache.org/docs/master/concepts/concurrency-control/).
   
   We would like to run multiple concurrent writers without also needing to deploy and maintain a separate metastore.
   
   Taking inspiration from Hudi, we would like to propose a similar implementation that uses conditional-put semantics, so that the lock can be based on the storage system itself (a minimal sketch of the primitive follows the references below).
   
   **References**:
   - Hudi
     - [AWS S3-based lock using conditional requests](https://hudi.apache.org/docs/next/concurrency_control/#storage-with-conditional-writes-based)
     - [hudi.client.transaction.lock.StorageBasedLockProvider](https://github.com/apache/hudi/blob/f5f0ef6549fedf93863526a2110fe262a3460432/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java)
     - [hudi.aws.transaction.lock.S3StorageLockClient](https://github.com/apache/hudi/blob/master/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java)
     - [hudi.gcp.transaction.lock.GCSStorageLockClient](https://github.com/apache/hudi/blob/master/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java)
   - AWS S3
     - [S3 If-None-Match header](https://docs.aws.amazon.com/AmazonS3/latest/userguide/conditional-writes.html)
   - GCP Storage
     - [Conditional write example](https://docs.cloud.google.com/storage/docs/uploading-objects#storage-upload-object-client-libraries)
     - [Storage.BlobWriteOption.doesNotExist()](https://docs.cloud.google.com/java/docs/reference/google-cloud-storage/latest/com.google.cloud.storage.Storage.BlobWriteOption#com_google_cloud_storage_Storage_BlobWriteOption_doesNotExist__)
     - [Request preconditions](https://docs.cloud.google.com/storage/docs/request-preconditions)
     - [Storage JSON API - objects.insert](https://docs.cloud.google.com/storage/docs/json_api/v1/objects/insert)
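
   For concreteness, here is a minimal sketch of the conditional-put primitive those references describe, written in Kotlin against the AWS SDK for Java v2 (assuming a version with conditional-write support for `PutObject`) and the `google-cloud-storage` client. The function names and lock-object layout are illustrative only, not existing Paimon or Hudi APIs:

   ```kotlin
   import com.google.cloud.storage.BlobInfo
   import com.google.cloud.storage.Storage
   import com.google.cloud.storage.StorageException
   import software.amazon.awssdk.core.sync.RequestBody
   import software.amazon.awssdk.services.s3.S3Client
   import software.amazon.awssdk.services.s3.model.PutObjectRequest
   import software.amazon.awssdk.services.s3.model.S3Exception

   // Returns true iff this writer created the lock object, i.e. won the race.
   fun tryAcquireS3Lock(s3: S3Client, bucket: String, key: String, owner: String): Boolean =
       try {
           // `If-None-Match: *` lets the PUT succeed only if no object exists at `key`.
           s3.putObject(
               PutObjectRequest.builder().bucket(bucket).key(key).ifNoneMatch("*").build(),
               RequestBody.fromString(owner)
           )
           true
       } catch (e: S3Exception) {
           // 412 Precondition Failed: another writer already holds the lock
           // (S3 can also return 409 while a concurrent conditional write is in flight).
           if (e.statusCode() == 412 || e.statusCode() == 409) false else throw e
       }

   // GCS equivalent: `doesNotExist()` preconditions the create on generation 0.
   fun tryAcquireGcsLock(storage: Storage, bucket: String, name: String, owner: String): Boolean =
       try {
           storage.create(
               BlobInfo.newBuilder(bucket, name).build(),
               owner.toByteArray(),
               Storage.BlobTargetOption.doesNotExist()
           )
           true
       } catch (e: StorageException) {
           if (e.code == 412) false else throw e
       }
   ```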
   
   ### Solution
   
   Intended implementation approach:
   
   - New classes in [org.apache.paimon.s3](https://github.com/apache/paimon/tree/master/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3) and [org.apache.paimon.gs](https://github.com/apache/paimon/tree/master/paimon-filesystems/paimon-gs-impl/src/main/java/org/apache/paimon/gs)
     - Use existing methods to extract Hadoop configurations (particularly for auth)
     - Construct new AWS / GCP storage clients directly, without relying on the Hadoop filesystem (similar to Hudi's implementation)
     - The new classes implement [CatalogLockFactory](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLockFactory.java#L26); a rough sketch follows this list
   - Extend [CatalogEnvironment](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java) to support adding a lock factory on top of `CatalogEnvironment::empty`, e.g. a new method `.withLockFactory(CatalogLockFactory lockFactory)`
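
   Roughly, the new factory might look like the sketch below. This assumes `CatalogLockFactory`'s current shape on master (a `createLock(CatalogLockContext)` method plus `identifier()` inherited from `Factory`); `S3ConditionalLock`, its key layout, and the retry/TTL handling are all hypothetical, and building the S3 client from the extracted Hadoop configuration is elided:

   ```kotlin
   package org.apache.paimon.s3

   import java.util.concurrent.Callable
   import org.apache.paimon.catalog.CatalogLock
   import org.apache.paimon.catalog.CatalogLockContext
   import org.apache.paimon.catalog.CatalogLockFactory
   import org.apache.paimon.options.Options

   class S3LockFactory : CatalogLockFactory {
       // Would become the value accepted by `lock.type`, if catalogs adopt it.
       override fun identifier(): String = "s3"

       override fun createLock(context: CatalogLockContext): CatalogLock =
           S3ConditionalLock(context.options())
   }

   // Serializes commits by holding a lock object in S3 for the duration of the callable.
   class S3ConditionalLock(private val options: Options) : CatalogLock {

       override fun <T> runWithLock(database: String, table: String, callable: Callable<T>): T {
           val lockKey = "locks/$database/$table.lock" // hypothetical key layout
           acquire(lockKey) // conditional PUT with backoff, as in the Motivation sketch
           try {
               return callable.call()
           } finally {
               release(lockKey) // DELETE the lock object so the next writer can proceed
           }
       }

       override fun close() {
           // Release the underlying S3 client.
       }

       private fun acquire(key: String): Unit = TODO("conditional-PUT loop with backoff and TTL")
       private fun release(key: String): Unit = TODO("delete the lock object")
   }
   ```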
   
   To use the lock:
   ```kotlin
   import org.apache.paimon.flink.FlinkFileIO
   import org.apache.paimon.fs.Path
   import org.apache.paimon.options.Options
   import org.apache.paimon.schema.SchemaManager
   import org.apache.paimon.table.CatalogEnvironment
   import org.apache.paimon.table.FileStoreTableFactory
   
   // assuming an existing table for simplicity
   val tablePath = Path("s3a://bucket/prefix")
   val fileIO = FlinkFileIO(tablePath)
   val writerOptions = Options()
   val schema = SchemaManager(fileIO, tablePath).latest().get()
   val emptyCatalogWithLock =
       CatalogEnvironment
           .empty()
           .withLockFactory(S3LockFactory()) // NEW: proposed method and factory

   val table = FileStoreTableFactory.create(fileIO, tablePath, schema, writerOptions, emptyCatalogWithLock)
   ```
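
   If `CatalogEnvironment` carries a lock factory, the expectation is that the table's commit path would acquire the lock around each snapshot commit, the same way Hive/JDBC catalog locks are applied today; whether tables built through `FileStoreTableFactory.create(...)` actually honor it is what the first question below is probing.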
   
   **Guidance needed**
   - For the purpose of commit locks, the option `lock.type` does not appear to 
be used when we construct the table via `FileStoreTableFactory.create(...)`.
   - Is it necessary to extend catalogs and the catalog option `lock.type` to 
also support s3/gs locks?
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!

