This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 84064a9 [HUDI-3772] Fixing auto adjustment of lock configs for deltastreamer (#5207)
84064a9 is described below
commit 84064a9b081c246f306855ae125f0dae5eb8f6d0
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sat Apr 2 23:44:10 2022 -0700
[HUDI-3772] Fixing auto adjustment of lock configs for deltastreamer (#5207)
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 79 +++++++++++++---------
.../apache/hudi/config/TestHoodieWriteConfig.java | 44 ++++++++++++
.../org/apache/hudi/HoodieStreamingSink.scala | 4 +-
.../org/apache/hudi/utilities/UtilHelpers.java | 3 +-
.../deltastreamer/HoodieDeltaStreamer.java | 9 ++-
5 files changed, 104 insertions(+), 35 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 23f1f38..87dd56b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -480,6 +480,12 @@ public class HoodieWriteConfig extends HoodieConfig {
.sinceVersion("0.11.0")
.withDocumentation("Control to enable release all persist rdds when the
spark job finish.");
+ public static final ConfigProperty<Boolean> AUTO_ADJUST_LOCK_CONFIGS =
ConfigProperty
+ .key("hoodie.auto.adjust.lock.configs")
+ .defaultValue(false)
+ .sinceVersion("0.11.0")
+ .withDocumentation("Auto adjust lock configurations when metadata table
is enabled and for async table services.");
+
private ConsistencyGuardConfig consistencyGuardConfig;
private FileSystemRetryConfig fileSystemRetryConfig;
@@ -1968,6 +1974,9 @@ public class HoodieWriteConfig extends HoodieConfig {
* Hoodie Client Lock Configs.
* @return
*/
+ public boolean isAutoAdjustLockConfigs() {
+ return getBooleanOrDefault(AUTO_ADJUST_LOCK_CONFIGS);
+ }
public String getLockProviderClass() {
return getString(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME);
@@ -2443,6 +2452,11 @@ public class HoodieWriteConfig extends HoodieConfig {
return this;
}
+ public Builder withAutoAdjustLockConfigs(boolean autoAdjustLockConfigs) {
+ writeConfig.setValue(AUTO_ADJUST_LOCK_CONFIGS,
String.valueOf(autoAdjustLockConfigs));
+ return this;
+ }
+
protected void setDefaults() {
writeConfig.setDefaultValue(MARKERS_TYPE,
getDefaultMarkersType(engineType));
// Check for mandatory properties
@@ -2480,41 +2494,42 @@ public class HoodieWriteConfig extends HoodieConfig {
HoodieLayoutConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
writeConfig.setDefaultValue(TIMELINE_LAYOUT_VERSION_NUM,
String.valueOf(TimelineLayoutVersion.CURR_VERSION));
- autoAdjustConfigsForConcurrencyMode();
- }
-
- private void autoAdjustConfigsForConcurrencyMode() {
- boolean isMetadataTableEnabled =
writeConfig.getBoolean(HoodieMetadataConfig.ENABLE);
+ // isLockProviderPropertySet must be fetched before setting defaults of
HoodieLockConfig
final TypedProperties writeConfigProperties = writeConfig.getProps();
final boolean isLockProviderPropertySet =
writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME)
||
writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_PROP);
-
- if (!isLockConfigSet) {
- HoodieLockConfig.Builder lockConfigBuilder =
HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps());
- writeConfig.setDefault(lockConfigBuilder.build());
- }
-
- if (isMetadataTableEnabled) {
- // When metadata table is enabled, optimistic concurrency control must
be used for
- // single writer with async table services.
- // Async table services can update the metadata table and a lock
provider is
- // needed to guard against any concurrent table write operations. If
user has
- // not configured any lock provider, let's use the InProcess lock
provider.
- boolean areTableServicesEnabled =
writeConfig.areTableServicesEnabled();
- boolean areAsyncTableServicesEnabled =
writeConfig.areAnyTableServicesAsync();
-
- if (!isLockProviderPropertySet && areTableServicesEnabled &&
areAsyncTableServicesEnabled) {
- // This is targeted at Single writer with async table services
- // If user does not set the lock provider, likely that the
concurrency mode is not set either
- // Override the configs for metadata table
- writeConfig.setValue(WRITE_CONCURRENCY_MODE.key(),
- WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
- writeConfig.setValue(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
- InProcessLockProvider.class.getName());
- LOG.info(String.format("Automatically set %s=%s and %s=%s since user
has not set the "
- + "lock provider for single writer with async table
services",
- WRITE_CONCURRENCY_MODE.key(),
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value(),
- HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
InProcessLockProvider.class.getName()));
+ writeConfig.setDefaultOnCondition(!isLockConfigSet,
+
HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
+
+ autoAdjustConfigsForConcurrencyMode(isLockProviderPropertySet);
+ }
+
+ private void autoAdjustConfigsForConcurrencyMode(boolean
isLockProviderPropertySet) {
+ if (writeConfig.isAutoAdjustLockConfigs()) {
+ // auto adjustment is required only for deltastreamer and spark
streaming where async table services can be executed in the same JVM.
+ boolean isMetadataTableEnabled =
writeConfig.getBoolean(HoodieMetadataConfig.ENABLE);
+
+ if (isMetadataTableEnabled) {
+ // When metadata table is enabled, optimistic concurrency control
must be used for
+ // single writer with async table services.
+ // Async table services can update the metadata table and a lock
provider is
+ // needed to guard against any concurrent table write operations. If
user has
+ // not configured any lock provider, let's use the InProcess lock
provider.
+ boolean areTableServicesEnabled =
writeConfig.areTableServicesEnabled();
+ boolean areAsyncTableServicesEnabled =
writeConfig.areAnyTableServicesAsync();
+ if (!isLockProviderPropertySet && areTableServicesEnabled &&
areAsyncTableServicesEnabled) {
+ // This is targeted at Single writer with async table services
+ // If user does not set the lock provider, likely that the
concurrency mode is not set either
+ // Override the configs for metadata table
+ writeConfig.setValue(WRITE_CONCURRENCY_MODE.key(),
+ WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
+
writeConfig.setValue(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
+ InProcessLockProvider.class.getName());
+ LOG.info(String.format("Automatically set %s=%s and %s=%s since
user has not set the "
+ + "lock provider for single writer with async table
services",
+ WRITE_CONCURRENCY_MODE.key(),
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value(),
+ HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
InProcessLockProvider.class.getName()));
+ }
}
}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index 778bef7..85d4096 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -136,6 +136,7 @@ public class TestHoodieWriteConfig {
put(INLINE_COMPACT.key(), "true");
put(AUTO_CLEAN.key(), "true");
put(ASYNC_CLEAN.key(), "false");
+ put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
}), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName);
@@ -148,6 +149,7 @@ public class TestHoodieWriteConfig {
put(INLINE_COMPACT.key(), "true");
put(AUTO_CLEAN.key(), "true");
put(ASYNC_CLEAN.key(), "true");
+ put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
}), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName);
@@ -160,6 +162,7 @@ public class TestHoodieWriteConfig {
put(INLINE_COMPACT.key(), "false");
put(AUTO_CLEAN.key(), "true");
put(ASYNC_CLEAN.key(), "false");
+ put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
}), true,
tableType == HoodieTableType.MERGE_ON_READ,
@@ -181,6 +184,7 @@ public class TestHoodieWriteConfig {
put(INLINE_COMPACT.key(), "true");
put(AUTO_CLEAN.key(), "true");
put(ASYNC_CLEAN.key(), "false");
+ put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
}), Option.of(true), Option.of(false), Option.of(true),
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
@@ -190,6 +194,38 @@ public class TestHoodieWriteConfig {
@ParameterizedTest
@EnumSource(HoodieTableType.class)
+ public void testAutoAdjustLockConfigs(HoodieTableType tableType) {
+ TypedProperties properties = new TypedProperties();
+ properties.setProperty(HoodieTableConfig.TYPE.key(), tableType.name());
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath("/tmp")
+ .withAutoAdjustLockConfigs(false)
+ .withClusteringConfig(new
HoodieClusteringConfig.Builder().withAsyncClustering(true).build())
+ .withProperties(properties)
+ .build();
+
+ verifyConcurrencyControlRelatedConfigs(writeConfig,
+ true, true,
+ WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
+
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
+ HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
+
+ writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath("/tmp")
+ .withAutoAdjustLockConfigs(false)
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+ .withClusteringConfig(new
HoodieClusteringConfig.Builder().withAsyncClustering(true).build())
+ .withProperties(properties)
+ .build();
+
+ verifyConcurrencyControlRelatedConfigs(writeConfig,
+ true, true,
+ WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
HoodieFailedWritesCleaningPolicy.LAZY,
+ HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
+ }
+
+ @ParameterizedTest
+ @EnumSource(HoodieTableType.class)
public void
testAutoConcurrencyConfigAdjustmentWithUserConfigs(HoodieTableType tableType) {
// 1. User override for the lock provider should always take the precedence
TypedProperties properties = new TypedProperties();
@@ -199,8 +235,10 @@ public class TestHoodieWriteConfig {
.withLockConfig(HoodieLockConfig.newBuilder()
.withLockProvider(FileSystemBasedLockProviderTestClass.class)
.build())
+ .withAutoAdjustLockConfigs(true)
.withProperties(properties)
.build();
+
verifyConcurrencyControlRelatedConfigs(writeConfig,
true, tableType == HoodieTableType.MERGE_ON_READ,
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
@@ -217,6 +255,7 @@ public class TestHoodieWriteConfig {
put(ASYNC_CLEAN.key(), "true");
put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
ZookeeperBasedLockProvider.class.getName());
+ put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
}), true, true,
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
@@ -227,6 +266,7 @@ public class TestHoodieWriteConfig {
writeConfig = createWriteConfig(new HashMap<String, String>() {
{
put(HoodieTableConfig.TYPE.key(), tableType.name());
+ put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
});
if (writeConfig.areAnyTableServicesAsync()) {
@@ -252,6 +292,7 @@ public class TestHoodieWriteConfig {
{
put(HoodieTableConfig.TYPE.key(), tableType.name());
put(TABLE_SERVICES_ENABLED.key(), "false");
+ put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
}), false, tableType == HoodieTableType.MERGE_ON_READ,
WriteConcurrencyMode.fromValue(WRITE_CONCURRENCY_MODE.defaultValue()),
@@ -268,6 +309,7 @@ public class TestHoodieWriteConfig {
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
FileSystemBasedLockProviderTestClass.class.getName());
+ put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
}), false, tableType == HoodieTableType.MERGE_ON_READ,
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
@@ -288,6 +330,7 @@ public class TestHoodieWriteConfig {
put(INLINE_COMPACT.key(), "true");
put(AUTO_CLEAN.key(), "true");
put(ASYNC_CLEAN.key(), "false");
+ put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
}), true, true,
WriteConcurrencyMode.fromValue(WRITE_CONCURRENCY_MODE.defaultValue()),
@@ -306,6 +349,7 @@ public class TestHoodieWriteConfig {
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
FileSystemBasedLockProviderTestClass.class.getName());
+ put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
}), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
HoodieFailedWritesCleaningPolicy.LAZY,
FileSystemBasedLockProviderTestClass.class.getName());
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index 93580de..2befb47 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -81,7 +81,9 @@ class HoodieStreamingSink(sqlContext: SQLContext,
// Override to use direct markers. In Structured streaming, timeline
server is closed after
// first micro-batch and subsequent micro-batches do not have timeline
server running.
// Thus, we can't use timeline-server-based markers.
- val updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(),
MarkerType.DIRECT.name())
+ var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(),
MarkerType.DIRECT.name())
+ // we need auto adjustment enabled for streaming sink since async table
services are feasible within the same JVM.
+ updatedOptions =
updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
retry(retryCnt, retryIntervalMs)(
Try(
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 92e123b..5d1fd19 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -43,8 +43,8 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
-import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
+import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
import org.apache.hudi.utilities.schema.ChainedSchemaPostProcessor;
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
@@ -272,6 +272,7 @@ public class UtilHelpers {
sparkConf.set("spark.eventLog.overwrite", "true");
sparkConf.set("spark.eventLog.enabled", "true");
}
+ sparkConf.set("spark.ui.port", "8090");
sparkConf.setIfMissing("spark.driver.maxResultSize", "2g");
sparkConf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.hadoop.mapred.output.compress", "true");
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 2e83233..4b0f148 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -35,6 +35,7 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
@@ -127,7 +128,6 @@ public class HoodieDeltaStreamer implements Serializable {
public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs,
Configuration conf,
Option<TypedProperties> propsOverride) throws
IOException {
this.properties = combineProperties(cfg, propsOverride,
jssc.hadoopConfiguration());
-
if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) {
InitialCheckPointProvider checkPointProvider =
UtilHelpers.createInitialCheckpointProvider(cfg.initialCheckpointProvider,
this.properties);
@@ -156,7 +156,14 @@ public class HoodieDeltaStreamer implements Serializable {
hoodieConfig.setAll(UtilHelpers.readConfig(hadoopConf, new
Path(cfg.propsFilePath), cfg.configs).getProps());
}
+ // set any configs that Deltastreamer has to override explicitly
hoodieConfig.setDefaultValue(DataSourceWriteOptions.RECONCILE_SCHEMA());
+ // we need auto adjustment enabled for deltastreamer since async table
services are feasible within the same JVM.
+ hoodieConfig.setValue(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(),
"true");
+ if (cfg.tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
+ // Explicitly set the table type
+ hoodieConfig.setValue(HoodieTableConfig.TYPE.key(),
HoodieTableType.MERGE_ON_READ.name());
+ }
return hoodieConfig.getProps(true);
}