This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/release-0.10.1-rc1 by this push:
     new 6ecefd9  Fixing TestHoodieDeltaStreamerWithMultiWriter tests for deterministic execution
6ecefd9 is described below

commit 6ecefd94effc14704c90940555bff6b60757d4b4
Author: sivabalan narayanan <[email protected]>
AuthorDate: Mon Jan 10 10:08:33 2022 -0500

    Fixing TestHoodieDeltaStreamerWithMultiWriter tests for deterministic execution
---
 .../TestHoodieDeltaStreamerWithMultiWriter.java    | 53 +++++++++++++++++++++-
 1 file changed, 52 insertions(+), 1 deletion(-)

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
index ce48211..2acba81 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
@@ -35,6 +35,8 @@ import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -50,6 +52,7 @@ import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
 import static 
org.apache.hudi.common.testutils.FixtureUtils.prepareFixtureTable;
@@ -73,6 +76,7 @@ import static 
org.apache.hudi.utilities.testutils.sources.AbstractBaseTestSource
 public class TestHoodieDeltaStreamerWithMultiWriter extends 
SparkClientFunctionalTestHarness {
 
   private static final String COW_TEST_TABLE_NAME = "testtable_COPY_ON_WRITE";
+  private static final Logger LOG = 
LogManager.getLogger(TestHoodieDeltaStreamerWithMultiWriter.class);
 
   String basePath;
   String propsFilePath;
@@ -102,7 +106,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends 
SparkClientFunctiona
         propsFilePath, 
Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
     cfgBackfillJob.continuousMode = false;
     HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build();
-    HoodieTimeline timeline = 
meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+    HoodieTimeline timeline = 
meta.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants();
     HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
         
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), 
HoodieCommitMetadata.class);
     cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
@@ -328,18 +332,27 @@ public class TestHoodieDeltaStreamerWithMultiWriter 
extends SparkClientFunctiona
       return true;
     };
 
+    AtomicBoolean continousFailed = new AtomicBoolean(false);
+    AtomicBoolean backfillFailed = new AtomicBoolean(false);
     try {
       Future regularIngestionJobFuture = service.submit(() -> {
         try {
           deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, 
conditionForRegularIngestion);
         } catch (Exception ex) {
+          continousFailed.set(true);
+          LOG.error("Continuous job failed " + ex.getMessage());
           throw new RuntimeException(ex);
         }
       });
       Future backfillJobFuture = service.submit(() -> {
         try {
+          // trigger backfill atleast after 1 requested entry is added to 
timline from continuous job. If not, there is a chance that backfill will 
complete even before
+          // continous job starts.
+          awaitCondition(new GetCommitsAfterInstant(tableBasePath, 
lastSuccessfulCommit));
           backfillJob.sync();
         } catch (Exception ex) {
+          LOG.error("Backfill job failed " + ex.getMessage());
+          backfillFailed.set(true);
           throw new RuntimeException(ex);
         }
       });
@@ -355,10 +368,48 @@ public class TestHoodieDeltaStreamerWithMultiWriter 
extends SparkClientFunctiona
        */
       if (expectConflict && 
e.getCause().getMessage().contains(ConcurrentModificationException.class.getName()))
 {
         // expected ConcurrentModificationException since ingestion & backfill 
will have overlapping writes
+        if (backfillFailed.get()) {
+          // if backfill job failed, shutdown the continuous job.
+          LOG.warn("Calling shutdown on ingestion job since the backfill job 
has failed");
+          ingestionJob.shutdownGracefully();
+        }
       } else {
+        LOG.error("Conflict happened, but not expected " + 
e.getCause().getMessage());
         throw e;
       }
     }
   }
 
+  /**
+   * Probes a table's active timeline for instants that come after a reference commit.
+   *
+   * <p>Kept as a non-static inner class because it needs the harness's {@code fs()} to build
+   * its {@link HoodieTableMetaClient}.
+   */
+  class GetCommitsAfterInstant {
+
+    private final String lastSuccessfulCommit;
+    private final HoodieTableMetaClient meta;
+
+    /**
+     * @param basePath table base path to build the meta client for
+     * @param lastSuccessfulCommit instant time after which new instants are counted
+     */
+    GetCommitsAfterInstant(String basePath, String lastSuccessfulCommit) {
+      // The base path is only needed to build the meta client, so it is not stored as a field;
+      // storing it would also shadow the enclosing test's basePath field.
+      this.lastSuccessfulCommit = lastSuccessfulCommit;
+      this.meta = HoodieTableMetaClient.builder()
+          .setConf(fs().getConf())
+          .setBasePath(basePath)
+          .build();
+    }
+
+    /**
+     * Reloads the active timeline and counts commit-type instants after {@code lastSuccessfulCommit}.
+     *
+     * <p>No completed-instant filter is applied here, so presumably requested/inflight instants are
+     * counted as well -- which is what lets a caller wait for a writer to merely start a commit.
+     *
+     * @return number of instants found after the reference commit
+     */
+    long getCommitsAfterInstant() {
+      HoodieTimeline newerInstants =
+          meta.reloadActiveTimeline().getAllCommitsTimeline().findInstantsAfter(lastSuccessfulCommit);
+      return newerInstants.getInstants().count();
+    }
+  }
+
+  /**
+   * Blocks until {@code callback} reports at least one instant after its reference commit, or until
+   * about 5 seconds of wall-clock time have elapsed, whichever comes first.
+   *
+   * <p>Best-effort: on timeout this only logs a warning and returns, so the caller proceeds without
+   * the ordering guarantee it was waiting for.
+   *
+   * @param callback timeline probe to poll
+   * @throws InterruptedException if interrupted while sleeping between polls
+   */
+  private static void awaitCondition(GetCommitsAfterInstant callback) throws InterruptedException {
+    final long timeoutMs = 5000;
+    final long pollIntervalMs = 500;
+    long startTime = System.currentTimeMillis();
+    // Wall-clock deadline, so time spent reloading the timeline also counts toward the timeout.
+    long deadline = startTime + timeoutMs;
+    while (callback.getCommitsAfterInstant() <= 0) {
+      if (System.currentTimeMillis() >= deadline) {
+        LOG.warn("Timed out after " + (System.currentTimeMillis() - startTime)
+            + " ms waiting for a new instant on the timeline; proceeding anyway");
+        return;
+      }
+      Thread.sleep(pollIntervalMs);
+    }
+    LOG.info("Awaiting completed in " + (System.currentTimeMillis() - startTime) + " ms");
+  }
+
 }

Reply via email to