This is an automated email from the ASF dual-hosted git repository. yhu pushed a commit to branch pr-37886 in repository https://gitbox.apache.org/repos/asf/beam.git
commit eaaf5078b7d4499fc4fb538de2c5c00053d3730e Author: Tobi Kaymak <[email protected]> AuthorDate: Wed Mar 18 10:08:58 2026 +0000 Fix SamzaDoFnRunner (fixes #26126) no unchecked warning suppression address code review comment Upgrade to Samza 1.8.0 / Scala 2.12 trigger post-commit test Fix SamzaStoreStateInternalsTest to reflect updated signature --- .../trigger_files/beam_PostCommit_XVR_Samza.json | 2 +- runners/samza/build.gradle | 16 ++-- .../runners/samza/runtime/SamzaDoFnRunners.java | 87 +++++++++++++++++++++- .../runtime/SamzaStoreStateInternalsTest.java | 4 +- sdks/go/test/integration/integration.go | 3 - 5 files changed, 93 insertions(+), 19 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_XVR_Samza.json b/.github/trigger_files/beam_PostCommit_XVR_Samza.json index 2bf3f556083..34de7f992fa 100644 --- a/.github/trigger_files/beam_PostCommit_XVR_Samza.json +++ b/.github/trigger_files/beam_PostCommit_XVR_Samza.json @@ -1 +1 @@ -{"modification": 2} \ No newline at end of file +{"modification": 3} \ No newline at end of file diff --git a/runners/samza/build.gradle b/runners/samza/build.gradle index 626588b79a5..205f585ae70 100644 --- a/runners/samza/build.gradle +++ b/runners/samza/build.gradle @@ -40,7 +40,7 @@ configurations { validatesRunner } -def samza_version = "1.6.0" +def samza_version = "1.8.0" dependencies { implementation library.java.vendored_guava_32_1_2_jre @@ -55,14 +55,14 @@ dependencies { implementation library.java.commons_io implementation library.java.commons_collections runtimeOnly "org.rocksdb:rocksdbjni:6.15.2" - runtimeOnly "org.scala-lang:scala-library:2.11.8" + runtimeOnly "org.scala-lang:scala-library:2.12.20" implementation "org.apache.samza:samza-api:$samza_version" - implementation "org.apache.samza:samza-core_2.11:$samza_version" - runtimeOnly "org.apache.samza:samza-kafka_2.11:$samza_version" - runtimeOnly "org.apache.samza:samza-kv_2.11:$samza_version" - implementation "org.apache.samza:samza-kv-rocksdb_2.11:$samza_version" - implementation "org.apache.samza:samza-kv-inmemory_2.11:$samza_version" - implementation "org.apache.samza:samza-yarn_2.11:$samza_version" + implementation "org.apache.samza:samza-core_2.12:$samza_version" + runtimeOnly "org.apache.samza:samza-kafka_2.12:$samza_version" + runtimeOnly "org.apache.samza:samza-kv_2.12:$samza_version" + implementation "org.apache.samza:samza-kv-rocksdb_2.12:$samza_version" + implementation "org.apache.samza:samza-kv-inmemory_2.12:$samza_version" + implementation "org.apache.samza:samza-yarn_2.12:$samza_version" compileOnly library.java.error_prone_annotations runtimeOnly "org.apache.kafka:kafka-clients:2.0.1" implementation library.java.vendored_grpc_1_69_0 diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java index a2ec88a4341..4e3e5285f7c 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java @@ -34,11 +34,15 @@ import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.SideInputHandler; import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.StateTags; import org.apache.beam.runners.core.StatefulDoFnRunner; import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.core.TimerInternalsFactory; +import org.apache.beam.runners.fnexecution.control.BundleCheckpointHandler; +import org.apache.beam.runners.fnexecution.control.BundleCheckpointHandlers; import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory; import org.apache.beam.runners.fnexecution.control.RemoteBundle; import org.apache.beam.runners.fnexecution.control.StageBundleFactory; @@ -58,6 +62,7 @@ import org.apache.beam.sdk.transforms.DoFnSchemaInformation; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValueMultiReceiver; +import org.apache.beam.sdk.util.construction.PTransformTranslation; import org.apache.beam.sdk.util.construction.Timer; import org.apache.beam.sdk.util.construction.graph.ExecutableStage; import org.apache.beam.sdk.util.construction.graph.PipelineNode; @@ -236,6 +241,19 @@ public class SamzaDoFnRunners { sideInputMapping, sideInputHandler); + final Coder<BoundedWindow> windowCoder = + WindowUtils.getWindowStrategy( + executableStage.getInputPCollection().getId(), executableStage.getComponents()) + .getWindowFn() + .windowCoder(); + final BundleCheckpointHandler bundleCheckpointHandler = + createBundleCheckpointHandler( + executableStage, + nonKeyedStateInternalsFactory, + timerInternalsFactory, + windowedValueCoder, + windowCoder); + final SamzaExecutionContext executionContext = (SamzaExecutionContext) context.getApplicationContainerContext(); final DoFnRunner<InT, FnOutT> underlyingRunner = @@ -251,13 +269,47 @@ public class SamzaDoFnRunners { bundledEventsBag, stateRequestHandler, samzaExecutionContext, - executableStage.getTransforms()); + executableStage.getTransforms(), + bundleCheckpointHandler, + nonKeyedStateInternalsFactory, + windowedValueCoder, + windowCoder); return pipelineOptions.getEnableMetrics() ? DoFnRunnerWithMetrics.wrap( underlyingRunner, executionContext.getMetricsContainer(), transformFullName) : underlyingRunner; } + private static boolean hasSDF(ExecutableStage executableStage) { + return executableStage.getTransforms().stream() + .map(transform -> transform.getTransform().getSpec().getUrn()) + .anyMatch( + PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN::equals); + } + + private static <InT> BundleCheckpointHandler createBundleCheckpointHandler( + ExecutableStage executableStage, + SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory, + SamzaTimerInternalsFactory<?> timerInternalsFactory, + Coder<WindowedValue<InT>> windowedValueCoder, + Coder<BoundedWindow> windowCoder) { + if (!hasSDF(executableStage)) { + return response -> { + throw new UnsupportedOperationException( + "Self-checkpoint is only supported on splittable DoFn."); + }; + } + // For SDF in a non-keyed context, we always use null as the state/timer key. + // Factories are typed as <InT> so the handler's type parameter matches the coder, + // avoiding any unchecked cast. + StateInternalsFactory<InT> sdfStateFactory = + key -> nonKeyedStateInternalsFactory.stateInternalsForKey(null); + TimerInternalsFactory<InT> sdfTimerFactory = + key -> timerInternalsFactory.timerInternalsForKey(null); + return new BundleCheckpointHandlers.StateAndTimerBundleCheckpointHandler<>( + sdfTimerFactory, sdfStateFactory, windowedValueCoder, windowCoder); + } + static class SdkHarnessDoFnRunner<InT, FnOutT> implements DoFnRunner<InT, FnOutT> { private static final int DEFAULT_METRIC_SAMPLE_RATE = 100; @@ -277,6 +329,10 @@ public class SamzaDoFnRunners { private long startBundleTime; private final String stepName; private final Collection<PipelineNode.PTransformNode> pTransformNodes; + private final BundleCheckpointHandler bundleCheckpointHandler; + private final SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory; + private final Coder<WindowedValue<InT>> windowedValueCoder; + private final Coder<BoundedWindow> windowCoder; private SdkHarnessDoFnRunner( SamzaPipelineOptions pipelineOptions, @@ -289,7 +345,11 @@ public class SamzaDoFnRunners { BagState<WindowedValue<InT>> bundledEventsBag, StateRequestHandler stateRequestHandler, SamzaExecutionContext samzaExecutionContext, - Collection<PipelineNode.PTransformNode> pTransformNodes) { + Collection<PipelineNode.PTransformNode> pTransformNodes, + BundleCheckpointHandler bundleCheckpointHandler, + SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory, + Coder<WindowedValue<InT>> windowedValueCoder, + Coder<BoundedWindow> windowCoder) { this.pipelineOptions = pipelineOptions; this.timerInternalsFactory = timerInternalsFactory; this.windowingStrategy = windowingStrategy; @@ -301,6 +361,10 @@ public class SamzaDoFnRunners { this.samzaExecutionContext = samzaExecutionContext; this.stepName = stepName; this.pTransformNodes = pTransformNodes; + this.bundleCheckpointHandler = bundleCheckpointHandler; + this.nonKeyedStateInternalsFactory = nonKeyedStateInternalsFactory; + this.windowedValueCoder = windowedValueCoder; + this.windowCoder = windowCoder; } @SuppressWarnings("unchecked") @@ -328,7 +392,6 @@ public class SamzaDoFnRunners { } }; - final Coder<BoundedWindow> windowCoder = windowingStrategy.getWindowFn().windowCoder(); final TimerReceiverFactory timerReceiverFactory = new TimerReceiverFactory(stageBundleFactory, this::timerDataConsumer, windowCoder); @@ -350,7 +413,9 @@ public class SamzaDoFnRunners { receiverFactory, timerReceiverFactory, stateRequestHandler, - samzaMetricsBundleProgressHandler); + samzaMetricsBundleProgressHandler, + null, + bundleCheckpointHandler); startBundleTime = getStartBundleTime(); @@ -433,6 +498,20 @@ public class SamzaDoFnRunners { Instant outputTimestamp, TimeDomain timeDomain, CausedByDrain causedByDrain) { + // SDF checkpoint timers are handled by loading the stored residual and re-processing it. + if (BundleCheckpointHandlers.StateAndTimerBundleCheckpointHandler.isSdfTimer(timerId)) { + StateInternals stateInternals = nonKeyedStateInternalsFactory.stateInternalsForKey(null); + WindowedValue<InT> residual = + stateInternals + .state( + StateNamespaces.window(windowCoder, window), + StateTags.value(timerId, windowedValueCoder)) + .read(); + if (residual != null) { + processElement(residual); + } + return; + } final KV<String, String> timerReceiverKey = TimerReceiverFactory.decodeTimerDataTimerId(timerFamilyId); final FnDataReceiver<Timer> timerReceiver = diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java index 9409efbcf39..475c09a05ca 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java @@ -75,7 +75,6 @@ import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.storage.kv.KeyValueStoreMetrics; import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory; import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStore; -import org.apache.samza.system.SystemStreamPartition; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -309,11 +308,10 @@ public class SamzaStoreStateInternalsTest implements Serializable { public static class TestStorageEngine extends InMemoryKeyValueStorageEngineFactory { @Override - public KeyValueStore<byte[], byte[]> getKVStore( + protected KeyValueStore<byte[], byte[]> getKVStore( String storeName, File storeDir, MetricsRegistry registry, - SystemStreamPartition changeLogSystemStreamPartition, JobContext jobContext, ContainerContext containerContext, StorageEngineFactory.StoreMode readWrite) { diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index ea23c5f9ae0..54cd07075b2 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -237,9 +237,6 @@ var samzaFilters = []string{ "TestMapStateClear", "TestSetState", "TestSetStateClear", - // TODO(https://github.com/apache/beam/issues/26126): Java runner issue (AcitveBundle has no regsitered handler) - "TestDebeziumIO_BasicRead", - // Samza does not support state. "TestTimers.*", "TestBagStateBlindWrite",
