This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a3cb642db76 Merge pull request #38878: Fix OnWindowExpirationContext.
a3cb642db76 is described below

commit a3cb642db76ffd2daf7dc5f2e2d15c0d8fa953d5
Author: Reuven Lax <[email protected]>
AuthorDate: Tue Jun 9 15:19:23 2026 -0700

    Merge pull request #38878: Fix OnWindowExpirationContext.
    
    * foo
    
    * fix compilation
    
    * fix compilation
---
 .../apache/beam/runners/core/SimpleDoFnRunner.java   | 20 ++++++++++++++++++++
 .../reflect/ByteBuddyDoFnInvokerFactory.java         | 12 ++++++++++++
 .../beam/sdk/transforms/reflect/DoFnInvoker.java     | 17 +++++++++++++++++
 .../beam/sdk/transforms/reflect/DoFnSignature.java   |  9 +++++++++
 .../beam/sdk/transforms/reflect/DoFnSignatures.java  |  3 ++-
 .../construction/SplittableParDoNaiveBounded.java    |  6 ++++++
 .../org/apache/beam/sdk/transforms/ParDoTest.java    |  4 ++++
 .../sdk/transforms/reflect/DoFnSignaturesTest.java   |  8 ++++++--
 .../org/apache/beam/fn/harness/FnApiDoFnRunner.java  | 13 +++++++++++++
 9 files changed, 89 insertions(+), 3 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 470e22a6699..1825b77b65f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -649,6 +649,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
           "Cannot access OnTimerContext outside of @OnTimer methods.");
     }
 
+    @Override
+    public DoFn<InputT, OutputT>.OnWindowExpirationContext onWindowExpirationContext(
+        DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access OnWindowExpirationContext outside of @OnWindowExpiration methods.");
+    }
+
     @Override
     public RestrictionTracker<?, ?> restrictionTracker() {
       throw new UnsupportedOperationException("RestrictionTracker parameters are not supported.");
@@ -958,6 +965,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
       return this;
     }
 
+    @Override
+    public DoFn<InputT, OutputT>.OnWindowExpirationContext onWindowExpirationContext(
+        DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access OnWindowExpirationContext outside of @OnWindowExpiration methods.");
+    }
+
     @Override
     public RestrictionTracker<?, ?> restrictionTracker() {
       throw new UnsupportedOperationException("RestrictionTracker parameters are not supported.");
@@ -1299,6 +1313,12 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
       throw new UnsupportedOperationException("OnTimerContext parameters are not supported.");
     }
 
+    @Override
+    public DoFn<InputT, OutputT>.OnWindowExpirationContext onWindowExpirationContext(
+        DoFn<InputT, OutputT> doFn) {
+      return this;
+    }
+
     @Override
     public RestrictionTracker<?, ?> restrictionTracker() {
       throw new UnsupportedOperationException("RestrictionTracker parameters are not supported.");
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 3ebabb6e3c3..c08243fda5c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -142,6 +142,8 @@ class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
   public static final String OUTPUT_PARAMETER_METHOD = "outputReceiver";
   public static final String TAGGED_OUTPUT_PARAMETER_METHOD = "taggedOutputReceiver";
   public static final String ON_TIMER_CONTEXT_PARAMETER_METHOD = "onTimerContext";
+  public static final String ON_WINDOW_EXPIRATION_CONTEXT_PARAMETER_METHOD =
+      "onWindowExpirationContext";
   public static final String WINDOW_PARAMETER_METHOD = "window";
   public static final String PANE_INFO_PARAMETER_METHOD = "paneInfo";
   public static final String PIPELINE_OPTIONS_PARAMETER_METHOD = "pipelineOptions";
@@ -1170,6 +1172,16 @@ class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
                         ON_TIMER_CONTEXT_PARAMETER_METHOD, DoFn.class)));
           }
 
