Repository: incubator-beam Updated Branches: refs/heads/master e841b1a21 -> bf8a3cb3a
[BEAM-921] spark-runner: register sources and coders to serialize with java serializer Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aba40e2d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aba40e2d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aba40e2d Branch: refs/heads/master Commit: aba40e2de9ba058f33086eb6a913fa583a82b058 Parents: e841b1a Author: Aviem Zur <aviem...@gmail.com> Authored: Thu Dec 8 15:07:06 2016 +0200 Committer: Sela <ans...@paypal.com> Committed: Sun Dec 11 15:18:51 2016 +0200 ---------------------------------------------------------------------- runners/spark/pom.xml | 35 +++++------- .../coders/BeamSparkRunnerRegistrator.java | 60 +++++++++++++++----- .../coders/BeamSparkRunnerRegistratorTest.java | 57 +++++++++++++++++++ 3 files changed, 118 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aba40e2d/runners/spark/pom.xml ---------------------------------------------------------------------- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index d1ef225..86e9039 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -54,7 +54,7 @@ </profile> <profile> - <!-- This profile adds execution of RunnableOnService integration tests + <!-- This profile adds execution of RunnableOnService integration tests against a local Spark endpoint. --> <id>local-runnable-on-service-tests</id> <activation><activeByDefault>false</activeByDefault></activation> @@ -134,28 +134,14 @@ <version>${hadoop.version}</version> <scope>provided</scope> </dependency> + <!-- Kryo bugfix version needed due to a state re-use issue in Kryo version 2.21 used in Spark 1.x + See: https://issues.apache.org/jira/browse/SPARK-7708 + See: https://github.com/EsotericSoftware/kryo/issues/312 + --> <dependency> <groupId>com.esotericsoftware.kryo</groupId> <artifactId>kryo</artifactId> - <version>2.21</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>de.javakaffee</groupId> - <artifactId>kryo-serializers</artifactId> - <version>0.39</version> - <exclusions> - <!-- Use Spark's Kryo --> - <exclusion> - <groupId>com.esotericsoftware</groupId> - <artifactId>kryo</artifactId> - </exclusion> - <!-- We only really need the serializer implementations --> - <exclusion> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - </exclusion> - </exclusions> + <version>2.21.1</version> </dependency> <dependency> <groupId>com.google.code.findbugs</groupId> @@ -264,6 +250,11 @@ <artifactId>metrics-core</artifactId> <version>${dropwizard.metrics.version}</version> </dependency> + <dependency> + <groupId>org.reflections</groupId> + <artifactId>reflections</artifactId> + <version>0.9.10</version> + </dependency> <!-- KafkaIO --> <dependency> @@ -405,6 +396,10 @@ <pattern>com.google.thirdparty</pattern> <shadedPattern>org.apache.beam.spark.relocated.com.google.thirdparty</shadedPattern> </relocation> + <relocation> + <pattern>com.esotericsoftware.kryo</pattern> + <shadedPattern>org.apache.beam.spark.relocated.com.esotericsoftware.kryo</shadedPattern> + </relocation> </relocations> <shadedArtifactAttached>true</shadedArtifactAttached> <shadedClassifierName>spark-app</shadedClassifierName> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aba40e2d/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java index 0e62781..41b0a01 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java @@ -19,28 +19,60 @@ package org.apache.beam.runners.spark.coders; import com.esotericsoftware.kryo.Kryo; -import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer; -import de.javakaffee.kryoserializers.guava.ImmutableListSerializer; -import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer; -import de.javakaffee.kryoserializers.guava.ImmutableMultimapSerializer; -import de.javakaffee.kryoserializers.guava.ImmutableSetSerializer; -import de.javakaffee.kryoserializers.guava.ReverseListSerializer; +import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import java.util.Arrays; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.Source; import org.apache.spark.serializer.KryoRegistrator; +import org.reflections.Reflections; /** - * Custom {@link com.esotericsoftware.kryo.Serializer}s for Beam's Spark runner needs. + * Custom {@link KryoRegistrator}s for Beam's Spark runner needs. */ public class BeamSparkRunnerRegistrator implements KryoRegistrator { @Override public void registerClasses(Kryo kryo) { - UnmodifiableCollectionsSerializer.registerSerializers(kryo); - // Guava - ImmutableListSerializer.registerSerializers(kryo); - ImmutableSetSerializer.registerSerializers(kryo); - ImmutableMapSerializer.registerSerializers(kryo); - ImmutableMultimapSerializer.registerSerializers(kryo); - ReverseListSerializer.registerSerializers(kryo); + for (Class<?> clazz : ClassesForJavaSerialization.getClasses()) { + kryo.register(clazz, new JavaSerializer()); + } + } + + /** + * Register coders and sources with {@link JavaSerializer} since they aren't guaranteed to be + * Kryo-serializable. + */ + private static class ClassesForJavaSerialization { + private static final Class<?>[] CLASSES_FOR_JAVA_SERIALIZATION = new Class<?>[]{ + Coder.class, Source.class + }; + + private static final Iterable<Class<?>> INSTANCE; + + /** + * Find all subclasses of ${@link CLASSES_FOR_JAVA_SERIALIZATION} + */ + static { + final Reflections reflections = new Reflections(); + INSTANCE = Iterables.concat(Lists.transform(Arrays.asList(CLASSES_FOR_JAVA_SERIALIZATION), + new Function<Class, Set<Class<?>>>() { + @SuppressWarnings({"unchecked", "ConstantConditions"}) + @Nullable + @Override + public Set<Class<?>> apply(@Nullable Class clazz) { + return reflections.getSubTypesOf(clazz); + } + })); + } + + static Iterable<Class<?>> getClasses() { + return INSTANCE; + } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aba40e2d/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java new file mode 100644 index 0000000..e353017 --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.coders; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.google.common.collect.Iterables; +import java.io.Serializable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.Source; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Test; +import org.reflections.Reflections; + + +/** + * BeamSparkRunnerRegistrator Test. + */ +public class BeamSparkRunnerRegistratorTest { + @Test + public void testCodersAndSourcesRegistration() { + BeamSparkRunnerRegistrator registrator = new BeamSparkRunnerRegistrator(); + + Reflections reflections = new Reflections(); + Iterable<Class<? extends Serializable>> classesForJavaSerialization = + Iterables.concat(reflections.getSubTypesOf(Coder.class), + reflections.getSubTypesOf(Source.class)); + + Kryo kryo = new Kryo(); + + registrator.registerClasses(kryo); + + for (Class<?> clazz : classesForJavaSerialization) { + Assert.assertThat("Registered serializer for class " + clazz.getName() + + " was not an instance of " + JavaSerializer.class.getName(), + kryo.getSerializer(clazz), + Matchers.instanceOf(JavaSerializer.class)); + } + } +}