[ 
https://issues.apache.org/jira/browse/BEAM-14334?focusedWorklogId=769030&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-769030
 ]

ASF GitHub Bot logged work on BEAM-14334:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/May/22 12:10
            Start Date: 11/May/22 12:10
    Worklog Time Spent: 10m 
      Work Description: echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870222386


##########
runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java:
##########
@@ -22,91 +22,94 @@
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Registration;
-import org.apache.beam.runners.spark.SparkContextOptions;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.TestSparkPipelineOptions;
-import org.apache.beam.runners.spark.TestSparkRunner;
+import org.apache.beam.runners.spark.SparkContextRule;
 import org.apache.beam.runners.spark.io.MicrobatchSource;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.beam.sdk.values.KV;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
 
-/** Testing of beam registrar. */
+/**
+ * Testing of beam registrar. Note: There can only be one Spark context at a 
time. For that reason
+ * tests requiring a different context have to be forked using separate test 
classes.
+ */
 @SuppressWarnings({
   "rawtypes" // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
 })
+@RunWith(Enclosed.class)
 public class SparkRunnerKryoRegistratorTest {
 
-  @Test
-  public void testKryoRegistration() {
-    SparkConf conf = new SparkConf();
-    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-    conf.set("spark.kryo.registrator", WrapperKryoRegistrator.class.getName());
-    runSimplePipelineWithSparkContext(conf);
-    assertTrue(
-        "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is 
not set",
-        WrapperKryoRegistrator.wasInitiated);
-  }
+  public static class WithKryoSerializer {
 
-  @Test
-  public void testDefaultSerializerNotCallingKryo() {
-    SparkConf conf = new SparkConf();
-    conf.set("spark.kryo.registrator", 
KryoRegistratorIsNotCalled.class.getName());
-    runSimplePipelineWithSparkContext(conf);
-  }
+    @ClassRule
+    public static SparkContextRule contextRule =
+        new SparkContextRule(
+            KV.of("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer"),
+            KV.of("spark.kryo.registrator", 
WrapperKryoRegistrator.class.getName()));
 
-  private void runSimplePipelineWithSparkContext(SparkConf conf) {
-    SparkPipelineOptions options =
-        PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
-    options.setRunner(TestSparkRunner.class);
+    @Test
+    public void testKryoRegistration() {
+      runSimplePipelineWithSparkContextOptions(contextRule);
+      assertTrue(
+          "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is 
not set",
+          WrapperKryoRegistrator.wasInitiated);
+    }
 
-    conf.set("spark.master", "local");
-    conf.setAppName("test");
+    /**
+     * A {@link SparkRunnerKryoRegistrator} that registers an internal class 
to validate
+     * KryoSerialization resolution. Use only for test purposes. Needs to be 
public for
+     * serialization.
+     */
+    public static class WrapperKryoRegistrator extends 
SparkRunnerKryoRegistrator {
 
-    JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
-    options.setUsesProvidedSparkContext(true);
-    
options.as(SparkContextOptions.class).setProvidedSparkContext(javaSparkContext);
-    Pipeline p = Pipeline.create(options);
-    p.apply(Create.of("a")); // some operation to trigger pipeline construction
-    p.run().waitUntilFinish();
-    javaSparkContext.stop();
-  }
+      static boolean wasInitiated = false;
 
-  /**
-   * A {@link SparkRunnerKryoRegistrator} that fails if called. Use only for 
test purposes. Needs to
-   * be public for serialization.
-   */
-  public static class KryoRegistratorIsNotCalled extends 
SparkRunnerKryoRegistrator {
+      public WrapperKryoRegistrator() {
+        wasInitiated = true;
+      }
 
-    @Override
-    public void registerClasses(Kryo kryo) {
-      fail(
-          "Default spark.serializer is JavaSerializer"
-              + " so spark.kryo.registrator shouldn't be called");
+      @Override
+      public void registerClasses(Kryo kryo) {
+        super.registerClasses(kryo);
+        Registration registration = 
kryo.getRegistration(MicrobatchSource.class);
+        com.esotericsoftware.kryo.Serializer kryoSerializer = 
registration.getSerializer();
+        assertTrue(kryoSerializer instanceof StatelessJavaSerializer);
+      }
     }
   }
 
-  /**
-   * A {@link SparkRunnerKryoRegistrator} that registers an internal class to 
validate
-   * KryoSerialization resolution. Use only for test purposes. Needs to be 
public for serialization.
-   */
-  public static class WrapperKryoRegistrator extends 
SparkRunnerKryoRegistrator {
+  public static class WithoutKryoSerializer {
+    @ClassRule
+    public static SparkContextRule contextRule =
+        new SparkContextRule(
+            KV.of("spark.kryo.registrator", 
KryoRegistratorIsNotCalled.class.getName()));
 
-    static boolean wasInitiated = false;
-
-    public WrapperKryoRegistrator() {
-      wasInitiated = true;
+    @Test
+    public void testDefaultSerializerNotCallingKryo() {

Review Comment:
   Yes, I know it is not your code, but I took a look anyway as you changed the 
test structure. 
   I don't like the approach that the original developer took in several tests 
where the test assertion is the absence of an exception. It is not explicit enough 
IMHO. But changing the test would be out of the scope of this PR. Can you just 
add a comment in the code that "the fact that no exception is thrown means 
that the Kryo serializer is not called by default"?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 769030)
    Time Spent: 6.5h  (was: 6h 20m)

> Avoid using forkEvery in Spark runner tests
> -------------------------------------------
>
>                 Key: BEAM-14334
>                 URL: https://issues.apache.org/jira/browse/BEAM-14334
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark, testing
>            Reporter: Moritz Mack
>            Assignee: Moritz Mack
>            Priority: P2
>          Time Spent: 6.5h
>  Remaining Estimate: 0h
>
> Usage of *{color:#FF0000}forkEvery 1{color}* is typically a strong sign of 
> poor quality / bad code and should be avoided: 
>  * It significantly impacts performance when running tests.
>  * And it often hides resource leaks, either in code or worse in the runner 
> itself.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to