This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch pr-37886
in repository https://gitbox.apache.org/repos/asf/beam.git

commit eaaf5078b7d4499fc4fb538de2c5c00053d3730e
Author: Tobi Kaymak <[email protected]>
AuthorDate: Wed Mar 18 10:08:58 2026 +0000

    Fix SamzaDoFnRunners: register a bundle checkpoint handler for splittable DoFns (fixes #26126)
    
    Avoid unchecked-warning suppression
    
    Address code review comment
    
    Upgrade to Samza 1.8.0 / Scala 2.12
    
    Trigger the Samza XVR post-commit test
    
    Fix SamzaStoreStateInternalsTest to match the updated getKVStore signature in Samza 1.8.0
---
 .../trigger_files/beam_PostCommit_XVR_Samza.json   |  2 +-
 runners/samza/build.gradle                         | 16 ++--
 .../runners/samza/runtime/SamzaDoFnRunners.java    | 87 +++++++++++++++++++++-
 .../runtime/SamzaStoreStateInternalsTest.java      |  4 +-
 sdks/go/test/integration/integration.go            |  3 -
 5 files changed, 93 insertions(+), 19 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_XVR_Samza.json 
b/.github/trigger_files/beam_PostCommit_XVR_Samza.json
index 2bf3f556083..34de7f992fa 100644
--- a/.github/trigger_files/beam_PostCommit_XVR_Samza.json
+++ b/.github/trigger_files/beam_PostCommit_XVR_Samza.json
@@ -1 +1 @@
-{"modification":  2}
\ No newline at end of file
+{"modification":  3}
\ No newline at end of file
diff --git a/runners/samza/build.gradle b/runners/samza/build.gradle
index 626588b79a5..205f585ae70 100644
--- a/runners/samza/build.gradle
+++ b/runners/samza/build.gradle
@@ -40,7 +40,7 @@ configurations {
   validatesRunner
 }
 
-def samza_version = "1.6.0"
+def samza_version = "1.8.0"
 
 dependencies {
   implementation library.java.vendored_guava_32_1_2_jre
@@ -55,14 +55,14 @@ dependencies {
   implementation library.java.commons_io
   implementation library.java.commons_collections
   runtimeOnly "org.rocksdb:rocksdbjni:6.15.2"
-  runtimeOnly "org.scala-lang:scala-library:2.11.8"
+  runtimeOnly "org.scala-lang:scala-library:2.12.20"
   implementation "org.apache.samza:samza-api:$samza_version"
-  implementation "org.apache.samza:samza-core_2.11:$samza_version"
-  runtimeOnly "org.apache.samza:samza-kafka_2.11:$samza_version"
-  runtimeOnly "org.apache.samza:samza-kv_2.11:$samza_version"
-  implementation "org.apache.samza:samza-kv-rocksdb_2.11:$samza_version"
-  implementation "org.apache.samza:samza-kv-inmemory_2.11:$samza_version"
-  implementation "org.apache.samza:samza-yarn_2.11:$samza_version"
+  implementation "org.apache.samza:samza-core_2.12:$samza_version"
+  runtimeOnly "org.apache.samza:samza-kafka_2.12:$samza_version"
+  runtimeOnly "org.apache.samza:samza-kv_2.12:$samza_version"
+  implementation "org.apache.samza:samza-kv-rocksdb_2.12:$samza_version"
+  implementation "org.apache.samza:samza-kv-inmemory_2.12:$samza_version"
+  implementation "org.apache.samza:samza-yarn_2.12:$samza_version"
   compileOnly library.java.error_prone_annotations
   runtimeOnly "org.apache.kafka:kafka-clients:2.0.1"
   implementation library.java.vendored_grpc_1_69_0
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
index a2ec88a4341..4e3e5285f7c 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
@@ -34,11 +34,15 @@ import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.StatefulDoFnRunner;
 import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.runners.fnexecution.control.BundleCheckpointHandler;
+import org.apache.beam.runners.fnexecution.control.BundleCheckpointHandlers;
 import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
 import org.apache.beam.runners.fnexecution.control.RemoteBundle;
 import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
@@ -58,6 +62,7 @@ import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
 import org.apache.beam.sdk.util.construction.Timer;
 import org.apache.beam.sdk.util.construction.graph.ExecutableStage;
 import org.apache.beam.sdk.util.construction.graph.PipelineNode;
@@ -236,6 +241,19 @@ public class SamzaDoFnRunners {
                 sideInputMapping,
             sideInputHandler);
 
+    final Coder<BoundedWindow> windowCoder =
+        WindowUtils.getWindowStrategy(
+                executableStage.getInputPCollection().getId(), 
executableStage.getComponents())
+            .getWindowFn()
+            .windowCoder();
+    final BundleCheckpointHandler bundleCheckpointHandler =
+        createBundleCheckpointHandler(
+            executableStage,
+            nonKeyedStateInternalsFactory,
+            timerInternalsFactory,
+            windowedValueCoder,
+            windowCoder);
+
     final SamzaExecutionContext executionContext =
         (SamzaExecutionContext) context.getApplicationContainerContext();
     final DoFnRunner<InT, FnOutT> underlyingRunner =
@@ -251,13 +269,47 @@ public class SamzaDoFnRunners {
             bundledEventsBag,
             stateRequestHandler,
             samzaExecutionContext,
-            executableStage.getTransforms());
+            executableStage.getTransforms(),
+            bundleCheckpointHandler,
+            nonKeyedStateInternalsFactory,
+            windowedValueCoder,
+            windowCoder);
     return pipelineOptions.getEnableMetrics()
         ? DoFnRunnerWithMetrics.wrap(
             underlyingRunner, executionContext.getMetricsContainer(), 
transformFullName)
         : underlyingRunner;
   }
 
+  private static boolean hasSDF(ExecutableStage executableStage) {
+    return executableStage.getTransforms().stream()
+        .map(transform -> transform.getTransform().getSpec().getUrn())
+        .anyMatch(
+            
PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN::equals);
+  }
+
+  private static <InT> BundleCheckpointHandler createBundleCheckpointHandler(
+      ExecutableStage executableStage,
+      SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory,
+      SamzaTimerInternalsFactory<?> timerInternalsFactory,
+      Coder<WindowedValue<InT>> windowedValueCoder,
+      Coder<BoundedWindow> windowCoder) {
+    if (!hasSDF(executableStage)) {
+      return response -> {
+        throw new UnsupportedOperationException(
+            "Self-checkpoint is only supported on splittable DoFn.");
+      };
+    }
+    // For SDF in a non-keyed context, we always use null as the state/timer 
key.
+    // Factories are typed as <InT> so the handler's type parameter matches 
the coder,
+    // avoiding any unchecked cast.
+    StateInternalsFactory<InT> sdfStateFactory =
+        key -> nonKeyedStateInternalsFactory.stateInternalsForKey(null);
+    TimerInternalsFactory<InT> sdfTimerFactory =
+        key -> timerInternalsFactory.timerInternalsForKey(null);
+    return new BundleCheckpointHandlers.StateAndTimerBundleCheckpointHandler<>(
+        sdfTimerFactory, sdfStateFactory, windowedValueCoder, windowCoder);
+  }
+
   static class SdkHarnessDoFnRunner<InT, FnOutT> implements DoFnRunner<InT, 
FnOutT> {
 
     private static final int DEFAULT_METRIC_SAMPLE_RATE = 100;
@@ -277,6 +329,10 @@ public class SamzaDoFnRunners {
     private long startBundleTime;
     private final String stepName;
     private final Collection<PipelineNode.PTransformNode> pTransformNodes;
+    private final BundleCheckpointHandler bundleCheckpointHandler;
+    private final SamzaStoreStateInternals.Factory<?> 
nonKeyedStateInternalsFactory;
+    private final Coder<WindowedValue<InT>> windowedValueCoder;
+    private final Coder<BoundedWindow> windowCoder;
 
     private SdkHarnessDoFnRunner(
         SamzaPipelineOptions pipelineOptions,
@@ -289,7 +345,11 @@ public class SamzaDoFnRunners {
         BagState<WindowedValue<InT>> bundledEventsBag,
         StateRequestHandler stateRequestHandler,
         SamzaExecutionContext samzaExecutionContext,
-        Collection<PipelineNode.PTransformNode> pTransformNodes) {
+        Collection<PipelineNode.PTransformNode> pTransformNodes,
+        BundleCheckpointHandler bundleCheckpointHandler,
+        SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory,
+        Coder<WindowedValue<InT>> windowedValueCoder,
+        Coder<BoundedWindow> windowCoder) {
       this.pipelineOptions = pipelineOptions;
       this.timerInternalsFactory = timerInternalsFactory;
       this.windowingStrategy = windowingStrategy;
@@ -301,6 +361,10 @@ public class SamzaDoFnRunners {
       this.samzaExecutionContext = samzaExecutionContext;
       this.stepName = stepName;
       this.pTransformNodes = pTransformNodes;
+      this.bundleCheckpointHandler = bundleCheckpointHandler;
+      this.nonKeyedStateInternalsFactory = nonKeyedStateInternalsFactory;
+      this.windowedValueCoder = windowedValueCoder;
+      this.windowCoder = windowCoder;
     }
 
     @SuppressWarnings("unchecked")
@@ -328,7 +392,6 @@ public class SamzaDoFnRunners {
               }
             };
 
-        final Coder<BoundedWindow> windowCoder = 
windowingStrategy.getWindowFn().windowCoder();
         final TimerReceiverFactory timerReceiverFactory =
             new TimerReceiverFactory(stageBundleFactory, 
this::timerDataConsumer, windowCoder);
 
@@ -350,7 +413,9 @@ public class SamzaDoFnRunners {
                 receiverFactory,
                 timerReceiverFactory,
                 stateRequestHandler,
-                samzaMetricsBundleProgressHandler);
+                samzaMetricsBundleProgressHandler,
+                null,
+                bundleCheckpointHandler);
 
         startBundleTime = getStartBundleTime();
 
@@ -433,6 +498,20 @@ public class SamzaDoFnRunners {
         Instant outputTimestamp,
         TimeDomain timeDomain,
         CausedByDrain causedByDrain) {
+      // SDF checkpoint timers are handled by loading the stored residual and 
re-processing it.
+      if 
(BundleCheckpointHandlers.StateAndTimerBundleCheckpointHandler.isSdfTimer(timerId))
 {
+        StateInternals stateInternals = 
nonKeyedStateInternalsFactory.stateInternalsForKey(null);
+        WindowedValue<InT> residual =
+            stateInternals
+                .state(
+                    StateNamespaces.window(windowCoder, window),
+                    StateTags.value(timerId, windowedValueCoder))
+                .read();
+        if (residual != null) {
+          processElement(residual);
+        }
+        return;
+      }
       final KV<String, String> timerReceiverKey =
           TimerReceiverFactory.decodeTimerDataTimerId(timerFamilyId);
       final FnDataReceiver<Timer> timerReceiver =
diff --git 
a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java
 
b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java
index 9409efbcf39..475c09a05ca 100644
--- 
a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java
+++ 
b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java
@@ -75,7 +75,6 @@ import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.storage.kv.KeyValueStoreMetrics;
 import 
org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory;
 import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStore;
-import org.apache.samza.system.SystemStreamPartition;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -309,11 +308,10 @@ public class SamzaStoreStateInternalsTest implements 
Serializable {
   public static class TestStorageEngine extends 
InMemoryKeyValueStorageEngineFactory {
 
     @Override
-    public KeyValueStore<byte[], byte[]> getKVStore(
+    protected KeyValueStore<byte[], byte[]> getKVStore(
         String storeName,
         File storeDir,
         MetricsRegistry registry,
-        SystemStreamPartition changeLogSystemStreamPartition,
         JobContext jobContext,
         ContainerContext containerContext,
         StorageEngineFactory.StoreMode readWrite) {
diff --git a/sdks/go/test/integration/integration.go 
b/sdks/go/test/integration/integration.go
index ea23c5f9ae0..54cd07075b2 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -237,9 +237,6 @@ var samzaFilters = []string{
        "TestMapStateClear",
        "TestSetState",
        "TestSetStateClear",
-       // TODO(https://github.com/apache/beam/issues/26126): Java runner issue 
(AcitveBundle has no regsitered handler)
-       "TestDebeziumIO_BasicRead",
-
        // Samza does not support state.
        "TestTimers.*",
        "TestBagStateBlindWrite",

Reply via email to