Repository: beam Updated Branches: refs/heads/master e8865843b -> e136f12c3
[BEAM-1144] Spark runner fails to deserialize MicrobatchSource in cluster mode Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1ffd6ff8 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1ffd6ff8 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1ffd6ff8 Branch: refs/heads/master Commit: 1ffd6ff8f25806ea4421662f0bef570fc8e1eeb6 Parents: e886584 Author: Aviem Zur <[email protected]> Authored: Wed Dec 14 15:19:39 2016 +0200 Committer: Jean-Baptiste Onofré <[email protected]> Committed: Mon Jan 2 11:49:15 2017 +0100 ---------------------------------------------------------------------- runners/spark/pom.xml | 11 +-- .../coders/BeamSparkRunnerRegistrator.java | 2 +- .../spark/coders/StatelessJavaSerializer.java | 97 ++++++++++++++++++++ .../coders/BeamSparkRunnerRegistratorTest.java | 6 +- 4 files changed, 103 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1ffd6ff8/runners/spark/pom.xml ---------------------------------------------------------------------- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 95b1d2e..dad5718 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -135,14 +135,11 @@ <version>${hadoop.version}</version> <scope>provided</scope> </dependency> - <!-- Kryo bugfix version needed due to a state re-use issue in Kryo version 2.21 used in Spark 1.x - See: https://issues.apache.org/jira/browse/SPARK-7708 - See: https://github.com/EsotericSoftware/kryo/issues/312 - --> <dependency> <groupId>com.esotericsoftware.kryo</groupId> <artifactId>kryo</artifactId> - <version>2.21.1</version> + <version>2.21</version> + <scope>provided</scope> </dependency> <dependency> <groupId>com.google.code.findbugs</groupId> @@ -387,10 +384,6 @@ <pattern>com.google.thirdparty</pattern> <shadedPattern>org.apache.beam.spark.relocated.com.google.thirdparty</shadedPattern> 
</relocation> - <relocation> - <pattern>com.esotericsoftware.kryo</pattern> - <shadedPattern>org.apache.beam.spark.relocated.com.esotericsoftware.kryo</shadedPattern> - </relocation> </relocations> <shadedArtifactAttached>true</shadedArtifactAttached> <shadedClassifierName>spark-app</shadedClassifierName> http://git-wip-us.apache.org/repos/asf/beam/blob/1ffd6ff8/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java index 41b0a01..93217b7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java @@ -40,7 +40,7 @@ public class BeamSparkRunnerRegistrator implements KryoRegistrator { @Override public void registerClasses(Kryo kryo) { for (Class<?> clazz : ClassesForJavaSerialization.getClasses()) { - kryo.register(clazz, new JavaSerializer()); + kryo.register(clazz, new StatelessJavaSerializer()); /* every class listed by ClassesForJavaSerialization is handled by Java serialization through StatelessJavaSerializer instead of Kryo's JavaSerializer */ } } http://git-wip-us.apache.org/repos/asf/beam/blob/1ffd6ff8/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java new file mode 100644 index 0000000..b29cf0c --- /dev/null +++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.coders; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.KryoException; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.ObjectStreamClass; + + +/** + * Stateless Java Serializer: a Kryo {@link Serializer} that delegates to Java serialization, creating a new {@link ObjectOutputStream} on every write and a new {@link ObjectInputStream} on every read, so no stream state is reused between calls. + * + * <p> + * Solves the state re-use issue in Kryo version 2.21 (used in Spark 1.x). + * See: + * https://issues.apache.org/jira/browse/SPARK-7708 + * https://github.com/EsotericSoftware/kryo/issues/312 + * </p> + * + * <p> + * Also solves a class-loading issue in cluster mode caused by {@link ObjectInputStream}, + * by using {@link ObjectInputStreamWithClassLoader}, which resolves classes through {@link Kryo#getClassLoader()}. + * By default, {@link ObjectInputStream} resolves classes with the latest user-defined class loader on the call stack, which can be the + * wrong class loader. + * This is a known Java issue and a similar solution is often used.
+ * See: + * https://github.com/apache/spark/blob/v1.6.3/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala#L154 + * https://issues.apache.org/jira/browse/GROOVY-1627 + * https://github.com/spring-projects/spring-loaded/issues/107 + * </p> + */ +class StatelessJavaSerializer extends Serializer { + @SuppressWarnings("unchecked") /* Writes object with a fresh ObjectOutputStream wrapping the Kryo Output; the stream is flushed but not closed, because closing it would also close output. Any failure is rethrown as KryoException. NOTE(review): no unchecked operation is visible in this method; @Override may be the intended annotation - TODO confirm. */ + public void write(Kryo kryo, Output output, Object object) { + try { + ObjectOutputStream objectStream = new ObjectOutputStream(output); + objectStream.writeObject(object); + objectStream.flush(); + } catch (Exception e) { + throw new KryoException("Error during Java serialization.", e); + } + } + + @SuppressWarnings("unchecked") /* Reads one object with a fresh ObjectInputStreamWithClassLoader bound to kryo.getClassLoader(); the requested type is not consulted. Any failure, including an unresolvable class, is rethrown as KryoException. */ + public Object read (Kryo kryo, Input input, Class type) { + try { + return new ObjectInputStreamWithClassLoader(input, kryo.getClassLoader()).readObject(); + } catch (Exception e) { + throw new KryoException("Error during Java deserialization.", e); + } + } + + /** + * {@link ObjectInputStream} that resolves classes through a caller-supplied {@link ClassLoader} via {@link Class#forName(String, boolean, ClassLoader)}, without initializing them. NOTE(review): unlike the default {@link ObjectInputStream#resolveClass}, this override has no fallback for primitive type names (e.g. the descriptor of {@code int.class}), and it wraps a missing class in a {@link RuntimeException} rather than throwing {@link ClassNotFoundException}; consider falling back to {@code super.resolveClass(desc)} - TODO confirm. 
+ */ + private static class ObjectInputStreamWithClassLoader extends ObjectInputStream { + private final ClassLoader classLoader; + + ObjectInputStreamWithClassLoader(InputStream in, ClassLoader classLoader) throws IOException { + super(in); + this.classLoader = classLoader; + } + + @Override + protected Class<?> resolveClass(ObjectStreamClass desc) { + try { + return Class.forName(desc.getName(), false, classLoader); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Could not find class: " + desc.getName(), e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1ffd6ff8/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java
index e353017..0468cd0 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java @@ -19,7 +19,7 @@ package org.apache.beam.runners.spark.coders; import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.serializers.JavaSerializer; + import com.google.common.collect.Iterables; import java.io.Serializable; import org.apache.beam.sdk.coders.Coder; @@ -49,9 +49,9 @@ public class BeamSparkRunnerRegistratorTest { for (Class<?> clazz : classesForJavaSerialization) { Assert.assertThat("Registered serializer for class " + clazz.getName() - + " was not an instance of " + JavaSerializer.class.getName(), + + " was not an instance of " + StatelessJavaSerializer.class.getName(), kryo.getSerializer(clazz), - Matchers.instanceOf(JavaSerializer.class)); + Matchers.instanceOf(StatelessJavaSerializer.class)); } } }
