Refined shutdown processes and logging statements
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/26d7ece8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/26d7ece8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/26d7ece8 Branch: refs/heads/STREAMS-212 Commit: 26d7ece894f485d549f4bcea3bfb37d8410d5a63 Parents: 7dd0b77 Author: Ryan Ebanks <[email protected]> Authored: Thu Nov 20 17:32:54 2014 -0600 Committer: Ryan Ebanks <[email protected]> Committed: Thu Nov 20 17:32:54 2014 -0600 ---------------------------------------------------------------------- .../gplus/provider/GPlusUserDataProvider.java | 2 +- .../streams/local/builders/LocalStreamBuilder.java | 16 ++++++++++++++++ .../apache/streams/local/tasks/BaseStreamsTask.java | 2 ++ .../local/tasks/StreamsPersistWriterTask.java | 5 +++-- .../streams/local/tasks/StreamsProcessorTask.java | 7 +++++-- .../streams/local/tasks/StreamsProviderTask.java | 2 ++ 6 files changed, 29 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/26d7ece8/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java index 74ed2e7..e2c8c5c 100644 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java +++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java @@ -13,7 +13,7 @@ import java.util.concurrent.BlockingQueue; */ public class GPlusUserDataProvider extends AbstractGPlusProvider{ - public GPlusUserDataProvider(){ + public GPlusUserDataProvider() { super(); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/26d7ece8/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java index a9afc3c..778407a 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java @@ -219,6 +219,8 @@ public class LocalStreamBuilder implements StreamBuilder { } if(isRunning) { Thread.sleep(3000); + } else { + LOGGER.info("Stream has completed successfully, shutting down @ {}", System.currentTimeMillis()); } } LOGGER.debug("Components are no longer running or timed out"); @@ -377,6 +379,20 @@ public class LocalStreamBuilder implements StreamBuilder { stopInternal(false); } + /** + * Attempts to shut down the stream and let all data in flight finish processing. When shutdown(long) is called, it + * immediately stops all {@link org.apache.streams.core.StreamsProvider}s and will attempt to let all {@link org.apache.streams.core.StreamsProcessor}s + * and all {@link org.apache.streams.core.StreamsPersistWriter}s finish processing. + * @param waitInMs + */ + public void shutdown(Long waitInMs) { + + } + + public void shutdownNow() { + + } + protected void stopInternal(boolean systemExiting) { try { shutdown(tasks); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/26d7ece8/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java index 9726963..907bce3 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java @@ -229,4 +229,6 @@ public abstract class BaseStreamsTask implements StreamsTask { this.streamIdentifier = LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER; } } + + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/26d7ece8/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java index 235ee92..050e297 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java @@ -111,11 +111,12 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt try { this.blocked.set(true); datum = this.inQueue.poll(5, TimeUnit.SECONDS); - this.blocked.set(false); } catch (InterruptedException ie) { - LOGGER.error("Received InterruptedException. Shutting down and re-applying interrupt status."); + LOGGER.debug("Received InterruptedException. Shutting down and re-applying interrupt status."); this.keepRunning.set(false); Thread.currentThread().interrupt(); + } finally { + this.blocked.set(false); } if(datum != null) { this.counter.incrementReceivedCount(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/26d7ece8/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java index c470d0b..2ec6336 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java @@ -120,11 +120,12 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus try { this.blocked.set(true); datum = this.inQueue.poll(5, TimeUnit.SECONDS); - this.blocked.set(false); } catch (InterruptedException ie) { - LOGGER.warn("Received InteruptedException, shutting down and re-applying interrupt status."); + LOGGER.debug("Received InteruptedException, shutting down and re-applying interrupt status."); this.keepRunning.set(false); Thread.currentThread().interrupt(); + } finally { + this.blocked.set(false); } if(datum != null) { this.counter.incrementReceivedCount(); @@ -171,4 +172,6 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus public void setStreamsTaskCounter(StreamsTaskCounter counter) { this.counter = counter; } + + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/26d7ece8/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java index 8c87d7a..044ea67 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java @@ -247,4 +247,6 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC public void setStreamsTaskCounter(StreamsTaskCounter counter) { this.counter = counter; } + + }
