STORM-676 Upmerged and resolved conflicts
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/89c03b83 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/89c03b83 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/89c03b83 Branch: refs/heads/1.x-branch Commit: 89c03b83e990aa7fbc732caf055e6bd09e2fc479 Parents: 139a8a3 Author: Satish Duggana <[email protected]> Authored: Wed Mar 23 12:20:21 2016 +0530 Committer: Satish Duggana <[email protected]> Committed: Sun Mar 27 10:45:52 2016 +0530 ---------------------------------------------------------------------- examples/storm-starter/pom.xml | 44 ++- .../TridentHBaseWindowingStoreTopology.java | 124 +++++++++ .../TridentWindowingInmemoryStoreTopology.java | 134 +++++++++ .../trident/windowing/HBaseWindowsStore.java | 275 +++++++++++++++++++ .../windowing/HBaseWindowsStoreFactory.java | 51 ++++ storm-core/src/jvm/org/apache/storm/Config.java | 8 + .../jvm/org/apache/storm/trident/Stream.java | 223 +++++++++++++-- .../apache/storm/trident/TridentTopology.java | 4 + .../storm/trident/fluent/UniqueIdGen.java | 14 +- .../storm/trident/operation/builtin/Debug.java | 4 +- .../windowing/AbstractTridentWindowManager.java | 241 ++++++++++++++++ .../windowing/ITridentWindowManager.java | 59 ++++ .../windowing/InMemoryTridentWindowManager.java | 78 ++++++ .../trident/windowing/InMemoryWindowsStore.java | 200 ++++++++++++++ .../windowing/InMemoryWindowsStoreFactory.java | 37 +++ .../StoreBasedTridentWindowManager.java | 223 +++++++++++++++ .../trident/windowing/TridentBatchTuple.java | 42 +++ .../windowing/WindowTridentProcessor.java | 260 ++++++++++++++++++ .../storm/trident/windowing/WindowsState.java | 52 ++++ .../trident/windowing/WindowsStateFactory.java | 40 +++ .../trident/windowing/WindowsStateUpdater.java | 81 ++++++ .../storm/trident/windowing/WindowsStore.java | 78 ++++++ .../trident/windowing/WindowsStoreFactory.java | 35 +++ .../windowing/config/BaseWindowConfig.java | 48 ++++ 
.../windowing/config/SlidingCountWindow.java | 40 +++ .../windowing/config/SlidingDurationWindow.java | 42 +++ .../windowing/config/TumblingCountWindow.java | 39 +++ .../config/TumblingDurationWindow.java | 40 +++ .../trident/windowing/config/WindowConfig.java | 55 ++++ .../windowing/strategy/BaseWindowStrategy.java | 32 +++ .../strategy/SlidingCountWindowStrategy.java | 59 ++++ .../strategy/SlidingDurationWindowStrategy.java | 60 ++++ .../strategy/TumblingCountWindowStrategy.java | 60 ++++ .../TumblingDurationWindowStrategy.java | 60 ++++ .../windowing/strategy/WindowStrategy.java | 45 +++ .../strategy/WindowStrategyFactory.java | 60 ++++ .../apache/storm/windowing/TriggerHandler.java | 2 +- .../storm/trident/TridentWindowingTest.java | 110 ++++++++ 38 files changed, 3019 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/examples/storm-starter/pom.xml ---------------------------------------------------------------------- diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml index a44e14c..e702a5d 100644 --- a/examples/storm-starter/pom.xml +++ b/examples/storm-starter/pom.xml @@ -31,10 +31,13 @@ <name>storm-starter</name> <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <!-- see comment below... This fixes an annoyance with intellij --> - <provided.scope>provided</provided.scope> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <!-- see comment below... 
This fixes an annoyance with intellij --> + <provided.scope>provided</provided.scope> + <hbase.version>0.98.4-hadoop2</hbase.version> + <hbase.version>1.1.2</hbase.version> </properties> + <profiles> <profile> <id>intellij</id> @@ -139,6 +142,11 @@ <artifactId>storm-hdfs</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-hbase</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> @@ -167,6 +175,36 @@ <artifactId>storm-redis</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <version>${hbase.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <version>${hbase.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java new file mode 100644 index 0000000..24cc8d9 --- /dev/null +++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.starter.trident; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.trident.Stream; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.operation.BaseFunction; +import org.apache.storm.trident.operation.Function; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.operation.TridentOperationContext; +import org.apache.storm.trident.operation.builtin.Debug; +import org.apache.storm.trident.testing.CountAsAggregator; +import org.apache.storm.trident.testing.FixedBatchSpout; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.trident.windowing.WindowsStoreFactory; +import org.apache.storm.trident.windowing.config.TumblingCountWindow; +import 
org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * + */ +public class TridentHBaseWindowingStoreTopology { + private static final Logger log = LoggerFactory.getLogger(TridentHBaseWindowingStoreTopology.class); + + public static StormTopology buildTopology(WindowsStoreFactory windowsStore) throws Exception { + FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), + new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), + new Values("how many apples can you eat"), new Values("to be or not to be the person")); + spout.setCycle(true); + + TridentTopology topology = new TridentTopology(); + + Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), + new Split(), new Fields("word")) + .window(TumblingCountWindow.of(1000), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count")) +// .tumblingTimeWindow(new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count")) + .each(new Fields("count"), new Debug()) + .each(new Fields("count"), new Echo(), new Fields("ct")); + + return topology.build(); + } + + public static class Split extends BaseFunction { + @Override + public void execute(TridentTuple tuple, TridentCollector collector) { + String sentence = tuple.getString(0); + for (String word : sentence.split(" ")) { + collector.emit(new Values(word)); + } + } + } + + static class Echo implements Function { + + @Override + public void execute(TridentTuple tuple, TridentCollector collector) { + log.info("##########Echo.execute: " + tuple); + collector.emit(tuple.getValues()); + } + + @Override + public void 
prepare(Map conf, TridentOperationContext context) { + + } + + @Override + public void cleanup() { + + } + } + + public static void main(String[] args) throws Exception { + Config conf = new Config(); + conf.setMaxSpoutPending(20); + conf.put(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT, 2); + + // window-state table should already be created with cf:tuples column + HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8")); + + if (args.length == 0) { + LocalCluster cluster = new LocalCluster(); + String topologyName = "wordCounterWithWindowing"; + cluster.submitTopology(topologyName, conf, buildTopology(windowStoreFactory)); + Utils.sleep(120 * 1000); + cluster.killTopology(topologyName); + cluster.shutdown(); + System.exit(0); + } else { + conf.setNumWorkers(3); + StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(windowStoreFactory)); + } + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java new file mode 100644 index 0000000..5f0cb4f --- /dev/null +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.starter.trident; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.trident.Stream; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.operation.BaseAggregator; +import org.apache.storm.trident.operation.BaseFunction; +import org.apache.storm.trident.operation.Function; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.operation.TridentOperationContext; +import org.apache.storm.trident.operation.builtin.Debug; +import org.apache.storm.trident.testing.CountAsAggregator; +import org.apache.storm.trident.testing.FixedBatchSpout; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory; +import org.apache.storm.trident.windowing.WindowsStoreFactory; +import org.apache.storm.trident.windowing.config.SlidingCountWindow; +import org.apache.storm.trident.windowing.config.SlidingDurationWindow; +import org.apache.storm.trident.windowing.config.TumblingCountWindow; +import org.apache.storm.trident.windowing.config.TumblingDurationWindow; +import org.apache.storm.trident.windowing.config.WindowConfig; +import 
org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * + */ +public class TridentWindowingInmemoryStoreTopology { + private static final Logger log = LoggerFactory.getLogger(TridentWindowingInmemoryStoreTopology.class); + + public static StormTopology buildTopology(WindowsStoreFactory windowStore, WindowConfig windowConfig) throws Exception { + FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), + new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), + new Values("how many apples can you eat"), new Values("to be or not to be the person")); + spout.setCycle(true); + + TridentTopology topology = new TridentTopology(); + + Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), + new Split(), new Fields("word")) + .window(windowConfig, windowStore, new Fields("word"), new CountAsAggregator(), new Fields("count")) +// .aggregate(new CountAsAggregator(), new Fields("count")) + .each(new Fields("count"), new Debug()) + .each(new Fields("count"), new Echo(), new Fields("ct")) + .each(new Fields("ct"), new Debug()); + + return topology.build(); + } + + public static class Split extends BaseFunction { + @Override + public void execute(TridentTuple tuple, TridentCollector collector) { + String sentence = tuple.getString(0); + for (String word : sentence.split(" ")) { + collector.emit(new Values(word)); + } + } + } + + static class Echo implements Function { + + @Override + public void execute(TridentTuple tuple, TridentCollector collector) { + log.info("##########Echo.execute: " + tuple); + collector.emit(tuple.getValues()); + } + + @Override + public void prepare(Map conf, 
TridentOperationContext context) { + + } + + @Override + public void cleanup() { + + } + } + + public static void main(String[] args) throws Exception { + Config conf = new Config(); + WindowsStoreFactory mapState = new InMemoryWindowsStoreFactory(); + + if (args.length == 0) { + List<? extends WindowConfig> list = Arrays.asList( + SlidingCountWindow.of(1000, 100) + ,TumblingCountWindow.of(1000) + ,SlidingDurationWindow.of(new BaseWindowedBolt.Duration(6, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS)) + ,TumblingDurationWindow.of(new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS)) + ); + + for (WindowConfig windowConfig : list) { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("wordCounter", conf, buildTopology(mapState, windowConfig)); + Utils.sleep(60 * 1000); + cluster.shutdown(); + } + System.exit(0); + } else { + conf.setNumWorkers(3); + StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(mapState, SlidingCountWindow.of(1000, 100))); + } + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java new file mode 100644 index 0000000..47879a4 --- /dev/null +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java @@ -0,0 +1,275 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.hbase.trident.windowing; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.storm.trident.windowing.WindowsStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * This class stores entries into hbase instance of the given configuration. 
+ * + */ +public class HBaseWindowsStore implements WindowsStore { + private static final Logger log = LoggerFactory.getLogger(HBaseWindowsStore.class); + public static final String UTF_8 = "utf-8"; + + private final ThreadLocal<HTable> threadLocalHtable; + private Queue<HTable> htables = new ConcurrentLinkedQueue<>(); + private final byte[] family; + private final byte[] qualifier; + + public HBaseWindowsStore(final Configuration config, final String tableName, byte[] family, byte[] qualifier) { + this.family = family; + this.qualifier = qualifier; + + threadLocalHtable = new ThreadLocal<HTable>() { + @Override + protected HTable initialValue() { + try { + HTable hTable = new HTable(config, tableName); + htables.add(hTable); + return hTable; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }; + + } + + private HTable htable() { + return threadLocalHtable.get(); + } + + private byte[] effectiveKey(String key) { + try { + return key.getBytes(UTF_8); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + + @Override + public Object get(String key) { + WindowsStore.Entry.nonNullCheckForKey(key); + + byte[] effectiveKey = effectiveKey(key); + Get get = new Get(effectiveKey); + Result result = null; + try { + result = htable().get(get); + } catch (IOException e) { + throw new RuntimeException(e); + } + + if(result.isEmpty()) { + return null; + } + + Kryo kryo = new Kryo(); + Input input = new Input(result.getValue(family, qualifier)); + Object resultObject = kryo.readClassAndObject(input); + return resultObject; + + } + + @Override + public Iterable<Object> get(List<String> keys) { + List<Get> gets = new ArrayList<>(); + for (String key : keys) { + WindowsStore.Entry.nonNullCheckForKey(key); + + byte[] effectiveKey = effectiveKey(key); + gets.add(new Get(effectiveKey)); + } + + Result[] results = null; + try { + results = htable().get(gets); + } catch (IOException e) { + throw new RuntimeException(e); + } + + Kryo 
kryo = new Kryo(); + List<Object> values = new ArrayList<>(); + for (int i=0; i<results.length; i++) { + Result result = results[i]; + if(result.isEmpty()) { + log.error("Got empty result for key [{}]", keys.get(i)); + throw new RuntimeException("Received empty result for key: "+keys.get(i)); + } + Input input = new Input(result.getValue(family, qualifier)); + Object resultObject = kryo.readClassAndObject(input); + values.add(resultObject); + } + + return values; + } + + @Override + public Iterable<String> getAllKeys() { + Scan scan = new Scan(); + // this filter makes sure to receive only Key or row but not values associated with those rows. + scan.setFilter(new FirstKeyOnlyFilter()); + //scan.setCaching(1000); + + final Iterator<Result> resultIterator; + try { + resultIterator = htable().getScanner(scan).iterator(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + final Iterator<String> iterator = new Iterator<String>() { + @Override + public boolean hasNext() { + return resultIterator.hasNext(); + } + + @Override + public String next() { + Result result = resultIterator.next(); + String key = null; + try { + key = new String(result.getRow(), UTF_8); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + return key; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove operation is not supported"); + } + }; + + return new Iterable<String>() { + @Override + public Iterator<String> iterator() { + return iterator; + } + }; + } + + @Override + public void put(String key, Object value) { + WindowsStore.Entry.nonNullCheckForKey(key); + WindowsStore.Entry.nonNullCheckForValue(value); + + if(value == null) { + throw new IllegalArgumentException("Invalid value of null with key: "+key); + } + Put put = new Put(effectiveKey(key)); + Kryo kryo = new Kryo(); + Output output = new Output(new ByteArrayOutputStream()); + kryo.writeClassAndObject(output, value); + put.add(family, 
ByteBuffer.wrap(qualifier), System.currentTimeMillis(), ByteBuffer.wrap(output.getBuffer(), 0, output.position())); + try { + htable().put(put); + } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) { + throw new RuntimeException(e); + } + } + + @Override + public void putAll(Collection<Entry> entries) { + List<Put> list = new ArrayList<>(); + for (Entry entry : entries) { + Put put = new Put(effectiveKey(entry.key)); + Output output = new Output(new ByteArrayOutputStream()); + Kryo kryo = new Kryo(); + kryo.writeClassAndObject(output, entry.value); + put.add(family, ByteBuffer.wrap(qualifier), System.currentTimeMillis(), ByteBuffer.wrap(output.getBuffer(), 0, output.position())); + list.add(put); + } + + try { + htable().put(list); + } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) { + throw new RuntimeException(e); + } + } + + @Override + public void remove(String key) { + WindowsStore.Entry.nonNullCheckForKey(key); + + Delete delete = new Delete(effectiveKey(key), System.currentTimeMillis()); + try { + htable().delete(delete); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void removeAll(Collection<String> keys) { + List<Delete> deleteBatch = new ArrayList<>(); + for (String key : keys) { + WindowsStore.Entry.nonNullCheckForKey(key); + + Delete delete = new Delete(effectiveKey(key), System.currentTimeMillis()); + deleteBatch.add(delete); + } + try { + htable().delete(deleteBatch); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void shutdown() { + // close all the created hTable instances + for (HTable htable : htables) { + try { + htable.close(); + } catch (IOException e) { + log.error(e.getMessage(), e); + } + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java 
---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java new file mode 100644 index 0000000..56fad58 --- /dev/null +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.hbase.trident.windowing; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.storm.trident.windowing.WindowsStore; +import org.apache.storm.trident.windowing.WindowsStoreFactory; + +import java.util.Map; + +public class HBaseWindowsStoreFactory implements WindowsStoreFactory { + private final Map<String, Object> config; + private final String tableName; + private final byte[] family; + private final byte[] qualifier; + + public HBaseWindowsStoreFactory(Map<String, Object> config, String tableName, byte[] family, byte[] qualifier) { + this.config = config; + this.tableName = tableName; + this.family = family; + this.qualifier = qualifier; + } + + public WindowsStore create() { + Configuration configuration = HBaseConfiguration.create(); + for (Map.Entry<String, Object> entry : config.entrySet()) { + if (entry.getValue() != null) { + configuration.set(entry.getKey(), entry.getValue().toString()); + } + } + return new HBaseWindowsStore(configuration, tableName, family, qualifier); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/Config.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java index 37565b8..507614b 100644 --- a/storm-core/src/jvm/org/apache/storm/Config.java +++ b/storm-core/src/jvm/org/apache/storm/Config.java @@ -1974,6 +1974,14 @@ public class Config extends HashMap<String, Object> { public static final String TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS="topology.trident.batch.emit.interval.millis"; /** + * Maximum number of tuples that can be stored inmemory cache in windowing operators for fast access without fetching + * them from store. 
+ */ + @isInteger + @isPositiveNumber + public static final String TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT="topology.trident.windowing.cache.tuple.limit"; + + /** * Name of the topology. This config is automatically set by Storm when the topology is submitted. */ @isString http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/Stream.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java index 4a51b56..b3a4446 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java +++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * <p/> * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -21,6 +21,7 @@ import org.apache.storm.generated.Grouping; import org.apache.storm.generated.NullStruct; import org.apache.storm.grouping.CustomStreamGrouping; import org.apache.storm.topology.ResourceDeclarer; +import org.apache.storm.topology.base.BaseWindowedBolt; import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer; import org.apache.storm.trident.fluent.GlobalAggregationScheme; import org.apache.storm.trident.fluent.GroupedStream; @@ -68,10 +69,22 @@ import org.apache.storm.trident.state.StateSpec; import org.apache.storm.trident.state.StateUpdater; import org.apache.storm.trident.tuple.TridentTuple; import org.apache.storm.trident.util.TridentUtils; +import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory; +import org.apache.storm.trident.windowing.WindowTridentProcessor; +import org.apache.storm.trident.windowing.WindowsStateFactory; +import org.apache.storm.trident.windowing.WindowsStateUpdater; +import org.apache.storm.trident.windowing.WindowsStoreFactory; +import org.apache.storm.trident.windowing.config.SlidingCountWindow; +import org.apache.storm.trident.windowing.config.SlidingDurationWindow; +import org.apache.storm.trident.windowing.config.TumblingCountWindow; +import org.apache.storm.trident.windowing.config.TumblingDurationWindow; +import org.apache.storm.trident.windowing.config.WindowConfig; import org.apache.storm.tuple.Fields; import org.apache.storm.utils.Utils; +import java.util.ArrayList; import java.util.Comparator; +import java.util.List; /** * A Stream represents the core data model in Trident, and can be thought of as a "stream" of tuples that are processed @@ -92,10 +105,10 @@ import java.util.Comparator; */ // TODO: need to be able to replace existing fields with the function fields (like Cascading Fields.REPLACE) public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { - Node _node; - TridentTopology _topology; - String _name; - + final Node _node; + final String _name; + private 
final TridentTopology _topology; + protected Stream(TridentTopology topology, String name, Node node) { _topology = topology; _node = node; @@ -180,7 +193,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { */ public GroupedStream groupBy(Fields fields) { projectionValidation(fields); - return new GroupedStream(this, fields); + return new GroupedStream(this, fields); } /** @@ -287,7 +300,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { if(_node instanceof PartitionNode) { return each(new Fields(), new TrueFilter()).partition(grouping); } else { - return _topology.addSourcedNode(this, new PartitionNode(_node.streamId, _name, getOutputFields(), grouping)); + return _topology.addSourcedNode(this, new PartitionNode(_node.streamId, _name, getOutputFields(), grouping)); } } @@ -323,7 +336,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { functionFields, new AggregateProcessor(inputFields, agg))); } - + public Stream stateQuery(TridentState state, Fields inputFields, QueryFunction function, Fields functionFields) { projectionValidation(inputFields); String stateId = state._node.stateInfo.id; @@ -335,11 +348,11 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { _topology._colocate.get(stateId).add(n); return _topology.addSourcedNode(this, n); } - + public TridentState partitionPersist(StateFactory stateFactory, Fields inputFields, StateUpdater updater, Fields functionFields) { return partitionPersist(new StateSpec(stateFactory), inputFields, updater, functionFields); } - + public TridentState partitionPersist(StateSpec stateSpec, Fields inputFields, StateUpdater updater, Fields functionFields) { projectionValidation(inputFields); String id = _topology.getUniqueStateId(); @@ -352,19 +365,19 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { n.stateInfo = new NodeStateInfo(id, stateSpec); return 
_topology.addSourcedStateNode(this, n); } - + public TridentState partitionPersist(StateFactory stateFactory, Fields inputFields, StateUpdater updater) { return partitionPersist(stateFactory, inputFields, updater, new Fields()); } - + public TridentState partitionPersist(StateSpec stateSpec, Fields inputFields, StateUpdater updater) { - return partitionPersist(stateSpec, inputFields, updater, new Fields()); + return partitionPersist(stateSpec, inputFields, updater, new Fields()); } - + public Stream each(Function function, Fields functionFields) { return each(null, function, functionFields); } - + public Stream each(Fields inputFields, Filter filter) { return each(inputFields, new FilterExecutor(filter), new Fields()); } @@ -448,7 +461,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { public ChainedAggregatorDeclarer chainedAgg() { return new ChainedAggregatorDeclarer(this, new BatchGlobalAggScheme()); } - + public Stream partitionAggregate(Aggregator agg, Fields functionFields) { return partitionAggregate(null, agg, functionFields); } @@ -462,8 +475,8 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { return chainedAgg() .partitionAggregate(inputFields, agg, functionFields) .chainEnd(); - } - + } + public Stream partitionAggregate(ReducerAggregator agg, Fields functionFields) { return partitionAggregate(null, agg, functionFields); } @@ -565,7 +578,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { public Stream aggregate(Aggregator agg, Fields functionFields) { return aggregate(null, agg, functionFields); } - + public Stream aggregate(Fields inputFields, Aggregator agg, Fields functionFields) { projectionValidation(inputFields); return chainedAgg() @@ -594,19 +607,169 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { .aggregate(inputFields, agg, functionFields) .chainEnd(); } - + + /** + * Returns a stream of tuples which are aggregated 
results of a tumbling window with every {@code windowCount} of tuples.
+     * This overload does not take a {@code WindowsStoreFactory}; it goes through the
+     * {@code window(WindowConfig, Fields, Aggregator, Fields)} overload, which uses an {@code InMemoryWindowsStoreFactory}.
+     *
+     * @param windowCount represents window tuples count
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return stream of the aggregated results, projected onto {@code functionFields}
+     */
+    public Stream tumblingCountWindow(int windowCount, Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        return window(TumblingCountWindow.of(windowCount), inputFields, aggregator, functionFields);
+    }
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+     *
+     * @param windowCount represents no of tuples in the window
+     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return stream of the aggregated results, projected onto {@code functionFields}
+     */
+    public Stream tumblingCountWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
+                                      Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
+    }
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
+     * and slides the window with {@code slideCount}.
+     *
+     * @param windowCount represents tuples count of a window
+     * @param slideCount represents sliding count window
+     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return stream of the aggregated results, projected onto {@code functionFields}
+     */
+    public Stream slidingCountWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
+                                     Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        return window(SlidingCountWindow.of(windowCount, slideCount), windowStoreFactory, inputFields, aggregator, functionFields);
+    }
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a window that tumbles at a duration of {@code windowDuration}.
+     *
+     * @param windowDuration represents tumbling window duration configuration
+     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return stream of the aggregated results, projected onto {@code functionFields}
+     */
+    public Stream tumblingTimeWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
+                                     Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        return window(TumblingDurationWindow.of(windowDuration), windowStoreFactory, inputFields, aggregator, functionFields);
+    }
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slideDuration}
+     * and completes a window at {@code windowDuration}
+     *
+     * @param windowDuration represents window duration configuration
+     * @param slideDuration represents sliding duration configuration
+     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return stream of the aggregated results, projected onto {@code functionFields}
+     */
+    public Stream slidingTimeWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slideDuration,
+                                    WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        return window(SlidingDurationWindow.of(windowDuration, slideDuration), windowStoreFactory, inputFields, aggregator, functionFields);
+    }
+
+    /**
+     * Returns a stream of aggregated results based on the given window configuration which uses inmemory windowing tuple store.
+     *
+     * @param windowConfig window configuration like window length and slide length.
+     * @param inputFields input fields
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     * @return stream of the aggregated results, projected onto {@code functionFields}
+     */
+    public Stream window(WindowConfig windowConfig, Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        // this store is used only for storing triggered aggregated results but not tuples as storeTuplesInStore is set
+        // as false in the below call.
+        InMemoryWindowsStoreFactory inMemoryWindowsStoreFactory = new InMemoryWindowsStoreFactory();
+        return window(windowConfig, inMemoryWindowsStoreFactory, inputFields, aggregator, functionFields, false);
+    }
+
+    /**
+     * Returns stream of aggregated results based on the given window configuration.
+     *
+     * @param windowConfig window configuration like window length and slide length.
+     * @param windowStoreFactory intermediary tuple store for storing tuples for windowing
+     * @param inputFields input fields
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     * @return stream of the aggregated results, projected onto {@code functionFields}
+     */
+    public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields,
+                         Aggregator aggregator, Fields functionFields) {
+        return window(windowConfig, windowStoreFactory, inputFields, aggregator, functionFields, true);
+    }
+
+    /**
+     * Shared implementation behind the public {@code window} overloads. Validates {@code inputFields} and
+     * {@code windowConfig}, adds a {@code WindowTridentProcessor} node whose fields are the trigger field followed by
+     * {@code functionFields}, and registers a {@code partitionPersist} on the trigger field with a
+     * {@code WindowsStateUpdater} built from the same {@code windowStoreFactory}.
+     *
+     * @param storeTuplesInStore passed through to {@code WindowTridentProcessor}; {@code false} is used by the
+     *                           in-memory overload, {@code true} by the overload taking a store factory
+     * @return the processor node's stream projected onto {@code functionFields}
+     */
+    private Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator,
+                          Fields functionFields, boolean storeTuplesInStore) {
+        projectionValidation(inputFields);
+        windowConfig.validate();
+
+        Fields fields = addTriggerField(functionFields);
+
+        // when storeTuplesInStore is false then the given windowStoreFactory is only used to store triggers and
+        // that store is passed to WindowsStateUpdater to remove them after committing the batch.
+        Stream stream = _topology.addSourcedNode(this,
+                new ProcessorNode(_topology.getUniqueStreamId(),
+                        _name,
+                        fields,
+                        fields,
+                        new WindowTridentProcessor(windowConfig, _topology.getUniqueWindowId(), windowStoreFactory,
+                                inputFields, aggregator, storeTuplesInStore)));
+
+        Stream effectiveStream = stream.project(functionFields);
+
+        // create StateUpdater with the given windowStoreFactory to remove triggered aggregation results from store
+        // when they are successfully processed.
+        StateFactory stateFactory = new WindowsStateFactory();
+        StateUpdater stateUpdater = new WindowsStateUpdater(windowStoreFactory);
+        stream.partitionPersist(stateFactory, new Fields(WindowTridentProcessor.TRIGGER_FIELD_NAME), stateUpdater, new Fields());
+
+        return effectiveStream;
+    }
+
+    /**
+     * Returns a new {@code Fields} made of {@code WindowTridentProcessor.TRIGGER_FIELD_NAME} followed by every field
+     * of {@code functionFields}, in iteration order.
+     */
+    private Fields addTriggerField(Fields functionFields) {
+        List<String> fieldsList = new ArrayList<>();
+        fieldsList.add(WindowTridentProcessor.TRIGGER_FIELD_NAME);
+        for (String field : functionFields) {
+            fieldsList.add(field);
+        }
+        return new Fields(fieldsList);
+    }
+ public TridentState partitionPersist(StateFactory stateFactory, StateUpdater updater, Fields functionFields) { return partitionPersist(new StateSpec(stateFactory), updater, functionFields); } - + public TridentState partitionPersist(StateSpec stateSpec, StateUpdater updater, Fields functionFields) { return partitionPersist(stateSpec, null, updater, functionFields); } - + public TridentState partitionPersist(StateFactory stateFactory, StateUpdater updater) { return partitionPersist(stateFactory, updater, new Fields()); } - + public TridentState partitionPersist(StateSpec stateSpec, StateUpdater updater) { return partitionPersist(stateSpec, updater, new Fields()); } @@ -622,7 +785,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, CombinerAggregator agg, Fields functionFields) { return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields); } - + public TridentState persistentAggregate(StateSpec spec, Fields inputFields, CombinerAggregator agg, Fields functionFields) { projectionValidation(inputFields); // replaces normal aggregation here with a global grouping because it needs to be consistent across batches @@ -648,7 +811,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { projectionValidation(inputFields); return 
global().partitionPersist(spec, inputFields, new ReducerAggStateUpdater(agg), functionFields); } - + public Stream stateQuery(TridentState state, QueryFunction function, Fields functionFields) { return stateQuery(state, null, function, functionFields); } @@ -662,7 +825,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { public Fields getOutputFields() { return _node.allOutputFields; } - + static class BatchGlobalAggScheme implements GlobalAggregationScheme<Stream> { @Override @@ -674,9 +837,9 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { public BatchToPartition singleEmitPartitioner() { return new IndexHashBatchToPartition(); } - + } - + static class GlobalAggScheme implements GlobalAggregationScheme<Stream> { @Override @@ -688,7 +851,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> { public BatchToPartition singleEmitPartitioner() { return new GlobalBatchToPartition(); } - + } private void projectionValidation(Fields projFields) { http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java index e0a349b..06e1576 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java +++ b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java @@ -829,6 +829,10 @@ public class TridentTopology { protected String getUniqueStateId() { return _gen.getUniqueStateId(); } + + protected String getUniqueWindowId() { + return _gen.getUniqueWindowId(); + } protected void registerNode(Node n) { _graph.addVertex(n); http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/fluent/UniqueIdGen.java 
---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/fluent/UniqueIdGen.java b/storm-core/src/jvm/org/apache/storm/trident/fluent/UniqueIdGen.java index 1364faf..e063142 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/fluent/UniqueIdGen.java +++ b/storm-core/src/jvm/org/apache/storm/trident/fluent/UniqueIdGen.java @@ -18,17 +18,21 @@ package org.apache.storm.trident.fluent; public class UniqueIdGen { - int _streamCounter = 0; - + private int _streamCounter = 0; + private int _stateCounter = 0; + private int windowCounter = 0; + public String getUniqueStreamId() { _streamCounter++; return "s" + _streamCounter; } - int _stateCounter = 0; - public String getUniqueStateId() { _stateCounter++; return "state" + _stateCounter; - } + } + + public String getUniqueWindowId() { + return "w"+ (++windowCounter); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java index 07c5ae4..87c4167 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java +++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java @@ -20,6 +20,8 @@ package org.apache.storm.trident.operation.builtin; import org.apache.storm.trident.operation.BaseFilter; import org.apache.storm.trident.tuple.TridentTuple; +import java.util.Date; + /** * Filter for debugging purposes. The `isKeep()` method simply prints the tuple to `System.out` and returns `true`. 
*/ @@ -40,7 +42,7 @@ public class Debug extends BaseFilter { @Override public boolean isKeep(TridentTuple tuple) { - System.out.println(name + tuple.toString()); + System.out.println("<"+new Date()+"> "+name + tuple.toString()); return true; } } http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java new file mode 100644 index 0000000..2941e28 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.storm.trident.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.coordination.BatchOutputCollector;
+import org.apache.storm.trident.operation.Aggregator;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategyFactory;
+import org.apache.storm.windowing.EvictionPolicy;
+import org.apache.storm.windowing.TriggerPolicy;
+import org.apache.storm.windowing.WindowLifecycleListener;
+import org.apache.storm.windowing.WindowManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Basic functionality to manage trident tuple events using {@code WindowManager} and {@code WindowsStore} for storing
+ * tuples and triggers related information.
+ *
+ * @param <T> type of the events handed to the underlying {@code WindowManager}
+ */
+public abstract class AbstractTridentWindowManager<T> implements ITridentWindowManager {
+    private static final Logger log = LoggerFactory.getLogger(AbstractTridentWindowManager.class);
+
+    protected final WindowManager<T> windowManager;
+    protected final Aggregator aggregator;
+    protected final BatchOutputCollector delegateCollector;
+    protected final String windowTaskId;
+    protected final WindowsStore windowStore;
+
+    protected final Set<String> activeBatches = new HashSet<>();
+    // results of fired triggers, appended by the lifecycle listener and exposed through getPendingTriggers()
+    protected final Queue<TriggerResult> pendingTriggers = new ConcurrentLinkedQueue<>();
+    // id of the most recent trigger; seeded from the store in preInitialize()
+    protected final AtomicInteger triggerId = new AtomicInteger();
+    // store key under which the trigger count of this window task is kept
+    private final String windowTriggerCountId;
+    private final TriggerPolicy<T> triggerPolicy;
+
+    /**
+     * Wires a {@code WindowManager} with the eviction and trigger policies obtained from the {@code WindowStrategy}
+     * that {@code WindowStrategyFactory} creates for {@code windowConfig}. The trigger policy is not started here;
+     * that happens at the end of {@link #prepare()}.
+     */
+    public AbstractTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore,
+                                        Aggregator aggregator, BatchOutputCollector delegateCollector) {
+        this.windowTaskId = windowTaskId;
+        this.windowStore = windowStore;
+        this.aggregator = aggregator;
+        this.delegateCollector = delegateCollector;
+
+        windowTriggerCountId = WindowTridentProcessor.TRIGGER_COUNT_PREFIX + windowTaskId;
+
+        windowManager = new WindowManager<>(new TridentWindowLifeCycleListener());
+
+        WindowStrategy<T> windowStrategy = WindowStrategyFactory.create(windowConfig);
+        EvictionPolicy<T> evictionPolicy = windowStrategy.getEvictionPolicy();
+        windowManager.setEvictionPolicy(evictionPolicy);
+        triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy);
+        windowManager.setTriggerPolicy(triggerPolicy);
+    }
+
+    /**
+     * Restores the trigger count from the store, lets the subclass load its own state through
+     * {@link #initialize()}, and only then starts the trigger policy.
+     */
+    @Override
+    public void prepare() {
+        preInitialize();
+
+        initialize();
+
+        postInitialize();
+    }
+
+    /**
+     * Seeds {@code triggerId} from the value stored under {@code windowTriggerCountId}: 0 when nothing is stored,
+     * otherwise the stored value plus one. The chosen value is written back to the store.
+     */
+    private void preInitialize() {
+        log.debug("Getting current trigger count for this component/task");
+        // get trigger count value from store
+        Object result = windowStore.get(windowTriggerCountId);
+        Integer currentCount = 0;
+        if(result == null) {
+            log.info("No current trigger count in windows store.");
+        } else {
+            currentCount = (Integer) result + 1;
+        }
+        windowStore.put(windowTriggerCountId, currentCount);
+        triggerId.set(currentCount);
+    }
+
+    private void postInitialize() {
+        // start trigger once the initialization is done.
+        triggerPolicy.start();
+    }
+
+    /**
+     * Load and initialize any resources into window manager before windowing for component/task is activated.
+     */
+    protected abstract void initialize();
+
+    /**
+     * Listener to receive any activation/expiry of windowing events and take further action on them.
+     */
+    class TridentWindowLifeCycleListener implements WindowLifecycleListener<T> {
+
+        @Override
+        public void onExpiry(List<T> expiredEvents) {
+            log.debug("onExpiry is invoked");
+            onTuplesExpired(expiredEvents);
+        }
+
+        @Override
+        public void onActivation(List<T> events, List<T> newEvents, List<T> expired) {
+            log.debug("onActivation is invoked with events size: {}", events.size());
+            // trigger occurred, create an aggregation and keep them in store
+            int currentTriggerId = triggerId.incrementAndGet();
+            execAggregatorAndStoreResult(currentTriggerId, events);
+        }
+    }
+
+    /**
+     * Handle expired tuple events which can be removing from cache or store.
+     *
+     * @param expiredEvents events reported as expired by the {@code WindowManager}
+     */
+    protected abstract void onTuplesExpired(List<T> expiredEvents);
+
+    /**
+     * Runs the aggregator over the tuples of the fired window, writes the aggregated values together with the
+     * updated trigger count to the store in a single {@code putAll}, and then queues the result as a pending trigger.
+     */
+    private void execAggregatorAndStoreResult(int currentTriggerId, List<T> tupleEvents) {
+        List<TridentTuple> resultTuples = getTridentTuples(tupleEvents);
+
+        // run aggregator to compute the result
+        AccumulatedTuplesCollector collector = new AccumulatedTuplesCollector(delegateCollector);
+        Object state = aggregator.init(currentTriggerId, collector);
+        for (TridentTuple resultTuple : resultTuples) {
+            aggregator.aggregate(state, resultTuple, collector);
+        }
+        aggregator.complete(state, collector);
+
+        List<List<Object>> resultantAggregatedValue = collector.values;
+
+        ArrayList<WindowsStore.Entry> entries = Lists.newArrayList(new WindowsStore.Entry(windowTriggerCountId, currentTriggerId + 1),
+                new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId, currentTriggerId), resultantAggregatedValue));
+        windowStore.putAll(entries);
+
+        pendingTriggers.add(new TriggerResult(currentTriggerId, resultantAggregatedValue));
+    }
+
+    /**
+     * Return {@code TridentTuple}s from given {@code tupleEvents}.
+     *
+     * @param tupleEvents events of the fired window
+     * @return the tuples that are fed to the aggregator
+     */
+    protected abstract List<TridentTuple> getTridentTuples(List<T> tupleEvents);
+
+    /**
+     * This {@code TridentCollector} accumulates all the values emitted. Errors are forwarded to the delegate collector.
+     */
+    static class AccumulatedTuplesCollector implements TridentCollector {
+
+        final List<List<Object>> values = new ArrayList<>();
+        private final BatchOutputCollector delegateCollector;
+
+        public AccumulatedTuplesCollector(BatchOutputCollector delegateCollector) {
+            this.delegateCollector = delegateCollector;
+        }
+
+        @Override
+        public void emit(List<Object> values) {
+            this.values.add(values);
+        }
+
+        @Override
+        public void reportError(Throwable t) {
+            delegateCollector.reportError(t);
+        }
+
+    }
+
+    /**
+     * Aggregated values of one fired trigger. Equality and hash code are based on {@code id} only.
+     */
+    static class TriggerResult {
+        final int id;
+        final List<List<Object>> result;
+
+        public TriggerResult(int id, List<List<Object>> result) {
+            this.id = id;
+            this.result = result;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof TriggerResult)) return false;
+
+            TriggerResult that = (TriggerResult) o;
+
+            return id == that.id;
+
+        }
+
+        @Override
+        public int hashCode() {
+            return id;
+        }
+
+        @Override
+        public String toString() {
+            return "TriggerResult{" +
+                    "id=" + id +
+                    ", result=" + result +
+                    '}';
+        }
+    }
+
+    /**
+     * Returns the live queue of pending trigger results (not a copy).
+     */
+    @Override
+    public Queue<TriggerResult> getPendingTriggers() {
+        return pendingTriggers;
+    }
+
+    /**
+     * Shuts down the window manager and then the store; the store is shut down even when the window manager's
+     * shutdown throws.
+     */
+    @Override
+    public void shutdown() {
+        try {
+            log.info("window manager [{}] is being shutdown", windowManager);
+            windowManager.shutdown();
+        } finally {
+            log.info("window store [{}] is being shutdown", windowStore);
+            windowStore.shutdown();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/ITridentWindowManager.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/ITridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/ITridentWindowManager.java new file mode 100644 index 0000000..d4aef79 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/ITridentWindowManager.java @@ -0,0 +1,59 @@ +/** + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * Contract of the window managers that handle trident tuple events.
+ */
+public interface ITridentWindowManager {
+
+    /**
+     * Invoked from the prepare method of {@code org.apache.storm.trident.planner.TridentProcessor}, so that
+     * initialization can finish before the topology starts accepting tuples. For example: load earlier stored
+     * tuples/triggers into the window manager and start the {@code WindowManager}.
+     */
+    void prepare();
+
+    /**
+     * Invoked from the cleanup method of {@code org.apache.storm.trident.planner.TridentProcessor}, so that
+     * cleanup such as clearing a cache or closing a store connection can be done.
+     */
+    void shutdown();
+
+    /**
+     * Adds the received batch of tuples to the cache/store and to the {@code WindowManager}.
+     *
+     * @param batchId identifier of the batch the tuples belong to
+     * @param tuples  tuples of that batch
+     */
+    void addTuplesBatch(Object batchId, List<TridentTuple> tuples);
+
+    /**
+     * Returns pending triggers to be emitted.
+     *
+     * @return queue of trigger results that have not been emitted yet
+     */
+    Queue<StoreBasedTridentWindowManager.TriggerResult> getPendingTriggers();
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java new file mode 100644 index 0000000..cbb30af --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.coordination.BatchOutputCollector;
+import org.apache.storm.trident.operation.Aggregator;
+import org.apache.storm.trident.spout.IBatchID;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * This {@code ITridentWindowManager} instance stores all the tuples and trigger related information inmemory.
+ */
+public class InMemoryTridentWindowManager extends AbstractTridentWindowManager<TridentTuple> {
+    private static final Logger log = LoggerFactory.getLogger(InMemoryTridentWindowManager.class);
+
+    public InMemoryTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore, Aggregator aggregator,
+                                        BatchOutputCollector delegateCollector) {
+        super(windowConfig, windowTaskId, windowStore, aggregator, delegateCollector);
+    }
+
+    /**
+     * Nothing to load: this implementation only logs.
+     */
+    @Override
+    protected void initialize() {
+        log.debug("noop in initialize");
+    }
+
+    /**
+     * The window events already are trident tuples, so the given list is returned as is.
+     */
+    @Override
+    public List<TridentTuple> getTridentTuples(List<TridentTuple> tridentBatchTuples) {
+        return tridentBatchTuples;
+    }
+
+    /**
+     * No cleanup is done for expired tuples; the call is only logged.
+     */
+    @Override
+    public void onTuplesExpired(List<TridentTuple> expiredTuples) {
+        log.debug("InMemoryTridentWindowManager.onTuplesExpired");
+    }
+
+    /**
+     * Adds every tuple of the batch to the window manager, unless the batch's transaction id is already present in
+     * {@code activeBatches}, in which case the batch is treated as a replay and ignored.
+     *
+     * @param batchId must be an {@code IBatchID}, see {@link #getBatchTxnId(Object)}
+     * @param tuples  tuples of the batch
+     */
+    @Override
+    public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
+        // check if they are already added then ignore these tuples. This batch is replayed.
+        // NOTE(review): nothing in this class adds ids to activeBatches - confirm where it is populated.
+        if (activeBatches.contains(getBatchTxnId(batchId))) {
+            // SLF4J substitutes "{}" only; a printf-style "%s" would be printed literally and the argument dropped
+            log.info("Ignoring already added tuples with batch: {}", batchId);
+            return;
+        }
+
+        log.debug("Adding tuples to window-manager for batch: {}", batchId);
+        for (TridentTuple tridentTuple : tuples) {
+            windowManager.add(tridentTuple);
+        }
+    }
+
+    /**
+     * Returns the string form of the id carried by the given batch id.
+     *
+     * @param batchId batch id, expected to be an {@code IBatchID}
+     * @return {@code ((IBatchID) batchId).getId().toString()}
+     * @throws IllegalArgumentException when {@code batchId} is not an {@code IBatchID}
+     */
+    public String getBatchTxnId(Object batchId) {
+        if (!(batchId instanceof IBatchID)) {
+            throw new IllegalArgumentException("argument should be an IBatchId instance");
+        }
+        return ((IBatchID) batchId).getId().toString();
+    }
+
+}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStore.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStore.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStore.java new file mode 100644 index 0000000..02d78e7 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStore.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.trident.windowing; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Enumeration; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Inmemory store implementation of {@code WindowsStore} which can be backed by persistent store. + * + */ +public class InMemoryWindowsStore implements WindowsStore, Serializable { + + private final ConcurrentHashMap<String, Object> store = new ConcurrentHashMap<>(); + + private int maxSize; + private AtomicInteger currentSize; + private WindowsStore backingStore; + + public InMemoryWindowsStore() { + } + + /** + * + * @param maxSize maximum size of inmemory store + * @param backingStore backing store containing the entries + */ + public InMemoryWindowsStore(int maxSize, WindowsStore backingStore) { + this.maxSize = maxSize; + currentSize = new AtomicInteger(); + this.backingStore = backingStore; + } + + @Override + public Object get(String key) { + Object value = store.get(key); + + if(value == null && backingStore != null) { + value = backingStore.get(key); + } + + return value; + } + + @Override + public Iterable<Object> get(List<String> keys) { + List<Object> values = new ArrayList<>(); + for (String key : keys) { + values.add(get(key)); + } + return values; + } + + @Override + public Iterable<String> getAllKeys() { + if(backingStore != null) { + return backingStore.getAllKeys(); + } + + final Enumeration<String> storeEnumeration = store.keys(); + final Iterator<String> resultIterator = new Iterator<String>() { + @Override + public boolean hasNext() { + return storeEnumeration.hasMoreElements(); + } + + @Override + public String next() { + return storeEnumeration.nextElement(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove operation is not supported as it is immutable."); + } + }; + + 
return new Iterable<String>() { + @Override + public Iterator<String> iterator() { + return resultIterator; + } + }; + } + + @Override + public void put(String key, Object value) { + _put(key, value); + + if(backingStore != null) { + backingStore.put(key, value); + } + } + + private void _put(String key, Object value) { + if(!canAdd()) { + return; + } + + store.put(key, value); + incrementCurrentSize(); + } + + private void incrementCurrentSize() { + if(backingStore != null) { + currentSize.incrementAndGet(); + } + } + + private boolean canAdd() { + return backingStore == null || currentSize.get() < maxSize; + } + + @Override + public void putAll(Collection<Entry> entries) { + for (Entry entry : entries) { + _put(entry.key, entry.value); + } + if(backingStore != null) { + backingStore.putAll(entries); + } + } + + @Override + public void remove(String key) { + _remove(key); + + if(backingStore != null) { + backingStore.remove(key); + } + } + + private void _remove(String key) { + Object oldValue = store.remove(key); + + if(oldValue != null) { + decrementSize(); + if(backingStore != null) { + backingStore.remove(key); + } + } + + } + + private void decrementSize() { + if(backingStore != null) { + currentSize.decrementAndGet(); + } + } + + @Override + public void removeAll(Collection<String> keys) { + for (String key : keys) { + _remove(key); + } + + if(backingStore != null) { + backingStore.removeAll(keys); + } + } + + @Override + public void shutdown() { + store.clear(); + + if(backingStore != null) { + backingStore.shutdown(); + } + } + + @Override + public String toString() { + return "InMemoryWindowsStore{" + + " store:size = " + store.size() + + " backingStore = " + backingStore + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java ---------------------------------------------------------------------- diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java new file mode 100644 index 0000000..32027a9 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.trident.windowing; + +/** + * InMemoryWindowsStoreFactory contains a single instance of {@code InMemoryWindowsStore} which will be used for + * storing tuples and triggers of the window and successfully emitted triggers can be removed from {@code StateUpdater}. 
+ * + */ +public class InMemoryWindowsStoreFactory implements WindowsStoreFactory { + + private InMemoryWindowsStore inMemoryWindowsStore; + + @Override + public WindowsStore create() { + if(inMemoryWindowsStore == null) { + inMemoryWindowsStore = new InMemoryWindowsStore(); + } + return inMemoryWindowsStore; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/89c03b83/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java new file mode 100644 index 0000000..885e508 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.trident.windowing; + +import org.apache.storm.coordination.BatchOutputCollector; +import org.apache.storm.trident.operation.Aggregator; +import org.apache.storm.trident.spout.IBatchID; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.trident.tuple.TridentTupleView; +import org.apache.storm.trident.windowing.config.WindowConfig; +import org.apache.storm.tuple.Fields; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +/** + * This window manager uses {@code WindowsStore} for storing tuples and other trigger related information. It maintains + * tuples cache of {@code maxCachedTuplesSize} without accessing store for getting them. + * + */ +public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager<TridentBatchTuple> { + private static final Logger log = LoggerFactory.getLogger(StoreBasedTridentWindowManager.class); + + private static final String TUPLE_PREFIX = "tu" + WindowsStore.KEY_SEPARATOR; + + private final String windowTupleTaskId; + private final TridentTupleView.FreshOutputFactory freshOutputFactory; + + private Long maxCachedTuplesSize; + private final Fields inputFields; + private AtomicLong currentCachedTuplesSize = new AtomicLong(); + + public StoreBasedTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore, Aggregator aggregator, + BatchOutputCollector delegateCollector, Long maxTuplesCacheSize, Fields inputFields) { + super(windowConfig, windowTaskId, windowStore, aggregator, delegateCollector); + + this.maxCachedTuplesSize = maxTuplesCacheSize; + this.inputFields = inputFields; + freshOutputFactory = new TridentTupleView.FreshOutputFactory(inputFields); + windowTupleTaskId = TUPLE_PREFIX + windowTaskId; + } + + protected void initialize() { + + // get existing tuples and 
pending/unsuccessful triggers for this operator-component/task and add them to WindowManager + String windowTriggerInprocessId = WindowTridentProcessor.getWindowTriggerInprocessIdPrefix(windowTaskId); + String windowTriggerTaskId = WindowTridentProcessor.getWindowTriggerTaskPrefix(windowTaskId); + + List<String> attemptedTriggerKeys = new ArrayList<>(); + List<String> triggerKeys = new ArrayList<>(); + + Iterable<String> allEntriesIterable = windowStore.getAllKeys(); + for (String key : allEntriesIterable) { + if (key.startsWith(windowTupleTaskId)) { + int tupleIndexValue = lastPart(key); + String batchId = secondLastPart(key); + log.debug("Received tuple with batch [{}] and tuple index [{}]", batchId, tupleIndexValue); + windowManager.add(new TridentBatchTuple(batchId, System.currentTimeMillis(), tupleIndexValue)); + } else if (key.startsWith(windowTriggerTaskId)) { + triggerKeys.add(key); + log.debug("Received trigger with key [{}]", key); + } else if(key.startsWith(windowTriggerInprocessId)) { + attemptedTriggerKeys.add(key); + log.debug("Received earlier unsuccessful trigger [{}] from windows store [{}]", key); + } + } + + // these triggers will be retried as part of batch retries + Set<Integer> triggersToBeIgnored = new HashSet<>(); + Iterable<Object> attemptedTriggers = windowStore.get(attemptedTriggerKeys); + for (Object attemptedTrigger : attemptedTriggers) { + triggersToBeIgnored.addAll((List<Integer>) attemptedTrigger); + } + + // get trigger values only if they have more than zero + Iterable<Object> triggerObjects = windowStore.get(triggerKeys); + int i=0; + for (Object triggerObject : triggerObjects) { + int id = lastPart(triggerKeys.get(i++)); + if(!triggersToBeIgnored.contains(id)) { + log.info("Adding pending trigger value [{}]", triggerObject); + pendingTriggers.add(new TriggerResult(id, (List<List<Object>>) triggerObject)); + } + } + + } + + private int lastPart(String key) { + int lastSepIndex = key.lastIndexOf(WindowsStore.KEY_SEPARATOR); + if 
(lastSepIndex < 0) { + throw new IllegalArgumentException("primaryKey does not have key separator '" + WindowsStore.KEY_SEPARATOR + "'"); + } + return Integer.parseInt(key.substring(lastSepIndex+1)); + } + + private String secondLastPart(String key) { + int lastSepIndex = key.lastIndexOf(WindowsStore.KEY_SEPARATOR); + if (lastSepIndex < 0) { + throw new IllegalArgumentException("key "+key+" does not have key separator '" + WindowsStore.KEY_SEPARATOR + "'"); + } + String trimKey = key.substring(0, lastSepIndex); + int secondLastSepIndex = trimKey.lastIndexOf(WindowsStore.KEY_SEPARATOR); + if (lastSepIndex < 0) { + throw new IllegalArgumentException("key "+key+" does not have second key separator '" + WindowsStore.KEY_SEPARATOR + "'"); + } + + return key.substring(secondLastSepIndex+1, lastSepIndex); + } + + public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) { + // check if they are already added then ignore these tuples. This batch is replayed. + if (activeBatches.contains(getBatchTxnId(batchId))) { + log.info("Ignoring already added tuples with batch: %s", batchId); + return; + } + + log.debug("Adding tuples to window-manager for batch: ", batchId); + List<WindowsStore.Entry> entries = new ArrayList<>(); + for (int i = 0; i < tuples.size(); i++) { + String key = keyOf(batchId); + TridentTuple tridentTuple = tuples.get(i); + entries.add(new WindowsStore.Entry(key+i, tridentTuple.select(inputFields))); + } + + // tuples should be available in store before they are added to window manager + windowStore.putAll(entries); + + for (int i = 0; i < tuples.size(); i++) { + String key = keyOf(batchId); + TridentTuple tridentTuple = tuples.get(i); + addToWindowManager(i, key, tridentTuple); + } + + } + + private void addToWindowManager(int tupleIndex, String effectiveBatchId, TridentTuple tridentTuple) { + TridentTuple actualTuple = null; + if (maxCachedTuplesSize == null || currentCachedTuplesSize.get() < maxCachedTuplesSize) { + actualTuple = tridentTuple; 
+ } + currentCachedTuplesSize.incrementAndGet(); + windowManager.add(new TridentBatchTuple(effectiveBatchId, System.currentTimeMillis(), tupleIndex, actualTuple)); + } + + public String getBatchTxnId(Object batchId) { + if (!(batchId instanceof IBatchID)) { + throw new IllegalArgumentException("argument should be an IBatchId instance"); + } + return ((IBatchID) batchId).getId().toString(); + } + + public String keyOf(Object batchId) { + return windowTupleTaskId + getBatchTxnId(batchId) + WindowsStore.KEY_SEPARATOR; + } + + public List<TridentTuple> getTridentTuples(List<TridentBatchTuple> tridentBatchTuples) { + List<TridentTuple> resultTuples = new ArrayList<>(); + List<String> keys = new ArrayList<>(); + for (TridentBatchTuple tridentBatchTuple : tridentBatchTuples) { + TridentTuple tuple = collectTridentTupleOrKey(tridentBatchTuple, keys); + if(tuple != null) { + resultTuples.add(tuple); + } + } + + if(keys.size() > 0) { + Iterable<Object> storedTupleValues = windowStore.get(keys); + for (Object storedTupleValue : storedTupleValues) { + TridentTuple tridentTuple = freshOutputFactory.create((List<Object>) storedTupleValue); + resultTuples.add(tridentTuple); + } + } + + return resultTuples; + } + + public TridentTuple collectTridentTupleOrKey(TridentBatchTuple tridentBatchTuple, List<String> keys) { + if (tridentBatchTuple.tridentTuple != null) { + return tridentBatchTuple.tridentTuple; + } + keys.add(tupleKey(tridentBatchTuple)); + return null; + } + + public void onTuplesExpired(List<TridentBatchTuple> expiredTuples) { + if (maxCachedTuplesSize != null) { + currentCachedTuplesSize.addAndGet(-expiredTuples.size()); + } + + List<String> keys = new ArrayList<>(); + for (TridentBatchTuple expiredTuple : expiredTuples) { + keys.add(tupleKey(expiredTuple)); + } + + windowStore.removeAll(keys); + } + + private String tupleKey(TridentBatchTuple tridentBatchTuple) { + return tridentBatchTuple.effectiveBatchId + tridentBatchTuple.tupleIndex; + } + +}
