[
https://issues.apache.org/jira/browse/BEAM-14334?focusedWorklogId=768895&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-768895
]
ASF GitHub Bot logged work on BEAM-14334:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 11/May/22 08:32
Start Date: 11/May/22 08:32
Worklog Time Spent: 10m
Work Description: mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870022120
##########
runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java:
##########
@@ -22,91 +22,94 @@
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Registration;
-import org.apache.beam.runners.spark.SparkContextOptions;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.TestSparkPipelineOptions;
-import org.apache.beam.runners.spark.TestSparkRunner;
+import org.apache.beam.runners.spark.SparkContextRule;
import org.apache.beam.runners.spark.io.MicrobatchSource;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.beam.sdk.values.KV;
+import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
-/** Testing of beam registrar. */
+/**
+ * Testing of beam registrar. Note: There can only be one Spark context at a
time. For that reason
+ * tests requiring a different context have to be forked using separate test
classes.
+ */
@SuppressWarnings({
"rawtypes" // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
})
+@RunWith(Enclosed.class)
public class SparkRunnerKryoRegistratorTest {
- @Test
- public void testKryoRegistration() {
- SparkConf conf = new SparkConf();
- conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
- conf.set("spark.kryo.registrator", WrapperKryoRegistrator.class.getName());
- runSimplePipelineWithSparkContext(conf);
- assertTrue(
- "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is
not set",
- WrapperKryoRegistrator.wasInitiated);
- }
+ public static class WithKryoSerializer {
- @Test
- public void testDefaultSerializerNotCallingKryo() {
- SparkConf conf = new SparkConf();
- conf.set("spark.kryo.registrator",
KryoRegistratorIsNotCalled.class.getName());
- runSimplePipelineWithSparkContext(conf);
- }
+ @ClassRule
+ public static SparkContextRule contextRule =
+ new SparkContextRule(
+ KV.of("spark.serializer",
"org.apache.spark.serializer.KryoSerializer"),
+ KV.of("spark.kryo.registrator",
WrapperKryoRegistrator.class.getName()));
- private void runSimplePipelineWithSparkContext(SparkConf conf) {
- SparkPipelineOptions options =
- PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
- options.setRunner(TestSparkRunner.class);
+ @Test
+ public void testKryoRegistration() {
+ runSimplePipelineWithSparkContextOptions(contextRule);
+ assertTrue(
+ "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is
not set",
+ WrapperKryoRegistrator.wasInitiated);
+ }
- conf.set("spark.master", "local");
- conf.setAppName("test");
+ /**
+ * A {@link SparkRunnerKryoRegistrator} that registers an internal class
to validate
+ * KryoSerialization resolution. Use only for test purposes. Needs to be
public for
+ * serialization.
+ */
+ public static class WrapperKryoRegistrator extends
SparkRunnerKryoRegistrator {
- JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
- options.setUsesProvidedSparkContext(true);
-
options.as(SparkContextOptions.class).setProvidedSparkContext(javaSparkContext);
- Pipeline p = Pipeline.create(options);
- p.apply(Create.of("a")); // some operation to trigger pipeline construction
- p.run().waitUntilFinish();
- javaSparkContext.stop();
- }
+ static boolean wasInitiated = false;
- /**
- * A {@link SparkRunnerKryoRegistrator} that fails if called. Use only for
test purposes. Needs to
- * be public for serialization.
- */
- public static class KryoRegistratorIsNotCalled extends
SparkRunnerKryoRegistrator {
+ public WrapperKryoRegistrator() {
+ wasInitiated = true;
+ }
- @Override
- public void registerClasses(Kryo kryo) {
- fail(
- "Default spark.serializer is JavaSerializer"
- + " so spark.kryo.registrator shouldn't be called");
+ @Override
+ public void registerClasses(Kryo kryo) {
+ super.registerClasses(kryo);
+ Registration registration =
kryo.getRegistration(MicrobatchSource.class);
+ com.esotericsoftware.kryo.Serializer kryoSerializer =
registration.getSerializer();
+ assertTrue(kryoSerializer instanceof StatelessJavaSerializer);
+ }
}
}
- /**
- * A {@link SparkRunnerKryoRegistrator} that registers an internal class to
validate
- * KryoSerialization resolution. Use only for test purposes. Needs to be
public for serialization.
- */
- public static class WrapperKryoRegistrator extends
SparkRunnerKryoRegistrator {
+ public static class WithoutKryoSerializer {
+ @ClassRule
+ public static SparkContextRule contextRule =
+ new SparkContextRule(
+ KV.of("spark.kryo.registrator",
KryoRegistratorIsNotCalled.class.getName()));
- static boolean wasInitiated = false;
-
- public WrapperKryoRegistrator() {
- wasInitiated = true;
+ @Test
+ public void testDefaultSerializerNotCallingKryo() {
Review Comment:
Not my code, but that seems to be the approach: the registrator calls fail() in
registerClasses, so the test would throw if Spark attempted to use the Kryo registrator.
Issue Time Tracking
-------------------
Worklog Id: (was: 768895)
Time Spent: 3h 20m (was: 3h 10m)
> Avoid using forkEvery in Spark runner tests
> -------------------------------------------
>
> Key: BEAM-14334
> URL: https://issues.apache.org/jira/browse/BEAM-14334
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark, testing
> Reporter: Moritz Mack
> Assignee: Moritz Mack
> Priority: P2
> Time Spent: 3h 20m
> Remaining Estimate: 0h
>
> Usage of *{color:#FF0000}forkEvery 1{color}* is typically a strong sign of
> poor quality / bad code and should be avoided:
> * It significantly impacts performance when running tests.
> * It often hides resource leaks, either in the code or, worse, in the runner
> itself.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)