This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a88686c1594 Add DoFnRunner::finishKey() method (#38454)
a88686c1594 is described below

commit a88686c15940af3aa790af0b551bf445a92bceaf
Author: Arun Pandian <[email protected]>
AuthorDate: Fri Jun 12 02:40:43 2026 -0700

    Add DoFnRunner::finishKey() method (#38454)
    
    Currently the method is a no-op.
    
    In upcoming changes there'll be multiple streaming work items
    in a single beam bundle. With multiple work items, we've to process
    elements and timers of each work item before moving to the next work
    items.
    
    finishKey() will be called by the NativeIterator classes after iterating
    through all elements from a work item.
---
 .../main/java/org/apache/beam/runners/core/DoFnRunner.java   | 12 ++++++++++++
 .../apache/beam/runners/core/LateDataDroppingDoFnRunner.java |  6 ++++++
 .../java/org/apache/beam/runners/core/SimpleDoFnRunner.java  |  3 +++
 .../org/apache/beam/runners/core/StatefulDoFnRunner.java     |  6 ++++++
 .../runners/core/SimplePushbackSideInputDoFnRunnerTest.java  |  4 ++++
 .../runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java   |  4 ++++
 .../wrappers/streaming/ExecutableStageDoFnOperator.java      |  3 +++
 .../wrappers/streaming/stableinput/BufferingDoFnRunner.java  |  3 +++
 .../translation/wrappers/streaming/DoFnOperatorTest.java     |  3 +++
 .../runners/dataflow/worker/AssignWindowsParDoFnFactory.java |  3 +++
 .../runners/dataflow/worker/BatchModeUngroupingParDoFn.java  |  3 +++
 .../worker/CreateIsmShardKeyAndSortKeyDoFnFactory.java       |  3 +++
 .../runners/dataflow/worker/DataflowProcessFnRunner.java     |  6 ++++++
 .../beam/runners/dataflow/worker/ForwardingParDoFn.java      |  6 ++++++
 .../runners/dataflow/worker/GroupAlsoByWindowFnRunner.java   |  4 ++++
 .../runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java   |  6 ++++++
 .../dataflow/worker/PairWithConstantKeyDoFnFactory.java      |  3 +++
 .../runners/dataflow/worker/PartialGroupByKeyParDoFns.java   |  6 ++++++
 .../worker/ReifyTimestampAndWindowsParDoFnFactory.java       |  3 +++
 .../apache/beam/runners/dataflow/worker/SimpleParDoFn.java   |  3 +++
 .../worker/StreamingKeyedWorkItemSideInputDoFnRunner.java    |  6 ++++++
 .../worker/StreamingKeyedWorkItemSideInputParDoFn.java       |  3 +++
 .../dataflow/worker/StreamingModeExecutionContext.java       |  2 +-
 .../worker/StreamingPCollectionViewWriterParDoFn.java        |  3 +++
 .../dataflow/worker/StreamingSideInputDoFnRunner.java        |  6 ++++++
 .../dataflow/worker/ToIsmRecordForMultimapDoFnFactory.java   |  3 +++
 .../beam/runners/dataflow/worker/ValuesDoFnFactory.java      |  3 +++
 .../dataflow/worker/WorkerCustomSourceOperationExecutor.java |  2 +-
 .../dataflow/worker/util/common/worker/FlattenOperation.java |  3 ++-
 .../dataflow/worker/util/common/worker/MapTaskExecutor.java  |  4 ++--
 .../dataflow/worker/util/common/worker/Operation.java        |  4 +++-
 .../runners/dataflow/worker/util/common/worker/ParDoFn.java  |  4 ++++
 .../dataflow/worker/util/common/worker/ParDoOperation.java   |  4 +++-
 .../dataflow/worker/util/common/worker/ReadOperation.java    |  2 +-
 .../util/common/worker/SimplePartialGroupByKeyParDoFn.java   |  5 +++++
 .../dataflow/worker/util/common/worker/WorkExecutor.java     |  2 +-
 .../dataflow/worker/util/common/worker/WriteOperation.java   |  3 ++-
 .../dataflow/worker/IntrinsicMapTaskExecutorTest.java        | 11 +++++++----
 .../dataflow/worker/StreamingSideInputDoFnRunnerTest.java    |  4 ++++
 .../worker/util/common/worker/ExecutorTestUtils.java         |  2 +-
 .../worker/util/common/worker/MapTaskExecutorTest.java       | 11 +++++++----
 .../worker/util/common/worker/ParDoOperationTest.java        |  8 ++++++--
 .../translation/batch/DoFnRunnerFactory.java                 |  4 ++++
 .../translation/batch/DoFnRunnerWithMetrics.java             |  4 ++++
 .../runners/spark/translation/DoFnRunnerWithMetrics.java     |  4 ++++
 .../spark/translation/SparkInputDataProcessorTest.java       |  4 ++++
 46 files changed, 180 insertions(+), 21 deletions(-)

diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
index 500aedecb5d..d1278eddaef 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
@@ -63,6 +63,18 @@ public interface DoFnRunner<InputT extends @Nullable Object, 
OutputT extends @Nu
   <KeyT extends @Nullable Object> void onWindowExpiration(
       BoundedWindow window, Instant timestamp, KeyT key);
 
+  /**
+   * Performs per-key cleanup or processing after all elements, timers for a 
key have been processed
+   * and before moving to the next key or before finishBundle for the last key.
+   *
+   * <p>This is an optional method that can be used by runners as a hook to 
reset any per key state
+   * before moving to a different key in the same bundle. Currently used only 
by the Dataflow
+   * Streaming runner.
+   *
+   * @param key current key to clean up or finish processing
+   */
+  <KeyT extends @Nullable Object> void finishKey(KeyT key);
+
   /**
    * Returns the underlying fn instance.
    *
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 41052a76f13..dfab198f893 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.values.WindowedValue;
 import org.apache.beam.sdk.values.WindowedValues;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
 
 /**
@@ -101,6 +102,11 @@ public class LateDataDroppingDoFnRunner<K, InputT, 
OutputT, W extends BoundedWin
     doFnRunner.finishBundle();
   }
 
+  @Override
+  public <KeyT extends @Nullable Object> void finishKey(KeyT key) {
+    doFnRunner.finishKey(key);
+  }
+
   @Override
   public <KeyT> void onWindowExpiration(BoundedWindow window, Instant 
timestamp, KeyT key) {
     doFnRunner.onWindowExpiration(window, timestamp, key);
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 1825b77b65f..90d974653b6 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -230,6 +230,9 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     }
   }
 
+  @Override
+  public <KeyT extends @Nullable Object> void finishKey(KeyT key) {}
+
   @Override
   public <KeyT> void onWindowExpiration(BoundedWindow window, Instant 
timestamp, KeyT key) {
     invoker.invokeOnWindowExpiration(
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index 77913883466..f5de79652f2 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -43,6 +43,7 @@ import org.apache.beam.sdk.values.WindowedValue;
 import org.apache.beam.sdk.values.WindowedValues;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -131,6 +132,11 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends 
BoundedWindow>
     doFnRunner.finishBundle();
   }
 
+  @Override
+  public <KeyT extends @Nullable Object> void finishKey(KeyT key) {
+    doFnRunner.finishKey(key);
+  }
+
   @Override
   public <KeyT> void onWindowExpiration(BoundedWindow window, Instant 
timestamp, KeyT key) {
     doFnRunner.onWindowExpiration(window, timestamp, key);
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
index 1ae937b7a83..aa61122a754 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
@@ -65,6 +65,7 @@ import org.apache.beam.sdk.values.WindowedValues;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
@@ -379,6 +380,9 @@ public class SimplePushbackSideInputDoFnRunnerTest {
       finished = true;
     }
 
+    @Override
+    public <KeyT extends @Nullable Object> void finishKey(KeyT key) {}
+
     @Override
     public <KeyT> void onWindowExpiration(BoundedWindow window, Instant 
timestamp, KeyT key) {}
   }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
index 56e077253ae..c327c8d91be 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.WindowedValue;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
 
 /**
@@ -105,6 +106,9 @@ public class DoFnRunnerWithMetricsUpdate<InputT, OutputT> 
implements DoFnRunner<
     container.updateMetrics(stepName);
   }
 
+  @Override
+  public <KeyT extends @Nullable Object> void finishKey(KeyT key) {}
+
   @Override
   public <KeyT> void onWindowExpiration(BoundedWindow window, Instant 
timestamp, KeyT key) {
     delegate.onWindowExpiration(window, timestamp, key);
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 4ebb359fcea..f7bc62a8ec5 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -1064,6 +1064,9 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
       }
     }
 
+    @Override
+    public <KeyT extends @Nullable Object> void finishKey(KeyT key) {}
+
     @Override
     public <KeyT> void onWindowExpiration(BoundedWindow window, Instant 
timestamp, KeyT key) {}
 
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
index 73b20238ef0..6d1d4085f0b 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
@@ -255,6 +255,9 @@ public class BufferingDoFnRunner<InputT, OutputT> 
implements DoFnRunner<InputT,
     Optional.ofNullable(finishBundleCallback).ifPresent(Runnable::run);
   }
 
+  @Override
+  public <KeyT extends @Nullable Object> void finishKey(KeyT key) {}
+
   @Override
   public <KeyT> void onWindowExpiration(BoundedWindow window, Instant 
timestamp, KeyT key) {}
 
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
index 5c4975ffab0..cce8f808c24 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
@@ -523,6 +523,9 @@ public class DoFnOperatorTest {
                 wrappedRunner.finishBundle();
               }
 
+              @Override
+              public <KeyT extends @Nullable Object> void finishKey(KeyT key) 
{}
+
               @Override
               public <KeyT> void onWindowExpiration(
                   BoundedWindow window, Instant timestamp, KeyT key) {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java
index 83cbc3aa62c..673130a6048 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java
@@ -119,6 +119,9 @@ class AssignWindowsParDoFnFactory implements ParDoFnFactory 
{
       // Nothing.
     }
 
+    @Override
+    public void finishKey(Object key) throws Exception {}
+
     @Override
     public void finishBundle() throws Exception {
       receiver = null;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeUngroupingParDoFn.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeUngroupingParDoFn.java
index 8e2b325b580..c3cf6d9e67e 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeUngroupingParDoFn.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeUngroupingParDoFn.java
@@ -73,6 +73,9 @@ class BatchModeUngroupingParDoFn<K, V> implements ParDoFn {
     // The timers for the underlying ParDoFn are processed at the end of each 
element
   }
 
+  @Override
+  public void finishKey(Object key) throws Exception {}
+
   @Override
   public void finishBundle() throws Exception {
     underlyingParDoFn.finishBundle();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CreateIsmShardKeyAndSortKeyDoFnFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CreateIsmShardKeyAndSortKeyDoFnFactory.java
index bd991560c18..afe2e28b2a6 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CreateIsmShardKeyAndSortKeyDoFnFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CreateIsmShardKeyAndSortKeyDoFnFactory.java
@@ -114,6 +114,9 @@ public class CreateIsmShardKeyAndSortKeyDoFnFactory 
implements ParDoFnFactory {
     @Override
     public void processTimers() {}
 
+    @Override
+    public void finishKey(Object key) throws Exception {}
+
     @Override
     public void finishBundle() {}
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java
index ec1fcd6c843..400dbc04744 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.apache.beam.sdk.values.WindowedValues;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
 
 /**
@@ -128,6 +129,11 @@ class DataflowProcessFnRunner<InputT, OutputT, 
RestrictionT>
     simpleRunner.finishBundle();
   }
 
+  @Override
+  public <KeyT extends @Nullable Object> void finishKey(KeyT key) {
+    simpleRunner.finishKey(key);
+  }
+
   @Override
   public <KeyT> void onWindowExpiration(BoundedWindow window, Instant 
timestamp, KeyT key) {
     simpleRunner.onWindowExpiration(window, timestamp, key);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ForwardingParDoFn.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ForwardingParDoFn.java
index d5518155932..3a864d6caf2 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ForwardingParDoFn.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ForwardingParDoFn.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.worker;
 
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
  * A base class for {@link ParDoFn} implementations for overriding particular 
methods while
@@ -47,6 +48,11 @@ public abstract class ForwardingParDoFn implements ParDoFn {
     delegate.processTimers();
   }
 
+  @Override
+  public void finishKey(@Nullable Object key) throws Exception {
+    delegate.finishKey(key);
+  }
+
   @Override
   public void finishBundle() throws Exception {
     delegate.finishBundle();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
index 4845bb0c98e..aa60a61af8a 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValueReceiver;
 import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.WindowedValue;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
 
 /**
@@ -102,6 +103,9 @@ public class GroupAlsoByWindowFnRunner<InputT, OutputT> 
implements DoFnRunner<In
   @Override
   public void finishBundle() {}
 
+  @Override
+  public <KeyT extends @Nullable Object> void finishKey(KeyT key) {}
+
   @Override
   public <KeyT> void onWindowExpiration(BoundedWindow window, Instant 
timestamp, KeyT key) {}
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java
index 882dd497e3f..e204a78a7d2 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java
@@ -142,6 +142,12 @@ public class GroupAlsoByWindowsParDoFn<InputT, K, V, W 
extends BoundedWindow> im
     // it here to build a KeyedWorkItem
   }
 
+  @Override
+  public void finishKey(Object key) throws Exception {
+    checkState(fnRunner != null);
+    fnRunner.finishKey(key);
+  }
+
   @Override
   public void finishBundle() throws Exception {
     checkState(fnRunner != null);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PairWithConstantKeyDoFnFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PairWithConstantKeyDoFnFactory.java
index 6951e3a95b2..425184a4a12 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PairWithConstantKeyDoFnFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PairWithConstantKeyDoFnFactory.java
@@ -98,6 +98,9 @@ public class PairWithConstantKeyDoFnFactory implements 
ParDoFnFactory {
     @Override
     public void processTimers() {}
 
+    @Override
+    public void finishKey(Object key) throws Exception {}
+
     @Override
     public void finishBundle() {}
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java
index 399258d7dbb..a6d7810412a 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java
@@ -317,6 +317,9 @@ public class PartialGroupByKeyParDoFns {
     @Override
     public void processTimers() {}
 
+    @Override
+    public void finishKey(Object key) throws Exception {}
+
     @Override
     public void finishBundle() throws Exception {
       groupingTable.flush(receiver);
@@ -377,6 +380,9 @@ public class PartialGroupByKeyParDoFns {
     @Override
     public void processTimers() {}
 
+    @Override
+    public void finishKey(Object key) throws Exception {}
+
     @Override
     public void finishBundle() throws Exception {
       groupingTable.flush(receiver);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java
index 746c09404f6..0438b525b6b 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java
@@ -86,6 +86,9 @@ class ReifyTimestampAndWindowsParDoFnFactory implements 
ParDoFnFactory {
     @Override
     public void processTimers() {}
 
+    @Override
+    public void finishKey(Object key) throws Exception {}
+
     @Override
     public void finishBundle() throws Exception {
       this.receiver = null;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
index 34dff6b8835..e0f1e0f410c 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
@@ -195,6 +195,9 @@ public class SimpleParDoFn<InputT, OutputT, W extends 
BoundedWindow> implements
         () -> sideInputProcessor);
   }
 
+  @Override
+  public void finishKey(Object key) throws Exception {}
+
   @Override
   public void finishBundle() throws Exception {
     helpers.finishBundle(sideInputProcessor);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java
index 0b9ccd1f37c..3de7c0f3a9b 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.values.WindowedValue;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
 
 /**
@@ -155,6 +156,11 @@ public class StreamingKeyedWorkItemSideInputDoFnRunner<K, 
InputT, OutputT, W ext
     sideInputFetcher.persist();
   }
 
+  @Override
+  public <KeyT extends @Nullable Object> void finishKey(KeyT key) {
+    simpleDoFnRunner.finishKey(key);
+  }
+
   @Override
   public <KeyT> void onWindowExpiration(BoundedWindow window, Instant 
timestamp, KeyT key) {
     simpleDoFnRunner.onWindowExpiration(window, timestamp, key);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputParDoFn.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputParDoFn.java
index 225bc6af0ea..63de0b8d55d 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputParDoFn.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputParDoFn.java
@@ -193,6 +193,9 @@ public class StreamingKeyedWorkItemSideInputParDoFn<K, 
InputT, OutputT, W extend
         () -> sideInputProcessor);
   }
 
+  @Override
+  public void finishKey(Object key) throws Exception {}
+
   @Override
   public void finishBundle() throws Exception {
     helpers.finishBundle(sideInputProcessor);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index 00fdf67b8d0..d8886474564 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -282,7 +282,7 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
     checkState(!finishKeyCalled, "finishKey was already called");
     checkStateNotNull(workExecutor, "workExecutor must be set before calling 
finishKey()");
     try {
-      workExecutor.finishKey();
+      workExecutor.finishKey(key);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterParDoFn.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterParDoFn.java
index 61730b0c8d8..6b51427bb93 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterParDoFn.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterParDoFn.java
@@ -73,6 +73,9 @@ public class StreamingPCollectionViewWriterParDoFn implements 
ParDoFn {
   @Override
   public void processTimers() {}
 
+  @Override
+  public void finishKey(Object key) throws Exception {}
+
   @Override
   public void finishBundle() throws Exception {}
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
index a64d1a970d3..ef1a5922fe5 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.WindowedValue;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
 
 /**
@@ -80,6 +81,11 @@ public class StreamingSideInputDoFnRunner<InputT, OutputT, W 
extends BoundedWind
         "Attempt to deliver a timer to a DoFn, but timers are not supported in 
Dataflow.");
   }
 
+  @Override
+  public <KeyT extends @Nullable Object> void finishKey(KeyT key) {
+    simpleDoFnRunner.finishKey(key);
+  }
+
   @Override
   public void finishBundle() {
     simpleDoFnRunner.finishBundle();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ToIsmRecordForMultimapDoFnFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ToIsmRecordForMultimapDoFnFactory.java
index f9e2d6de246..261ed69fb5d 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ToIsmRecordForMultimapDoFnFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ToIsmRecordForMultimapDoFnFactory.java
@@ -149,6 +149,9 @@ public class ToIsmRecordForMultimapDoFnFactory implements 
ParDoFnFactory {
     @Override
     public void processTimers() {}
 
+    @Override
+    public void finishKey(Object key) throws Exception {}
+
     @Override
     public void finishBundle() {}
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ValuesDoFnFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ValuesDoFnFactory.java
index 3ddb3c2003d..acf143a5467 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ValuesDoFnFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ValuesDoFnFactory.java
@@ -80,6 +80,9 @@ public class ValuesDoFnFactory implements ParDoFnFactory {
     @Override
     public void processTimers() {}
 
+    @Override
+    public void finishKey(Object key) throws Exception {}
+
     @Override
     public void finishBundle() {}
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java
index a1321d57ebb..3220dbade61 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java
@@ -90,7 +90,7 @@ public class WorkerCustomSourceOperationExecutor implements 
SourceOperationExecu
   }
 
   @Override
-  public void finishKey() throws Exception {}
+  public void finishKey(Object key) throws Exception {}
 
   @Override
   public SourceOperationResponse getResponse() {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/FlattenOperation.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/FlattenOperation.java
index af1b2b9c48b..7ef01126650 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/FlattenOperation.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/FlattenOperation.java
@@ -19,6 +19,7 @@ package 
org.apache.beam.runners.dataflow.worker.util.common.worker;
 
 import java.io.Closeable;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** A flatten operation. */
 public class FlattenOperation extends ReceivingOperation {
@@ -44,7 +45,7 @@ public class FlattenOperation extends ReceivingOperation {
   }
 
   @Override
-  public void finishKey() throws Exception {}
+  public void finishKey(@Nullable Object key) throws Exception {}
 
   @Override
   public boolean supportsRestart() {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java
index c6e1ae209b9..90fc7627694 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java
@@ -110,9 +110,9 @@ public class MapTaskExecutor implements WorkExecutor {
   }
 
   @Override
-  public void finishKey() throws Exception {
+  public void finishKey(@Nullable Object key) throws Exception {
     for (Operation op : operations) {
-      op.finishKey();
+      op.finishKey(key);
     }
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/Operation.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/Operation.java
index b630da33cfa..138ce525e26 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/Operation.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/Operation.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.dataflow.worker.util.common.worker;
 
+import org.checkerframework.checker.nullness.qual.Nullable;
+
 /**
  * The abstract base class for Operations, which correspond to Instructions in 
the original MapTask
  * InstructionGraph.
@@ -138,7 +140,7 @@ public abstract class Operation {
   }
 
   /** Called when all elements for a specific key have been processed. */
-  public abstract void finishKey() throws Exception;
+  public abstract void finishKey(@Nullable Object key) throws Exception;
 
   /** Aborts this Operation's execution. */
   public void abort() throws Exception {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoFn.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoFn.java
index 84dbbd627b0..75ecdb67dd4 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoFn.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoFn.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.dataflow.worker.util.common.worker;
 
+import org.checkerframework.checker.nullness.qual.Nullable;
+
 /**
  * Interface for functions invokable by {@link ParDoOperation} instances.
  *
@@ -30,6 +32,8 @@ public interface ParDoFn {
 
   void processTimers() throws Exception;
 
+  void finishKey(@Nullable Object key) throws Exception;
+
   void finishBundle() throws Exception;
 
   void abort() throws Exception;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperation.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperation.java
index 68f5fbe688d..a814b90cae2 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperation.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperation.java
@@ -19,6 +19,7 @@ package 
org.apache.beam.runners.dataflow.worker.util.common.worker;
 
 import java.io.Closeable;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** A ParDo mapping function. */
 public class ParDoOperation extends ReceivingOperation {
@@ -48,10 +49,11 @@ public class ParDoOperation extends ReceivingOperation {
   // Batch mode does not use this method and instead relies on 
BatchModeUngroupingParDoFn
   // to process timers per key.
   @Override
-  public void finishKey() throws Exception {
+  public void finishKey(@Nullable Object key) throws Exception {
     try (Closeable scope = context.enterProcessTimers()) {
       checkStarted();
       fn.processTimers();
+      fn.finishKey(key);
     }
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ReadOperation.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ReadOperation.java
index fabc8d6af25..5d118626f18 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ReadOperation.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ReadOperation.java
@@ -272,7 +272,7 @@ public class ReadOperation extends Operation {
   }
 
   @Override
-  public void finishKey() throws Exception {}
+  public void finishKey(@Nullable Object key) throws Exception {}
 
   @Override
   public void abort() throws Exception {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/SimplePartialGroupByKeyParDoFn.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/SimplePartialGroupByKeyParDoFn.java
index 7a9fcfdf069..cce77f3aceb 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/SimplePartialGroupByKeyParDoFn.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/SimplePartialGroupByKeyParDoFn.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.dataflow.worker.util.common.worker;
 
+import org.checkerframework.checker.nullness.qual.Nullable;
+
 /** A partial group-by-key {@link ParDoFn} implementation. */
 public class SimplePartialGroupByKeyParDoFn<K, InputT, AccumT> implements 
ParDoFn {
   private final GroupingTable<K, InputT, AccumT> groupingTable;
@@ -39,6 +41,9 @@ public class SimplePartialGroupByKeyParDoFn<K, InputT, 
AccumT> implements ParDoF
   @Override
   public void processTimers() {}
 
+  @Override
+  public void finishKey(@Nullable Object key) throws Exception {}
+
   @Override
   public void finishBundle() throws Exception {
     groupingTable.flush(receiver);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkExecutor.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkExecutor.java
index 1083fdbb9c4..8aab050fb22 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkExecutor.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkExecutor.java
@@ -35,7 +35,7 @@ public interface WorkExecutor extends AutoCloseable {
   public abstract void execute() throws Exception;
 
   /** Called when all elements for a specific key have been processed. */
-  void finishKey() throws Exception;
+  void finishKey(@Nullable Object key) throws Exception;
 
   /**
    * Returns the worker's current progress.
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WriteOperation.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WriteOperation.java
index d28e7f3e5d3..a97c9920b9a 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WriteOperation.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WriteOperation.java
@@ -21,6 +21,7 @@ import java.io.Closeable;
 import org.apache.beam.runners.dataflow.worker.counters.Counter;
 import org.apache.beam.runners.dataflow.worker.counters.CounterName;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** A write operation. */
 @SuppressWarnings({
@@ -106,7 +107,7 @@ public class WriteOperation extends ReceivingOperation {
   }
 
   @Override
-  public void finishKey() throws Exception {}
+  public void finishKey(@Nullable Object key) throws Exception {}
 
   @Override
   public void abort() throws Exception {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java
index 396c8db87e6..73d69bd0f61 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java
@@ -106,7 +106,7 @@ public class IntrinsicMapTaskExecutorTest {
     }
 
     @Override
-    public void finishKey() throws Exception {}
+    public void finishKey(Object key) throws Exception {}
   }
 
   // A mock ReadOperation fed to a MapTaskExecutor in test.
@@ -220,6 +220,9 @@ public class IntrinsicMapTaskExecutorTest {
 
     @Override
     public void abort() {}
+
+    @Override
+    public void finishKey(Object key) throws Exception {}
   }
 
   /** Verify counts for the per-element-output-time counter are correct. */
@@ -317,7 +320,7 @@ public class IntrinsicMapTaskExecutorTest {
               }
 
               @Override
-              public void finishKey() throws Exception {}
+              public void finishKey(Object key) throws Exception {}
             },
             new Operation(new OutputReceiver[] {}, context2) {
               @Override
@@ -329,7 +332,7 @@ public class IntrinsicMapTaskExecutorTest {
               }
 
               @Override
-              public void finishKey() throws Exception {}
+              public void finishKey(Object key) throws Exception {}
             },
             new Operation(new OutputReceiver[] {}, context3) {
               @Override
@@ -341,7 +344,7 @@ public class IntrinsicMapTaskExecutorTest {
               }
 
               @Override
-              public void finishKey() throws Exception {}
+              public void finishKey(Object key) throws Exception {}
             });
 
     try (IntrinsicMapTaskExecutor executor =
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java
index d18bc512723..c110cc0d2bf 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java
@@ -150,6 +150,7 @@ public class StreamingSideInputDoFnRunnerTest {
 
     runner.startBundle();
     runner.processElement(createDatum("e", 0));
+    runner.finishKey("key");
     runner.finishBundle();
 
     assertTrue(outputManager.getOutput(mainOutputTag).isEmpty());
@@ -214,6 +215,7 @@ public class StreamingSideInputDoFnRunnerTest {
 
     runner.startBundle();
     runner.processElement(elem);
+    runner.finishKey("key");
     runner.finishBundle();
 
     assertTrue(outputManager.getOutput(mainOutputTag).isEmpty());
@@ -317,6 +319,7 @@ public class StreamingSideInputDoFnRunnerTest {
     when(mockSideInputReader.get(eq(view), 
any(BoundedWindow.class))).thenReturn("data");
 
     runner.startBundle();
+    runner.finishKey("key");
     runner.finishBundle();
 
     assertThat(outputManager.getOutput(mainOutputTag), 
contains(createDatum("e:data", 0)));
@@ -373,6 +376,7 @@ public class StreamingSideInputDoFnRunnerTest {
 
     runner.startBundle();
     runner.processElement(createDatum("e2", 2));
+    runner.finishKey("key");
     runner.finishBundle();
 
     assertThat(
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutorTestUtils.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutorTestUtils.java
index d5e3b9c8713..ac7c787b1d2 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutorTestUtils.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutorTestUtils.java
@@ -61,7 +61,7 @@ public class ExecutorTestUtils {
     }
 
     @Override
-    public void finishKey() throws Exception {}
+    public void finishKey(Object key) throws Exception {}
   }
 
   /** A {@code Reader<String>} that yields a specified set of values. */
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java
index 5d8f8eebb6f..40f4c7b0e71 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java
@@ -102,7 +102,7 @@ public class MapTaskExecutorTest {
     }
 
     @Override
-    public void finishKey() throws Exception {}
+    public void finishKey(Object key) throws Exception {}
   }
 
   // A mock ReadOperation fed to a MapTaskExecutor in test.
@@ -216,6 +216,9 @@ public class MapTaskExecutorTest {
 
     @Override
     public void abort() {}
+
+    @Override
+    public void finishKey(Object key) throws Exception {}
   }
 
   /** Verify counts for the per-element-output-time counter are correct. */
@@ -314,7 +317,7 @@ public class MapTaskExecutorTest {
               }
 
               @Override
-              public void finishKey() throws Exception {}
+              public void finishKey(Object key) throws Exception {}
             },
             new Operation(new OutputReceiver[] {}, context2) {
               @Override
@@ -326,7 +329,7 @@ public class MapTaskExecutorTest {
               }
 
               @Override
-              public void finishKey() throws Exception {}
+              public void finishKey(Object key) throws Exception {}
             },
             new Operation(new OutputReceiver[] {}, context3) {
               @Override
@@ -338,7 +341,7 @@ public class MapTaskExecutorTest {
               }
 
               @Override
-              public void finishKey() throws Exception {}
+              public void finishKey(Object key) throws Exception {}
             });
 
     assertEquals(TimeUnit.MINUTES.toMillis(10), 
stateTracker.getNextBundleLullDurationReportMs());
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperationTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperationTest.java
index 5d058b1968c..0c9667acf38 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperationTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperationTest.java
@@ -85,6 +85,9 @@ public class ParDoOperationTest {
     public void abort() throws Exception {
       outputReceiver.process("a-aborted");
     }
+
+    @Override
+    public void finishKey(Object key) throws Exception {}
   }
 
   @Test
@@ -104,7 +107,7 @@ public class ParDoOperationTest {
     parDoOperation.process("");
     parDoOperation.process("bob");
 
-    parDoOperation.finishKey();
+    parDoOperation.finishKey("key");
     parDoOperation.finish();
 
     parDoOperation.abort();
@@ -148,7 +151,7 @@ public class ParDoOperationTest {
 
     operation.start();
     operation.process("hello");
-    operation.finishKey();
+    operation.finishKey("key");
     operation.finish();
 
     InOrder inOrder =
@@ -163,6 +166,7 @@ public class ParDoOperationTest {
     inOrder.verify(processCloseable).close();
     inOrder.verify(context).enterProcessTimers();
     inOrder.verify(fn).processTimers();
+    inOrder.verify(fn).finishKey("key");
     inOrder.verify(processTimersCloseable).close();
     inOrder.verify(context).enterFinish();
     inOrder.verify(fn).finishBundle();
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java
index 99ce3dc6988..5e8703a05b0 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
 
 /**
@@ -284,6 +285,9 @@ abstract class DoFnRunnerFactory<InT, T> implements 
Serializable {
         }
       }
 
+      @Override
+      public <KeyT extends @Nullable Object> void finishKey(KeyT key) {}
+
       @Override
       public DoFn<InT, T> getFn() {
         throw new UnsupportedOperationException();
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java
index 28dbf44cb8f..7202de0f0aa 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.WindowedValue;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
 
 /** DoFnRunner decorator which registers {@link MetricsContainer}. */
@@ -104,6 +105,9 @@ class DoFnRunnerWithMetrics<InT, OutT> implements 
DoFnRunnerWithTeardown<InT, Ou
     }
   }
 
+  @Override
+  public <KeyT extends @Nullable Object> void finishKey(KeyT key) {}
+
   @Override
   public <KeyT> void onWindowExpiration(BoundedWindow window, Instant 
timestamp, KeyT key) {
     delegate.onWindowExpiration(window, timestamp, key);
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
index c8cd7eb5f26..bc434b2117d 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.WindowedValue;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
 
 /** DoFnRunner decorator which registers {@link MetricsContainerImpl}. */
@@ -103,6 +104,9 @@ public class DoFnRunnerWithMetrics<InputT, OutputT> 
implements DoFnRunner<InputT
     }
   }
 
+  @Override
+  public <KeyT extends @Nullable Object> void finishKey(KeyT key) {}
+
   @Override
   public <KeyT> void onWindowExpiration(BoundedWindow window, Instant 
timestamp, KeyT key) {
     delegate.onWindowExpiration(window, timestamp, key);
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java
index 2dc428d5a6b..35fe7b745ea 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.values.WindowedValue;
 import org.apache.beam.sdk.values.WindowedValues;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
@@ -259,6 +260,9 @@ public class SparkInputDataProcessorTest {
     @Override
     public void finishBundle() {}
 
+    @Override
+    public <KeyT extends @Nullable Object> void finishKey(KeyT key) {}
+
     @Override
     public <KeyT> void onWindowExpiration(BoundedWindow window, Instant 
timestamp, KeyT key) {}
 

Reply via email to