This is an automated email from the ASF dual-hosted git repository. codope pushed a commit to branch release-0.12.2 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 5c3aba37788e29a80f6b2e949318c77341e6a6e4 Author: just-JL <[email protected]> AuthorDate: Wed Dec 7 19:45:49 2022 +0800 [HUDI-5290] Remove the lock in HoodieFlinkWriteClient#writeTableMetadata (#7320) --- .../apache/hudi/client/HoodieFlinkWriteClient.java | 23 +++++--------- .../org/apache/hudi/util/FlinkWriteClients.java | 21 ++++++++---- .../sink/TestStreamWriteOperatorCoordinator.java | 37 ++++++++++++++++++++++ 3 files changed, 59 insertions(+), 22 deletions(-) diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 12081e6bf1c..33148cddb11 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -282,21 +282,14 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends if (this.metadataWriter == null) { initMetadataWriter(); } - try { - // guard the metadata writer with concurrent lock - this.txnManager.getLockManager().lock(); - - // refresh the timeline - - // Note: the data meta client is not refreshed currently, some code path - // relies on the meta client for resolving the latest data schema, - // the schema expects to be immutable for SQL jobs but may be not for non-SQL - // jobs. - this.metadataWriter.initTableMetadata(); - this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType)); - } finally { - this.txnManager.getLockManager().unlock(); - } + // refresh the timeline + + // Note: the data meta client is not refreshed currently, some code path + // relies on the meta client for resolving the latest data schema, + // the schema expects to be immutable for SQL jobs but may be not for non-SQL + // jobs. + this.metadataWriter.initTableMetadata(); + this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType)); } /** diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java index 41712e8fb98..ab409079c19 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.model.HoodieCleaningPolicy; +import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.config.HoodieArchivalConfig; @@ -211,13 +212,6 @@ public class FlinkWriteClients { .enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED)) .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS)) .build()) - .withLockConfig(HoodieLockConfig.newBuilder() - .withLockProvider(FileSystemBasedLockProvider.class) - .withLockWaitTimeInMillis(2000L) // 2s - .withFileSystemLockExpire(1) // 1 minute - .withClientNumRetries(30) - .withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf)) - .build()) .withPayloadConfig(getPayloadConfig(conf)) .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService) .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton @@ -226,6 +220,19 @@ public class FlinkWriteClients { .withProps(flinkConf2TypedProperties(conf)) .withSchema(getSourceSchema(conf).toString()); + if (conf.getBoolean(FlinkOptions.METADATA_ENABLED)) { + builder.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL); + if (!conf.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key())) { + builder.withLockConfig(HoodieLockConfig.newBuilder() + .withLockProvider(FileSystemBasedLockProvider.class) + .withLockWaitTimeInMillis(2000L) // 2s + .withFileSystemLockExpire(1) // 1 minute + .withClientNumRetries(30) + .withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf)) + .build()); + } + } + // do not configure cleaning strategy as LAZY until multi-writers is supported. HoodieWriteConfig writeConfig = builder.build(); if (loadFsViewStorageConfig && !conf.containsKey(FileSystemViewStorageConfig.REMOTE_HOST_NAME.key())) { diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java index 070937a1ab4..f2a8a49b5eb 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.metadata.HoodieTableMetadata; @@ -336,6 +337,42 @@ public class TestStreamWriteOperatorCoordinator { } } + @Test + void testLockForMetadataTable() throws Exception { + // reset + reset(); + // override the default configuration + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + conf.setBoolean(FlinkOptions.METADATA_ENABLED, true); + + conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), "optimistic_concurrency_control"); + conf.setInteger("hoodie.write.lock.client.num_retries", 1); + + OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 1); + coordinator = new StreamWriteOperatorCoordinator(conf, context); + coordinator.start(); + coordinator.setExecutor(new MockCoordinatorExecutor(context)); + + final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0); + + coordinator.handleEventFromOperator(0, event0); + + String instant = coordinator.getInstant(); + assertNotEquals("", instant); + + final String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath()); + HoodieTableMetaClient metadataTableMetaClient = StreamerUtil.createMetaClient(metadataTableBasePath, HadoopConfigurations.getHadoopConf(conf)); + HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants(); + assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(1)); + assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP)); + + instant = mockWriteWithMetadata(); + metadataTableMetaClient.reloadActiveTimeline(); + completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants(); + assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(2)); + assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant)); + } + // ------------------------------------------------------------------------- // Utilities // -------------------------------------------------------------------------