+          @Override
+          public StackManipulation dispatch(
+              DoFnSignature.Parameter.OnWindowExpirationContextParameter p) {
+            return new StackManipulation.Compound(
+                pushDelegate,
+                MethodInvocation.invoke(
+                    getExtraContextFactoryMethodDescription(
+                        ON_WINDOW_EXPIRATION_CONTEXT_PARAMETER_METHOD, DoFn.class)));
+          }
+
           @Override
           public StackManipulation dispatch(WindowParameter p) {
             return new StackManipulation.Compound(
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index eaabdff907c..c8c7ddf24b6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -185,6 +185,10 @@ public interface DoFnInvoker<InputT, OutputT> {
     /** Provide a {@link DoFn.OnTimerContext} to use with the given {@link DoFn}. */
     DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn);
 
+    /** Provide a {@link DoFn.OnWindowExpirationContext} to use with the given {@link DoFn}. */
+    DoFn<InputT, OutputT>.OnWindowExpirationContext onWindowExpirationContext(
+        DoFn<InputT, OutputT> doFn);
+
     /** Provide a reference to the input element. */
     InputT element(DoFn<InputT, OutputT> doFn);
 
@@ -447,6 +451,13 @@ public interface DoFnInvoker<InputT, OutputT> {
           String.format("OnTimerContext unsupported in %s", getErrorContext()));
     }
 
+    @Override
+    public DoFn<InputT, OutputT>.OnWindowExpirationContext onWindowExpirationContext(
+        DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          String.format("OnWindowExpirationContext unsupported in %s", getErrorContext()));
+    }
+
     @Override
     public State state(String stateId, boolean alwaysFetched) {
       throw new UnsupportedOperationException(
@@ -538,6 +549,12 @@ public interface DoFnInvoker<InputT, OutputT> {
       return delegate.onTimerContext(doFn);
     }
 
+    @Override
+    public DoFn<InputT, OutputT>.OnWindowExpirationContext onWindowExpirationContext(
+        DoFn<InputT, OutputT> doFn) {
+      return delegate.onWindowExpirationContext(doFn);
+    }
+
     @Override
     public InputT element(DoFn<InputT, OutputT> doFn) {
       return delegate.element(doFn);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 51dadd178a6..99b002c1106 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -305,6 +305,8 @@ public abstract class DoFnSignature {
         return cases.dispatch((ProcessContextParameter) this);
       } else if (this instanceof OnTimerContextParameter) {
         return cases.dispatch((OnTimerContextParameter) this);
+      } else if (this instanceof OnWindowExpirationContextParameter) {
+        return cases.dispatch((OnWindowExpirationContextParameter) this);
       } else if (this instanceof WindowParameter) {
         return cases.dispatch((WindowParameter) this);
       } else if (this instanceof PaneInfoParameter) {
@@ -391,6 +393,8 @@ public abstract class DoFnSignature {
 
       ResultT dispatch(OnTimerContextParameter p);
 
+      ResultT dispatch(OnWindowExpirationContextParameter p);
+
       ResultT dispatch(WindowParameter p);
 
       ResultT dispatch(PaneInfoParameter p);
@@ -498,6 +502,11 @@ public abstract class DoFnSignature {
           return dispatchDefault(p);
         }
 
+        @Override
+        public ResultT dispatch(OnWindowExpirationContextParameter p) {
+          return dispatchDefault(p);
+        }
+
         @Override
         public ResultT dispatch(WindowParameter p) {
           return dispatchDefault(p);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 2983fc94021..9f3491bca7b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -229,7 +229,8 @@ public class DoFnSignatures {
               Parameter.StateParameter.class,
               Parameter.TimestampParameter.class,
               Parameter.KeyParameter.class,
-              Parameter.SideInputParameter.class);
+              Parameter.SideInputParameter.class,
+              Parameter.OnWindowExpirationContextParameter.class);
 
   private static final Collection<Class<? extends Parameter>>
       ALLOWED_GET_INITIAL_RESTRICTION_PARAMETERS =
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java
index d1fb23e77c4..52b1174e3a0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java
@@ -506,6 +506,12 @@ public class SplittableParDoNaiveBounded {
         throw new IllegalStateException();
       }
 
+      @Override
+      public DoFn<InputT, OutputT>.OnWindowExpirationContext onWindowExpirationContext(
+          DoFn<InputT, OutputT> doFn) {
+        throw new IllegalStateException();
+      }
+
       @Override
       public InputT element(DoFn<InputT, OutputT> doFn) {
         return element;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 0c984d01c8f..6beea338689 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -7335,11 +7335,15 @@ public class ParDoTest implements Serializable {
             @OnWindowExpiration
             public void onWindowExpiration(
                 @AlwaysFetched @StateId(stateId) ValueState<Integer> state,
+                BoundedWindow window,
                 @Key String key,
+                OnWindowExpirationContext context,
                 OutputReceiver<Integer> r) {
               Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
               // verify state
               assertEquals(1, (int) currentValue);
+              Preconditions.checkNotNull(context);
+              assertEquals(window, context.window());
               // To check output is received from OnWindowExpiration
               r.output(currentValue);
             }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index 5a5353482c9..330bb5b9441 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -1422,16 +1422,20 @@ public class DoFnSignaturesTest {
                   @StateId("foo") ValueState<Integer> s,
                   PipelineOptions p,
                   OutputReceiver<String> o,
-                  MultiOutputReceiver m) {}
+                  MultiOutputReceiver m,
+                  OnWindowExpirationContext c) {}
             }.getClass());
 
     List<Parameter> params = sig.onWindowExpiration().extraParameters();
-    assertThat(params.size(), equalTo(5));
+    assertThat(params.size(), equalTo(6));
     assertThat(params.get(0), instanceOf(WindowParameter.class));
     assertThat(params.get(1), instanceOf(StateParameter.class));
     assertThat(params.get(2), instanceOf(PipelineOptionsParameter.class));
     assertThat(params.get(3), instanceOf(OutputReceiverParameter.class));
     assertThat(params.get(4), instanceOf(TaggedOutputReceiverParameter.class));
+    assertThat(
+        params.get(5),
+        instanceOf(DoFnSignature.Parameter.OnWindowExpirationContextParameter.class));
   }
 
   private interface FeatureTest {
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 3e4675ab074..d5b5cebadb3 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -2250,6 +2250,13 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
           "Cannot access OnTimerContext outside of @OnTimer methods.");
     }
 
+    @Override
+    public DoFn<InputT, OutputT>.OnWindowExpirationContext onWindowExpirationContext(
+        DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access OnWindowExpirationContext outside of @OnWindowExpiration methods.");
+    }
+
     @Override
     public RestrictionTracker<?, ?> restrictionTracker() {
       return currentTracker;
@@ -2469,6 +2476,12 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
     private final OnWindowExpirationContext.Context context =
         new OnWindowExpirationContext.Context();
 
+    @Override
+    public DoFn<InputT, OutputT>.OnWindowExpirationContext onWindowExpirationContext(
+        DoFn<InputT, OutputT> doFn) {
+      return context;
+    }
+
     @Override
     public BoundedWindow window() {
       return currentWindow;

Reply via email to