This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new fca45562180 [HUDI-8480] Adding new write config for streaming writes
to metadata table and Enabling Non Blocking Concurrency Control with Metadata
(#13292)
fca45562180 is described below
commit fca45562180c232dfefbf1fdf5cb3a639987f3c3
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sat May 31 07:31:40 2025 -0700
[HUDI-8480] Adding new write config for streaming writes to metadata table
and Enabling Non Blocking Concurrency Control with Metadata (#13292)
- Adding write config to support streaming writes to metadata table.
Config is named "hoodie.metadata.streaming.write.enabled".
- Enabling Non Blocking Concurrency Control with Metadata when streaming
writes are enabled
---
.../apache/hudi/client/utils/TransactionUtils.java | 2 +-
.../org/apache/hudi/config/HoodieWriteConfig.java | 44 ++++++++++++++---
.../metadata/HoodieBackedTableMetadataWriter.java | 2 +-
.../hudi/metadata/HoodieMetadataWriteUtils.java | 26 ++++++++--
.../hudi/table/upgrade/UpgradeDowngrade.java | 2 +-
.../TestConflictResolutionStrategyUtil.java | 23 ++++++---
.../hudi/client/utils/TestTransactionUtils.java | 47 ++++++++++++++++++
.../apache/hudi/config/TestHoodieWriteConfig.java | 27 +++++++++++
.../metadata/TestHoodieMetadataWriteUtils.java | 56 ++++++++++++++++++++--
.../hudi/utils/HoodieWriterClientTestHarness.java | 10 +++-
.../apache/hudi/client/HoodieFlinkWriteClient.java | 2 +-
.../hudi/client/TestJavaHoodieBackedMetadata.java | 1 +
.../TestHoodieJavaClientOnCopyOnWriteStorage.java | 5 ++
.../hudi/testutils/TestHoodieMetadataBase.java | 8 +++-
.../hudi/client/SparkRDDMetadataWriteClient.java | 6 +++
.../SparkHoodieBackedTableMetadataWriter.java | 3 +-
.../client/TestSparkRDDMetadataWriteClient.java | 4 +-
.../client/functional/TestHoodieMetadataBase.java | 4 +-
.../hudi/common/config/HoodieMetadataConfig.java | 31 ++++++++++++
.../TestColStatsRecordWithMetadataRecord.java | 3 +-
20 files changed, 275 insertions(+), 31 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
index a83f1ffd6cf..8ebfb45ca08 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
@@ -72,7 +72,7 @@ public class TransactionUtils {
boolean timelineRefreshedWithinTransaction,
Set<String> pendingInstants) throws HoodieWriteConflictException {
WriteOperationType operationType =
thisCommitMetadata.map(HoodieCommitMetadata::getOperationType).orElse(null);
- if (config.needResolveWriteConflict(operationType)) {
+ if (config.needResolveWriteConflict(operationType,
table.isMetadataTable(), config, table.getMetaClient().getTableConfig())) {
// deal with pendingInstants
if (!timelineRefreshedWithinTransaction) {
table.getMetaClient().reloadActiveTimeline();
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index ae34ee3337e..289863cc3a4 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2746,16 +2746,24 @@ public class HoodieWriteConfig extends HoodieConfig {
return props.getInteger(WRITES_FILEID_ENCODING,
HoodieMetadataPayload.RECORD_INDEX_FIELD_FILEID_ENCODING_UUID);
}
- public boolean needResolveWriteConflict(WriteOperationType operationType) {
+ public boolean needResolveWriteConflict(WriteOperationType operationType,
boolean isMetadataTable, HoodieWriteConfig config,
+ HoodieTableConfig tableConfig) {
WriteConcurrencyMode mode = getWriteConcurrencyMode();
switch (mode) {
case SINGLE_WRITER:
return false;
case OPTIMISTIC_CONCURRENCY_CONTROL:
return true;
- case NON_BLOCKING_CONCURRENCY_CONTROL:
- // NB-CC don't need to resolve write conflict except bulk insert
operation
- return WriteOperationType.BULK_INSERT == operationType;
+ case NON_BLOCKING_CONCURRENCY_CONTROL: {
+ if (isMetadataTable) {
+ // datatable NB-CC is still evolving and might change
compared to its current state.
+ // But in the case of the metadata table, when streaming writes are
enabled, no two writes can conflict; hence we return false.
+ return false;
+ } else {
+ // NB-CC don't need to resolve write conflict except bulk insert
operation
+ return WriteOperationType.BULK_INSERT == operationType;
+ }
+ }
default:
throw new IllegalArgumentException("Invalid WriteConcurrencyMode " +
mode);
}
@@ -2800,6 +2808,29 @@ public class HoodieWriteConfig extends HoodieConfig {
return metadataConfig.getSecondaryIndexParallelism();
}
+ /**
+ * Whether to enable streaming writes to metadata table or not.
+ * We have support for streaming writes only in the SPARK engine (due to
Spark task retry intricacies) and for table version >= 8 due to the
+ * pre-requisite of NBCC.
+ *
+ * <p>To support streaming writes, we need NBCC support for the metadata
table, since there could be an ingestion and a table service from the data table
+ * concurrently trying to write to the metadata table.
+ *
+ * <p>In Spark, when streaming writes are enabled, incremental operations
from data table like insert, upsert, delete and table services
+ * (compaction and clustering) will take the streaming writes flow, while
all other operations (like delete_partition, insert_overwrite, etc.) go through
+ * legacy metadata write paths (since these might involve reading entire
partition and not purely rely on incremental data written).
+ *
+ * @param tableVersion {@link HoodieTableVersion} of interest.
+ * @return true if streaming writes are enabled. false otherwise.
+ */
+ public boolean isMetadataStreamingWritesEnabled(HoodieTableVersion
tableVersion) {
+ if (tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+ return metadataConfig.isStreamingWriteEnabled();
+ } else {
+ return false;
+ }
+ }
+
public static class Builder {
protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
@@ -3478,9 +3509,10 @@ public class HoodieWriteConfig extends HoodieConfig {
writeConcurrencyMode.name()));
}
if (writeConcurrencyMode ==
WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL) {
+ boolean isMetadataTable =
HoodieTableMetadata.isMetadataTable(writeConfig.getBasePath());
checkArgument(
- writeConfig.getTableType().equals(HoodieTableType.MERGE_ON_READ)
&& writeConfig.isSimpleBucketIndex(),
- "Non-blocking concurrency control requires the MOR table with
simple bucket index");
+ writeConfig.getTableType().equals(HoodieTableType.MERGE_ON_READ)
&& (isMetadataTable || writeConfig.isSimpleBucketIndex()),
+ "Non-blocking concurrency control requires the MOR table with
simple bucket index or it has to be Metadata table");
}
HoodieCleaningPolicy cleaningPolicy =
HoodieCleaningPolicy.valueOf(writeConfig.getString(CLEANER_POLICY));
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 104e0a51946..f3cae39ce82 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -191,7 +191,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, O>
implements HoodieTab
.setTimeGeneratorConfig(dataWriteConfig.getTimeGeneratorConfig()).build();
this.enabledPartitionTypes =
getEnabledPartitions(dataWriteConfig.getMetadataConfig(), dataMetaClient);
if (writeConfig.isMetadataTableEnabled()) {
- this.metadataWriteConfig = createMetadataWriteConfig(writeConfig,
failedWritesCleaningPolicy);
+ this.metadataWriteConfig = createMetadataWriteConfig(writeConfig,
failedWritesCleaningPolicy, dataMetaClient.getTableConfig().getTableVersion());
try {
initRegistry();
initialized = initializeIfNeeded(dataMetaClient,
inflightInstantTimestamp);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
index d0fec373275..b2e114dfd1b 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi.metadata;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.client.FailOnFirstErrorWriteStatus;
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.RecordMergeMode;
@@ -28,15 +29,18 @@ import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
@@ -84,8 +88,19 @@ public class HoodieMetadataWriteUtils {
*/
@VisibleForTesting
public static HoodieWriteConfig createMetadataWriteConfig(
- HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy
failedWritesCleaningPolicy) {
+ HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy
failedWritesCleaningPolicy,
+ HoodieTableVersion datatableVersion) {
String tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
+ boolean isStreamingWritesToMetadataEnabled =
writeConfig.isMetadataStreamingWritesEnabled(datatableVersion);
+ WriteConcurrencyMode concurrencyMode = isStreamingWritesToMetadataEnabled
+ ? WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL :
WriteConcurrencyMode.SINGLE_WRITER;
+ HoodieLockConfig lockConfig = isStreamingWritesToMetadataEnabled
+ ?
HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build()
: HoodieLockConfig.newBuilder().build();
+ // HUDI-9407 tracks adding support for separate lock configuration for
MDT. Until then, all writes to MDT will happen within data table lock.
+
+ if (isStreamingWritesToMetadataEnabled) {
+ failedWritesCleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY;
+ }
final long maxLogFileSizeBytes =
writeConfig.getMetadataConfig().getMaxLogFileSize();
// Borrow the cleaner policy from the main table and adjust the cleaner
policy based on the main table's cleaner policy
@@ -119,8 +134,8 @@ public class HoodieMetadataWriteUtils {
.withMaxConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getMaxConsistencyCheckIntervalMs())
.withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks())
.build())
- .withWriteConcurrencyMode(WriteConcurrencyMode.SINGLE_WRITER)
-
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).withFileListingParallelism(writeConfig.getFileListingParallelism()).build())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false)
+
.withFileListingParallelism(writeConfig.getFileListingParallelism()).build())
.withAvroSchemaValidate(false)
.withEmbeddedTimelineServerEnabled(false)
.withMarkersType(MarkerType.DIRECT.name())
@@ -166,10 +181,13 @@ public class HoodieMetadataWriteUtils {
.withPayloadConfig(HoodiePayloadConfig.newBuilder()
.withPayloadClass(HoodieMetadataPayload.class.getCanonicalName()).build())
.withRecordMergeImplClasses(HoodieAvroRecordMerger.class.getCanonicalName())
- .withWriteRecordPositionsEnabled(false);
+ .withWriteRecordPositionsEnabled(false)
+ .withWriteConcurrencyMode(concurrencyMode)
+ .withLockConfig(lockConfig);
// RecordKey properties are needed for the metadata table records
final Properties properties = new Properties();
+ properties.put(HoodieTableConfig.TYPE.key(),
HoodieTableType.MERGE_ON_READ.name());
properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(),
RECORD_KEY_FIELD_NAME);
properties.put("hoodie.datasource.write.recordkey.field",
RECORD_KEY_FIELD_NAME);
if (nonEmpty(writeConfig.getMetricReporterMetricsNamePrefix())) {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
index 27f0320da9c..da391abe78f 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
@@ -139,7 +139,7 @@ public class UpgradeDowngrade {
HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
.setConf(metaClient.getStorageConf().newInstance()).setBasePath(metadataTablePath).build();
HoodieWriteConfig mdtWriteConfig =
HoodieMetadataWriteUtils.createMetadataWriteConfig(
- config, HoodieFailedWritesCleaningPolicy.EAGER);
+ config, HoodieFailedWritesCleaningPolicy.EAGER,
metaClient.getTableConfig().getTableVersion());
new UpgradeDowngrade(mdtMetaClient, mdtWriteConfig, context,
upgradeDowngradeHelper)
.run(toVersion, instantTime);
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
index 32de4ab768a..ea7ddac7aa4 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
@@ -26,6 +26,7 @@ import
org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.avro.model.HoodieSliceInfo;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -53,9 +54,13 @@ public class TestConflictResolutionStrategyUtil {
writeStat.setFileId("file-1");
commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
writeStat);
commitMetadata.setOperationType(WriteOperationType.INSERT);
- HoodieTestTable.of(metaClient)
- .addCommit(instantTime, Option.of(commitMetadata))
-
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
fileId1, fileId2);
+ HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+ if (metaClient.getTableConfig().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
+ testTable = testTable.addCommit(instantTime, Option.of(commitMetadata));
+ } else {
+ testTable = testTable.addDeltaCommit(instantTime, Option.empty(),
commitMetadata);
+ }
+
testTable.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
fileId1, fileId2);
}
public static HoodieCommitMetadata createCommitMetadata(String instantTime,
String writeFileName) {
@@ -75,9 +80,15 @@ public class TestConflictResolutionStrategyUtil {
public static void createInflightCommit(String instantTime,
HoodieTableMetaClient metaClient) throws Exception {
String fileId1 = "file-" + instantTime + "-1";
String fileId2 = "file-" + instantTime + "-2";
- HoodieTestTable.of(metaClient)
- .addInflightCommit(instantTime)
-
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
fileId1, fileId2);
+ if (metaClient.getTableConfig().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
+ HoodieTestTable.of(metaClient)
+ .addInflightCommit(instantTime)
+
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
fileId1, fileId2);
+ } else {
+ HoodieTestTable.of(metaClient)
+ .addInflightDeltaCommit(instantTime)
+
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
fileId1, fileId2);
+ }
}
public static void createCompactionRequested(String instantTime,
HoodieTableMetaClient metaClient) throws Exception {
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestTransactionUtils.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestTransactionUtils.java
index 0dad70501a5..c243a49d599 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestTransactionUtils.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestTransactionUtils.java
@@ -18,28 +18,35 @@
package org.apache.hudi.client.utils;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.table.HoodieTable;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.Collections;
+import java.util.Properties;
import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCommit;
import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCommitMetadata;
import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createInflightCommit;
+import static
org.apache.hudi.common.model.WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL;
import static
org.apache.hudi.common.model.WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -85,4 +92,44 @@ class TestTransactionUtils extends HoodieCommonTestHarness {
lastSuccessfulInstant, timelineRefreshedWithinTransaction,
Collections.singleton(newInstantTime)));
verify(spyMetaClient, times(timelineRefreshedWithinTransaction ? 0 :
1)).reloadActiveTimeline();
}
+
+ @Test
+ void resolveWriteConflictIfAnyNoExceptionForMetadataTable() throws Exception
{
+ // instantiate MOR table for metadata table.
+ metaClient = HoodieTestUtils.init(basePath + "/.hoodie/metadata/",
HoodieTableType.MERGE_ON_READ);
+
+ createCommit(HoodieTestTable.makeNewCommitTime(), metaClient);
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ // consider commits before this are all successful
+ Option<HoodieInstant> lastSuccessfulInstant =
timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+ // writer 1 starts
+ String currentWriterInstant = HoodieTestTable.makeNewCommitTime();
+ createInflightCommit(currentWriterInstant, metaClient);
+ // writer 2 starts and finishes
+ String newInstantTime = HoodieTestTable.makeNewCommitTime();
+ createCommit(newInstantTime, metaClient);
+
+ Option<HoodieInstant> currentInstant =
Option.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.DELTA_COMMIT_ACTION, currentWriterInstant));
+ HoodieCommitMetadata currentMetadata =
createCommitMetadata(currentWriterInstant, "file-1");
+
+ // mimic all props for metadata table.
+ Properties props = new Properties();
+ props.setProperty(HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key(),
"true");
+ props.setProperty(HoodieTableConfig.TYPE.key(),
HoodieTableType.MERGE_ON_READ.name());
+
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath(metaClient.getBasePath().toString())
+ .withWriteConcurrencyMode(NON_BLOCKING_CONCURRENCY_CONTROL)
+ .withProperties(props)
+ .build();
+ metaClient.reloadActiveTimeline();
+ HoodieTable table = mock(HoodieTable.class);
+ when(table.isMetadataTable()).thenReturn(true);
+ HoodieTableMetaClient spyMetaClient = spy(metaClient);
+ when(table.getMetaClient()).thenReturn(spyMetaClient);
+ Option<HoodieCommitMetadata> actualResult =
TransactionUtils.resolveWriteConflictIfAny(table, currentInstant,
Option.of(currentMetadata), writeConfig,
+ lastSuccessfulInstant, false,
Collections.singleton(newInstantTime));
+ // since we bypass entire conflict resolution
+ verify(spyMetaClient, times(0)).reloadActiveTimeline();
+ }
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index 90709ce8c7d..7d2563c5f2a 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -57,6 +57,7 @@ import java.util.Set;
import java.util.function.Function;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -725,6 +726,32 @@ public class TestHoodieWriteConfig {
assertEquals(bloomFilterType, config.getBloomFilterType());
}
+ @Test
+ public void testStreamingWritesToMetadataConfig() {
+ Properties props = new Properties();
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+ .withPath("/tmp")
+ .withProperties(props)
+ .withEngineType(EngineType.SPARK).build();
+
+
assertTrue(config.isMetadataStreamingWritesEnabled(HoodieTableVersion.EIGHT));
+
assertFalse(config.isMetadataStreamingWritesEnabled(HoodieTableVersion.SIX));
+
+ config = HoodieWriteConfig.newBuilder()
+ .withPath("/tmp")
+ .withProperties(props)
+ .withEngineType(EngineType.FLINK).build();
+
assertFalse(config.isMetadataStreamingWritesEnabled(HoodieTableVersion.SIX));
+
assertFalse(config.isMetadataStreamingWritesEnabled(HoodieTableVersion.EIGHT));
+
+ config = HoodieWriteConfig.newBuilder()
+ .withPath("/tmp")
+ .withProperties(props)
+ .withEngineType(EngineType.JAVA).build();
+
assertFalse(config.isMetadataStreamingWritesEnabled(HoodieTableVersion.SIX));
+
assertFalse(config.isMetadataStreamingWritesEnabled(HoodieTableVersion.EIGHT));
+ }
+
private HoodieWriteConfig createWriteConfig(Map<String, String> configs) {
final Properties properties = new Properties();
configs.forEach(properties::setProperty);
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
index 529d2ddfc7f..ee1fd587e04 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
@@ -18,29 +18,36 @@
package org.apache.hudi.metadata;
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.junit.jupiter.api.Test;
+import java.util.Properties;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
public class TestHoodieMetadataWriteUtils {
@Test
public void testCreateMetadataWriteConfigForCleaner() {
HoodieWriteConfig writeConfig1 = HoodieWriteConfig.newBuilder()
- .withPath("/tmp")
+ .withPath("/tmp/")
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
.retainCommits(5).build())
.build();
- HoodieWriteConfig metadataWriteConfig1 =
HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig1,
HoodieFailedWritesCleaningPolicy.EAGER);
+ HoodieWriteConfig metadataWriteConfig1 =
HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig1,
HoodieFailedWritesCleaningPolicy.EAGER,
+ HoodieTableVersion.SIX);
assertEquals(HoodieFailedWritesCleaningPolicy.EAGER,
metadataWriteConfig1.getFailedWritesCleanPolicy());
assertEquals(HoodieCleaningPolicy.KEEP_LATEST_COMMITS,
metadataWriteConfig1.getCleanerPolicy());
// default value already greater than data cleaner commits retained * 1.2
@@ -55,10 +62,53 @@ public class TestHoodieMetadataWriteUtils {
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
.retainCommits(20).build())
.build();
- HoodieWriteConfig metadataWriteConfig2 =
HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig2,
HoodieFailedWritesCleaningPolicy.EAGER);
+ HoodieWriteConfig metadataWriteConfig2 =
HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig2,
HoodieFailedWritesCleaningPolicy.EAGER,
+ HoodieTableVersion.SIX);
assertEquals(HoodieFailedWritesCleaningPolicy.EAGER,
metadataWriteConfig2.getFailedWritesCleanPolicy());
assertEquals(HoodieCleaningPolicy.KEEP_LATEST_COMMITS,
metadataWriteConfig2.getCleanerPolicy());
// data cleaner commits retained * 1.2 is greater than default
assertEquals(24, metadataWriteConfig2.getCleanerCommitsRetained());
}
+
+ @Test
+ public void testCreateMetadataWriteConfigForNBCC() {
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath("/tmp/base_path/.hoodie/metadata/")
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+ .retainCommits(5).build())
+ .build();
+
+ HoodieWriteConfig metadataWriteConfig =
HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig,
HoodieFailedWritesCleaningPolicy.EAGER,
+ HoodieTableVersion.EIGHT);
+ validateMetadataWriteConfig(metadataWriteConfig,
HoodieFailedWritesCleaningPolicy.LAZY,
+ WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL,
InProcessLockProvider.class.getCanonicalName());
+
+ // disable streaming writes to metadata table.
+ Properties properties = new Properties();
+ properties.put(HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key(),
"false");
+ writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath("/tmp/base_path/.hoodie/metadata/")
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+ .retainCommits(5).build())
+ .withProperties(properties)
+ .build();
+
+ metadataWriteConfig =
HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig,
HoodieFailedWritesCleaningPolicy.EAGER,
+ HoodieTableVersion.EIGHT);
+ validateMetadataWriteConfig(metadataWriteConfig,
HoodieFailedWritesCleaningPolicy.EAGER,
+ WriteConcurrencyMode.SINGLE_WRITER, null);
+ }
+
+ private void validateMetadataWriteConfig(HoodieWriteConfig
metadataWriteConfig, HoodieFailedWritesCleaningPolicy expectedPolicy,
+ WriteConcurrencyMode
expectedWriteConcurrencyMode, String expectedLockProviderClass) {
+ assertEquals(expectedPolicy,
metadataWriteConfig.getFailedWritesCleanPolicy());
+ assertEquals(expectedWriteConcurrencyMode,
metadataWriteConfig.getWriteConcurrencyMode());
+ if (expectedLockProviderClass != null) {
+ assertEquals(expectedLockProviderClass,
metadataWriteConfig.getLockProviderClass());
+ } else {
+ assertNull(metadataWriteConfig.getLockProviderClass());
+ }
+ }
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
index f2513ebf2b7..9a413cedc44 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
+import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
@@ -207,6 +208,10 @@ public abstract class HoodieWriterClientTestHarness
extends HoodieCommonTestHarn
return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
}
+ protected EngineType getEngineType() {
+ return EngineType.SPARK;
+ }
+
/**
* Get Config builder with default configs set.
*
@@ -256,6 +261,7 @@ public abstract class HoodieWriterClientTestHarness extends
HoodieCommonTestHarn
if (StringUtils.nonEmpty(schemaStr)) {
builder.withSchema(schemaStr);
}
+ builder.withEngineType(getEngineType());
return builder;
}
@@ -1062,7 +1068,7 @@ public abstract class HoodieWriterClientTestHarness
extends HoodieCommonTestHarn
* @throws Exception in case of failure
*/
protected void testAutoCommit(Function3<Object, BaseHoodieWriteClient,
Object, String> writeFn,
- boolean isPrepped, boolean populateMetaFields,
InstantGenerator instantGenerator) throws Exception {
+ boolean isPrepped, boolean populateMetaFields,
InstantGenerator instantGenerator) throws Exception {
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder();
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
try (BaseHoodieWriteClient client =
getHoodieWriteClient(cfgBuilder.build())) {
@@ -1191,7 +1197,7 @@ public abstract class HoodieWriterClientTestHarness
extends HoodieCommonTestHarn
HoodieWriteConfig.Builder cfgBuilder =
getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withRollbackUsingMarkers(true)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(true).withColumnStatsIndexForColumns("driver,rider")
- .withMetadataIndexColumnStatsFileGroupCount(1).build())
+
.withMetadataIndexColumnStatsFileGroupCount(1).withEngineType(getEngineType()).build())
.withWriteTableVersion(6);
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 1a88cd6ec48..3d9958d0f4d 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -312,7 +312,7 @@ public class HoodieFlinkWriteClient<T>
* should be called before the Driver starts a new transaction with a
reloaded metaclient.
*/
public void preTxn(WriteOperationType operationType, HoodieTableMetaClient
metaClient) {
- if (txnManager.isLockRequired() &&
config.needResolveWriteConflict(operationType)) {
+ if (txnManager.isLockRequired() &&
config.needResolveWriteConflict(operationType, metaClient.isMetadataTable(),
config, metaClient.getTableConfig())) {
this.lastCompletedTxnAndMetadata =
TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
this.pendingInflightAndRequestedInstants =
TransactionUtils.getInflightAndRequestedInstants(metaClient);
}
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 68258674aec..27f319c9739 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -507,6 +507,7 @@ public class TestJavaHoodieBackedMetadata extends
TestHoodieMetadataBase {
.withMetadataIndexColumnStats(true)
.withMetadataIndexBloomFilterFileGroups(2)
.withMaxNumDeltaCommitsBeforeCompaction(12) // cannot restore to
before the oldest compaction on MDT as there are no base files before that time
+ .withEngineType(EngineType.JAVA)
.build())
.build();
// module com.fasterxml.jackson.datatype:jackson-datatype-jsr310 is needed
for proper column stats processing for Jackson >= 2.11
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
index ac71581c42d..a8d4e5990f1 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
@@ -24,6 +24,7 @@ import org.apache.hudi.client.WriteStatus;
import
org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy;
import
org.apache.hudi.client.clustering.run.strategy.JavaSortAndSizeExecutionStrategy;
import org.apache.hudi.client.common.JavaTaskContextSupplier;
+import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -77,6 +78,10 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage
extends HoodieJavaClientTe
);
}
+ protected EngineType getEngineType() {
+ return EngineType.JAVA;
+ }
+
@BeforeEach
public void setUpTestTable() {
testTable = HoodieMetadataTestTable.of(metaClient);
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/TestHoodieMetadataBase.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/TestHoodieMetadataBase.java
index 8c6b55547f2..b555b769646 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/TestHoodieMetadataBase.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/TestHoodieMetadataBase.java
@@ -28,6 +28,7 @@ import
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
import org.apache.hudi.common.testutils.HoodieTestTable;
@@ -309,15 +310,18 @@ public class TestHoodieMetadataBase extends
HoodieJavaClientTestHarness {
.enableMetrics(enableMetrics)
.ignoreSpuriousDeletes(validateMetadataPayloadConsistency)
.withMetadataIndexColumnStats(false) // HUDI-8774
+ .withEngineType(EngineType.JAVA)
.build())
.withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
.withExecutorMetrics(enableMetrics).withReporterType(MetricsReporterType.INMEMORY.name()).build())
.withRollbackUsingMarkers(useRollbackUsingMarkers)
- .withProperties(properties);
+ .withProperties(properties)
+ .withEngineType(EngineType.JAVA);
}
protected HoodieWriteConfig getMetadataWriteConfig(HoodieWriteConfig
writeConfig) {
- return HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig,
HoodieFailedWritesCleaningPolicy.LAZY);
+ return HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig,
HoodieFailedWritesCleaningPolicy.LAZY,
+ HoodieTableVersion.EIGHT);
}
protected HoodieTableMetaClient createMetaClientForMetadataTable() {
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java
index a94f68759fc..f3f897d3915 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -57,6 +58,11 @@ public class SparkRDDMetadataWriteClient<T> extends
SparkRDDWriteClient<T> {
super(context, writeConfig, timelineService);
}
+ @Override
+ public String createNewInstantTime() {
+ return TimelineUtils.generateInstantTime(false, timeGenerator);
+ }
+
/**
* Upserts the given prepared records into the Hoodie table, at the supplied
instantTime.
* <p>
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 3dcb60db62f..7c555c958c6 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -20,6 +20,7 @@ package org.apache.hudi.metadata;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.HoodieWriteResult;
+import org.apache.hudi.client.SparkRDDMetadataWriteClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.SparkMetadataWriterUtils;
@@ -247,7 +248,7 @@ public class SparkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
@Override
public BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?,
JavaRDD<WriteStatus>> initializeWriteClient() {
- return new SparkRDDWriteClient(engineContext, metadataWriteConfig,
Option.empty());
+ return new SparkRDDMetadataWriteClient(engineContext, metadataWriteConfig,
Option.empty());
}
@Override
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
index 00cef9163eb..ddda4a422d5 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -99,7 +100,8 @@ public class TestSparkRDDMetadataWriteClient extends
HoodieClientTestBase {
initDataTableWithACommit(hoodieWriteConfig);
// fetch metadata file slice info
- HoodieWriteConfig mdtWriteConfig =
HoodieMetadataWriteUtils.createMetadataWriteConfig(hoodieWriteConfig,
HoodieFailedWritesCleaningPolicy.EAGER);
+ HoodieWriteConfig mdtWriteConfig =
HoodieMetadataWriteUtils.createMetadataWriteConfig(hoodieWriteConfig,
HoodieFailedWritesCleaningPolicy.EAGER,
+ HoodieTableVersion.EIGHT);
Map<String, List<String>> mdtPartitionsFileIdMapping = new HashMap<>();
List<HoodieFileGroupId> nonFilesPartitionFileGroupIdList = new
ArrayList<>();
List<HoodieFileGroupId> filesPartitionFileGroupIdList = new ArrayList<>();
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
index 12456dfd223..bb674484f98 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
@@ -28,6 +28,7 @@ import
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
import org.apache.hudi.common.testutils.HoodieTestTable;
@@ -354,6 +355,7 @@ public class TestHoodieMetadataBase extends
HoodieSparkClientTestHarness {
}
protected HoodieWriteConfig getMetadataWriteConfig(HoodieWriteConfig
writeConfig) {
- return HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig,
HoodieFailedWritesCleaningPolicy.LAZY);
+ return HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig,
+ HoodieFailedWritesCleaningPolicy.LAZY, HoodieTableVersion.EIGHT);
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index f1a91e8227b..5de0600ccd5 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -71,6 +71,15 @@ public final class HoodieMetadataConfig extends HoodieConfig
{
.sinceVersion("0.7.0")
.withDocumentation("Enable the internal metadata table which serves
table metadata like level file listings");
+ public static final ConfigProperty<Boolean> STREAMING_WRITE_ENABLED =
ConfigProperty
+ .key(METADATA_PREFIX + ".streaming.write.enabled")
+ .defaultValue(false)
+ .markAdvanced()
+ .sinceVersion("1.1.0")
+ .withDocumentation("Whether to enable streaming writes to metadata table
or not. With streaming writes, we execute writes to both data table and
metadata table "
+ + "in streaming manner rather than two disjoint writes. By default "
+ + "streaming writes to metadata table is enabled for SPARK engine
for incremental operations and disabled for all other cases.");
+
public static final boolean DEFAULT_METADATA_ENABLE_FOR_READERS = true;
// Enable metrics for internal Metadata Table
@@ -466,6 +475,10 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
return getBoolean(ENABLE);
}
+ public boolean isStreamingWriteEnabled() {
+ return getBoolean(STREAMING_WRITE_ENABLED);
+ }
+
public boolean isBloomFilterIndexEnabled() {
return getBooleanOrDefault(ENABLE_METADATA_INDEX_BLOOM_FILTER);
}
@@ -704,6 +717,11 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
return this;
}
+ public Builder withStreamingWriteEnabled(boolean enabled) {
+ metadataConfig.setValue(STREAMING_WRITE_ENABLED,
String.valueOf(enabled));
+ return this;
+ }
+
public Builder withMetadataIndexBloomFilter(boolean enable) {
metadataConfig.setValue(ENABLE_METADATA_INDEX_BLOOM_FILTER,
String.valueOf(enable));
return this;
@@ -942,6 +960,7 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
metadataConfig.setDefaultValue(ENABLE_METADATA_INDEX_COLUMN_STATS,
getDefaultColStatsEnable(engineType));
metadataConfig.setDefaultValue(ENABLE_METADATA_INDEX_PARTITION_STATS,
metadataConfig.isColumnStatsIndexEnabled());
metadataConfig.setDefaultValue(SECONDARY_INDEX_ENABLE_PROP,
getDefaultSecondaryIndexEnable(engineType));
+ metadataConfig.setDefaultValue(STREAMING_WRITE_ENABLED,
getDefaultForStreamingWriteEnabled(engineType));
// fix me: disable when schema on read is enabled.
metadataConfig.setDefaults(HoodieMetadataConfig.class.getName());
return metadataConfig;
@@ -959,6 +978,18 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
}
}
+ private boolean getDefaultForStreamingWriteEnabled(EngineType engineType) {
+ switch (engineType) {
+ case SPARK:
+ return true;
+ case FLINK:
+ case JAVA:
+ return false;
+ default:
+ throw new HoodieNotSupportedException("Unsupported engine " +
engineType);
+ }
+ }
+
private boolean getDefaultColStatsEnable(EngineType engineType) {
switch (engineType) {
case SPARK:
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestColStatsRecordWithMetadataRecord.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestColStatsRecordWithMetadataRecord.java
index 316a662cee8..8026fd24385 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestColStatsRecordWithMetadataRecord.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestColStatsRecordWithMetadataRecord.java
@@ -31,6 +31,7 @@ import
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.testutils.InProcessTimeGenerator;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Functions;
@@ -193,7 +194,7 @@ public class TestColStatsRecordWithMetadataRecord extends
HoodieSparkClientTestH
}
});
- HoodieWriteConfig mdtWriteConfig =
HoodieMetadataWriteUtils.createMetadataWriteConfig(cfg,
HoodieFailedWritesCleaningPolicy.EAGER);
+ HoodieWriteConfig mdtWriteConfig =
HoodieMetadataWriteUtils.createMetadataWriteConfig(cfg,
HoodieFailedWritesCleaningPolicy.EAGER, HoodieTableVersion.EIGHT);
HoodieTableMetaClient mdtMetaClient =
HoodieTableMetaClient.builder().setBasePath(mdtWriteConfig.getBasePath()).setConf(context.getStorageConf().newInstance()).build();
HoodieTable table = HoodieSparkTable.create(mdtWriteConfig, context,
mdtMetaClient);