[
https://issues.apache.org/jira/browse/BEAM-14334?focusedWorklogId=769030&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-769030
]
ASF GitHub Bot logged work on BEAM-14334:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 11/May/22 12:10
Start Date: 11/May/22 12:10
Worklog Time Spent: 10m
Work Description: echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870222386
##########
runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java:
##########
@@ -22,91 +22,94 @@
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Registration;
-import org.apache.beam.runners.spark.SparkContextOptions;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.TestSparkPipelineOptions;
-import org.apache.beam.runners.spark.TestSparkRunner;
+import org.apache.beam.runners.spark.SparkContextRule;
import org.apache.beam.runners.spark.io.MicrobatchSource;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.beam.sdk.values.KV;
+import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
-/** Testing of beam registrar. */
+/**
+ * Testing of beam registrar. Note: There can only be one Spark context at a
time. For that reason
+ * tests requiring a different context have to be forked using separate test
classes.
+ */
@SuppressWarnings({
"rawtypes" // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
})
+@RunWith(Enclosed.class)
public class SparkRunnerKryoRegistratorTest {
- @Test
- public void testKryoRegistration() {
- SparkConf conf = new SparkConf();
- conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
- conf.set("spark.kryo.registrator", WrapperKryoRegistrator.class.getName());
- runSimplePipelineWithSparkContext(conf);
- assertTrue(
- "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is
not set",
- WrapperKryoRegistrator.wasInitiated);
- }
+ public static class WithKryoSerializer {
- @Test
- public void testDefaultSerializerNotCallingKryo() {
- SparkConf conf = new SparkConf();
- conf.set("spark.kryo.registrator",
KryoRegistratorIsNotCalled.class.getName());
- runSimplePipelineWithSparkContext(conf);
- }
+ @ClassRule
+ public static SparkContextRule contextRule =
+ new SparkContextRule(
+ KV.of("spark.serializer",
"org.apache.spark.serializer.KryoSerializer"),
+ KV.of("spark.kryo.registrator",
WrapperKryoRegistrator.class.getName()));
- private void runSimplePipelineWithSparkContext(SparkConf conf) {
- SparkPipelineOptions options =
- PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
- options.setRunner(TestSparkRunner.class);
+ @Test
+ public void testKryoRegistration() {
+ runSimplePipelineWithSparkContextOptions(contextRule);
+ assertTrue(
+ "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is
not set",
+ WrapperKryoRegistrator.wasInitiated);
+ }
- conf.set("spark.master", "local");
- conf.setAppName("test");
+ /**
+ * A {@link SparkRunnerKryoRegistrator} that registers an internal class
to validate
+ * KryoSerialization resolution. Use only for test purposes. Needs to be
public for
+ * serialization.
+ */
+ public static class WrapperKryoRegistrator extends
SparkRunnerKryoRegistrator {
- JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
- options.setUsesProvidedSparkContext(true);
-
options.as(SparkContextOptions.class).setProvidedSparkContext(javaSparkContext);
- Pipeline p = Pipeline.create(options);
- p.apply(Create.of("a")); // some operation to trigger pipeline construction
- p.run().waitUntilFinish();
- javaSparkContext.stop();
- }
+ static boolean wasInitiated = false;
- /**
- * A {@link SparkRunnerKryoRegistrator} that fails if called. Use only for
test purposes. Needs to
- * be public for serialization.
- */
- public static class KryoRegistratorIsNotCalled extends
SparkRunnerKryoRegistrator {
+ public WrapperKryoRegistrator() {
+ wasInitiated = true;
+ }
- @Override
- public void registerClasses(Kryo kryo) {
- fail(
- "Default spark.serializer is JavaSerializer"
- + " so spark.kryo.registrator shouldn't be called");
+ @Override
+ public void registerClasses(Kryo kryo) {
+ super.registerClasses(kryo);
+ Registration registration =
kryo.getRegistration(MicrobatchSource.class);
+ com.esotericsoftware.kryo.Serializer kryoSerializer =
registration.getSerializer();
+ assertTrue(kryoSerializer instanceof StatelessJavaSerializer);
+ }
}
}
- /**
- * A {@link SparkRunnerKryoRegistrator} that registers an internal class to
validate
- * KryoSerialization resolution. Use only for test purposes. Needs to be
public for serialization.
- */
- public static class WrapperKryoRegistrator extends
SparkRunnerKryoRegistrator {
+ public static class WithoutKryoSerializer {
+ @ClassRule
+ public static SparkContextRule contextRule =
+ new SparkContextRule(
+ KV.of("spark.kryo.registrator",
KryoRegistratorIsNotCalled.class.getName()));
- static boolean wasInitiated = false;
-
- public WrapperKryoRegistrator() {
- wasInitiated = true;
+ @Test
+ public void testDefaultSerializerNotCallingKryo() {
Review Comment:
Yes I know it is not your code but I took a look anyway as you changed the
test structure.
I don't like the approach that the original developer took in several tests
that the test assertion is the absence of exception. It is not explicit enough
IMHO. But changing the test would be out of the scope of this PR. Can you just
add a comment in the code that "the fact that an exception is not thrown means
that the Kryo serializer is not called by default"
Issue Time Tracking
-------------------
Worklog Id: (was: 769030)
Time Spent: 6.5h (was: 6h 20m)
> Avoid using forkEvery in Spark runner tests
> -------------------------------------------
>
> Key: BEAM-14334
> URL: https://issues.apache.org/jira/browse/BEAM-14334
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark, testing
> Reporter: Moritz Mack
> Assignee: Moritz Mack
> Priority: P2
> Time Spent: 6.5h
> Remaining Estimate: 0h
>
> Usage of *{color:#FF0000}forkEvery 1{color}* is typically a strong sign of
> poor quality / bad code and should be avoided:
> * It significantly impacts performance when running tests.
> * And it often hides resource leaks, either in code or worse in the runner
> itself.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)