This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch release-0.12.2
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 5c3aba37788e29a80f6b2e949318c77341e6a6e4
Author: just-JL <[email protected]>
AuthorDate: Wed Dec 7 19:45:49 2022 +0800

    [HUDI-5290] Remove the lock in HoodieFlinkWriteClient#writeTableMetadata 
(#7320)
---
 .../apache/hudi/client/HoodieFlinkWriteClient.java | 23 +++++---------
 .../org/apache/hudi/util/FlinkWriteClients.java    | 21 ++++++++----
 .../sink/TestStreamWriteOperatorCoordinator.java   | 37 ++++++++++++++++++++++
 3 files changed, 59 insertions(+), 22 deletions(-)

diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 12081e6bf1c..33148cddb11 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -282,21 +282,14 @@ public class HoodieFlinkWriteClient<T extends 
HoodieRecordPayload> extends
     if (this.metadataWriter == null) {
       initMetadataWriter();
     }
-    try {
-      // guard the metadata writer with concurrent lock
-      this.txnManager.getLockManager().lock();
-
-      // refresh the timeline
-
-      // Note: the data meta client is not refreshed currently, some code path
-      // relies on the meta client for resolving the latest data schema,
-      // the schema expects to be immutable for SQL jobs but may be not for 
non-SQL
-      // jobs.
-      this.metadataWriter.initTableMetadata();
-      this.metadataWriter.update(metadata, instantTime, 
getHoodieTable().isTableServiceAction(actionType));
-    } finally {
-      this.txnManager.getLockManager().unlock();
-    }
+    // refresh the timeline
+
+    // Note: the data meta client is not refreshed currently, some code path
+    // relies on the meta client for resolving the latest data schema,
+    // the schema expects to be immutable for SQL jobs but may be not for 
non-SQL
+    // jobs.
+    this.metadataWriter.initTableMetadata();
+    this.metadataWriter.update(metadata, instantTime, 
getHoodieTable().isTableServiceAction(actionType));
   }
 
   /**
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index 41712e8fb98..ab409079c19 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.config.HoodieArchivalConfig;
@@ -211,13 +212,6 @@ public class FlinkWriteClients {
                 .enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED))
                 
.withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS))
                 .build())
-            .withLockConfig(HoodieLockConfig.newBuilder()
-                .withLockProvider(FileSystemBasedLockProvider.class)
-                .withLockWaitTimeInMillis(2000L) // 2s
-                .withFileSystemLockExpire(1) // 1 minute
-                .withClientNumRetries(30)
-                .withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf))
-                .build())
             .withPayloadConfig(getPayloadConfig(conf))
             .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService)
             .withEmbeddedTimelineServerReuseEnabled(true) // make write client 
embedded timeline service singleton
@@ -226,6 +220,19 @@ public class FlinkWriteClients {
             .withProps(flinkConf2TypedProperties(conf))
             .withSchema(getSourceSchema(conf).toString());
 
+    if (conf.getBoolean(FlinkOptions.METADATA_ENABLED)) {
+      
builder.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL);
+      if (!conf.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key())) {
+        builder.withLockConfig(HoodieLockConfig.newBuilder()
+            .withLockProvider(FileSystemBasedLockProvider.class)
+            .withLockWaitTimeInMillis(2000L) // 2s
+            .withFileSystemLockExpire(1) // 1 minute
+            .withClientNumRetries(30)
+            .withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf))
+            .build());
+      }
+    }
+
     // do not configure cleaning strategy as LAZY until multi-writers is 
supported.
     HoodieWriteConfig writeConfig = builder.build();
     if (loadFsViewStorageConfig && 
!conf.containsKey(FileSystemViewStorageConfig.REMOTE_HOST_NAME.key())) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 070937a1ab4..f2a8a49b5eb 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.metadata.HoodieTableMetadata;
@@ -336,6 +337,42 @@ public class TestStreamWriteOperatorCoordinator {
     }
   }
 
+  @Test
+  void testLockForMetadataTable() throws Exception {
+    // reset
+    reset();
+    // override the default configuration
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
+
+    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), 
"optimistic_concurrency_control");
+    conf.setInteger("hoodie.write.lock.client.num_retries", 1);
+
+    OperatorCoordinator.Context context = new 
MockOperatorCoordinatorContext(new OperatorID(), 1);
+    coordinator = new StreamWriteOperatorCoordinator(conf, context);
+    coordinator.start();
+    coordinator.setExecutor(new MockCoordinatorExecutor(context));
+
+    final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0);
+
+    coordinator.handleEventFromOperator(0, event0);
+
+    String instant = coordinator.getInstant();
+    assertNotEquals("", instant);
+
+    final String metadataTableBasePath = 
HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath());
+    HoodieTableMetaClient metadataTableMetaClient = 
StreamerUtil.createMetaClient(metadataTableBasePath, 
HadoopConfigurations.getHadoopConf(conf));
+    HoodieTimeline completedTimeline = 
metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
+    assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(1));
+    assertThat(completedTimeline.lastInstant().get().getTimestamp(), 
is(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
+
+    instant = mockWriteWithMetadata();
+    metadataTableMetaClient.reloadActiveTimeline();
+    completedTimeline = 
metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
+    assertThat("One instant need to sync to metadata table", 
completedTimeline.countInstants(), is(2));
+    assertThat(completedTimeline.lastInstant().get().getTimestamp(), 
is(instant));
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------

Reply via email to the mailing list.