jstorm-runner: add SdkRepackImmutableMapSerializer.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/588a6981 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/588a6981 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/588a6981 Branch: refs/heads/jstorm-runner Commit: 588a6981855b68b9733a1b0f368dce0ad5cfe837 Parents: ad04648 Author: Pei He <[email protected]> Authored: Wed Jul 19 11:13:04 2017 +0800 Committer: Pei He <[email protected]> Committed: Sat Aug 19 12:02:59 2017 +0800 ---------------------------------------------------------------------- .../beam/runners/jstorm/JStormRunner.java | 2 + .../SdkRepackImmutableMapSerializer.java | 73 ++++++++++++++++++++ 2 files changed, 75 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/588a6981/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java index baf4e5a..286a975 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java @@ -37,6 +37,7 @@ import org.apache.beam.runners.jstorm.serialization.ImmutableSetSerializer; import org.apache.beam.runners.jstorm.serialization.KvStoreIterableSerializer; import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuListSerializer; import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuSetSerializer; +import org.apache.beam.runners.jstorm.serialization.SdkRepackImmutableMapSerializer; import org.apache.beam.runners.jstorm.serialization.UnmodifiableCollectionsSerializer; import org.apache.beam.runners.jstorm.translation.AbstractComponent; import org.apache.beam.runners.jstorm.translation.CommonInstance; @@ -103,6 +104,7 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> { ImmutableSetSerializer.registerSerializers(config); SdkRepackImmuSetSerializer.registerSerializers(config); ImmutableMapSerializer.registerSerializers(config); + SdkRepackImmutableMapSerializer.registerSerializers(config); config.registerDefaultSerailizer(KvStoreIterable.class, KvStoreIterableSerializer.class); return config; http://git-wip-us.apache.org/repos/asf/beam/blob/588a6981/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmutableMapSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmutableMapSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmutableMapSerializer.java new file mode 100644 index 0000000..546538a --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmutableMapSerializer.java @@ -0,0 +1,73 @@ +package org.apache.beam.runners.jstorm.serialization; + +import backtype.storm.Config; +import com.alibaba.jstorm.esotericsoftware.kryo.Kryo; +import com.alibaba.jstorm.esotericsoftware.kryo.Serializer; +import com.alibaba.jstorm.esotericsoftware.kryo.io.Input; +import com.alibaba.jstorm.esotericsoftware.kryo.io.Output; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableMap; +import org.apache.beam.sdk.repackaged.com.google.common.collect.Maps; + +/** + * Specific serializer of {@link Kryo} for ImmutableMap. + */ +public class SdkRepackImmutableMapSerializer + extends Serializer<ImmutableMap<Object, ? extends Object>> { + + private static final boolean DOES_NOT_ACCEPT_NULL = true; + private static final boolean IMMUTABLE = true; + + public SdkRepackImmutableMapSerializer() { + super(DOES_NOT_ACCEPT_NULL, IMMUTABLE); + } + + @Override + public void write(Kryo kryo, Output output, ImmutableMap<Object, ? extends Object> immutableMap) { + kryo.writeObject(output, Maps.newHashMap(immutableMap)); + } + + @Override + public ImmutableMap<Object, Object> read( + Kryo kryo, + Input input, + Class<ImmutableMap<Object, ? extends Object>> type) { + Map map = kryo.readObject(input, HashMap.class); + return ImmutableMap.copyOf(map); + } + + /** + * Creates a new {@link SdkRepackImmutableMapSerializer} and registers its serializer + * for the several ImmutableMap related classes. + */ + public static void registerSerializers(Config config) { + + config.registerSerialization(ImmutableMap.class, SdkRepackImmutableMapSerializer.class); + config.registerSerialization( + ImmutableMap.of().getClass(), SdkRepackImmutableMapSerializer.class); + + Object o1 = new Object(); + Object o2 = new Object(); + + config.registerSerialization( + ImmutableMap.of(o1, o1).getClass(), SdkRepackImmutableMapSerializer.class); + config.registerSerialization( + ImmutableMap.of(o1, o1, o2, o2).getClass(), + SdkRepackImmutableMapSerializer.class); + Map<DummyEnum, Object> enumMap = new EnumMap<DummyEnum, Object>(DummyEnum.class); + for (DummyEnum e : DummyEnum.values()) { + enumMap.put(e, o1); + } + + config.registerSerialization( + ImmutableMap.copyOf(enumMap).getClass(), + SdkRepackImmutableMapSerializer.class); + } + + private enum DummyEnum { + VALUE1, + VALUE2 + } +}
