j1wonpark commented on code in PR #4124:
URL: https://github.com/apache/amoro/pull/4124#discussion_r2978848039


##########
amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/hybrid/reader/MixedFormatSourceReader.java:
##########
@@ -180,14 +174,41 @@ public void markActive() {
 
     @Override
     public SourceOutput<T> createOutputForSplit(String splitId) {
-      return internal.createOutputForSplit(splitId);
+      SourceOutput<T> splitOutput = internal.createOutputForSplit(splitId);
+      splitOutputs.put(splitId, splitOutput);
+      return splitOutput;
     }
 
     @Override
     public void releaseOutputForSplit(String splitId) {
-      Object splitLocalOutput = 
FlinkClassReflectionUtil.getSplitLocalOutput(internal);
-      FlinkClassReflectionUtil.emitPeriodWatermark(splitLocalOutput);
+      emitPeriodicWatermark(splitOutputs.remove(splitId));
       internal.releaseOutputForSplit(splitId);
     }
+
+    private void emitPeriodicWatermark(SourceOutput<T> splitOutput) {
+      if (splitOutput == null) {
+        return;
+      }
+
+      if (splitOutput instanceof SourceOutputWithWatermarks) {
+        ((SourceOutputWithWatermarks<T>) splitOutput).emitPeriodicWatermark();
+        return;
+      }
+
+      try {
+        java.lang.reflect.Method method =
+            splitOutput.getClass().getDeclaredMethod("emitPeriodicWatermark");
+        method.setAccessible(true);
+        method.invoke(splitOutput);
+        return;
+      } catch (ReflectiveOperationException e) {
+        LOGGER.debug(
+            "Failed to invoke emitPeriodicWatermark on split output {}, 
fallback to reader output",
+            splitOutput.getClass(),
+            e);
+      }
+
+      watermarkOutput.emitPeriodicWatermark();
+    }
   }
 }

Review Comment:
   The reflection fallback block (lines 548-560) can be removed entirely.
   
   `SplitLocalOutput` (the private inner class Flink returns from 
`createOutputForSplit`) is still registered in `WatermarkOutputMultiplexer` at 
the point `releaseOutputForSplit` is called — the split is not unregistered 
until `internal.releaseOutputForSplit(splitId)` returns. So calling 
`watermarkOutput.emitPeriodicWatermark()` at the reader level already covers 
this split: it iterates all currently-registered split outputs and computes the 
combined minimum watermark.
   
   Suggested simplification:
   ```java
   private void emitPeriodicWatermark(SourceOutput<T> splitOutput) {
     if (splitOutput == null) {
       return;
     }
   
     if (splitOutput instanceof SourceOutputWithWatermarks) {
       ((SourceOutputWithWatermarks<T>) splitOutput).emitPeriodicWatermark();
       return;
     }
   
     // splitOutput is an internal Flink type (e.g. SplitLocalOutput from
     // ProgressiveTimestampsAndWatermarks) that does not expose
     // SourceOutputWithWatermarks publicly. The reader-level call below is
     // semantically equivalent: it iterates all registered split outputs
     // (this split is still registered until internal.releaseOutputForSplit()
     // returns) and flushes the combined periodic watermark.
     watermarkOutput.emitPeriodicWatermark();
   }
   ```
   
   This removes the `setAccessible(true)` call which is exactly the JDK-17 
blocker this PR aims to fix.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to