APEXMALHAR-1984 #resolve #comment Create util function to clone operator object by kryo
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/cbc1bbbf Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/cbc1bbbf Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/cbc1bbbf Branch: refs/heads/release-3.3 Commit: cbc1bbbf20e60ee2d25d1a666b1f7a3674e3688a Parents: ca1c7e6 Author: Siyuan Hua <[email protected]> Authored: Thu Jan 28 15:30:04 2016 -0800 Committer: Thomas Weise <[email protected]> Committed: Sat Feb 20 00:18:39 2016 -0800 ---------------------------------------------------------------------- .../AbstractCouchBaseInputOperator.java | 18 +- .../kafka/AbstractKafkaInputOperator.java | 16 +- .../kinesis/AbstractKinesisInputOperator.java | 16 +- .../contrib/kafka/SimpleKakfaConsumerTest.java | 5 +- .../util/FieldValueSerializableGenerator.java | 1 + .../malhar/kafka/AbstractKafkaPartitioner.java | 14 +- .../lib/io/block/AbstractBlockReader.java | 20 +-- .../lib/io/fs/AbstractFileInputOperator.java | 15 +- .../datatorrent/lib/util/KryoCloneUtils.java | 171 +++++++++++++++++++ .../datastructs/DimensionalTableTest.java | 7 +- .../appdata/schemas/ResultFormatterTest.java | 6 +- .../schemas/SchemaRegistryMultipleTest.java | 5 +- .../snapshot/AppDataSnapshotServerMapTest.java | 5 +- .../lib/util/KryoCloneUtilsTest.java | 131 ++++++++++++++ .../com/datatorrent/lib/util/PojoUtilsTest.java | 3 +- .../com/datatorrent/lib/util/TestUtils.java | 20 --- 16 files changed, 335 insertions(+), 118 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cbc1bbbf/contrib/src/main/java/com/datatorrent/contrib/couchbase/AbstractCouchBaseInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/couchbase/AbstractCouchBaseInputOperator.java 
b/contrib/src/main/java/com/datatorrent/contrib/couchbase/AbstractCouchBaseInputOperator.java index 60938ab..1cd4eb5 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/couchbase/AbstractCouchBaseInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/couchbase/AbstractCouchBaseInputOperator.java @@ -26,23 +26,19 @@ import java.util.concurrent.TimeUnit; import com.couchbase.client.CouchbaseClient; import com.couchbase.client.vbucket.config.Config; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.commons.io.output.ByteArrayOutputStream; - import com.datatorrent.lib.db.AbstractStoreInputOperator; import com.datatorrent.api.Context; import com.datatorrent.api.DefaultPartition; import com.datatorrent.api.Partitioner; +import com.datatorrent.lib.util.KryoCloneUtils; import com.datatorrent.netlet.util.DTThrowable; /** @@ -149,19 +145,13 @@ public abstract class AbstractCouchBaseInputOperator<T> extends AbstractStoreInp int numPartitions = conf.getServers().size(); List<String> list = conf.getServers(); Collection<Partition<AbstractCouchBaseInputOperator<T>>> newPartitions = Lists.newArrayListWithExpectedSize(numPartitions); - Kryo kryo = new Kryo(); + KryoCloneUtils<AbstractCouchBaseInputOperator<T>> cloneUtils = KryoCloneUtils.createCloneUtils(this); for (int i = 0; i < numPartitions; i++) { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - Output output = new Output(bos); - kryo.writeObject(output, this); - output.close(); - Input lInput = new Input(bos.toByteArray()); - @SuppressWarnings("unchecked") - AbstractCouchBaseInputOperator<T> oper = kryo.readObject(lInput, this.getClass()); + AbstractCouchBaseInputOperator<T> oper = cloneUtils.getClone(); oper.setServerIndex(i); 
oper.setServerURIString(list.get(i)); logger.debug("oper {} urlstring is {}", i, oper.getServerURIString()); - newPartitions.add(new DefaultPartition<AbstractCouchBaseInputOperator<T>>(oper)); + newPartitions.add(new DefaultPartition<>(oper)); } return newPartitions; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cbc1bbbf/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java index 671a76f..b166b9e 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java @@ -29,9 +29,8 @@ import com.datatorrent.api.StatsListener; import com.datatorrent.api.annotation.OperatorAnnotation; import com.datatorrent.api.annotation.Stateless; import com.datatorrent.lib.io.IdempotentStorageManager; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; +import com.datatorrent.lib.util.KryoCloneUtils; + import com.google.common.base.Joiner; import com.google.common.base.Predicate; import com.google.common.collect.Iterators; @@ -54,7 +53,6 @@ import org.slf4j.LoggerFactory; import javax.validation.Valid; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.lang.reflect.Array; import java.util.ArrayList; @@ -640,14 +638,8 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem // Create a new partition with the partition Ids and initial offset positions protected Partitioner.Partition<AbstractKafkaInputOperator<K>> createPartition(Set<KafkaPartition> pIds, 
Map<KafkaPartition, Long> initOffsets, Collection<IdempotentStorageManager> newManagers) { - Kryo kryo = new Kryo(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - Output output = new Output(bos); - kryo.writeObject(output, this); - output.close(); - Input lInput = new Input(bos.toByteArray()); - @SuppressWarnings("unchecked") - Partitioner.Partition<AbstractKafkaInputOperator<K>> p = new DefaultPartition<AbstractKafkaInputOperator<K>>(kryo.readObject(lInput, this.getClass())); + + Partitioner.Partition<AbstractKafkaInputOperator<K>> p = new DefaultPartition<>(KryoCloneUtils.cloneObject(this)); if (p.getPartitionedInstance().getConsumer() instanceof SimpleKafkaConsumer) { p.getPartitionedInstance().getConsumer().resetPartitionsAndOffset(pIds, initOffsets); if (initOffsets != null) { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cbc1bbbf/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java index e7186b5..6306b04 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java @@ -26,11 +26,9 @@ import com.datatorrent.api.*; import com.datatorrent.api.Operator.ActivationListener; import com.datatorrent.common.util.Pair; import com.datatorrent.lib.io.IdempotentStorageManager; +import com.datatorrent.lib.util.KryoCloneUtils; + import com.esotericsoftware.kryo.DefaultSerializer; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.NotNull; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.serializers.JavaSerializer; import 
com.google.common.collect.Sets; @@ -40,8 +38,8 @@ import org.slf4j.LoggerFactory; import javax.validation.Valid; import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.lang.reflect.Array; import java.util.*; @@ -374,13 +372,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, private Partition<AbstractKinesisInputOperator> createPartition(Set<String> shardIds, Map<String, String> initShardPos, Collection<IdempotentStorageManager> newManagers) { - Kryo kryo = new Kryo(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - Output output = new Output(bos); - kryo.writeObject(output, this); - output.close(); - Input lInput = new Input(bos.toByteArray()); - Partition<AbstractKinesisInputOperator> p = new DefaultPartition<AbstractKinesisInputOperator>(kryo.readObject(lInput, this.getClass())); + Partition<AbstractKinesisInputOperator> p = new DefaultPartition<AbstractKinesisInputOperator>(KryoCloneUtils.cloneObject(this)); newManagers.add(p.getPartitionedInstance().idempotentStorageManager); p.getPartitionedInstance().getConsumer().setShardIds(shardIds); p.getPartitionedInstance().getConsumer().resetShardPositions(initShardPos); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cbc1bbbf/contrib/src/test/java/com/datatorrent/contrib/kafka/SimpleKakfaConsumerTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/SimpleKakfaConsumerTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/SimpleKakfaConsumerTest.java index d2491e1..1368cc0 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/kafka/SimpleKakfaConsumerTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/SimpleKakfaConsumerTest.java @@ -18,8 +18,7 @@ */ package com.datatorrent.contrib.kafka; -import 
com.datatorrent.lib.util.TestUtils; -import com.esotericsoftware.kryo.Kryo; +import com.datatorrent.lib.util.KryoCloneUtils; import org.junit.Assert; import org.junit.Test; @@ -39,7 +38,7 @@ public class SimpleKakfaConsumerTest kc.setTopic("test_topic"); kc.setClientId("test_clientid"); - SimpleKafkaConsumer kcClone = TestUtils.clone(new Kryo(), kc); + SimpleKafkaConsumer kcClone = KryoCloneUtils.cloneObject(kc); Assert.assertEquals("Buffer size is " + bufferSize, bufferSize, kcClone.getBufferSize()); Assert.assertEquals("Cache size is " + cacheSize, cacheSize, kcClone.getCacheSize()); Assert.assertEquals("Clint id is same", kc.getClientId(), kcClone.getClientId()); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cbc1bbbf/contrib/src/test/java/com/datatorrent/contrib/util/FieldValueSerializableGenerator.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/util/FieldValueSerializableGenerator.java b/contrib/src/test/java/com/datatorrent/contrib/util/FieldValueSerializableGenerator.java index 5340b0e..2975c9c 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/util/FieldValueSerializableGenerator.java +++ b/contrib/src/test/java/com/datatorrent/contrib/util/FieldValueSerializableGenerator.java @@ -123,6 +123,7 @@ public class FieldValueSerializableGenerator< T extends FieldInfo> extends Field { if( _kryo == null ) _kryo = new Kryo(); + _kryo.setClassLoader(clazz.getClassLoader()); } } return _kryo; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cbc1bbbf/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java index 53bbd2a..bf36064 100644 --- 
a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java @@ -18,7 +18,6 @@ */ package org.apache.apex.malhar.kafka; -import java.io.ByteArrayOutputStream; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -38,14 +37,12 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; import com.google.common.base.Joiner; import com.datatorrent.api.DefaultPartition; import com.datatorrent.api.Partitioner; import com.datatorrent.api.StatsListener; +import com.datatorrent.lib.util.KryoCloneUtils; /** * Abstract partitioner used to manage the partitions of kafka input operator. @@ -161,14 +158,7 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa protected Partitioner.Partition<AbstractKafkaInputOperator> createPartition(Set<AbstractKafkaPartitioner.PartitionMeta> partitionAssignment) { - Kryo kryo = new Kryo(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - Output output = new Output(bos); - kryo.writeObject(output, prototypeOperator); - output.close(); - Input lInput = new Input(bos.toByteArray()); - Partitioner.Partition<AbstractKafkaInputOperator> p = (Partitioner.Partition<AbstractKafkaInputOperator>) - new DefaultPartition<>(kryo.readObject(lInput, prototypeOperator.getClass())); + Partitioner.Partition<AbstractKafkaInputOperator> p = new DefaultPartition<AbstractKafkaInputOperator>(KryoCloneUtils.cloneObject(prototypeOperator)); p.getPartitionedInstance().assign(partitionAssignment); return p; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cbc1bbbf/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java 
---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java b/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java index afde623..a6d1bd9 100644 --- a/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java +++ b/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java @@ -18,7 +18,6 @@ */ package com.datatorrent.lib.io.block; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.Collection; @@ -36,9 +35,6 @@ import org.slf4j.LoggerFactory; import org.apache.commons.lang.mutable.MutableLong; import org.apache.hadoop.fs.PositionedReadable; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -52,6 +48,7 @@ import com.datatorrent.api.Stats; import com.datatorrent.api.StatsListener; import com.datatorrent.common.util.BaseOperator; import com.datatorrent.lib.counters.BasicCounters; +import com.datatorrent.lib.util.KryoCloneUtils; /** * AbstractBlockReader processes a block of data from a stream.<br/> @@ -295,20 +292,9 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext partitionIterator.remove(); } } else { - //Add more partitions - Kryo kryo = new Kryo(); + KryoCloneUtils<AbstractBlockReader<R, B, STREAM>> cloneUtils = KryoCloneUtils.createCloneUtils(this); while (morePartitionsToCreate-- > 0) { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - Output loutput = new Output(bos); - kryo.writeObject(loutput, this); - loutput.close(); - Input lInput = new Input(bos.toByteArray()); - - @SuppressWarnings("unchecked") - AbstractBlockReader<R, B, STREAM> blockReader = kryo.readObject(lInput, this.getClass()); - - DefaultPartition<AbstractBlockReader<R, B, 
STREAM>> partition = new DefaultPartition<>( - blockReader); + DefaultPartition<AbstractBlockReader<R, B, STREAM>> partition = new DefaultPartition<>(cloneUtils.getClone()); newPartitions.add(partition); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cbc1bbbf/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java index 5067b07..0bcf956 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java @@ -25,9 +25,6 @@ import java.util.regex.Pattern; import javax.validation.constraints.NotNull; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -45,6 +42,7 @@ import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.api.*; import com.datatorrent.api.Context.CountersAggregator; import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.lib.util.KryoCloneUtils; /** * This is the base implementation of a directory input operator, which scans a directory for files. 
@@ -829,21 +827,14 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par */ List<DirectoryScanner> scanners = scanner.partition(totalCount, oldscanners); - Kryo kryo = new Kryo(); Collection<Partition<AbstractFileInputOperator<T>>> newPartitions = Lists.newArrayListWithExpectedSize(totalCount); Collection<IdempotentStorageManager> newManagers = Lists.newArrayListWithExpectedSize(totalCount); + KryoCloneUtils<AbstractFileInputOperator<T>> cloneUtils = KryoCloneUtils.createCloneUtils(this); for (int i=0; i<scanners.size(); i++) { - // Kryo.copy fails as it attempts to clone transient fields - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - Output loutput = new Output(bos); - kryo.writeObject(loutput, this); - loutput.close(); - Input lInput = new Input(bos.toByteArray()); @SuppressWarnings("unchecked") - AbstractFileInputOperator<T> oper = kryo.readObject(lInput, this.getClass()); - lInput.close(); + AbstractFileInputOperator<T> oper = cloneUtils.getClone(); DirectoryScanner scn = scanners.get(i); oper.setScanner(scn); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cbc1bbbf/library/src/main/java/com/datatorrent/lib/util/KryoCloneUtils.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/util/KryoCloneUtils.java b/library/src/main/java/com/datatorrent/lib/util/KryoCloneUtils.java new file mode 100644 index 0000000..fbe6ce4 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/util/KryoCloneUtils.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.util; + +import java.io.ByteArrayOutputStream; +import java.lang.reflect.Array; + +import org.apache.commons.io.IOUtils; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +/** + * + * A Kryo clone utility class that clones objects by using the Kryo serializer and deserializer. + * The class has static methods that can be used directly to clone one object, + * or it can be used as a util instance to clone as many objects as you need from one source object. + * + * @since 3.4.0 + */ +public class KryoCloneUtils<T> +{ + + /** + * Reusable Kryo object as deserializer + */ + private final Kryo kryo; + + /** + * Reusable serialized binary data of the source object, from which clones are deserialized + */ + private final byte[] bin; + + /** + * The class of the object + */ + private final Class<T> clazz; + + @SuppressWarnings("unchecked") + private KryoCloneUtils(Kryo kryo, T t) + { + this.kryo = kryo; + ByteArrayOutputStream bos = null; + Output output = null; + try { + bos = new ByteArrayOutputStream(); + output = new Output(bos); + kryo.writeObject(output, t); + output.close(); + bin = bos.toByteArray(); + } finally { + IOUtils.closeQuietly(output); + IOUtils.closeQuietly(bos); + } + clazz = (Class<T>)t.getClass(); + kryo.setClassLoader(clazz.getClassLoader()); + } + + /** + * Clone from the binary data of the source object + * @return T + */ + public T getClone() + { + try (Input input = new Input(bin)) { + return kryo.readObject(input, clazz); + } + } + + /** + *
Clone array of objects from source object + * @param num size of the return array + * @return array of T + */ + @SuppressWarnings("unchecked") + public T[] getClones(int num) + { + T[] ts = (T[])Array.newInstance(clazz, num); + try (Input input = new Input(bin)) { + for (int i = 0; i < ts.length; i++) { + input.rewind(); + ts[i] = kryo.readObject(input, clazz); + } + } + return ts; + } + + + + /** + * Clone object by serializing and deserializing using Kryo. + * Note this is different from using {@link Kryo#copy(Object)}, which will attempt to also clone transient fields. + * + * @param kryo kryo object used to clone objects + * @param src source object to copy from + * @return a clone of the source object + */ + @SuppressWarnings("unchecked") + public static <SRC> SRC cloneObject(Kryo kryo, SRC src) + { + kryo.setClassLoader(src.getClass().getClassLoader()); + ByteArrayOutputStream bos = null; + Output output; + Input input = null; + try { + bos = new ByteArrayOutputStream(); + output = new Output(bos); + kryo.writeObject(output, src); + output.close(); + input = new Input(bos.toByteArray()); + return (SRC)kryo.readObject(input, src.getClass()); + } finally { + IOUtils.closeQuietly(input); + IOUtils.closeQuietly(bos); + } + } + + + /** + * Clone object by serializing and deserializing using default Kryo. + * Note this is different from using {@link Kryo#copy(Object)}, which will attempt to also clone transient fields.
+ * + * @param src src object that copy from + * @return + */ + public static <SRC> SRC cloneObject(SRC src) + { + return cloneObject(new Kryo(), src); + } + + /** + * Factory function to return CloneUtils object + * @param template + * @param <SRC> + * @return + */ + public static <SRC> KryoCloneUtils<SRC> createCloneUtils(SRC template) + { + return createCloneUtils(new Kryo(), template); + } + + /** + * Factory function to return CloneUtils object with customized Kryo + * @param kryo + * @param template + * @param <SRC> + * @return + */ + public static <SRC> KryoCloneUtils<SRC> createCloneUtils(Kryo kryo, SRC template) + { + return new KryoCloneUtils<>(kryo, template); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cbc1bbbf/library/src/test/java/com/datatorrent/lib/appdata/datastructs/DimensionalTableTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/appdata/datastructs/DimensionalTableTest.java b/library/src/test/java/com/datatorrent/lib/appdata/datastructs/DimensionalTableTest.java index bd2c660..db74007 100644 --- a/library/src/test/java/com/datatorrent/lib/appdata/datastructs/DimensionalTableTest.java +++ b/library/src/test/java/com/datatorrent/lib/appdata/datastructs/DimensionalTableTest.java @@ -21,7 +21,6 @@ package com.datatorrent.lib.appdata.datastructs; import java.util.Map; import java.util.Set; -import com.esotericsoftware.kryo.Kryo; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -29,7 +28,7 @@ import com.google.common.collect.Sets; import org.junit.Assert; import org.junit.Test; -import com.datatorrent.lib.util.TestUtils; +import com.datatorrent.lib.util.KryoCloneUtils; public class DimensionalTableTest { @@ -38,7 +37,7 @@ public class DimensionalTableTest { DimensionalTable<Integer> table = createTestTable(); - TestUtils.clone(new Kryo(), table); + 
KryoCloneUtils.cloneObject(table); } @Test @@ -47,7 +46,7 @@ public class DimensionalTableTest DimensionalTable<Integer> table = createTestTable(); int size = table.size(); - table = TestUtils.clone(new Kryo(), table); + table = KryoCloneUtils.cloneObject(table); Assert.assertEquals(size, table.size()); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cbc1bbbf/library/src/test/java/com/datatorrent/lib/appdata/schemas/ResultFormatterTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/appdata/schemas/ResultFormatterTest.java b/library/src/test/java/com/datatorrent/lib/appdata/schemas/ResultFormatterTest.java index 1f1895b..5f2d924 100644 --- a/library/src/test/java/com/datatorrent/lib/appdata/schemas/ResultFormatterTest.java +++ b/library/src/test/java/com/datatorrent/lib/appdata/schemas/ResultFormatterTest.java @@ -18,19 +18,17 @@ */ package com.datatorrent.lib.appdata.schemas; -import com.esotericsoftware.kryo.Kryo; - import org.junit.Assert; import org.junit.Test; -import com.datatorrent.lib.util.TestUtils; +import com.datatorrent.lib.util.KryoCloneUtils; public class ResultFormatterTest { @Test public void serializationTest() throws Exception { - TestUtils.clone(new Kryo(), new ResultFormatter()); + KryoCloneUtils.cloneObject(new ResultFormatter()); } @Test http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cbc1bbbf/library/src/test/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultipleTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultipleTest.java b/library/src/test/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultipleTest.java index 8d997d8..0513079 100644 --- a/library/src/test/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultipleTest.java +++ 
b/library/src/test/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultipleTest.java @@ -21,14 +21,13 @@ package com.datatorrent.lib.appdata.schemas; import java.util.Collections; import java.util.Map; -import com.esotericsoftware.kryo.Kryo; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.junit.Assert; import org.junit.Test; -import com.datatorrent.lib.util.TestUtils; +import com.datatorrent.lib.util.KryoCloneUtils; public class SchemaRegistryMultipleTest { @@ -50,7 +49,7 @@ public class SchemaRegistryMultipleTest { SchemaRegistryMultiple schemaRegistryMultiple = createSchemaRegistry(); - schemaRegistryMultiple = TestUtils.clone(new Kryo(), schemaRegistryMultiple); + schemaRegistryMultiple = KryoCloneUtils.cloneObject(schemaRegistryMultiple); Assert.assertEquals(2, schemaRegistryMultiple.size()); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cbc1bbbf/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerMapTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerMapTest.java b/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerMapTest.java index e02e2c0..96b348c 100644 --- a/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerMapTest.java +++ b/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerMapTest.java @@ -21,7 +21,6 @@ package com.datatorrent.lib.appdata.snapshot; import java.util.List; import java.util.Map; -import com.esotericsoftware.kryo.Kryo; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -32,7 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.lib.util.TestUtils; +import com.datatorrent.lib.util.KryoCloneUtils; public class 
AppDataSnapshotServerMapTest { @@ -103,7 +102,7 @@ public class AppDataSnapshotServerMapTest Assert.assertEquals("b", secondRow.getString("word")); //Test serialization - TestUtils.clone(new Kryo(), snapshotServer); + KryoCloneUtils.cloneObject(snapshotServer); } private static final Logger LOG = LoggerFactory.getLogger(AppDataSnapshotServerMapTest.class); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cbc1bbbf/library/src/test/java/com/datatorrent/lib/util/KryoCloneUtilsTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/util/KryoCloneUtilsTest.java b/library/src/test/java/com/datatorrent/lib/util/KryoCloneUtilsTest.java new file mode 100644 index 0000000..5eaea2e --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/util/KryoCloneUtilsTest.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.util; + +import java.util.Objects; + +import org.junit.Test; + +import org.apache.commons.lang.math.RandomUtils; +import org.apache.commons.lang3.RandomStringUtils; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +/** + * Unit test for KryoCloneUtils + */ +public class KryoCloneUtilsTest +{ + + @Test + public void testGetClone() throws Exception + { + TestEntity from = getTestEntity(5); + KryoCloneUtils<TestEntity> cloneUtils = KryoCloneUtils.createCloneUtils(from); + TestEntity to = cloneUtils.getClone(); + assertFalse(from == to); + assertEquals(from, to); + assertFalse(from.transientProp.equals(to.transientProp)); + } + + @Test + public void testGetClones() throws Exception + { + TestEntity from = getTestEntity(5); + KryoCloneUtils<TestEntity> cloneUtils = KryoCloneUtils.createCloneUtils(from); + TestEntity[] to = cloneUtils.getClones(10); + for (TestEntity te : to) { + assertFalse(te == from); + assertEquals(from, te); + assertFalse(from.transientProp.equals(te.transientProp)); + } + } + + @Test + public void testCloneObject() throws Exception + { + TestEntity from = getTestEntity(5); + TestEntity to = KryoCloneUtils.cloneObject(from); + assertFalse(from == to); + assertEquals(from, to); + assertFalse(from.transientProp.equals(to.transientProp)); + } + + private TestEntity getTestEntity(int depth) { + TestEntity returnVal = null; + TestEntity curr = null; + while (depth-- > 0) { + if (curr == null) { + curr = returnVal = new TestEntity(); + } else { + curr.nestedProp = new TestEntity(); + curr = curr.nestedProp; + } + } + return returnVal; + } + + + static class TestEntity { + + String strProp = RandomStringUtils.random(10); + + int intProp = RandomUtils.nextInt(1000); + + // transient should be skipped + transient String transientProp = RandomStringUtils.random(20); + + // deep clone should be supported + TestEntity nestedProp = null; + + @Override + public boolean equals(Object 
o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TestEntity that = (TestEntity)o; + return Objects.equals(intProp, that.intProp) && + Objects.equals(strProp, that.strProp) && + Objects.equals(nestedProp, that.nestedProp); + } + + @Override + public int hashCode() + { + return Objects.hash(strProp, intProp, nestedProp); + } + + @Override + public String toString() + { + return "TestEntity{" + + "strProp='" + strProp + '\'' + + ", intProp=" + intProp + + ", transientProp='" + transientProp + '\'' + + ", nestedProp=" + nestedProp + + '}'; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cbc1bbbf/library/src/test/java/com/datatorrent/lib/util/PojoUtilsTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/util/PojoUtilsTest.java b/library/src/test/java/com/datatorrent/lib/util/PojoUtilsTest.java index 3114800..5b1b3f1 100644 --- a/library/src/test/java/com/datatorrent/lib/util/PojoUtilsTest.java +++ b/library/src/test/java/com/datatorrent/lib/util/PojoUtilsTest.java @@ -61,7 +61,6 @@ import com.datatorrent.lib.util.PojoUtils.SetterInt; import com.datatorrent.lib.util.PojoUtils.SetterLong; import com.datatorrent.lib.util.PojoUtils.SetterShort; -import com.esotericsoftware.kryo.Kryo; public class PojoUtilsTest @@ -113,7 +112,7 @@ public class PojoUtilsTest public void testSerialization() throws Exception { GetterBoolean<Object> getBoolean = createGetterBoolean(fqcn, "innerObj.boolVal"); - TestUtils.clone(new Kryo(), getBoolean); + KryoCloneUtils.cloneObject(getBoolean); } @Test http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cbc1bbbf/library/src/test/java/com/datatorrent/lib/util/TestUtils.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java 
b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java index 2932089..37d55e7 100644 --- a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java +++ b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java @@ -51,26 +51,6 @@ public class TestUtils } } - /** - * Clone object by serializing and deserializing using Kryo. - * Note this is different from using {@link Kryo#copy(Object)}, which will attempt to also clone transient fields. - * @param kryo - * @param src - * @return - * @throws IOException - */ - public static <T> T clone(Kryo kryo, T src) throws IOException - { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - Output output = new Output(bos); - kryo.writeObject(output, src); - output.close(); - Input input = new Input(bos.toByteArray()); - @SuppressWarnings("unchecked") - Class<T> clazz = (Class<T>)src.getClass(); - return kryo.readObject(input, clazz); - } - @SuppressWarnings({ "unchecked", "rawtypes" }) public static <S extends Sink, T> S setSink(OutputPort<T> port, S sink) {
