STORM-676 addressed review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/532bb79b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/532bb79b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/532bb79b Branch: refs/heads/1.x-branch Commit: 532bb79b30fdfee04c24d7ed04f63b65c4c44862 Parents: 89c03b8 Author: Satish Duggana <[email protected]> Authored: Sun Mar 13 10:25:21 2016 +0530 Committer: Satish Duggana <[email protected]> Committed: Sun Mar 27 10:46:16 2016 +0530 ---------------------------------------------------------------------- .../trident/TridentHBaseWindowingStoreTopology.java | 6 ++---- .../TridentWindowingInmemoryStoreTopology.java | 7 +++---- .../hbase/trident/windowing/HBaseWindowsStore.java | 6 +++--- .../windowing/AbstractTridentWindowManager.java | 14 +++++++------- .../windowing/InMemoryTridentWindowManager.java | 10 +++++----- .../windowing/InMemoryWindowsStoreFactory.java | 13 +++++++++++-- .../windowing/StoreBasedTridentWindowManager.java | 14 +++++++------- .../trident/windowing/WindowTridentProcessor.java | 15 ++++++++++----- .../apache/storm/trident/windowing/WindowsState.java | 6 +++--- .../storm/trident/windowing/WindowsStateUpdater.java | 8 ++++---- 10 files changed, 55 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java index 24cc8d9..0ebaa1f 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java @@ -23,7 +23,6 @@ import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory; -import org.apache.storm.topology.base.BaseWindowedBolt; import org.apache.storm.trident.Stream; import org.apache.storm.trident.TridentTopology; import org.apache.storm.trident.operation.BaseFunction; @@ -44,13 +43,12 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.TimeUnit; /** * */ public class TridentHBaseWindowingStoreTopology { - private static final Logger log = LoggerFactory.getLogger(TridentHBaseWindowingStoreTopology.class); + private static final Logger LOG = LoggerFactory.getLogger(TridentHBaseWindowingStoreTopology.class); public static StormTopology buildTopology(WindowsStoreFactory windowsStore) throws Exception { FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), @@ -84,7 +82,7 @@ public class TridentHBaseWindowingStoreTopology { @Override public void execute(TridentTuple tuple, TridentCollector collector) { - log.info("##########Echo.execute: " + tuple); + LOG.info("##########Echo.execute: " + tuple); collector.emit(tuple.getValues()); } http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java index 5f0cb4f..a2455a0 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java @@ -25,7 +25,6 @@ import org.apache.storm.generated.StormTopology; import org.apache.storm.topology.base.BaseWindowedBolt; import org.apache.storm.trident.Stream; import org.apache.storm.trident.TridentTopology; -import org.apache.storm.trident.operation.BaseAggregator; import org.apache.storm.trident.operation.BaseFunction; import org.apache.storm.trident.operation.Function; import org.apache.storm.trident.operation.TridentCollector; @@ -53,10 +52,10 @@ import java.util.Map; import java.util.concurrent.TimeUnit; /** - * + * Sample application of trident windowing which uses inmemory store for storing tuples in window. */ public class TridentWindowingInmemoryStoreTopology { - private static final Logger log = LoggerFactory.getLogger(TridentWindowingInmemoryStoreTopology.class); + private static final Logger LOG = LoggerFactory.getLogger(TridentWindowingInmemoryStoreTopology.class); public static StormTopology buildTopology(WindowsStoreFactory windowStore, WindowConfig windowConfig) throws Exception { FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), @@ -91,7 +90,7 @@ public class TridentWindowingInmemoryStoreTopology { @Override public void execute(TridentTuple tuple, TridentCollector collector) { - log.info("##########Echo.execute: " + tuple); + LOG.info("##########Echo.execute: " + tuple); collector.emit(tuple.getValues()); } http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java index 47879a4..ff3fbf9 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java @@ -51,7 +51,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; * */ public class HBaseWindowsStore implements WindowsStore { - private static final Logger log = LoggerFactory.getLogger(HBaseWindowsStore.class); + private static final Logger LOG = LoggerFactory.getLogger(HBaseWindowsStore.class); public static final String UTF_8 = "utf-8"; private final ThreadLocal<HTable> threadLocalHtable; @@ -136,7 +136,7 @@ public class HBaseWindowsStore implements WindowsStore { for (int i=0; i<results.length; i++) { Result result = results[i]; if(result.isEmpty()) { - log.error("Got empty result for key [{}]", keys.get(i)); + LOG.error("Got empty result for key [{}]", keys.get(i)); throw new RuntimeException("Received empty result for key: "+keys.get(i)); } Input input = new Input(result.getValue(family, qualifier)); @@ -267,7 +267,7 @@ public class HBaseWindowsStore implements WindowsStore { try { htable.close(); } catch (IOException e) { - log.error(e.getMessage(), e); + LOG.error(e.getMessage(), e); } } } http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java index 2941e28..fd7a957 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java @@ -47,7 +47,7 @@ import java.util.concurrent.atomic.AtomicInteger; * */ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowManager { - private static final Logger log = LoggerFactory.getLogger(AbstractTridentWindowManager.class); + private static final Logger LOG = LoggerFactory.getLogger(AbstractTridentWindowManager.class); protected final WindowManager<T> windowManager; protected final Aggregator aggregator; @@ -89,12 +89,12 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM } private void preInitialize() { - log.debug("Getting current trigger count for this component/task"); + LOG.debug("Getting current trigger count for this component/task"); // get trigger count value from store Object result = windowStore.get(windowTriggerCountId); Integer currentCount = 0; if(result == null) { - log.info("No current trigger count in windows store."); + LOG.info("No current trigger count in windows store."); } else { currentCount = (Integer) result + 1; } @@ -119,13 +119,13 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM @Override public void onExpiry(List<T> expiredEvents) { - log.debug("onExpiry is invoked"); + LOG.debug("onExpiry is invoked"); onTuplesExpired(expiredEvents); } @Override public void onActivation(List<T> events, List<T> newEvents, List<T> expired) { - log.debug("onActivation is invoked with events size: {}", events.size()); + LOG.debug("onActivation is invoked with events size: [{}]", events.size()); // trigger occurred, create an aggregation and keep them in store int currentTriggerId = triggerId.incrementAndGet(); execAggregatorAndStoreResult(currentTriggerId, events); @@ -230,10 +230,10 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM public void shutdown() { try { - log.info("window manager [{}] is being shutdown", windowManager); + LOG.info("window manager [{}] is being shutdown", windowManager); windowManager.shutdown(); } finally { - log.info("window store [{}] is being shutdown", windowStore); + LOG.info("window store [{}] is being shutdown", windowStore); windowStore.shutdown(); } } http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java index cbb30af..e47cc9a 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java @@ -33,7 +33,7 @@ import java.util.List; * This {@code ITridentWindowManager} instance stores all the tuples and trigger related information inmemory. */ public class InMemoryTridentWindowManager extends AbstractTridentWindowManager<TridentTuple> { - private static final Logger log = LoggerFactory.getLogger(InMemoryTridentWindowManager.class); + private static final Logger LOG = LoggerFactory.getLogger(InMemoryTridentWindowManager.class); public InMemoryTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore, Aggregator aggregator, BatchOutputCollector delegateCollector) { @@ -42,7 +42,7 @@ public class InMemoryTridentWindowManager extends AbstractTridentWindowManager<T @Override protected void initialize() { - log.debug("noop in initialize"); + LOG.debug("noop in initialize"); } @Override @@ -52,17 +52,17 @@ public class InMemoryTridentWindowManager extends AbstractTridentWindowManager<T @Override public void onTuplesExpired(List<TridentTuple> expiredTuples) { - log.debug("InMemoryTridentWindowManager.onTuplesExpired"); + LOG.debug("InMemoryTridentWindowManager.onTuplesExpired"); } public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) { // check if they are already added then ignore these tuples. This batch is replayed. if (activeBatches.contains(getBatchTxnId(batchId))) { - log.info("Ignoring already added tuples with batch: %s", batchId); + LOG.info("Ignoring already added tuples with batch: [{}]", batchId); return; } - log.debug("Adding tuples to window-manager for batch: ", batchId); + LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId); for (TridentTuple tridentTuple : tuples) { windowManager.add(tridentTuple); } http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java index 32027a9..cf65594 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java @@ -18,9 +18,18 @@ */ package org.apache.storm.trident.windowing; +import org.apache.storm.trident.operation.Aggregator; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.windowing.config.WindowConfig; +import org.apache.storm.tuple.Fields; + +import java.util.List; + /** - * InMemoryWindowsStoreFactory contains a single instance of {@code InMemoryWindowsStore} which will be used for - * storing tuples and triggers of the window and successfully emitted triggers can be removed from {@code StateUpdater}. + * InMemoryWindowsStoreFactory contains a single instance of {@link InMemoryWindowsStore} which will be used for + * storing tuples and triggers of the window. The same InMemoryWindowsStoreFactory instance is passed to {@link WindowsStateUpdater}, + * which removes successfully emitted triggers from the same {@code inMemoryWindowsStore} instance in + * {@link WindowsStateUpdater#updateState(WindowsState, List, TridentCollector)}. * */ public class InMemoryWindowsStoreFactory implements WindowsStoreFactory { http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java index 885e508..58b24a2 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java @@ -40,7 +40,7 @@ import java.util.concurrent.atomic.AtomicLong; * */ public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager<TridentBatchTuple> { - private static final Logger log = LoggerFactory.getLogger(StoreBasedTridentWindowManager.class); + private static final Logger LOG = LoggerFactory.getLogger(StoreBasedTridentWindowManager.class); private static final String TUPLE_PREFIX = "tu" + WindowsStore.KEY_SEPARATOR; @@ -75,14 +75,14 @@ public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager if (key.startsWith(windowTupleTaskId)) { int tupleIndexValue = lastPart(key); String batchId = secondLastPart(key); - log.debug("Received tuple with batch [{}] and tuple index [{}]", batchId, tupleIndexValue); + LOG.debug("Received tuple with batch [{}] and tuple index [{}]", batchId, tupleIndexValue); windowManager.add(new TridentBatchTuple(batchId, System.currentTimeMillis(), tupleIndexValue)); } else if (key.startsWith(windowTriggerTaskId)) { triggerKeys.add(key); - log.debug("Received trigger with key [{}]", key); + LOG.debug("Received trigger with key [{}]", key); } else if(key.startsWith(windowTriggerInprocessId)) { attemptedTriggerKeys.add(key); - log.debug("Received earlier unsuccessful trigger [{}] from windows store [{}]", key); + LOG.debug("Received earlier unsuccessful trigger [{}] from windows store [{}]", key); } } @@ -99,7 +99,7 @@ public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager for (Object triggerObject : triggerObjects) { int id = lastPart(triggerKeys.get(i++)); if(!triggersToBeIgnored.contains(id)) { - log.info("Adding pending trigger value [{}]", triggerObject); + LOG.info("Adding pending trigger value [{}]", triggerObject); pendingTriggers.add(new TriggerResult(id, (List<List<Object>>) triggerObject)); } } @@ -131,11 +131,11 @@ public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) { // check if they are already added then ignore these tuples. This batch is replayed. if (activeBatches.contains(getBatchTxnId(batchId))) { - log.info("Ignoring already added tuples with batch: %s", batchId); + LOG.info("Ignoring already added tuples with batch: [{}]", batchId); return; } - log.debug("Adding tuples to window-manager for batch: ", batchId); + LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId); List<WindowsStore.Entry> entries = new ArrayList<>(); for (int i = 0; i < tuples.size(); i++) { String key = keyOf(batchId); http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java index c2d9362..8898b13 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java @@ -46,7 +46,7 @@ import java.util.Queue; * */ public class WindowTridentProcessor implements TridentProcessor { - private static final Logger log = LoggerFactory.getLogger(WindowTridentProcessor.class); + private static final Logger LOG = LoggerFactory.getLogger(WindowTridentProcessor.class); public static final String TRIGGER_INPROCESS_PREFIX = "tip" + WindowsStore.KEY_SEPARATOR; public static final String TRIGGER_PREFIX = "tr" + WindowsStore.KEY_SEPARATOR; @@ -127,8 +127,13 @@ public class WindowTridentProcessor implements TridentProcessor { @Override public void cleanup() { - log.info("shutting down window manager"); - tridentWindowManager.shutdown(); + LOG.info("shutting down window manager"); + try { + tridentWindowManager.shutdown(); + } catch (Exception ex) { + LOG.error("Error occurred while cleaning up window processor", ex); + throw ex; + } } @Override @@ -150,7 +155,7 @@ public class WindowTridentProcessor implements TridentProcessor { Object batchId = processorContext.batchId; Object batchTxnId = getBatchTxnId(batchId); - log.debug("Received finishBatch of : {} ", batchId); + LOG.debug("Received finishBatch of : [{}] ", batchId); // get all the tuples in a batch and add it to trident-window-manager List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()]; tridentWindowManager.addTuplesBatch(batchId, tuples); @@ -171,7 +176,7 @@ public class WindowTridentProcessor implements TridentProcessor { if(triggerValues == null) { pendingTriggerIds = new ArrayList<>(); Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers(); - log.debug("pending triggers at batch: {} and triggers.size: {} ", batchId, pendingTriggers.size()); + LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size()); try { Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator(); List<Object> values = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java index 378d24f..faf73d6 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java @@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory; * */ public class WindowsState implements State { - private static final Logger log = LoggerFactory.getLogger(WindowsState.class); + private static final Logger LOG = LoggerFactory.getLogger(WindowsState.class); private Long currentTxId; @@ -38,12 +38,12 @@ public class WindowsState implements State { @Override public void beginCommit(Long txId) { currentTxId = txId; - log.debug(" WindowsState.beginCommit:: [{}] ", txId); + LOG.debug(" WindowsState.beginCommit:: [{}] ", txId); } @Override public void commit(Long txId) { - log.debug("WindowsState.commit :: [{}]", txId); + LOG.debug("WindowsState.commit :: [{}]", txId); } public Long getCurrentTxId() { http://git-wip-us.apache.org/repos/asf/storm/blob/532bb79b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java index 45ac885..6664b41 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java +++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java @@ -36,7 +36,7 @@ import java.util.Map; */ public class WindowsStateUpdater implements StateUpdater<WindowsState> { - private static final Logger log = LoggerFactory.getLogger(WindowsStateUpdater.class); + private static final Logger LOG = LoggerFactory.getLogger(WindowsStateUpdater.class); private final WindowsStoreFactory windowStoreFactory; private WindowsStore windowsStore; @@ -48,7 +48,7 @@ public class WindowsStateUpdater implements StateUpdater<WindowsState> { @Override public void updateState(WindowsState state, List<TridentTuple> tuples, TridentCollector collector) { Long currentTxId = state.getCurrentTxId(); - log.debug("Removing triggers using WindowStateUpdater, txnId: {} ", currentTxId); + LOG.debug("Removing triggers using WindowStateUpdater, txnId: [{}] ", currentTxId); for (TridentTuple tuple : tuples) { try { Object fieldValue = tuple.getValueByField(WindowTridentProcessor.TRIGGER_FIELD_NAME); @@ -58,11 +58,11 @@ public class WindowsStateUpdater implements StateUpdater<WindowsState> { WindowTridentProcessor.TriggerInfo triggerInfo = (WindowTridentProcessor.TriggerInfo) fieldValue; String triggerCompletedKey = WindowTridentProcessor.getWindowTriggerInprocessIdPrefix(triggerInfo.windowTaskId)+currentTxId; - log.debug("Removing trigger key [{}] and trigger completed key [{}] from store: [{}]", triggerInfo, triggerCompletedKey, windowsStore); + LOG.debug("Removing trigger key [{}] and trigger completed key [{}] from store: [{}]", triggerInfo, triggerCompletedKey, windowsStore); windowsStore.removeAll(Lists.newArrayList(triggerInfo.generateTriggerKey(), triggerCompletedKey)); } catch (Exception ex) { - log.warn(ex.getMessage()); + LOG.warn(ex.getMessage()); collector.reportError(ex); throw new FailedException(ex); }
