xxubai commented on code in PR #4124:
URL: https://github.com/apache/amoro/pull/4124#discussion_r2980378559


##########
amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/hybrid/reader/MixedFormatSourceReader.java:
##########
@@ -180,14 +174,41 @@ public void markActive() {
 
     @Override
     public SourceOutput<T> createOutputForSplit(String splitId) {
-      return internal.createOutputForSplit(splitId);
+      SourceOutput<T> splitOutput = internal.createOutputForSplit(splitId);
+      splitOutputs.put(splitId, splitOutput);
+      return splitOutput;
     }
 
     @Override
     public void releaseOutputForSplit(String splitId) {
-      Object splitLocalOutput = FlinkClassReflectionUtil.getSplitLocalOutput(internal);
-      FlinkClassReflectionUtil.emitPeriodWatermark(splitLocalOutput);
+      emitPeriodicWatermark(splitOutputs.remove(splitId));
       internal.releaseOutputForSplit(splitId);
     }
+
+    private void emitPeriodicWatermark(SourceOutput<T> splitOutput) {
+      if (splitOutput == null) {
+        return;
+      }
+
+      if (splitOutput instanceof SourceOutputWithWatermarks) {
+        ((SourceOutputWithWatermarks<T>) splitOutput).emitPeriodicWatermark();
+        return;
+      }
+
+      try {
+        java.lang.reflect.Method method =
+            splitOutput.getClass().getDeclaredMethod("emitPeriodicWatermark");
+        method.setAccessible(true);
+        method.invoke(splitOutput);
+        return;
+      } catch (ReflectiveOperationException e) {
+        LOGGER.debug(
+            "Failed to invoke emitPeriodicWatermark on split output {}, fallback to reader output",
+            splitOutput.getClass(),
+            e);
+      }
+
+      watermarkOutput.emitPeriodicWatermark();
+    }
   }
 }

Review Comment:
   Thanks, good point. I removed the reflection fallback and simplified the method to call `watermarkOutput.emitPeriodicWatermark()` directly.
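
   For reference, a minimal sketch of the simplified shape. It is not the actual patch: `WatermarkEmitter`, `SplitReleaser`, `WrappedOutput`, and `ReleaseSplitSketch` are hypothetical stand-ins for the Flink types, and it assumes the per-split `splitOutputs` map is no longer needed once the reflection path is gone.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: the interfaces below are hypothetical stand-ins, not Flink or Amoro classes.
final class ReleaseSplitSketch {

  /** Stand-in for the reader-level output that can emit a periodic watermark. */
  interface WatermarkEmitter {
    void emitPeriodicWatermark();
  }

  /** Stand-in for the wrapped (internal) reader output. */
  interface SplitReleaser {
    void releaseOutputForSplit(String splitId);
  }

  /** Wrapper that flushes a periodic watermark before a split output is released. */
  static final class WrappedOutput {
    private final WatermarkEmitter watermarkOutput;
    private final SplitReleaser internal;

    WrappedOutput(WatermarkEmitter watermarkOutput, SplitReleaser internal) {
      this.watermarkOutput = watermarkOutput;
      this.internal = internal;
    }

    void releaseOutputForSplit(String splitId) {
      // No reflection and no per-split lookup: emit on the reader-level output directly.
      watermarkOutput.emitPeriodicWatermark();
      internal.releaseOutputForSplit(splitId);
    }
  }

  /** Records the call order so the behavior can be checked without a Flink runtime. */
  static List<String> demo() {
    List<String> events = new ArrayList<>();
    WrappedOutput output =
        new WrappedOutput(
            () -> events.add("watermark"),
            splitId -> events.add("release:" + splitId));
    output.releaseOutputForSplit("split-1");
    return events;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [watermark, release:split-1]
  }
}
```

   The point of the ordering is that the watermark goes out before the split output is released, which is what the original reflection-based code was also doing.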



