nsivabalan commented on code in PR #18295:
URL: https://github.com/apache/hudi/pull/18295#discussion_r2942915009


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java:
##########
@@ -362,6 +387,99 @@ public static HoodieWriteConfig createMetadataWriteConfig(
     return metadataWriteConfig;
   }
 
+  /**
+   * Build the lock config for the metadata table by extracting lock-specific 
properties from the
+   * data table's write config. This avoids copying all properties (which 
would overwrite MDT-specific
+   * settings like base path and auto-clean).
+   */
+  private static HoodieLockConfig buildMetadataLockConfig(HoodieWriteConfig 
writeConfig) {
+    TypedProperties props = writeConfig.getProps();
+    HoodieLockConfig.Builder lockConfigBuilder = HoodieLockConfig.newBuilder()
+        .withClientNumRetries(Integer.parseInt(props.getString(
+            HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.key(),
+            HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue())))
+        .withClientRetryWaitTimeInMillis(Long.parseLong(props.getString(
+            
HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.key(),
+            
HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue())))
+        .withLockWaitTimeInMillis(Long.valueOf(props.getString(
+            HoodieLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS.key(),
+            
String.valueOf(HoodieLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS.defaultValue()))))
+        .withNumRetries(Integer.parseInt(props.getString(
+            HoodieLockConfig.LOCK_ACQUIRE_NUM_RETRIES.key(),
+            HoodieLockConfig.LOCK_ACQUIRE_NUM_RETRIES.defaultValue())))
+        .withRetryWaitTimeInMillis(Long.parseLong(props.getString(
+            HoodieLockConfig.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.key(),
+            
HoodieLockConfig.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.defaultValue())))
+        .withRetryMaxWaitTimeInMillis(Long.parseLong(props.getString(
+            HoodieLockConfig.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS.key(),
+            
HoodieLockConfig.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS.defaultValue())))
+        .withHeartbeatIntervalInMillis(Long.valueOf(props.getString(
+            HoodieLockConfig.LOCK_HEARTBEAT_INTERVAL_MS.key(),
+            
String.valueOf(HoodieLockConfig.LOCK_HEARTBEAT_INTERVAL_MS.defaultValue()))));
+
+    String lockProviderClass = writeConfig.getLockProviderClass();
+    if (lockProviderClass == null) {
+      return lockConfigBuilder.build();
+    }
+
+    Properties providerProp = new Properties();
+    providerProp.setProperty(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), 
lockProviderClass);
+    lockConfigBuilder.fromProperties(providerProp);
+
+    if (ZookeeperBasedLockProvider.class.getName().equals(lockProviderClass)) {
+      if (props.containsKey(HoodieLockConfig.ZK_CONNECT_URL.key())) {
+        
lockConfigBuilder.withZkQuorum(props.getString(HoodieLockConfig.ZK_CONNECT_URL.key()));
+      }
+      if (props.containsKey(HoodieLockConfig.ZK_BASE_PATH.key())) {
+        
lockConfigBuilder.withZkBasePath(props.getString(HoodieLockConfig.ZK_BASE_PATH.key()));
+      }
+      if (props.containsKey(HoodieLockConfig.ZK_LOCK_KEY.key())) {
+        
lockConfigBuilder.withZkLockKey(props.getString(HoodieLockConfig.ZK_LOCK_KEY.key()));
+      }
+      if (props.containsKey(HoodieLockConfig.ZK_PORT.key())) {
+        
lockConfigBuilder.withZkPort(props.getString(HoodieLockConfig.ZK_PORT.key()));
+      }
+      if (props.containsKey(HoodieLockConfig.ZK_SESSION_TIMEOUT_MS.key())) {
+        lockConfigBuilder.withZkSessionTimeoutInMs(
+            
Long.valueOf(props.getString(HoodieLockConfig.ZK_SESSION_TIMEOUT_MS.key())));
+      }
+      if (props.containsKey(HoodieLockConfig.ZK_CONNECTION_TIMEOUT_MS.key())) {
+        lockConfigBuilder.withZkConnectionTimeoutInMs(
+            
Long.valueOf(props.getString(HoodieLockConfig.ZK_CONNECTION_TIMEOUT_MS.key())));
+      }
+    } else if 
(FileSystemBasedLockProvider.class.getName().equals(lockProviderClass)) {
+      if (props.containsKey(HoodieLockConfig.FILESYSTEM_LOCK_PATH.key())) {
+        
lockConfigBuilder.withFileSystemLockPath(props.getString(HoodieLockConfig.FILESYSTEM_LOCK_PATH.key()));
+      }
+      if (props.containsKey(HoodieLockConfig.FILESYSTEM_LOCK_EXPIRE.key())) {
+        lockConfigBuilder.withFileSystemLockExpire(
+            
Integer.parseInt(props.getString(HoodieLockConfig.FILESYSTEM_LOCK_EXPIRE.key())));
+      }
+    } else if (lockProviderClass.contains("HiveMetastoreBasedLockProvider")) {
+      if (props.containsKey(HoodieLockConfig.HIVE_DATABASE_NAME.key())) {
+        
lockConfigBuilder.withHiveDatabaseName(props.getString(HoodieLockConfig.HIVE_DATABASE_NAME.key()));
+      }
+      if (props.containsKey(HoodieLockConfig.HIVE_TABLE_NAME.key())) {
+        
lockConfigBuilder.withHiveTableName(props.getString(HoodieLockConfig.HIVE_TABLE_NAME.key()));
+      }
+      if (props.containsKey(HoodieLockConfig.HIVE_METASTORE_URI.key())) {
+        
lockConfigBuilder.withHiveMetastoreURIs(props.getString(HoodieLockConfig.HIVE_METASTORE_URI.key()));
+      }
+    } else {
+      // For any custom lock provider, pass through all lock-prefixed 
properties
+      // so provider-specific configs are preserved.

Review Comment:
   Because, if a new lock provider is added tomorrow, how do we ensure this code 
is updated? 
   For example, I already don't see the DynamoDB or storage-based lock providers 
being accounted for here. 
   
   



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -81,6 +82,28 @@ public final class HoodieMetadataConfig extends HoodieConfig 
{
           + "in streaming manner rather than two disjoint writes. By default "
           + "streaming writes to metadata table is enabled for SPARK engine 
for incremental operations and disabled for all other cases.");
 
+  public static final ConfigProperty<String> METADATA_WRITE_CONCURRENCY_MODE = 
ConfigProperty
+      .key(METADATA_PREFIX + ".write.concurrency.mode")

Review Comment:
   Can we add tests to TestHoodieMetadataConfig?



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -81,6 +82,28 @@ public final class HoodieMetadataConfig extends HoodieConfig 
{
           + "in streaming manner rather than two disjoint writes. By default "
           + "streaming writes to metadata table is enabled for SPARK engine 
for incremental operations and disabled for all other cases.");
 
+  public static final ConfigProperty<String> METADATA_WRITE_CONCURRENCY_MODE = 
ConfigProperty
+      .key(METADATA_PREFIX + ".write.concurrency.mode")
+      .defaultValue(WriteConcurrencyMode.SINGLE_WRITER.name())
+      .markAdvanced()
+      .withDocumentation("Change this to OPTIMISTIC_CONCURRENCY_CONTROL when 
MDT operations are being performed "
+          + "from an external concurrent writer (such as a table service 
platform) so that appropriate locks are taken.");
+
+  public static final ConfigProperty<Boolean> TABLE_SERVICE_MANAGER_ENABLED = 
ConfigProperty
+      .key(METADATA_PREFIX + ".table.service.manager.enabled")

Review Comment:
   I see why you have taken this route. 
   We are re-using TABLE_SERVICE_MANAGER_ENABLED and 
TABLE_SERVICE_MANAGER_ACTIONS from HoodieTableServiceManagerConfig. 
   
   In that case, we can retain the two configs. 
   



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -81,6 +82,28 @@ public final class HoodieMetadataConfig extends HoodieConfig 
{
           + "in streaming manner rather than two disjoint writes. By default "
           + "streaming writes to metadata table is enabled for SPARK engine 
for incremental operations and disabled for all other cases.");
 
+  public static final ConfigProperty<String> METADATA_WRITE_CONCURRENCY_MODE = 
ConfigProperty
+      .key(METADATA_PREFIX + ".write.concurrency.mode")
+      .defaultValue(WriteConcurrencyMode.SINGLE_WRITER.name())
+      .markAdvanced()
+      .withDocumentation("Change this to OPTIMISTIC_CONCURRENCY_CONTROL when 
MDT operations are being performed "
+          + "from an external concurrent writer (such as a table service 
platform) so that appropriate locks are taken.");
+
+  public static final ConfigProperty<Boolean> TABLE_SERVICE_MANAGER_ENABLED = 
ConfigProperty
+      .key(METADATA_PREFIX + ".table.service.manager.enabled")

Review Comment:
   How about folding the two configs into just one? 
   `hoodie.metadata.skip.table.service.executions` 
   
   There won't be any default value. 
   
   The value could be a comma-separated list of table service actions, such as 
`compaction`, `logcompaction`, and `clean`.
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriterTableVersionSix.java:
##########
@@ -279,7 +286,11 @@ void compactIfNecessary(BaseHoodieWriteClient<?,I,?,O> 
writeClient, Option<Strin
         LOG.info("Log compaction with same {} time is already present in the 
timeline.", logCompactionInstantTime);
       } else if 
(writeClient.scheduleLogCompactionAtInstant(logCompactionInstantTime, 
Option.empty())) {
         LOG.info("Log compaction is scheduled for timestamp {}", 
logCompactionInstantTime);
-        writeClient.logCompact(logCompactionInstantTime, true);
+        if (tsmConfig.isEnabledAndActionSupported(ActionType.logcompaction)) {
+          LOG.info("Skipping execution of log compaction on MDT as it is 
delegated to table service manager.");
+        } else {
+          writeClient.logCompact(logCompactionInstantTime, true);
+        }

Review Comment:
   Looks like we are only supporting compaction and log compaction for now. 
   Can we call this out in the config property documentation for the newly 
added configs?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java:
##########
@@ -362,6 +387,99 @@ public static HoodieWriteConfig createMetadataWriteConfig(
     return metadataWriteConfig;
   }
 
+  /**
+   * Build the lock config for the metadata table by extracting lock-specific 
properties from the
+   * data table's write config. This avoids copying all properties (which 
would overwrite MDT-specific
+   * settings like base path and auto-clean).
+   */
+  private static HoodieLockConfig buildMetadataLockConfig(HoodieWriteConfig 
writeConfig) {
+    TypedProperties props = writeConfig.getProps();
+    HoodieLockConfig.Builder lockConfigBuilder = HoodieLockConfig.newBuilder()
+        .withClientNumRetries(Integer.parseInt(props.getString(
+            HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.key(),
+            HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue())))
+        .withClientRetryWaitTimeInMillis(Long.parseLong(props.getString(
+            
HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.key(),
+            
HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue())))
+        .withLockWaitTimeInMillis(Long.valueOf(props.getString(
+            HoodieLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS.key(),
+            
String.valueOf(HoodieLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS.defaultValue()))))
+        .withNumRetries(Integer.parseInt(props.getString(
+            HoodieLockConfig.LOCK_ACQUIRE_NUM_RETRIES.key(),
+            HoodieLockConfig.LOCK_ACQUIRE_NUM_RETRIES.defaultValue())))
+        .withRetryWaitTimeInMillis(Long.parseLong(props.getString(
+            HoodieLockConfig.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.key(),
+            
HoodieLockConfig.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.defaultValue())))
+        .withRetryMaxWaitTimeInMillis(Long.parseLong(props.getString(
+            HoodieLockConfig.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS.key(),
+            
HoodieLockConfig.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS.defaultValue())))
+        .withHeartbeatIntervalInMillis(Long.valueOf(props.getString(
+            HoodieLockConfig.LOCK_HEARTBEAT_INTERVAL_MS.key(),
+            
String.valueOf(HoodieLockConfig.LOCK_HEARTBEAT_INTERVAL_MS.defaultValue()))));
+
+    String lockProviderClass = writeConfig.getLockProviderClass();
+    if (lockProviderClass == null) {
+      return lockConfigBuilder.build();
+    }
+
+    Properties providerProp = new Properties();
+    providerProp.setProperty(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), 
lockProviderClass);
+    lockConfigBuilder.fromProperties(providerProp);
+
+    if (ZookeeperBasedLockProvider.class.getName().equals(lockProviderClass)) {
+      if (props.containsKey(HoodieLockConfig.ZK_CONNECT_URL.key())) {
+        
lockConfigBuilder.withZkQuorum(props.getString(HoodieLockConfig.ZK_CONNECT_URL.key()));
+      }
+      if (props.containsKey(HoodieLockConfig.ZK_BASE_PATH.key())) {
+        
lockConfigBuilder.withZkBasePath(props.getString(HoodieLockConfig.ZK_BASE_PATH.key()));
+      }
+      if (props.containsKey(HoodieLockConfig.ZK_LOCK_KEY.key())) {
+        
lockConfigBuilder.withZkLockKey(props.getString(HoodieLockConfig.ZK_LOCK_KEY.key()));
+      }
+      if (props.containsKey(HoodieLockConfig.ZK_PORT.key())) {
+        
lockConfigBuilder.withZkPort(props.getString(HoodieLockConfig.ZK_PORT.key()));
+      }
+      if (props.containsKey(HoodieLockConfig.ZK_SESSION_TIMEOUT_MS.key())) {
+        lockConfigBuilder.withZkSessionTimeoutInMs(
+            
Long.valueOf(props.getString(HoodieLockConfig.ZK_SESSION_TIMEOUT_MS.key())));
+      }
+      if (props.containsKey(HoodieLockConfig.ZK_CONNECTION_TIMEOUT_MS.key())) {
+        lockConfigBuilder.withZkConnectionTimeoutInMs(
+            
Long.valueOf(props.getString(HoodieLockConfig.ZK_CONNECTION_TIMEOUT_MS.key())));
+      }
+    } else if 
(FileSystemBasedLockProvider.class.getName().equals(lockProviderClass)) {
+      if (props.containsKey(HoodieLockConfig.FILESYSTEM_LOCK_PATH.key())) {
+        
lockConfigBuilder.withFileSystemLockPath(props.getString(HoodieLockConfig.FILESYSTEM_LOCK_PATH.key()));
+      }
+      if (props.containsKey(HoodieLockConfig.FILESYSTEM_LOCK_EXPIRE.key())) {
+        lockConfigBuilder.withFileSystemLockExpire(
+            
Integer.parseInt(props.getString(HoodieLockConfig.FILESYSTEM_LOCK_EXPIRE.key())));
+      }
+    } else if (lockProviderClass.contains("HiveMetastoreBasedLockProvider")) {
+      if (props.containsKey(HoodieLockConfig.HIVE_DATABASE_NAME.key())) {
+        
lockConfigBuilder.withHiveDatabaseName(props.getString(HoodieLockConfig.HIVE_DATABASE_NAME.key()));
+      }
+      if (props.containsKey(HoodieLockConfig.HIVE_TABLE_NAME.key())) {
+        
lockConfigBuilder.withHiveTableName(props.getString(HoodieLockConfig.HIVE_TABLE_NAME.key()));
+      }
+      if (props.containsKey(HoodieLockConfig.HIVE_METASTORE_URI.key())) {
+        
lockConfigBuilder.withHiveMetastoreURIs(props.getString(HoodieLockConfig.HIVE_METASTORE_URI.key()));
+      }
+    } else {
+      // For any custom lock provider, pass through all lock-prefixed 
properties
+      // so provider-specific configs are preserved.

Review Comment:
   Can we simplify and do this for all providers? Is there any case where this may not 
work? 
   By the way, how are alternate keys, if any, accounted for here? 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java:
##########
@@ -362,6 +387,99 @@ public static HoodieWriteConfig createMetadataWriteConfig(
     return metadataWriteConfig;
   }
 
+  /**
+   * Build the lock config for the metadata table by extracting lock-specific 
properties from the
+   * data table's write config. This avoids copying all properties (which 
would overwrite MDT-specific
+   * settings like base path and auto-clean).
+   */
+  private static HoodieLockConfig buildMetadataLockConfig(HoodieWriteConfig 
writeConfig) {
+    TypedProperties props = writeConfig.getProps();
+    HoodieLockConfig.Builder lockConfigBuilder = HoodieLockConfig.newBuilder()
+        .withClientNumRetries(Integer.parseInt(props.getString(
+            HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.key(),
+            HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue())))
+        .withClientRetryWaitTimeInMillis(Long.parseLong(props.getString(
+            
HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.key(),
+            
HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue())))
+        .withLockWaitTimeInMillis(Long.valueOf(props.getString(
+            HoodieLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS.key(),
+            
String.valueOf(HoodieLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS.defaultValue()))))
+        .withNumRetries(Integer.parseInt(props.getString(
+            HoodieLockConfig.LOCK_ACQUIRE_NUM_RETRIES.key(),
+            HoodieLockConfig.LOCK_ACQUIRE_NUM_RETRIES.defaultValue())))
+        .withRetryWaitTimeInMillis(Long.parseLong(props.getString(
+            HoodieLockConfig.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.key(),
+            
HoodieLockConfig.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.defaultValue())))
+        .withRetryMaxWaitTimeInMillis(Long.parseLong(props.getString(
+            HoodieLockConfig.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS.key(),
+            
HoodieLockConfig.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS.defaultValue())))
+        .withHeartbeatIntervalInMillis(Long.valueOf(props.getString(
+            HoodieLockConfig.LOCK_HEARTBEAT_INTERVAL_MS.key(),
+            
String.valueOf(HoodieLockConfig.LOCK_HEARTBEAT_INTERVAL_MS.defaultValue()))));
+
+    String lockProviderClass = writeConfig.getLockProviderClass();
+    if (lockProviderClass == null) {

Review Comment:
   Shouldn't we throw here? 
   How come the data table's write config does not have any lock provider class 
set? 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to