This is an automated email from the ASF dual-hosted git repository.

Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d008bad91f [runners-spark] Add Spark 4 runner (#38255)
1d008bad91f is described below

commit 1d008bad91fd1347cd991b0f06b342d4462bcf39
Author: Tobias Kaymak <[email protected]>
AuthorDate: Mon May 11 20:50:17 2026 +0200

    [runners-spark] Add Spark 4 runner (#38255)
    
    * build: add Spark 4.0.2 version property and Scala 2.13 support
    
    Add spark4_version (4.0.2) to BeamModulePlugin alongside the existing
    spark3_version. Update spark_runner.gradle to conditionally select the
    correct Scala library (2.13 vs 2.12), Jackson module, Kafka test
    dependency, and require Java 17 when building against Spark 4.
    
    Register the new :runners:spark:4 module in settings.gradle.kts.
    
    These changes are purely additive — all conditionals gate on
    spark_version.startsWith("4") or spark_scala_version == '2.13', leaving
    the Spark 3 build path untouched.
    
    Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
    
    * refactor: make shared Spark source compatible with Scala 2.12 and 2.13
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    * build: add runners/spark/4/ build configuration
    
    Add the Gradle build file for the Spark 4 structured streaming runner.
    The module mirrors runners/spark/3/ — it inherits the shared RDD-based
    source from runners/spark/src/ via copySourceBase and adds its own
    Structured Streaming implementation in src/main/java.
    
    Key differences from the Spark 3 build:
    - Uses spark4_version (4.0.2) with Scala 2.13.
    - Excludes DStream-based streaming tests (Spark 4 supports only
      structured streaming batch).
    - Unconditionally adds --add-opens JVM flags required by Kryo on
      Java 17 (Spark 4's minimum).
    - Binds Spark driver to 127.0.0.1 for macOS compatibility.
    
    Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
    
    * feat: add Spark 4 structured streaming runner source
    
    Add the Spark 4 structured streaming runner implementation and tests.
    Most files are adapted from the Spark 3 structured streaming runner
    with targeted changes for Spark 4 / Scala 2.13 API compatibility.
    
    Key Spark 4-specific changes (diff against runners/spark/3/src/):
    
    EncoderFactory — Replaced the direct ExpressionEncoder constructor
      (removed in Spark 4) with BeamAgnosticEncoder, a named class
      implementing both AgnosticExpressionPathEncoder (for expression
      delegation via toCatalyst/fromCatalyst) and AgnosticEncoders
      .StructEncoder (so Dataset.select(TypedColumn) creates an N-attribute
      plan, preventing FIELD_NUMBER_MISMATCH). The toCatalyst/fromCatalyst
      methods substitute the provided input expression via transformUp,
      enabling correct nesting inside composite encoders like
      Encoders.tuple().
    
    EncoderHelpers — Added toExpressionEncoder() helper to handle Spark 4
      built-in encoders that are AgnosticEncoder subclasses rather than
      ExpressionEncoder.
    
    GroupByKeyTranslatorBatch — Migrated from internal catalyst Expression
      API (CreateNamedStruct, Literal$) to public Column API (struct(),
      lit(), array()), as required by Spark 4.
    
    BoundedDatasetFactory — Use classic.Dataset$.MODULE$.ofRows() as
      Dataset moved to org.apache.spark.sql.classic in Spark 4.
    
    ScalaInterop — Replace WrappedArray.ofRef (removed in Scala 2.13)
      with JavaConverters.asScalaBuffer().toList() in seqOf().
    
    GroupByKeyHelpers, CombinePerKeyTranslatorBatch — Replace
      TraversableOnce with IterableOnce (Scala 2.13 rename).
    
    SparkStructuredStreamingPipelineResult — Replace sparkproject.guava
      with Beam's vendored Guava.
    
    Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
    
    * ci: add Spark 4 PreCommit and PostCommit workflows
    
    Add GitHub Actions workflows for the Spark 4 runner module:
    
    - beam_PreCommit_Java_Spark4_Versions: runs sparkVersionsTest on
      changes to runners/spark/**.  Currently a no-op (the sparkVersions
      map is empty) but scaffolds future patch version coverage.
    - beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming: runs
      the structured streaming test suite on Java 17.
    
    Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
    
    * Add PreCommit Java Spark4 Versions workflow
    
    * Add cancellation support to Spark pipeline execution
    
    * Remove unused endOfData() call in close method
    
    Remove endOfData() call in close method.
    
    * build: add Spark 4 job-server and container modules
    
    Add job-server and container build configurations for Spark 4,
    mirroring the existing Spark 3 job-server setup. The container
    uses eclipse-temurin:17 (Spark 4 requires Java 17). The shared
    spark_job_server.gradle gains a requireJavaVersion conditional
    for Spark 4 parent projects.
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    * build: remove spark.driver.host workaround from Spark 4 build
    
    The hostname binding hack is no longer needed now that the local
    machine resolves its hostname to 127.0.0.1 via /etc/hosts.
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    * docs: add Spark 4 runner entry to CHANGES.md
    
    Called out in /ultrareview as a missing contributor checklist item.
    Adds a Highlight line and a New Features / Improvements entry under
    the 2.74.0 Unreleased section, referencing issue #36841.
    
    * docs: explain classic.SparkSession downcast in BoundedDatasetFactory
    
    Per /ultrareview feedback: the one-line comment didn't make clear why
    the cast is safe. Expand it to note that SparkSession.builder() always
    returns a classic.SparkSession at runtime, which is why the downcast
    avoids reflection.
    
    * fix: log warning when neither WrappedArray nor ArraySeq class is found
    
    Per /ultrareview feedback: the fallback branch silently swallowed the
    second ClassNotFoundException. In practice one of the two classes is
    always present (Scala 2.12 vs 2.13 stdlib), but a silent skip could
    mask a broken classpath. Emit a LOG.warn instead.
    
    * build: compare spark_version numerically via isSparkAtLeast helper
    
    Per /ultrareview feedback: the five `"$spark_version" >= "3.5.0"` checks
    were lexicographic string comparisons. They happened to work for 3.5.0
    and 4.0.2 only because '4' > '3' as chars — a future "3.10.0" release
    would compare less than "3.5.0" and silently drop the Spark 3.5+
    dependencies and exclusions.
    
    Introduce an `isSparkAtLeast` closure that tokenizes on `.` and `-`,
    keeps numeric parts, and compares component-by-component. Replace all
    five call sites.
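
    For illustration only, a minimal Java sketch of the component-wise
    comparison described above. The commit implements isSparkAtLeast as a
    Groovy closure in spark_runner.gradle; the class name, method signatures,
    and the choice to treat missing components as 0 are assumptions made
    here, not taken from the commit.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical Java rendering of the numeric version check (not the Groovy closure itself). */
final class SparkVersionCompare {
  private SparkVersionCompare() {}

  /** Splits on '.' and '-', keeping only purely numeric tokens ("4.0.2-SNAPSHOT" -> [4, 0, 2]). */
  static List<Integer> numericParts(String version) {
    List<Integer> parts = new ArrayList<>();
    for (String token : version.split("[.-]")) {
      if (token.matches("\\d+")) {
        parts.add(Integer.parseInt(token));
      }
    }
    return parts;
  }

  /** Returns true if version >= minimum, comparing component by component. */
  static boolean isSparkAtLeast(String version, String minimum) {
    List<Integer> a = numericParts(version);
    List<Integer> b = numericParts(minimum);
    int n = Math.max(a.size(), b.size());
    for (int i = 0; i < n; i++) {
      int x = i < a.size() ? a.get(i) : 0; // assumption: missing components count as 0
      int y = i < b.size() ? b.get(i) : 0;
      if (x != y) {
        return x > y;
      }
    }
    return true; // all components equal
  }
}
```

    With this sketch, isSparkAtLeast("3.10.0", "3.5.0") is true, whereas the
    plain string comparison "3.10.0".compareTo("3.5.0") is negative.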
    
    * [Spark Runner] Slim Spark 4 to override-only files
    
    With spark_runner.gradle now layering per-major source overrides on top
    of the shared base, runners/spark/4/src/ no longer needs to duplicate
    62 byte-identical structured-streaming files. Keep only the 11 files
    that actually differ for Spark 4 / Scala 2.13. Switch the build.gradle
    to spark_major = '4' (the new mechanism) and bump spark_versions to 3,4.
    
    Compiled output unchanged — the deleted files are reproduced identically
    inside build/source-overrides by the Copy task.
    
    * [Spark Runner] Use java.io.Serializable in DoFnRunnerFactory base
    
    scala.Serializable was removed in Scala 2.13. java.io.Serializable
    works identically on both Scala 2.12 and 2.13, so this can live in
    the shared base instead of needing a Spark-4-only override file.
    
    * [Spark Runner] Null-guard error message logging in EvaluationContext base
    
    Wrap Throwables.getRootCause(e).getMessage() in String.valueOf(...)
    to make the error logging robust to a null root-cause message. The
    behaviour change applies equally to Spark 3 and Spark 4, so the
    fix lives in the shared base and the Spark-4 override is dropped.
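
    A minimal sketch of the null-guard idea, assuming nothing about the real
    EvaluationContext code beyond what is stated above. The rootCause loop
    is a simplified stand-in for Guava's Throwables.getRootCause, and the
    class and method names are hypothetical.

```java
/** Hypothetical helper showing String.valueOf as a null guard for a root-cause message. */
final class RootCauseMessage {
  private RootCauseMessage() {}

  /** Simplified stand-in for Guava's Throwables.getRootCause: walks the cause chain to its end. */
  static Throwable rootCause(Throwable t) {
    Throwable current = t;
    while (current.getCause() != null) {
      current = current.getCause();
    }
    return current;
  }

  /** Never returns null: a missing message is rendered as the string "null". */
  static String describe(Throwable t) {
    return String.valueOf(rootCause(t).getMessage());
  }
}
```

    An exception created without a message, such as new
    IllegalStateException(), has a null getMessage(); describe() then yields
    the four-character string "null" instead of a null reference.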
    
    * [Spark Runner] Cancel execution future and use Beam-vendored Guava in PipelineResult
    
    Two changes that previously lived only in the Spark-4 override and
    are equally valid for Spark 3:
    
    1. cancel() now actually cancels the executing future
       (pipelineExecution.cancel(true)) in addition to setting the state
       to CANCELLED. Without this, calling cancel() left the pipeline
       running silently — a real bug, not a Spark-4 specific concern.
    
    2. Switch from Spark's shaded guava (org.sparkproject.guava) to the
       Beam-vendored guava that is already on the classpath. Spark 4
       no longer exposes the sparkproject guava package; using the
       vendored one removes the version coupling for both runners.
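
    The first change can be sketched as follows. This is an illustration
    under assumptions, not the code of
    SparkStructuredStreamingPipelineResult: the class name, the State enum,
    and the order of the two steps are hypothetical. Only the
    pipelineExecution.cancel(true) call is taken from the description above.

```java
import java.util.concurrent.Future;

/** Hypothetical result holder that cancels the underlying execution, not just its state flag. */
final class CancellableResult {
  enum State { RUNNING, CANCELLED }

  private final Future<?> pipelineExecution;
  private volatile State state = State.RUNNING;

  CancellableResult(Future<?> pipelineExecution) {
    this.pipelineExecution = pipelineExecution;
  }

  /** Marks the result as cancelled and interrupts the running execution if there is one. */
  State cancel() {
    state = State.CANCELLED;
    pipelineExecution.cancel(true); // true = interrupt the executing thread if it is running
    return state;
  }

  boolean isExecutionCancelled() {
    return pipelineExecution.isCancelled();
  }

  State getState() {
    return state;
  }
}
```

    Without the Future.cancel call, getState() would report CANCELLED while
    the submitted work kept running, which is the bug described in item 1.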
    
    * ci: re-trigger to clear flaky UnboundedScheduledExecutorServiceTest
    
    Empty commit to re-run CI. The only failure on the prior head was
    UnboundedScheduledExecutorServiceTest.testThreadsAreAddedOnlyAsNeededWithContention,
    a known flake (apache/beam#31590) — the test itself acknowledges
    contention-induced extra threads in its inline comment. Squash or
    drop on rebase before merge.
    
    * [Spark Runner] Fix maxTimestamp to handle multi-window values
    
    Iterables.getOnlyElement(windows) crashes with IllegalArgumentException
    when a WindowedValue is associated with more than one window (e.g. after
    a sliding window assignment). Compute the max maxTimestamp() across all
    associated windows instead, falling back to a clear error if the iterable
    is unexpectedly empty.
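
    A self-contained sketch of the multi-window computation, for
    illustration. The Window interface is a hypothetical stand-in for Beam's
    BoundedWindow, java.time.Instant replaces the Joda Instant that Beam
    uses, and the exception type for the empty case is an assumption.

```java
import java.time.Instant;
import java.util.Collection;

/** Hypothetical helper: latest maxTimestamp() across all windows of a value. */
final class WindowTimestamps {
  private WindowTimestamps() {}

  /** Stand-in for Beam's BoundedWindow; only the method needed here is modelled. */
  interface Window {
    Instant maxTimestamp();
  }

  /** Returns the largest maxTimestamp() over all windows, failing clearly if there are none. */
  static Instant maxTimestamp(Collection<? extends Window> windows) {
    Instant max = null;
    for (Window window : windows) {
      Instant candidate = window.maxTimestamp();
      if (max == null || candidate.isAfter(max)) {
        max = candidate;
      }
    }
    if (max == null) {
      // Assumption: the real code may use a different exception type or message.
      throw new IllegalStateException("Expected at least one window, but found none");
    }
    return max;
  }
}
```

    Unlike a getOnlyElement-style lookup, this accepts any number of windows
    greater than zero, so a value assigned to several sliding windows no
    longer triggers an exception.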
    
    Applied identically to the shared base and the Spark 4 override. Flagged
    by Gemini Code Assist on PR #38255.
    
    * [Spark Runner] Drop unchecked cast in BoundedDatasetFactory.split
    
    source.split returns List<? extends BoundedSource<T>>, which already
    satisfies the subsequent stream usage. The cast was unchecked and would
    trip heap-pollution warnings. Applied identically to the shared base
    and the Spark 4 override. Flagged by Gemini Code Assist on PR #38255.
    
    * [Spark Runner] Drop redundant Iterator cast in Spark 4 GroupByKeyTranslatorBatch
    
    The (Iterator<V>) cast inside fun2 is redundant: fun2's signature
    infers the iterator type. The shared base translator at the analogous
    call site already calls iterableOnce(it) without a cast. Flagged by
    Gemini Code Assist on PR #38255.
    
    * [Spark Runner] Spark 4 EncoderFactory: stable constructor lookup + document trait setter
    
    Replace getConstructors()[0] (JVM-defined ordering, not stable) with a
    helper that picks the widest public constructor. The downstream switch
    already dispatches on parameter count to pick the right argument shape
    per Spark version, so this just makes the choice deterministic.
    
    Also document the org$apache$spark...$_setter_$isStruct_$eq method —
    it is the synthetic setter the Scala compiler emits for trait val fields,
    required when implementing AgnosticEncoders.StructEncoder from Java.
    
    Both flagged by Gemini Code Assist on PR #38255.
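
    The constructor lookup can be illustrated with the sketch below. It is
    not the helper from EncoderFactory: the class name, method name, and
    exception are hypothetical, and this version does not break ties between
    constructors that have the same parameter count.

```java
import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.Comparator;

/** Hypothetical helper: choose a constructor by arity instead of relying on array order. */
final class ConstructorLookup {
  private ConstructorLookup() {}

  /** Returns the public constructor of cls with the most parameters. */
  static Constructor<?> widestPublicConstructor(Class<?> cls) {
    return Arrays.stream(cls.getConstructors())
        .max(Comparator.comparingInt((Constructor<?> c) -> c.getParameterCount()))
        .orElseThrow(
            () -> new IllegalStateException("No public constructor on " + cls.getName()));
  }
}
```

    For example, java.util.HashMap has public constructors with 0, 1, and 2
    parameters, so the sketch selects the two-parameter (int, float) one no
    matter what order getConstructors() returns them in.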
    
    * [Spark Runner] Fix Javadoc/comment typos flagged by Gemini
    
    Three trivial typos flagged on PR #38255 round 2 review, applied
    identically to the shared base and the Spark 4 override:
    
    - CombinePerKeyTranslatorBatch: "other there other missing features?"
      -> "are there other missing features?"
    - GroupByKeyTranslatorBatch: "build-in" -> "built-in"
    - EncoderHelpers: PRIMITIV_TYPES -> PRIMITIVE_TYPES (constant + caller)
    
    * [Spark Runner] Switch EncoderFactory.invoke on the right constructor
    
    In EncoderFactory.invoke(Expression obj, ...), the switch was keyed on
    STATIC_INVOKE_CONSTRUCTOR.getParameterCount() but the body actually
    calls INVOKE_CONSTRUCTOR. This worked by coincidence: across the
    supported Spark 3.x versions both constructors happen to share the
    same parameter counts at the same dispatch points. A future Spark
    release where the two diverge would silently pick the wrong branch.
    
    Switch on INVOKE_CONSTRUCTOR.getParameterCount() to match the
    constructor that is actually invoked, and align with the convention
    used by newInstance() further down. In the Spark 4 override this also
    lets us collapse the `case 8: case 9:` fallthrough back to a single
    `case 8:`, since INVOKE_CONSTRUCTOR remains 8 params in Spark 4 even
    though STATIC_INVOKE_CONSTRUCTOR grew to 9.
    
    Applied identically to the shared base and the Spark 4 override.
    Flagged by Gemini Code Assist on PR #38255.
    
    * Update CHANGES.md
    
    Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
    
    * [Spark 4] Drop redundant Collection cast in GroupByKeyHelpers
    
    WindowedValue#getWindows() returns Collection<? extends BoundedWindow>,
    which is already an Iterable and can be passed straight to
    ScalaInterop.scalaIterator(...). The intermediate local variable and the
    unchecked cast to Collection<BoundedWindow> were redundant.
    
    Applied in both the shared base and the Spark 4 override.
    
    Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
    
    * [Spark 4] Add module README with slf4j-jdk14 known-issue note
    
    Documents the Spark 4 runner's requirements (Java 17, Scala 2.13,
    Spark 4.0.x, batch-only) and the slf4j-jdk14 ↔ jul-to-slf4j conflict
    that is the Spark 4 manifestation of #26985 (fixed for Spark 3 in
    #27001). The shared spark_runner.gradle already excludes slf4j-jdk14
    for in-tree builds; this note tells downstream consumers to mirror the
    exclude when assembling their own runtime classpath against
    beam-runners-spark-4.
    
    * [runners-spark] Address Gemini nits: use encoder terminology in exception messages
    
    * Trigger Build
    
    * ci: re-trigger to clear flaky FlinkRequiresStableInputTest
    
    The PreCommit Java failure on the previous run was a single timeout in
    FlinkRequiresStableInputTest.testParDoRequiresStableInputPortable
    (:runners:flink:1.17:test) — known flake tracked in #21333. This PR
    does not touch any Flink code. Squash or drop on rebase before merge.
    
    * ci: re-trigger to clear flaky SqsIOWriteBatchesTest.testWriteBatchesToDynamicWithStrictTimeout
    
    Wall-clock-timing test (100ms inter-message + 150ms strict batch
    timeout) in sdks/java/io/amazon-web-services2 SQS — unrelated to
    this PR (no AWS2/SQS/Direct-runner files touched), and master is
    green for the same PreCommit on 6106b308.
    
    * ci: re-trigger to clear Maven Central 403 on Windows wordcount
    
    `Java Wordcount Direct Runner (windows-latest)` failed at the
    :buildSrc configure step with HTTP 403 fetching legacy Spotless
    5.6.1 transitive deps from repo.maven.apache.org
    (spotless-lib:2.7.0, durian-*:1.2.0, jgit:5.8.0). Network/infra
    flake — PR doesn't touch examples or buildSrc, master 'Java Tests'
    workflow consistently green.
    
    * ci: re-trigger to clear flaky Spotless + GCP IO Direct PreCommits
    
    Both checks failed on the prior empty retry commit (e19b80c4).
    Reproduced locally at e19b80c4: spotlessCheck and Spark
    checkStyleMain/Test all pass. PR doesn't touch any GCP IO code,
    and both checks were green on the immediately preceding branch
    commits (5abbb21d, 604037f5) and on master (6106b308, e01f7114).
    Treating as infra flakes; squash before merge.
    
    * [runners-spark] Cover Scala-array fallback and EvaluationContext error paths
    
    Address codecov/patch on PR #38255 by exercising the new branches added for
    Scala 2.13 / null-safe error logging:
    
    - Refactor SparkRunnerKryoRegistrator's nested Scala-array Class.forName
      fallback into a small @VisibleForTesting findFirstAvailableClass helper
      and add unit tests for first-hit, fallback, no-match, and empty-input
      paths.
    - Add EvaluationContextTest covering the catch (RuntimeException) /
      catch (Exception) blocks in evaluate() and collect(), including the
      null-message path that motivated the String.valueOf wrap.
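
    A sketch of what such a lookup helper might look like, for illustration.
    The signature and the Optional return type are assumptions; only the
    name findFirstAvailableClass and the four tested paths (first-hit,
    fallback, no-match, empty-input) come from the description above.

```java
import java.util.Optional;

/** Hypothetical version of a "first class found on the classpath" lookup. */
final class ClassLookup {
  private ClassLookup() {}

  /** Returns the first class that loads successfully, or empty if none of the names resolve. */
  static Optional<Class<?>> findFirstAvailableClass(String... classNames) {
    for (String name : classNames) {
      try {
        return Optional.<Class<?>>of(Class.forName(name));
      } catch (ClassNotFoundException e) {
        // Not on the classpath; fall through to the next candidate.
      }
    }
    return Optional.empty();
  }
}
```

    Returning an empty Optional leaves the policy for a missing class
    (log, skip, or throw) to the caller, so the lookup itself stays easy to
    unit test.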
    
    * [runners-spark] spotless: inline two findFirstAvailableClass calls in test
    
    * flaky SqsIOWriteBatchesTest retry
    
    * flaky ExampleEchoPipelineTest retry
    
    * rebase cleanup: drop duplicate isSparkAtLeast helper now in master via #38324
    
    * Address @Abacn 2026-05-07 review
    
    - SparkRunnerKryoRegistrator: throw IllegalStateException instead of
      LOG.warn when neither ArraySeq$ofRef (Scala 2.13) nor WrappedArray$ofRef
      (Scala 2.12) is on the classpath, so the missing class isn't silently
      ignored. Drops the now-unused Logger field and slf4j imports.
    - spark_runner.gradle: declare org.apache.spark:spark-connect-shims_2.13
      as a provided dep gated on isSparkAtLeast("4.0.0"). Spark 4 splits the
      Connect shim classes out of spark-sql; with enableStrictDependencies
      this surfaced as analyzeClassesDependencies usedUndeclaredArtifacts.
      The artifact does not exist for Spark 3, so the gate prevents Spark 3
      resolution failures.
    - runners/spark/4/build.gradle: drop the empty sparkVersions test
      scaffolding (no additional Spark 4.x patch versions to test against
      yet) and delete the now-unused
      .github/workflows/beam_PreCommit_Java_Spark4_Versions.yml workflow
      + its README.md row.
    - EncoderFactory (shared base): revert the line 94 switch to
      STATIC_INVOKE_CONSTRUCTOR.getParameterCount(), keeping Spark 3 behavior
      byte-for-byte unchanged. Spark 4's complete EncoderFactory override
      under runners/spark/4/src/.../EncoderFactory.java is unaffected.
    - CHANGES.md: drop the Highlights line for Spark 4. Will re-add when
      ValidatesRunner tests are set up and confirmed working, matching the
      Phase 1 #38324 pattern.
    - runners/spark/4/job-server/container: delete the entire module
      (build.gradle + Dockerfile) and remove its include() from
      settings.gradle.kts. Per @Abacn's offer to defer the container module
      to portable runner support later. The fat-jar :runners:spark:4:job-server
      module is kept.
    
    ---------
    
    Co-authored-by: Claude Sonnet 4.6 <[email protected]>
    Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
---
 ..._ValidatesRunner_Spark4StructuredStreaming.json |   3 +
 .../beam_PreCommit_Java_Spark4_Versions.json       |   3 +
 .github/workflows/README.md                        |   1 +
 ...a_ValidatesRunner_Spark4StructuredStreaming.yml |  97 ++++++
 CHANGES.md                                         |   1 -
 gradle.properties                                  |   2 +-
 runners/spark/4/README.md                          |  73 +++++
 runners/spark/4/build.gradle                       |  53 ++++
 runners/spark/4/job-server/build.gradle            |  31 ++
 .../io/BoundedDatasetFactory.java                  |  13 +-
 .../batch/CombinePerKeyTranslatorBatch.java        |   6 +-
 .../translation/batch/GroupByKeyHelpers.java       |   8 +-
 .../batch/GroupByKeyTranslatorBatch.java           |  78 ++---
 .../translation/helpers/EncoderFactory.java        | 333 +++++++++++++++++++++
 .../translation/helpers/EncoderHelpers.java        |  45 ++-
 .../translation/utils/ScalaInterop.java            | 114 +++++++
 .../translation/helpers/EncoderHelpersTest.java    | 298 ++++++++++++++++++
 runners/spark/spark_runner.gradle                  |  23 +-
 .../spark/coders/SparkRunnerKryoRegistrator.java   |  28 +-
 .../SparkGroupAlsoByWindowViaWindowSet.java        |   4 +-
 .../SparkStructuredStreamingPipelineResult.java    |   1 +
 .../io/BoundedDatasetFactory.java                  |   2 +-
 .../translation/EvaluationContext.java             |  10 +-
 .../batch/CombinePerKeyTranslatorBatch.java        |   2 +-
 .../translation/batch/GroupByKeyHelpers.java       |   4 +-
 .../batch/GroupByKeyTranslatorBatch.java           |   2 +-
 .../translation/helpers/EncoderHelpers.java        |  21 +-
 .../coders/SparkRunnerKryoRegistratorTest.java     |  49 +++
 .../translation/EvaluationContextTest.java         |  84 ++++++
 settings.gradle.kts                                |   2 +
 30 files changed, 1307 insertions(+), 84 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming.json
new file mode 100644
index 00000000000..c4edaa85a89
--- /dev/null
+++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming.json
@@ -0,0 +1,3 @@
+{
+  "comment": "Modify this file in a trivial way to cause this test suite to run"
+}
diff --git a/.github/trigger_files/beam_PreCommit_Java_Spark4_Versions.json b/.github/trigger_files/beam_PreCommit_Java_Spark4_Versions.json
new file mode 100644
index 00000000000..c4edaa85a89
--- /dev/null
+++ b/.github/trigger_files/beam_PreCommit_Java_Spark4_Versions.json
@@ -0,0 +1,3 @@
+{
+  "comment": "Modify this file in a trivial way to cause this test suite to run"
+}
diff --git a/.github/workflows/README.md b/.github/workflows/README.md
index a6539d18f36..c3c73c0317a 100644
--- a/.github/workflows/README.md
+++ b/.github/workflows/README.md
@@ -370,6 +370,7 @@ PostCommit Jobs run in a schedule against master branch and generally do not get
 | [ PostCommit Java ValidatesRunner Flink 
](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml)
 | N/A |`beam_PostCommit_Java_ValidatesRunner_Flink.json`| 
[![.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml?query
 [...]
 | [ PostCommit Java ValidatesRunner Spark 
](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Spark.yml)
 | N/A |`beam_PostCommit_Java_ValidatesRunner_Spark.json`| 
[![.github/workflows/beam_PostCommit_Java_ValidatesRunner_Spark.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Spark.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Spark.yml?query
 [...]
 | [ PostCommit Java ValidatesRunner SparkStructuredStreaming 
](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming.yml)
 | N/A |`beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming.json`| 
[![.github/workflows/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming.yml/badge.svg?event=schedule)](https:
 [...]
+| [ PostCommit Java ValidatesRunner Spark4StructuredStreaming 
](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming.yml)
 | N/A |`beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming.json`| 
[![.github/workflows/beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming.yml/badge.svg?event=schedule)](h
 [...]
 | [ PostCommit Java ValidatesRunner Twister2 
](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Twister2.yml)
 | N/A |`beam_PostCommit_Java_ValidatesRunner_Twister2.json`| 
[![.github/workflows/beam_PostCommit_Java_ValidatesRunner_Twister2.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Twister2.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_
 [...]
 | [ PostCommit Java ValidatesRunner ULR 
](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_ULR.yml)
 | N/A |`beam_PostCommit_Java_ValidatesRunner_ULR.json`| 
[![.github/workflows/beam_PostCommit_Java_ValidatesRunner_ULR.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_ULR.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_ULR.yml?query=event%3Asch
 [...]
 | [ PostCommit Java 
](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java.yml) | 
N/A |`beam_PostCommit_Java.json`| 
[![.github/workflows/beam_PostCommit_Java.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java.yml?query=event%3Aschedule)
 |
diff --git a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming.yml b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming.yml
new file mode 100644
index 00000000000..b595afe6f42
--- /dev/null
+++ b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming.yml
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: PostCommit Java ValidatesRunner Spark4 StructuredStreaming
+
+on:
+  schedule:
+    - cron: '45 4/6 * * *'
+  pull_request_target:
+    paths: ['release/trigger_all_tests.json', '.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming.json']
+  workflow_dispatch:
+
+#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
+permissions:
+  actions: write
+  pull-requests: write
+  checks: write
+  contents: read
+  deployments: read
+  id-token: none
+  issues: write
+  discussions: read
+  packages: read
+  pages: read
+  repository-projects: read
+  security-events: read
+  statuses: read
+
+# This allows a subsequently queued workflow run to interrupt previous runs
+concurrency:
+  group: '${{ github.workflow }} @ ${{ github.event.pull_request.number || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}'
+  cancel-in-progress: true
+
+env:
+  DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
+  GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
+  GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
+
+jobs:
+  beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming:
+    name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
+    runs-on: [self-hosted, ubuntu-24.04, main]
+    timeout-minutes: 120
+    strategy:
+      matrix:
+        job_name: [beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming]
+        job_phrase: [Run Spark4 StructuredStreaming ValidatesRunner]
+    if: |
+      github.event_name == 'workflow_dispatch' ||
+      github.event_name == 'pull_request_target' ||
+      (github.event_name == 'schedule' && github.repository == 'apache/beam') ||
+      github.event.comment.body == 'Run Spark4 StructuredStreaming ValidatesRunner'
+    steps:
+      - uses: actions/checkout@v4
+      - name: Setup repository
+        uses: ./.github/actions/setup-action
+        with:
+          comment_phrase: ${{ matrix.job_phrase }}
+          github_token: ${{ secrets.GITHUB_TOKEN }}
+          github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
+      - name: Setup environment
+        uses: ./.github/actions/setup-environment-action
+        with:
+          java-version: '17'
+      - name: run validatesStructuredStreamingRunnerBatch script
+        uses: ./.github/actions/gradle-command-self-hosted-action
+        with:
+          gradle-command: :runners:spark:4:validatesStructuredStreamingRunnerBatch
+          arguments: |
+            -PtestJavaVersion=17 \
+            -PdisableSpotlessCheck=true \
+      - name: Archive JUnit Test Results
+        uses: actions/upload-artifact@v4
+        if: ${{ !success() }}
+        with:
+          name: JUnit Test Results
+          path: "**/build/reports/tests/"
+      - name: Publish JUnit Test Results
+        uses: EnricoMi/publish-unit-test-result-action@v2
+        if: always()
+        with:
+          commit: '${{ env.prsha || env.GITHUB_SHA }}'
+          comment_mode: ${{ github.event_name == 'issue_comment'  && 'always' || 'off' }}
+          files: '**/build/test-results/**/*.xml'
+          large_files: true
diff --git a/CHANGES.md b/CHANGES.md
index 6deb653ffef..2cf7f983fa3 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -60,7 +60,6 @@
 ## Highlights
 
 * New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
-* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
 
 ## I/Os
 
diff --git a/gradle.properties b/gradle.properties
index 583eb24d896..fc2813f11fe 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -41,6 +41,6 @@ docker_image_default_repo_prefix=beam_
 # supported flink versions
 flink_versions=1.17,1.18,1.19,1.20,2.0
 # supported spark versions
-spark_versions=3
+spark_versions=3,4
 # supported python versions
 python_versions=3.10,3.11,3.12,3.13,3.14
diff --git a/runners/spark/4/README.md b/runners/spark/4/README.md
new file mode 100644
index 00000000000..371a4451265
--- /dev/null
+++ b/runners/spark/4/README.md
@@ -0,0 +1,73 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+# Apache Beam Spark 4 Runner
+
+Experimental Beam runner for Apache Spark 4 (batch-only). Built on the shared
+`runners/spark` source base via `spark_runner.gradle`'s per-version
+source-overrides mechanism: this module contributes the small set of files
+under `src/main/java/.../structuredstreaming/` that diverge from the Spark 3
+implementation. See the parent `runners/spark/` module for the bulk of the
+runner code.
+
+## Requirements
+
+* **Spark 4.0.2** (and other Spark 4.0.x patch releases)
+* **Scala 2.13**
+* **Java 17** — Spark 4 does not run on earlier JDKs
+
+## Status
+
+Batch only. Streaming is tracked in
+[#36841](https://github.com/apache/beam/issues/36841).
+
+## Known issues
+
+### `StackOverflowError` from `slf4j-jdk14` on the runtime classpath
+
+Spark 4 ships `org.slf4j:jul-to-slf4j` to route `java.util.logging` records
+into SLF4J. If `org.slf4j:slf4j-jdk14` is also resolved at runtime — it routes
+the other direction (SLF4J → JUL) — the first log line creates an infinite
+loop:
+
+```
+java.lang.StackOverflowError
+    at org.slf4j.bridge.SLF4JBridgeHandler.publish(...)
+    at java.util.logging.Logger.log(...)
+    at org.slf4j.impl.JDK14LoggerAdapter.log(...)
+    at org.slf4j.bridge.SLF4JBridgeHandler.publish(...)
+    ...
+```
+
+This is the same condition that broke the Spark 3 runner in
+[#26985](https://github.com/apache/beam/issues/26985), fixed in
+[#27001](https://github.com/apache/beam/pull/27001).
+
+The shared `spark_runner.gradle` already excludes `slf4j-jdk14` from the
+runner module's own `configurations.all`, so in-tree builds are unaffected.
+Downstream Gradle consumers that assemble a runtime classpath against
+`beam-runners-spark-4` should mirror that exclude:
+
+```groovy
+configurations.all {
+    exclude group: "org.slf4j", module: "slf4j-jdk14"
+}
+```
+
+For Maven, exclude `org.slf4j:slf4j-jdk14` from any dependency that pulls it
+transitively (commonly the Beam SDK harness and several IO connectors).
diff --git a/runners/spark/4/build.gradle b/runners/spark/4/build.gradle
new file mode 100644
index 00000000000..01fb3680b07
--- /dev/null
+++ b/runners/spark/4/build.gradle
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '..'
+/* All properties required for loading the Spark build script */
+project.ext {
+  spark_major = '4'
+  // Spark 4 version as defined in BeamModulePlugin; requires Scala 2.13 and Java 17
+  spark_version = spark4_version
+  spark_scala_version = '2.13'
+  archives_base_name = 'beam-runners-spark-4'
+}
+
+// Load the main build script which contains all build logic.
+// spark_runner.gradle handles the per-version source-overrides Copy:
+// shared base (runners/spark/src/) + previous majors + this module's ./src/ are
+// merged into build/source-overrides/src using DuplicatesStrategy.INCLUDE so the
+// 11 files under runners/spark/4/src/.../structuredstreaming/ override the
+// shared-base versions.
+apply from: "$basePath/spark_runner.gradle"
+
+// Spark 4 always requires Java 17, so unconditionally add the --add-opens flags
+// required by Kryo and other libraries that use reflection on JDK internals.
+test {
+  jvmArgs "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
+          "--add-opens=java.base/java.nio=ALL-UNNAMED",
+          "--add-opens=java.base/java.util=ALL-UNNAMED",
+          "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED"
+}
+
+// Exclude DStream-based streaming tests from the shared-base copy: the Spark 4 module
+// supports only structured streaming (batch) and does not include legacy DStream support.
+// Streaming test utilities also depend on kafka.server.KafkaServerStartable which was
+// removed in Kafka 2.8.0 (the first Kafka version with a _2.13 artifact).
+tasks.named("copyTestSourceOverrides") {
+  exclude "**/translation/streaming/**"
+}
+
diff --git a/runners/spark/4/job-server/build.gradle b/runners/spark/4/job-server/build.gradle
new file mode 100644
index 00000000000..598cf3b4913
--- /dev/null
+++ b/runners/spark/4/job-server/build.gradle
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '../../job-server'
+
+project.ext {
+  // Look for the source code in the parent module
+  main_source_dirs = ["$basePath/src/main/java"]
+  test_source_dirs = ["$basePath/src/test/java"]
+  main_resources_dirs = ["$basePath/src/main/resources"]
+  test_resources_dirs = ["$basePath/src/test/resources"]
+  archives_base_name = 'beam-runners-spark-4-job-server'
+}
+
+// Load the main build script which contains all build logic.
+apply from: "$basePath/spark_job_server.gradle"
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory.java b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory.java
similarity index 95%
copy from runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory.java
copy to runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory.java
index c0d46e77c1d..d32dc14eccc 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory.java
+++ b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory.java
@@ -51,6 +51,7 @@ import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.Serializer;
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.classic.Dataset$;
 import org.apache.spark.sql.connector.catalog.SupportsRead;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCapability;
@@ -85,7 +86,14 @@ public class BoundedDatasetFactory {
     Params<T> params = new Params<>(encoder, options, session.sparkContext().defaultParallelism());
     BeamTable<T> table = new BeamTable<>(source, params);
     LogicalPlan logicalPlan = DataSourceV2Relation.create(table, Option.empty(), Option.empty());
-    return Dataset.ofRows(session, logicalPlan).as(encoder);
+    // In Spark 4.0+, Dataset$ moved to org.apache.spark.sql.classic and its ofRows() now
+    // takes the classic SparkSession subclass. The runtime instance returned by
+    // SparkSession.builder() is always a classic.SparkSession, so the downcast is safe and
+    // avoids reflection.
+    return (Dataset<WindowedValue<T>>)
+        Dataset$.MODULE$
+            .ofRows((org.apache.spark.sql.classic.SparkSession) session, logicalPlan)
+            .as(encoder);
   }
 
   /**
@@ -241,7 +249,7 @@ public class BoundedDatasetFactory {
       try {
         PipelineOptions options = params.options.get();
         long desiredSize = source.getEstimatedSizeBytes(options) / params.numPartitions;
-        List<BoundedSource<T>> split = (List<BoundedSource<T>>) source.split(desiredSize, options);
+        List<? extends BoundedSource<T>> split = source.split(desiredSize, options);
         IntSupplier idxSupplier = new AtomicInteger(0)::getAndIncrement;
         return split.stream().map(s -> new SourcePartition<>(s, idxSupplier)).collect(toList());
       } catch (Exception e) {
@@ -279,7 +287,6 @@ public class BoundedDatasetFactory {
     @SuppressWarnings("nullness") // ok, reader not used any longer
     public void close() throws IOException {
       if (reader != null) {
-        endOfData();
         try {
           reader.close();
         } finally {
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
similarity index 97%
copy from runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
copy to runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
index 02c56a8081c..e483c2db0df 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
+++ b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
@@ -46,7 +46,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoder;
 import org.apache.spark.sql.expressions.Aggregator;
 import scala.Tuple2;
-import scala.collection.TraversableOnce;
+import scala.collection.IterableOnce;
 
 /**
  * Translator for {@link Combine.PerKey} using {@link Dataset#groupByKey} with a Spark {@link
@@ -64,7 +64,7 @@ import scala.collection.TraversableOnce;
  * TODOs:
  * <li>combine with context (CombineFnWithContext)?
  * <li>combine with sideInputs?
- * <li>other there other missing features?
+ * <li>are there other missing features?
  */
 class CombinePerKeyTranslatorBatch<K, InT, AccT, OutT>
     extends TransformTranslator<
@@ -140,7 +140,7 @@ class CombinePerKeyTranslatorBatch<K, InT, AccT, OutT>
   }
 
   private static <K, V>
-      Fun1<Tuple2<K, Collection<WindowedValue<V>>>, TraversableOnce<WindowedValue<KV<K, V>>>>
+      Fun1<Tuple2<K, Collection<WindowedValue<V>>>, IterableOnce<WindowedValue<KV<K, V>>>>
           explodeWindows() {
     return t ->
         ScalaInterop.scalaIterator(t._2).map(wv -> wv.withValue(KV.of(t._1, wv.getValue())));
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyHelpers.java b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyHelpers.java
similarity index 92%
copy from runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyHelpers.java
copy to runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyHelpers.java
index b7c139068d1..f25121e1b47 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyHelpers.java
+++ b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyHelpers.java
@@ -21,7 +21,6 @@ import static org.apache.beam.runners.spark.structuredstreaming.translation.util
 import static org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING;
 import static org.apache.beam.sdk.transforms.windowing.TimestampCombiner.END_OF_WINDOW;
 
-import java.util.Collection;
 import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop;
 import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.Fun1;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -32,7 +31,7 @@ import org.apache.beam.sdk.values.WindowedValue;
 import org.apache.beam.sdk.values.WindowedValues;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import scala.Tuple2;
-import scala.collection.TraversableOnce;
+import scala.collection.IterableOnce;
 
 /**
  * Package private helpers to support translating grouping transforms using `groupByKey` such as
@@ -75,13 +74,12 @@ class GroupByKeyHelpers {
    * traversable of composite keys {@code (BoundedWindow, Key)} and value.
    */
   static <K, V, T>
-      Fun1<WindowedValue<KV<K, V>>, TraversableOnce<Tuple2<Tuple2<BoundedWindow, K>, T>>>
+      Fun1<WindowedValue<KV<K, V>>, IterableOnce<Tuple2<Tuple2<BoundedWindow, K>, T>>>
           explodeWindowedKey(Fun1<WindowedValue<KV<K, V>>, T> valueFn) {
     return v -> {
       T value = valueFn.apply(v);
       K key = v.getValue().getKey();
-      Collection<BoundedWindow> windows = (Collection<BoundedWindow>) v.getWindows();
-      return ScalaInterop.scalaIterator(windows).map(w -> tuple(tuple(w, key), value));
+      return ScalaInterop.scalaIterator(v.getWindows()).map(w -> tuple(tuple(w, key), value));
     };
   }
 
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
similarity index 85%
copy from runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
copy to runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
index 46cda333482..7caf06cb38f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
+++ b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
@@ -33,7 +33,6 @@ import static org.apache.beam.runners.spark.structuredstreaming.translation.util
 import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.fun1;
 import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.fun2;
 import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.javaIterator;
-import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.listOf;
 import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.seqOf;
 import static org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
@@ -67,21 +66,16 @@ import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoder;
 import org.apache.spark.sql.TypedColumn;
-import org.apache.spark.sql.catalyst.expressions.CreateArray;
-import org.apache.spark.sql.catalyst.expressions.CreateNamedStruct;
-import org.apache.spark.sql.catalyst.expressions.Expression;
-import org.apache.spark.sql.catalyst.expressions.Literal;
-import org.apache.spark.sql.catalyst.expressions.Literal$;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import scala.Tuple2;
 import scala.collection.Iterator;
-import scala.collection.Seq;
+import scala.collection.JavaConverters;
 import scala.collection.immutable.List;
 
 /**
- * Translator for {@link GroupByKey} using {@link Dataset#groupByKey} with the build-in aggregation
+ * Translator for {@link GroupByKey} using {@link Dataset#groupByKey} with the built-in aggregation
  * function {@code collect_list} when applicable.
  *
  * <p>Note: Using {@code collect_list} isn't any worse than using {@link ReduceFnRunner}. In the
@@ -106,10 +100,10 @@ class GroupByKeyTranslatorBatch<K, V>
         PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKey<K, V>> {
 
   /** Literal of binary encoded Pane info. */
-  private static final Expression PANE_NO_FIRING = lit(toByteArray(NO_FIRING, PaneInfoCoder.of()));
+  private static final Column PANE_NO_FIRING = lit(toByteArray(NO_FIRING, PaneInfoCoder.of()));
 
   /** Defaults for value in single global window. */
-  private static final List<Expression> GLOBAL_WINDOW_DETAILS =
+  private static final List<Column> GLOBAL_WINDOW_DETAILS =
       windowDetails(lit(new byte[][] {EMPTY_BYTE_ARRAY}));
 
   GroupByKeyTranslatorBatch() {
@@ -137,7 +131,8 @@ class GroupByKeyTranslatorBatch<K, V>
             .as(SparkCommonPipelineOptions.class)
             .getPreferGroupByKeyToHandleHugeValues();
     if (useCollectList && eligibleForGlobalGroupBy(windowing, false)) {
-      // Collects all values per key in memory. This might be problematic if there's few keys only
+      // Collects all values per key in memory. This might be problematic if there's
+      // few keys only
       // or some highly skewed distribution.
       result =
           input
@@ -149,7 +144,8 @@ class GroupByKeyTranslatorBatch<K, V>
                       windowTimestamp(tsCombiner)));
 
     } else if (eligibleForGlobalGroupBy(windowing, true)) {
-      // Produces an iterable that can be traversed exactly once. However, on the plus side, data is
+      // Produces an iterable that can be traversed exactly once. However, on the plus
+      // side, data is
       // not collected in memory until serialized or done by the user.
       result =
           cxt.getDataset(cxt.getInput())
@@ -161,8 +157,10 @@ class GroupByKeyTranslatorBatch<K, V>
     } else if (useCollectList
         && eligibleForGroupByWindow(windowing, false)
         && (windowing.getWindowFn().assignsToOneWindow() || transform.fewKeys())) {
-      // Using the window as part of the key should help to better distribute the data. However, if
-      // values are assigned to multiple windows, more data would be shuffled around. If there's few
+      // Using the window as part of the key should help to better distribute the
+      // data. However, if
+      // values are assigned to multiple windows, more data would be shuffled around.
+      // If there's few
       // keys only, this is still valuable.
       // Collects all values per key & window in memory.
       result =
@@ -178,24 +176,28 @@ class GroupByKeyTranslatorBatch<K, V>
 
     } else if (eligibleForGroupByWindow(windowing, true)
         && (windowing.getWindowFn().assignsToOneWindow() || transform.fewKeys())) {
-      // Using the window as part of the key should help to better distribute the data. However, if
-      // values are assigned to multiple windows, more data would be shuffled around. If there's few
+      // Using the window as part of the key should help to better distribute the
+      // data. However, if
+      // values are assigned to multiple windows, more data would be shuffled around.
+      // If there's few
       // keys only, this is still valuable.
-      // Produces an iterable that can be traversed exactly once. However, on the plus side, data is
+      // Produces an iterable that can be traversed exactly once. However, on the plus
+      // side, data is
       // not collected in memory until serialized or done by the user.
       Encoder<Tuple2<BoundedWindow, K>> windowedKeyEnc =
           cxt.tupleEncoder(cxt.windowEncoder(), keyEnc);
       result =
           cxt.getDataset(cxt.getInput())
               .flatMap(explodeWindowedKey(valueValue()), cxt.tupleEncoder(windowedKeyEnc, valueEnc))
-              .groupByKey(fun1(Tuple2::_1), windowedKeyEnc)
-              .mapValues(fun1(Tuple2::_2), valueEnc)
+              .groupByKey(fun1(t -> t._1()), windowedKeyEnc)
+              .mapValues(fun1(t -> t._2()), valueEnc)
               .mapGroups(
                   fun2((wKey, it) -> windowedKV(wKey, iterableOnce(it))),
                   cxt.windowedEncoder(outputCoder));
 
     } else {
-      // Collects all values per key in memory. This might be problematic if there's few keys only
+      // Collects all values per key in memory. This might be problematic if there's
+      // few keys only
       // or some highly skewed distribution.
 
       // FIXME Revisit this case, implementation is far from ideal:
@@ -236,12 +238,12 @@ class GroupByKeyTranslatorBatch<K, V>
     return new Column[] {agg.as("timestamp")};
   }
 
-  private static Expression windowTimestamp(TimestampCombiner tsCombiner) {
+  private static Column windowTimestamp(TimestampCombiner tsCombiner) {
     if (tsCombiner.equals(TimestampCombiner.END_OF_WINDOW)) {
       // null will be set to END_OF_WINDOW by the respective deserializer
       return litNull(DataTypes.LongType);
     }
-    return col("timestamp").expr();
+    return col("timestamp");
   }
 
   /**
@@ -260,35 +262,37 @@ class GroupByKeyTranslatorBatch<K, V>
   }
 
   private static <InT, T> TypedColumn<InT, WindowedValue<T>> inGlobalWindow(
-      TypedColumn<?, T> value, Expression ts) {
-    List<Expression> fields = concat(timestampedValue(value, ts), GLOBAL_WINDOW_DETAILS);
+      TypedColumn<?, T> value, Column ts) {
+    List<Column> fields = concat(timestampedValue(value, ts), GLOBAL_WINDOW_DETAILS);
     Encoder<WindowedValue<T>> enc =
         windowedValueEncoder(value.encoder(), encoderOf(GlobalWindow.class));
-    return (TypedColumn<InT, WindowedValue<T>>) new Column(new CreateNamedStruct(fields)).as(enc);
+    return (TypedColumn<InT, WindowedValue<T>>)
+        struct(JavaConverters.asJavaCollection(fields).toArray(new Column[0])).as(enc);
   }
 
   public static <InT, T> TypedColumn<InT, WindowedValue<T>> inSingleWindow(
-      TypedColumn<?, T> value, TypedColumn<?, ? extends BoundedWindow> window, Expression ts) {
-    Expression windows = new CreateArray(listOf(window.expr()));
-    Seq<Expression> fields = concat(timestampedValue(value, ts), windowDetails(windows));
+      TypedColumn<?, T> value, TypedColumn<?, ? extends BoundedWindow> window, Column ts) {
+    Column windows = org.apache.spark.sql.functions.array(window);
+    List<Column> fields = concat(timestampedValue(value, ts), windowDetails(windows));
     Encoder<WindowedValue<T>> enc = windowedValueEncoder(value.encoder(), window.encoder());
-    return (TypedColumn<InT, WindowedValue<T>>) new Column(new CreateNamedStruct(fields)).as(enc);
+    return (TypedColumn<InT, WindowedValue<T>>)
+        struct(JavaConverters.asJavaCollection(fields).toArray(new Column[0])).as(enc);
   }
 
-  private static List<Expression> timestampedValue(Column value, Expression ts) {
-    return seqOf(lit("value"), value.expr(), lit("timestamp"), ts).toList();
+  private static List<Column> timestampedValue(Column value, Column ts) {
+    return seqOf(value.as("value"), ts.as("timestamp")).toList();
   }
 
-  private static List<Expression> windowDetails(Expression windows) {
-    return seqOf(lit("windows"), windows, lit("paneInfo"), PANE_NO_FIRING).toList();
+  private static List<Column> windowDetails(Column windows) {
+    return seqOf(windows.as("windows"), PANE_NO_FIRING.as("paneInfo")).toList();
   }
 
-  private static <T extends @NonNull Object> Expression lit(T t) {
-    return Literal$.MODULE$.apply(t);
+  private static <T extends @NonNull Object> Column lit(T t) {
+    return org.apache.spark.sql.functions.lit(t);
   }
 
   @SuppressWarnings("nullness") // NULL literal
-  private static Expression litNull(DataType dataType) {
-    return new Literal(null, dataType);
+  private static Column litNull(DataType dataType) {
+    return org.apache.spark.sql.functions.lit(null).cast(dataType);
   }
 }
diff --git a/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderFactory.java b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderFactory.java
new file mode 100644
index 00000000000..6565c2a01c6
--- /dev/null
+++ b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderFactory.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
+
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.emptyList;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.replace;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.seqOf;
+
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal;
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder;
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders;
+import org.apache.spark.sql.catalyst.encoders.AgnosticExpressionPathEncoder;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.expressions.BoundReference;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.expressions.objects.Invoke;
+import org.apache.spark.sql.catalyst.expressions.objects.NewInstance;
+import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.Option;
+import scala.collection.Iterator;
+import scala.collection.immutable.Seq;
+import scala.reflect.ClassTag;
+
+public class EncoderFactory {
+  // Resolve the Scala case-class primary constructor (the one with the most parameters).
+  // Constructor ordering returned by Class.getConstructors() is JVM-defined and not stable
+  // across Spark versions, so we pick the widest constructor explicitly and then dispatch on
+  // parameter count below to pick the right argument shape per Spark version.
+  private static final Constructor<StaticInvoke> STATIC_INVOKE_CONSTRUCTOR =
+      primaryConstructor(StaticInvoke.class);
+
+  private static final Constructor<Invoke> INVOKE_CONSTRUCTOR = primaryConstructor(Invoke.class);
+
+  private static final Constructor<NewInstance> NEW_INSTANCE_CONSTRUCTOR =
+      primaryConstructor(NewInstance.class);
+
+  @SuppressWarnings("unchecked")
+  private static <T> Constructor<T> primaryConstructor(Class<T> cls) {
+    Constructor<?>[] ctors = cls.getConstructors();
+    Constructor<?> widest = ctors[0];
+    for (int i = 1; i < ctors.length; i++) {
+      if (ctors[i].getParameterCount() > widest.getParameterCount()) {
+        widest = ctors[i];
+      }
+    }
+    return (Constructor<T>) widest;
+  }
+
+  @SuppressWarnings({"nullness", "unchecked"})
+  static <T> ExpressionEncoder<T> create(
+      Expression serializer, Expression deserializer, Class<? super T> clazz) {
+    AgnosticEncoder<T> agnosticEncoder = new BeamAgnosticEncoder<>(serializer, deserializer, clazz);
+    return ExpressionEncoder.apply(agnosticEncoder, serializer, deserializer);
+  }
+
+  /**
+   * An {@link AgnosticEncoder} that implements both {@link AgnosticExpressionPathEncoder} (so that
+   * {@code SerializerBuildHelper} / {@code DeserializerBuildHelper} delegate to our pre-built
+   * expressions) and {@link AgnosticEncoders.StructEncoder} (so that {@code
+   * Dataset.select(TypedColumn)} creates an N-attribute plan instead of a 1-attribute wrapped plan,
+   * preventing {@code FIELD_NUMBER_MISMATCH} errors).
+   *
+   * <p>The {@code toCatalyst} / {@code fromCatalyst} methods substitute the {@code input}
+   * expression into the pre-built serializer / deserializer via {@code transformUp}, so that when
+   * this encoder is nested inside a composite encoder (e.g. {@code Encoders.tuple}) the correct
+   * field-level expression is used in place of the root {@code BoundReference} / {@code
+   * GetColumnByOrdinal}.
+   */
+  @SuppressWarnings({"nullness", "unchecked", "deprecation"})
+  private static final class BeamAgnosticEncoder<T>
+      implements AgnosticExpressionPathEncoder<T>, AgnosticEncoders.StructEncoder<T> {
+
+    private final Expression serializer;
+    private final Expression deserializer;
+    private final Class<? super T> clazz;
+    private final Seq<AgnosticEncoders.EncoderField> encoderFields;
+
+    BeamAgnosticEncoder(Expression serializer, Expression deserializer, Class<? super T> clazz) {
+      this.serializer = serializer;
+      this.deserializer = deserializer;
+      this.clazz = clazz;
+      this.encoderFields = buildFields(serializer.dataType());
+    }
+
+    private static Seq<AgnosticEncoders.EncoderField> buildFields(DataType dt) {
+      if (dt instanceof StructType) {
+        StructField[] structFields = ((StructType) dt).fields();
+        List<AgnosticEncoders.EncoderField> fields = new ArrayList<>(structFields.length);
+        for (StructField sf : structFields) {
+          fields.add(
+              new AgnosticEncoders.EncoderField(
+                  sf.name(),
+                  new FieldEncoder<>(sf.dataType(), sf.nullable()),
+                  sf.nullable(),
+                  sf.metadata(),
+                  Option.empty(),
+                  Option.empty()));
+        }
+        return seqOf(fields.toArray(new AgnosticEncoders.EncoderField[0]));
+      } else {
+        // Non-struct: wrap in a single "value" field so StructEncoder sees one field.
+        return seqOf(
+            new AgnosticEncoders.EncoderField(
+                "value",
+                new FieldEncoder<>(dt, true),
+                true,
+                Metadata.empty(),
+                Option.empty(),
+                Option.empty()));
+      }
+    }
+
+    // --- AgnosticExpressionPathEncoder ---
+
+    @Override
+    public Expression toCatalyst(Expression input) {
+      return serializer.transformUp(replace(BoundReference.class, input));
+    }
+
+    @Override
+    public Expression fromCatalyst(Expression input) {
+      return deserializer.transformUp(replace(GetColumnByOrdinal.class, input));
+    }
+
+    // --- AgnosticEncoders.StructEncoder ---
+
+    @Override
+    public Seq<AgnosticEncoders.EncoderField> fields() {
+      return encoderFields;
+    }
+
+    @Override
+    public boolean isStruct() {
+      return true;
+    }
+
+    /**
+     * Setter required by the Scala compiler when implementing the {@link
+     * AgnosticEncoders.StructEncoder} trait from Java. Scala traits with concrete {@code val}
+     * fields generate a synthetic mangled setter ({@code <trait>$_setter_<field>_$eq}) that the
+     * trait's initializer invokes on subclasses. Java cannot declare {@code val} fields, so we
+     * implement {@link #isStruct()} directly above and accept-but-ignore the trait setter here. The
+     * mangled name is brittle and tied to Spark's Scala source layout — if Spark removes the {@code
+     * isStruct} field from {@code StructEncoder}, this method becomes dead code; if Spark renames
+     * it, compilation will fail and the new mangled name must be substituted.
+     */
+    @Override
+    public void
+        org$apache$spark$sql$catalyst$encoders$AgnosticEncoders$StructEncoder$_setter_$isStruct_$eq(
+            boolean v) {
+      // no-op: isStruct() is implemented directly above
+    }
+
+    // --- AgnosticEncoder / Encoder (explicit to resolve default-method ambiguity) ---
+
+    @Override
+    public boolean isPrimitive() {
+      return false;
+    }
+
+    @Override
+    public StructType schema() {
+      // Build StructType from fields; mirrors the StructEncoder.schema() default.
+      List<StructField> sfs = new ArrayList<>(encoderFields.size());
+      Iterator<AgnosticEncoders.EncoderField> it = encoderFields.iterator();
+      while (it.hasNext()) {
+        sfs.add(it.next().structField());
+      }
+      return new StructType(sfs.toArray(new StructField[0]));
+    }
+
+    @Override
+    public DataType dataType() {
+      return schema();
+    }
+
+    @Override
+    public ClassTag<T> clsTag() {
+      return (ClassTag<T>) ClassTag.apply(clazz);
+    }
+  }
+
+  /**
+   * Minimal {@link AgnosticEncoder} stub used to carry per-field {@link DataType} metadata inside
+   * {@link AgnosticEncoders.EncoderField}. The actual serialization / deserialization is handled by
+   * {@link BeamAgnosticEncoder#toCatalyst} and {@link BeamAgnosticEncoder#fromCatalyst}.
+   */
+  @SuppressWarnings({"nullness", "unchecked"})
+  private static final class FieldEncoder<V> implements AgnosticEncoder<V> {
+    private final DataType fieldDataType;
+    private final boolean fieldNullable;
+
+    FieldEncoder(DataType dataType, boolean nullable) {
+      this.fieldDataType = dataType;
+      this.fieldNullable = nullable;
+    }
+
+    @Override
+    public boolean isPrimitive() {
+      return false;
+    }
+
+    @Override
+    public DataType dataType() {
+      return fieldDataType;
+    }
+
+    @Override
+    public StructType schema() {
+      return new StructType().add("value", fieldDataType, fieldNullable);
+    }
+
+    @Override
+    public boolean nullable() {
+      return fieldNullable;
+    }
+
+    @Override
+    public ClassTag<V> clsTag() {
+      return (ClassTag<V>) ClassTag.apply(Object.class);
+    }
+  }
+
+  /**
+   * Invoke method {@code fun} on Class {@code cls}, immediately propagating {@code null} if any
+   * input arg is {@code null}.
+   */
+  static Expression invokeIfNotNull(Class<?> cls, String fun, DataType type, Expression... args) {
+    return invoke(cls, fun, type, true, args);
+  }
+
+  /** Invoke method {@code fun} on Class {@code cls}. */
+  static Expression invoke(Class<?> cls, String fun, DataType type, Expression... args) {
+    return invoke(cls, fun, type, false, args);
+  }
+
+  private static Expression invoke(
+      Class<?> cls, String fun, DataType type, boolean propagateNull, Expression... args) {
+    try {
+      // To address breaking interfaces between various versions of Spark, expressions are
+      // created reflectively. This is fine as it's just needed once to create the query plan.
+      switch (STATIC_INVOKE_CONSTRUCTOR.getParameterCount()) {
+        case 6:
+          // Spark 3.1.x
+          return STATIC_INVOKE_CONSTRUCTOR.newInstance(
+              cls, type, fun, seqOf(args), propagateNull, true);
+        case 7:
+          // Spark 3.2.0
+          return STATIC_INVOKE_CONSTRUCTOR.newInstance(
+              cls, type, fun, seqOf(args), emptyList(), propagateNull, true);
+        case 8:
+          // Spark 3.2.x, 3.3.x
+          return STATIC_INVOKE_CONSTRUCTOR.newInstance(
+              cls, type, fun, seqOf(args), emptyList(), propagateNull, true, true);
+        case 9:
+          // Spark 4.0.x: added Option<ScalarFunction<?>> parameter
+          return STATIC_INVOKE_CONSTRUCTOR.newInstance(
+              cls, type, fun, seqOf(args), emptyList(), propagateNull, true, true, Option.empty());
+        default:
+          throw new RuntimeException("Unsupported version of Spark");
+      }
+    } catch (IllegalArgumentException | ReflectiveOperationException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /** Invoke method {@code fun} on {@code obj} with provided {@code args}. */
+  static Expression invoke(
+      Expression obj, String fun, DataType type, boolean nullable, Expression... args) {
+    try {
+      // To address breaking interfaces between various versions of Spark, expressions are
+      // created reflectively. This is fine as it's just needed once to create the query plan.
+      switch (INVOKE_CONSTRUCTOR.getParameterCount()) {
+        case 6:
+          // Spark 3.1.x
+          return INVOKE_CONSTRUCTOR.newInstance(obj, fun, type, seqOf(args), false, nullable);
+        case 7:
+          // Spark 3.2.0
+          return INVOKE_CONSTRUCTOR.newInstance(
+              obj, fun, type, seqOf(args), emptyList(), false, nullable);
+        case 8:
+          // Spark 3.2.x, 3.3.x, 4.0.x: Invoke constructor is 8 params across all these versions
+          return INVOKE_CONSTRUCTOR.newInstance(
+              obj, fun, type, seqOf(args), emptyList(), false, nullable, true);
+        default:
+          throw new RuntimeException("Unsupported version of Spark");
+      }
+    } catch (IllegalArgumentException | ReflectiveOperationException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  static Expression newInstance(Class<?> cls, DataType type, Expression... args) {
+    try {
+      // To address breaking interfaces between various versions of Spark, expressions are
+      // created reflectively. This is fine as it's just needed once to create the query plan.
+      switch (NEW_INSTANCE_CONSTRUCTOR.getParameterCount()) {
+        case 5:
+          return NEW_INSTANCE_CONSTRUCTOR.newInstance(cls, seqOf(args), true, type, Option.empty());
+        case 6:
+          // Spark 3.2.x, 3.3.x, 4.0.x: added immutable.Seq<AbstractDataType> parameter
+          return NEW_INSTANCE_CONSTRUCTOR.newInstance(
+              cls, seqOf(args), emptyList(), true, type, Option.empty());
+        default:
+          throw new RuntimeException("Unsupported version of Spark");
+      }
+    } catch (IllegalArgumentException | ReflectiveOperationException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+}
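
The invoke and newInstance helpers above choose their constructor call by inspecting the parameter count of a reflectively obtained constructor. A minimal, self-contained sketch of that pattern is shown here, assuming a hypothetical Greeting class whose constructor gained a parameter between versions; ConstructorArityDispatch, Greeting and create are illustration-only names and not part of Beam or Spark.

```java
import java.lang.reflect.Constructor;

public class ConstructorArityDispatch {
  /** Hypothetical stand-in for a library class whose constructor changed between versions. */
  public static final class Greeting {
    private final String text;

    public Greeting(String name, boolean shout) {
      String base = "hello " + name;
      this.text = shout ? base.toUpperCase() : base;
    }

    @Override
    public String toString() {
      return text;
    }
  }

  // Looked up once; only the arity is inspected at call time.
  private static final Constructor<?> CTOR = Greeting.class.getConstructors()[0];

  /** Builds a Greeting through whichever constructor shape is present on the classpath. */
  public static Object create(String name) {
    try {
      switch (CTOR.getParameterCount()) {
        case 1:
          // Older hypothetical signature: Greeting(String)
          return CTOR.newInstance(name);
        case 2:
          // Newer hypothetical signature: Greeting(String, boolean)
          return CTOR.newInstance(name, false);
        default:
          throw new IllegalStateException(
              "Unsupported constructor arity: " + CTOR.getParameterCount());
      }
    } catch (ReflectiveOperationException ex) {
      throw new IllegalStateException(ex);
    }
  }
}
```

Dispatching on arity alone is only safe while no two supported versions share a parameter count with different parameter types, which is presumably why each case above is annotated with the Spark versions it covers.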
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
similarity index 93%
copy from runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
copy to runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index daf8451faac..173b4653a19 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -48,13 +48,13 @@ import org.apache.beam.sdk.values.WindowedValue;
 import org.apache.beam.sdk.values.WindowedValues;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
 import org.apache.spark.sql.Encoder;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.catalyst.SerializerBuildHelper;
 import org.apache.spark.sql.catalyst.SerializerBuildHelper.MapElementInformation;
 import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal;
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder;
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
 import org.apache.spark.sql.catalyst.expressions.BoundReference;
 import org.apache.spark.sql.catalyst.expressions.Coalesce;
@@ -98,7 +98,7 @@ public class EncoderHelpers {
   private static final DataType LIST_TYPE = new ObjectType(List.class);
 
   // Collections / maps of these types can be (de)serialized without (de)serializing each member
-  private static final Set<Class<?>> PRIMITIV_TYPES =
+  private static final Set<Class<?>> PRIMITIVE_TYPES =
       ImmutableSet.of(
           Boolean.class,
           Byte.class,
@@ -154,7 +154,7 @@ public class EncoderHelpers {
   public static <T> Encoder<T> encoderOf(Class<? super T> cls) {
     Encoder<T> enc = getOrCreateDefaultEncoder(cls);
     if (enc == null) {
-      throw new IllegalArgumentException("No default coder available for class " + cls);
+      throw new IllegalArgumentException("No default encoder available for class " + cls);
     }
     return enc;
   }
@@ -481,7 +481,7 @@ public class EncoderHelpers {
   }
 
   private static <T> boolean isPrimitiveEnc(Encoder<T> enc) {
-    return PRIMITIV_TYPES.contains(enc.clsTag().runtimeClass());
+    return PRIMITIVE_TYPES.contains(enc.clsTag().runtimeClass());
   }
 
   private static <T> Expression serialize(Expression input, Encoder<T> enc) {
@@ -492,20 +492,35 @@ public class EncoderHelpers {
     return deserializer(enc).transformUp(replace(GetColumnByOrdinal.class, input));
   }
 
+  /**
+   * Wraps an {@link Encoder} as an {@link ExpressionEncoder}. In Spark 4.x, built-in encoders (e.g.
+   * {@code Encoders.INT()}) are {@link AgnosticEncoder} subclasses rather than {@link
+   * ExpressionEncoder}s, so we convert them on demand.
+   */
+  @SuppressWarnings("unchecked")
+  private static <T> ExpressionEncoder<T> toExpressionEncoder(Encoder<T> enc) {
+    if (enc instanceof ExpressionEncoder) {
+      return (ExpressionEncoder<T>) enc;
+    } else if (enc instanceof AgnosticEncoder) {
+      return ExpressionEncoder.apply((AgnosticEncoder<T>) enc);
+    }
+    throw new IllegalArgumentException("Unsupported encoder type: " + enc.getClass());
+  }
+
   private static <T> Expression serializer(Encoder<T> enc) {
-    return ((ExpressionEncoder<T>) enc).objSerializer();
+    return toExpressionEncoder(enc).objSerializer();
   }
 
   private static <T> Expression deserializer(Encoder<T> enc) {
-    return ((ExpressionEncoder<T>) enc).objDeserializer();
+    return toExpressionEncoder(enc).objDeserializer();
   }
 
   private static <T> DataType serializedType(Encoder<T> enc) {
-    return ((ExpressionEncoder<T>) enc).objSerializer().dataType();
+    return toExpressionEncoder(enc).objSerializer().dataType();
   }
 
   private static <T> DataType deserializedType(Encoder<T> enc) {
-    return ((ExpressionEncoder<T>) enc).objDeserializer().dataType();
+    return toExpressionEncoder(enc).objDeserializer().dataType();
   }
 
   private static Expression rootRef(DataType dt, boolean nullable) {
@@ -548,9 +563,17 @@ public class EncoderHelpers {
       return CoderHelpers.toByteArray(paneInfo, PaneInfoCoder.of());
     }
 
-    /** The end of the only window (max timestamp). */
-    public static Instant maxTimestamp(Iterable<BoundedWindow> windows) {
-      return Iterables.getOnlyElement(windows).maxTimestamp();
+    /** The maximum {@code maxTimestamp} across all associated windows. */
+    public static Instant maxTimestamp(Iterable<? extends BoundedWindow> windows) {
+      Instant maxTimestamp = null;
+      for (BoundedWindow window : windows) {
+        Instant timestamp = window.maxTimestamp();
+        if (maxTimestamp == null || timestamp.isAfter(maxTimestamp)) {
+          maxTimestamp = timestamp;
+        }
+      }
+      return Preconditions.checkNotNull(
+          maxTimestamp, "WindowedValue must have at least one window");
     }
 
     public static List<Object> copyToList(ArrayData arrayData, DataType type) {
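
The maxTimestamp change above replaces the single-window assumption with a scan over all windows of a value. The sketch below illustrates the same loop in isolation. It assumes java.time.Instant in place of Joda's Instant and a hypothetical Window interface in place of Beam's BoundedWindow, so it can run without Beam on the classpath.

```java
import java.time.Instant;

public class MaxTimestampSketch {
  /** Hypothetical stand-in for Beam's BoundedWindow; only maxTimestamp() matters here. */
  public interface Window {
    Instant maxTimestamp();
  }

  /** Convenience factory for the sketch: a window that ends at the given epoch millis. */
  public static Window windowEndingAt(long epochMillis) {
    Instant end = Instant.ofEpochMilli(epochMillis);
    return () -> end;
  }

  /** Returns the latest maxTimestamp across all windows; rejects an empty input. */
  public static Instant maxTimestamp(Iterable<? extends Window> windows) {
    Instant max = null;
    for (Window window : windows) {
      Instant timestamp = window.maxTimestamp();
      if (max == null || timestamp.isAfter(max)) {
        max = timestamp;
      }
    }
    if (max == null) {
      throw new IllegalArgumentException("at least one window is required");
    }
    return max;
  }
}
```

With three windows ending at 10, 30 and 20 milliseconds, the loop keeps 30; the previous getOnlyElement-based version would have thrown for any input with more than one window.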
diff --git a/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/utils/ScalaInterop.java b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/utils/ScalaInterop.java
new file mode 100644
index 00000000000..175e144d650
--- /dev/null
+++ b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/utils/ScalaInterop.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.utils;
+
+import java.io.Serializable;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import scala.Function1;
+import scala.Function2;
+import scala.PartialFunction;
+import scala.Tuple2;
+import scala.collection.Iterator;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+import scala.collection.immutable.List;
+import scala.collection.immutable.Nil$;
+
+/** Utilities for easier interoperability with the Spark Scala API. */
+public class ScalaInterop {
+  private ScalaInterop() {}
+
+  public static <T> scala.collection.immutable.Seq<T> seqOf(T... t) {
+    return (scala.collection.immutable.Seq<T>)
+        JavaConverters.asScalaBuffer(java.util.Arrays.asList(t)).toList();
+  }
+
+  public static <T> List<T> concat(List<T> a, List<T> b) {
+    return b.$colon$colon$colon(a);
+  }
+
+  public static <T> Seq<T> listOf(T t) {
+    return emptyList().$colon$colon(t);
+  }
+
+  public static <T> List<T> emptyList() {
+    return (List<T>) Nil$.MODULE$;
+  }
+
+  /** Scala {@link Iterator} of Java {@link Iterable}. */
+  public static <T extends @NonNull Object> Iterator<T> scalaIterator(Iterable<T> iterable) {
+    return scalaIterator(iterable.iterator());
+  }
+
+  /** Scala {@link Iterator} of Java {@link java.util.Iterator}. */
+  public static <T extends @NonNull Object> Iterator<T> scalaIterator(java.util.Iterator<T> it) {
+    return JavaConverters.asScalaIterator(it);
+  }
+
+  /** Java {@link java.util.Iterator} of Scala {@link Iterator}. */
+  public static <T extends @NonNull Object> java.util.Iterator<T> javaIterator(Iterator<T> it) {
+    return JavaConverters.asJavaIterator(it);
+  }
+
+  public static <T1, T2> Tuple2<T1, T2> tuple(T1 t1, T2 t2) {
+    return new Tuple2<>(t1, t2);
+  }
+
+  public static <T extends @NonNull Object, V> PartialFunction<T, T> replace(
+      Class<V> clazz, T replace) {
+    return new PartialFunction<T, T>() {
+
+      @Override
+      public boolean isDefinedAt(T x) {
+        return clazz.isAssignableFrom(x.getClass());
+      }
+
+      @Override
+      public T apply(T x) {
+        return replace;
+      }
+    };
+  }
+
+  public static <T extends @NonNull Object, V> PartialFunction<T, V> match(Class<V> clazz) {
+    return new PartialFunction<T, V>() {
+
+      @Override
+      public boolean isDefinedAt(T x) {
+        return clazz.isAssignableFrom(x.getClass());
+      }
+
+      @Override
+      public V apply(T x) {
+        return (V) x;
+      }
+    };
+  }
+
+  public static <T, V> Fun1<T, V> fun1(Fun1<T, V> fun) {
+    return fun;
+  }
+
+  public static <T1, T2, V> Fun2<T1, T2, V> fun2(Fun2<T1, T2, V> fun) {
+    return fun;
+  }
+
+  public interface Fun1<T, V> extends Function1<T, V>, Serializable {}
+
+  public interface Fun2<T1, T2, V> extends Function2<T1, T2, V>, Serializable {}
+}
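
Fun1 and Fun2 above combine a function type with Serializable so that lambdas passed through fun1 / fun2 can be shipped to executors. The sketch below shows the same idea using only the JDK. It assumes java.util.function.Function in place of scala.Function1, and SerFn, fn, roundTrip and demo are hypothetical names used only for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class SerializableFunctionSketch {
  /** A function type that is also Serializable, so lambdas targeting it can be serialized. */
  public interface SerFn<T, V> extends Function<T, V>, Serializable {}

  /** Identity helper: gives a lambda the serializable target type, like fun1 above. */
  public static <T, V> SerFn<T, V> fn(SerFn<T, V> f) {
    return f;
  }

  /** Serializes an object to bytes and reads it back with plain Java serialization. */
  @SuppressWarnings("unchecked")
  private static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }

  /** Round-trips a lambda and applies the deserialized copy to 41. */
  public static int demo() {
    SerFn<Integer, Integer> plusOne = fn(x -> x + 1);
    try {
      return roundTrip(plusOne).apply(41);
    } catch (IOException | ClassNotFoundException ex) {
      throw new IllegalStateException(ex);
    }
  }
}
```

Without the Serializable bound on the target interface, writeObject would fail with NotSerializableException for the lambda; the identity helper exists only to supply that target type at the call site.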
diff --git a/runners/spark/4/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpersTest.java b/runners/spark/4/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpersTest.java
new file mode 100644
index 00000000000..48c1e645f6e
--- /dev/null
+++ b/runners/spark/4/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpersTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
+
+import static java.util.Arrays.asList;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.collectionEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.encoderFor;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.kvEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.mapEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.oneOfEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.windowedValueEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.tuple;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates.notNull;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+import static org.apache.spark.sql.types.DataTypes.createStructField;
+import static org.apache.spark.sql.types.DataTypes.createStructType;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.function.Function;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.BigEndianShortCoder;
+import org.apache.beam.sdk.coders.BooleanCoder;
+import org.apache.beam.sdk.coders.ByteCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.DelegateCoder;
+import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.FloatCoder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.sdk.values.WindowedValues;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.joda.time.Instant;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import scala.Tuple2;
+
+/** Test of the wrapping of Beam Coders as Spark ExpressionEncoders. */
+@RunWith(JUnit4.class)
+public class EncoderHelpersTest {
+  @ClassRule public static SparkSessionRule sessionRule = new SparkSessionRule("local[1]");
+
+  private static final Encoder<GlobalWindow> windowEnc =
+      EncoderHelpers.encoderOf(GlobalWindow.class);
+
+  private static final Map<Coder<?>, List<?>> BASIC_CASES =
+      ImmutableMap.<Coder<?>, List<?>>builder()
+          .put(BooleanCoder.of(), asList(true, false, null))
+          .put(ByteCoder.of(), asList((byte) 1, null))
+          .put(BigEndianShortCoder.of(), asList((short) 1, null))
+          .put(BigEndianIntegerCoder.of(), asList(1, 2, 3, null))
+          .put(VarIntCoder.of(), asList(1, 2, 3, null))
+          .put(BigEndianLongCoder.of(), asList(1L, 2L, 3L, null))
+          .put(VarLongCoder.of(), asList(1L, 2L, 3L, null))
+          .put(FloatCoder.of(), asList((float) 1.0, (float) 2.0, null))
+          .put(DoubleCoder.of(), asList(1.0, 2.0, null))
+          .put(StringUtf8Coder.of(), asList("1", "2", null))
+          .put(BigDecimalCoder.of(), asList(bigDecimalOf(1L), bigDecimalOf(2L), null))
+          .put(InstantCoder.of(), asList(Instant.ofEpochMilli(1), null))
+          .build();
+
+  private <T> Dataset<T> createDataset(List<T> data, Encoder<T> encoder) {
+    Dataset<T> ds = sessionRule.getSession().createDataset(data, encoder);
+    ds.printSchema();
+    return ds;
+  }
+
+  @Test
+  public void testBeamEncoderMappings() {
+    BASIC_CASES.forEach(
+        (coder, data) -> {
+          Encoder<?> encoder = encoderFor(coder);
+          serializeAndDeserialize(data.get(0), (Encoder) encoder);
+          Dataset<?> dataset = createDataset(data, (Encoder) encoder);
+          assertThat(dataset.collect(), equalTo(data.toArray()));
+        });
+  }
+
+  @Test
+  public void testBeamEncoderOfPrivateType() {
+    // Verify concrete types are not used in coder generation.
+    // In case of private types this would cause an IllegalAccessError.
+    List<PrivateString> data = asList(new PrivateString("1"), new PrivateString("2"));
+    Dataset<PrivateString> dataset = createDataset(data, encoderFor(PrivateString.CODER));
+    assertThat(dataset.collect(), equalTo(data.toArray()));
+  }
+
+  @Test
+  public void testBeamWindowedValueEncoderMappings() {
+    BASIC_CASES.forEach(
+        (coder, data) -> {
+          List<WindowedValue<?>> windowed =
+              Lists.transform(data, WindowedValues::valueInGlobalWindow);
+
+          Encoder<?> encoder = windowedValueEncoder(encoderFor(coder), windowEnc);
+          serializeAndDeserialize(windowed.get(0), (Encoder) encoder);
+
+          Dataset<?> dataset = createDataset(windowed, (Encoder) encoder);
+          assertThat(dataset.collect(), equalTo(windowed.toArray()));
+        });
+  }
+
+  @Test
+  public void testCollectionEncoder() {
+    BASIC_CASES.forEach(
+        (coder, data) -> {
+          Encoder<? extends Collection<?>> encoder = collectionEncoder(encoderFor(coder), true);
+          Collection<?> collection = Collections.unmodifiableCollection(data);
+
+          Dataset<Collection<?>> dataset = createDataset(asList(collection), (Encoder) encoder);
+          assertThat(dataset.head(), equalTo(data));
+        });
+  }
+
+  private void testMapEncoder(Class<?> cls, Function<Map<?, ?>, Map<?, ?>> decorator) {
+    BASIC_CASES.forEach(
+        (coder, data) -> {
+          Encoder<?> enc = encoderFor(coder);
+          Encoder<Map<?, ?>> mapEncoder = mapEncoder(enc, enc, (Class) cls);
+          Map<?, ?> map =
+              decorator.apply(
+                  data.stream().filter(notNull()).collect(toMap(identity(), identity())));
+
+          Dataset<Map<?, ?>> dataset = createDataset(asList(map), mapEncoder);
+          Map<?, ?> head = dataset.head();
+          assertThat(head, equalTo(map));
+          assertThat(head, instanceOf(cls));
+        });
+  }
+
+  @Test
+  public void testMapEncoder() {
+    testMapEncoder(Map.class, identity());
+  }
+
+  @Test
+  public void testHashMapEncoder() {
+    testMapEncoder(HashMap.class, identity());
+  }
+
+  @Test
+  public void testTreeMapEncoder() {
+    testMapEncoder(TreeMap.class, TreeMap::new);
+  }
+
+  @Test
+  public void testBeamBinaryEncoder() {
+    List<List<String>> data = asList(asList("a1", "a2", "a3"), asList("b1", "b2"), asList("c1"));
+
+    Encoder<List<String>> encoder = encoderFor(ListCoder.of(StringUtf8Coder.of()));
+    serializeAndDeserialize(data.get(0), encoder);
+
+    Dataset<List<String>> dataset = createDataset(data, encoder);
+    assertThat(dataset.collect(), equalTo(data.toArray()));
+  }
+
+  @Test
+  public void testEncoderForKVCoder() {
+    List<KV<Integer, String>> data =
+        asList(KV.of(1, "value1"), KV.of(null, "value2"), KV.of(3, null));
+
+    Encoder<KV<Integer, String>> encoder =
+        kvEncoder(encoderFor(VarIntCoder.of()), encoderFor(StringUtf8Coder.of()));
+    serializeAndDeserialize(data.get(0), encoder);
+
+    Dataset<KV<Integer, String>> dataset = createDataset(data, encoder);
+
+    StructType kvSchema =
+        createStructType(
+            new StructField[] {
+              createStructField("key", IntegerType, true),
+              createStructField("value", StringType, true)
+            });
+
+    assertThat(dataset.schema(), equalTo(kvSchema));
+    assertThat(dataset.collectAsList(), equalTo(data));
+  }
+
+  @Test
+  public void testOneOfEncoder() {
+    List<Coder<?>> coders = ImmutableList.copyOf(BASIC_CASES.keySet());
+    List<Encoder<?>> encoders = coders.stream().map(EncoderHelpers::encoderFor).collect(toList());
+
+    // build oneOf tuples of type index and corresponding value
+    List<Tuple2<Integer, ?>> data =
+        BASIC_CASES.entrySet().stream()
+            .map(e -> tuple(coders.indexOf(e.getKey()), (Object) e.getValue().get(0)))
+            .collect(toList());
+
+    // dataset is a sparse dataset with only one column set per row
+    Dataset<Tuple2<Integer, ?>> dataset = createDataset(data, oneOfEncoder((List) encoders));
+    assertThat(dataset.collectAsList(), equalTo(data));
+  }
+
+  // fix scale/precision to system default to compare using equals
+  private static BigDecimal bigDecimalOf(long l) {
+    DecimalType type = DecimalType.SYSTEM_DEFAULT();
+    return new BigDecimal(l, new MathContext(type.precision())).setScale(type.scale());
+  }
+
+  // test an explicit serialization roundtrip
+  @SuppressWarnings("unchecked")
+  private static <T> void serializeAndDeserialize(T data, Encoder<T> enc) {
+    ExpressionEncoder<T> bound;
+    if (enc instanceof ExpressionEncoder) {
+      bound = (ExpressionEncoder<T>) enc;
+    } else {
+      bound = ExpressionEncoder.apply((AgnosticEncoder<T>) enc);
+    }
+    bound =
+        bound.resolveAndBind(bound.resolveAndBind$default$1(), bound.resolveAndBind$default$2());
+
+    InternalRow row = bound.createSerializer().apply(data);
+    T deserialized = bound.createDeserializer().apply(row);
+
+    assertThat(deserialized, equalTo(data));
+  }
+
+  private static class PrivateString {
+    private static final Coder<PrivateString> CODER =
+        DelegateCoder.of(
+            StringUtf8Coder.of(),
+            str -> str.string,
+            PrivateString::new,
+            new TypeDescriptor<PrivateString>() {});
+
+    private final String string;
+
+    public PrivateString(String string) {
+      this.string = string;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof PrivateString)) {
+        return false;
+      }
+      PrivateString that = (PrivateString) o;
+      return Objects.equals(string, that.string);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(string);
+    }
+  }
+}
diff --git a/runners/spark/spark_runner.gradle b/runners/spark/spark_runner.gradle
index 9d7be01e882..cd108372fea 100644
--- a/runners/spark/spark_runner.gradle
+++ b/runners/spark/spark_runner.gradle
@@ -257,12 +257,23 @@ dependencies {
     implementation "org.apache.spark:spark-common-utils_$spark_scala_version:$spark_version"
     implementation "org.apache.spark:spark-sql-api_$spark_scala_version:$spark_version"
   }
+  if (isSparkAtLeast("4.0.0")) {
+    // Spark 4 splits the Connect shims out of spark-sql; classes are referenced directly
+    // (e.g. via SparkSession builder paths) so strict dependency analysis requires it as
+    // a declared provided dep. The artifact does not exist for Spark 3.
+    provided "org.apache.spark:spark-connect-shims_$spark_scala_version:$spark_version"
+  }
   permitUnusedDeclared "org.apache.spark:spark-network-common_$spark_scala_version:$spark_version"
   implementation "io.dropwizard.metrics:metrics-core:4.1.1" // version used by Spark 3.1
-  compileOnly "org.scala-lang:scala-library:2.12.15"
-  runtimeOnly library.java.jackson_module_scala_2_12
-  // Force paranamer 2.8 to avoid issues when using Scala 2.12
-  runtimeOnly "com.thoughtworks.paranamer:paranamer:2.8"
+  if (spark_scala_version == '2.13') {
+    compileOnly "org.scala-lang:scala-library:2.13.15"
+    runtimeOnly library.java.jackson_module_scala_2_13
+  } else {
+    compileOnly "org.scala-lang:scala-library:2.12.15"
+    runtimeOnly library.java.jackson_module_scala_2_12
+    // Force paranamer 2.8 to avoid issues when using Scala 2.12
+    runtimeOnly "com.thoughtworks.paranamer:paranamer:2.8"
+  }
   provided "org.apache.hadoop:hadoop-client-api:3.3.1"
   provided library.java.commons_io
   provided library.java.hamcrest
@@ -277,7 +288,9 @@ dependencies {
   testImplementation project(path: ":sdks:java:extensions:avro", configuration: "testRuntimeMigration")
   testImplementation project(":sdks:java:harness")
   testImplementation library.java.avro
-  testImplementation "org.apache.kafka:kafka_$spark_scala_version:2.4.1"
+  // kafka_2.13 artifacts were first published in 2.5.0; use a later version for Scala 2.13
+  def kafka_version = (spark_scala_version == '2.13') ? '2.8.0' : '2.4.1'
+  testImplementation "org.apache.kafka:kafka_$spark_scala_version:$kafka_version"
   testImplementation library.java.kafka_clients
   testImplementation library.java.junit
   testImplementation library.java.mockito_core
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistrator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistrator.java
index 68c602ff7f5..75c33b7dc5b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistrator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistrator.java
@@ -28,9 +28,10 @@ import org.apache.beam.runners.spark.util.ByteArray;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashBasedTable;
 import org.apache.spark.serializer.KryoRegistrator;
-import scala.collection.mutable.WrappedArray;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
  * Custom {@link KryoRegistrator}s for Beam's Spark runner needs and registering used class in spark
@@ -61,7 +62,18 @@ public class SparkRunnerKryoRegistrator implements KryoRegistrator {
     kryo.register(PaneInfo.class);
     kryo.register(StateAndTimers.class);
     kryo.register(TupleTag.class);
-    kryo.register(WrappedArray.ofRef.class);
+    // Scala 2.12 uses WrappedArray$ofRef, Scala 2.13 renamed it to ArraySeq$ofRef
+    Class<?> scalaArrayClass =
+        findFirstAvailableClass(
+            "scala.collection.mutable.ArraySeq$ofRef",
+            "scala.collection.mutable.WrappedArray$ofRef");
+    if (scalaArrayClass == null) {
+      throw new IllegalStateException(
+          "Neither scala.collection.mutable.ArraySeq$ofRef (Scala 2.13) nor "
+              + "scala.collection.mutable.WrappedArray$ofRef (Scala 2.12) was found on the "
+              + "classpath. Cannot register Scala wrapped arrays with Kryo.");
+    }
+    kryo.register(scalaArrayClass);
 
     try {
       kryo.register(
@@ -74,4 +86,16 @@ public class SparkRunnerKryoRegistrator implements KryoRegistrator {
       throw new IllegalStateException("Unable to register classes with kryo.", e);
     }
   }
+
+  @VisibleForTesting
+  static @Nullable Class<?> findFirstAvailableClass(String... classNames) {
+    for (String name : classNames) {
+      try {
+        return Class.forName(name);
+      } catch (ClassNotFoundException ignored) {
+        // try the next candidate
+      }
+    }
+    return null;
+  }
 }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 1c7a4c2a241..be69ee78e51 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -522,7 +522,9 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
             Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>>
         firedStream =
             pairDStream.updateStateByKey(
-                updateFunc,
+                // Raw cast to AbstractFunction1 suppresses Scala 2.12 (collection.Seq) vs
+                // Scala 2.13 (immutable.Seq) type difference — safe at runtime due to erasure.
+                (scala.runtime.AbstractFunction1) updateFunc,
                 pairDStream.defaultPartitioner(pairDStream.defaultPartitioner$default$1()),
                 true,
                 JavaSparkContext$.MODULE$.fakeClassTag());
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java
index 806d838d9bf..9d3419e1947 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java
@@ -113,6 +113,7 @@ public class SparkStructuredStreamingPipelineResult implements PipelineResult {
 
   @Override
   public PipelineResult.State cancel() throws IOException {
+    pipelineExecution.cancel(true);
     offerNewState(PipelineResult.State.CANCELLED);
     return state;
   }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory.java
index c0d46e77c1d..d7cdefc929b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory.java
@@ -241,7 +241,7 @@ public class BoundedDatasetFactory {
       try {
         PipelineOptions options = params.options.get();
         long desiredSize = source.getEstimatedSizeBytes(options) / params.numPartitions;
-        List<BoundedSource<T>> split = (List<BoundedSource<T>>) source.split(desiredSize, options);
+        List<? extends BoundedSource<T>> split = source.split(desiredSize, options);
         IntSupplier idxSupplier = new AtomicInteger(0)::getAndIncrement;
         return split.stream().map(s -> new SourcePartition<>(s, idxSupplier)).collect(toList());
       } catch (Exception e) {
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContext.java
index 55c4bbaedd3..b8448567eaf 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContext.java
@@ -86,7 +86,10 @@ public final class EvaluationContext {
       ds.write().mode("overwrite").format("noop").save();
       LOG.info("Evaluated dataset {} in {}", name, durationSince(startMs));
     } catch (RuntimeException e) {
-      LOG.error("Failed to evaluate dataset {}: {}", name, Throwables.getRootCause(e).getMessage());
+      LOG.error(
+          "Failed to evaluate dataset {}: {}",
+          name,
+          String.valueOf(Throwables.getRootCause(e).getMessage()));
       throw new RuntimeException(e);
     }
   }
@@ -102,7 +105,10 @@ public final class EvaluationContext {
       LOG.info("Collected dataset {} in {} [size: {}]", name, durationSince(startMs), res.length);
       return res;
     } catch (Exception e) {
-      LOG.error("Failed to collect dataset {}: {}", name, Throwables.getRootCause(e).getMessage());
+      LOG.error(
+          "Failed to collect dataset {}: {}",
+          name,
+          String.valueOf(Throwables.getRootCause(e).getMessage()));
       throw new RuntimeException(e);
     }
   }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
index 02c56a8081c..513ef28a589 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
@@ -64,7 +64,7 @@ import scala.collection.TraversableOnce;
  * TODOs:
  * <li>combine with context (CombineFnWithContext)?
  * <li>combine with sideInputs?
- * <li>other there other missing features?
+ * <li>are there other missing features?
  */
 class CombinePerKeyTranslatorBatch<K, InT, AccT, OutT>
     extends TransformTranslator<
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyHelpers.java
index b7c139068d1..2105fd05d49 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyHelpers.java
@@ -21,7 +21,6 @@ import static org.apache.beam.runners.spark.structuredstreaming.translation.util
 import static org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING;
 import static org.apache.beam.sdk.transforms.windowing.TimestampCombiner.END_OF_WINDOW;
 
-import java.util.Collection;
 import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop;
 import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.Fun1;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -80,8 +79,7 @@ class GroupByKeyHelpers {
     return v -> {
       T value = valueFn.apply(v);
       K key = v.getValue().getKey();
-      Collection<BoundedWindow> windows = (Collection<BoundedWindow>) v.getWindows();
-      return ScalaInterop.scalaIterator(windows).map(w -> tuple(tuple(w, key), value));
+      return ScalaInterop.scalaIterator(v.getWindows()).map(w -> tuple(tuple(w, key), value));
     };
   }
 
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
index 46cda333482..c55781ff84a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
@@ -81,7 +81,7 @@ import scala.collection.Seq;
 import scala.collection.immutable.List;
 
 /**
- * Translator for {@link GroupByKey} using {@link Dataset#groupByKey} with the build-in aggregation
+ * Translator for {@link GroupByKey} using {@link Dataset#groupByKey} with the built-in aggregation
  * function {@code collect_list} when applicable.
  *
  * <p>Note: Using {@code collect_list} isn't any worse than using {@link ReduceFnRunner}. In the
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index daf8451faac..29f01c84c02 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -48,7 +48,6 @@ import org.apache.beam.sdk.values.WindowedValue;
 import org.apache.beam.sdk.values.WindowedValues;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
 import org.apache.spark.sql.Encoder;
 import org.apache.spark.sql.Encoders;
@@ -98,7 +97,7 @@ public class EncoderHelpers {
   private static final DataType LIST_TYPE = new ObjectType(List.class);
 
   // Collections / maps of these types can be (de)serialized without (de)serializing each member
-  private static final Set<Class<?>> PRIMITIV_TYPES =
+  private static final Set<Class<?>> PRIMITIVE_TYPES =
       ImmutableSet.of(
           Boolean.class,
           Byte.class,
@@ -154,7 +153,7 @@ public class EncoderHelpers {
   public static <T> Encoder<T> encoderOf(Class<? super T> cls) {
     Encoder<T> enc = getOrCreateDefaultEncoder(cls);
     if (enc == null) {
-      throw new IllegalArgumentException("No default coder available for class " + cls);
+      throw new IllegalArgumentException("No default encoder available for class " + cls);
     }
     return enc;
   }
@@ -481,7 +480,7 @@ public class EncoderHelpers {
   }
 
   private static <T> boolean isPrimitiveEnc(Encoder<T> enc) {
-    return PRIMITIV_TYPES.contains(enc.clsTag().runtimeClass());
+    return PRIMITIVE_TYPES.contains(enc.clsTag().runtimeClass());
   }
 
   private static <T> Expression serialize(Expression input, Encoder<T> enc) {
@@ -548,9 +547,17 @@ public class EncoderHelpers {
       return CoderHelpers.toByteArray(paneInfo, PaneInfoCoder.of());
     }
 
-    /** The end of the only window (max timestamp). */
-    public static Instant maxTimestamp(Iterable<BoundedWindow> windows) {
-      return Iterables.getOnlyElement(windows).maxTimestamp();
+    /** The maximum {@code maxTimestamp} across all associated windows. */
+    public static Instant maxTimestamp(Iterable<? extends BoundedWindow> windows) {
+      Instant maxTimestamp = null;
+      for (BoundedWindow window : windows) {
+        Instant timestamp = window.maxTimestamp();
+        if (maxTimestamp == null || timestamp.isAfter(maxTimestamp)) {
+          maxTimestamp = timestamp;
+        }
+      }
+      return Preconditions.checkNotNull(
+          maxTimestamp, "WindowedValue must have at least one window");
     }
 
     public static List<Object> copyToList(ArrayData arrayData, DataType type) {
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java
index ddd0e74d1c9..56eb0dfea5f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java
@@ -17,7 +17,10 @@
  */
 package org.apache.beam.runners.spark.coders;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 import com.esotericsoftware.kryo.Kryo;
@@ -73,6 +76,52 @@ public class SparkRunnerKryoRegistratorTest {
     }
   }
 
+  /** Unit tests for the {@link SparkRunnerKryoRegistrator#findFirstAvailableClass} helper. */
+  public static class FindFirstAvailableClassTest {
+
+    @Test
+    public void returnsFirstWhenAvailable() {
+      Class<?> result =
+          SparkRunnerKryoRegistrator.findFirstAvailableClass(
+              "java.lang.String", "java.lang.Integer");
+      assertSame(String.class, result);
+    }
+
+    @Test
+    public void fallsBackWhenFirstMissing() {
+      Class<?> result =
+          SparkRunnerKryoRegistrator.findFirstAvailableClass("does.not.Exist", "java.lang.Integer");
+      assertSame(Integer.class, result);
+    }
+
+    @Test
+    public void returnsNullWhenNoneAvailable() {
+      Class<?> result =
+          SparkRunnerKryoRegistrator.findFirstAvailableClass("does.not.Exist1", "does.not.Exist2");
+      assertNull(result);
+    }
+
+    @Test
+    public void returnsNullForEmptyInput() {
+      assertNull(SparkRunnerKryoRegistrator.findFirstAvailableClass());
+    }
+
+    @Test
+    public void resolvesScalaWrappedArrayClassOnRealClasspath() {
+      // On any supported Scala version (2.12 ArraySeq$ofRef does not exist; 2.13 it does), at
+      // least one of the two wrapped-array class names must resolve. This is the production call
+      // the registrator makes.
+      Class<?> result =
+          SparkRunnerKryoRegistrator.findFirstAvailableClass(
+              "scala.collection.mutable.ArraySeq$ofRef",
+              "scala.collection.mutable.WrappedArray$ofRef");
+      assertEquals(
+          "expected one of the Scala wrapped-array classes to be on the classpath",
+          true,
+          result != null);
+    }
+  }
+
   // Hide TestKryoRegistrator from the Enclosed JUnit runner
   interface Others {
     class TestKryoRegistrator extends SparkRunnerKryoRegistrator {
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContextTest.java
new file mode 100644
index 00000000000..d1669e45c18
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContextTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation;
+
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+import org.apache.spark.sql.Dataset;
+import org.junit.Test;
+
+/**
+ * Unit tests for the static error-path branches of {@link EvaluationContext}. The happy-path
+ * branches are covered end-to-end by the structured-streaming translation tests; these tests
+ * specifically exercise the {@code catch} blocks that wrap and rethrow underlying Spark failures.
+ */
+public class EvaluationContextTest {
+
+  @Test
+  public void evaluateWrapsAndRethrowsRuntimeException() {
+    @SuppressWarnings("unchecked")
+    Dataset<Object> ds = mock(Dataset.class);
+    RuntimeException underlying = new RuntimeException("boom");
+    doThrow(underlying).when(ds).write();
+
+    RuntimeException thrown =
+        assertThrows(RuntimeException.class, () -> EvaluationContext.evaluate("test-ds", ds));
+    assertSame(underlying, thrown.getCause());
+  }
+
+  @Test
+  public void evaluateHandlesNullExceptionMessage() {
+    // Reproduces the original NPE motivation for the String.valueOf wrap: a RuntimeException
+    // whose root cause carries a null message must not crash the error logger.
+    @SuppressWarnings("unchecked")
+    Dataset<Object> ds = mock(Dataset.class);
+    RuntimeException underlying = new RuntimeException((String) null);
+    doThrow(underlying).when(ds).write();
+
+    RuntimeException thrown =
+        assertThrows(RuntimeException.class, () -> EvaluationContext.evaluate("test-ds", ds));
+    assertSame(underlying, thrown.getCause());
+  }
+
+  @Test
+  public void collectWrapsAndRethrowsException() {
+    @SuppressWarnings("unchecked")
+    Dataset<Object> ds = mock(Dataset.class);
+    RuntimeException underlying = new RuntimeException("boom");
+    doThrow(underlying).when(ds).collect();
+
+    RuntimeException thrown =
+        assertThrows(RuntimeException.class, () -> EvaluationContext.collect("test-ds", ds));
+    assertSame(underlying, thrown.getCause());
+  }
+
+  @Test
+  public void collectHandlesNullExceptionMessage() {
+    @SuppressWarnings("unchecked")
+    Dataset<Object> ds = mock(Dataset.class);
+    RuntimeException underlying = new RuntimeException((String) null);
+    doThrow(underlying).when(ds).collect();
+
+    RuntimeException thrown =
+        assertThrows(RuntimeException.class, () -> EvaluationContext.collect("test-ds", ds));
+    assertSame(underlying, thrown.getCause());
+  }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 60ff1cf8ce1..fc5f40c23d1 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -150,6 +150,8 @@ include(":runners:prism:java")
 include(":runners:spark:3")
 include(":runners:spark:3:job-server")
 include(":runners:spark:3:job-server:container")
+include(":runners:spark:4")
+include(":runners:spark:4:job-server")
 include(":sdks:go")
 include(":sdks:go:container")
 include(":sdks:go:examples")
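
The class-name fallback that this commit adds to SparkRunnerKryoRegistrator can be tried outside of Beam. The following is a minimal standalone sketch, assuming only the JDK. The class name ClassFallbackSketch and the candidate names passed in main are illustrative and are not part of the commit; the lookup loop mirrors the helper shown in the diff above.

```java
public class ClassFallbackSketch {

  /**
   * Returns the first class in {@code classNames} that can be loaded, or null if none can.
   * Hypothetical stand-in for the helper in the patch, kept dependency-free for illustration.
   */
  static Class<?> findFirstAvailableClass(String... classNames) {
    for (String name : classNames) {
      try {
        return Class.forName(name);
      } catch (ClassNotFoundException ignored) {
        // Not on the classpath; try the next candidate.
      }
    }
    return null;
  }

  public static void main(String[] args) {
    // The first name is deliberately bogus, so the lookup falls through to the second.
    Class<?> found = findFirstAvailableClass("does.not.Exist", "java.util.ArrayList");
    System.out.println(found == null ? "none" : found.getName());
  }
}
```

Listing the newer class name first, as the patch does with ArraySeq$ofRef, means the Scala 2.13 name is tried before the Scala 2.12 name, and a null result can be turned into a descriptive failure by the caller.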
