This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f1103281d2 [HUDI-4258] Fix when HoodieTable removes data file before the end of Flink job (#5876)
f1103281d2 is described below

commit f1103281d2aebf177231b6c3b6df5cce299957cf
Author: Alexander Trushev <[email protected]>
AuthorDate: Mon Jun 20 16:07:49 2022 +0700

    [HUDI-4258] Fix when HoodieTable removes data file before the end of Flink job (#5876)
    
    * [HUDI-4258] Fix when HoodieTable removes data file before the end of Flink job
---
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |  3 +-
 .../apache/hudi/sink/event/WriteMetadataEvent.java | 12 ++++
 .../apache/hudi/sink/utils/NonThrownExecutor.java  | 68 ++++++++++++++++------
 .../sink/TestStreamWriteOperatorCoordinator.java   | 38 ++++++++++++
 4 files changed, 102 insertions(+), 19 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 75e8beaef1..25b34b11e7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -285,7 +285,8 @@ public class StreamWriteOperatorCoordinator
 
     if (event.isEndInput()) {
       // handle end input event synchronously
-      handleEndInputEvent(event);
+      // wrap handleEndInputEvent in executeSync to preserve the order of events
+      executor.executeSync(() -> handleEndInputEvent(event), "handle end input event for instant %s", this.instant);
     } else {
       executor.execute(
           () -> {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java
index eb89b5ad7e..0eb06bdd82 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java
@@ -163,6 +163,18 @@ public class WriteMetadataEvent implements OperatorEvent {
     return lastBatch && this.instantTime.equals(currentInstant);
   }
 
+  @Override
+  public String toString() {
+    return "WriteMetadataEvent{"
+        + "writeStatusesSize=" + writeStatuses.size()
+        + ", taskID=" + taskID
+        + ", instantTime='" + instantTime + '\''
+        + ", lastBatch=" + lastBatch
+        + ", endInput=" + endInput
+        + ", bootstrap=" + bootstrap
+        + '}';
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java
index 242b3ee0d8..535e05f687 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java
@@ -26,9 +26,11 @@ import org.slf4j.Logger;
 import javax.annotation.Nullable;
 
 import java.util.Objects;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 /**
  * An executor service that catches all the throwable with logging.
@@ -85,25 +87,30 @@ public class NonThrownExecutor implements AutoCloseable {
       final ExceptionHook hook,
       final String actionName,
       final Object... actionParams) {
+    executor.execute(wrapAction(action, hook, actionName, actionParams));
+  }
 
-    executor.execute(
-        () -> {
-          final String actionString = String.format(actionName, actionParams);
-          try {
-            action.run();
-            logger.info("Executor executes action [{}] success!", actionString);
-          } catch (Throwable t) {
-            // if we have a JVM critical error, promote it immediately, there is a good
-            // chance the
-            // logging or job failing will not succeed any more
-            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
-            final String errMsg = String.format("Executor executes action [%s] error", actionString);
-            logger.error(errMsg, t);
-            if (hook != null) {
-              hook.apply(errMsg, t);
-            }
-          }
-        });
+  /**
+   * Runs the action on the executor and blocks the calling thread until it completes.
+   *
+   * <p>Non-fatal errors thrown by the action are logged and reported to the exception hook;
+   * fatal errors and OOM are rethrown to the caller. If the calling thread is interrupted
+   * while waiting, its interrupt status is restored and the interruption is reported to the
+   * exception hook; the already submitted action is not cancelled.
+   */
+  public void executeSync(ThrowingRunnable<Throwable> action, String actionName, Object... actionParams) {
+    try {
+      executor.submit(wrapAction(action, this.exceptionHook, actionName, actionParams)).get();
+    } catch (InterruptedException e) {
+      // restore the interrupt status so that callers up the stack can observe it
+      Thread.currentThread().interrupt();
+      handleException(e, this.exceptionHook, getActionString(actionName, actionParams));
+    } catch (ExecutionException e) {
+      // non-fatal errors of the action are already handled inside wrapAction, so the cause
+      // is either a fatal error (rethrown here) or a failure while handling an error
+      ExceptionUtils.rethrowIfFatalErrorOrOOM(e.getCause());
+      logger.error("Executor failed to handle the error of action [{}]", actionName, e.getCause());
+    }
   }
 
   @Override
@@ -120,6 +127,49 @@ public class NonThrownExecutor implements AutoCloseable {
     }
   }
 
+  /**
+   * Wraps the action so that its success is logged and any error it throws is
+   * handled by {@code handleException}.
+   */
+  private <E extends Throwable> Runnable wrapAction(
+      final ThrowingRunnable<E> action,
+      final ExceptionHook hook,
+      final String actionName,
+      final Object... actionParams) {
+
+    return () -> {
+      final Supplier<String> actionString = getActionString(actionName, actionParams);
+      try {
+        action.run();
+        logger.info("Executor executes action [{}] success!", actionString.get());
+      } catch (Throwable t) {
+        handleException(t, hook, actionString);
+      }
+    };
+  }
+
+  /**
+   * Rethrows fatal errors and OOM; any other error is logged and passed to the hook, if present.
+   */
+  private void handleException(Throwable t, ExceptionHook hook, Supplier<String> actionString) {
+    // if we have a JVM critical error, promote it immediately: there is a good
+    // chance that logging or failing the job will not succeed any more
+    ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+    final String errMsg = String.format("Executor executes action [%s] error", actionString.get());
+    logger.error(errMsg, t);
+    if (hook != null) {
+      hook.apply(errMsg, t);
+    }
+  }
+
+  /**
+   * Returns a supplier that formats the action description lazily.
+   */
+  private Supplier<String> getActionString(String actionName, Object... actionParams) {
+    // format lazily so that no String.format runs before a fatal error / OOM is rethrown
+    return () -> String.format(actionName, actionParams);
+  }
+
   // -------------------------------------------------------------------------
   //  Inner Class
   // -------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 59a0580e56..d5d35f7494 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -30,6 +30,7 @@ import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
+import org.apache.hudi.sink.utils.NonThrownExecutor;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestUtils;
@@ -46,11 +47,13 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
@@ -298,6 +301,40 @@ public class TestStreamWriteOperatorCoordinator {
     assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant));
   }
 
+  @Test
+  public void testEndInputIsTheLastEvent() throws Exception {
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
+    Logger logger = Mockito.mock(Logger.class); // avoid too many logs by executor
+    NonThrownExecutor executor = NonThrownExecutor.builder(logger).waitForTasksFinish(true).build();
+
+    try (StreamWriteOperatorCoordinator coordinator = new StreamWriteOperatorCoordinator(conf, context)) {
+      coordinator.start();
+      coordinator.setExecutor(executor);
+      coordinator.handleEventFromOperator(0, WriteMetadataEvent.emptyBootstrap(0));
+      executor.executeSync(() -> { }, "wait for bootstrap event handled"); // barrier instead of a fixed sleep
+
+      int eventCount = 20_000; // big enough to fill executor's queue
+      for (int i = 0; i < eventCount; i++) {
+        coordinator.handleEventFromOperator(0, createOperatorEvent(0, coordinator.getInstant(), "par1", true, 0.1));
+      }
+
+      WriteMetadataEvent endInput = WriteMetadataEvent.builder()
+          .taskID(0)
+          .instantTime(coordinator.getInstant())
+          .writeStatus(Collections.emptyList())
+          .endInput(true)
+          .build();
+      coordinator.handleEventFromOperator(0, endInput);
+
+      // wait for submitted events completed
+      executor.close();
+
+      // there should be no events after endInput
+      assertNull(coordinator.getEventBuffer()[0]);
+    }
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------

Reply via email to