Repository: storm Updated Branches: refs/heads/master e9785d8f1 -> d593918a5
STORM-1649 kryo serialization in windowing Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b850847d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b850847d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b850847d Branch: refs/heads/master Commit: b850847d946ba5ad9809308065352a94bd6287b3 Parents: 33e4994 Author: Satish Duggana <[email protected]> Authored: Tue Mar 29 10:02:28 2016 +0530 Committer: Satish Duggana <[email protected]> Committed: Fri Apr 1 09:28:22 2016 +0530 ---------------------------------------------------------------------- .../trident/windowing/HBaseWindowsStore.java | 40 +++++---- .../windowing/HBaseWindowsStoreFactory.java | 4 +- .../windowing/InMemoryWindowsStoreFactory.java | 6 +- .../trident/windowing/WindowKryoSerializer.java | 87 ++++++++++++++++++++ .../windowing/WindowTridentProcessor.java | 8 +- .../trident/windowing/WindowsStateUpdater.java | 2 +- .../storm/trident/windowing/WindowsStore.java | 1 + .../trident/windowing/WindowsStoreFactory.java | 9 +- 8 files changed, 122 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/b850847d/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java index ccce03a..e319a55 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java @@ -18,9 +18,6 @@ */ package org.apache.storm.hbase.trident.windowing; -import com.esotericsoftware.kryo.Kryo; 
-import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; @@ -29,11 +26,11 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.storm.trident.windowing.WindowKryoSerializer; import org.apache.storm.trident.windowing.WindowsStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; @@ -41,6 +38,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; @@ -53,11 +51,12 @@ public class HBaseWindowsStore implements WindowsStore { public static final String UTF_8 = "utf-8"; private final ThreadLocal<HTable> threadLocalHtable; + private final ThreadLocal<WindowKryoSerializer> threadLocalWindowKryoSerializer; private final Queue<HTable> htables = new ConcurrentLinkedQueue<>(); private final byte[] family; private final byte[] qualifier; - public HBaseWindowsStore(final Configuration config, final String tableName, byte[] family, byte[] qualifier) { + public HBaseWindowsStore(final Map stormConf, final Configuration config, final String tableName, byte[] family, byte[] qualifier) { this.family = family; this.qualifier = qualifier; @@ -74,12 +73,23 @@ public class HBaseWindowsStore implements WindowsStore { } }; + threadLocalWindowKryoSerializer = new ThreadLocal<WindowKryoSerializer>(){ + @Override + protected WindowKryoSerializer initialValue() { + return new WindowKryoSerializer(stormConf); + } + }; + } private HTable htable() { return threadLocalHtable.get(); 
} + private WindowKryoSerializer windowKryoSerializer() { + return threadLocalWindowKryoSerializer.get(); + } + private byte[] effectiveKey(String key) { try { return key.getBytes(UTF_8); @@ -105,11 +115,7 @@ public class HBaseWindowsStore implements WindowsStore { return null; } - Kryo kryo = new Kryo(); - Input input = new Input(result.getValue(family, qualifier)); - Object resultObject = kryo.readClassAndObject(input); - return resultObject; - + return windowKryoSerializer().deserialize(result.getValue(family, qualifier)); } @Override @@ -129,7 +135,6 @@ public class HBaseWindowsStore implements WindowsStore { throw new RuntimeException(e); } - Kryo kryo = new Kryo(); List<Object> values = new ArrayList<>(); for (int i=0; i<results.length; i++) { Result result = results[i]; @@ -137,8 +142,7 @@ public class HBaseWindowsStore implements WindowsStore { LOG.error("Got empty result for key [{}]", keys.get(i)); throw new RuntimeException("Received empty result for key: "+keys.get(i)); } - Input input = new Input(result.getValue(family, qualifier)); - Object resultObject = kryo.readClassAndObject(input); + Object resultObject = windowKryoSerializer().deserialize(result.getValue(family, qualifier)); values.add(resultObject); } @@ -200,10 +204,7 @@ public class HBaseWindowsStore implements WindowsStore { throw new IllegalArgumentException("Invalid value of null with key: "+key); } Put put = new Put(effectiveKey(key)); - Kryo kryo = new Kryo(); - Output output = new Output(new ByteArrayOutputStream()); - kryo.writeClassAndObject(output, value); - put.addColumn(family, ByteBuffer.wrap(qualifier), System.currentTimeMillis(), ByteBuffer.wrap(output.getBuffer(), 0, output.position())); + put.addColumn(family, ByteBuffer.wrap(qualifier), System.currentTimeMillis(), windowKryoSerializer().serializeToByteBuffer(value)); try { htable().put(put); } catch (IOException e) { @@ -216,10 +217,7 @@ public class HBaseWindowsStore implements WindowsStore { List<Put> list = new 
ArrayList<>(); for (Entry entry : entries) { Put put = new Put(effectiveKey(entry.key)); - Output output = new Output(new ByteArrayOutputStream()); - Kryo kryo = new Kryo(); - kryo.writeClassAndObject(output, entry.value); - put.addColumn(family, ByteBuffer.wrap(qualifier), System.currentTimeMillis(), ByteBuffer.wrap(output.getBuffer(), 0, output.position())); + put.addColumn(family, ByteBuffer.wrap(qualifier), System.currentTimeMillis(), windowKryoSerializer().serializeToByteBuffer(entry.value)); list.add(put); } http://git-wip-us.apache.org/repos/asf/storm/blob/b850847d/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java index a47d5fb..a455924 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java @@ -42,14 +42,14 @@ public class HBaseWindowsStoreFactory implements WindowsStoreFactory { this.qualifier = qualifier; } - public WindowsStore create() { + public WindowsStore create(Map stormConf) { Configuration configuration = HBaseConfiguration.create(); for (Map.Entry<String, Object> entry : config.entrySet()) { if (entry.getValue() != null) { configuration.set(entry.getKey(), entry.getValue().toString()); } } - return new HBaseWindowsStore(configuration, tableName, family, qualifier); + return new HBaseWindowsStore(stormConf, configuration, tableName, family, qualifier); } } http://git-wip-us.apache.org/repos/asf/storm/blob/b850847d/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java 
---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java index cf65594..f7e114d 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java @@ -18,12 +18,10 @@ */ package org.apache.storm.trident.windowing; -import org.apache.storm.trident.operation.Aggregator; import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.windowing.config.WindowConfig; -import org.apache.storm.tuple.Fields; import java.util.List; +import java.util.Map; /** * InMemoryWindowsStoreFactory contains a single instance of {@link InMemoryWindowsStore} which will be used for @@ -37,7 +35,7 @@ public class InMemoryWindowsStoreFactory implements WindowsStoreFactory { private InMemoryWindowsStore inMemoryWindowsStore; @Override - public WindowsStore create() { + public WindowsStore create(Map stormConf) { if(inMemoryWindowsStore == null) { inMemoryWindowsStore = new InMemoryWindowsStore(); } http://git-wip-us.apache.org/repos/asf/storm/blob/b850847d/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowKryoSerializer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowKryoSerializer.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowKryoSerializer.java new file mode 100644 index 0000000..b105180 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowKryoSerializer.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.trident.windowing; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.apache.storm.serialization.SerializationFactory; + +import java.nio.ByteBuffer; +import java.util.Map; + +/** + * Kryo serializer/deserializer for values that are stored as part of windowing. This can be used in {@link WindowsStore}. + * This class is not thread safe. + * + */ +public class WindowKryoSerializer { + + private final Kryo kryo; + private final Output output; + private final Input input; + + public WindowKryoSerializer(Map stormConf) { + kryo = SerializationFactory.getKryo(stormConf); + output = new Output(2000, 2_000_000_000); + input = new Input(); + } + + /** + * Serializes the given object into a byte array using Kryo serialization. + * + * @param obj Object to be serialized. + */ + public byte[] serialize(Object obj) { + output.clear(); + kryo.writeClassAndObject(output, obj); + return output.toBytes(); + } + + /** + * Serializes the given object into a {@link ByteBuffer} that wraps this serializer's reused internal output buffer; the buffer's contents are overwritten by the next serialize call, so consume or copy it before then. + * + * @param obj Object to be serialized.
+ */ + public ByteBuffer serializeToByteBuffer(Object obj) { + output.clear(); + kryo.writeClassAndObject(output, obj); + return ByteBuffer.wrap(output.getBuffer(), 0, output.position()); + } + + /** + * Returns an Object which is created using Kryo deserialization of given byte array instance. + * + * @param buff byte array to be deserialized into an Object + */ + public Object deserialize(byte[] buff) { + input.setBuffer(buff); + return kryo.readClassAndObject(input); + } + + /** + * Returns an Object which is created using Kryo deserialization of given {@code byteBuffer} instance. + * + * @param byteBuffer byte buffer to be deserialized into an Object + */ + public Object deserialize(ByteBuffer byteBuffer) { + input.setBuffer(byteBuffer.array(), byteBuffer.arrayOffset(), byteBuffer.position()); + return kryo.readClassAndObject(input); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/b850847d/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java index 5125e41..9b12057 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java @@ -65,7 +65,6 @@ public class WindowTridentProcessor implements TridentProcessor { private WindowsStoreFactory windowStoreFactory; private WindowsStore windowStore; - private Map conf; private TopologyContext topologyContext; private FreshCollector collector; private TridentTupleView.ProjectionFactory projection; @@ -85,21 +84,20 @@ public class WindowTridentProcessor implements TridentProcessor { } @Override - public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) { - this.conf = conf; + public void 
prepare(Map stormConf, TopologyContext context, TridentContext tridentContext) { this.topologyContext = context; List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories(); if (parents.size() != 1) { throw new RuntimeException("Aggregation related operation can only have one parent"); } - Long maxTuplesCacheSize = getWindowTuplesCacheSize(conf); + Long maxTuplesCacheSize = getWindowTuplesCacheSize(stormConf); this.tridentContext = tridentContext; collector = new FreshCollector(tridentContext); projection = new TridentTupleView.ProjectionFactory(parents.get(0), inputFields); - windowStore = windowStoreFactory.create(); + windowStore = windowStoreFactory.create(stormConf); windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR; windowTriggerInprocessId = getWindowTriggerInprocessIdPrefix(windowTaskId); http://git-wip-us.apache.org/repos/asf/storm/blob/b850847d/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java index 6664b41..8042e93 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java @@ -71,7 +71,7 @@ public class WindowsStateUpdater implements StateUpdater<WindowsState> { @Override public void prepare(Map conf, TridentOperationContext context) { - windowsStore = windowStoreFactory.create(); + windowsStore = windowStoreFactory.create(conf); } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/b850847d/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java ---------------------------------------------------------------------- diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java index 8904b7b..e09ac5e 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java @@ -26,6 +26,7 @@ import java.util.List; /** * Store for storing window related entities like windowed tuples, triggers etc. + * {@link WindowKryoSerializer} can be used for kryo serialization/deserialization of keys and values. * */ public interface WindowsStore extends Serializable { http://git-wip-us.apache.org/repos/asf/storm/blob/b850847d/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java index 409d672..edd0cb2 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStoreFactory.java @@ -18,7 +18,11 @@ */ package org.apache.storm.trident.windowing; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.trident.planner.processor.TridentContext; + import java.io.Serializable; +import java.util.Map; /** * Factory to create instances of {@code WindowsStore}. @@ -29,7 +33,8 @@ public interface WindowsStoreFactory extends Serializable { /** * Creates a window store * - * @return + * @param stormConf storm topology configuration passed in {@link org.apache.storm.trident.planner.TridentProcessor#prepare(Map, TopologyContext, TridentContext)} + * */ - public WindowsStore create(); + public WindowsStore create(Map stormConf); }
