mao-liu opened a new issue, #6563: URL: https://github.com/apache/paimon/issues/6563
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/paimon/issues) and found nothing similar.

### Motivation

Current concurrency controls in Paimon rely on an external Hive or JDBC metastore to provide locks to mitigate the risk of [snapshot conflicts](https://paimon.apache.org/docs/master/concepts/concurrency-control/).

We would like to run multiple concurrent writers without also needing to maintain another metastore.

Taking inspiration from Hudi, we would like to propose a similar implementation that uses conditional-put semantics, so that the lock can be based on the storage system.

**References**:

- Hudi
  - [AWS S3-based lock using conditional requests](https://hudi.apache.org/docs/next/concurrency_control/#storage-with-conditional-writes-based)
  - [hudi.client.transaction.lock.StorageBasedLockProvider](https://github.com/apache/hudi/blob/f5f0ef6549fedf93863526a2110fe262a3460432/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java)
  - [hudi.aws.transaction.lock.S3StorageLockClient](https://github.com/apache/hudi/blob/master/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java)
  - [hudi.gcp.transaction.lock.GCSStorageLockClient](https://github.com/apache/hudi/blob/master/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java)
- AWS S3
  - [S3 If-None-Match header](https://docs.aws.amazon.com/AmazonS3/latest/userguide/conditional-writes.html)
- GCP Storage
  - [Conditional write example](https://docs.cloud.google.com/storage/docs/uploading-objects#storage-upload-object-client-libraries)
  - [Storage.BlobWriteOption.doesNotExist()](https://docs.cloud.google.com/java/docs/reference/google-cloud-storage/latest/com.google.cloud.storage.Storage.BlobWriteOption#com_google_cloud_storage_Storage_BlobWriteOption_doesNotExist__)
  - [Request preconditions](https://docs.cloud.google.com/storage/docs/request-preconditions)
  - [Storage JSON api - objects.insert](https://docs.cloud.google.com/storage/docs/json_api/v1/objects/insert)

### Solution

Intended implementation approach:

- New classes in [org.apache.paimon.s3](https://github.com/apache/paimon/tree/master/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3) and [org.apache.paimon.gs](https://github.com/apache/paimon/tree/master/paimon-filesystems/paimon-gs-impl/src/main/java/org/apache/paimon/gs)
- Use existing methods to extract Hadoop configurations (particularly for auth)
- Construct new AWS / GCP storage clients, not relying on the Hadoop filesystem (similar to the implementation in Hudi)
- New classes implement [CatalogLockFactory](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLockFactory.java#L26)
- Extend [CatalogEnvironment](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java) to support adding a lock factory on top of `CatalogEnvironment::empty`, e.g. new method `.withLockFactory(CatalogLockFactory lockFactory)`

To use the lock:

```kotlin
import org.apache.paimon.flink.FlinkFileIO
import org.apache.paimon.fs.Path
import org.apache.paimon.options.Options
import org.apache.paimon.schema.SchemaManager
import org.apache.paimon.table.CatalogEnvironment
import org.apache.paimon.table.FileStoreTableFactory

// assuming an existing table for simplicity
val tablePath = Path("s3a://bucket/prefix")
val fileIO = FlinkFileIO(tablePath)
val writerOptions = Options()

// load the latest schema of the existing table
val schema = SchemaManager(fileIO, tablePath).latest().get()

// withLockFactory and S3LockFactory are the proposed additions
val emptyCatalogWithLock = CatalogEnvironment
    .empty()
    .withLockFactory(S3LockFactory()) // NEW

val table = FileStoreTableFactory.create(fileIO, tablePath, schema, writerOptions, emptyCatalogWithLock)
```

**Guidance needed**

- For the purpose of commit locks, the option `lock.type` does not appear to be used when we construct the table via `FileStoreTableFactory.create(...)`.
- Is it necessary to extend catalogs and the catalog option `lock.type` to also support s3/gs locks?

### Anything else?

_No response_

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!
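
For illustration only, the conditional-put idea from the Motivation section can be sketched roughly as follows. This is not the proposed Paimon implementation: `InMemoryConditionalStore` and `ConditionalPutLock` are hypothetical names, and an in-memory map stands in for the object store. A real client would presumably issue a conditional write (`If-None-Match` on S3, a `doesNotExist()` precondition on GCS) and would also need lock expiry and renewal, which this sketch leaves out.

```kotlin
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for an object store that supports conditional writes.
// A real client would send a conditional PUT instead of touching a map.
class InMemoryConditionalStore {
    private val objects = ConcurrentHashMap<String, String>()

    // Writes the object only if the key is absent; returns true on success.
    fun putIfAbsent(key: String, body: String): Boolean =
        objects.putIfAbsent(key, body) == null

    // Deletes the object only if its body still equals the expected value.
    // How a real store makes this release safe is outside this sketch.
    fun deleteIfMatches(key: String, expectedBody: String): Boolean =
        objects.remove(key, expectedBody)
}

// Hypothetical lock built on the conditional put; names are illustrative only.
class ConditionalPutLock(
    private val store: InMemoryConditionalStore,
    private val lockKey: String
) {
    // Random owner id written as the lock object's body.
    private val ownerId = UUID.randomUUID().toString()

    // Succeeds only for the first writer that creates the lock object.
    fun tryLock(): Boolean = store.putIfAbsent(lockKey, ownerId)

    // Releases the lock only if this instance still owns it.
    fun unlock(): Boolean = store.deleteIfMatches(lockKey, ownerId)

    // Runs the action under the lock, or returns null if the lock is held elsewhere.
    fun <T : Any> runWithLock(action: () -> T): T? {
        if (!tryLock()) return null
        try {
            return action()
        } finally {
            unlock()
        }
    }
}

fun main() {
    val store = InMemoryConditionalStore()
    val writerA = ConditionalPutLock(store, "table/lock")
    val writerB = ConditionalPutLock(store, "table/lock")

    println(writerA.tryLock()) // true: lock object did not exist
    println(writerB.tryLock()) // false: conditional put is rejected
    println(writerB.unlock())  // false: writerB does not own the lock
    println(writerA.unlock())  // true: owner releases the lock
    println(writerB.runWithLock { "committed" }) // committed
}
```

The point of the sketch is that mutual exclusion comes entirely from the store rejecting the second conditional write, so no external metastore is involved.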
