[ 
https://issues.apache.org/jira/browse/BEAM-14334?focusedWorklogId=768895&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-768895
 ]

ASF GitHub Bot logged work on BEAM-14334:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/May/22 08:32
            Start Date: 11/May/22 08:32
    Worklog Time Spent: 10m 
      Work Description: mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870022120


##########
runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java:
##########
@@ -22,91 +22,94 @@
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Registration;
-import org.apache.beam.runners.spark.SparkContextOptions;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.TestSparkPipelineOptions;
-import org.apache.beam.runners.spark.TestSparkRunner;
+import org.apache.beam.runners.spark.SparkContextRule;
 import org.apache.beam.runners.spark.io.MicrobatchSource;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.beam.sdk.values.KV;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
 
-/** Testing of beam registrar. */
+/**
+ * Testing of beam registrar. Note: There can only be one Spark context at a time. For that reason
+ * tests requiring a different context have to be forked using separate test classes.
+ */
 @SuppressWarnings({
   "rawtypes" // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
 })
+@RunWith(Enclosed.class)
 public class SparkRunnerKryoRegistratorTest {
 
-  @Test
-  public void testKryoRegistration() {
-    SparkConf conf = new SparkConf();
-    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-    conf.set("spark.kryo.registrator", WrapperKryoRegistrator.class.getName());
-    runSimplePipelineWithSparkContext(conf);
-    assertTrue(
-        "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is not set",
-        WrapperKryoRegistrator.wasInitiated);
-  }
+  public static class WithKryoSerializer {
 
-  @Test
-  public void testDefaultSerializerNotCallingKryo() {
-    SparkConf conf = new SparkConf();
-    conf.set("spark.kryo.registrator", KryoRegistratorIsNotCalled.class.getName());
-    runSimplePipelineWithSparkContext(conf);
-  }
+    @ClassRule
+    public static SparkContextRule contextRule =
+        new SparkContextRule(
+            KV.of("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
+            KV.of("spark.kryo.registrator", WrapperKryoRegistrator.class.getName()));
 
-  private void runSimplePipelineWithSparkContext(SparkConf conf) {
-    SparkPipelineOptions options =
-        PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
-    options.setRunner(TestSparkRunner.class);
+    @Test
+    public void testKryoRegistration() {
+      runSimplePipelineWithSparkContextOptions(contextRule);
+      assertTrue(
+          "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is not set",
+          WrapperKryoRegistrator.wasInitiated);
+    }
 
-    conf.set("spark.master", "local");
-    conf.setAppName("test");
+    /**
+     * A {@link SparkRunnerKryoRegistrator} that registers an internal class to validate
+     * KryoSerialization resolution. Use only for test purposes. Needs to be public for
+     * serialization.
+     */
+    public static class WrapperKryoRegistrator extends SparkRunnerKryoRegistrator {
 
-    JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
-    options.setUsesProvidedSparkContext(true);
-    options.as(SparkContextOptions.class).setProvidedSparkContext(javaSparkContext);
-    Pipeline p = Pipeline.create(options);
-    p.apply(Create.of("a")); // some operation to trigger pipeline construction
-    p.run().waitUntilFinish();
-    javaSparkContext.stop();
-  }
+      static boolean wasInitiated = false;
 
-  /**
-   * A {@link SparkRunnerKryoRegistrator} that fails if called. Use only for test purposes. Needs to
-   * be public for serialization.
-   */
-  public static class KryoRegistratorIsNotCalled extends SparkRunnerKryoRegistrator {
+      public WrapperKryoRegistrator() {
+        wasInitiated = true;
+      }
 
-    @Override
-    public void registerClasses(Kryo kryo) {
-      fail(
-          "Default spark.serializer is JavaSerializer"
-              + " so spark.kryo.registrator shouldn't be called");
+      @Override
+      public void registerClasses(Kryo kryo) {
+        super.registerClasses(kryo);
+        Registration registration = kryo.getRegistration(MicrobatchSource.class);
+        com.esotericsoftware.kryo.Serializer kryoSerializer = registration.getSerializer();
+        assertTrue(kryoSerializer instanceof StatelessJavaSerializer);
+      }
     }
   }
 
-  /**
-   * A {@link SparkRunnerKryoRegistrator} that registers an internal class to validate
-   * KryoSerialization resolution. Use only for test purposes. Needs to be public for serialization.
-   */
-  public static class WrapperKryoRegistrator extends SparkRunnerKryoRegistrator {
+  public static class WithoutKryoSerializer {
+    @ClassRule
+    public static SparkContextRule contextRule =
+        new SparkContextRule(
+            KV.of("spark.kryo.registrator", KryoRegistratorIsNotCalled.class.getName()));
 
-    static boolean wasInitiated = false;
-
-    public WrapperKryoRegistrator() {
-      wasInitiated = true;
+    @Test
+    public void testDefaultSerializerNotCallingKryo() {

Review Comment:
   Not my code, but that seems to be the approach... it would throw if it attempted to initialize the Kryo registrator.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 768895)
    Time Spent: 3h 20m  (was: 3h 10m)

> Avoid using forkEvery in Spark runner tests
> -------------------------------------------
>
>                 Key: BEAM-14334
>                 URL: https://issues.apache.org/jira/browse/BEAM-14334
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark, testing
>            Reporter: Moritz Mack
>            Assignee: Moritz Mack
>            Priority: P2
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> Usage of *{color:#FF0000}forkEvery 1{color}* is typically a strong sign of 
> poor quality / bad code and should be avoided: 
>  * It significantly impacts performance when running tests.
>  * And it often hides resource leaks, either in code or worse in the runner 
> itself.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to