This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 6991515 Add kryo serializer in Heron API (#3256)
6991515 is described below
commit 699151501bb79e06f6a0f8627120762e16cde3b7
Author: Ning Wang <[email protected]>
AuthorDate: Wed May 15 23:05:16 2019 -0700
Add kryo serializer in Heron API (#3256)
* Add kryo serializer in Heron API
Kryo was not supported in the Heron API. As a result, users couldn't register
serializers the way they can with the Storm API. The issue is that Kryo was not
part of the Heron API; it is added in this PR.
Integration test is included.
* Update comment
---
heron/api/src/java/BUILD | 5 +-
.../api/src/java/org/apache/heron/api/Config.java | 121 ++++++++++
.../heron/api/serializer/DefaultKryoFactory.java | 74 +++++++
.../heron/api/serializer/IKryoDecorator.java} | 24 +-
.../apache/heron/api/serializer/IKryoFactory.java | 47 ++++
.../heron/api/serializer/KryoSerializer.java | 244 +++++++++++++++++++++
.../api/serializer/SerializableSerializer.java | 65 ++++++
.../src/java/org/apache/heron/api/utils/Utils.java | 9 +
.../java/org/apache/heron/streamlet/Config.java | 50 +++--
.../heron/streamlet/impl/KryoSerializer.java | 92 +-------
.../java/org/apache/heron/api/utils/UtilsTest.java | 10 +
.../topology/serialization/CustomObject.java | 5 +
.../serialization/KryoSerializationTopology.java | 66 ++++++
.../KryoSerializationTopologyResults.json | 1 +
.../UnserializableCustomCheckBolt.java | 58 +++++
.../serialization/UnserializableCustomObject.java | 30 ++-
.../serialization/UnserializableCustomSpout.java | 65 ++++++
.../src/python/test_runner/resources/test.json | 5 +
.../src/java/backtype/storm/utils/ConfigUtils.java | 3 +
.../java/org/apache/storm/utils/ConfigUtils.java | 3 +
website/content/docs/developers/java/eco-api.mmark | 7 +-
21 files changed, 839 insertions(+), 145 deletions(-)
diff --git a/heron/api/src/java/BUILD b/heron/api/src/java/BUILD
index d300286..46d389c 100644
--- a/heron/api/src/java/BUILD
+++ b/heron/api/src/java/BUILD
@@ -23,7 +23,9 @@ java_library(
name = "api-java-low-level",
srcs = glob(["org/apache/heron/api/**/*.java"]),
javacopts = DOCLINT_HTML_AND_SYNTAX,
- deps = api_deps_files,
+ deps = api_deps_files + [
+ "//third_party/java:kryo-neverlink"
+ ]
)
# Functional Api
@@ -33,7 +35,6 @@ java_library(
javacopts = DOCLINT_HTML_AND_SYNTAX,
deps = api_deps_files + [
":api-java-low-level",
- "//third_party/java:kryo-neverlink",
"@org_apache_commons_commons_lang3//jar"
]
)
diff --git a/heron/api/src/java/org/apache/heron/api/Config.java
b/heron/api/src/java/org/apache/heron/api/Config.java
index 8da9109..075ebb5 100644
--- a/heron/api/src/java/org/apache/heron/api/Config.java
+++ b/heron/api/src/java/org/apache/heron/api/Config.java
@@ -21,6 +21,7 @@ package org.apache.heron.api;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -29,6 +30,10 @@ import java.util.Set;
import javax.xml.bind.DatatypeConverter;
+import com.esotericsoftware.kryo.Serializer;
+
+import org.apache.heron.api.serializer.IKryoDecorator;
+import org.apache.heron.api.serializer.IKryoFactory;
import org.apache.heron.common.basics.ByteAmount;
/**
@@ -106,6 +111,45 @@ public class Config extends HashMap<String, Object> {
*/
public static final String TOPOLOGY_SERIALIZER_CLASSNAME =
"topology.serializer.classname";
/**
+ * Class that specifies how to create a Kryo instance for serialization.
Heron will then apply
+ * topology.kryo.register. The default implementation
+ * implements topology.fall.back.on.java.serialization and turns references
off.
+ */
+ public static final String TOPOLOGY_KRYO_FACTORY = "topology.kryo.factory";
+ /**
+ * A list of serialization registrations if KryoSerializer is used.
+ * In Kryo, the serialization can be the name of a class (in which case Kryo
will automatically
+ * create a serializer for the class that saves all the object's fields), or
an implementation
+ * of Kryo Serializer.
+ * <p>
+ * See Kryo's documentation for more information about writing custom
serializers.
+ */
+ public static final String TOPOLOGY_KRYO_REGISTER = "topology.kryo.register";
+ /**
+ * A list of classes that customize Heron's Kryo instance during start-up.
+ * Each listed class name must implement IKryoDecorator. During start-up the
+ * listed class is instantiated with 0 arguments, then its 'decorate' method
+ * is called with Heron's Kryo instance as the only argument.
+ */
+ public static final String TOPOLOGY_KRYO_DECORATORS =
"topology.kryo.decorators";
+ /**
+ * Whether or not Heron should skip the loading of kryo registrations for
which it
+ * does not know the class or have the serializer implementation. Otherwise,
the task will
+ * fail to load and will throw an error at runtime. The use case of this is
if you want to
+ * declare your serializations on the heron.yaml files on the cluster rather
than every single
+ * time you submit a topology. Different applications may use different
serializations and so
+ * a single application may not have the code for the other serializers used
by other apps.
+ * By setting this config to true, Heron will ignore that it doesn't have
those other serializations
+ * rather than throw an error.
+ */
+ public static final String TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS =
+ "topology.skip.missing.kryo.registrations";
+ /**
+ * Whether or not to fallback to Java serialization in a topology.
+ */
+ public static final String TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION =
+ "topology.fall.back.on.java.serialization";
+ /**
* Is the topology running in atleast-once mode?
* <p>
* <p>If this is set to false, then Heron will immediately ack tuples as soon
@@ -415,6 +459,58 @@ public class Config extends HashMap<String, Object> {
conf.put(Config.TOPOLOGY_SERIALIZER_CLASSNAME, className);
}
+ public static void setKryoFactory(Map conf, Class<? extends IKryoFactory>
klass) {
+ conf.put(Config.TOPOLOGY_KRYO_FACTORY, klass.getName());
+ }
+
+ public static void setSkipMissingKryoRegistrations(Map conf, boolean skip) {
+ conf.put(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS, skip);
+ }
+
+ @SuppressWarnings("rawtypes") // List can contain strings or maps
+ private static List getRegisteredKryoSerializations(Map<String, Object>
conf) {
+ List ret;
+ if (!conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
+ ret = new ArrayList();
+ } else {
+ ret = new ArrayList((List) conf.get(Config.TOPOLOGY_KRYO_REGISTER));
+ }
+ conf.put(Config.TOPOLOGY_KRYO_REGISTER, ret);
+ return ret;
+ }
+
+ public static void registerKryoSerialization(Map<String, Object> conf, Class
klass) {
+ getRegisteredKryoSerializations(conf).add(klass.getName());
+ }
+
+ public static void registerKryoSerialization(
+ Map<String, Object> conf, Class klass, Class<? extends Serializer>
serializerClass) {
+ Map<String, String> register = new HashMap<>();
+ register.put(klass.getName(), serializerClass.getName());
+ getRegisteredKryoSerializations(conf).add(register);
+ }
+
+ private static List<String> getRegisteredDecorators(Map conf) {
+ List<String> ret;
+ if (!conf.containsKey(Config.TOPOLOGY_KRYO_DECORATORS)) {
+ ret = new ArrayList<>();
+ } else {
+ ret = new ArrayList<>((List) conf.get(Config.TOPOLOGY_KRYO_DECORATORS));
+ }
+ conf.put(Config.TOPOLOGY_KRYO_DECORATORS, ret);
+ return ret;
+ }
+
+ public static void registerDecorator(
+ Map conf,
+ Class<? extends IKryoDecorator> klass) {
+ getRegisteredDecorators(conf).add(klass.getName());
+ }
+
+ public static void setFallBackOnJavaSerialization(Map conf, boolean
fallback) {
+ conf.put(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, fallback);
+ }
+
/**
* Is topology running with acking enabled?
* @deprecated use {@link #setTopologyReliabilityMode(Map,
TopologyReliabilityMode)} instead.
@@ -715,6 +811,31 @@ public class Config extends HashMap<String, Object> {
setSerializationClassName(this, className);
}
+ public void setKryoFactory(Class<? extends IKryoFactory> klass) {
+ setKryoFactory(this, klass);
+ }
+
+ public void setSkipMissingKryoRegistrations(boolean skip) {
+ setSkipMissingKryoRegistrations(this, skip);
+ }
+
+ public void registerKryoSerialization(Class klass) {
+ registerKryoSerialization(this, klass);
+ }
+
+ public void registerKryoSerialization(Class klass,
+ Class<? extends Serializer> serializerClass) {
+ registerKryoSerialization(this, klass, serializerClass);
+ }
+
+ public void registerDecorator(Class<? extends IKryoDecorator> klass) {
+ registerDecorator(this, klass);
+ }
+
+ public void setFallBackOnJavaSerialization(boolean fallback) {
+ setFallBackOnJavaSerialization(this, fallback);
+ }
+
/**
* Is topology running with acking enabled?
* The SupressWarning will be removed once TOPOLOGY_ENABLE_ACKING is removed
diff --git
a/heron/api/src/java/org/apache/heron/api/serializer/DefaultKryoFactory.java
b/heron/api/src/java/org/apache/heron/api/serializer/DefaultKryoFactory.java
new file mode 100644
index 0000000..98831de
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/api/serializer/DefaultKryoFactory.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.api.serializer;
+
+import java.util.Map;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+
+import org.apache.heron.api.Config;
+
+
+public class DefaultKryoFactory implements IKryoFactory {
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ public Kryo getKryo(Map conf) {
+ KryoSerializableDefault k = new KryoSerializableDefault();
+ k.setRegistrationRequired(
+ (boolean)
conf.getOrDefault(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, false));
+ k.setReferences(false);
+ return k;
+ }
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ public void preRegister(Kryo k, Map conf) {
+ }
+
+ @SuppressWarnings("rawtypes")
+ public void postRegister(Kryo k, Map conf) {
+ ((KryoSerializableDefault) k).overrideDefault(true);
+ }
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ public void postDecorate(Kryo k, Map conf) {
+ }
+
+ public static class KryoSerializableDefault extends Kryo {
+ private boolean override = false;
+
+ public void overrideDefault(boolean value) {
+ override = value;
+ }
+
+ @Override
+ @SuppressWarnings("rawtypes") // superclass doesn't use types
+ public Serializer getDefaultSerializer(Class type) {
+ if (override) {
+ return new SerializableSerializer();
+ } else {
+ return super.getDefaultSerializer(type);
+ }
+ }
+ }
+}
diff --git a/heron/api/tests/java/org/apache/heron/api/utils/UtilsTest.java
b/heron/api/src/java/org/apache/heron/api/serializer/IKryoDecorator.java
similarity index 52%
copy from heron/api/tests/java/org/apache/heron/api/utils/UtilsTest.java
copy to heron/api/src/java/org/apache/heron/api/serializer/IKryoDecorator.java
index add1fa1..fdabaea 100644
--- a/heron/api/tests/java/org/apache/heron/api/utils/UtilsTest.java
+++ b/heron/api/src/java/org/apache/heron/api/serializer/IKryoDecorator.java
@@ -17,26 +17,10 @@
* under the License.
*/
-package org.apache.heron.api.utils;
+package org.apache.heron.api.serializer;
-import java.util.ArrayList;
+import com.esotericsoftware.kryo.Kryo;
-import org.junit.Test;
-
-import junit.framework.TestCase;
-
-public class UtilsTest extends TestCase {
-
- @Test
- public void testAssignKeyToTask() {
- ArrayList<Integer> taskIds = new ArrayList<Integer>();
- taskIds.add(1);
- taskIds.add(2);
- assertEquals(taskIds.size(), 2);
- assertEquals(Utils.assignKeyToTask(0, taskIds), Integer.valueOf(1));
- assertEquals(Utils.assignKeyToTask(100, taskIds), Integer.valueOf(1));
- assertEquals(Utils.assignKeyToTask(101, taskIds), Integer.valueOf(2));
- assertEquals(Utils.assignKeyToTask(-100, taskIds), Integer.valueOf(1));
- assertEquals(Utils.assignKeyToTask(-101, taskIds), Integer.valueOf(2));
- }
+public interface IKryoDecorator {
+ void decorate(Kryo k);
}
diff --git
a/heron/api/src/java/org/apache/heron/api/serializer/IKryoFactory.java
b/heron/api/src/java/org/apache/heron/api/serializer/IKryoFactory.java
new file mode 100644
index 0000000..9d4be2a
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/api/serializer/IKryoFactory.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.api.serializer;
+
+import java.util.Map;
+
+import com.esotericsoftware.kryo.Kryo;
+
+/**
+ * An interface that controls the Kryo instance used by Heron for
serialization.
+ * The lifecycle is:
+ * <p>
+ * 1. The Kryo instance is constructed using getKryo
+ * 2. Heron registers the default classes (e.g. arrays, lists, maps, etc.)
+ * 3. Heron calls preRegister hook
+ * 4. Heron registers all user-defined registrations through
topology.kryo.register
+ * 5. Heron calls postRegister hook
+ * 6. Heron calls all user-defined decorators through topology.kryo.decorators
+ * 7. Heron calls postDecorate hook
+ */
+@SuppressWarnings("rawtypes")
+public interface IKryoFactory {
+ Kryo getKryo(Map conf);
+
+ void preRegister(Kryo k, Map conf);
+
+ void postRegister(Kryo k, Map conf);
+
+ void postDecorate(Kryo k, Map conf);
+}
diff --git
a/heron/api/src/java/org/apache/heron/api/serializer/KryoSerializer.java
b/heron/api/src/java/org/apache/heron/api/serializer/KryoSerializer.java
new file mode 100644
index 0000000..fe7ebf5
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/api/serializer/KryoSerializer.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.api.serializer;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.logging.Logger;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.CollectionSerializer;
+import com.esotericsoftware.kryo.serializers.DefaultSerializers;
+import com.esotericsoftware.kryo.serializers.MapSerializer;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.tuple.Values;
+import org.apache.heron.api.utils.Utils;
+
+/**
+ * KryoSerializer is a wrapper around Heron's IPluggableSerializer.
+ * Streamlet based topologies turning on kryo serialization are based off of
it.
+ */
+public class KryoSerializer implements IPluggableSerializer {
+ private static final Logger LOG =
Logger.getLogger(KryoSerializer.class.getName());
+ private static final String DEFAULT_FACTORY =
+ "org.apache.heron.api.serializer.DefaultKryoFactory";
+
+ private IKryoFactory kryoFactory;
+ private Kryo kryo;
+ private Output kryoOut;
+ private Input kryoIn;
+
+ @Override
+ public void initialize(Map<String, Object> config) {
+ String factoryClassName =
+ (String) config.getOrDefault(Config.TOPOLOGY_KRYO_FACTORY,
DEFAULT_FACTORY);
+ kryoFactory = (IKryoFactory) Utils.newInstance(factoryClassName);
+ kryo = kryoFactory.getKryo(config);
+ kryoOut = new Output(2000, 2000000000);
+ kryoIn = new Input(1);
+
+ boolean skipMissing =
+ (boolean)
config.getOrDefault(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS, false);
+ registerDefaultSerializers(kryo);
+ kryoFactory.preRegister(kryo, config);
+ registerUserSerializers(kryo, config, skipMissing);
+ kryoFactory.postRegister(kryo, config);
+
+ applyUserDecorators(kryo, config, skipMissing);
+ kryoFactory.postDecorate(kryo, config);
+ }
+
+ @Override
+ public byte[] serialize(Object object) {
+ kryoOut.clear();
+ kryo.writeClassAndObject(kryoOut, object);
+ return kryoOut.toBytes();
+ }
+
+ @Override
+ public Object deserialize(byte[] input) {
+ kryoIn.setBuffer(input);
+ return kryo.readClassAndObject(kryoIn);
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private static void registerDefaultSerializers(Kryo k) {
+ // Default serializers
+ k.register(byte[].class);
+ k.register(ArrayList.class, new ArrayListSerializer());
+ k.register(HashMap.class, new HashMapSerializer());
+ k.register(HashSet.class, new HashSetSerializer());
+ k.register(BigInteger.class, new
DefaultSerializers.BigIntegerSerializer());
+ k.register(Values.class);
+ }
+
+ private static void registerUserSerializers(
+ Kryo k, Map<String, Object> config, boolean skipMissing) {
+ // Configured serializers
+ Map<String, String> registrations = normalizeKryoRegister(config);
+
+ for (String klassName : registrations.keySet()) {
+ String serializerClassName = registrations.get(klassName);
+ try {
+ Class klass = Class.forName(klassName);
+ Class serializerClass = null;
+ if (serializerClassName != null) {
+ serializerClass = Class.forName(serializerClassName);
+ }
+ LOG.info("Doing kryo.register for class " + klass);
+ if (serializerClass == null) {
+ k.register(klass);
+ } else {
+ k.register(klass, resolveSerializerInstance(k, klass,
serializerClass));
+ }
+
+ } catch (ClassNotFoundException e) {
+ if (skipMissing) {
+ LOG.info("Could not find serialization or class for "
+ + serializerClassName + ". Skipping registration...");
+ } else {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ private static void applyUserDecorators(
+ Kryo k, Map<String, Object> config, boolean skipMissing) {
+ if (config.containsKey(Config.TOPOLOGY_KRYO_DECORATORS)) {
+ for (String klassName : (List<String>)
config.get(Config.TOPOLOGY_KRYO_DECORATORS)) {
+ try {
+ Class klass = Class.forName(klassName);
+ IKryoDecorator decorator = (IKryoDecorator) klass.newInstance();
+ decorator.decorate(k);
+ } catch (ClassNotFoundException e) {
+ if (skipMissing) {
+ LOG.info("Could not find kryo decorator named "
+ + klassName + ". Skipping registration...");
+ } else {
+ throw new RuntimeException(e);
+ }
+ } catch (InstantiationException e) {
+ throw new RuntimeException(e);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private static Map<String, String> normalizeKryoRegister(Map<String, Object>
config) {
+ // TODO: de-duplicate this logic with the code in nimbus
+ Object res = config.get(Config.TOPOLOGY_KRYO_REGISTER);
+ if (res == null) {
+ return new TreeMap<String, String>();
+ }
+ Map<String, String> ret = new HashMap<>();
+ // Register config is a list. Each value can be either a String or a map
+ for (Object o: (List) res) {
+ if (o instanceof Map) {
+ ret.putAll((Map) o);
+ } else {
+ ret.put((String) o, null);
+ }
+ }
+
+ //ensure always same order for registrations with TreeMap
+ return new TreeMap<String, String>(ret);
+ }
+
+
+  /**
+   * Instantiates {@code serializerClass} for the registered type {@code superClass}.
+   * Constructors are tried in this order: (Kryo, Class), (Kryo), (Class), then no-argument.
+   *
+   * @param k the Kryo instance handed to constructors that accept one
+   * @param superClass the class being registered, handed to constructors that accept a Class
+   * @param serializerClass the serializer implementation to instantiate
+   * @return a new serializer instance
+   * @throws IllegalArgumentException if none of the constructor shapes can be used
+   */
+  @SuppressWarnings("rawtypes")
+  private static Serializer resolveSerializerInstance(
+      Kryo k, Class superClass, Class<? extends Serializer> serializerClass) {
+    Constructor<? extends Serializer> ctor;
+
+    try {
+      ctor = serializerClass.getConstructor(Kryo.class, Class.class);
+      return ctor.newInstance(k, superClass);
+    } catch (NoSuchMethodException | InvocationTargetException
+        | InstantiationException | IllegalAccessException ex) {
+      // (Kryo, Class) constructor unavailable or failed; try the next shape
+    }
+
+    try {
+      ctor = serializerClass.getConstructor(Kryo.class);
+      return ctor.newInstance(k);
+    } catch (NoSuchMethodException | InvocationTargetException
+        | InstantiationException | IllegalAccessException ex) {
+      // (Kryo) constructor unavailable or failed; try the next shape
+    }
+
+    try {
+      ctor = serializerClass.getConstructor(Class.class);
+      // This constructor takes the registered type. Passing the Kryo instance here would
+      // raise an uncaught IllegalArgumentException (argument type mismatch).
+      return ctor.newInstance(superClass);
+    } catch (NoSuchMethodException | InvocationTargetException
+        | InstantiationException | IllegalAccessException ex) {
+      // (Class) constructor unavailable or failed; try the no-argument constructor
+    }
+
+    try {
+      return serializerClass.newInstance();
+    } catch (InstantiationException | IllegalAccessException ex) {
+      // no usable no-argument constructor either; report the failure below
+    }
+
+    throw new IllegalArgumentException(
+        String.format("Unable to create serializer \"%s\" for class: %s",
+            serializerClass.getName(), superClass.getName()));
+  }
+
+ private static class ArrayListSerializer extends CollectionSerializer {
+ @Override
+ @SuppressWarnings("rawtypes") // extending Kryo class that uses raw types
+ public Collection create(Kryo k, Input input, Class<Collection> type) {
+ return new ArrayList();
+ }
+ }
+
+ private static class HashMapSerializer extends MapSerializer {
+ @Override
+ @SuppressWarnings("rawtypes") // extending kryo class signature that takes
Map
+ public Map<String, Object> create(Kryo k, Input input, Class<Map> type) {
+ return new HashMap<>();
+ }
+ }
+
+ private static class HashSetSerializer extends CollectionSerializer {
+ @Override
+ @SuppressWarnings("rawtypes") // extending Kryo class that uses raw types
+ public Collection create(Kryo k, Input input, Class<Collection> type) {
+ return new HashSet();
+ }
+ }
+}
diff --git
a/heron/api/src/java/org/apache/heron/api/serializer/SerializableSerializer.java
b/heron/api/src/java/org/apache/heron/api/serializer/SerializableSerializer.java
new file mode 100644
index 0000000..fd28667
--- /dev/null
+++
b/heron/api/src/java/org/apache/heron/api/serializer/SerializableSerializer.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.api.serializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+
+public class SerializableSerializer extends Serializer<Object> {
+
+ @Override
+ public void write(Kryo kryo, Output output, Object object) {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try {
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeObject(object);
+ oos.flush();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ byte[] ser = bos.toByteArray();
+ output.writeInt(ser.length);
+ output.writeBytes(ser);
+ }
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ public Object read(Kryo kryo, Input input, Class c) {
+ int len = input.readInt();
+ byte[] ser = new byte[len];
+ input.readBytes(ser);
+
+ try (ByteArrayInputStream bis = new ByteArrayInputStream(ser);
+ ObjectInputStream ois = new ObjectInputStream(bis)) {
+ return ois.readObject();
+ } catch (IOException | ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/heron/api/src/java/org/apache/heron/api/utils/Utils.java
b/heron/api/src/java/org/apache/heron/api/utils/Utils.java
index e54dfdd..7118ede 100644
--- a/heron/api/src/java/org/apache/heron/api/utils/Utils.java
+++ b/heron/api/src/java/org/apache/heron/api/utils/Utils.java
@@ -46,6 +46,15 @@ public final class Utils {
private Utils() {
}
+ public static Object newInstance(String klass) {
+ try {
+ Class<?> c = Class.forName(klass);
+ return c.newInstance();
+ } catch (ClassNotFoundException | InstantiationException |
IllegalAccessException e) {
+ throw new RuntimeException("Failed to create instance for class: " +
klass, e);
+ }
+ }
+
public static List<Object> tuple(Object... values) {
List<Object> ret = new ArrayList<>();
Collections.addAll(ret, values);
diff --git a/heron/api/src/java/org/apache/heron/streamlet/Config.java
b/heron/api/src/java/org/apache/heron/streamlet/Config.java
index c9c5338..b98c1d6 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/Config.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/Config.java
@@ -23,7 +23,6 @@ package org.apache.heron.streamlet;
import java.io.Serializable;
import org.apache.heron.common.basics.ByteAmount;
-import org.apache.heron.streamlet.impl.KryoSerializer;
/**
* Config is the way users configure the execution of the topology.
@@ -33,11 +32,12 @@ import org.apache.heron.streamlet.impl.KryoSerializer;
*/
public final class Config implements Serializable {
private static final long serialVersionUID = 6204498077403076352L;
+
+ private org.apache.heron.api.Config heronConfig;
private final double cpu;
private final ByteAmount ram;
private final DeliverySemantics deliverySemantics;
private final Serializer serializer;
- private org.apache.heron.api.Config heronConfig;
private static final long MB = 1024 * 1024;
private static final long GB = 1024 * MB;
@@ -61,7 +61,6 @@ public final class Config implements Serializable {
}
private static class Defaults {
- static final boolean USE_KRYO = true;
static final double CPU = -1.0; // -1 means
undefined
static final ByteAmount RAM = ByteAmount.fromBytes(-1); // -1 means
undefined
static final DeliverySemantics SEMANTICS = DeliverySemantics.ATMOST_ONCE;
@@ -69,10 +68,10 @@ public final class Config implements Serializable {
}
private Config(Builder builder) {
- serializer = builder.serializer;
heronConfig = builder.config;
cpu = builder.cpu;
ram = builder.ram;
+ serializer = builder.serializer;
deliverySemantics = builder.deliverySemantics;
}
@@ -167,6 +166,18 @@ public final class Config implements Serializable {
}
}
+ private static String translateSerializer(Serializer serializer) {
+ switch (serializer) {
+ case JAVA:
+ return org.apache.heron.api.serializer.JavaSerializer.class.getName();
+ case KRYO:
+ return org.apache.heron.api.serializer.KryoSerializer.class.getName();
+ default:
+ // KryoSerializer is the default in Streamlet API
+ return org.apache.heron.api.serializer.KryoSerializer.class.getName();
+ }
+ }
+
public static final class Builder {
private org.apache.heron.api.Config config;
private double cpu;
@@ -179,7 +190,7 @@ public final class Config implements Serializable {
cpu = Defaults.CPU;
ram = Defaults.RAM;
deliverySemantics = Defaults.SEMANTICS;
- serializer = Serializer.KRYO;
+ serializer = Defaults.SERIALIZER;
}
/**
@@ -245,10 +256,13 @@ public final class Config implements Serializable {
*/
public Builder setDeliverySemantics(DeliverySemantics semantics) {
this.deliverySemantics = semantics;
- config.setTopologyReliabilityMode(Config.translateSemantics(semantics));
return this;
}
+ private void applyDeliverySemantics() {
+
config.setTopologyReliabilityMode(Config.translateSemantics(deliverySemantics));
+ }
+
/**
* Sets some user-defined key/value mapping
* @param key The user-defined key
@@ -259,14 +273,6 @@ public final class Config implements Serializable {
return this;
}
- private void useKryo() {
- try {
- config.setSerializationClassName(KryoSerializer.class.getName());
- } catch (NoClassDefFoundError e) {
- throw new RuntimeException("Linking with kryo is needed because
useKryoSerializer is used");
- }
- }
-
/**
* Sets the {@link Serializer} to be used by the topology (current options
are {@link
* KryoSerializer} and the native Java serializer.
@@ -277,10 +283,20 @@ public final class Config implements Serializable {
return this;
}
- public Config build() {
- if (serializer.equals(Serializer.KRYO)) {
- useKryo();
+    /**
+     * Translates the Streamlet-level serializer choice into a serializer class name
+     * and stores it in the underlying Heron config.
+     *
+     * @throws RuntimeException if the serializer class cannot be linked
+     */
+    private void applySerializer() {
+      try {
+        String serializerClass = translateSerializer(serializer);
+        config.setSerializationClassName(serializerClass);
+      } catch (NoClassDefFoundError e) {
+        // Keep the linkage error as the cause so the missing class stays visible.
+        throw new RuntimeException("Linking with serializer " + serializer + " is needed", e);
       }
+    }
+
+ public Config build() {
+ // Translate and apply Streamlet configs to Heron configs
+ applySerializer();
+ applyDeliverySemantics();
+
return new Config(this);
}
}
diff --git
a/heron/api/src/java/org/apache/heron/streamlet/impl/KryoSerializer.java
b/heron/api/src/java/org/apache/heron/streamlet/impl/KryoSerializer.java
index 9449629..66160f2 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/KryoSerializer.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/KryoSerializer.java
@@ -17,95 +17,13 @@
* under the License.
*/
-
package org.apache.heron.streamlet.impl;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.CollectionSerializer;
-import com.esotericsoftware.kryo.serializers.DefaultSerializers;
-import com.esotericsoftware.kryo.serializers.MapSerializer;
-
-import org.apache.heron.api.serializer.IPluggableSerializer;
-
/**
- * KryoSerializer is a wrapper around Heron's IPluggableSerializer.
- * Streamlet based topologies turning on kryo serialization are based off of
it.
+ * KryoSerializer has been moved to the heron.api.serializer package so that it
can be used
+ * by Heron topologies written in the low level API. This is just an alias kept for
backward
+ * compatibility in case some users use this class directly. The class should NOT be
used in Heron code
+ * and it should be avoided by users.
*/
-public class KryoSerializer implements IPluggableSerializer {
- private Kryo kryo;
- private Output kryoOut;
- private Input kryoIn;
-
- /**
- * A quick utility function that determines whether kryo has been linked
- * with the streamlet binary
- */
- public static void checkForKryo() {
- Kryo k = new Kryo();
- }
-
- @Override
- public void initialize(Map<String, Object> config) {
- kryo = getKryo();
- kryoOut = new Output(2000, 2000000000);
- kryoIn = new Input(1);
- }
-
- @Override
- public byte[] serialize(Object object) {
- kryoOut.clear();
- kryo.writeClassAndObject(kryoOut, object);
- return kryoOut.toBytes();
- }
-
- @Override
- public Object deserialize(byte[] input) {
- kryoIn.setBuffer(input);
- return kryo.readClassAndObject(kryoIn);
- }
-
- private Kryo getKryo() {
- Kryo k = new Kryo();
- k.setRegistrationRequired(false);
- k.setReferences(false);
- k.register(byte[].class);
- k.register(ArrayList.class, new ArrayListSerializer());
- k.register(HashMap.class, new HashMapSerializer());
- k.register(HashSet.class, new HashSetSerializer());
- k.register(BigInteger.class, new
DefaultSerializers.BigIntegerSerializer());
- return k;
- }
-
- private class ArrayListSerializer extends CollectionSerializer {
- @Override
- @SuppressWarnings("rawtypes") // extending Kryo class that uses raw types
- public Collection create(Kryo k, Input input, Class<Collection> type) {
- return new ArrayList();
- }
- }
-
- private class HashMapSerializer extends MapSerializer {
- @Override
- @SuppressWarnings("rawtypes") // extending kryo class signature that takes
Map
- public Map<String, Object> create(Kryo k, Input input, Class<Map> type) {
- return new HashMap<>();
- }
- }
-
- private class HashSetSerializer extends CollectionSerializer {
- @Override
- @SuppressWarnings("rawtypes") // extending Kryo class that uses raw types
- public Collection create(Kryo k, Input input, Class<Collection> type) {
- return new HashSet();
- }
- }
+public class KryoSerializer extends
org.apache.heron.api.serializer.KryoSerializer {
}
diff --git a/heron/api/tests/java/org/apache/heron/api/utils/UtilsTest.java
b/heron/api/tests/java/org/apache/heron/api/utils/UtilsTest.java
index add1fa1..b97c89f 100644
--- a/heron/api/tests/java/org/apache/heron/api/utils/UtilsTest.java
+++ b/heron/api/tests/java/org/apache/heron/api/utils/UtilsTest.java
@@ -23,6 +23,8 @@ import java.util.ArrayList;
import org.junit.Test;
+import org.apache.heron.api.serializer.KryoSerializer;
+
import junit.framework.TestCase;
public class UtilsTest extends TestCase {
@@ -39,4 +41,12 @@ public class UtilsTest extends TestCase {
assertEquals(Utils.assignKeyToTask(-100, taskIds), Integer.valueOf(1));
assertEquals(Utils.assignKeyToTask(-101, taskIds), Integer.valueOf(2));
}
+
+ @Test
+ public void testnewInstance() {
+ // Verify newInstance() works as expected to create a new instance of
+ // org.apache.heron.api.serializer.KryoSerializer.
+ Object o =
Utils.newInstance("org.apache.heron.api.serializer.KryoSerializer");
+ assertTrue(o instanceof KryoSerializer);
+ }
}
diff --git
a/integration_test/src/java/org/apache/heron/integration_test/topology/serialization/CustomObject.java
b/integration_test/src/java/org/apache/heron/integration_test/topology/serialization/CustomObject.java
index b75ca91..e48939b 100644
---
a/integration_test/src/java/org/apache/heron/integration_test/topology/serialization/CustomObject.java
+++
b/integration_test/src/java/org/apache/heron/integration_test/topology/serialization/CustomObject.java
@@ -30,6 +30,11 @@ public class CustomObject implements Serializable {
private String name;
private Integer age;
+ public CustomObject() {
+ this.name = null;
+ this.age = null;
+ }
+
public CustomObject(String name, Integer age) {
this.name = name;
this.age = age;
diff --git
a/integration_test/src/java/org/apache/heron/integration_test/topology/serialization/KryoSerializationTopology.java
b/integration_test/src/java/org/apache/heron/integration_test/topology/serialization/KryoSerializationTopology.java
new file mode 100644
index 0000000..05a34be
--- /dev/null
+++
b/integration_test/src/java/org/apache/heron/integration_test/topology/serialization/KryoSerializationTopology.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.integration_test.topology.serialization;
+
+import java.net.MalformedURLException;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.integration_test.common.AbstractTestTopology;
+import org.apache.heron.integration_test.common.bolt.IncrementBolt;
+import org.apache.heron.integration_test.core.TestTopologyBuilder;
+
+/**
+ * Topology to test Kryo serialization of a custom object that is not Serializable
+ */
+public final class KryoSerializationTopology extends AbstractTestTopology {
+
+ private KryoSerializationTopology(String[] args) throws
MalformedURLException {
+ super(args);
+ }
+
+ @Override
+ protected TestTopologyBuilder buildTopology(TestTopologyBuilder builder) {
+
+ CustomObject[] inputObjects = new CustomObject[]{
+ new CustomObject("A", 10),
+ new CustomObject("B", 20),
+ new CustomObject("C", 30)
+ };
+
+ builder.setSpout("custom-spout", new
UnserializableCustomSpout(inputObjects), 1);
+ builder.setBolt("check-bolt", new
UnserializableCustomCheckBolt(inputObjects), 1)
+ .shuffleGrouping("custom-spout");
+ builder.setBolt("count-bolt", new IncrementBolt(), 1)
+ .shuffleGrouping("check-bolt");
+ return builder;
+ }
+
+ @Override
+ protected Config buildConfig(Config config) {
+
config.setSerializationClassName("org.apache.heron.api.serializer.KryoSerializer");
+ config.registerKryoSerialization(UnserializableCustomObject.class);
+ return config;
+ }
+
+ public static void main(String[] args) throws Exception {
+ KryoSerializationTopology topology = new KryoSerializationTopology(args);
+ topology.submit();
+ }
+}
diff --git
a/integration_test/src/java/org/apache/heron/integration_test/topology/serialization/KryoSerializationTopologyResults.json
b/integration_test/src/java/org/apache/heron/integration_test/topology/serialization/KryoSerializationTopologyResults.json
new file mode 100644
index 0000000..65806fd
--- /dev/null
+++
b/integration_test/src/java/org/apache/heron/integration_test/topology/serialization/KryoSerializationTopologyResults.json
@@ -0,0 +1 @@
+[10]
\ No newline at end of file
diff --git
a/integration_test/src/java/org/apache/heron/integration_test/topology/serialization/UnserializableCustomCheckBolt.java
b/integration_test/src/java/org/apache/heron/integration_test/topology/serialization/UnserializableCustomCheckBolt.java
new file mode 100644
index 0000000..10c8e6f
--- /dev/null
+++
b/integration_test/src/java/org/apache/heron/integration_test/topology/serialization/UnserializableCustomCheckBolt.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.integration_test.topology.serialization;
+
+import java.util.logging.Logger;
+
+import org.apache.heron.api.bolt.BaseBasicBolt;
+import org.apache.heron.api.bolt.BasicOutputCollector;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.api.tuple.Values;
+
+/**
+ * A bolt that checks deserialization works fine for UnserializableCustomObject
+ */
+public class UnserializableCustomCheckBolt extends BaseBasicBolt {
+ private static final Logger LOG =
Logger.getLogger(CustomCheckBolt.class.getName());
+ private int nItems;
+ private CustomObject[] inputObjects;
+
+ public UnserializableCustomCheckBolt(CustomObject[] inputObjects) {
+ this.nItems = 0;
+ this.inputObjects = inputObjects;
+ }
+
+ @Override
+ public void execute(Tuple input, BasicOutputCollector collector) {
+ LOG.info("Received input tuple: " +
input.getValueByField("custom").toString());
+ UnserializableCustomObject unserializable =
+ (UnserializableCustomObject) input.getValueByField("custom");
+ if (unserializable.getObj().equals(inputObjects[(nItems++) %
inputObjects.length])) {
+ collector.emit(new Values(unserializable.getObj()));
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("custom"));
+ }
+}
diff --git a/heron/api/tests/java/org/apache/heron/api/utils/UtilsTest.java
b/integration_test/src/java/org/apache/heron/integration_test/topology/serialization/UnserializableCustomObject.java
similarity index 53%
copy from heron/api/tests/java/org/apache/heron/api/utils/UtilsTest.java
copy to
integration_test/src/java/org/apache/heron/integration_test/topology/serialization/UnserializableCustomObject.java
index add1fa1..af15c5b 100644
--- a/heron/api/tests/java/org/apache/heron/api/utils/UtilsTest.java
+++
b/integration_test/src/java/org/apache/heron/integration_test/topology/serialization/UnserializableCustomObject.java
@@ -17,26 +17,24 @@
* under the License.
*/
-package org.apache.heron.api.utils;
+package org.apache.heron.integration_test.topology.serialization;
-import java.util.ArrayList;
+/**
+ * An example unserializable custom object. It contains a CustomObject.
+ */
+public class UnserializableCustomObject {
-import org.junit.Test;
+ private CustomObject obj;
-import junit.framework.TestCase;
+ public UnserializableCustomObject() {
+ this.obj = null;
+ }
-public class UtilsTest extends TestCase {
+ public UnserializableCustomObject(CustomObject obj) {
+ this.obj = obj;
+ }
- @Test
- public void testAssignKeyToTask() {
- ArrayList<Integer> taskIds = new ArrayList<Integer>();
- taskIds.add(1);
- taskIds.add(2);
- assertEquals(taskIds.size(), 2);
- assertEquals(Utils.assignKeyToTask(0, taskIds), Integer.valueOf(1));
- assertEquals(Utils.assignKeyToTask(100, taskIds), Integer.valueOf(1));
- assertEquals(Utils.assignKeyToTask(101, taskIds), Integer.valueOf(2));
- assertEquals(Utils.assignKeyToTask(-100, taskIds), Integer.valueOf(1));
- assertEquals(Utils.assignKeyToTask(-101, taskIds), Integer.valueOf(2));
+ public CustomObject getObj() {
+ return obj;
}
}
diff --git
a/integration_test/src/java/org/apache/heron/integration_test/topology/serialization/UnserializableCustomSpout.java
b/integration_test/src/java/org/apache/heron/integration_test/topology/serialization/UnserializableCustomSpout.java
new file mode 100644
index 0000000..dee4d31
--- /dev/null
+++
b/integration_test/src/java/org/apache/heron/integration_test/topology/serialization/UnserializableCustomSpout.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.integration_test.topology.serialization;
+
+import java.util.Map;
+
+import org.apache.heron.api.spout.BaseRichSpout;
+import org.apache.heron.api.spout.SpoutOutputCollector;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.api.tuple.Values;
+
+/**
+ * A spout that emits unserializable custom objects continuously in order,
+ * one object every time "nextTuple()" is called.
+ * Note that the constructor parameter CustomObject is serializable but the
emitted
+ * objects are not serializable.
+ */
+public class UnserializableCustomSpout extends BaseRichSpout {
+
+ private SpoutOutputCollector collector;
+ private int emitted = 0;
+ private CustomObject[] inputObjects;
+
+ public UnserializableCustomSpout(CustomObject[] inputObjects) {
+ this.inputObjects = inputObjects;
+ }
+
+ @Override
+ public void open(Map<String, Object> conf,
+ TopologyContext context,
+ SpoutOutputCollector outputCollector) {
+ this.collector = outputCollector;
+ }
+
+ @Override
+ public void nextTuple() {
+ CustomObject obj = inputObjects[(emitted++) % inputObjects.length];
+ UnserializableCustomObject unserializable = new
UnserializableCustomObject(obj);
+ collector.emit(new Values(unserializable));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("custom"));
+ }
+}
diff --git a/integration_test/src/python/test_runner/resources/test.json
b/integration_test/src/python/test_runner/resources/test.json
index 06dcba4..2b4b492 100644
--- a/integration_test/src/python/test_runner/resources/test.json
+++ b/integration_test/src/python/test_runner/resources/test.json
@@ -100,6 +100,11 @@
"expectedResultRelativePath" :
"serialization/SerializationTopologyResults.json"
},
{
+ "topologyName": "IntegrationTest_CustomKryoSerialization",
+ "classPath": "serialization.KryoSerializationTopology",
+ "expectedResultRelativePath":
"serialization/KryoSerializationTopologyResults.json"
+ },
+ {
"topologyName" : "IntegrationTest_SlidingCountWindow1",
"classPath" : "windowing.count.SlidingCountWindowTest1",
"expectedResultRelativePath" :
"windowing/count/SlidingCountWindowTest1Results.json"
diff --git a/storm-compatibility/src/java/backtype/storm/utils/ConfigUtils.java
b/storm-compatibility/src/java/backtype/storm/utils/ConfigUtils.java
index ec29225..adc396d 100644
--- a/storm-compatibility/src/java/backtype/storm/utils/ConfigUtils.java
+++ b/storm-compatibility/src/java/backtype/storm/utils/ConfigUtils.java
@@ -78,6 +78,9 @@ public final class ConfigUtils {
}
private static void doSerializationTranslation(Config heronConfig) {
+ // Serialization config is handled by HeronPluggableSerializerDelegate,
therefore the storm
+ // configs are used here instead of translating to the Heron configs.
Storm relies on Kryo but
+ // Heron abstracts serializers differently. KryoSerializer is one of the
PluggableSerializers.
if
(heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION)
&&
(heronConfig.get(backtype.storm.Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION)
instanceof Boolean)
diff --git
a/storm-compatibility/src/java/org/apache/storm/utils/ConfigUtils.java
b/storm-compatibility/src/java/org/apache/storm/utils/ConfigUtils.java
index 3bd755d..c381e61 100644
--- a/storm-compatibility/src/java/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-compatibility/src/java/org/apache/storm/utils/ConfigUtils.java
@@ -77,6 +77,9 @@ public final class ConfigUtils {
}
private static void doSerializationTranslation(Config heronConfig) {
+ // Serialization config is handled by HeronPluggableSerializerDelegate,
therefore the storm
+ // configs are used here instead of translating to the Heron configs.
Storm relies on Kryo but
+ // Heron abstracts serializers differently. KryoSerializer is one of the
PluggableSerializers.
if
(heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION)
&&
(heronConfig.get(org.apache.storm.Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION)
instanceof Boolean)
diff --git a/website/content/docs/developers/java/eco-api.mmark
b/website/content/docs/developers/java/eco-api.mmark
index c25c21e..9a208dd 100644
--- a/website/content/docs/developers/java/eco-api.mmark
+++ b/website/content/docs/developers/java/eco-api.mmark
@@ -267,12 +267,13 @@ the message at a later time.
of when you'd do this is to add a hook that integrates with your internal
monitoring system. These hooks are instantiated using the zero-arg constructor.
* `"topology.serializer.classname"` : The serialization class that is used to
serialize/deserialize tuples
+* `"topology.kryo.register"` : A list of serialization registrations when
KryoSerializer is used.
+* `"topology.kryo.decorators"` : A list of classes that customize storm's kryo
instance during start-up when KryoSerializer is used.
+* `"topology.skip.missing.kryo.registrations"` : When KryoSerializer is used,
whether or not Heron should skip the loading of kryo registrations for which it
does not know the class or have the serializer implementation.
+* `"topology.fall.back.on.java.serialization"` : When KryoSerializer is used,
whether or not to fall back to Java serialization in a topology.
* `"topology.reliability.mode"` : A Heron topology can be run in any one of
the TopologyReliabilityMode
mode. The format of this flag is the string encoded values of the
underlying TopologyReliabilityMode value. Values are `ATMOST_ONCE`,
`ATLEAST_ONCE`, and `EFFECTIVELY_ONCE`.
-* `"topology.reliability.mode"` : A Heron topology can be run in any one of
the TopologyReliabilityMode
-mode. The format of this flag is the string encoded values of the
-underlying TopologyReliabilityMode value.
* `"topology.container.cpu"` : Number of CPU cores per container to be
reserved for this topology.
* `"topology.container.ram"` : Amount of RAM per container to be reserved for
this topology. In bytes.
* `"topology.container.disk"` : Amount of disk per container to be reserved
for this topology. In bytes.