Repository: apex-malhar Updated Branches: refs/heads/master 0852c6594 -> ca6995ca4
APEXMALHAR-2358 #resolve #comment Optimise GenericSerde to use specific serde to improve the performance Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/b2b135df Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/b2b135df Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/b2b135df Branch: refs/heads/master Commit: b2b135df218422d5966a5db7fb0add749564bb1f Parents: 91767c5 Author: brightchen <[email protected]> Authored: Mon Nov 28 11:17:14 2016 -0800 Committer: brightchen <[email protected]> Committed: Tue Jan 10 17:09:10 2017 -0800 ---------------------------------------------------------------------- .../util/serde/GenericSerdePerformanceTest.java | 118 +++++++++++++++++++ .../malhar/lib/utils/serde/GenericSerde.java | 62 ++++++++-- .../lib/utils/serde/ImmutablePairSerde.java | 60 ++++++++++ .../malhar/lib/utils/serde/TimeWindowSerde.java | 42 +++++++ .../lib/utils/serde/GenericSerdeTest.java | 17 +++ 5 files changed, 288 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b2b135df/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java new file mode 100644 index 0000000..98ecf67 --- /dev/null +++ b/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.benchmark.util.serde; + +import java.util.Random; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.utils.serde.GenericSerde; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer; +import org.apache.apex.malhar.lib.utils.serde.StringSerde; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.commons.lang3.tuple.ImmutablePair; + +import com.esotericsoftware.kryo.Kryo; + +public class GenericSerdePerformanceTest +{ + private static final transient Logger logger = LoggerFactory.getLogger(GenericSerdePerformanceTest.class); + private SerializationBuffer buffer = SerializationBuffer.READ_BUFFER; + private Random random = new Random(); + private int serdeDataSize = 1000000; + + + @Test + public void testCompareSerdeForString() + { + long beginTime = System.currentTimeMillis(); + testSerdeForString(new GenericSerde<String>(String.class)); + long genericSerdeCost = System.currentTimeMillis() - beginTime; + logger.info("Generic Serde cost for String: {}", genericSerdeCost); + + beginTime = System.currentTimeMillis(); + testSerdeForString(new StringSerde()); + long stringSerdeCost = System.currentTimeMillis() - beginTime; + logger.info("String Serde cost for String: {}", stringSerdeCost); + + beginTime = 
System.currentTimeMillis(); + Kryo kryo = new Kryo(); + for (int i = 0; i < serdeDataSize; ++i) { + kryo.writeObject(buffer, "" + random.nextInt(1000)); + buffer.toSlice(); + } + buffer.release(); + long kryoSerdeCost = System.currentTimeMillis() - beginTime; + logger.info("Kryo Serde cost for String: {}", kryoSerdeCost); + } + + protected void testSerdeForString(Serde<String> serde) + { + for (int i = 0; i < serdeDataSize; ++i) { + serde.serialize("" + random.nextInt(1000), buffer); + buffer.toSlice(); + } + buffer.release(); + } + + + @Test + public void testCompareSerdeForRealCase() + { + long beginTime = System.currentTimeMillis(); + GenericSerde<ImmutablePair> serde = new GenericSerde<ImmutablePair>(); + for (int i = 0; i < serdeDataSize; ++i) { + serde.serialize(generatePair(beginTime), buffer); + buffer.toSlice(); + } + buffer.release(); + long genericSerdeCost = System.currentTimeMillis() - beginTime; + logger.info("Generic Serde cost for ImmutablePair: {}", genericSerdeCost); + + + beginTime = System.currentTimeMillis(); + Kryo kryo = new Kryo(); + for (int i = 0; i < serdeDataSize; ++i) { + kryo.writeObject(buffer, generatePair(beginTime)); + buffer.toSlice(); + } + buffer.release(); + long kryoSerdeCost = System.currentTimeMillis() - beginTime; + logger.info("Kryo Serde cost for ImmutablePair without class info: {}", kryoSerdeCost); + + + beginTime = System.currentTimeMillis(); + Kryo kryo1 = new Kryo(); + for (int i = 0; i < serdeDataSize; ++i) { + kryo1.writeClassAndObject(buffer, generatePair(beginTime)); + buffer.toSlice(); + } + buffer.release(); + long kryoSerdeCost2 = System.currentTimeMillis() - beginTime; + logger.info("Kryo Serde cost for ImmutablePair with class info: {}", kryoSerdeCost2); + } + + protected ImmutablePair generatePair(long now) + { + return new ImmutablePair(new Window.TimeWindow(now + random.nextInt(100), random.nextInt(100)), "" + random.nextInt(1000)); + } +} 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b2b135df/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java index 8501614..4b28a00 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java @@ -18,11 +18,16 @@ */ package org.apache.apex.malhar.lib.utils.serde; +import java.util.Map; + +import org.apache.apex.malhar.lib.window.Window; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.hadoop.classification.InterfaceStability; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; +import com.google.common.collect.Maps; /** * Generic serde using Kryo serialization. Note that while this is convenient, it may not be desirable because @@ -36,31 +41,62 @@ import com.esotericsoftware.kryo.io.Output; @InterfaceStability.Evolving public class GenericSerde<T> implements Serde<T> { - private transient ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() - { - @Override - public Kryo get() - { - return new Kryo(); - } - }; + /** + * The default GenericSerde use the default class to serde map + */ + public static final GenericSerde DEFAULT = new GenericSerde(); + private transient Kryo kryo = new Kryo(); private final Class<? 
extends T> clazz; + @SuppressWarnings("rawtypes") + private Map<Class, Serde> typeToSerde = Maps.newHashMap(); + + public <C> void registerSerde(Class<C> type, Serde<C> serde) + { + typeToSerde.put(type, serde); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public void registerDefaultSerdes() + { + registerSerde(String.class, new StringSerde()); + registerSerde(Long.class, new LongSerde()); + registerSerde(Integer.class, new IntSerde()); + registerSerde(ImmutablePair.class, new ImmutablePairSerde()); + registerSerde(Window.TimeWindow.class, new TimeWindowSerde()); + } + public GenericSerde() { - this.clazz = null; + this(null); } public GenericSerde(Class<? extends T> clazz) { this.clazz = clazz; + registerDefaultSerdes(); + } + + public Serde getDefaultSerde(Class type) + { + return typeToSerde.get(type); } @Override public void serialize(T object, Output output) { - Kryo kryo = kryos.get(); + Class type = object.getClass(); + Serde serde = null; + if (clazz == type) { + serde = getDefaultSerde(type); + } + if (serde != null) { + serde.serialize(object, output); + return; + } + + //delegate to kryo if (clazz == null) { kryo.writeClassAndObject(output, object); } else { @@ -71,8 +107,12 @@ public class GenericSerde<T> implements Serde<T> @Override public T deserialize(Input input) { + Serde serde = clazz == null ? 
null : getDefaultSerde(clazz); + if (serde != null) { + return (T)serde.deserialize(input); + } + T object; - Kryo kryo = kryos.get(); if (clazz == null) { object = (T)kryo.readClassAndObject(input); } else { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b2b135df/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ImmutablePairSerde.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ImmutablePairSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ImmutablePairSerde.java new file mode 100644 index 0000000..98ced16 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ImmutablePairSerde.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.hadoop.classification.InterfaceStability; + +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +/** + * An implementation of {@link Serde} which serializes and deserializes {@link ImmtablePair}s. 
+ * + */ +@InterfaceStability.Evolving +public class ImmutablePairSerde<L, R> implements Serde<ImmutablePair<L, R>> +{ + private Serde<L> leftSerde; + private Serde<R> rightSerde; + + public ImmutablePairSerde() + { + this(GenericSerde.DEFAULT, GenericSerde.DEFAULT); + } + + public ImmutablePairSerde(Serde<L> leftSerde, Serde<R> rightSerde) + { + this.leftSerde = leftSerde; + this.rightSerde = rightSerde; + } + + @Override + public void serialize(ImmutablePair<L, R> pair, Output output) + { + leftSerde.serialize(pair.left, output); + rightSerde.serialize(pair.right, output); + } + + @Override + public ImmutablePair<L, R> deserialize(Input input) + { + throw new RuntimeException("Not Supported."); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b2b135df/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/TimeWindowSerde.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/TimeWindowSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/TimeWindowSerde.java new file mode 100644 index 0000000..268a7ce --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/TimeWindowSerde.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import org.apache.apex.malhar.lib.window.Window; +import org.apache.apex.malhar.lib.window.Window.TimeWindow; + +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +public class TimeWindowSerde implements Serde<Window.TimeWindow> +{ + @Override + public void serialize(TimeWindow timeWindow, Output output) + { + output.writeLong(timeWindow.getBeginTimestamp()); + output.writeLong(timeWindow.getDurationMillis()); + } + + @Override + public TimeWindow deserialize(Input input) + { + return new TimeWindow(input.readLong(), input.readLong()); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b2b135df/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java index 34b7088..5ac043e 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java @@ -24,6 +24,8 @@ import java.util.List; import org.junit.Assert; import org.junit.Test; +import org.apache.apex.malhar.lib.window.Window; + import com.esotericsoftware.kryo.io.Input; import com.google.common.collect.Lists; @@ -70,6 +72,7 @@ public class GenericSerdeTest Assert.assertEquals(stringList, deserializedList); } + @Test public void pojoTest() { @@ -81,4 +84,18 @@ public class GenericSerdeTest TestPojo deserializedPojo = serdePojo.deserialize(new Input(slice.buffer, slice.offset, slice.length)); Assert.assertEquals(pojo, deserializedPojo); } + + @Test + public void timeWindowSerdeTest() + { + 
GenericSerde<Window.TimeWindow>[] serdes = new GenericSerde[] {new GenericSerde<>(Window.TimeWindow.class), GenericSerde.DEFAULT}; + for (GenericSerde<Window.TimeWindow> serde : serdes) { + Window.TimeWindow pojo = new Window.TimeWindow(System.currentTimeMillis(), 1000); + SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream()); + serde.serialize(pojo, buffer); + Slice slice = buffer.toSlice(); + Window.TimeWindow deserializedPojo = serde.deserialize(new Input(slice.buffer, slice.offset, slice.length)); + Assert.assertEquals(pojo, deserializedPojo); + } + } }
