jstorm-runner: Add Kryo serializer for UnmodifiableIterable
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9e808730 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9e808730 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9e808730 Branch: refs/heads/jstorm-runner Commit: 9e8087306b5562fdecf678979b9f2d49dfaf368f Parents: 90ed2ef Author: basti.lj <[email protected]> Authored: Wed Aug 16 19:01:48 2017 +0800 Committer: Pei He <[email protected]> Committed: Sat Aug 19 12:03:01 2017 +0800 ---------------------------------------------------------------------- .../BeamSdkRepackUtilsSerializer.java | 34 ++++++++++++++++++++ .../serialization/GuavaUtilsSerializer.java | 34 ++++++++++++++++++++ 2 files changed, 68 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9e808730/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamSdkRepackUtilsSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamSdkRepackUtilsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamSdkRepackUtilsSerializer.java index 4ae47eb..2912194 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamSdkRepackUtilsSerializer.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamSdkRepackUtilsSerializer.java @@ -24,12 +24,14 @@ import com.alibaba.jstorm.esotericsoftware.kryo.io.Input; import com.alibaba.jstorm.esotericsoftware.kryo.io.Output; import java.util.EnumMap; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.beam.sdk.repackaged.com.google.common.collect.HashBasedTable; import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableList; import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableMap; import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableSet; import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableTable; +import org.apache.beam.sdk.repackaged.com.google.common.collect.Iterables; import org.apache.beam.sdk.repackaged.com.google.common.collect.Lists; import org.apache.beam.sdk.repackaged.com.google.common.collect.Maps; import org.apache.beam.sdk.repackaged.com.google.common.collect.Sets; @@ -244,10 +246,42 @@ public class BeamSdkRepackUtilsSerializer { ImmutableSetSerializer.class); } + /** + * Specific serializer of {@link Kryo} for UnmodifiableIterable. + */ + public static class UnmodifiableIterableSerializer extends Serializer<Iterable<Object>> { + + @Override + public void write(Kryo kryo, Output output, Iterable<Object> object) { + int size = Iterables.size(object); + output.writeInt(size, true); + for (Object elm : object) { + kryo.writeClassAndObject(output, elm); + } + } + + @Override + public Iterable<Object> read(Kryo kryo, Input input, Class<Iterable<Object>> type) { + final int size = input.readInt(true); + List<Object> iterable = Lists.newArrayList(); + for (int i = 0; i < size; ++i) { + iterable.add(kryo.readClassAndObject(input)); + } + return Iterables.unmodifiableIterable(iterable); + } + } + + private static void registerUnmodifiableIterablesSerializers(Config config) { + config.registerSerialization( + Iterables.unmodifiableIterable(Lists.newArrayList()).getClass(), + UnmodifiableIterableSerializer.class); + } + public static void registerSerializers(Config config) { registerImmutableListSerializers(config); registerImmutableMapSerializers(config); registerImmutableSetSerializers(config); + registerUnmodifiableIterablesSerializers(config); } } http://git-wip-us.apache.org/repos/asf/beam/blob/9e808730/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/GuavaUtilsSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/GuavaUtilsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/GuavaUtilsSerializer.java index e6f750c..ee83aa6 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/GuavaUtilsSerializer.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/GuavaUtilsSerializer.java @@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableTable; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -34,6 +35,7 @@ import com.google.common.collect.Table; import java.util.EnumMap; import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -244,9 +246,41 @@ public class GuavaUtilsSerializer { ImmutableSetSerializer.class); } + /** + * Specific serializer of {@link Kryo} for UnmodifiableIterable. + */ + public static class UnmodifiableIterableSerializer extends Serializer<Iterable<Object>> { + + @Override + public void write(Kryo kryo, Output output, Iterable<Object> object) { + int size = Iterables.size(object); + output.writeInt(size, true); + for (Object elm : object) { + kryo.writeClassAndObject(output, elm); + } + } + + @Override + public Iterable<Object> read(Kryo kryo, Input input, Class<Iterable<Object>> type) { + final int size = input.readInt(true); + List<Object> iterable = Lists.newArrayList(); + for (int i = 0; i < size; ++i) { + iterable.add(kryo.readClassAndObject(input)); + } + return Iterables.unmodifiableIterable(iterable); + } + } + + private static void registerUnmodifiableIterablesSerializers(Config config) { + config.registerSerialization( + Iterables.unmodifiableIterable(Lists.newArrayList()).getClass(), + UnmodifiableIterableSerializer.class); + } + public static void registerSerializers(Config config) { registerImmutableListSerializers(config); registerImmutableMapSerializers(config); registerImmutableSetSerializers(config); + registerUnmodifiableIterablesSerializers(config); } }
