This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new def40238f42 [MINOR] Rename a conflict resolution strategy class (#9239)
def40238f42 is described below
commit def40238f421de7a701009ff8d4430721faae6b3
Author: Sagar Sumit <[email protected]>
AuthorDate: Fri Jul 21 08:31:52 2023 +0530
[MINOR] Rename a conflict resolution strategy class (#9239)
* Rename to PreferWriterConflictResolutionStrategy
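For reference, a writer opts into the renamed strategy through the lock
configuration, mirroring the test setup in this patch. A minimal sketch
(the base path and the surrounding client setup are assumptions for
illustration, not part of this commit):

    import org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy;
    import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
    import org.apache.hudi.common.model.WriteConcurrencyMode;
    import org.apache.hudi.config.HoodieLockConfig;
    import org.apache.hudi.config.HoodieWriteConfig;

    // Prefer in-flight ingestion writers over table services
    // (clustering/compaction) when concurrent commits conflict.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_table") // hypothetical base path
        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
        .withLockConfig(HoodieLockConfig.newBuilder()
            .withLockProvider(InProcessLockProvider.class)
            .withConflictResolutionStrategy(new PreferWriterConflictResolutionStrategy())
            .build())
        .build();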
---
...=> PreferWriterConflictResolutionStrategy.java} | 11 +-
...estPreferWriterConflictResolutionStrategy.java} | 28 +--
.../hudi/client/TestHoodieClientMultiWriter.java | 71 ++++----
... TestMultiWriterWithPreferWriterIngestion.java} | 11 +-
.../TestHoodieClientOnCopyOnWriteStorage.java | 199 ++++++++++-----------
5 files changed, 159 insertions(+), 161 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/IngestionPrimaryWriterBasedConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
similarity index 92%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/IngestionPrimaryWriterBasedConflictResolutionStrategy.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
index 1c71be9f70b..f95e7b078a6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/IngestionPrimaryWriterBasedConflictResolutionStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,12 +40,12 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMI
/**
* This class extends the base implementation of conflict resolution strategy.
- * It gives preference to ingestion writers compared to table services.
+ * It gives preference to non-blocking ingestion over table services in case of conflicts.
*/
-public class IngestionPrimaryWriterBasedConflictResolutionStrategy
+public class PreferWriterConflictResolutionStrategy
extends SimpleConcurrentFileWritesConflictResolutionStrategy {
-  private static final Logger LOG = LoggerFactory.getLogger(IngestionPrimaryWriterBasedConflictResolutionStrategy.class);
+  private static final Logger LOG = LoggerFactory.getLogger(PreferWriterConflictResolutionStrategy.class);
/**
 * For tableservices like replacecommit and compaction commits this method also returns ingestion inflight commits.
@@ -54,7 +55,7 @@ public class IngestionPrimaryWriterBasedConflictResolutionStrategy
                                                           Option<HoodieInstant> lastSuccessfulInstant) {
HoodieActiveTimeline activeTimeline = metaClient.reloadActiveTimeline();
if ((REPLACE_COMMIT_ACTION.equals(currentInstant.getAction())
- && ClusteringUtils.isClusteringCommit(metaClient, currentInstant))
+ && ClusteringUtils.isClusteringCommit(metaClient, currentInstant))
|| COMPACTION_ACTION.equals(currentInstant.getAction())) {
      return getCandidateInstantsForTableServicesCommits(activeTimeline, currentInstant);
} else {
@@ -64,7 +65,7 @@ public class IngestionPrimaryWriterBasedConflictResolutionStrategy
  private Stream<HoodieInstant> getCandidateInstantsForNonTableServicesCommits(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant) {
- // To findout which instants are conflicting, we apply the following logic
+ // To find out which instants are conflicting, we apply the following logic
    // Get all the completed instants timeline only for commits that have happened
    // since the last successful write based on the transition times.
    // We need to check for write conflicts since they may have mutated the same files
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestIngestionPrimaryWriterBasedConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
similarity index 90%
rename from hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestIngestionPrimaryWriterBasedConflictResolutionStrategy.java
rename to hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
index 966da46690e..a2cf9103284 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestIngestionPrimaryWriterBasedConflictResolutionStrategy.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieWriteConflictException;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -43,7 +44,7 @@ import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyU
import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createReplaceInflight;
import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createReplaceRequested;
-public class TestIngestionPrimaryWriterBasedConflictResolutionStrategy extends HoodieCommonTestHarness {
+public class TestPreferWriterConflictResolutionStrategy extends HoodieCommonTestHarness {
@BeforeEach
public void init() throws IOException {
@@ -64,7 +65,7 @@ public class TestIngestionPrimaryWriterBasedConflictResolutionStrategy extends H
    createCompactionRequested(newInstantTime, metaClient);
    Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
-    IngestionPrimaryWriterBasedConflictResolutionStrategy strategy = new IngestionPrimaryWriterBasedConflictResolutionStrategy();
+    PreferWriterConflictResolutionStrategy strategy = new PreferWriterConflictResolutionStrategy();
    List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect(
        Collectors.toList());
// writer 1 does not have a conflict with scheduled compaction plan 1
@@ -84,12 +85,12 @@ public class TestIngestionPrimaryWriterBasedConflictResolutionStrategy extends H
    // compaction 1 gets scheduled and finishes
    String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
    // TODO: Remove sleep stmt once the modified times issue is fixed.
-    // Sleep thread for atleast 1sec for consecutive commits that way they do not have two commits modified times falls on the same millisecond.
+    // Sleep thread for at least 1sec for consecutive commits that way they do not have two commits modified times falls on the same millisecond.
    Thread.sleep(1000);
    createCompaction(newInstantTime, metaClient);
    Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
-    IngestionPrimaryWriterBasedConflictResolutionStrategy strategy = new IngestionPrimaryWriterBasedConflictResolutionStrategy();
+    PreferWriterConflictResolutionStrategy strategy = new PreferWriterConflictResolutionStrategy();
    HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant);
    List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect(
        Collectors.toList());
@@ -121,7 +122,7 @@ public class TestIngestionPrimaryWriterBasedConflictResolutionStrategy extends H
    createCompactionRequested(newInstantTime, metaClient);
    Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, newInstantTime));
-    IngestionPrimaryWriterBasedConflictResolutionStrategy strategy = new IngestionPrimaryWriterBasedConflictResolutionStrategy();
+    PreferWriterConflictResolutionStrategy strategy = new PreferWriterConflictResolutionStrategy();
    // TODO Create method to create compactCommitMetadata
    // HoodieCommitMetadata currentMetadata = createCommitMetadata(newInstantTime);
    List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect(
@@ -149,7 +150,7 @@ public class TestIngestionPrimaryWriterBasedConflictResolutionStrategy extends H
    createInflightCommit(currentWriterInstant, metaClient);
    Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
-    IngestionPrimaryWriterBasedConflictResolutionStrategy strategy = new IngestionPrimaryWriterBasedConflictResolutionStrategy();
+    PreferWriterConflictResolutionStrategy strategy = new PreferWriterConflictResolutionStrategy();
    List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect(
        Collectors.toList());
    // writer 1 should not conflict with an earlier scheduled compaction 1 with the same file ids
@@ -174,16 +175,16 @@ public class TestIngestionPrimaryWriterBasedConflictResolutionStrategy extends H
    createReplaceInflight(newInstantTime, WriteOperationType.CLUSTER, metaClient);
    Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
-    IngestionPrimaryWriterBasedConflictResolutionStrategy strategy = new IngestionPrimaryWriterBasedConflictResolutionStrategy();
+    PreferWriterConflictResolutionStrategy strategy = new PreferWriterConflictResolutionStrategy();
    List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect(
        Collectors.toList());
-    // Since we give preference to ingestion over clustering, there wont be a conflict with replacecommit.
+    // Since we give preference to ingestion over clustering, there won't be a conflict with replacecommit.
Assertions.assertEquals(0, candidateInstants.size());
}
/**
   * This method confirms ingestion commit failing due to already present replacecommit.
-   * Here the replacecommit is allowed to commit. Ideally replacecommit cannot be committed when there is a ingestion inflight.
+   * Here the replacecommit is allowed to commit. Ideally replacecommit cannot be committed when there is an ingestion inflight.
   * The following case can occur, during transition phase of ingestion commit from Requested to Inflight,
* during that time replacecommit can be completed.
*/
@@ -197,14 +198,14 @@ public class TestIngestionPrimaryWriterBasedConflictResolutionStrategy extends H
    String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime();
    createInflightCommit(currentWriterInstant, metaClient);
    // TODO: Remove sleep stmt once the modified times issue is fixed.
-    // Sleep thread for atleast 1sec for consecutive commits that way they do not have two commits modified times falls on the same millisecond.
+    // Sleep thread for at least 1sec for consecutive commits that way they do not have two commits modified times falls on the same millisecond.
    Thread.sleep(1000);
    // clustering writer starts and complete before ingestion commit.
    String replaceWriterInstant = HoodieActiveTimeline.createNewInstantTime();
    createReplace(replaceWriterInstant, WriteOperationType.CLUSTER, metaClient);
    Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
-    IngestionPrimaryWriterBasedConflictResolutionStrategy strategy = new IngestionPrimaryWriterBasedConflictResolutionStrategy();
+    PreferWriterConflictResolutionStrategy strategy = new PreferWriterConflictResolutionStrategy();
    metaClient.reloadActiveTimeline();
    List<HoodieInstant> candidateInstants = strategy
        .getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant)
@@ -233,14 +234,14 @@ public class TestIngestionPrimaryWriterBasedConflictResolutionStrategy extends H
    String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime();
    createInflightCommit(currentWriterInstant, metaClient);
    // TODO: Remove sleep stmt once the modified times issue is fixed.
-    // Sleep thread for atleast 1sec for consecutive commits that way they do not have two commits modified times falls on the same millisecond.
+    // Sleep thread for at least 1sec for consecutive commits that way they do not have two commits modified times falls on the same millisecond.
    Thread.sleep(1000);
    // replace 1 gets scheduled and finished
    String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
    createReplace(newInstantTime, WriteOperationType.INSERT_OVERWRITE, metaClient);
    Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
-    IngestionPrimaryWriterBasedConflictResolutionStrategy strategy = new IngestionPrimaryWriterBasedConflictResolutionStrategy();
+    PreferWriterConflictResolutionStrategy strategy = new PreferWriterConflictResolutionStrategy();
    HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant);
    List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect(
        Collectors.toList());
@@ -256,5 +257,4 @@ public class TestIngestionPrimaryWriterBasedConflictResolutionStrategy extends H
// expected
}
}
-
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index 0984f9436bf..c2de0595dc7 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -19,7 +19,7 @@
 package org.apache.hudi.client;
 import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
-import org.apache.hudi.client.transaction.IngestionPrimaryWriterBasedConflictResolutionStrategy;
+import org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy;
 import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy;
import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
@@ -143,12 +143,12 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
  private static final List<ConflictResolutionStrategy> CONFLICT_RESOLUTION_STRATEGY_CLASSES = Arrays.asList(
      new SimpleConcurrentFileWritesConflictResolutionStrategy(),
-      new IngestionPrimaryWriterBasedConflictResolutionStrategy());
+      new PreferWriterConflictResolutionStrategy());
  private static Iterable<Object[]> providerClassResolutionStrategyAndTableType() {
    List<Object[]> opts = new ArrayList<>();
    for (Object providerClass : LOCK_PROVIDER_CLASSES) {
-      for (ConflictResolutionStrategy resolutionStrategy: CONFLICT_RESOLUTION_STRATEGY_CLASSES) {
+      for (ConflictResolutionStrategy resolutionStrategy : CONFLICT_RESOLUTION_STRATEGY_CLASSES) {
        opts.add(new Object[] {HoodieTableType.COPY_ON_WRITE, providerClass, resolutionStrategy});
        opts.add(new Object[] {HoodieTableType.MERGE_ON_READ, providerClass, resolutionStrategy});
      }
}
@@ -158,19 +158,20 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
/**
* Test multi-writers with early conflict detect enable, including
- * 1. MOR + Direct marker
- * 2. COW + Direct marker
- * 3. MOR + Timeline server based marker
- * 4. COW + Timeline server based marker
- *
- * |---------------------- 003 heartBeat expired -------------------|
- *
- * ---|---------|--------------------|--------------------------------------|-------------------------|-------------------------> time
+ * 1. MOR + Direct marker
+ * 2. COW + Direct marker
+ * 3. MOR + Timeline server based marker
+ * 4. COW + Timeline server based marker
+ * <p>
+ * |---------------------- 003 heartBeat expired -------------------|
+ * <p>
+ * ---|---------|--------------------|--------------------------------------|-------------------------|-------------------------> time
* init 001
- * 002 start writing
- * 003 start which has conflict with 002
- * and failed soon
- 002 commit successfully 004 write successfully
+ * 002 start writing
+ * 003 start which has conflict with 002
+ * and failed soon
+ * 002 commit successfully 004 write successfully
+ *
* @param tableType
* @param markerType
* @throws Exception
@@ -527,7 +528,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
String pendingCompactionTime = (tableType == HoodieTableType.MERGE_ON_READ)
? metaClient.reloadActiveTimeline().filterPendingCompactionTimeline()
- .firstInstant().get().getTimestamp()
+ .firstInstant().get().getTimestamp()
: "";
    Option<HoodieInstant> pendingCleanInstantOp = metaClient.reloadActiveTimeline().getCleanerTimeline().filterInflightsAndRequested()
.firstInstant();
@@ -551,7 +552,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
    latchCountDownAndWait(runCountDownLatch, 30000);
    if (tableType == HoodieTableType.MERGE_ON_READ) {
      assertDoesNotThrow(() -> {
-        HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client2.compact(pendingCompactionTime);
+        HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client2.compact(pendingCompactionTime);
        client2.commitCompaction(pendingCompactionTime, compactionMetadata.getCommitMetadata().get(), Option.empty());
        validInstants.add(pendingCompactionTime);
      });
@@ -587,22 +588,22 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
  }
  // Disabling embedded timeline server, it doesn't work with multiwriter
  HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
-      .withCleanConfig(HoodieCleanConfig.newBuilder()
-          .withAutoClean(false)
-          .withAsyncClean(true)
-          .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
-      .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-          .withInlineCompaction(false)
-          .withMaxNumDeltaCommitsBeforeCompaction(2).build())
-      .withEmbeddedTimelineServerEnabled(false)
-      // Timeline-server-based markers are not used for multi-writer tests
-      .withMarkersType(MarkerType.DIRECT.name())
-      .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(
-          FileSystemViewStorageType.MEMORY).build())
-      .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
-      // Set the config so that heartbeat will expire in 1 second without update
-      .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
-          .build()).withAutoCommit(false).withProperties(lockProperties);
+      .withCleanConfig(HoodieCleanConfig.newBuilder()
+          .withAutoClean(false)
+          .withAsyncClean(true)
+          .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+      .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+          .withInlineCompaction(false)
+          .withMaxNumDeltaCommitsBeforeCompaction(2).build())
+      .withEmbeddedTimelineServerEnabled(false)
+      // Timeline-server-based markers are not used for multi-writer tests
+      .withMarkersType(MarkerType.DIRECT.name())
+      .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(
+          FileSystemViewStorageType.MEMORY).build())
+      .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+      // Set the config so that heartbeat will expire in 1 second without update
+      .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
+          .build()).withAutoCommit(false).withProperties(lockProperties);
Set<String> validInstants = new HashSet<>();
// Create the first commit with inserts
HoodieWriteConfig cfg = writeConfigBuilder.build();
@@ -855,8 +856,8 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
  }
  private JavaRDD<WriteStatus> createCommitWithInserts(HoodieWriteConfig cfg, SparkRDDWriteClient client,
-                                                       String prevCommitTime, String newCommitTime, int numRecords,
-                                                       boolean doCommit) throws Exception {
+                                                       String prevCommitTime, String newCommitTime, int numRecords,
+                                                       boolean doCommit) throws Exception {
    // Finish first base commit
    JavaRDD<WriteStatus> result = insertFirstBatch(cfg, client, newCommitTime, prevCommitTime, numRecords, SparkRDDWriteClient::bulkInsert, false, false, numRecords);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiwriterWithIngestionAsPrimaryWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
similarity index 96%
rename from hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiwriterWithIngestionAsPrimaryWriter.java
rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
index 6d2e2182106..59547cd5b63 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiwriterWithIngestionAsPrimaryWriter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
@@ -18,9 +18,8 @@
package org.apache.hudi.client;
-import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
-import org.apache.hudi.client.transaction.IngestionPrimaryWriterBasedConflictResolutionStrategy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieTableType;
@@ -41,6 +40,8 @@ import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@@ -61,7 +62,7 @@ import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PA
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class TestMultiwriterWithIngestionAsPrimaryWriter extends HoodieClientTestBase {
+public class TestMultiWriterWithPreferWriterIngestion extends HoodieClientTestBase {
public void setUpMORTestTable() throws IOException {
cleanupResources();
@@ -102,7 +103,7 @@ public class TestMultiwriterWithIngestionAsPrimaryWriter extends HoodieClientTes
        .withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClusteringNumCommits(1).build())
        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
        .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
-            .withConflictResolutionStrategy(new IngestionPrimaryWriterBasedConflictResolutionStrategy())
+            .withConflictResolutionStrategy(new PreferWriterConflictResolutionStrategy())
            .build()).withAutoCommit(false).withProperties(properties).build();
Set<String> validInstants = new HashSet<>();
// Create the first commit with inserts
@@ -207,7 +208,7 @@ public class TestMultiwriterWithIngestionAsPrimaryWriter extends HoodieClientTes
        .withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClusteringNumCommits(1).build())
        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
        .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
-            .withConflictResolutionStrategy(new IngestionPrimaryWriterBasedConflictResolutionStrategy())
+            .withConflictResolutionStrategy(new PreferWriterConflictResolutionStrategy())
            .build()).withAutoCommit(false).withProperties(properties).build();
// Create the first commit
String instant1 = HoodieActiveTimeline.createNewInstantTime();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 75023f98b5a..946c4115ab9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -30,7 +30,7 @@ import org.apache.hudi.client.clustering.plan.strategy.SparkSingleFileSortPlanSt
 import org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy;
 import org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy;
 import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass;
-import org.apache.hudi.client.transaction.IngestionPrimaryWriterBasedConflictResolutionStrategy;
+import org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy;
 import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.client.validator.SparkPreCommitValidator;
@@ -135,8 +135,6 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -191,7 +189,7 @@ import static org.mockito.Mockito.when;
 @Tag("functional")
 public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
-  private static final Logger LOG = LoggerFactory.getLogger(TestHoodieClientOnCopyOnWriteStorage.class);
+  private static final String CLUSTERING_FAILURE = "CLUSTERING FAILURE";
  private static final Map<String, String> STRATEGY_PARAMS = new HashMap<String, String>() {
{
put("sortColumn", "record_key");
@@ -289,13 +287,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
  }
  /**
-   * Test auto-commit by applying write function.
+   * Test auto-commit by applying the write function.
   *
   * @param writeFn One of HoodieWriteClient Write API
   * @throws Exception in case of failure
   */
  private void testAutoCommit(Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
-      boolean isPrepped, boolean populateMetaFields) throws Exception {
+      boolean isPrepped, boolean populateMetaFields) throws Exception {
// Set autoCommit false
HoodieWriteConfig.Builder cfgBuilder =
getConfigBuilder().withAutoCommit(false);
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
@@ -405,7 +403,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
assertTrue(testTable.inflightCommitExists(instant1));
assertTrue(testTable.commitExists(instant2));
}
-
+
  private void insertWithConfig(HoodieWriteConfig config, int numRecords, String instant) throws Exception {
    try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
      Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn = (writeClient, recordRDD, instantTime) ->
@@ -513,7 +511,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
      List<WriteStatus> statuses = writeFn.apply(client, recordList, newCommitTime).collect();
      assertNoWriteErrors(statuses);
      assertEquals(2, statuses.size());
-      assertNodupesInPartition(statuses.stream().map(WriteStatus::getWrittenRecordDelegates).flatMap(Collection::stream)
+      assertNoDuplicatesInPartition(statuses.stream().map(WriteStatus::getWrittenRecordDelegates).flatMap(Collection::stream)
.collect(Collectors.toList()));
}
}
@@ -566,7 +564,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
      List<WriteStatus> statuses = writeFn.apply(client, recordList, newCommitTime).collect();
      assertNoWriteErrors(statuses);
      assertEquals(2, statuses.size());
-      assertNodupesInPartition(statuses.stream().map(WriteStatus::getWrittenRecordDelegates).flatMap(Collection::stream)
+      assertNoDuplicatesInPartition(statuses.stream().map(WriteStatus::getWrittenRecordDelegates).flatMap(Collection::stream)
.collect(Collectors.toList()));
}
}
@@ -576,7 +574,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   *
   * @param recordDelegates List of Hoodie record delegates
   */
-  void assertNodupesInPartition(List<HoodieRecordDelegate> recordDelegates) {
+  void assertNoDuplicatesInPartition(List<HoodieRecordDelegate> recordDelegates) {
Map<String, Set<String>> partitionToKeys = new HashMap<>();
for (HoodieRecordDelegate r : recordDelegates) {
String recordKey = r.getRecordKey();
@@ -614,22 +612,22 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
  /**
   * Test one of HoodieWriteClient upsert(Prepped) APIs.
   *
-   * @param config Write Config
+   * @param config Write Config
   * @param writeFn One of Hoodie Write Function API
   * @throws Exception in case of error
   */
  private void testUpsertsInternal(HoodieWriteConfig config,
-      Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPrepped)
+      Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPrepped)
      throws Exception {
    // Force using older timeline layout
    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
        .withRollbackUsingMarkers(true)
        .withProps(config.getProps()).withTimelineLayoutVersion(
-            VERSION_0).build();
+            VERSION_0).build();
    HoodieTableMetaClient.withPropertyBuilder()
-      .fromMetaClient(metaClient)
-      .setTimelineLayoutVersion(VERSION_0)
+      .fromMetaClient(metaClient)
+      .setTimelineLayoutVersion(VERSION_0)
      .setPopulateMetaFields(config.populateMetaFields())
      .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
@@ -664,7 +662,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
TimelineLayoutVersion.CURR_VERSION).build();
client = getHoodieWriteClient(newConfig);
- client.savepoint("004", "user1","comment1");
+ client.savepoint("004", "user1", "comment1");
client.restoreToInstant("004", config.isMetadataTableEnabled());
@@ -714,7 +712,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    jsc.parallelize(Arrays.asList(1)).map(e -> {
      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
          .fromBytes(metaClient.getActiveTimeline().getInstantDetails(
-              metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get()).get(),
+              metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get()).get(),
          HoodieCommitMetadata.class);
      String filePath = commitMetadata.getPartitionToWriteStats().values().stream()
          .flatMap(w -> w.stream()).filter(s -> s.getPath().endsWith(extension)).findAny()
@@ -801,7 +799,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
        0, 150, config.populateMetaFields());
    HoodieWriteConfig newConfig = getConfigBuilder().withProps(config.getProps()).withTimelineLayoutVersion(
- TimelineLayoutVersion.CURR_VERSION)
+ TimelineLayoutVersion.CURR_VERSION)
.withArchivalConfig(HoodieArchivalConfig.newBuilder().withArchiveBeyondSavepoint(true).build()).build();
client = getHoodieWriteClient(newConfig);
@@ -833,7 +831,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
  public void testInsertsPreppedWithHoodieConcatHandle(boolean populateMetaFields) throws Exception {
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder();
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
- testHoodieConcatHandle(cfgBuilder.build(), true);
+ testHoodieConcatHandle(cfgBuilder.build(), true);
}
/**
@@ -849,9 +847,9 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
        .withProps(config.getProps()).withMergeAllowDuplicateOnInserts(true).withTimelineLayoutVersion(
            VERSION_0).build();
HoodieTableMetaClient.withPropertyBuilder()
- .fromMetaClient(metaClient)
- .setTimelineLayoutVersion(VERSION_0)
- .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
+ .fromMetaClient(metaClient)
+ .setTimelineLayoutVersion(VERSION_0)
+ .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
@@ -931,7 +929,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    client.startCommitWithTime(commitTime1);
    List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, 100);
    JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 10);
-    BulkInsertPartitioner<JavaRDD<HoodieRecord>> partitioner = new RDDCustomColumnsSortPartitioner(new String[]{"rider"}, HoodieTestDataGenerator.AVRO_SCHEMA, config);
+    BulkInsertPartitioner<JavaRDD<HoodieRecord>> partitioner = new RDDCustomColumnsSortPartitioner(new String[] {"rider"}, HoodieTestDataGenerator.AVRO_SCHEMA, config);
    List<WriteStatus> statuses = client.bulkInsert(insertRecordsRDD1, commitTime1, Option.of(partitioner)).collect();
assertNoWriteErrors(statuses);
}
@@ -951,7 +949,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
assertNoWriteErrors(statuses);
// inject a pending restore
- client.savepoint("001", "user1","comment1");
+ client.savepoint("001", "user1", "comment1");
client.restoreToInstant("001", false);
    // remove completed restore instant from timeline to mimic pending restore.
@@ -1035,6 +1033,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
  /**
   * When records getting inserted are deleted in the same write batch, hudi should have deleted those records and
* not be available in read path.
+ *
* @throws Exception
*/
@ParameterizedTest
@@ -1099,7 +1098,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
}
  private void assertActualAndExpectedPartitionPathRecordKeyMatches(List<Pair<String, String>> expectedPartitionPathRecKeyPairs,
-      List<Pair<String, String>> actualPartitionPathRecKeyPairs) {
+      List<Pair<String, String>> actualPartitionPathRecKeyPairs) {
// verify all partitionpath, record key matches
    assertEquals(expectedPartitionPathRecKeyPairs.size(), actualPartitionPathRecKeyPairs.size());
for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
@@ -1146,7 +1145,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    String commitTime2 = "002";
    List<List<FileSlice>> firstInsertFileSlicesList = table.getFileSystemView().getAllFileGroups(testPartitionPath)
        .map(fileGroup -> fileGroup.getAllFileSlices().collect(Collectors.toList())).collect(Collectors.toList());
-    List<FileSlice>[] fileSlices = (List<FileSlice>[])firstInsertFileSlicesList.toArray(new List[firstInsertFileSlicesList.size()]);
+    List<FileSlice>[] fileSlices = (List<FileSlice>[]) firstInsertFileSlicesList.toArray(new List[firstInsertFileSlicesList.size()]);
    createRequestedReplaceInstant(this.metaClient, commitTime2, fileSlices);
    // 3. insert one record with no updating reject exception, and not merge the small file, just generate a new file group
@@ -1164,7 +1163,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    String assertMsg = String.format("Not allowed to update the clustering files in partition: %s "
        + "For pending clustering operations, we are not going to support update for now.", testPartitionPath);
    assertThrows(HoodieUpsertException.class, () -> {
-      client.upsert(jsc.parallelize(insertsAndUpdates3, 1), commitTime4).collect(); }, assertMsg);
+      client.upsert(jsc.parallelize(insertsAndUpdates3, 1), commitTime4).collect();
+    }, assertMsg);
    // 5. insert one record with no updating reject exception, will merge the small file
String commitTime5 = "005";
@@ -1479,7 +1479,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    List<String> filesFromReplaceCommit = new ArrayList<>();
    replaceCommitMetadata.getPartitionToWriteStats()
-        .forEach((k,v) -> v.forEach(entry -> filesFromReplaceCommit.add(entry.getPath())));
+        .forEach((k, v) -> v.forEach(entry -> filesFromReplaceCommit.add(entry.getPath())));
    // find all parquet files created as part of clustering. Verify it matches w/ what is found in replace commit metadata.
    FileStatus[] fileStatuses = fs.listStatus(new Path(basePath + "/" + partitionPath));
@@ -1490,7 +1490,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
}
@Test
-  public void testRolblackOfRegularCommitWithPendingReplaceCommitInTimeline() throws Exception {
+  public void testRollbackOfRegularCommitWithPendingReplaceCommitInTimeline() throws Exception {
    HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
.build();
@@ -1728,13 +1728,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    }
  }
-  private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering) throws Exception {
+  private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering) throws Exception {
    return testInsertAndClustering(clusteringConfig, populateMetaFields, completeClustering, false, "", "", "");
}
-
+
  private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields,
-      boolean completeClustering, boolean assertSameFileIds, String validatorClasses,
-      String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation) throws Exception {
+      boolean completeClustering, boolean assertSameFileIds, String validatorClasses,
+      String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation) throws Exception {
    Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> allRecords = testInsertTwoBatches(populateMetaFields);
    testClustering(clusteringConfig, populateMetaFields, completeClustering, assertSameFileIds, validatorClasses, sqlQueryForEqualityValidation, sqlQueryForSingleResultValidation, allRecords);
return allRecords.getLeft().getLeft();
@@ -1805,7 +1805,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    assertEquals(0, fileIdIntersection.size());
    return Pair.of(Pair.of(Stream.concat(records1.stream(), records2.stream()).collect(Collectors.toList()), Arrays.asList(commitTime1, commitTime2)), fileIdsUnion);
  }
-
+
  private void testClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering, boolean assertSameFileIds,
                              String validatorClasses, String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation,
                              Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> allRecords) throws IOException {
@@ -1817,7 +1817,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    performClustering(clusteringConfig, populateMetaFields, completeClustering, validatorClasses, sqlQueryForEqualityValidation, sqlQueryForSingleResultValidation, allRecords.getLeft());
    if (assertSameFileIds) {
      Set<HoodieFileGroupId> replacedFileIds = clusterMetadata.getWriteStats().get().stream()
-          .map(s -> new HoodieFileGroupId(s.getPartitionPath(),s.getFileId())).collect(Collectors.toSet());
+          .map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId())).collect(Collectors.toSet());
Set<HoodieFileGroupId> insertedFileIds = allRecords.getRight();
assertEquals(insertedFileIds, replacedFileIds);
}
@@ -1827,7 +1827,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
      verifyRecordsWritten(clusteringCommitTime, populateMetaFields, allRecords.getLeft().getLeft(), clusterMetadata.getWriteStatuses().collect(), config);
    }
  }
-
+
  private HoodieWriteMetadata<JavaRDD<WriteStatus>> performClustering(HoodieClusteringConfig clusteringConfig,
                                                                      boolean populateMetaFields,
                                                                      boolean completeClustering,
@@ -1839,7 +1839,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
.withPrecommitValidatorEqualitySqlQueries(sqlQueryForEqualityValidation)
.withPrecommitValidatorSingleResultSqlQueries(sqlQueryForSingleResultValidation)
.build();
-
+
HoodieWriteConfig config = getConfigBuilder().withAutoCommit(false)
.withPreCommitValidatorConfig(validatorConfig)
        .withProps(populateMetaFields ? new Properties() : getPropertiesForKeyGen())
@@ -1894,10 +1894,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
}
  /**
-   * 1) Do write1 (upsert) with 'batch1RecordsCount' number of records.
-   * 2) Do write2 (insert overwrite) with 'batch2RecordsCount' number of records.
-   *
-   * Verify that all records in step1 are overwritten
+   * 1) Do write1 (upsert) with 'batch1RecordsCount' number of records.
+   * 2) Do write2 (insert overwrite) with 'batch2RecordsCount' number of records.
+   * <p>
+   * Verify that all records in step1 are overwritten
   */
  private void verifyInsertOverwritePartitionHandling(int batch1RecordsCount, int batch2RecordsCount, boolean populateMetaFields) throws Exception {
final String testPartitionPath = "americas";
@@ -1932,7 +1932,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
  }
  /**
-   * Test scenario of writing fewer file groups for first partition than second an third partition.
+   * Test scenario of writing fewer file groups for first partition than second and third partition.
*/
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
@@ -1950,7 +1950,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
  }
  /**
-   * Test scenario of writing more file groups for first partition than second an third partition.
+   * Test scenario of writing more file groups for first partition than second and third partition.
*/
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
@@ -1979,12 +1979,11 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
  }
  /**
-   * 1) Do write1 (upsert) with 'batch1RecordsCount' number of records for first partition.
-   * 2) Do write2 (upsert) with 'batch2RecordsCount' number of records for second partition.
-   * 3) Do write3 (upsert) with 'batch3RecordsCount' number of records for third partition.
-   * 4) delete first partition and check result.
-   * 5) delete second and third partition and check result.
-   *
+   * 1) Do write1 (upsert) with 'batch1RecordsCount' number of records for first partition.
+   * 2) Do write2 (upsert) with 'batch2RecordsCount' number of records for second partition.
+   * 3) Do write3 (upsert) with 'batch3RecordsCount' number of records for third partition.
+   * 4) delete first partition and check result.
+   * 5) delete second and third partition and check result.
   */
  private void verifyDeletePartitionsHandling(int batch1RecordsCount, int batch2RecordsCount, int batch3RecordsCount,
      boolean populateMetaFields) throws Exception {
@@ -2111,7 +2110,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
  }
  private Pair<Set<String>, List<HoodieRecord>> testUpdates(String instantTime, SparkRDDWriteClient client,
-      int sizeToInsertAndUpdate, int expectedTotalRecords)
+      int sizeToInsertAndUpdate, int expectedTotalRecords)
throws IOException {
client.startCommitWithTime(instantTime);
    List<HoodieRecord> inserts = dataGen.generateInserts(instantTime, sizeToInsertAndUpdate);
@@ -2136,7 +2135,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
  }
  private void testDeletes(SparkRDDWriteClient client, List<HoodieRecord> previousRecords, int sizeToDelete,
-      String existingFile, String instantTime, int expectedRecords, List<String> keys) {
+      String existingFile, String instantTime, int expectedRecords, List<String> keys) {
    client.startCommitWithTime(instantTime);
    List<HoodieKey> hoodieKeysToDelete = randomSelectAsHoodieKeys(previousRecords, sizeToDelete);
@@ -2407,25 +2406,25 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    // Perform 2 failed writes to table
    writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "100",
-        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
-        0, false);
+        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
+        0, false);
    client.close();
    client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
    writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
-        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
-        0, false);
+        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
+        0, false);
    client.close();
    // refresh data generator to delete records generated from failed commits
    dataGen = new HoodieTestDataGenerator();
    // Perform 1 successful write
    client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
    writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400",
-        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
-        0, true);
+        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
+        0, true);
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
    assertTrue(metaClient.getActiveTimeline().getTimelineOfActions(
-        CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 0);
+        CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 0);
    assertTrue(metaClient.getActiveTimeline().filterInflights().countInstants() == 2);
    assertTrue(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 2);
    // Await till enough time passes such that the first 2 failed commits heartbeats are expired
@@ -2437,36 +2436,36 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
    // Perform 1 successful write
    writeBatch(client, "500", "400", Option.of(Arrays.asList("500")), "500",
-        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
-        0, true);
+        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
+        0, true);
    client.clean();
    HoodieActiveTimeline timeline = metaClient.getActiveTimeline().reload();
    if (cleaningPolicy.isLazy()) {
      assertTrue(
          timeline
-              .getTimelineOfActions(CollectionUtils.createSet(ROLLBACK_ACTION))
-              .countInstants()
+          .getTimelineOfActions(CollectionUtils.createSet(ROLLBACK_ACTION))
+          .countInstants()
          == 2);
      // Since we write rollbacks not clean, there should be no clean action on the timeline
      assertTrue(
          timeline
-              .getTimelineOfActions(CollectionUtils.createSet(CLEAN_ACTION))
-              .countInstants()
+          .getTimelineOfActions(CollectionUtils.createSet(CLEAN_ACTION))
+          .countInstants()
          == 0);
      assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 3);
    } else if (cleaningPolicy.isNever()) {
      // never will get translated to Lazy if OCC is enabled.
      assertTrue(
-          timeline
-              .getTimelineOfActions(CollectionUtils.createSet(ROLLBACK_ACTION))
-              .countInstants()
-          == 2);
+          timeline
+              .getTimelineOfActions(CollectionUtils.createSet(ROLLBACK_ACTION))
+              .countInstants()
+              == 2);
      // There should be no clean or rollback action on the timeline
      assertTrue(
-          timeline
-              .getTimelineOfActions(CollectionUtils.createSet(CLEAN_ACTION))
-              .countInstants()
-          == 0);
+          timeline
+              .getTimelineOfActions(CollectionUtils.createSet(CLEAN_ACTION))
+              .countInstants()
+              == 0);
      assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 3);
}
}
@@ -2484,21 +2483,21 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    // Perform 1 failed writes to table
    writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200",
-        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
-        0, false);
+        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
+        0, false);
    client.close();
    // Toggle cleaning policy to LAZY
    cleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY;
    // Perform 2 failed writes to table
    client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
    writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
-        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
-        0, false);
+        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
+        0, false);
    client.close();
    client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
    writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400",
-        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
-        0, false);
+        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
+        0, false);
    client.close();
    // Await till enough time passes such that the 2 failed commits heartbeats are expired
boolean conditionMet = false;
@@ -2509,17 +2508,17 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    client.clean();
    HoodieActiveTimeline timeline = metaClient.getActiveTimeline().reload();
    assertTrue(timeline.getTimelineOfActions(
-        CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 3);
+        CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 3);
    // Perform 2 failed commits
    client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
    writeBatch(client, "500", "400", Option.of(Arrays.asList("300")), "300",
-        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
-        0, false);
+        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
+        0, false);
    client.close();
    client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
    writeBatch(client, "600", "500", Option.of(Arrays.asList("400")), "400",
-        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
-        0, false);
+        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
+        0, false);
client.close();
// Toggle cleaning policy to EAGER
cleaningPolicy = EAGER;
@@ -2528,7 +2527,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    timeline = metaClient.getActiveTimeline().reload();
    // since OCC is enabled, hudi auto flips the cleaningPolicy to Lazy.
    assertTrue(timeline.getTimelineOfActions(
-        CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 3);
+        CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 3);
    assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 1);
}
@@ -2545,20 +2544,20 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    // Perform 2 failed writes to table
    writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200",
-        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 100,
-        0, false);
+        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 100,
+        0, false);
    client.close();
    client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true));
    writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
-        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 100,
-        0, false);
+        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 100,
+        0, false);
    client.close();
    // refresh data generator to delete records generated from failed commits
    dataGen = new HoodieTestDataGenerator();
    // Create a successful commit
    Future<JavaRDD<WriteStatus>> commit3 = service.submit(() -> writeBatch(new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)),
-        "400", "300", Option.of(Arrays.asList("400")), "300", 100, dataGen::generateInserts,
-        SparkRDDWriteClient::bulkInsert, false, 100, 100, 0, true));
+        "400", "300", Option.of(Arrays.asList("400")), "300", 100, dataGen::generateInserts,
+        SparkRDDWriteClient::bulkInsert, false, 100, 100, 0, true));
    commit3.get();
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
@@ -2574,8 +2573,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
      Thread.sleep(2000);
    }
    Future<JavaRDD<WriteStatus>> commit4 = service.submit(() -> writeBatch(new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)),
-        "500", "400", Option.of(Arrays.asList("500")), "500", 100, dataGen::generateInserts,
-        SparkRDDWriteClient::bulkInsert, false, 100, 100, 0, true));
+        "500", "400", Option.of(Arrays.asList("500")), "500", 100, dataGen::generateInserts,
+        SparkRDDWriteClient::bulkInsert, false, 100, 100, 0, true));
    Future<HoodieCleanMetadata> clean1 = service.submit(() -> new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)).clean());
commit4.get();
clean1.get();
@@ -2626,8 +2625,6 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
        .create(partitionPath,
            FSUtils.makeBaseFileName(instantTime, "1-0-1", UUID.randomUUID().toString()),
            IOType.MERGE);
- LOG.info("Created a dummy marker path=" + markerFilePath.get());
-
if (!enableOptimisticConsistencyGuard) {
Exception e = assertThrows(HoodieCommitException.class, () -> {
client.commit(instantTime, result);
@@ -2686,7 +2683,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
    HoodieLockConfig lockConfig = HoodieLockConfig.newBuilder()
        .withLockProvider(FileSystemBasedLockProviderTestClass.class)
-        .withConflictResolutionStrategy(new IngestionPrimaryWriterBasedConflictResolutionStrategy())
+        .withConflictResolutionStrategy(new PreferWriterConflictResolutionStrategy())
        .build();
    HoodieCleanConfig cleanConfig = HoodieCleanConfig.newBuilder()
        .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build();
@@ -2702,7 +2699,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    int numRecords = 200;
    String firstCommit = HoodieActiveTimeline.createNewInstantTime();
    String partitionStr = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
-    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{partitionStr});
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {partitionStr});
    writeBatch(client, firstCommit, "000", Option.of(Arrays.asList("000")), "000",
        numRecords, dataGenerator::generateInserts, SparkRDDWriteClient::insert, true, numRecords, numRecords,
1, true);
@@ -2754,7 +2751,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
    HoodieLockConfig lockConfig = HoodieLockConfig.newBuilder()
        .withLockProvider(FileSystemBasedLockProviderTestClass.class)
-        .withConflictResolutionStrategy(new IngestionPrimaryWriterBasedConflictResolutionStrategy())
+        .withConflictResolutionStrategy(new PreferWriterConflictResolutionStrategy())
        .build();
    HoodieCleanConfig cleanConfig = HoodieCleanConfig.newBuilder()
        .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build();
@@ -2770,7 +2767,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    int numRecords = 200;
    String firstCommit = HoodieActiveTimeline.createNewInstantTime();
    String partitionStr = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
-    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{partitionStr});
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {partitionStr});
    writeBatch(client, firstCommit, "000", Option.of(Arrays.asList("000")), "000",
        numRecords, dataGenerator::generateInserts, SparkRDDWriteClient::insert, true, numRecords, numRecords,
1, true);
@@ -2868,7 +2865,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
  }
  private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean mergeAllowDuplicateInserts,
-      boolean populateMetaFields, Properties props) {
+      boolean populateMetaFields, Properties props) {
HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr);
if (!populateMetaFields) {
builder.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.SIMPLE).build());
@@ -2937,7 +2934,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    }
  }
-  public static class WriteClientBrokenClustering<T extends HoodieRecordPayload> extends org.apache.hudi.client.SparkRDDWriteClient<T> {
+  public static class WriteClientBrokenClustering<T extends HoodieRecordPayload> extends org.apache.hudi.client.SparkRDDWriteClient<T> {
    public WriteClientBrokenClustering(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
super(context, clientConfig);
@@ -2951,6 +2948,4 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
}
}
-
- public static String CLUSTERING_FAILURE = "CLUSTERING FAILURE";
}