danny0405 commented on a change in pull request #4561:
URL: https://github.com/apache/hudi/pull/4561#discussion_r782767906



##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
##########
@@ -380,8 +390,8 @@ private void handleEndInputEvent(WriteMetadataEvent event) {
       // The executor thread inherits the classloader of the #handleEventFromOperator
       // caller, which is a AppClassLoader.
       Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
-      // sync Hive if is enabled in batch mode.
-      syncHiveIfEnabled();
+      // sync Hive Synchronously if is enabled in batch mode.
+      syncHiveSynchronously();

Review comment:
       Nit: `synchronously if it is`, i.e. the comment should read `sync Hive synchronously if it is enabled in batch mode.`

##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
##########
@@ -246,21 +246,25 @@ public void resetToCheckpoint(long checkpointID, byte[] checkpointData) {
 
   @Override
   public void handleEventFromOperator(int i, OperatorEvent operatorEvent) {
-    executor.execute(
-        () -> {
-          // no event to handle
-          ValidationUtils.checkState(operatorEvent instanceof WriteMetadataEvent,
-              "The coordinator can only handle WriteMetaEvent");
-          WriteMetadataEvent event = (WriteMetadataEvent) operatorEvent;
-          if (event.isBootstrap()) {
-            handleBootstrapEvent(event);
-          } else if (event.isEndInput()) {
-            handleEndInputEvent(event);
-          } else {
-            handleWriteMetaEvent(event);
-          }
-        }, "handle write metadata event for instant %s", this.instant
-    );
+    // no event to handle
+    ValidationUtils.checkState(operatorEvent instanceof WriteMetadataEvent,
+            "The coordinator can only handle WriteMetaEvent");
+    WriteMetadataEvent event = (WriteMetadataEvent) operatorEvent;
+
+    if (event.isEndInput()) {
+      // Process EndInputEvent synchronously
+      handleEndInputEvent(event);

Review comment:
       `EndInputEvent` => end input event





