Abacn commented on code in PR #38255:
URL: https://github.com/apache/beam/pull/38255#discussion_r3203058196


##########
runners/spark/4/build.gradle:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '..'
+/* All properties required for loading the Spark build script */
+project.ext {
+  spark_major = '4'
+  // Spark 4 version as defined in BeamModulePlugin; requires Scala 2.13 and Java 17
+  spark_version = spark4_version
+  spark_scala_version = '2.13'
+  archives_base_name = 'beam-runners-spark-4'
+}
+
+// Load the main build script which contains all build logic.
+// spark_runner.gradle handles the per-version source-overrides Copy:
+// shared base (runners/spark/src/) + previous majors + this module's ./src/ are
+// merged into build/source-overrides/src using DuplicatesStrategy.INCLUDE so the
+// 11 files under runners/spark/4/src/.../structuredstreaming/ override the
+// shared-base versions.
+apply from: "$basePath/spark_runner.gradle"
+
+// Spark 4 always requires Java 17, so unconditionally add the --add-opens flags
+// required by Kryo and other libraries that use reflection on JDK internals.
+test {
+  jvmArgs "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
+          "--add-opens=java.base/java.nio=ALL-UNNAMED",
+          "--add-opens=java.base/java.util=ALL-UNNAMED",
+          "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED"
+}
+
+// Exclude DStream-based streaming tests from the shared-base copy: the Spark 4 module
+// supports only structured streaming (batch) and does not include legacy DStream support.
+// Streaming test utilities also depend on kafka.server.KafkaServerStartable which was
+// removed in Kafka 2.8.0 (the first Kafka version with a _2.13 artifact).
+tasks.named("copyTestSourceOverrides") {
+  exclude "**/translation/streaming/**"
+}
+
+// Additional supported Spark 4.x versions for compatibility tests.
+// Can be expanded as new patch releases are published.
+def sparkVersions = [
+    // "402": "4.0.2",  // primary version; already tested via the default build
+]
+
+sparkVersions.each { kv ->
+  configurations.create("sparkVersion$kv.key")
+  configurations."sparkVersion$kv.key" {
+    resolutionStrategy {
+      spark.components.each { component -> force "$component:$kv.value" }
+    }
+  }
+
+  dependencies {
+    spark.components.each { component -> "sparkVersion$kv.key" "$component:$kv.value" }
+  }
+
+  tasks.register("sparkVersion${kv.key}Test", Test) {
+    group = "Verification"
+    description = "Verifies code compatibility with Spark $kv.value"
+    classpath = configurations."sparkVersion$kv.key" + sourceSets.test.runtimeClasspath
+    systemProperties test.systemProperties
+
+    include "**/*.class"
+    maxParallelForks 4
+  }
+}
+
+tasks.register("sparkVersionsTest") {
+  group = "Verification"
+  dependsOn sparkVersions.collect{k,v -> "sparkVersion${k}Test"}
+}

Review Comment:
   Check failure on my side:
   
   ```
   * What went wrong:
   Execution failed for task ':runners:spark:4:analyzeClassesDependencies'.
   > Dependency analysis found issues.
     usedUndeclaredArtifacts
      - org.apache.spark:spark-connect-shims_2.13:4.0.2@jar
   ```
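
   One possible fix is to declare the flagged artifact explicitly. A hedged sketch only: whether it belongs in `compileOnly`, `provided`, or `implementation` depends on how the other Spark dependencies are declared in spark_runner.gradle, so treat the configuration name as an assumption to check against BeamModulePlugin conventions:

   ```groovy
   dependencies {
     // Declare the used-but-undeclared artifact the analyzer flagged.
     // spark_scala_version and spark_version come from project.ext above,
     // resolving to spark-connect-shims_2.13:4.0.2.
     compileOnly "org.apache.spark:spark-connect-shims_${spark_scala_version}:${spark_version}"
   }
   ```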



##########
runners/spark/4/build.gradle:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '..'
+/* All properties required for loading the Spark build script */
+project.ext {
+  spark_major = '4'
+  // Spark 4 version as defined in BeamModulePlugin; requires Scala 2.13 and Java 17
+  spark_version = spark4_version
+  spark_scala_version = '2.13'
+  archives_base_name = 'beam-runners-spark-4'
+}
+
+// Load the main build script which contains all build logic.
+// spark_runner.gradle handles the per-version source-overrides Copy:
+// shared base (runners/spark/src/) + previous majors + this module's ./src/ are
+// merged into build/source-overrides/src using DuplicatesStrategy.INCLUDE so the
+// 11 files under runners/spark/4/src/.../structuredstreaming/ override the
+// shared-base versions.
+apply from: "$basePath/spark_runner.gradle"
+
+// Spark 4 always requires Java 17, so unconditionally add the --add-opens flags
+// required by Kryo and other libraries that use reflection on JDK internals.
+test {
+  jvmArgs "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
+          "--add-opens=java.base/java.nio=ALL-UNNAMED",
+          "--add-opens=java.base/java.util=ALL-UNNAMED",
+          "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED"
+}
+
+// Exclude DStream-based streaming tests from the shared-base copy: the Spark 4 module
+// supports only structured streaming (batch) and does not include legacy DStream support.
+// Streaming test utilities also depend on kafka.server.KafkaServerStartable which was
+// removed in Kafka 2.8.0 (the first Kafka version with a _2.13 artifact).
+tasks.named("copyTestSourceOverrides") {
+  exclude "**/translation/streaming/**"
+}
+
+// Additional supported Spark 4.x versions for compatibility tests.
+// Can be expanded as new patch releases are published.
+def sparkVersions = [
+    // "402": "4.0.2",  // primary version; already tested via the default build

Review Comment:
   Since this is empty, we can remove the SparkVersion tests from build.gradle, and beam_PreCommit_Java_Spark4_Versions is not needed for now. This makes the PR simpler as well.



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderFactory.java:
##########
@@ -91,7 +91,7 @@ static Expression invoke(
     try {
       // To address breaking interfaces between various version of Spark 3, expressions are
       // created reflectively. This is fine as it's just needed once to create the query plan.
-      switch (STATIC_INVOKE_CONSTRUCTOR.getParameterCount()) {
+      switch (INVOKE_CONSTRUCTOR.getParameterCount()) {

Review Comment:
   I think this reverts the workaround described in the comment above — or is it addressing an existing bug? If this change is needed for Spark 4, consider making the patch Spark 4 specific (put the modified source into spark/4/src).
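
   For context, the pattern this diff touches can be sketched as below. This is an illustrative stand-in, not the actual EncoderFactory code: in Beam, `INVOKE_CONSTRUCTOR` resolves a Spark `Invoke` expression constructor, while here a plain `Constructor` stands in, and the helper names are hypothetical.

   ```java
   import java.lang.reflect.Constructor;

   // Sketch of reflective construction that dispatches on constructor arity,
   // so one binary works across library versions whose constructors differ
   // by trailing parameters.
   public class ReflectiveInvoke {
     // Look up a public constructor, converting the checked exception.
     static Constructor<?> constructorOf(Class<?> type, Class<?>... params) {
       try {
         return type.getConstructor(params);
       } catch (NoSuchMethodException e) {
         throw new RuntimeException(e);
       }
     }

     static Object newInstance(Constructor<?> ctor, Object arg, Object extraDefault) {
       try {
         // Dispatch on arity: a newer release may declare an extra trailing
         // parameter, so pass a safe default for it when present.
         switch (ctor.getParameterCount()) {
           case 1:
             return ctor.newInstance(arg);
           case 2:
             return ctor.newInstance(arg, extraDefault);
           default:
             throw new IllegalStateException(
                 "Unsupported constructor arity: " + ctor.getParameterCount());
         }
       } catch (ReflectiveOperationException e) {
         throw new RuntimeException(e);
       }
     }
   }
   ```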



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java:
##########
@@ -113,6 +113,7 @@ public MetricResults metrics() {
 
   @Override
   public PipelineResult.State cancel() throws IOException {
+    pipelineExecution.cancel(true);

Review Comment:
   good catch
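
   Agreed — without this line, cancel() reports a state change but the underlying execution keeps running. A minimal self-contained sketch of the behavior, assuming `pipelineExecution` is a `java.util.concurrent.Future` as the `cancel(true)` signature suggests (names here are illustrative, not the actual Beam classes):

   ```java
   import java.util.concurrent.Future;

   public class CancelSketch {
     // true -> interrupt the running task, matching the diff's cancel(true);
     // false would only prevent a not-yet-started task from running.
     public static boolean cancel(Future<?> pipelineExecution) {
       return pipelineExecution.cancel(true);
     }
   }
   ```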



##########
CHANGES.md:
##########
@@ -60,7 +60,7 @@
 ## Highlights
 
 * New highly anticipated feature X added to Python SDK 
([#X](https://github.com/apache/beam/issues/X)).
-* New highly anticipated feature Y added to Java SDK 
([#Y](https://github.com/apache/beam/issues/Y)).
+* Experimental Spark 4 runner added (Java), built against Spark 4.0.2 / Scala 
2.13 and requiring Java 17. Currently supports batch only; streaming is not yet 
supported ([#36841](https://github.com/apache/beam/issues/36841)).

Review Comment:
   We can add to CHANGES.md latter when ValidatesRunner tests all setup and 
confirmed working.



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistrator.java:
##########
@@ -61,7 +66,20 @@ public void registerClasses(Kryo kryo) {
     kryo.register(PaneInfo.class);
     kryo.register(StateAndTimers.class);
     kryo.register(TupleTag.class);
-    kryo.register(WrappedArray.ofRef.class);
+    // Scala 2.12 uses WrappedArray$ofRef, Scala 2.13 renamed it to ArraySeq$ofRef
+    Class<?> scalaArrayClass =
+        findFirstAvailableClass(
+            "scala.collection.mutable.ArraySeq$ofRef",
+            "scala.collection.mutable.WrappedArray$ofRef");
+    if (scalaArrayClass != null) {
+      kryo.register(scalaArrayClass);
+    } else {
+      LOG.warn(
+          "Neither scala.collection.mutable.ArraySeq$ofRef (Scala 2.13) nor "

Review Comment:
   Should we throw here instead, in case the warning is silently ignored?
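
   A sketch of the throwing variant: `findFirstAvailableClass` and the class names mirror the diff, while the exception type and the `requireScalaArrayClass` helper are assumptions for illustration.

   ```java
   public class ScalaArrayRegistration {
     // Return the first class that resolves on the current classpath, or null.
     static Class<?> findFirstAvailableClass(String... names) {
       for (String name : names) {
         try {
           return Class.forName(name);
         } catch (ClassNotFoundException ignored) {
           // fall through to the next candidate
         }
       }
       return null;
     }

     // Fail fast when neither Scala wrapper class is present, instead of
     // logging a warning that surfaces later as a Kryo serialization failure.
     static Class<?> requireScalaArrayClass() {
       Class<?> clazz =
           findFirstAvailableClass(
               "scala.collection.mutable.ArraySeq$ofRef",
               "scala.collection.mutable.WrappedArray$ofRef");
       if (clazz == null) {
         throw new IllegalStateException(
             "Neither scala.collection.mutable.ArraySeq$ofRef (Scala 2.13) nor "
                 + "scala.collection.mutable.WrappedArray$ofRef (Scala 2.12) is on the classpath");
       }
       return clazz;
     }
   }
   ```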



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to