This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 160f43c5a20 [HUDI-6480] Flink support non-blocking concurrency control (#9850)
160f43c5a20 is described below
commit 160f43c5a208f22d63fe309e00550bb223a8e018
Author: Jing Zhang <[email protected]>
AuthorDate: Mon Oct 16 17:35:57 2023 +0800
[HUDI-6480] Flink support non-blocking concurrency control (#9850)
* Modify existing test cases 'testWriteMultiWriterInvolved' and 'testWriteMultiWriterPartialOverlapping'
* Introduce two new test cases to test complex multiple-writer scenarios with compaction
* Rename a method and add more cases
* Compaction scheduling can now use an arbitrary instant time
---------
Co-authored-by: danny0405 <[email protected]>
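For reference, the new tests below enable NB-CC by combining a MERGE_ON_READ table, optimistic
concurrency mode and the simple bucket index; a minimal sketch of that writer configuration,
using only option keys exercised in this patch (pipeline wiring elided, and the SIMPLE bucket
engine assumed as the default, as the new tests rely on):

    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
    conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
        WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
    // with all three in place, isNonBlockingConcurrencyControl() holds and
    // commit-time conflict resolution is skipped (see TransactionUtils below)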
---
.../apache/hudi/client/utils/TransactionUtils.java | 3 +-
.../org/apache/hudi/config/HoodieWriteConfig.java | 11 ++
.../apache/hudi/index/bucket/BucketIdentifier.java | 7 +
.../apache/hudi/client/HoodieFlinkWriteClient.java | 2 +-
.../apache/hudi/configuration/OptionsResolver.java | 14 ++
.../hudi/sink/StreamWriteOperatorCoordinator.java | 2 +-
.../sink/bucket/BucketStreamWriteFunction.java | 7 +-
.../hudi/sink/compact/HoodieFlinkCompactor.java | 15 +-
.../org/apache/hudi/sink/meta/CkpMetadata.java | 13 ++
.../java/org/apache/hudi/util/CompactionUtil.java | 29 +---
.../java/org/apache/hudi/util/StreamerUtil.java | 1 +
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 35 +++--
.../hudi/sink/TestWriteMergeOnReadWithCompact.java | 158 +++++++++++++++++++++
.../sink/compact/ITTestHoodieFlinkCompactor.java | 26 ++--
.../org/apache/hudi/sink/utils/TestWriteBase.java | 13 +-
.../org/apache/hudi/utils/TestCompactionUtil.java | 4 +-
.../test/java/org/apache/hudi/utils/TestData.java | 18 ++-
17 files changed, 279 insertions(+), 79 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
index d162fe28a62..15f6be8f79a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
@@ -67,7 +67,8 @@ public class TransactionUtils {
Option<HoodieInstant> lastCompletedTxnOwnerInstant,
boolean reloadActiveTimeline,
Set<String> pendingInstants) throws HoodieWriteConflictException {
- if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+ // Skip conflict resolution if using non-blocking concurrency control
+ if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() && !config.isNonBlockingConcurrencyControl()) {
// deal with pendingInstants
Stream<HoodieInstant> completedInstantsDuringCurrentWriteOperation = getCompletedInstantsDuringCurrentWriteOperation(table.getMetaClient(), pendingInstants);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index dd9d53bfe89..c9e9b94b1a9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1289,6 +1289,11 @@ public class HoodieWriteConfig extends HoodieConfig {
return getIndexType() == HoodieIndex.IndexType.BUCKET && getBucketIndexEngineType() == HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING;
}
+ public boolean isSimpleBucketIndex() {
+ return HoodieIndex.IndexType.BUCKET.equals(getIndexType())
+ && HoodieIndex.BucketIndexEngineType.SIMPLE.equals(getBucketIndexEngineType());
+ }
+
public boolean isConsistentLogicalTimestampEnabled() {
return getBooleanOrDefault(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED);
}
@@ -2611,6 +2616,12 @@ public class HoodieWriteConfig extends HoodieConfig {
return props.getInteger(WRITES_FILEID_ENCODING, HoodieMetadataPayload.RECORD_INDEX_FIELD_FILEID_ENCODING_UUID);
}
+ public boolean isNonBlockingConcurrencyControl() {
+ return getTableType().equals(HoodieTableType.MERGE_ON_READ)
+ && getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
+ && isSimpleBucketIndex();
+ }
+
public static class Builder {
protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
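The combined predicate keeps NB-CC narrowly scoped; a sketch of how a caller is expected to
branch on it (mirroring the TransactionUtils change above, config being a HoodieWriteConfig
instance):

    // NB-CC requires MOR table + optimistic concurrency mode + simple bucket index;
    // when it holds, commit-time conflict resolution is skipped entirely
    if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
        && !config.isNonBlockingConcurrencyControl()) {
      // resolve write conflicts against concurrently committed instants
    }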
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
index 088e851fd8d..ff48a54366c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
@@ -33,6 +33,9 @@ public class BucketIdentifier implements Serializable {
// Compatible with the spark bucket name
private static final Pattern BUCKET_NAME = Pattern.compile(".*_(\\d+)(?:\\..*)?$");
+ // Ensures the same record keys from different writers are distributed to the same bucket
+ private static final String CONSTANT_FILE_ID_SUFFIX = "-0000-0000-0000-000000000000";
+
public static int getBucketId(HoodieRecord record, String indexKeyFields, int numBuckets) {
return getBucketId(record.getKey(), indexKeyFields, numBuckets);
}
@@ -99,6 +102,10 @@ public class BucketIdentifier implements Serializable {
return FSUtils.createNewFileIdPfx().replaceFirst(".{8}", bucketId);
}
+ public static String newBucketFileIdFixedSuffix(int bucketId) {
+ return bucketIdStr(bucketId) + CONSTANT_FILE_ID_SUFFIX;
+ }
+
public static boolean isBucketFileName(String name) {
return BUCKET_NAME.matcher(name).matches();
}
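With the constant suffix, every writer derives the identical file id for a given bucket, so
concurrent writers land their records in the same file group instead of forking new ones;
assuming bucketIdStr() zero-pads the bucket number to eight digits (as the 8-character
replaceFirst in newBucketFileIdPrefix above implies):

    // any writer, any time: bucket 5 always maps to the same file id
    String fileId = BucketIdentifier.newBucketFileIdFixedSuffix(5);
    // -> "00000005-0000-0000-0000-000000000000"
    // contrast: newBucketFileIdPrefix(5) keeps a random UUID tail per writer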
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 8378a1db07e..e4126785b24 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -303,7 +303,7 @@ public class HoodieFlinkWriteClient<T> extends
* should be called before the Driver starts a new transaction.
*/
public void preTxn(HoodieTableMetaClient metaClient) {
- if (txnManager.isLockRequired()) {
+ if (txnManager.isLockRequired() && !config.isNonBlockingConcurrencyControl()) {
// refresh the meta client which is reused
metaClient.reloadActiveTimeline();
this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 934e22f1139..18cc4a23ae4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -158,6 +158,13 @@ public class OptionsResolver {
return isBucketIndexType(conf) && getBucketEngineType(conf).equals(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING);
}
+ /**
+ * Returns whether the table index is the simple bucket index.
+ */
+ public static boolean isSimpleBucketIndexType(Configuration conf) {
+ return isBucketIndexType(conf) && getBucketEngineType(conf).equals(HoodieIndex.BucketIndexEngineType.SIMPLE);
+ }
+
/**
* Returns the default plan strategy class.
*/
@@ -360,6 +367,13 @@ public class OptionsResolver {
return conf.getBoolean(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false);
}
+ /**
+ * Returns whether non-blocking concurrency control is enabled.
+ */
+ public static boolean isNonBlockingConcurrencyControl(Configuration config) {
+ return isMorTable(config) && isSimpleBucketIndexType(config) && isOptimisticConcurrencyControl(config);
+ }
+
public static boolean isLazyFailedWritesCleanPolicy(Configuration conf) {
return conf.getString(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(), HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.defaultValue())
.equalsIgnoreCase(HoodieFailedWritesCleaningPolicy.LAZY.name());
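The Flink-side resolver derives the same predicate from the job configuration, so the sink
operators can branch without building a HoodieWriteConfig; a small usage sketch (conf being
the Flink job Configuration):

    // true only for MOR + simple bucket index + optimistic concurrency mode;
    // BucketStreamWriteFunction below uses this to pick the fixed-suffix file id
    boolean nbcc = OptionsResolver.isNonBlockingConcurrencyControl(conf);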
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index c580d43c351..fcb2b1dc295 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -460,7 +460,7 @@ public class StreamWriteOperatorCoordinator
private void scheduleTableServices(Boolean committed) {
// if compaction is on, schedule the compaction
if (tableState.scheduleCompaction) {
- CompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, committed);
+ CompactionUtil.scheduleCompaction(writeClient, tableState.isDeltaTimeCompaction, committed);
}
// if clustering is on, schedule the clustering
if (tableState.scheduleClustering) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 63b9c4b3742..0cd66460c32 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -58,6 +58,8 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
private String indexKeyFields;
+ private boolean isNonBlockingConcurrencyControl;
+
/**
* BucketID to file group mapping in each partition.
* Map(partition -> Map(bucketId, fileID)).
@@ -85,6 +87,7 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
super.open(parameters);
this.bucketNum = config.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
this.indexKeyFields = OptionsResolver.getIndexKeyField(config);
+ this.isNonBlockingConcurrencyControl = OptionsResolver.isNonBlockingConcurrencyControl(config);
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
this.bucketIndex = new HashMap<>();
@@ -119,7 +122,9 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
} else if (bucketToFileId.containsKey(bucketNum)) {
location = new HoodieRecordLocation("U", bucketToFileId.get(bucketNum));
} else {
- String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
+ String newFileId = isNonBlockingConcurrencyControl
+ ? BucketIdentifier.newBucketFileIdFixedSuffix(bucketNum)
+ : BucketIdentifier.newBucketFileIdPrefix(bucketNum);
location = new HoodieRecordLocation("I", newFileId);
bucketToFileId.put(bucketNum, newFileId);
incBucketIndex.add(bucketId);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index fec2c07f468..57e823ab21c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -221,16 +221,13 @@ public class HoodieFlinkCompactor {
// checks the compaction plan and do compaction.
if (cfg.schedule) {
- Option<String> compactionInstantTimeOption = CompactionUtil.getCompactionInstantTime(table.getMetaClient());
- if (compactionInstantTimeOption.isPresent()) {
- boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTimeOption.get(), Option.empty());
- if (!scheduled) {
- // do nothing.
- LOG.info("No compaction plan for this job ");
- return;
- }
- table.getMetaClient().reloadActiveTimeline();
+ boolean scheduled = writeClient.scheduleCompaction(Option.empty()).isPresent();
+ if (!scheduled) {
+ // do nothing.
+ LOG.info("No compaction plan for this job ");
+ return;
}
+ table.getMetaClient().reloadActiveTimeline();
}
// fetch the instant based on the configured execution sequence
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
index 90636bf6ac5..6d562412bb5 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
@@ -200,6 +200,19 @@ public class CkpMetadata implements Serializable, AutoCloseable {
return this.instantCache;
}
+ @Nullable
+ @VisibleForTesting
+ public String lastCompleteInstant() {
+ load();
+ for (int i = this.messages.size() - 1; i >= 0; i--) {
+ CkpMessage ckpMsg = this.messages.get(i);
+ if (ckpMsg.isComplete()) {
+ return ckpMsg.getInstant();
+ }
+ }
+ return null;
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index 3a6f7657d2a..7780204ae3d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -48,13 +48,11 @@ public class CompactionUtil {
/**
* Schedules a new compaction instant.
*
- * @param metaClient The metadata client
* @param writeClient The write client
* @param deltaTimeCompaction Whether the compaction is triggered by elapsed delta time
* @param committed Whether the last instant was committed successfully
*/
public static void scheduleCompaction(
- HoodieTableMetaClient metaClient,
HoodieFlinkWriteClient<?> writeClient,
boolean deltaTimeCompaction,
boolean committed) {
@@ -63,32 +61,7 @@ public class CompactionUtil {
} else if (deltaTimeCompaction) {
// if there are no new commits and the compaction trigger strategy is based on elapsed delta time,
// schedules the compaction anyway.
- metaClient.reloadActiveTimeline();
- Option<String> compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
- if (compactionInstantTime.isPresent()) {
- writeClient.scheduleCompactionAtInstant(compactionInstantTime.get(), Option.empty());
- }
- }
- }
-
- /**
- * Gets compaction Instant time.
- */
- public static Option<String> getCompactionInstantTime(HoodieTableMetaClient metaClient) {
- Option<HoodieInstant> firstPendingInstant = metaClient.getCommitsTimeline()
- .filterPendingExcludingCompaction().firstInstant();
- Option<HoodieInstant> lastCompleteInstant = metaClient.getActiveTimeline().getWriteTimeline()
- .filterCompletedAndCompactionInstants().lastInstant();
- if (firstPendingInstant.isPresent() && lastCompleteInstant.isPresent()) {
- String firstPendingTimestamp = firstPendingInstant.get().getTimestamp();
- String lastCompleteTimestamp = lastCompleteInstant.get().getTimestamp();
- // Committed and pending compaction instants should have strictly lower timestamps
- return StreamerUtil.medianInstantTime(firstPendingTimestamp, lastCompleteTimestamp);
- } else if (!lastCompleteInstant.isPresent()) {
- LOG.info("No instants to schedule the compaction plan");
- return Option.empty();
- } else {
- return Option.of(metaClient.createNewInstantTime());
+ writeClient.scheduleCompaction(Option.empty());
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index b4d9bd74092..077d0b52306 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -216,6 +216,7 @@ public class StreamerUtil {
properties.put(option.key(), option.defaultValue());
}
}
+ properties.put(HoodieTableConfig.TYPE.key(), conf.getString(FlinkOptions.TABLE_TYPE));
return new TypedProperties(properties);
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 90aa86cd353..da712523c16 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.utils.TestWriteBase;
@@ -538,11 +539,24 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
.assertNextEvent()
.checkpointComplete(1)
.checkWrittenData(EXPECTED3, 1);
- // step to commit the 2nd txn, should throw exception
- // for concurrent modification of same fileGroups
- pipeline1.checkpoint(1)
- .assertNextEvent()
- .checkpointCompleteThrows(1, HoodieWriteConflictException.class, "Cannot resolve conflicts");
+ // step to commit the 2nd txn
+ validateConcurrentCommit(pipeline1);
+ }
+
+ private void validateConcurrentCommit(TestHarness pipeline) throws Exception {
+ pipeline
+ .checkpoint(1)
+ .assertNextEvent();
+ if (OptionsResolver.isNonBlockingConcurrencyControl(conf)) {
+ // NB-CC (non-blocking concurrency control) allows concurrent modification of the same fileGroup
+ pipeline
+ .checkpointComplete(1)
+ .checkWrittenData(EXPECTED3, 1);
+ } else {
+ // normal OCC (optimistic concurrency control) should throw an exception otherwise
+ pipeline
+ .checkpointCompleteThrows(1, HoodieWriteConflictException.class, "Cannot resolve conflicts");
+ }
}
// case2: txn2's time range has partial overlap with txn1
@@ -566,17 +580,14 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
// step to commit the 1st txn, should succeed
pipeline1.checkpoint(1)
- .assertNextEvent()
- .checkpoint(1)
.assertNextEvent()
.checkpointComplete(1)
.checkWrittenData(EXPECTED3, 1);
- // step to commit the 2nd txn, should throw exception
- // for concurrent modification of same fileGroups
- pipeline2.checkpoint(1)
- .assertNextEvent()
- .checkpointCompleteThrows(1, HoodieWriteConflictException.class, "Cannot resolve conflicts");
+ // step to commit the 2nd txn
+ // should succeed for concurrent modification of the same fileGroups if using non-blocking concurrency control
+ // should throw an exception otherwise
+ validateConcurrentCommit(pipeline2);
}
@Test
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index abc5679367a..79320e1549f 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -18,15 +18,31 @@
package org.apache.hudi.sink;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.utils.TestData;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
import org.junit.jupiter.api.Test;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import static org.apache.hudi.utils.TestData.insertRow;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
/**
* Test cases for delta stream write with compaction.
*/
@@ -70,6 +86,148 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
return expected;
}
+ @Test
+ public void testNonBlockingConcurrencyControlWithPartialUpdatePayload() throws Exception {
+ conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+ conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+ conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, PartialUpdateAvroPayload.class.getName());
+ // disable schedule compaction in writers
+ conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
+ conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
+
+ // start pipeline1 and insert record: [id1,Danny,null,1,par1], suspend the tx commit
+ List<RowData> dataset1 = Collections.singletonList(
+ insertRow(
+ StringData.fromString("id1"), StringData.fromString("Danny"), null,
+ TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
+ TestHarness pipeline1 = preparePipeline(conf)
+ .consume(dataset1)
+ .assertEmptyDataFiles();
+
+ // start pipeline2 and insert record: [id1,null,23,1,par1], suspend the tx commit
+ Configuration conf2 = conf.clone();
+ conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
+ List<RowData> dataset2 = Collections.singletonList(
+ insertRow(
+ StringData.fromString("id1"), null, 23,
+ TimestampData.fromEpochMillis(2), StringData.fromString("par1")));
+ TestHarness pipeline2 = preparePipeline(conf2)
+ .consume(dataset2)
+ .assertEmptyDataFiles();
+
+ // step to commit the 1st txn
+ pipeline1.checkpoint(1)
+ .assertNextEvent()
+ .checkpointComplete(1);
+
+ // step to commit the 2nd txn
+ pipeline2.checkpoint(1)
+ .assertNextEvent()
+ .checkpointComplete(1);
+
+ // snapshot result is [(id1,Danny,23,2,par1)] after the two writers finish committing
+ Map<String, String> tmpSnapshotResult = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
+ pipeline2.checkWrittenData(tmpSnapshotResult, 1);
+
+ // There is no base file in partition dir because there is no compaction yet.
+ pipeline1.assertEmptyDataFiles();
+
+ // schedule compaction outside of all writers
+ try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
+ Option<String> scheduleInstant = writeClient.scheduleCompaction(Option.empty());
+ assertNotNull(scheduleInstant.get());
+ }
+
+ // step to commit the 3rd txn
+ // it also triggers inline compactor
+ List<RowData> dataset3 = Collections.singletonList(
+ insertRow(
+ StringData.fromString("id3"), StringData.fromString("Julian"), 53,
+ TimestampData.fromEpochMillis(4), StringData.fromString("par1")));
+ pipeline1.consume(dataset3)
+ .checkpoint(2)
+ .assertNextEvent()
+ .checkpointComplete(2);
+
+ // snapshot read result is [(id1,Danny,23,2,par1), (id3,Julian,53,4,par1)] after the three writers finish committing
+ Map<String, String> finalSnapshotResult = Collections.singletonMap(
+ "par1",
+ "[id1,par1,id1,Danny,23,2,par1, id3,par1,id3,Julian,53,4,par1]");
+ pipeline1.checkWrittenData(finalSnapshotResult, 1);
+ // read optimized read result is [(id1,Danny,23,2,par1)]
+ // because the data files belonging to the 3rd commit are not included in the last compaction.
+ Map<String, String> readOptimizedResult = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
+ TestData.checkWrittenData(tempFile, readOptimizedResult, 1);
+ }
+
+ @Test
+ public void testNonBlockingConcurrencyControlWithInflightInstant() throws Exception {
+ conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+ conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+ // disable schedule compaction in writers
+ conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
+ conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
+
+ // start pipeline1 and insert record: [id1,Danny,23,1,par1], suspend the tx commit
+ List<RowData> dataset1 = Collections.singletonList(
+ insertRow(
+ StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+ TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
+ TestHarness pipeline1 = preparePipeline(conf)
+ .consume(dataset1)
+ .assertEmptyDataFiles();
+
+ // start pipeline2 and insert record: [id2,Stephen,34,2,par1], suspend the tx commit
+ Configuration conf2 = conf.clone();
+ conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
+
+ List<RowData> dataset2 = Collections.singletonList(
+ insertRow(
+ StringData.fromString("id2"), StringData.fromString("Stephen"), 34,
+ TimestampData.fromEpochMillis(2), StringData.fromString("par1")));
+ TestHarness pipeline2 = preparePipeline(conf2)
+ .consume(dataset2)
+ .assertEmptyDataFiles();
+
+ // step to commit the 1st txn
+ pipeline1.checkpoint(1)
+ .assertNextEvent()
+ .checkpointComplete(1);
+
+ // step to flush the 2nd data, but not commit yet
+ pipeline2.checkpoint(1)
+ .assertNextEvent();
+
+ // schedule compaction outside of all writers
+ try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
+ Option<String> scheduleInstant = writeClient.scheduleCompaction(Option.empty());
+ assertNotNull(scheduleInstant.get());
+ }
+
+ // step to commit the 3rd txn and insert record: [id3,Julian,53,4,par1]
+ // it also triggers inline compactor
+ List<RowData> dataset3 = Collections.singletonList(
+ insertRow(
+ StringData.fromString("id3"), StringData.fromString("Julian"), 53,
+ TimestampData.fromEpochMillis(4), StringData.fromString("par1")));
+ pipeline1.consume(dataset3)
+ .checkpoint(2)
+ .assertNextEvent()
+ .checkpointComplete(2);
+
+ // snapshot read result is [(id1,Danny,23,1,par1), (id3,Julian,53,4,par1)] after the 1st and 3rd writers finish committing
+ // and the data of the 2nd writer is not included because it is still in inflight state
+ Map<String, String> finalSnapshotResult = Collections.singletonMap(
+ "par1",
+ "[id1,par1,id1,Danny,23,1,par1, id3,par1,id3,Julian,53,4,par1]");
+ pipeline1.checkWrittenData(finalSnapshotResult, 1);
+ // read optimized read result is [(id1,Danny,23,1,par1)]
+ // because the 2nd commit is in inflight state and
+ // the data files belonging to the 3rd commit are not included in the last compaction.
+ Map<String, String> readOptimizedResult = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,1,par1]");
+ TestData.checkWrittenData(tempFile, readOptimizedResult, 1);
+ }
+
@Override
protected HoodieTableType getTableType() {
return HoodieTableType.MERGE_ON_READ;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index a1608674f15..7e66ab0f59e 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -155,7 +155,7 @@ public class ITTestHoodieFlinkCompactor {
try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
HoodieFlinkTable<?> table = writeClient.getHoodieTable();
- String compactionInstantTime = scheduleCompactionPlan(table.getMetaClient(), writeClient);
+ String compactionInstantTime = scheduleCompactionPlan(writeClient);
// generate compaction plan
// should support configurable commit metadata
@@ -226,7 +226,7 @@ public class ITTestHoodieFlinkCompactor {
try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
HoodieFlinkTable<?> table = writeClient.getHoodieTable();
- String compactionInstantTime = scheduleCompactionPlan(table.getMetaClient(), writeClient);
+ String compactionInstantTime = scheduleCompactionPlan(writeClient);
// try to upgrade or downgrade
if (upgrade) {
@@ -337,8 +337,7 @@ public class ITTestHoodieFlinkCompactor {
HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf);
- HoodieFlinkTable<?> table = writeClient.getHoodieTable();
- compactionInstantTimeList.add(scheduleCompactionPlan(table.getMetaClient(), writeClient));
+ compactionInstantTimeList.add(scheduleCompactionPlan(writeClient));
// insert a new record to new partition, so that we can generate a new compaction plan
String insertT1ForNewPartition = "insert into t1 values\n"
@@ -352,8 +351,8 @@ public class ITTestHoodieFlinkCompactor {
// the reader metadata view is not complete
writeClient = FlinkWriteClients.createWriteClient(conf);
- table = writeClient.getHoodieTable();
- compactionInstantTimeList.add(scheduleCompactionPlan(table.getMetaClient(), writeClient));
+ HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+ compactionInstantTimeList.add(scheduleCompactionPlan(writeClient));
List<Pair<String, HoodieCompactionPlan>> compactionPlans = new ArrayList<>(2);
for (String compactionInstantTime : compactionInstantTimeList) {
@@ -475,7 +474,7 @@ public class ITTestHoodieFlinkCompactor {
try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
HoodieFlinkTable<?> table = writeClient.getHoodieTable();
- String compactionInstantTime = scheduleCompactionPlan(table.getMetaClient(), writeClient);
+ String compactionInstantTime = scheduleCompactionPlan(writeClient);
// generate compaction plan
// should support configurable commit metadata
@@ -509,14 +508,9 @@ public class ITTestHoodieFlinkCompactor {
}
}
- private String scheduleCompactionPlan(HoodieTableMetaClient metaClient, HoodieFlinkWriteClient<?> writeClient) {
- boolean scheduled = false;
- // judge whether there are any compaction operations.
- Option<String> compactionInstantTimeOption = CompactionUtil.getCompactionInstantTime(metaClient);
- if (compactionInstantTimeOption.isPresent()) {
- scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTimeOption.get(), Option.empty());
- }
- assertTrue(scheduled, "The compaction plan should be scheduled");
- return compactionInstantTimeOption.get();
+ private String scheduleCompactionPlan(HoodieFlinkWriteClient<?> writeClient) {
+ Option<String> compactionInstant = writeClient.scheduleCompaction(Option.empty());
+ assertTrue(compactionInstant.isPresent(), "The compaction plan should be scheduled");
+ return compactionInstant.get();
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index 7332a0ae61a..1b301cb3cb1 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -524,9 +524,16 @@ public class TestWriteBase {
}
protected String lastCompleteInstant() {
- return OptionsResolver.isMorTable(conf)
- ? TestUtils.getLastDeltaCompleteInstant(basePath)
- : TestUtils.getLastCompleteInstant(basePath, HoodieTimeline.COMMIT_ACTION);
+ // If using optimistic concurrency control, fetch the last complete instant of the current writer from the ckp metadata,
+ // because multiple write clients commit to the timeline.
+ if (OptionsResolver.isOptimisticConcurrencyControl(conf)) {
+ return this.ckpMetadata.lastCompleteInstant();
+ } else {
+ // fetch the instant from timeline.
+ return OptionsResolver.isMorTable(conf)
+ ? TestUtils.getLastDeltaCompleteInstant(basePath)
+ : TestUtils.getLastCompleteInstant(basePath, HoodieTimeline.COMMIT_ACTION);
+ }
}
public TestHarness checkCompletedInstantCount(int count) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
index 1c3ca7c1ea1..b30d090072e 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
@@ -140,7 +140,7 @@ public class TestCompactionUtil {
TestData.writeDataAsBatch(TestData.DATA_SET_SINGLE_INSERT, conf);
try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(conf)) {
- CompactionUtil.scheduleCompaction(metaClient, writeClient, true, true);
+ CompactionUtil.scheduleCompaction(writeClient, true, true);
Option<HoodieInstant> pendingCompactionInstant = metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().lastInstant();
assertTrue(pendingCompactionInstant.isPresent(), "A compaction plan expects to be scheduled");
@@ -150,7 +150,7 @@ public class TestCompactionUtil {
TimeUnit.SECONDS.sleep(3); // in case the instant time interval is too close
writeClient.startCommit();
- CompactionUtil.scheduleCompaction(metaClient, writeClient, true, false);
+ CompactionUtil.scheduleCompaction(writeClient, true, false);
int numCompactionCommits = metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().countInstants();
assertThat("Two compaction plan expects to be scheduled", numCompactionCommits, is(2));
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 65c8e82ada1..afaf3608049 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -964,16 +964,24 @@ public class TestData {
*/
private static String filterOutVariables(GenericRecord genericRecord) {
List<String> fields = new ArrayList<>();
- fields.add(genericRecord.get("_hoodie_record_key").toString());
- fields.add(genericRecord.get("_hoodie_partition_path").toString());
- fields.add(genericRecord.get("uuid").toString());
- fields.add(genericRecord.get("name").toString());
- fields.add(genericRecord.get("age").toString());
+ fields.add(getFieldValue(genericRecord, "_hoodie_record_key"));
+ fields.add(getFieldValue(genericRecord, "_hoodie_partition_path"));
+ fields.add(getFieldValue(genericRecord, "uuid"));
+ fields.add(getFieldValue(genericRecord, "name"));
+ fields.add(getFieldValue(genericRecord, "age"));
fields.add(genericRecord.get("ts").toString());
fields.add(genericRecord.get("partition").toString());
return String.join(",", fields);
}
+ private static String getFieldValue(GenericRecord genericRecord, String fieldName) {
+ if (genericRecord.get(fieldName) != null) {
+ return genericRecord.get(fieldName).toString();
+ } else {
+ return null;
+ }
+ }
+
public static BinaryRowData insertRow(Object... fields) {
return insertRow(TestConfigurations.ROW_TYPE, fields);
}