This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f1103281d2 [HUDI-4258] Fix when HoodieTable removes data file before the end of Flink job (#5876)
f1103281d2 is described below
commit f1103281d2aebf177231b6c3b6df5cce299957cf
Author: Alexander Trushev <[email protected]>
AuthorDate: Mon Jun 20 16:07:49 2022 +0700
[HUDI-4258] Fix when HoodieTable removes data file before the end of Flink job (#5876)
* [HUDI-4258] Fix when HoodieTable removes data file before the end of Flink job
---
.../hudi/sink/StreamWriteOperatorCoordinator.java | 3 +-
.../apache/hudi/sink/event/WriteMetadataEvent.java | 12 ++++
.../apache/hudi/sink/utils/NonThrownExecutor.java | 68 ++++++++++++++++------
.../sink/TestStreamWriteOperatorCoordinator.java | 38 ++++++++++++
4 files changed, 102 insertions(+), 19 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 75e8beaef1..25b34b11e7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -285,7 +285,8 @@ public class StreamWriteOperatorCoordinator
if (event.isEndInput()) {
// handle end input event synchronously
- handleEndInputEvent(event);
+ // wrap handleEndInputEvent in executeSync to preserve the order of
events
+ executor.executeSync(() -> handleEndInputEvent(event), "handle end input
event for instant %s", this.instant);
} else {
executor.execute(
() -> {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java
index eb89b5ad7e..0eb06bdd82 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java
@@ -163,6 +163,18 @@ public class WriteMetadataEvent implements OperatorEvent {
return lastBatch && this.instantTime.equals(currentInstant);
}
+ @Override
+ public String toString() {
+ return "WriteMetadataEvent{"
+ + "writeStatusesSize=" + writeStatuses.size()
+ + ", taskID=" + taskID
+ + ", instantTime='" + instantTime + '\''
+ + ", lastBatch=" + lastBatch
+ + ", endInput=" + endInput
+ + ", bootstrap=" + bootstrap
+ + '}';
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java
index 242b3ee0d8..535e05f687 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java
@@ -26,9 +26,11 @@ import org.slf4j.Logger;
import javax.annotation.Nullable;
import java.util.Objects;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
/**
* An executor service that catches all the throwable with logging.
@@ -85,25 +87,21 @@ public class NonThrownExecutor implements AutoCloseable {
final ExceptionHook hook,
final String actionName,
final Object... actionParams) {
+ executor.execute(wrapAction(action, hook, actionName, actionParams));
+ }
- executor.execute(
- () -> {
- final String actionString = String.format(actionName, actionParams);
- try {
- action.run();
- logger.info("Executor executes action [{}] success!",
actionString);
- } catch (Throwable t) {
- // if we have a JVM critical error, promote it immediately, there
is a good
- // chance the
- // logging or job failing will not succeed any more
- ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
- final String errMsg = String.format("Executor executes action [%s]
error", actionString);
- logger.error(errMsg, t);
- if (hook != null) {
- hook.apply(errMsg, t);
- }
- }
- });
+ /**
+ * Run the action in a loop and wait for completion.
+ */
+ public void executeSync(ThrowingRunnable<Throwable> action, String
actionName, Object... actionParams) {
+ try {
+ executor.submit(wrapAction(action, this.exceptionHook, actionName,
actionParams)).get();
+ } catch (InterruptedException e) {
+ handleException(e, this.exceptionHook, getActionString(actionName,
actionParams));
+ } catch (ExecutionException e) {
+ // nonfatal exceptions are handled by wrapAction
+ ExceptionUtils.rethrowIfFatalErrorOrOOM(e.getCause());
+ }
}
@Override
@@ -120,6 +118,40 @@ public class NonThrownExecutor implements AutoCloseable {
}
}
+ private <E extends Throwable> Runnable wrapAction(
+ final ThrowingRunnable<E> action,
+ final ExceptionHook hook,
+ final String actionName,
+ final Object... actionParams) {
+
+ return () -> {
+ final Supplier<String> actionString = getActionString(actionName,
actionParams);
+ try {
+ action.run();
+ logger.info("Executor executes action [{}] success!",
actionString.get());
+ } catch (Throwable t) {
+ handleException(t, hook, actionString);
+ }
+ };
+ }
+
+ private void handleException(Throwable t, ExceptionHook hook,
Supplier<String> actionString) {
+ // if we have a JVM critical error, promote it immediately, there is a good
+ // chance the
+ // logging or job failing will not succeed any more
+ ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+ final String errMsg = String.format("Executor executes action [%s] error",
actionString.get());
+ logger.error(errMsg, t);
+ if (hook != null) {
+ hook.apply(errMsg, t);
+ }
+ }
+
+ private Supplier<String> getActionString(String actionName, Object...
actionParams) {
+ // avoid String.format before OOM rethrown
+ return () -> String.format(actionName, actionParams);
+ }
+
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 59a0580e56..d5d35f7494 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -30,6 +30,7 @@ import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
+import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestUtils;
@@ -46,11 +47,14 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
@@ -298,6 +302,40 @@ public class TestStreamWriteOperatorCoordinator {
assertThat(completedTimeline.lastInstant().get().getTimestamp(),
is(instant));
}
+ @Test
+ public void testEndInputIsTheLastEvent() throws Exception {
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ MockOperatorCoordinatorContext context = new
MockOperatorCoordinatorContext(new OperatorID(), 1);
+ Logger logger = Mockito.mock(Logger.class); // avoid too many logs by
executor
+ NonThrownExecutor executor =
NonThrownExecutor.builder(logger).waitForTasksFinish(true).build();
+
+ try (StreamWriteOperatorCoordinator coordinator = new
StreamWriteOperatorCoordinator(conf, context)) {
+ coordinator.start();
+ coordinator.setExecutor(executor);
+ coordinator.handleEventFromOperator(0,
WriteMetadataEvent.emptyBootstrap(0));
+ TimeUnit.SECONDS.sleep(5); // wait for handled bootstrap event
+
+ int eventCount = 20_000; // big enough to fill executor's queue
+ for (int i = 0; i < eventCount; i++) {
+ coordinator.handleEventFromOperator(0, createOperatorEvent(0,
coordinator.getInstant(), "par1", true, 0.1));
+ }
+
+ WriteMetadataEvent endInput = WriteMetadataEvent.builder()
+ .taskID(0)
+ .instantTime(coordinator.getInstant())
+ .writeStatus(Collections.emptyList())
+ .endInput(true)
+ .build();
+ coordinator.handleEventFromOperator(0, endInput);
+
+ // wait for submitted events completed
+ executor.close();
+
+ // there should be no events after endInput
+ assertNull(coordinator.getEventBuffer()[0]);
+ }
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------