This is an automated email from the ASF dual-hosted git repository.
Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1d008bad91f [runners-spark] Add Spark 4 runner (#38255)
1d008bad91f is described below
commit 1d008bad91fd1347cd991b0f06b342d4462bcf39
Author: Tobias Kaymak <[email protected]>
AuthorDate: Mon May 11 20:50:17 2026 +0200
[runners-spark] Add Spark 4 runner (#38255)
* build: add Spark 4.0.2 version property and Scala 2.13 support
Add spark4_version (4.0.2) to BeamModulePlugin alongside the existing
spark3_version. Update spark_runner.gradle to conditionally select the
correct Scala library (2.13 vs 2.12), Jackson module, Kafka test
dependency, and require Java 17 when building against Spark 4.
Register the new :runners:spark:4 module in settings.gradle.kts.
These changes are purely additive — all conditionals gate on
spark_version.startsWith("4") or spark_scala_version == '2.13', leaving
the Spark 3 build path untouched.
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
* refactor: make shared Spark source compatible with Scala 2.12 and 2.13
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* build: add runners/spark/4/ build configuration
Add the Gradle build file for the Spark 4 structured streaming runner.
The module mirrors runners/spark/3/ — it inherits the shared RDD-base
source from runners/spark/src/ via copySourceBase and adds its own
Structured Streaming implementation in src/main/java.
Key differences from the Spark 3 build:
- Uses spark4_version (4.0.2) with Scala 2.13.
- Excludes DStream-based streaming tests (the Spark 4 module supports only
structured streaming batch).
- Unconditionally adds --add-opens JVM flags required by Kryo on
Java 17 (Spark 4's minimum).
- Binds Spark driver to 127.0.0.1 for macOS compatibility.
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
* feat: add Spark 4 structured streaming runner source
Add the Spark 4 structured streaming runner implementation and tests.
Most files are adapted from the Spark 3 structured streaming runner
with targeted changes for Spark 4 / Scala 2.13 API compatibility.
Key Spark 4-specific changes (diff against runners/spark/3/src/):
EncoderFactory — Replaced the direct ExpressionEncoder constructor
(removed in Spark 4) with BeamAgnosticEncoder, a named class
implementing both AgnosticExpressionPathEncoder (for expression
delegation via toCatalyst/fromCatalyst) and AgnosticEncoders
.StructEncoder (so Dataset.select(TypedColumn) creates an N-attribute
plan, preventing FIELD_NUMBER_MISMATCH). The toCatalyst/fromCatalyst
methods substitute the provided input expression via transformUp,
enabling correct nesting inside composite encoders like
Encoders.tuple().
EncoderHelpers — Added toExpressionEncoder() helper to handle Spark 4
built-in encoders that are AgnosticEncoder subclasses rather than
ExpressionEncoder.
GroupByKeyTranslatorBatch — Migrated from internal catalyst Expression
API (CreateNamedStruct, Literal$) to public Column API (struct(),
lit(), array()), as required by Spark 4.
BoundedDatasetFactory — Use classic.Dataset$.MODULE$.ofRows() as
Dataset moved to org.apache.spark.sql.classic in Spark 4.
ScalaInterop — Replace WrappedArray.ofRef (removed in Scala 2.13)
with JavaConverters.asScalaBuffer().toList() in seqOf().
GroupByKeyHelpers, CombinePerKeyTranslatorBatch — Replace
TraversableOnce with IterableOnce (Scala 2.13 rename).
SparkStructuredStreamingPipelineResult — Replace sparkproject.guava
with Beam's vendored Guava.
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
* ci: add Spark 4 PreCommit and PostCommit workflows
Add GitHub Actions workflows for the Spark 4 runner module:
- beam_PreCommit_Java_Spark4_Versions: runs sparkVersionsTest on
changes to runners/spark/**. Currently a no-op (the sparkVersions
map is empty) but scaffolds future patch version coverage.
- beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming: runs
the structured streaming test suite on Java 17.
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
* Add PreCommit Java Spark4 Versions workflow
* Add cancellation support to Spark pipeline execution
* Remove unused endOfData() call in close method
Remove endOfData() call in close method.
* build: add Spark 4 job-server and container modules
Add job-server and container build configurations for Spark 4,
mirroring the existing Spark 3 job-server setup. The container
uses eclipse-temurin:17 (Spark 4 requires Java 17). The shared
spark_job_server.gradle gains a requireJavaVersion conditional
for Spark 4 parent projects.
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* build: remove spark.driver.host workaround from Spark 4 build
The hostname binding hack is no longer needed now that the local
machine resolves its hostname to 127.0.0.1 via /etc/hosts.
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* docs: add Spark 4 runner entry to CHANGES.md
Called out in /ultrareview as a missing contributor checklist item.
Adds a Highlight line and a New Features / Improvements entry under
the 2.74.0 Unreleased section, referencing issue #36841.
* docs: explain classic.SparkSession downcast in BoundedDatasetFactory
Per /ultrareview feedback: the one-line comment didn't make clear why
the cast is safe. Expand it to note that SparkSession.builder() always
returns a classic.SparkSession at runtime, which is why the downcast
avoids reflection.
* fix: log warning when neither WrappedArray nor ArraySeq class is found
Per /ultrareview feedback: the fallback branch silently swallowed the
second ClassNotFoundException. In practice one of the two classes is
always present (Scala 2.12 vs 2.13 stdlib), but a silent skip could
mask a broken classpath. Emit a LOG.warn instead.
* build: compare spark_version numerically via isSparkAtLeast helper
Per /ultrareview feedback: the five `"$spark_version" >= "3.5.0"` checks
were lexicographic string comparisons. They happened to work for 3.5.0
and 4.0.2 only because '4' > '3' as chars — a future "3.10.0" release
would compare less than "3.5.0" and silently drop the Spark 3.5+
dependencies and exclusions.
Introduce an `isSparkAtLeast` closure that tokenizes on `.` and `-`,
keeps numeric parts, and compares component-by-component. Replace all
five call sites.
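To illustrate the difference, here is a minimal Java sketch of a
component-wise version check. It is not the Groovy closure from
spark_runner.gradle: the class name `SparkVersionCompare`, the method names,
and the choice to treat missing components as 0 are assumptions made for this
example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical Java rendering of a numeric "at least" version check. */
final class SparkVersionCompare {

  /** Splits on '.' and '-' and keeps only the purely numeric tokens. */
  static List<Integer> numericParts(String version) {
    List<Integer> parts = new ArrayList<>();
    for (String token : version.split("[.-]")) {
      if (token.matches("\\d+")) {
        parts.add(Integer.parseInt(token));
      }
    }
    return parts;
  }

  /** Returns true if version >= minimum, comparing component by component. */
  static boolean isAtLeast(String version, String minimum) {
    List<Integer> v = numericParts(version);
    List<Integer> m = numericParts(minimum);
    int n = Math.max(v.size(), m.size());
    for (int i = 0; i < n; i++) {
      int a = i < v.size() ? v.get(i) : 0; // assumption: missing components count as 0
      int b = i < m.size() ? m.get(i) : 0;
      if (a != b) {
        return a > b;
      }
    }
    return true; // all components are equal
  }

  public static void main(String[] args) {
    // String comparison orders "3.10.0" before "3.5.0" because '1' < '5'.
    System.out.println("3.10.0".compareTo("3.5.0") >= 0);
    // The numeric comparison sees 10 > 5 in the second component.
    System.out.println(isAtLeast("3.10.0", "3.5.0"));
    System.out.println(isAtLeast("4.0.2", "3.5.0"));
  }
}
```

Non-numeric tokens such as a `SNAPSHOT` suffix are dropped before comparing,
so in this sketch "4.0.2-SNAPSHOT" compares the same as "4.0.2".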
* [Spark Runner] Slim Spark 4 to override-only files
With spark_runner.gradle now layering per-major source overrides on top
of the shared base, runners/spark/4/src/ no longer needs to duplicate
62 byte-identical structured-streaming files. Keep only the 11 files
that actually differ for Spark 4 / Scala 2.13. Switch the build.gradle
to spark_major = '4' (the new mechanism) and bump spark_versions to 3,4.
Compiled output unchanged — the deleted files are reproduced identically
inside build/source-overrides by the Copy task.
* [Spark Runner] Use java.io.Serializable in DoFnRunnerFactory base
scala.Serializable was removed in Scala 2.13. java.io.Serializable
works identically on both Scala 2.12 and 2.13, so this can live in
the shared base instead of needing a Spark-4-only override file.
* [Spark Runner] Null-guard error message logging in EvaluationContext base
Wrap Throwables.getRootCause(e).getMessage() in String.valueOf(...)
to make the error logging robust to a null root-cause message. The
behaviour change applies equally to Spark 3 and Spark 4, so the
fix lives in the shared base and the Spark-4 override is dropped.
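The effect of the String.valueOf wrap can be seen in a small plain-Java
sketch. The `rootCause` helper below is a stand-in for Guava's
Throwables.getRootCause, and the class and method names are hypothetical; this
is not the EvaluationContext code.

```java
/** Hypothetical sketch of null-safe root-cause message extraction. */
final class RootCauseMessage {

  /** Walks the cause chain to the innermost Throwable. */
  static Throwable rootCause(Throwable t) {
    Throwable root = t;
    while (root.getCause() != null && root.getCause() != root) {
      root = root.getCause();
    }
    return root;
  }

  /** Returns a loggable message: the string "null" rather than a null reference. */
  static String safeMessage(Throwable t) {
    return String.valueOf(rootCause(t).getMessage());
  }

  public static void main(String[] args) {
    // The root cause carries no message, so getMessage() returns null.
    Exception noMessage = new RuntimeException(new IllegalStateException());
    System.out.println(safeMessage(noMessage));
  }
}
```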
* [Spark Runner] Cancel execution future and use Beam-vendored Guava in
PipelineResult
Two changes that previously lived only in the Spark-4 override and
are equally valid for Spark 3:
1. cancel() now actually cancels the executing future
(pipelineExecution.cancel(true)) in addition to setting the state
to CANCELLED. Without this, calling cancel() left the pipeline
running silently — a real bug, not a Spark-4 specific concern.
2. Switch from Spark's shaded guava (org.sparkproject.guava) to the
Beam-vendored guava that is already on the classpath. Spark 4
no longer exposes the sparkproject guava package; using the
vendored one removes the version coupling for both runners.
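A rough sketch of the first change, under the assumption that the result
object holds the execution as a java.util.concurrent.Future. The class
`CancellableResultSketch` and its `State` enum are illustrative only and do
not mirror the actual SparkStructuredStreamingPipelineResult.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

/** Hypothetical sketch: cancel() cancels the work and records the state. */
final class CancellableResultSketch {
  enum State { RUNNING, CANCELLED }

  private final Future<?> pipelineExecution;
  private volatile State state = State.RUNNING;

  CancellableResultSketch(Future<?> pipelineExecution) {
    this.pipelineExecution = pipelineExecution;
  }

  State cancel() {
    // Request cancellation of the underlying work, not just a state change.
    pipelineExecution.cancel(true);
    state = State.CANCELLED;
    return state;
  }

  State getState() {
    return state;
  }

  public static void main(String[] args) {
    CompletableFuture<Void> execution = new CompletableFuture<>();
    CancellableResultSketch result = new CancellableResultSketch(execution);
    result.cancel();
    System.out.println(execution.isCancelled());
  }
}
```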
* ci: re-trigger to clear flaky UnboundedScheduledExecutorServiceTest
Empty commit to re-run CI. The only failure on the prior head was
UnboundedScheduledExecutorServiceTest.testThreadsAreAddedOnlyAsNeededWithContention,
a known flake (apache/beam#31590) — the test itself acknowledges
contention-induced extra threads in its inline comment. Squash or
drop on rebase before merge.
* [Spark Runner] Fix maxTimestamp to handle multi-window values
Iterables.getOnlyElement(windows) crashes with IllegalArgumentException
when a WindowedValue is associated with more than one window (e.g. after
a sliding window assignment). Compute the max maxTimestamp() across all
associated windows instead, falling back to a clear error if the iterable
is unexpectedly empty.
Applied identically to the shared base and the Spark 4 override. Flagged
by Gemini Code Assist on PR #38255.
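The idea can be sketched in plain Java as follows. The `Window` interface is a
hypothetical stand-in for Beam's BoundedWindow, and timestamps are simplified
to epoch milliseconds; the real code works with Beam's own window and instant
types.

```java
import java.util.Arrays;
import java.util.Collection;

/** Hypothetical sketch: take the largest maxTimestamp over all windows. */
final class MaxTimestampSketch {

  /** Stand-in for BoundedWindow, reduced to the one method used here. */
  interface Window {
    long maxTimestampMillis();
  }

  static long maxTimestamp(Collection<? extends Window> windows) {
    if (windows.isEmpty()) {
      // Fail with a clear message instead of an opaque getOnlyElement error.
      throw new IllegalArgumentException("Expected at least one window");
    }
    long max = Long.MIN_VALUE;
    for (Window w : windows) {
      max = Math.max(max, w.maxTimestampMillis());
    }
    return max;
  }

  public static void main(String[] args) {
    // A value assigned to three overlapping (sliding) windows.
    Window first = () -> 10L;
    Window second = () -> 25L;
    Window third = () -> 5L;
    System.out.println(maxTimestamp(Arrays.asList(first, second, third)));
  }
}
```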
* [Spark Runner] Drop unchecked cast in BoundedDatasetFactory.split
source.split returns List<? extends BoundedSource<T>>, which already
satisfies the subsequent stream usage. The cast was unchecked and would
trip heap-pollution warnings. Applied identically to the shared base
and the Spark 4 override. Flagged by Gemini Code Assist on PR #38255.
* [Spark Runner] Drop redundant Iterator cast in Spark 4
GroupByKeyTranslatorBatch
The (Iterator<V>) cast inside fun2 is redundant: fun2's signature
infers the iterator type. The shared base translator at the analogous
call site already calls iterableOnce(it) without a cast. Flagged by
Gemini Code Assist on PR #38255.
* [Spark Runner] Spark 4 EncoderFactory: stable constructor lookup +
document trait setter
Replace getConstructors()[0] (JVM-defined ordering, not stable) with a
helper that picks the widest public constructor. The downstream switch
already dispatches on parameter count to pick the right argument shape
per Spark version, so this just makes the choice deterministic.
Also document the org$apache$spark...$_setter_$isStruct_$eq method —
it is the synthetic setter the Scala compiler emits for trait val fields,
required when implementing AgnosticEncoders.StructEncoder from Java.
Both flagged by Gemini Code Assist on PR #38255.
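A minimal sketch of a "widest public constructor" lookup using standard Java
reflection. The helper name, the `Sample` class, and the tie-breaking behavior
are assumptions for illustration; the actual EncoderFactory helper may differ.

```java
import java.lang.reflect.Constructor;

/** Hypothetical sketch: pick the public constructor with the most parameters. */
final class WidestConstructor {

  /** Sample class with two public constructors of different arity. */
  static final class Sample {
    public Sample(String a) {}

    public Sample(String a, int b, boolean c) {}
  }

  static Constructor<?> widestPublicConstructor(Class<?> cls) {
    Constructor<?> widest = null;
    // getConstructors() returns public constructors in no guaranteed order,
    // so select by parameter count rather than by array index.
    for (Constructor<?> candidate : cls.getConstructors()) {
      if (widest == null || candidate.getParameterCount() > widest.getParameterCount()) {
        widest = candidate;
      }
    }
    if (widest == null) {
      throw new IllegalStateException("No public constructor on " + cls.getName());
    }
    return widest;
  }

  public static void main(String[] args) {
    System.out.println(widestPublicConstructor(Sample.class).getParameterCount());
  }
}
```

If two constructors share the maximum parameter count, this sketch keeps
whichever one the reflection API returned first, so it is deterministic only
when the widest constructor is unique.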
* [Spark Runner] Fix Javadoc/comment typos flagged by Gemini
Three trivial typos flagged on PR #38255 round 2 review, applied
identically to the shared base and the Spark 4 override:
- CombinePerKeyTranslatorBatch: "other there other missing features?"
-> "are there other missing features?"
- GroupByKeyTranslatorBatch: "build-in" -> "built-in"
- EncoderHelpers: PRIMITIV_TYPES -> PRIMITIVE_TYPES (constant + caller)
* [Spark Runner] Switch EncoderFactory.invoke on the right constructor
In EncoderFactory.invoke(Expression obj, ...), the switch was keyed on
STATIC_INVOKE_CONSTRUCTOR.getParameterCount() but the body actually
calls INVOKE_CONSTRUCTOR. This worked by coincidence: across the
supported Spark 3.x versions both constructors happen to share the
same parameter counts at the same dispatch points. A future Spark
release where the two diverge would silently pick the wrong branch.
Switch on INVOKE_CONSTRUCTOR.getParameterCount() to match the
constructor that is actually invoked, and align with the convention
used by newInstance() further down. In the Spark 4 override this also
lets us collapse the `case 8: case 9:` fallthrough back to a single
`case 8:`, since INVOKE_CONSTRUCTOR remains 8 params in Spark 4 even
though STATIC_INVOKE_CONSTRUCTOR grew to 9.
Applied identically to the shared base and the Spark 4 override.
Flagged by Gemini Code Assist on PR #38255.
* Update CHANGES.md
Co-authored-by: gemini-code-assist[bot]
<176961590+gemini-code-assist[bot]@users.noreply.github.com>
* [Spark 4] Drop redundant Collection cast in GroupByKeyHelpers
WindowedValue#getWindows() returns Collection<? extends BoundedWindow>,
which is already an Iterable and can be passed straight to
ScalaInterop.scalaIterator(...). The intermediate local variable and the
unchecked cast to Collection<BoundedWindow> were redundant.
Applied in both the shared base and the Spark 4 override.
Co-authored-by: gemini-code-assist[bot]
<176961590+gemini-code-assist[bot]@users.noreply.github.com>
* [Spark 4] Add module README with slf4j-jdk14 known-issue note
Documents the Spark 4 runner's requirements (Java 17, Scala 2.13,
Spark 4.0.x, batch-only) and the slf4j-jdk14 ↔ jul-to-slf4j conflict
that is the Spark 4 manifestation of #26985 (fixed for Spark 3 in
#27001). The shared spark_runner.gradle already excludes slf4j-jdk14
for in-tree builds; this note tells downstream consumers to mirror the
exclude when assembling their own runtime classpath against
beam-runners-spark-4.
* [runners-spark] Address Gemini nits: use encoder terminology in exception
messages
* Trigger Build
* ci: re-trigger to clear flaky FlinkRequiresStableInputTest
The PreCommit Java failure on the previous run was a single timeout in
FlinkRequiresStableInputTest.testParDoRequiresStableInputPortable
(:runners:flink:1.17:test) — known flake tracked in #21333. This PR
does not touch any Flink code. Squash or drop on rebase before merge.
* ci: re-trigger to clear flaky
SqsIOWriteBatchesTest.testWriteBatchesToDynamicWithStrictTimeout
Wall-clock-timing test (100ms inter-message + 150ms strict batch
timeout) in sdks/java/io/amazon-web-services2 SQS — unrelated to
this PR (no AWS2/SQS/Direct-runner files touched), and master is
green for the same PreCommit on 6106b308.
* ci: re-trigger to clear Maven Central 403 on Windows wordcount
`Java Wordcount Direct Runner (windows-latest)` failed at the
:buildSrc configure step with HTTP 403 fetching legacy Spotless
5.6.1 transitive deps from repo.maven.apache.org
(spotless-lib:2.7.0, durian-*:1.2.0, jgit:5.8.0). Network/infra
flake — PR doesn't touch examples or buildSrc, master 'Java Tests'
workflow consistently green.
* ci: re-trigger to clear flaky Spotless + GCP IO Direct PreCommits
Both checks failed on the prior empty retry commit (e19b80c4).
Reproduced locally at e19b80c4: spotlessCheck and Spark
checkStyleMain/Test all pass. PR doesn't touch any GCP IO code,
and both checks were green on the immediately preceding branch
commits (5abbb21d, 604037f5) and on master (6106b308, e01f7114).
Treating as infra flakes; squash before merge.
* [runners-spark] Cover Scala-array fallback and EvaluationContext error
paths
Address codecov/patch on PR #38255 by exercising the new branches added for
Scala 2.13 / null-safe error logging:
- Refactor SparkRunnerKryoRegistrator's nested Scala-array Class.forName
fallback into a small @VisibleForTesting findFirstAvailableClass helper
and add unit tests for first-hit, fallback, no-match, and empty-input
paths.
- Add EvaluationContextTest covering the catch (RuntimeException) /
catch (Exception) blocks in evaluate() and collect(), including the
null-message path that motivated the String.valueOf wrap.
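The four paths listed for findFirstAvailableClass can be pictured with the
sketch below. The signature shown here (varargs of class names, returning an
Optional) is an assumption; the real helper in SparkRunnerKryoRegistrator may
use a different shape.

```java
import java.util.Optional;

/** Hypothetical sketch of a first-match class lookup with fallback. */
final class ClassLookupSketch {

  /** Returns the first class name that resolves on the current classpath. */
  static Optional<Class<?>> findFirstAvailableClass(String... classNames) {
    for (String name : classNames) {
      try {
        Class<?> found = Class.forName(name);
        return Optional.of(found);
      } catch (ClassNotFoundException e) {
        // Not on this classpath; fall through to the next candidate.
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    // First hit, fallback, no match, and empty input, in that order.
    System.out.println(findFirstAvailableClass("java.lang.String", "java.util.List"));
    System.out.println(findFirstAvailableClass("no.such.Clazz", "java.util.List"));
    System.out.println(findFirstAvailableClass("no.such.Clazz").isPresent());
    System.out.println(findFirstAvailableClass().isPresent());
  }
}
```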
* [runners-spark] spotless: inline two findFirstAvailableClass calls in test
* flaky SqsIOWriteBatchesTest retry
* flaky ExampleEchoPipelineTest retry
* rebase cleanup: drop duplicate isSparkAtLeast helper now in master via
#38324
* Address @Abacn 2026-05-07 review
- SparkRunnerKryoRegistrator: throw IllegalStateException instead of
LOG.warn when neither ArraySeq$ofRef (Scala 2.13) nor WrappedArray$ofRef
(Scala 2.12) is on the classpath, so the missing class isn't silently
ignored. Drops the now-unused Logger field and slf4j imports.
- spark_runner.gradle: declare org.apache.spark:spark-connect-shims_2.13
as a provided dep gated on isSparkAtLeast("4.0.0"). Spark 4 splits the
Connect shim classes out of spark-sql; with enableStrictDependencies
this surfaced as analyzeClassesDependencies usedUndeclaredArtifacts.
The artifact does not exist for Spark 3, so the gate prevents Spark 3
resolution failures.
- runners/spark/4/build.gradle: drop the empty sparkVersions test
scaffolding (no additional Spark 4.x patch versions to test against
yet) and delete the now-unused
.github/workflows/beam_PreCommit_Java_Spark4_Versions.yml workflow
+ its README.md row.
- EncoderFactory (shared base): revert the line 94 switch to
STATIC_INVOKE_CONSTRUCTOR.getParameterCount(), keeping Spark 3 behavior
byte-for-byte unchanged. Spark 4's complete EncoderFactory override
under runners/spark/4/src/.../EncoderFactory.java is unaffected.
- CHANGES.md: drop the Highlights line for Spark 4. Will re-add when
ValidatesRunner tests are set up and confirmed working, matching the
Phase 1 #38324 pattern.
- runners/spark/4/job-server/container: delete the entire module
(build.gradle + Dockerfile) and remove its include() from
settings.gradle.kts. Per @Abacn's offer to defer the container module
to portable runner support later. The fat-jar :runners:spark:4:job-server
module is kept.
---------
Co-authored-by: Claude Sonnet 4.6 <[email protected]>
Co-authored-by: gemini-code-assist[bot]
<176961590+gemini-code-assist[bot]@users.noreply.github.com>
---
..._ValidatesRunner_Spark4StructuredStreaming.json | 3 +
.../beam_PreCommit_Java_Spark4_Versions.json | 3 +
.github/workflows/README.md | 1 +
...a_ValidatesRunner_Spark4StructuredStreaming.yml | 97 ++++++
CHANGES.md | 1 -
gradle.properties | 2 +-
runners/spark/4/README.md | 73 +++++
runners/spark/4/build.gradle | 53 ++++
runners/spark/4/job-server/build.gradle | 31 ++
.../io/BoundedDatasetFactory.java | 13 +-
.../batch/CombinePerKeyTranslatorBatch.java | 6 +-
.../translation/batch/GroupByKeyHelpers.java | 8 +-
.../batch/GroupByKeyTranslatorBatch.java | 78 ++---
.../translation/helpers/EncoderFactory.java | 333 +++++++++++++++++++++
.../translation/helpers/EncoderHelpers.java | 45 ++-
.../translation/utils/ScalaInterop.java | 114 +++++++
.../translation/helpers/EncoderHelpersTest.java | 298 ++++++++++++++++++
runners/spark/spark_runner.gradle | 23 +-
.../spark/coders/SparkRunnerKryoRegistrator.java | 28 +-
.../SparkGroupAlsoByWindowViaWindowSet.java | 4 +-
.../SparkStructuredStreamingPipelineResult.java | 1 +
.../io/BoundedDatasetFactory.java | 2 +-
.../translation/EvaluationContext.java | 10 +-
.../batch/CombinePerKeyTranslatorBatch.java | 2 +-
.../translation/batch/GroupByKeyHelpers.java | 4 +-
.../batch/GroupByKeyTranslatorBatch.java | 2 +-
.../translation/helpers/EncoderHelpers.java | 21 +-
.../coders/SparkRunnerKryoRegistratorTest.java | 49 +++
.../translation/EvaluationContextTest.java | 84 ++++++
settings.gradle.kts | 2 +
30 files changed, 1307 insertions(+), 84 deletions(-)
diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming.json
new file mode 100644
index 00000000000..c4edaa85a89
--- /dev/null
+++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming.json
@@ -0,0 +1,3 @@
+{
+ "comment": "Modify this file in a trivial way to cause this test suite to run"
+}
diff --git a/.github/trigger_files/beam_PreCommit_Java_Spark4_Versions.json b/.github/trigger_files/beam_PreCommit_Java_Spark4_Versions.json
new file mode 100644
index 00000000000..c4edaa85a89
--- /dev/null
+++ b/.github/trigger_files/beam_PreCommit_Java_Spark4_Versions.json
@@ -0,0 +1,3 @@
+{
+ "comment": "Modify this file in a trivial way to cause this test suite to run"
+}
diff --git a/.github/workflows/README.md b/.github/workflows/README.md
index a6539d18f36..c3c73c0317a 100644
--- a/.github/workflows/README.md
+++ b/.github/workflows/README.md
@@ -370,6 +370,7 @@ PostCommit Jobs run in a schedule against master branch and
generally do not get
| [ PostCommit Java ValidatesRunner Flink
](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml)
| N/A |`beam_PostCommit_Java_ValidatesRunner_Flink.json`|
[](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml?query
[...]
| [ PostCommit Java ValidatesRunner Spark
](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Spark.yml)
| N/A |`beam_PostCommit_Java_ValidatesRunner_Spark.json`|
[](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Spark.yml?query
[...]
| [ PostCommit Java ValidatesRunner SparkStructuredStreaming
](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming.yml)
| N/A |`beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming.json`|
[](https:
[...]
+| [ PostCommit Java ValidatesRunner Spark4StructuredStreaming
](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming.yml)
| N/A |`beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming.json`|
[](h
[...]
| [ PostCommit Java ValidatesRunner Twister2
](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Twister2.yml)
| N/A |`beam_PostCommit_Java_ValidatesRunner_Twister2.json`|
[](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_
[...]
| [ PostCommit Java ValidatesRunner ULR
](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_ULR.yml)
| N/A |`beam_PostCommit_Java_ValidatesRunner_ULR.json`|
[](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_ULR.yml?query=event%3Asch
[...]
| [ PostCommit Java
](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java.yml) |
N/A |`beam_PostCommit_Java.json`|
[](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java.yml?query=event%3Aschedule)
|
diff --git a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming.yml b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming.yml
new file mode 100644
index 00000000000..b595afe6f42
--- /dev/null
+++ b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming.yml
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: PostCommit Java ValidatesRunner Spark4 StructuredStreaming
+
+on:
+ schedule:
+ - cron: '45 4/6 * * *'
+ pull_request_target:
+ paths: ['release/trigger_all_tests.json', '.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming.json']
+ workflow_dispatch:
+
+#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
+permissions:
+ actions: write
+ pull-requests: write
+ checks: write
+ contents: read
+ deployments: read
+ id-token: none
+ issues: write
+ discussions: read
+ packages: read
+ pages: read
+ repository-projects: read
+ security-events: read
+ statuses: read
+
+# This allows a subsequently queued workflow run to interrupt previous runs
+concurrency:
+ group: '${{ github.workflow }} @ ${{ github.event.pull_request.number || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}'
+ cancel-in-progress: true
+
+env:
+ DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
+ GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
+ GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
+
+jobs:
+ beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming:
+ name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
+ runs-on: [self-hosted, ubuntu-24.04, main]
+ timeout-minutes: 120
+ strategy:
+ matrix:
+ job_name: [beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming]
+ job_phrase: [Run Spark4 StructuredStreaming ValidatesRunner]
+ if: |
+ github.event_name == 'workflow_dispatch' ||
+ github.event_name == 'pull_request_target' ||
+ (github.event_name == 'schedule' && github.repository == 'apache/beam') ||
+ github.event.comment.body == 'Run Spark4 StructuredStreaming ValidatesRunner'
+ steps:
+ - uses: actions/checkout@v4
+ - name: Setup repository
+ uses: ./.github/actions/setup-action
+ with:
+ comment_phrase: ${{ matrix.job_phrase }}
+ github_token: ${{ secrets.GITHUB_TOKEN }}
+ github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
+ - name: Setup environment
+ uses: ./.github/actions/setup-environment-action
+ with:
+ java-version: '17'
+ - name: run validatesStructuredStreamingRunnerBatch script
+ uses: ./.github/actions/gradle-command-self-hosted-action
+ with:
+ gradle-command: :runners:spark:4:validatesStructuredStreamingRunnerBatch
+ arguments: |
+ -PtestJavaVersion=17 \
+ -PdisableSpotlessCheck=true \
+ - name: Archive JUnit Test Results
+ uses: actions/upload-artifact@v4
+ if: ${{ !success() }}
+ with:
+ name: JUnit Test Results
+ path: "**/build/reports/tests/"
+ - name: Publish JUnit Test Results
+ uses: EnricoMi/publish-unit-test-result-action@v2
+ if: always()
+ with:
+ commit: '${{ env.prsha || env.GITHUB_SHA }}'
+ comment_mode: ${{ github.event_name == 'issue_comment' && 'always' || 'off' }}
+ files: '**/build/test-results/**/*.xml'
+ large_files: true
diff --git a/CHANGES.md b/CHANGES.md
index 6deb653ffef..2cf7f983fa3 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -60,7 +60,6 @@
## Highlights
* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
-* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
## I/Os
diff --git a/gradle.properties b/gradle.properties
index 583eb24d896..fc2813f11fe 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -41,6 +41,6 @@ docker_image_default_repo_prefix=beam_
# supported flink versions
flink_versions=1.17,1.18,1.19,1.20,2.0
# supported spark versions
-spark_versions=3
+spark_versions=3,4
# supported python versions
python_versions=3.10,3.11,3.12,3.13,3.14
diff --git a/runners/spark/4/README.md b/runners/spark/4/README.md
new file mode 100644
index 00000000000..371a4451265
--- /dev/null
+++ b/runners/spark/4/README.md
@@ -0,0 +1,73 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+# Apache Beam Spark 4 Runner
+
+Experimental Beam runner for Apache Spark 4 (batch-only). Built on the shared
+`runners/spark` source base via `spark_runner.gradle`'s per-version
+source-overrides mechanism: this module contributes the small set of files
+under `src/main/java/.../structuredstreaming/` that diverge from the Spark 3
+implementation. See the parent `runners/spark/` module for the bulk of the
+runner code.
+
+## Requirements
+
+* **Spark 4.0.2** (and other Spark 4.0.x patch releases)
+* **Scala 2.13**
+* **Java 17** — Spark 4 does not run on earlier JDKs
+
+## Status
+
+Batch only. Streaming is tracked in
+[#36841](https://github.com/apache/beam/issues/36841).
+
+## Known issues
+
+### `StackOverflowError` from `slf4j-jdk14` on the runtime classpath
+
+Spark 4 ships `org.slf4j:jul-to-slf4j` to route `java.util.logging` records
+into SLF4J. If `org.slf4j:slf4j-jdk14` is also resolved at runtime — it routes
+the other direction (SLF4J → JUL) — the first log line creates an infinite
+loop:
+
+```
+java.lang.StackOverflowError
+ at org.slf4j.bridge.SLF4JBridgeHandler.publish(...)
+ at java.util.logging.Logger.log(...)
+ at org.slf4j.impl.JDK14LoggerAdapter.log(...)
+ at org.slf4j.bridge.SLF4JBridgeHandler.publish(...)
+ ...
+```
+
+This is the same condition that broke the Spark 3 runner in
+[#26985](https://github.com/apache/beam/issues/26985), fixed in
+[#27001](https://github.com/apache/beam/pull/27001).
+
+The shared `spark_runner.gradle` already excludes `slf4j-jdk14` from the
+runner module's own `configurations.all`, so in-tree builds are unaffected.
+Downstream Gradle consumers that assemble a runtime classpath against
+`beam-runners-spark-4` should mirror that exclude:
+
+```groovy
+configurations.all {
+ exclude group: "org.slf4j", module: "slf4j-jdk14"
+}
+```
+
+For Maven, exclude `org.slf4j:slf4j-jdk14` from any dependency that pulls it
+transitively (commonly the Beam SDK harness and several IO connectors).
diff --git a/runners/spark/4/build.gradle b/runners/spark/4/build.gradle
new file mode 100644
index 00000000000..01fb3680b07
--- /dev/null
+++ b/runners/spark/4/build.gradle
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '..'
+/* All properties required for loading the Spark build script */
+project.ext {
+ spark_major = '4'
+ // Spark 4 version as defined in BeamModulePlugin; requires Scala 2.13 and Java 17
+ spark_version = spark4_version
+ spark_scala_version = '2.13'
+ archives_base_name = 'beam-runners-spark-4'
+}
+
+// Load the main build script which contains all build logic.
+// spark_runner.gradle handles the per-version source-overrides Copy:
+// shared base (runners/spark/src/) + previous majors + this module's ./src/ are
+// merged into build/source-overrides/src using DuplicatesStrategy.INCLUDE so the
+// 11 files under runners/spark/4/src/.../structuredstreaming/ override the
+// shared-base versions.
+apply from: "$basePath/spark_runner.gradle"
+
+// Spark 4 always requires Java 17, so unconditionally add the --add-opens flags
+// required by Kryo and other libraries that use reflection on JDK internals.
+test {
+ jvmArgs "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
+ "--add-opens=java.base/java.nio=ALL-UNNAMED",
+ "--add-opens=java.base/java.util=ALL-UNNAMED",
+ "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED"
+}
+
+// Exclude DStream-based streaming tests from the shared-base copy: the Spark 4 module
+// supports only structured streaming (batch) and does not include legacy DStream support.
+// Streaming test utilities also depend on kafka.server.KafkaServerStartable which was
+// removed in Kafka 2.8.0 (the first Kafka version with a _2.13 artifact).
+tasks.named("copyTestSourceOverrides") {
+ exclude "**/translation/streaming/**"
+}
+
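Editor's note on the --add-opens flags in the test block above: a minimal sketch, not part of the commit, of how one might check from Java whether those four java.base packages are actually open to test code. The class name AddOpensCheck is hypothetical; Module.isOpen is the standard Java 9+ API. The result depends on the flags the JVM was started with, so no output is shown.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: reports whether java.base opens the listed packages to this code. */
class AddOpensCheck {
  // Package names copied from the jvmArgs in the Gradle test block.
  static final List<String> PACKAGES =
      List.of("sun.nio.ch", "java.nio", "java.util", "java.lang.invoke");

  /** Maps each package to true if java.base opens it to the caller's module. */
  static Map<String, Boolean> openStatus() {
    Module javaBase = Object.class.getModule();
    Module caller = AddOpensCheck.class.getModule();
    Map<String, Boolean> status = new LinkedHashMap<>();
    for (String pkg : PACKAGES) {
      status.put(pkg, javaBase.isOpen(pkg, caller));
    }
    return status;
  }

  public static void main(String[] args) {
    openStatus().forEach((pkg, open) -> System.out.println(pkg + " open=" + open));
  }
}
```

Running this with and without the four flags should show the difference that deep-reflection libraries such as Kryo depend on.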
diff --git a/runners/spark/4/job-server/build.gradle b/runners/spark/4/job-server/build.gradle
new file mode 100644
index 00000000000..598cf3b4913
--- /dev/null
+++ b/runners/spark/4/job-server/build.gradle
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '../../job-server'
+
+project.ext {
+ // Look for the source code in the parent module
+ main_source_dirs = ["$basePath/src/main/java"]
+ test_source_dirs = ["$basePath/src/test/java"]
+ main_resources_dirs = ["$basePath/src/main/resources"]
+ test_resources_dirs = ["$basePath/src/test/resources"]
+ archives_base_name = 'beam-runners-spark-4-job-server'
+}
+
+// Load the main build script which contains all build logic.
+apply from: "$basePath/spark_job_server.gradle"
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory.java b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory.java
similarity index 95%
copy from runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory.java
copy to runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory.java
index c0d46e77c1d..d32dc14eccc 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory.java
+++ b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory.java
@@ -51,6 +51,7 @@ import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.Serializer;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.classic.Dataset$;
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCapability;
@@ -85,7 +86,14 @@ public class BoundedDatasetFactory {
Params<T> params = new Params<>(encoder, options, session.sparkContext().defaultParallelism());
BeamTable<T> table = new BeamTable<>(source, params);
LogicalPlan logicalPlan = DataSourceV2Relation.create(table, Option.empty(), Option.empty());
- return Dataset.ofRows(session, logicalPlan).as(encoder);
+ // In Spark 4.0+, Dataset$ moved to org.apache.spark.sql.classic and its ofRows() now
+ // takes the classic SparkSession subclass. The runtime instance returned by
+ // SparkSession.builder() is always a classic.SparkSession, so the downcast is safe and
+ // avoids reflection.
+ return (Dataset<WindowedValue<T>>)
+ Dataset$.MODULE$
+ .ofRows((org.apache.spark.sql.classic.SparkSession) session, logicalPlan)
+ .as(encoder);
}
/**
@@ -241,7 +249,7 @@ public class BoundedDatasetFactory {
try {
PipelineOptions options = params.options.get();
long desiredSize = source.getEstimatedSizeBytes(options) / params.numPartitions;
- List<BoundedSource<T>> split = (List<BoundedSource<T>>) source.split(desiredSize, options);
+ List<? extends BoundedSource<T>> split = source.split(desiredSize, options);
IntSupplier idxSupplier = new AtomicInteger(0)::getAndIncrement;
return split.stream().map(s -> new SourcePartition<>(s, idxSupplier)).collect(toList());
} catch (Exception e) {
@@ -279,7 +287,6 @@ public class BoundedDatasetFactory {
@SuppressWarnings("nullness") // ok, reader not used any longer
public void close() throws IOException {
if (reader != null) {
- endOfData();
try {
reader.close();
} finally {
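Editor's note on the split() change above: a minimal sketch, not part of the commit, of why keeping the wildcard type removes the need for an unchecked cast. Source<T> is a hypothetical stand-in for BoundedSource<T>, whose split() also returns List<? extends BoundedSource<T>>; all names here are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

class WildcardSplitSketch {
  /** Hypothetical stand-in for BoundedSource<T>; not the Beam class. */
  static class Source<T> {
    final String name;

    Source(String name) {
      this.name = name;
    }

    /** Returns a list of some subtype of Source<T>, like BoundedSource.split does. */
    List<? extends Source<T>> split(int parts) {
      List<Source<T>> out = new ArrayList<>();
      for (int i = 0; i < parts; i++) {
        out.add(new Source<>(name + "-" + i));
      }
      return out;
    }
  }

  /** Reads through the wildcard type directly: no cast and no unchecked warning. */
  static <T> List<String> partitionNames(Source<T> source, int parts) {
    List<? extends Source<T>> split = source.split(parts);
    return split.stream().map(s -> s.name).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // prints [src-0, src-1, src-2]
    System.out.println(partitionNames(new Source<String>("src"), 3));
  }
}
```

Since the result is only read and streamed, the bounded wildcard is sufficient; a cast to List<Source<T>> would only be needed to add elements to the list.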
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
similarity index 97%
copy from runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
copy to runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
index 02c56a8081c..e483c2db0df 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
+++ b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
@@ -46,7 +46,7 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.expressions.Aggregator;
import scala.Tuple2;
-import scala.collection.TraversableOnce;
+import scala.collection.IterableOnce;
/**
* Translator for {@link Combine.PerKey} using {@link Dataset#groupByKey} with a Spark {@link
@@ -64,7 +64,7 @@ import scala.collection.TraversableOnce;
* TODOs:
* <li>combine with context (CombineFnWithContext)?
* <li>combine with sideInputs?
- * <li>other there other missing features?
+ * <li>are there other missing features?
*/
class CombinePerKeyTranslatorBatch<K, InT, AccT, OutT>
extends TransformTranslator<
@@ -140,7 +140,7 @@ class CombinePerKeyTranslatorBatch<K, InT, AccT, OutT>
}
private static <K, V>
- Fun1<Tuple2<K, Collection<WindowedValue<V>>>, TraversableOnce<WindowedValue<KV<K, V>>>>
+ Fun1<Tuple2<K, Collection<WindowedValue<V>>>, IterableOnce<WindowedValue<KV<K, V>>>>
explodeWindows() {
return t ->
ScalaInterop.scalaIterator(t._2).map(wv -> wv.withValue(KV.of(t._1, wv.getValue())));
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyHelpers.java b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyHelpers.java
similarity index 92%
copy from runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyHelpers.java
copy to runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyHelpers.java
index b7c139068d1..f25121e1b47 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyHelpers.java
+++ b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyHelpers.java
@@ -21,7 +21,6 @@ import static org.apache.beam.runners.spark.structuredstreaming.translation.util
import static org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING;
import static org.apache.beam.sdk.transforms.windowing.TimestampCombiner.END_OF_WINDOW;
-import java.util.Collection;
import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop;
import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.Fun1;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -32,7 +31,7 @@ import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.sdk.values.WindowingStrategy;
import scala.Tuple2;
-import scala.collection.TraversableOnce;
+import scala.collection.IterableOnce;
/**
* Package private helpers to support translating grouping transforms using `groupByKey` such as
@@ -75,13 +74,12 @@ class GroupByKeyHelpers {
* traversable of composite keys {@code (BoundedWindow, Key)} and value.
*/
static <K, V, T>
- Fun1<WindowedValue<KV<K, V>>, TraversableOnce<Tuple2<Tuple2<BoundedWindow, K>, T>>>
+ Fun1<WindowedValue<KV<K, V>>, IterableOnce<Tuple2<Tuple2<BoundedWindow, K>, T>>>
explodeWindowedKey(Fun1<WindowedValue<KV<K, V>>, T> valueFn) {
return v -> {
T value = valueFn.apply(v);
K key = v.getValue().getKey();
- Collection<BoundedWindow> windows = (Collection<BoundedWindow>) v.getWindows();
- return ScalaInterop.scalaIterator(windows).map(w -> tuple(tuple(w, key), value));
+ return ScalaInterop.scalaIterator(v.getWindows()).map(w -> tuple(tuple(w, key), value));
};
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
similarity index 85%
copy from runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
copy to runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
index 46cda333482..7caf06cb38f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
+++ b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
@@ -33,7 +33,6 @@ import static org.apache.beam.runners.spark.structuredstreaming.translation.util
import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.fun1;
import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.fun2;
import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.javaIterator;
-import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.listOf;
import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.seqOf;
import static org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
@@ -67,21 +66,16 @@ import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.TypedColumn;
-import org.apache.spark.sql.catalyst.expressions.CreateArray;
-import org.apache.spark.sql.catalyst.expressions.CreateNamedStruct;
-import org.apache.spark.sql.catalyst.expressions.Expression;
-import org.apache.spark.sql.catalyst.expressions.Literal;
-import org.apache.spark.sql.catalyst.expressions.Literal$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.checkerframework.checker.nullness.qual.NonNull;
import scala.Tuple2;
import scala.collection.Iterator;
-import scala.collection.Seq;
+import scala.collection.JavaConverters;
import scala.collection.immutable.List;
/**
- * Translator for {@link GroupByKey} using {@link Dataset#groupByKey} with the build-in aggregation
+ * Translator for {@link GroupByKey} using {@link Dataset#groupByKey} with the built-in aggregation
* function {@code collect_list} when applicable.
*
* <p>Note: Using {@code collect_list} isn't any worse than using {@link ReduceFnRunner}. In the
@@ -106,10 +100,10 @@ class GroupByKeyTranslatorBatch<K, V>
PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKey<K, V>> {
/** Literal of binary encoded Pane info. */
- private static final Expression PANE_NO_FIRING = lit(toByteArray(NO_FIRING, PaneInfoCoder.of()));
+ private static final Column PANE_NO_FIRING = lit(toByteArray(NO_FIRING, PaneInfoCoder.of()));
/** Defaults for value in single global window. */
- private static final List<Expression> GLOBAL_WINDOW_DETAILS =
+ private static final List<Column> GLOBAL_WINDOW_DETAILS =
windowDetails(lit(new byte[][] {EMPTY_BYTE_ARRAY}));
GroupByKeyTranslatorBatch() {
@@ -137,7 +131,8 @@ class GroupByKeyTranslatorBatch<K, V>
.as(SparkCommonPipelineOptions.class)
.getPreferGroupByKeyToHandleHugeValues();
if (useCollectList && eligibleForGlobalGroupBy(windowing, false)) {
- // Collects all values per key in memory. This might be problematic if there's few keys only
+ // Collects all values per key in memory. This might be problematic if there's
+ // few keys only
// or some highly skewed distribution.
result =
input
@@ -149,7 +144,8 @@ class GroupByKeyTranslatorBatch<K, V>
windowTimestamp(tsCombiner)));
} else if (eligibleForGlobalGroupBy(windowing, true)) {
- // Produces an iterable that can be traversed exactly once. However, on the plus side, data is
+ // Produces an iterable that can be traversed exactly once. However, on the plus
+ // side, data is
// not collected in memory until serialized or done by the user.
result =
cxt.getDataset(cxt.getInput())
@@ -161,8 +157,10 @@ class GroupByKeyTranslatorBatch<K, V>
} else if (useCollectList
&& eligibleForGroupByWindow(windowing, false)
&& (windowing.getWindowFn().assignsToOneWindow() || transform.fewKeys())) {
- // Using the window as part of the key should help to better distribute the data. However, if
- // values are assigned to multiple windows, more data would be shuffled around. If there's few
+ // Using the window as part of the key should help to better distribute the
+ // data. However, if
+ // values are assigned to multiple windows, more data would be shuffled around.
+ // If there's few
// keys only, this is still valuable.
// Collects all values per key & window in memory.
result =
@@ -178,24 +176,28 @@ class GroupByKeyTranslatorBatch<K, V>
} else if (eligibleForGroupByWindow(windowing, true)
&& (windowing.getWindowFn().assignsToOneWindow() || transform.fewKeys())) {
- // Using the window as part of the key should help to better distribute the data. However, if
- // values are assigned to multiple windows, more data would be shuffled around. If there's few
+ // Using the window as part of the key should help to better distribute the
+ // data. However, if
+ // values are assigned to multiple windows, more data would be shuffled around.
+ // If there's few
// keys only, this is still valuable.
- // Produces an iterable that can be traversed exactly once. However, on the plus side, data is
+ // Produces an iterable that can be traversed exactly once. However, on the plus
+ // side, data is
// not collected in memory until serialized or done by the user.
Encoder<Tuple2<BoundedWindow, K>> windowedKeyEnc =
cxt.tupleEncoder(cxt.windowEncoder(), keyEnc);
result =
cxt.getDataset(cxt.getInput())
.flatMap(explodeWindowedKey(valueValue()), cxt.tupleEncoder(windowedKeyEnc, valueEnc))
- .groupByKey(fun1(Tuple2::_1), windowedKeyEnc)
- .mapValues(fun1(Tuple2::_2), valueEnc)
+ .groupByKey(fun1(t -> t._1()), windowedKeyEnc)
+ .mapValues(fun1(t -> t._2()), valueEnc)
.mapGroups(
fun2((wKey, it) -> windowedKV(wKey, iterableOnce(it))),
cxt.windowedEncoder(outputCoder));
} else {
- // Collects all values per key in memory. This might be problematic if there's few keys only
+ // Collects all values per key in memory. This might be problematic if there's
+ // few keys only
// or some highly skewed distribution.
// FIXME Revisit this case, implementation is far from ideal:
@@ -236,12 +238,12 @@ class GroupByKeyTranslatorBatch<K, V>
return new Column[] {agg.as("timestamp")};
}
- private static Expression windowTimestamp(TimestampCombiner tsCombiner) {
+ private static Column windowTimestamp(TimestampCombiner tsCombiner) {
if (tsCombiner.equals(TimestampCombiner.END_OF_WINDOW)) {
// null will be set to END_OF_WINDOW by the respective deserializer
return litNull(DataTypes.LongType);
}
- return col("timestamp").expr();
+ return col("timestamp");
}
/**
@@ -260,35 +262,37 @@ class GroupByKeyTranslatorBatch<K, V>
}
private static <InT, T> TypedColumn<InT, WindowedValue<T>> inGlobalWindow(
- TypedColumn<?, T> value, Expression ts) {
- List<Expression> fields = concat(timestampedValue(value, ts), GLOBAL_WINDOW_DETAILS);
+ TypedColumn<?, T> value, Column ts) {
+ List<Column> fields = concat(timestampedValue(value, ts), GLOBAL_WINDOW_DETAILS);
Encoder<WindowedValue<T>> enc =
windowedValueEncoder(value.encoder(), encoderOf(GlobalWindow.class));
- return (TypedColumn<InT, WindowedValue<T>>) new Column(new CreateNamedStruct(fields)).as(enc);
+ return (TypedColumn<InT, WindowedValue<T>>)
+ struct(JavaConverters.asJavaCollection(fields).toArray(new Column[0])).as(enc);
}
public static <InT, T> TypedColumn<InT, WindowedValue<T>> inSingleWindow(
- TypedColumn<?, T> value, TypedColumn<?, ? extends BoundedWindow> window, Expression ts) {
- Expression windows = new CreateArray(listOf(window.expr()));
- Seq<Expression> fields = concat(timestampedValue(value, ts), windowDetails(windows));
+ TypedColumn<?, T> value, TypedColumn<?, ? extends BoundedWindow> window, Column ts) {
+ Column windows = org.apache.spark.sql.functions.array(window);
+ List<Column> fields = concat(timestampedValue(value, ts), windowDetails(windows));
Encoder<WindowedValue<T>> enc = windowedValueEncoder(value.encoder(), window.encoder());
- return (TypedColumn<InT, WindowedValue<T>>) new Column(new CreateNamedStruct(fields)).as(enc);
+ return (TypedColumn<InT, WindowedValue<T>>)
+ struct(JavaConverters.asJavaCollection(fields).toArray(new Column[0])).as(enc);
}
- private static List<Expression> timestampedValue(Column value, Expression ts) {
- return seqOf(lit("value"), value.expr(), lit("timestamp"), ts).toList();
+ private static List<Column> timestampedValue(Column value, Column ts) {
+ return seqOf(value.as("value"), ts.as("timestamp")).toList();
}
- private static List<Expression> windowDetails(Expression windows) {
- return seqOf(lit("windows"), windows, lit("paneInfo"), PANE_NO_FIRING).toList();
+ private static List<Column> windowDetails(Column windows) {
+ return seqOf(windows.as("windows"), PANE_NO_FIRING.as("paneInfo")).toList();
}
- private static <T extends @NonNull Object> Expression lit(T t) {
- return Literal$.MODULE$.apply(t);
+ private static <T extends @NonNull Object> Column lit(T t) {
+ return org.apache.spark.sql.functions.lit(t);
}
@SuppressWarnings("nullness") // NULL literal
- private static Expression litNull(DataType dataType) {
- return new Literal(null, dataType);
+ private static Column litNull(DataType dataType) {
+ return org.apache.spark.sql.functions.lit(null).cast(dataType);
}
}
diff --git a/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderFactory.java b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderFactory.java
new file mode 100644
index 00000000000..6565c2a01c6
--- /dev/null
+++ b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderFactory.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
+
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.emptyList;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.replace;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.seqOf;
+
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal;
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder;
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders;
+import org.apache.spark.sql.catalyst.encoders.AgnosticExpressionPathEncoder;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.expressions.BoundReference;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.expressions.objects.Invoke;
+import org.apache.spark.sql.catalyst.expressions.objects.NewInstance;
+import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import scala.Option;
+import scala.collection.Iterator;
+import scala.collection.immutable.Seq;
+import scala.reflect.ClassTag;
+
+public class EncoderFactory {
+ // Resolve the Scala case-class primary constructor (the one with the most parameters).
+ // Constructor ordering returned by Class.getConstructors() is JVM-defined and not stable
+ // across Spark versions, so we pick the widest constructor explicitly and then dispatch on
+ // parameter count below to pick the right argument shape per Spark version.
+ private static final Constructor<StaticInvoke> STATIC_INVOKE_CONSTRUCTOR =
+ primaryConstructor(StaticInvoke.class);
+
+ private static final Constructor<Invoke> INVOKE_CONSTRUCTOR = primaryConstructor(Invoke.class);
+
+ private static final Constructor<NewInstance> NEW_INSTANCE_CONSTRUCTOR =
+ primaryConstructor(NewInstance.class);
+
+ @SuppressWarnings("unchecked")
+ private static <T> Constructor<T> primaryConstructor(Class<T> cls) {
+ Constructor<?>[] ctors = cls.getConstructors();
+ Constructor<?> widest = ctors[0];
+ for (int i = 1; i < ctors.length; i++) {
+ if (ctors[i].getParameterCount() > widest.getParameterCount()) {
+ widest = ctors[i];
+ }
+ }
+ return (Constructor<T>) widest;
+ }
+
+ @SuppressWarnings({"nullness", "unchecked"})
+ static <T> ExpressionEncoder<T> create(
+ Expression serializer, Expression deserializer, Class<? super T> clazz) {
+ AgnosticEncoder<T> agnosticEncoder = new BeamAgnosticEncoder<>(serializer, deserializer, clazz);
+ return ExpressionEncoder.apply(agnosticEncoder, serializer, deserializer);
+ }
+
+ /**
+ * An {@link AgnosticEncoder} that implements both {@link AgnosticExpressionPathEncoder} (so that
+ * {@code SerializerBuildHelper} / {@code DeserializerBuildHelper} delegate to our pre-built
+ * expressions) and {@link AgnosticEncoders.StructEncoder} (so that {@code
+ * Dataset.select(TypedColumn)} creates an N-attribute plan instead of a 1-attribute wrapped plan,
+ * preventing {@code FIELD_NUMBER_MISMATCH} errors).
+ *
+ * <p>The {@code toCatalyst} / {@code fromCatalyst} methods substitute the {@code input}
+ * expression into the pre-built serializer / deserializer via {@code transformUp}, so that when
+ * this encoder is nested inside a composite encoder (e.g. {@code Encoders.tuple}) the correct
+ * field-level expression is used in place of the root {@code BoundReference} / {@code
+ * GetColumnByOrdinal}.
+ */
+ @SuppressWarnings({"nullness", "unchecked", "deprecation"})
+ private static final class BeamAgnosticEncoder<T>
+ implements AgnosticExpressionPathEncoder<T>, AgnosticEncoders.StructEncoder<T> {
+
+ private final Expression serializer;
+ private final Expression deserializer;
+ private final Class<? super T> clazz;
+ private final Seq<AgnosticEncoders.EncoderField> encoderFields;
+
+ BeamAgnosticEncoder(Expression serializer, Expression deserializer, Class<? super T> clazz) {
+ this.serializer = serializer;
+ this.deserializer = deserializer;
+ this.clazz = clazz;
+ this.encoderFields = buildFields(serializer.dataType());
+ }
+
+ private static Seq<AgnosticEncoders.EncoderField> buildFields(DataType dt) {
+ if (dt instanceof StructType) {
+ StructField[] structFields = ((StructType) dt).fields();
+ List<AgnosticEncoders.EncoderField> fields = new ArrayList<>(structFields.length);
+ for (StructField sf : structFields) {
+ fields.add(
+ new AgnosticEncoders.EncoderField(
+ sf.name(),
+ new FieldEncoder<>(sf.dataType(), sf.nullable()),
+ sf.nullable(),
+ sf.metadata(),
+ Option.empty(),
+ Option.empty()));
+ }
+ return seqOf(fields.toArray(new AgnosticEncoders.EncoderField[0]));
+ } else {
+ // Non-struct: wrap in a single "value" field so StructEncoder sees one field.
+ return seqOf(
+ new AgnosticEncoders.EncoderField(
+ "value",
+ new FieldEncoder<>(dt, true),
+ true,
+ Metadata.empty(),
+ Option.empty(),
+ Option.empty()));
+ }
+ }
+
+ // --- AgnosticExpressionPathEncoder ---
+
+ @Override
+ public Expression toCatalyst(Expression input) {
+ return serializer.transformUp(replace(BoundReference.class, input));
+ }
+
+ @Override
+ public Expression fromCatalyst(Expression input) {
+ return deserializer.transformUp(replace(GetColumnByOrdinal.class, input));
+ }
+
+ // --- AgnosticEncoders.StructEncoder ---
+
+ @Override
+ public Seq<AgnosticEncoders.EncoderField> fields() {
+ return encoderFields;
+ }
+
+ @Override
+ public boolean isStruct() {
+ return true;
+ }
+
+ /**
+ * Setter required by the Scala compiler when implementing the {@link
+ * AgnosticEncoders.StructEncoder} trait from Java. Scala traits with concrete {@code val}
+ * fields generate a synthetic mangled setter ({@code <trait>$_setter_<field>_$eq}) that the
+ * trait's initializer invokes on subclasses. Java cannot declare {@code val} fields, so we
+ * implement {@link #isStruct()} directly above and accept-but-ignore the trait setter here. The
+ * mangled name is brittle and tied to Spark's Scala source layout — if Spark removes the {@code
+ * isStruct} field from {@code StructEncoder}, this method becomes dead code; if Spark renames
+ * it, compilation will fail and the new mangled name must be substituted.
+ */
+ @Override
+ public void
+ org$apache$spark$sql$catalyst$encoders$AgnosticEncoders$StructEncoder$_setter_$isStruct_$eq(
+ boolean v) {
+ // no-op: isStruct() is implemented directly above
+ }
+
+ // --- AgnosticEncoder / Encoder (explicit to resolve default-method ambiguity) ---
+
+ @Override
+ public boolean isPrimitive() {
+ return false;
+ }
+
+ @Override
+ public StructType schema() {
+ // Build StructType from fields — mirrors the StructEncoder.schema() default.
+ List<StructField> sfs = new ArrayList<>(encoderFields.size());
+ Iterator<AgnosticEncoders.EncoderField> it = encoderFields.iterator();
+ while (it.hasNext()) {
+ sfs.add(it.next().structField());
+ }
+ return new StructType(sfs.toArray(new StructField[0]));
+ }
+
+ @Override
+ public DataType dataType() {
+ return schema();
+ }
+
+ @Override
+ public ClassTag<T> clsTag() {
+ return (ClassTag<T>) ClassTag.apply(clazz);
+ }
+ }
+
+ /**
+ * Minimal {@link AgnosticEncoder} stub used to carry per-field {@link DataType} metadata inside
+ * {@link AgnosticEncoders.EncoderField}. The actual serialization / deserialization is handled by
+ * {@link BeamAgnosticEncoder#toCatalyst} and {@link BeamAgnosticEncoder#fromCatalyst}.
+ */
+ @SuppressWarnings({"nullness", "unchecked"})
+ private static final class FieldEncoder<V> implements AgnosticEncoder<V> {
+ private final DataType fieldDataType;
+ private final boolean fieldNullable;
+
+ FieldEncoder(DataType dataType, boolean nullable) {
+ this.fieldDataType = dataType;
+ this.fieldNullable = nullable;
+ }
+
+ @Override
+ public boolean isPrimitive() {
+ return false;
+ }
+
+ @Override
+ public DataType dataType() {
+ return fieldDataType;
+ }
+
+ @Override
+ public StructType schema() {
+ return new StructType().add("value", fieldDataType, fieldNullable);
+ }
+
+ @Override
+ public boolean nullable() {
+ return fieldNullable;
+ }
+
+ @Override
+ public ClassTag<V> clsTag() {
+ return (ClassTag<V>) ClassTag.apply(Object.class);
+ }
+ }
+
+ /**
+ * Invoke method {@code fun} on Class {@code cls}, immediately propagating {@code null} if any
+ * input arg is {@code null}.
+ */
+ static Expression invokeIfNotNull(Class<?> cls, String fun, DataType type, Expression... args) {
+ return invoke(cls, fun, type, true, args);
+ }
+
+ /** Invoke method {@code fun} on Class {@code cls}. */
+ static Expression invoke(Class<?> cls, String fun, DataType type, Expression... args) {
+ return invoke(cls, fun, type, false, args);
+ }
+
+ private static Expression invoke(
+ Class<?> cls, String fun, DataType type, boolean propagateNull, Expression... args) {
+ try {
+ // To address breaking interfaces between various versions of Spark, expressions are
+ // created reflectively. This is fine as it's just needed once to create the query plan.
+ switch (STATIC_INVOKE_CONSTRUCTOR.getParameterCount()) {
+ case 6:
+ // Spark 3.1.x
+ return STATIC_INVOKE_CONSTRUCTOR.newInstance(
+ cls, type, fun, seqOf(args), propagateNull, true);
+ case 7:
+ // Spark 3.2.0
+ return STATIC_INVOKE_CONSTRUCTOR.newInstance(
+ cls, type, fun, seqOf(args), emptyList(), propagateNull, true);
+ case 8:
+ // Spark 3.2.x, 3.3.x
+ return STATIC_INVOKE_CONSTRUCTOR.newInstance(
+ cls, type, fun, seqOf(args), emptyList(), propagateNull, true, true);
+ case 9:
+ // Spark 4.0.x: added Option<ScalarFunction<?>> parameter
+ return STATIC_INVOKE_CONSTRUCTOR.newInstance(
+ cls, type, fun, seqOf(args), emptyList(), propagateNull, true, true, Option.empty());
+ default:
+ throw new RuntimeException("Unsupported version of Spark");
+ }
+ } catch (IllegalArgumentException | ReflectiveOperationException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /** Invoke method {@code fun} on {@code obj} with provided {@code args}. */
+ static Expression invoke(
+ Expression obj, String fun, DataType type, boolean nullable, Expression... args) {
+ try {
+ // To address breaking interfaces between various versions of Spark, expressions are
+ // created reflectively. This is fine as it's just needed once to create the query plan.
+ switch (INVOKE_CONSTRUCTOR.getParameterCount()) {
+ case 6:
+ // Spark 3.1.x
+ return INVOKE_CONSTRUCTOR.newInstance(obj, fun, type, seqOf(args), false, nullable);
+ case 7:
+ // Spark 3.2.0
+ return INVOKE_CONSTRUCTOR.newInstance(
+ obj, fun, type, seqOf(args), emptyList(), false, nullable);
+ case 8:
+ // Spark 3.2.x, 3.3.x, 4.0.x: Invoke constructor is 8 params across all these versions
+ return INVOKE_CONSTRUCTOR.newInstance(
+ obj, fun, type, seqOf(args), emptyList(), false, nullable, true);
+ default:
+ throw new RuntimeException("Unsupported version of Spark");
+ }
+ } catch (IllegalArgumentException | ReflectiveOperationException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ static Expression newInstance(Class<?> cls, DataType type, Expression... args) {
+ try {
+ // To address breaking interfaces between various versions of Spark, expressions are
+ // created reflectively. This is fine as it's just needed once to create the query plan.
+ switch (NEW_INSTANCE_CONSTRUCTOR.getParameterCount()) {
+ case 5:
+ return NEW_INSTANCE_CONSTRUCTOR.newInstance(cls, seqOf(args), true, type, Option.empty());
+ case 6:
+ // Spark 3.2.x, 3.3.x, 4.0.x: added immutable.Seq<AbstractDataType> parameter
+ return NEW_INSTANCE_CONSTRUCTOR.newInstance(
+ cls, seqOf(args), emptyList(), true, type, Option.empty());
+ default:
+ throw new RuntimeException("Unsupported version of Spark");
+ }
+ } catch (IllegalArgumentException | ReflectiveOperationException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+}
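The version switch above can be illustrated without Spark on the classpath. The following is a minimal sketch of the same idea, dispatching on `Constructor.getParameterCount()`; the `Greeting` and `ReflectiveFactory` classes are hypothetical stand-ins invented for this example and are not part of Beam or Spark.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-in for a library class whose constructor arity differs between versions.
final class Greeting {
  final String text;

  Greeting(String name, boolean exclaim) {
    this.text = exclaim ? name + "!" : name;
  }
}

final class ReflectiveFactory {
  // Looked up once. This sketch assumes the target class declares exactly one constructor.
  private static final Constructor<?> CTOR = Greeting.class.getDeclaredConstructors()[0];

  static Greeting create(String name) {
    try {
      switch (CTOR.getParameterCount()) {
        case 1:
          // Older (hypothetical) shape: Greeting(String)
          return (Greeting) CTOR.newInstance(name);
        case 2:
          // Newer (hypothetical) shape: Greeting(String, boolean); pass a default for the extra flag
          return (Greeting) CTOR.newInstance(name, false);
        default:
          throw new RuntimeException("Unsupported constructor arity: " + CTOR.getParameterCount());
      }
    } catch (IllegalArgumentException | ReflectiveOperationException ex) {
      throw new RuntimeException(ex);
    }
  }

  public static void main(String[] args) {
    System.out.println(create("spark").text);
  }
}
```

Keying on arity only works while no two supported versions share a parameter count with different parameter types, which is presumably why the diff annotates each case with the Spark versions it covers.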
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
similarity index 93%
copy from runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
copy to runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index daf8451faac..173b4653a19 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -48,13 +48,13 @@ import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.catalyst.SerializerBuildHelper;
import org.apache.spark.sql.catalyst.SerializerBuildHelper.MapElementInformation;
import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal;
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.expressions.BoundReference;
import org.apache.spark.sql.catalyst.expressions.Coalesce;
@@ -98,7 +98,7 @@ public class EncoderHelpers {
private static final DataType LIST_TYPE = new ObjectType(List.class);
// Collections / maps of these types can be (de)serialized without (de)serializing each member
- private static final Set<Class<?>> PRIMITIV_TYPES =
+ private static final Set<Class<?>> PRIMITIVE_TYPES =
ImmutableSet.of(
Boolean.class,
Byte.class,
@@ -154,7 +154,7 @@ public class EncoderHelpers {
public static <T> Encoder<T> encoderOf(Class<? super T> cls) {
Encoder<T> enc = getOrCreateDefaultEncoder(cls);
if (enc == null) {
- throw new IllegalArgumentException("No default coder available for class " + cls);
+ throw new IllegalArgumentException("No default encoder available for class " + cls);
}
return enc;
}
@@ -481,7 +481,7 @@ public class EncoderHelpers {
}
private static <T> boolean isPrimitiveEnc(Encoder<T> enc) {
- return PRIMITIV_TYPES.contains(enc.clsTag().runtimeClass());
+ return PRIMITIVE_TYPES.contains(enc.clsTag().runtimeClass());
}
private static <T> Expression serialize(Expression input, Encoder<T> enc) {
@@ -492,20 +492,35 @@ public class EncoderHelpers {
return deserializer(enc).transformUp(replace(GetColumnByOrdinal.class, input));
}
+ /**
+ * Wraps an {@link Encoder} as an {@link ExpressionEncoder}. In Spark 4.x, built-in encoders (e.g.
+ * {@code Encoders.INT()}) are {@link AgnosticEncoder} subclasses rather than {@link
+ * ExpressionEncoder}s, so we convert them on demand.
+ */
+ @SuppressWarnings("unchecked")
+ private static <T> ExpressionEncoder<T> toExpressionEncoder(Encoder<T> enc) {
+ if (enc instanceof ExpressionEncoder) {
+ return (ExpressionEncoder<T>) enc;
+ } else if (enc instanceof AgnosticEncoder) {
+ return ExpressionEncoder.apply((AgnosticEncoder<T>) enc);
+ }
+ throw new IllegalArgumentException("Unsupported encoder type: " + enc.getClass());
+ }
+
private static <T> Expression serializer(Encoder<T> enc) {
- return ((ExpressionEncoder<T>) enc).objSerializer();
+ return toExpressionEncoder(enc).objSerializer();
}
private static <T> Expression deserializer(Encoder<T> enc) {
- return ((ExpressionEncoder<T>) enc).objDeserializer();
+ return toExpressionEncoder(enc).objDeserializer();
}
private static <T> DataType serializedType(Encoder<T> enc) {
- return ((ExpressionEncoder<T>) enc).objSerializer().dataType();
+ return toExpressionEncoder(enc).objSerializer().dataType();
}
private static <T> DataType deserializedType(Encoder<T> enc) {
- return ((ExpressionEncoder<T>) enc).objDeserializer().dataType();
+ return toExpressionEncoder(enc).objDeserializer().dataType();
}
private static Expression rootRef(DataType dt, boolean nullable) {
@@ -548,9 +563,17 @@ public class EncoderHelpers {
return CoderHelpers.toByteArray(paneInfo, PaneInfoCoder.of());
}
- /** The end of the only window (max timestamp). */
- public static Instant maxTimestamp(Iterable<BoundedWindow> windows) {
- return Iterables.getOnlyElement(windows).maxTimestamp();
+ /** The maximum {@code maxTimestamp} across all associated windows. */
+ public static Instant maxTimestamp(Iterable<? extends BoundedWindow> windows) {
+ Instant maxTimestamp = null;
+ for (BoundedWindow window : windows) {
+ Instant timestamp = window.maxTimestamp();
+ if (maxTimestamp == null || timestamp.isAfter(maxTimestamp)) {
+ maxTimestamp = timestamp;
+ }
+ }
+ return Preconditions.checkNotNull(
+ maxTimestamp, "WindowedValue must have at least one window");
}
public static List<Object> copyToList(ArrayData arrayData, DataType type) {
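The reworked `maxTimestamp` no longer assumes a single window and instead scans for the latest timestamp. A minimal standalone sketch of that scan follows, under two assumptions that differ from the diff: it uses `java.time.Instant` rather than Joda-Time, and it throws `IllegalArgumentException` on empty input where the diff relies on `Preconditions.checkNotNull`. The class and method names are hypothetical.

```java
import java.time.Instant;
import java.util.Arrays;

final class MaxTimestampSketch {
  /** Returns the latest instant in {@code timestamps}; rejects empty input. */
  static Instant maxOf(Iterable<Instant> timestamps) {
    Instant max = null;
    for (Instant ts : timestamps) {
      // Keep the first element, then replace it whenever a strictly later one is seen.
      if (max == null || ts.isAfter(max)) {
        max = ts;
      }
    }
    if (max == null) {
      throw new IllegalArgumentException("at least one timestamp is required");
    }
    return max;
  }

  public static void main(String[] args) {
    System.out.println(
        maxOf(Arrays.asList(Instant.ofEpochMilli(5), Instant.ofEpochMilli(9), Instant.ofEpochMilli(7))));
  }
}
```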
diff --git a/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/utils/ScalaInterop.java b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/utils/ScalaInterop.java
new file mode 100644
index 00000000000..175e144d650
--- /dev/null
+++ b/runners/spark/4/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/utils/ScalaInterop.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.utils;
+
+import java.io.Serializable;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import scala.Function1;
+import scala.Function2;
+import scala.PartialFunction;
+import scala.Tuple2;
+import scala.collection.Iterator;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+import scala.collection.immutable.List;
+import scala.collection.immutable.Nil$;
+
+/** Utilities for easier interoperability with the Spark Scala API. */
+public class ScalaInterop {
+ private ScalaInterop() {}
+
+ public static <T> scala.collection.immutable.Seq<T> seqOf(T... t) {
+ return (scala.collection.immutable.Seq<T>)
+ JavaConverters.asScalaBuffer(java.util.Arrays.asList(t)).toList();
+ }
+
+ public static <T> List<T> concat(List<T> a, List<T> b) {
+ return b.$colon$colon$colon(a);
+ }
+
+ public static <T> Seq<T> listOf(T t) {
+ return emptyList().$colon$colon(t);
+ }
+
+ public static <T> List<T> emptyList() {
+ return (List<T>) Nil$.MODULE$;
+ }
+
+ /** Scala {@link Iterator} of Java {@link Iterable}. */
+ public static <T extends @NonNull Object> Iterator<T> scalaIterator(Iterable<T> iterable) {
+ return scalaIterator(iterable.iterator());
+ }
+
+ /** Scala {@link Iterator} of Java {@link java.util.Iterator}. */
+ public static <T extends @NonNull Object> Iterator<T> scalaIterator(java.util.Iterator<T> it) {
+ return JavaConverters.asScalaIterator(it);
+ }
+
+ /** Java {@link java.util.Iterator} of Scala {@link Iterator}. */
+ public static <T extends @NonNull Object> java.util.Iterator<T> javaIterator(Iterator<T> it) {
+ return JavaConverters.asJavaIterator(it);
+ }
+
+ public static <T1, T2> Tuple2<T1, T2> tuple(T1 t1, T2 t2) {
+ return new Tuple2<>(t1, t2);
+ }
+
+ public static <T extends @NonNull Object, V> PartialFunction<T, T> replace(
+ Class<V> clazz, T replace) {
+ return new PartialFunction<T, T>() {
+
+ @Override
+ public boolean isDefinedAt(T x) {
+ return clazz.isAssignableFrom(x.getClass());
+ }
+
+ @Override
+ public T apply(T x) {
+ return replace;
+ }
+ };
+ }
+
+ public static <T extends @NonNull Object, V> PartialFunction<T, V> match(Class<V> clazz) {
+ return new PartialFunction<T, V>() {
+
+ @Override
+ public boolean isDefinedAt(T x) {
+ return clazz.isAssignableFrom(x.getClass());
+ }
+
+ @Override
+ public V apply(T x) {
+ return (V) x;
+ }
+ };
+ }
+
+ public static <T, V> Fun1<T, V> fun1(Fun1<T, V> fun) {
+ return fun;
+ }
+
+ public static <T1, T2, V> Fun2<T1, T2, V> fun2(Fun2<T1, T2, V> fun) {
+ return fun;
+ }
+
+ public interface Fun1<T, V> extends Function1<T, V>, Serializable {}
+
+ public interface Fun2<T1, T2, V> extends Function2<T1, T2, V>, Serializable {}
+}
diff --git a/runners/spark/4/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpersTest.java b/runners/spark/4/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpersTest.java
new file mode 100644
index 00000000000..48c1e645f6e
--- /dev/null
+++ b/runners/spark/4/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpersTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
+
+import static java.util.Arrays.asList;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.collectionEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.encoderFor;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.kvEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.mapEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.oneOfEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.windowedValueEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.tuple;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates.notNull;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+import static org.apache.spark.sql.types.DataTypes.createStructField;
+import static org.apache.spark.sql.types.DataTypes.createStructType;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.function.Function;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.BigEndianShortCoder;
+import org.apache.beam.sdk.coders.BooleanCoder;
+import org.apache.beam.sdk.coders.ByteCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.DelegateCoder;
+import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.FloatCoder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.sdk.values.WindowedValues;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.joda.time.Instant;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import scala.Tuple2;
+
+/** Test of the wrapping of Beam Coders as Spark ExpressionEncoders. */
+@RunWith(JUnit4.class)
+public class EncoderHelpersTest {
+ @ClassRule public static SparkSessionRule sessionRule = new SparkSessionRule("local[1]");
+
+ private static final Encoder<GlobalWindow> windowEnc =
+ EncoderHelpers.encoderOf(GlobalWindow.class);
+
+ private static final Map<Coder<?>, List<?>> BASIC_CASES =
+ ImmutableMap.<Coder<?>, List<?>>builder()
+ .put(BooleanCoder.of(), asList(true, false, null))
+ .put(ByteCoder.of(), asList((byte) 1, null))
+ .put(BigEndianShortCoder.of(), asList((short) 1, null))
+ .put(BigEndianIntegerCoder.of(), asList(1, 2, 3, null))
+ .put(VarIntCoder.of(), asList(1, 2, 3, null))
+ .put(BigEndianLongCoder.of(), asList(1L, 2L, 3L, null))
+ .put(VarLongCoder.of(), asList(1L, 2L, 3L, null))
+ .put(FloatCoder.of(), asList((float) 1.0, (float) 2.0, null))
+ .put(DoubleCoder.of(), asList(1.0, 2.0, null))
+ .put(StringUtf8Coder.of(), asList("1", "2", null))
+ .put(BigDecimalCoder.of(), asList(bigDecimalOf(1L), bigDecimalOf(2L), null))
+ .put(InstantCoder.of(), asList(Instant.ofEpochMilli(1), null))
+ .build();
+
+ private <T> Dataset<T> createDataset(List<T> data, Encoder<T> encoder) {
+ Dataset<T> ds = sessionRule.getSession().createDataset(data, encoder);
+ ds.printSchema();
+ return ds;
+ }
+
+ @Test
+ public void testBeamEncoderMappings() {
+ BASIC_CASES.forEach(
+ (coder, data) -> {
+ Encoder<?> encoder = encoderFor(coder);
+ serializeAndDeserialize(data.get(0), (Encoder) encoder);
+ Dataset<?> dataset = createDataset(data, (Encoder) encoder);
+ assertThat(dataset.collect(), equalTo(data.toArray()));
+ });
+ }
+
+ @Test
+ public void testBeamEncoderOfPrivateType() {
+ // Verify concrete types are not used in coder generation.
+ // In case of private types this would cause an IllegalAccessError.
+ List<PrivateString> data = asList(new PrivateString("1"), new PrivateString("2"));
+ Dataset<PrivateString> dataset = createDataset(data, encoderFor(PrivateString.CODER));
+ assertThat(dataset.collect(), equalTo(data.toArray()));
+ }
+
+ @Test
+ public void testBeamWindowedValueEncoderMappings() {
+ BASIC_CASES.forEach(
+ (coder, data) -> {
+ List<WindowedValue<?>> windowed =
+ Lists.transform(data, WindowedValues::valueInGlobalWindow);
+
+ Encoder<?> encoder = windowedValueEncoder(encoderFor(coder), windowEnc);
+ serializeAndDeserialize(windowed.get(0), (Encoder) encoder);
+
+ Dataset<?> dataset = createDataset(windowed, (Encoder) encoder);
+ assertThat(dataset.collect(), equalTo(windowed.toArray()));
+ });
+ }
+
+ @Test
+ public void testCollectionEncoder() {
+ BASIC_CASES.forEach(
+ (coder, data) -> {
+ Encoder<? extends Collection<?>> encoder = collectionEncoder(encoderFor(coder), true);
+ Collection<?> collection = Collections.unmodifiableCollection(data);
+
+ Dataset<Collection<?>> dataset = createDataset(asList(collection), (Encoder) encoder);
+ assertThat(dataset.head(), equalTo(data));
+ });
+ }
+
+ private void testMapEncoder(Class<?> cls, Function<Map<?, ?>, Map<?, ?>> decorator) {
+ BASIC_CASES.forEach(
+ (coder, data) -> {
+ Encoder<?> enc = encoderFor(coder);
+ Encoder<Map<?, ?>> mapEncoder = mapEncoder(enc, enc, (Class) cls);
+ Map<?, ?> map =
+ decorator.apply(
+ data.stream().filter(notNull()).collect(toMap(identity(), identity())));
+
+ Dataset<Map<?, ?>> dataset = createDataset(asList(map), mapEncoder);
+ Map<?, ?> head = dataset.head();
+ assertThat(head, equalTo(map));
+ assertThat(head, instanceOf(cls));
+ });
+ }
+
+ @Test
+ public void testMapEncoder() {
+ testMapEncoder(Map.class, identity());
+ }
+
+ @Test
+ public void testHashMapEncoder() {
+ testMapEncoder(HashMap.class, identity());
+ }
+
+ @Test
+ public void testTreeMapEncoder() {
+ testMapEncoder(TreeMap.class, TreeMap::new);
+ }
+
+ @Test
+ public void testBeamBinaryEncoder() {
+ List<List<String>> data = asList(asList("a1", "a2", "a3"), asList("b1", "b2"), asList("c1"));
+
+ Encoder<List<String>> encoder = encoderFor(ListCoder.of(StringUtf8Coder.of()));
+ serializeAndDeserialize(data.get(0), encoder);
+
+ Dataset<List<String>> dataset = createDataset(data, encoder);
+ assertThat(dataset.collect(), equalTo(data.toArray()));
+ }
+
+ @Test
+ public void testEncoderForKVCoder() {
+ List<KV<Integer, String>> data =
+ asList(KV.of(1, "value1"), KV.of(null, "value2"), KV.of(3, null));
+
+ Encoder<KV<Integer, String>> encoder =
+ kvEncoder(encoderFor(VarIntCoder.of()), encoderFor(StringUtf8Coder.of()));
+ serializeAndDeserialize(data.get(0), encoder);
+
+ Dataset<KV<Integer, String>> dataset = createDataset(data, encoder);
+
+ StructType kvSchema =
+ createStructType(
+ new StructField[] {
+ createStructField("key", IntegerType, true),
+ createStructField("value", StringType, true)
+ });
+
+ assertThat(dataset.schema(), equalTo(kvSchema));
+ assertThat(dataset.collectAsList(), equalTo(data));
+ }
+
+ @Test
+ public void testOneOffEncoder() {
+ List<Coder<?>> coders = ImmutableList.copyOf(BASIC_CASES.keySet());
+ List<Encoder<?>> encoders = coders.stream().map(EncoderHelpers::encoderFor).collect(toList());
+
+ // build oneOf tuples of type index and corresponding value
+ List<Tuple2<Integer, ?>> data =
+ BASIC_CASES.entrySet().stream()
+ .map(e -> tuple(coders.indexOf(e.getKey()), (Object) e.getValue().get(0)))
+ .collect(toList());
+
+ // dataset is a sparse dataset with only one column set per row
+ Dataset<Tuple2<Integer, ?>> dataset = createDataset(data, oneOfEncoder((List) encoders));
+ assertThat(dataset.collectAsList(), equalTo(data));
+ }
+
+ // fix scale/precision to system default to compare using equals
+ private static BigDecimal bigDecimalOf(long l) {
+ DecimalType type = DecimalType.SYSTEM_DEFAULT();
+ return new BigDecimal(l, new MathContext(type.precision())).setScale(type.scale());
+ }
+
+ // test an explicit serialization roundtrip
+ @SuppressWarnings("unchecked")
+ private static <T> void serializeAndDeserialize(T data, Encoder<T> enc) {
+ ExpressionEncoder<T> bound;
+ if (enc instanceof ExpressionEncoder) {
+ bound = (ExpressionEncoder<T>) enc;
+ } else {
+ bound = ExpressionEncoder.apply((AgnosticEncoder<T>) enc);
+ }
+ bound =
+ bound.resolveAndBind(bound.resolveAndBind$default$1(), bound.resolveAndBind$default$2());
+
+ InternalRow row = bound.createSerializer().apply(data);
+ T deserialized = bound.createDeserializer().apply(row);
+
+ assertThat(deserialized, equalTo(data));
+ }
+
+ private static class PrivateString {
+ private static final Coder<PrivateString> CODER =
+ DelegateCoder.of(
+ StringUtf8Coder.of(),
+ str -> str.string,
+ PrivateString::new,
+ new TypeDescriptor<PrivateString>() {});
+
+ private final String string;
+
+ public PrivateString(String string) {
+ this.string = string;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof PrivateString)) {
+ return false;
+ }
+ PrivateString that = (PrivateString) o;
+ return Objects.equals(string, that.string);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(string);
+ }
+ }
+}
diff --git a/runners/spark/spark_runner.gradle b/runners/spark/spark_runner.gradle
index 9d7be01e882..cd108372fea 100644
--- a/runners/spark/spark_runner.gradle
+++ b/runners/spark/spark_runner.gradle
@@ -257,12 +257,23 @@ dependencies {
implementation "org.apache.spark:spark-common-utils_$spark_scala_version:$spark_version"
implementation "org.apache.spark:spark-sql-api_$spark_scala_version:$spark_version"
}
+ if (isSparkAtLeast("4.0.0")) {
+ // Spark 4 splits the Connect shims out of spark-sql; classes are referenced directly
+ // (e.g. via SparkSession builder paths) so strict dependency analysis requires it as
+ // a declared provided dep. The artifact does not exist for Spark 3.
+ provided "org.apache.spark:spark-connect-shims_$spark_scala_version:$spark_version"
+ }
permitUnusedDeclared "org.apache.spark:spark-network-common_$spark_scala_version:$spark_version"
implementation "io.dropwizard.metrics:metrics-core:4.1.1" // version used by Spark 3.1
- compileOnly "org.scala-lang:scala-library:2.12.15"
- runtimeOnly library.java.jackson_module_scala_2_12
- // Force paranamer 2.8 to avoid issues when using Scala 2.12
- runtimeOnly "com.thoughtworks.paranamer:paranamer:2.8"
+ if (spark_scala_version == '2.13') {
+ compileOnly "org.scala-lang:scala-library:2.13.15"
+ runtimeOnly library.java.jackson_module_scala_2_13
+ } else {
+ compileOnly "org.scala-lang:scala-library:2.12.15"
+ runtimeOnly library.java.jackson_module_scala_2_12
+ // Force paranamer 2.8 to avoid issues when using Scala 2.12
+ runtimeOnly "com.thoughtworks.paranamer:paranamer:2.8"
+ }
provided "org.apache.hadoop:hadoop-client-api:3.3.1"
provided library.java.commons_io
provided library.java.hamcrest
@@ -277,7 +288,9 @@ dependencies {
testImplementation project(path: ":sdks:java:extensions:avro", configuration: "testRuntimeMigration")
testImplementation project(":sdks:java:harness")
testImplementation library.java.avro
- testImplementation "org.apache.kafka:kafka_$spark_scala_version:2.4.1"
+ // kafka_2.13 artifacts were first published in 2.5.0; use a later version for Scala 2.13
+ def kafka_version = (spark_scala_version == '2.13') ? '2.8.0' : '2.4.1'
+ testImplementation "org.apache.kafka:kafka_$spark_scala_version:$kafka_version"
testImplementation library.java.kafka_clients
testImplementation library.java.junit
testImplementation library.java.mockito_core
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistrator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistrator.java
index 68c602ff7f5..75c33b7dc5b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistrator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistrator.java
@@ -28,9 +28,10 @@ import org.apache.beam.runners.spark.util.ByteArray;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashBasedTable;
import org.apache.spark.serializer.KryoRegistrator;
-import scala.collection.mutable.WrappedArray;
+import org.checkerframework.checker.nullness.qual.Nullable;
/**
* Custom {@link KryoRegistrator}s for Beam's Spark runner needs and registering used class in spark
@@ -61,7 +62,18 @@ public class SparkRunnerKryoRegistrator implements KryoRegistrator {
kryo.register(PaneInfo.class);
kryo.register(StateAndTimers.class);
kryo.register(TupleTag.class);
- kryo.register(WrappedArray.ofRef.class);
+ // Scala 2.12 uses WrappedArray$ofRef, Scala 2.13 renamed it to ArraySeq$ofRef
+ Class<?> scalaArrayClass =
+ findFirstAvailableClass(
+ "scala.collection.mutable.ArraySeq$ofRef",
+ "scala.collection.mutable.WrappedArray$ofRef");
+ if (scalaArrayClass == null) {
+ throw new IllegalStateException(
+ "Neither scala.collection.mutable.ArraySeq$ofRef (Scala 2.13) nor "
+ + "scala.collection.mutable.WrappedArray$ofRef (Scala 2.12) was found on the "
+ + "classpath. Cannot register Scala wrapped arrays with Kryo.");
+ }
+ kryo.register(scalaArrayClass);
try {
kryo.register(
@@ -74,4 +86,16 @@ public class SparkRunnerKryoRegistrator implements KryoRegistrator {
throw new IllegalStateException("Unable to register classes with kryo.", e);
}
}
+
+ @VisibleForTesting
+ static @Nullable Class<?> findFirstAvailableClass(String... classNames) {
+ for (String name : classNames) {
+ try {
+ return Class.forName(name);
+ } catch (ClassNotFoundException ignored) {
+ // try the next candidate
+ }
+ }
+ return null;
+ }
}
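The fallback lookup added above can be tried in isolation. This is a minimal sketch that mirrors the shape of `findFirstAvailableClass` from the diff but exercises it with JDK class names as stand-ins for the Scala ones; the wrapper class `ClassLookupSketch` and the non-existent class name are assumptions made for the example.

```java
final class ClassLookupSketch {
  /** Returns the first class that loads by name, or null if none of the candidates is present. */
  static Class<?> findFirstAvailableClass(String... classNames) {
    for (String name : classNames) {
      try {
        return Class.forName(name);
      } catch (ClassNotFoundException ignored) {
        // Candidate is not on the classpath; try the next one.
      }
    }
    return null;
  }

  public static void main(String[] args) {
    // "no.such.MissingClass" is a deliberately non-existent name used only for illustration.
    System.out.println(findFirstAvailableClass("no.such.MissingClass", "java.util.ArrayList"));
  }
}
```

Candidate order matters: the first name that resolves wins, so the preferred class name should be listed first, as the diff does with the Scala 2.13 name.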
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 1c7a4c2a241..be69ee78e51 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -522,7 +522,9 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>>
firedStream =
pairDStream.updateStateByKey(
- updateFunc,
+ // Raw cast to AbstractFunction1 suppresses Scala 2.12 (collection.Seq) vs
+ // Scala 2.13 (immutable.Seq) type difference; safe at runtime due to erasure.
+ (scala.runtime.AbstractFunction1) updateFunc,
pairDStream.defaultPartitioner(pairDStream.defaultPartitioner$default$1()),
true,
JavaSparkContext$.MODULE$.fakeClassTag());
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java
index 806d838d9bf..9d3419e1947 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java
@@ -113,6 +113,7 @@ public class SparkStructuredStreamingPipelineResult implements PipelineResult {
@Override
public PipelineResult.State cancel() throws IOException {
+ pipelineExecution.cancel(true);
offerNewState(PipelineResult.State.CANCELLED);
return state;
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory.java
index c0d46e77c1d..d7cdefc929b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/io/BoundedDatasetFactory.java
@@ -241,7 +241,7 @@ public class BoundedDatasetFactory {
try {
PipelineOptions options = params.options.get();
long desiredSize = source.getEstimatedSizeBytes(options) / params.numPartitions;
- List<BoundedSource<T>> split = (List<BoundedSource<T>>) source.split(desiredSize, options);
+ List<? extends BoundedSource<T>> split = source.split(desiredSize, options);
IntSupplier idxSupplier = new AtomicInteger(0)::getAndIncrement;
return split.stream().map(s -> new SourcePartition<>(s, idxSupplier)).collect(toList());
} catch (Exception e) {
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContext.java
index 55c4bbaedd3..b8448567eaf 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContext.java
@@ -86,7 +86,10 @@ public final class EvaluationContext {
ds.write().mode("overwrite").format("noop").save();
LOG.info("Evaluated dataset {} in {}", name, durationSince(startMs));
} catch (RuntimeException e) {
- LOG.error("Failed to evaluate dataset {}: {}", name, Throwables.getRootCause(e).getMessage());
+ LOG.error(
+ "Failed to evaluate dataset {}: {}",
+ name,
+ String.valueOf(Throwables.getRootCause(e).getMessage()));
throw new RuntimeException(e);
}
}
@@ -102,7 +105,10 @@ public final class EvaluationContext {
LOG.info("Collected dataset {} in {} [size: {}]", name, durationSince(startMs), res.length);
return res;
} catch (Exception e) {
- LOG.error("Failed to collect dataset {}: {}", name, Throwables.getRootCause(e).getMessage());
+ LOG.error(
+ "Failed to collect dataset {}: {}",
+ name,
+ String.valueOf(Throwables.getRootCause(e).getMessage()));
throw new RuntimeException(e);
}
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
index 02c56a8081c..513ef28a589 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
@@ -64,7 +64,7 @@ import scala.collection.TraversableOnce;
* TODOs:
* <li>combine with context (CombineFnWithContext)?
* <li>combine with sideInputs?
- * <li>other there other missing features?
+ * <li>are there other missing features?
*/
class CombinePerKeyTranslatorBatch<K, InT, AccT, OutT>
extends TransformTranslator<
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyHelpers.java
index b7c139068d1..2105fd05d49 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyHelpers.java
@@ -21,7 +21,6 @@ import static org.apache.beam.runners.spark.structuredstreaming.translation.util
import static org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING;
import static org.apache.beam.sdk.transforms.windowing.TimestampCombiner.END_OF_WINDOW;
-import java.util.Collection;
import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop;
import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.Fun1;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -80,8 +79,7 @@ class GroupByKeyHelpers {
return v -> {
T value = valueFn.apply(v);
K key = v.getValue().getKey();
- Collection<BoundedWindow> windows = (Collection<BoundedWindow>) v.getWindows();
- return ScalaInterop.scalaIterator(windows).map(w -> tuple(tuple(w, key), value));
+ return ScalaInterop.scalaIterator(v.getWindows()).map(w -> tuple(tuple(w, key), value));
};
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
index 46cda333482..c55781ff84a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
@@ -81,7 +81,7 @@ import scala.collection.Seq;
import scala.collection.immutable.List;
/**
- * Translator for {@link GroupByKey} using {@link Dataset#groupByKey} with the build-in aggregation
+ * Translator for {@link GroupByKey} using {@link Dataset#groupByKey} with the built-in aggregation
* function {@code collect_list} when applicable.
*
* <p>Note: Using {@code collect_list} isn't any worse than using {@link ReduceFnRunner}. In the
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index daf8451faac..29f01c84c02 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -48,7 +48,6 @@ import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
@@ -98,7 +97,7 @@ public class EncoderHelpers {
private static final DataType LIST_TYPE = new ObjectType(List.class);
// Collections / maps of these types can be (de)serialized without (de)serializing each member
- private static final Set<Class<?>> PRIMITIV_TYPES =
+ private static final Set<Class<?>> PRIMITIVE_TYPES =
ImmutableSet.of(
Boolean.class,
Byte.class,
@@ -154,7 +153,7 @@ public class EncoderHelpers {
public static <T> Encoder<T> encoderOf(Class<? super T> cls) {
Encoder<T> enc = getOrCreateDefaultEncoder(cls);
if (enc == null) {
- throw new IllegalArgumentException("No default coder available for class " + cls);
+ throw new IllegalArgumentException("No default encoder available for class " + cls);
}
return enc;
}
@@ -481,7 +480,7 @@ public class EncoderHelpers {
}
private static <T> boolean isPrimitiveEnc(Encoder<T> enc) {
- return PRIMITIV_TYPES.contains(enc.clsTag().runtimeClass());
+ return PRIMITIVE_TYPES.contains(enc.clsTag().runtimeClass());
}
private static <T> Expression serialize(Expression input, Encoder<T> enc) {
@@ -548,9 +547,17 @@ public class EncoderHelpers {
return CoderHelpers.toByteArray(paneInfo, PaneInfoCoder.of());
}
- /** The end of the only window (max timestamp). */
- public static Instant maxTimestamp(Iterable<BoundedWindow> windows) {
- return Iterables.getOnlyElement(windows).maxTimestamp();
+ /** The maximum {@code maxTimestamp} across all associated windows. */
+ public static Instant maxTimestamp(Iterable<? extends BoundedWindow> windows) {
+ Instant maxTimestamp = null;
+ for (BoundedWindow window : windows) {
+ Instant timestamp = window.maxTimestamp();
+ if (maxTimestamp == null || timestamp.isAfter(maxTimestamp)) {
+ maxTimestamp = timestamp;
+ }
+ }
+ return Preconditions.checkNotNull(
+ maxTimestamp, "WindowedValue must have at least one window");
}
public static List<Object> copyToList(ArrayData arrayData, DataType type) {
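Note on the maxTimestamp hunk above: the old version required exactly one window (Iterables.getOnlyElement), while the new one accepts any non-empty set of windows and returns the latest maxTimestamp. The sketch below shows the same loop in isolation. It is an illustration under assumptions: the Window interface is a hypothetical stand-in for BoundedWindow, java.time.Instant replaces the Instant type used by Beam, and Objects.requireNonNull replaces the vendored Preconditions.checkNotNull.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.Objects;

public class MaxTimestampSketch {

  // Hypothetical stand-in for BoundedWindow: only the method the loop needs.
  interface Window {
    Instant maxTimestamp();
  }

  // Returns the latest maxTimestamp across all windows; fails on empty input.
  static Instant maxTimestamp(Iterable<? extends Window> windows) {
    Instant max = null;
    for (Window window : windows) {
      Instant timestamp = window.maxTimestamp();
      if (max == null || timestamp.isAfter(max)) {
        max = timestamp;
      }
    }
    return Objects.requireNonNull(max, "at least one window is required");
  }

  public static void main(String[] args) {
    Window early = () -> Instant.ofEpochMilli(10);
    Window late = () -> Instant.ofEpochMilli(20);
    System.out.println(maxTimestamp(Arrays.asList(late, early)));
  }
}
```

The result does not depend on iteration order, and a single-window input behaves as the previous getOnlyElement version did.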
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java
index ddd0e74d1c9..56eb0dfea5f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java
@@ -17,7 +17,10 @@
*/
package org.apache.beam.runners.spark.coders;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import com.esotericsoftware.kryo.Kryo;
@@ -73,6 +76,52 @@ public class SparkRunnerKryoRegistratorTest {
}
}
+ /** Unit tests for the {@link SparkRunnerKryoRegistrator#findFirstAvailableClass} helper. */
+ public static class FindFirstAvailableClassTest {
+
+ @Test
+ public void returnsFirstWhenAvailable() {
+ Class<?> result =
+ SparkRunnerKryoRegistrator.findFirstAvailableClass(
+ "java.lang.String", "java.lang.Integer");
+ assertSame(String.class, result);
+ }
+
+ @Test
+ public void fallsBackWhenFirstMissing() {
+ Class<?> result =
+ SparkRunnerKryoRegistrator.findFirstAvailableClass("does.not.Exist", "java.lang.Integer");
+ assertSame(Integer.class, result);
+ }
+
+ @Test
+ public void returnsNullWhenNoneAvailable() {
+ Class<?> result =
+ SparkRunnerKryoRegistrator.findFirstAvailableClass("does.not.Exist1", "does.not.Exist2");
+ assertNull(result);
+ }
+
+ @Test
+ public void returnsNullForEmptyInput() {
+ assertNull(SparkRunnerKryoRegistrator.findFirstAvailableClass());
+ }
+
+ @Test
+ public void resolvesScalaWrappedArrayClassOnRealClasspath() {
+ // On any supported Scala version (2.12 ArraySeq$ofRef does not exist; 2.13 it does), at
+ // least one of the two wrapped-array class names must resolve. This is the production call
+ // the registrator makes.
+ Class<?> result =
+ SparkRunnerKryoRegistrator.findFirstAvailableClass(
+ "scala.collection.mutable.ArraySeq$ofRef",
+ "scala.collection.mutable.WrappedArray$ofRef");
+ assertEquals(
+ "expected one of the Scala wrapped-array classes to be on the classpath",
+ true,
+ result != null);
+ }
+ }
+
// Hide TestKryoRegistrator from the Enclosed JUnit runner
interface Others {
class TestKryoRegistrator extends SparkRunnerKryoRegistrator {
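Note on the FindFirstAvailableClassTest hunk above: the tests pin down a contract (first resolvable name wins, null when nothing resolves, null for empty input) without showing the helper itself. One way such a helper could look is sketched below. This is an assumption for illustration only, not the implementation in SparkRunnerKryoRegistrator; the class name is hypothetical.

```java
public class FirstAvailableClassSketch {

  // Returns the first class that can be loaded from the given names,
  // or null if none of them is on the classpath.
  static Class<?> findFirstAvailableClass(String... classNames) {
    ClassLoader loader = FirstAvailableClassSketch.class.getClassLoader();
    for (String name : classNames) {
      try {
        // initialize=false: only resolve the class, do not run static initializers.
        return Class.forName(name, false, loader);
      } catch (ClassNotFoundException | LinkageError e) {
        // Not available under this name; try the next candidate.
      }
    }
    return null;
  }

  public static void main(String[] args) {
    System.out.println(findFirstAvailableClass("does.not.Exist", "java.lang.Integer"));
  }
}
```

With a helper of this shape, one registrator source can name both the Scala 2.13 and the Scala 2.12 wrapped-array class and register whichever one the classpath provides.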
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContextTest.java
new file mode 100644
index 00000000000..d1669e45c18
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContextTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation;
+
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+import org.apache.spark.sql.Dataset;
+import org.junit.Test;
+
+/**
+ * Unit tests for the static error-path branches of {@link EvaluationContext}. The happy-path
+ * branches are covered end-to-end by the structured-streaming translation tests; these tests
+ * specifically exercise the {@code catch} blocks that wrap and rethrow underlying Spark failures.
+ */
+public class EvaluationContextTest {
+
+ @Test
+ public void evaluateWrapsAndRethrowsRuntimeException() {
+ @SuppressWarnings("unchecked")
+ Dataset<Object> ds = mock(Dataset.class);
+ RuntimeException underlying = new RuntimeException("boom");
+ doThrow(underlying).when(ds).write();
+
+ RuntimeException thrown =
+ assertThrows(RuntimeException.class, () -> EvaluationContext.evaluate("test-ds", ds));
+ assertSame(underlying, thrown.getCause());
+ }
+
+ @Test
+ public void evaluateHandlesNullExceptionMessage() {
+ // Reproduces the original NPE motivation for the String.valueOf wrap: a RuntimeException
+ // whose root cause carries a null message must not crash the error logger.
+ @SuppressWarnings("unchecked")
+ Dataset<Object> ds = mock(Dataset.class);
+ RuntimeException underlying = new RuntimeException((String) null);
+ doThrow(underlying).when(ds).write();
+
+ RuntimeException thrown =
+ assertThrows(RuntimeException.class, () -> EvaluationContext.evaluate("test-ds", ds));
+ assertSame(underlying, thrown.getCause());
+ }
+
+ @Test
+ public void collectWrapsAndRethrowsException() {
+ @SuppressWarnings("unchecked")
+ Dataset<Object> ds = mock(Dataset.class);
+ RuntimeException underlying = new RuntimeException("boom");
+ doThrow(underlying).when(ds).collect();
+
+ RuntimeException thrown =
+ assertThrows(RuntimeException.class, () -> EvaluationContext.collect("test-ds", ds));
+ assertSame(underlying, thrown.getCause());
+ }
+
+ @Test
+ public void collectHandlesNullExceptionMessage() {
+ @SuppressWarnings("unchecked")
+ Dataset<Object> ds = mock(Dataset.class);
+ RuntimeException underlying = new RuntimeException((String) null);
+ doThrow(underlying).when(ds).collect();
+
+ RuntimeException thrown =
+ assertThrows(RuntimeException.class, () -> EvaluationContext.collect("test-ds", ds));
+ assertSame(underlying, thrown.getCause());
+ }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 60ff1cf8ce1..fc5f40c23d1 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -150,6 +150,8 @@ include(":runners:prism:java")
include(":runners:spark:3")
include(":runners:spark:3:job-server")
include(":runners:spark:3:job-server:container")
+include(":runners:spark:4")
+include(":runners:spark:4:job-server")
include(":sdks:go")
include(":sdks:go:container")
include(":sdks:go:examples")