kennknowles commented on code in PR #37012:
URL: https://github.com/apache/beam/pull/37012#discussion_r2818360514


##########
runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/CommonCoderTest.java:
##########
@@ -326,7 +327,8 @@ private static Object convertValue(Object value, 
CommonCoder coderSpec, Coder co
           windows,
           new Instant(((Number) kvMap.get("fireTimestamp")).longValue()),
           new Instant(((Number) kvMap.get("holdTimestamp")).longValue()),
-          paneInfo);
+          paneInfo,
+          CausedByDrain.NORMAL);

Review Comment:
   I think this should actually come from the data? It is a weird method and I 
have no context for why it is the way it is...



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java:
##########
@@ -396,6 +397,11 @@ public PaneInfo pane() {
       return element.getRecordOffset();
     }
 
+    @Override
+    public CausedByDrain causedByDrain() {
+      return CausedByDrain.NORMAL;

Review Comment:
   Should this come from the element metadata?



##########
runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java:
##########
@@ -141,7 +141,8 @@ public void 
testOnTimerExceptionsWrappedAsUserCodeException() {
         GlobalWindow.INSTANCE,
         new Instant(0),
         new Instant(0),
-        TimeDomain.EVENT_TIME);
+        TimeDomain.EVENT_TIME,
+        CausedByDrain.NORMAL);

Review Comment:
   Maybe this unit test file should test caused by drain propagation somehow?



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java:
##########
@@ -396,6 +397,11 @@ public PaneInfo pane() {
       return element.getRecordOffset();
     }
 
+    @Override
+    public CausedByDrain causedByDrain() {
+      return CausedByDrain.NORMAL;

Review Comment:
   Should this come from the element metadata?



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java:
##########


Review Comment:
   I am not sure if I understand the comment. Here is what I think are answers:
   
    - Yes, we freeze the API for `outputWindowedValue`.
    - No, we shouldn't lose fields, because `builder(value)` is an abstract 
method and it is the job of `OutputReceiver.builder(...)` to make sure to set 
all the builder values to defaults, and propagate the values from the current 
element or timer context.



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java:
##########
@@ -116,7 +117,15 @@ public <KeyT> void onTimer(
       Instant timestamp,
       Instant outputTimestamp,
       TimeDomain timeDomain) {
-    underlying.onTimer(timerId, timerFamilyId, key, window, timestamp, 
outputTimestamp, timeDomain);
+    underlying.onTimer(
+        timerId,
+        timerFamilyId,
+        key,
+        window,
+        timestamp,
+        outputTimestamp,
+        timeDomain,
+        CausedByDrain.NORMAL);

Review Comment:
   I think it should propagate here? will that be a follow-up that adds it to 
the pushback side input DoFnRunner?



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TimerTest.java:
##########
@@ -126,14 +132,16 @@ public void 
testTimerCoderWithConsistentWithEqualsComponentCoders() throws Excep
             Collections.singletonList(GlobalWindow.INSTANCE),
             FIRE_TIME,
             HOLD_TIME,
-            PaneInfo.NO_FIRING),
+            PaneInfo.NO_FIRING,
+            CausedByDrain.NORMAL),
         Timer.of(
             "key",
             "tag",
             Collections.singletonList(GlobalWindow.INSTANCE),
             FIRE_TIME,
             HOLD_TIME,
-            PaneInfo.NO_FIRING));
+            PaneInfo.NO_FIRING,
+            CausedByDrain.NORMAL));

Review Comment:
   We need some caused by drain tests?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to