mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870022120


##########
runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java:
##########
@@ -22,91 +22,94 @@
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Registration;
-import org.apache.beam.runners.spark.SparkContextOptions;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.TestSparkPipelineOptions;
-import org.apache.beam.runners.spark.TestSparkRunner;
+import org.apache.beam.runners.spark.SparkContextRule;
 import org.apache.beam.runners.spark.io.MicrobatchSource;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.beam.sdk.values.KV;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
 
-/** Testing of beam registrar. */
+/**
+ * Testing of beam registrar. Note: There can only be one Spark context at a 
time. For that reason
+ * tests requiring a different context have to be forked using separate test 
classes.
+ */
 @SuppressWarnings({
   "rawtypes" // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
 })
+@RunWith(Enclosed.class)
 public class SparkRunnerKryoRegistratorTest {
 
-  @Test
-  public void testKryoRegistration() {
-    SparkConf conf = new SparkConf();
-    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-    conf.set("spark.kryo.registrator", WrapperKryoRegistrator.class.getName());
-    runSimplePipelineWithSparkContext(conf);
-    assertTrue(
-        "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is 
not set",
-        WrapperKryoRegistrator.wasInitiated);
-  }
+  public static class WithKryoSerializer {
 
-  @Test
-  public void testDefaultSerializerNotCallingKryo() {
-    SparkConf conf = new SparkConf();
-    conf.set("spark.kryo.registrator", 
KryoRegistratorIsNotCalled.class.getName());
-    runSimplePipelineWithSparkContext(conf);
-  }
+    @ClassRule
+    public static SparkContextRule contextRule =
+        new SparkContextRule(
+            KV.of("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer"),
+            KV.of("spark.kryo.registrator", 
WrapperKryoRegistrator.class.getName()));
 
-  private void runSimplePipelineWithSparkContext(SparkConf conf) {
-    SparkPipelineOptions options =
-        PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
-    options.setRunner(TestSparkRunner.class);
+    @Test
+    public void testKryoRegistration() {
+      runSimplePipelineWithSparkContextOptions(contextRule);
+      assertTrue(
+          "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is 
not set",
+          WrapperKryoRegistrator.wasInitiated);
+    }
 
-    conf.set("spark.master", "local");
-    conf.setAppName("test");
+    /**
+     * A {@link SparkRunnerKryoRegistrator} that registers an internal class 
to validate
+     * KryoSerialization resolution. Use only for test purposes. Needs to be 
public for
+     * serialization.
+     */
+    public static class WrapperKryoRegistrator extends 
SparkRunnerKryoRegistrator {
 
-    JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
-    options.setUsesProvidedSparkContext(true);
-    
options.as(SparkContextOptions.class).setProvidedSparkContext(javaSparkContext);
-    Pipeline p = Pipeline.create(options);
-    p.apply(Create.of("a")); // some operation to trigger pipeline construction
-    p.run().waitUntilFinish();
-    javaSparkContext.stop();
-  }
+      static boolean wasInitiated = false;
 
-  /**
-   * A {@link SparkRunnerKryoRegistrator} that fails if called. Use only for 
test purposes. Needs to
-   * be public for serialization.
-   */
-  public static class KryoRegistratorIsNotCalled extends 
SparkRunnerKryoRegistrator {
+      public WrapperKryoRegistrator() {
+        wasInitiated = true;
+      }
 
-    @Override
-    public void registerClasses(Kryo kryo) {
-      fail(
-          "Default spark.serializer is JavaSerializer"
-              + " so spark.kryo.registrator shouldn't be called");
+      @Override
+      public void registerClasses(Kryo kryo) {
+        super.registerClasses(kryo);
+        Registration registration = 
kryo.getRegistration(MicrobatchSource.class);
+        com.esotericsoftware.kryo.Serializer kryoSerializer = 
registration.getSerializer();
+        assertTrue(kryoSerializer instanceof StatelessJavaSerializer);
+      }
     }
   }
 
-  /**
-   * A {@link SparkRunnerKryoRegistrator} that registers an internal class to 
validate
-   * KryoSerialization resolution. Use only for test purposes. Needs to be 
public for serialization.
-   */
-  public static class WrapperKryoRegistrator extends 
SparkRunnerKryoRegistrator {
+  public static class WithoutKryoSerializer {
+    @ClassRule
+    public static SparkContextRule contextRule =
+        new SparkContextRule(
+            KV.of("spark.kryo.registrator", 
KryoRegistratorIsNotCalled.class.getName()));
 
-    static boolean wasInitiated = false;
-
-    public WrapperKryoRegistrator() {
-      wasInitiated = true;
+    @Test
+    public void testDefaultSerializerNotCallingKryo() {

Review Comment:
   Not my code, but that seems to be the intended approach: the test would throw if it attempted to initialize the Kryo registrator.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to