This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/release-0.10.1-rc1 by this
push:
new 6ecefd9 Fixing TestHoodieDeltaStreamerWithMultiWriter tests for
deterministic execution
6ecefd9 is described below
commit 6ecefd94effc14704c90940555bff6b60757d4b4
Author: sivabalan narayanan <[email protected]>
AuthorDate: Mon Jan 10 10:08:33 2022 -0500
Fixing TestHoodieDeltaStreamerWithMultiWriter tests for deterministic
execution
---
.../TestHoodieDeltaStreamerWithMultiWriter.java | 53 +++++++++++++++++++++-
1 file changed, 52 insertions(+), 1 deletion(-)
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
index ce48211..2acba81 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
@@ -35,6 +35,8 @@ import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
@@ -50,6 +52,7 @@ import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import static
org.apache.hudi.common.testutils.FixtureUtils.prepareFixtureTable;
@@ -73,6 +76,7 @@ import static
org.apache.hudi.utilities.testutils.sources.AbstractBaseTestSource
public class TestHoodieDeltaStreamerWithMultiWriter extends
SparkClientFunctionalTestHarness {
private static final String COW_TEST_TABLE_NAME = "testtable_COPY_ON_WRITE";
+ private static final Logger LOG =
LogManager.getLogger(TestHoodieDeltaStreamerWithMultiWriter.class);
String basePath;
String propsFilePath;
@@ -102,7 +106,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends
SparkClientFunctiona
propsFilePath,
Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
cfgBackfillJob.continuousMode = false;
HoodieTableMetaClient meta =
HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build();
- HoodieTimeline timeline =
meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+ HoodieTimeline timeline =
meta.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants();
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(),
HoodieCommitMetadata.class);
cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
@@ -328,18 +332,27 @@ public class TestHoodieDeltaStreamerWithMultiWriter
extends SparkClientFunctiona
return true;
};
+ AtomicBoolean continousFailed = new AtomicBoolean(false);
+ AtomicBoolean backfillFailed = new AtomicBoolean(false);
try {
Future regularIngestionJobFuture = service.submit(() -> {
try {
deltaStreamerTestRunner(ingestionJob, cfgIngestionJob,
conditionForRegularIngestion);
} catch (Exception ex) {
+ continousFailed.set(true);
+ LOG.error("Continuous job failed " + ex.getMessage());
throw new RuntimeException(ex);
}
});
Future backfillJobFuture = service.submit(() -> {
try {
+ // trigger backfill at least after 1 requested entry is added to
timeline from continuous job. If not, there is a chance that backfill will
complete even before
+ // continuous job starts.
+ awaitCondition(new GetCommitsAfterInstant(tableBasePath,
lastSuccessfulCommit));
backfillJob.sync();
} catch (Exception ex) {
+ LOG.error("Backfill job failed " + ex.getMessage());
+ backfillFailed.set(true);
throw new RuntimeException(ex);
}
});
@@ -355,10 +368,48 @@ public class TestHoodieDeltaStreamerWithMultiWriter
extends SparkClientFunctiona
*/
if (expectConflict &&
e.getCause().getMessage().contains(ConcurrentModificationException.class.getName()))
{
// expected ConcurrentModificationException since ingestion & backfill
will have overlapping writes
+ if (backfillFailed.get()) {
+ // if backfill job failed, shutdown the continuous job.
+ LOG.warn("Calling shutdown on ingestion job since the backfill job
has failed");
+ ingestionJob.shutdownGracefully();
+ }
} else {
+ LOG.error("Conflict happened, but not expected " +
e.getCause().getMessage());
throw e;
}
}
}
+ class GetCommitsAfterInstant {
+
+ String basePath;
+ String lastSuccessfulCommit;
+ HoodieTableMetaClient meta;
+ GetCommitsAfterInstant(String basePath, String lastSuccessfulCommit) {
+ this.basePath = basePath;
+ this.lastSuccessfulCommit = lastSuccessfulCommit;
+ meta =
HoodieTableMetaClient.builder().setConf(fs().getConf()).setBasePath(basePath).build();
+ }
+
+ long getCommitsAfterInstant() {
+ HoodieTimeline timeline1 =
meta.reloadActiveTimeline().getAllCommitsTimeline().findInstantsAfter(lastSuccessfulCommit);
+ // LOG.info("Timeline Instants=" +
meta1.getActiveTimeline().getInstants().collect(Collectors.toList()));
+ return timeline1.getInstants().count();
+ }
+ }
+
+ private static void awaitCondition(GetCommitsAfterInstant callback) throws
InterruptedException {
+ long startTime = System.currentTimeMillis();
+ long soFar = 0;
+ while (soFar <= 5000) {
+ if (callback.getCommitsAfterInstant() > 0) {
+ break;
+ } else {
+ Thread.sleep(500);
+ soFar += 500;
+ }
+ }
+ LOG.warn("Awaiting completed in " + (System.currentTimeMillis() -
startTime));
+ }
+
}