http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java index f6fcaac..9e5089e 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java @@ -18,25 +18,38 @@ package org.apache.streams.local.builders; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.util.concurrent.Uninterruptibles; -import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfiguration; import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.*; +import org.apache.streams.core.DatumStatusCountable; +import org.apache.streams.core.StreamBuilder; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsPersistWriter; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.core.StreamsProvider; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.local.LocalRuntimeConfiguration; import org.apache.streams.local.counters.StreamsTaskCounter; import org.apache.streams.local.executors.ShutdownStreamOnUnhandleThrowableThreadPoolExecutor; import org.apache.streams.local.monitoring.MonitoringConfiguration; import org.apache.streams.local.queues.ThroughputQueue; -import org.apache.streams.local.tasks.*; +import org.apache.streams.local.tasks.BaseStreamsTask; +import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread; +import org.apache.streams.local.tasks.StatusCounterMonitorThread; +import org.apache.streams.local.tasks.StreamsProviderTask; +import org.apache.streams.local.tasks.StreamsTask; import org.apache.streams.monitoring.tasks.BroadcastMonitorThread; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.Uninterruptibles; import org.joda.time.DateTime; import org.slf4j.Logger; import java.math.BigInteger; -import java.util.*; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -49,452 +62,452 @@ import java.util.concurrent.TimeUnit; */ public class LocalStreamBuilder implements StreamBuilder { - private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(LocalStreamBuilder.class); - private static final int DEFAULT_QUEUE_SIZE = 500; - - public static final String TIMEOUT_KEY = "TIMEOUT"; - public static final String BROADCAST_KEY = "broadcastURI"; - public static final String STREAM_IDENTIFIER_KEY = "streamsID"; - public static final String BROADCAST_INTERVAL_KEY = "monitoring_broadcast_interval_ms"; - public static final String DEFAULT_STREAM_IDENTIFIER = "Unknown_Stream"; - public static final String DEFAULT_STARTED_AT_KEY = "startedAt"; - - private Map<String, StreamComponent> providers; - private Map<String, StreamComponent> components; - private LocalRuntimeConfiguration streamConfig; - private Map<StreamsTask, Future> futures; - private ExecutorService executor; - private ExecutorService monitor; - private int totalTasks; - private int monitorTasks; - private LocalStreamProcessMonitorThread monitorThread; - private Map<String, List<StreamsTask>> tasks; - private Thread shutdownHook; - private BroadcastMonitorThread broadcastMonitor; - private int maxQueueCapacity; - private String streamIdentifier = DEFAULT_STREAM_IDENTIFIER; - private DateTime startedAt = new DateTime(); - private boolean useDeprecatedMonitors; - - /** - * Creates a local stream builder with all configuration resolved by typesafe - */ - public LocalStreamBuilder() { - this(new ObjectMapper().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class)); + private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(LocalStreamBuilder.class); + private static final int DEFAULT_QUEUE_SIZE = 500; + + public static final String TIMEOUT_KEY = "TIMEOUT"; + public static final String BROADCAST_KEY = "broadcastURI"; + public static final String STREAM_IDENTIFIER_KEY = "streamsID"; + public static final String BROADCAST_INTERVAL_KEY = "monitoring_broadcast_interval_ms"; + public static final String DEFAULT_STREAM_IDENTIFIER = "Unknown_Stream"; + public static final String DEFAULT_STARTED_AT_KEY = "startedAt"; + + private Map<String, StreamComponent> providers; + private Map<String, StreamComponent> components; + private LocalRuntimeConfiguration streamConfig; + private Map<StreamsTask, Future> futures; + private ExecutorService executor; + private ExecutorService monitor; + private int totalTasks; + private int monitorTasks; + private LocalStreamProcessMonitorThread monitorThread; + private Map<String, List<StreamsTask>> tasks; + private Thread shutdownHook; + private BroadcastMonitorThread broadcastMonitor; + private int maxQueueCapacity; + private String streamIdentifier = DEFAULT_STREAM_IDENTIFIER; + private DateTime startedAt = new DateTime(); + private boolean useDeprecatedMonitors; + + /** + * Creates a local stream builder with all configuration resolved by typesafe + */ + public LocalStreamBuilder() { + this(new ObjectMapper().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class)); + } + + /** + * Creates a local stream builder with a config object and default maximum internal queue size of 500 + * @param streamConfig + * @deprecated use LocalRuntimeConfiguration constructor instead + */ + @Deprecated + public LocalStreamBuilder(Map<String, Object> streamConfig) { + this(DEFAULT_QUEUE_SIZE, streamConfig); + } + + /** + * Creates a local stream builder with no config object. If maxQueueCapacity is less than 1 the queue is + * unbounded. + * @param maxQueueCapacity + * + * @deprecated use LocalRuntimeConfiguration constructor instead + */ + @Deprecated + public LocalStreamBuilder(int maxQueueCapacity) { + this(maxQueueCapacity, null); + } + + /** + * Creates a local stream builder with a config object. If maxQueueCapacity is less than 1 the queue is + * unbounded. + * + * @param maxQueueCapacity + * @param streamConfig + * + * @deprecated use LocalRuntimeConfiguration constructor instead + */ + @Deprecated + public LocalStreamBuilder(int maxQueueCapacity, Map<String, Object> streamConfig) { + this(new LocalRuntimeConfiguration()); + this.streamConfig.setQueueSize(new Long(maxQueueCapacity)); + if( streamConfig != null && streamConfig.get(LocalStreamBuilder.TIMEOUT_KEY) != null ) + this.streamConfig.setProviderTimeoutMs(new Long((Integer) (streamConfig.get(LocalStreamBuilder.TIMEOUT_KEY)))); + if( streamConfig != null && streamConfig.get(LocalStreamBuilder.STREAM_IDENTIFIER_KEY) != null ) + this.streamConfig.setIdentifier((String)streamConfig.get(LocalStreamBuilder.STREAM_IDENTIFIER_KEY)); + if( streamConfig != null && streamConfig.get(LocalStreamBuilder.BROADCAST_KEY) != null ) { + MonitoringConfiguration monitoringConfiguration = new MonitoringConfiguration(); + monitoringConfiguration.setBroadcastURI((String)streamConfig.get(LocalStreamBuilder.BROADCAST_KEY)); + if(streamConfig.get(LocalStreamBuilder.BROADCAST_INTERVAL_KEY) != null) + monitoringConfiguration.setMonitoringBroadcastIntervalMs(Long.parseLong((String)streamConfig.get(LocalStreamBuilder.BROADCAST_INTERVAL_KEY))); + this.streamConfig.setMonitoring(monitoringConfiguration); } - - /** - * Creates a local stream builder with a config object and default maximum internal queue size of 500 - * @param streamConfig - * @deprecated use LocalRuntimeConfiguration constructor instead - */ - @Deprecated - public LocalStreamBuilder(Map<String, Object> streamConfig) { - this(DEFAULT_QUEUE_SIZE, streamConfig); - } - - /** - * Creates a local stream builder with no config object. If maxQueueCapacity is less than 1 the queue is - * unbounded. - * @param maxQueueCapacity - * - * @deprecated use LocalRuntimeConfiguration constructor instead - */ - @Deprecated - public LocalStreamBuilder(int maxQueueCapacity) { - this(maxQueueCapacity, null); - } - - /** - * Creates a local stream builder with a config object. If maxQueueCapacity is less than 1 the queue is - * unbounded. - * - * @param maxQueueCapacity - * @param streamConfig - * - * @deprecated use LocalRuntimeConfiguration constructor instead - */ - @Deprecated - public LocalStreamBuilder(int maxQueueCapacity, Map<String, Object> streamConfig) { - this(new LocalRuntimeConfiguration()); - this.streamConfig.setQueueSize(new Long(maxQueueCapacity)); - if( streamConfig != null && streamConfig.get(LocalStreamBuilder.TIMEOUT_KEY) != null ) - this.streamConfig.setProviderTimeoutMs(new Long((Integer) (streamConfig.get(LocalStreamBuilder.TIMEOUT_KEY)))); - if( streamConfig != null && streamConfig.get(LocalStreamBuilder.STREAM_IDENTIFIER_KEY) != null ) - this.streamConfig.setIdentifier((String)streamConfig.get(LocalStreamBuilder.STREAM_IDENTIFIER_KEY)); - if( streamConfig != null && streamConfig.get(LocalStreamBuilder.BROADCAST_KEY) != null ) { - MonitoringConfiguration monitoringConfiguration = new MonitoringConfiguration(); - monitoringConfiguration.setBroadcastURI((String)streamConfig.get(LocalStreamBuilder.BROADCAST_KEY)); - if(streamConfig.get(LocalStreamBuilder.BROADCAST_INTERVAL_KEY) != null) - monitoringConfiguration.setMonitoringBroadcastIntervalMs(Long.parseLong((String)streamConfig.get(LocalStreamBuilder.BROADCAST_INTERVAL_KEY))); - this.streamConfig.setMonitoring(monitoringConfiguration); + } + + public LocalStreamBuilder(LocalRuntimeConfiguration streamConfig) { + this.streamConfig = streamConfig; + this.providers = new HashMap<String, StreamComponent>(); + this.components = new HashMap<String, StreamComponent>(); + this.totalTasks = 0; + this.monitorTasks = 0; + this.futures = new HashMap<>(); + } + + public void prepare() { + this.streamIdentifier = streamConfig.getIdentifier(); + this.streamConfig.setStartedAt(startedAt.getMillis()); + final LocalStreamBuilder self = this; + this.shutdownHook = new Thread() { + @Override + public void run() { + LOGGER.debug("Shutdown hook received. Beginning shutdown"); + self.stopInternal(true); + } + }; + this.useDeprecatedMonitors = false; + this.broadcastMonitor = new BroadcastMonitorThread(this.streamConfig.getMonitoring()); + } + + public void setUseDeprecatedMonitors(boolean useDeprecatedMonitors) { + this.useDeprecatedMonitors = useDeprecatedMonitors; + } + + @Override + public StreamBuilder newPerpetualStream(String id, StreamsProvider provider) { + validateId(id); + this.providers.put(id, new StreamComponent(id, provider, true, streamConfig)); + ++this.totalTasks; + if(this.useDeprecatedMonitors && provider instanceof DatumStatusCountable ) + ++this.monitorTasks; + return this; + } + + @Override + public StreamBuilder newReadCurrentStream(String id, StreamsProvider provider) { + validateId(id); + this.providers.put(id, new StreamComponent(id, provider, false, streamConfig)); + ++this.totalTasks; + if(this.useDeprecatedMonitors && provider instanceof DatumStatusCountable ) + ++this.monitorTasks; + return this; + } + + @Override + public StreamBuilder newReadNewStream(String id, StreamsProvider provider, BigInteger sequence) { + validateId(id); + this.providers.put(id, new StreamComponent(id, provider, sequence, streamConfig)); + ++this.totalTasks; + if(this.useDeprecatedMonitors && provider instanceof DatumStatusCountable ) + ++this.monitorTasks; + return this; + } + + @Override + public StreamBuilder newReadRangeStream(String id, StreamsProvider provider, DateTime start, DateTime end) { + validateId(id); + this.providers.put(id, new StreamComponent(id, provider, start, end, streamConfig)); + ++this.totalTasks; + if(this.useDeprecatedMonitors && provider instanceof DatumStatusCountable ) + ++this.monitorTasks; + return this; + } + + @Override + public StreamBuilder setStreamsConfiguration(StreamsConfiguration configuration) { + streamConfig = StreamsJacksonMapper.getInstance().convertValue(configuration, LocalRuntimeConfiguration.class); + return this; + } + + @Override + public StreamsConfiguration getStreamsConfiguration() { + return StreamsJacksonMapper.getInstance().convertValue(streamConfig, StreamsConfiguration.class); + } + + @Override + public StreamBuilder addStreamsProcessor(String id, StreamsProcessor processor, int numTasks, String... inBoundIds) { + validateId(id); + StreamComponent comp = new StreamComponent(id, processor, new ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id, streamIdentifier, startedAt.getMillis()), numTasks, streamConfig); + this.components.put(id, comp); + connectToOtherComponents(inBoundIds, comp); + this.totalTasks += numTasks; + if(this.useDeprecatedMonitors && processor instanceof DatumStatusCountable ) + ++this.monitorTasks; + return this; + } + + @Override + public StreamBuilder addStreamsPersistWriter(String id, StreamsPersistWriter writer, int numTasks, String... inBoundIds) { + validateId(id); + StreamComponent comp = new StreamComponent(id, writer, new ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id, streamIdentifier, startedAt.getMillis()), numTasks, streamConfig); + this.components.put(id, comp); + connectToOtherComponents(inBoundIds, comp); + this.totalTasks += numTasks; + if(this.useDeprecatedMonitors && writer instanceof DatumStatusCountable ) + ++this.monitorTasks; + return this; + } + + /** + * Runs the data stream in the this JVM and blocks till completion. + */ + @Override + public void start() { + prepare(); + attachShutdownHandler(); + boolean isRunning = true; + this.executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(this.totalTasks, this); + this.monitor = Executors.newCachedThreadPool(); + Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>(); + tasks = new HashMap<String, List<StreamsTask>>(); + boolean forcedShutDown = false; + + try { + if (this.useDeprecatedMonitors) { + monitorThread = new LocalStreamProcessMonitorThread(executor, 10); + this.monitor.submit(monitorThread); + } + setupComponentTasks(tasks); + setupProviderTasks(provTasks); + LOGGER.info("Started stream with {} components", tasks.size()); + while(isRunning) { + Uninterruptibles.sleepUninterruptibly(streamConfig.getShutdownCheckDelay(), TimeUnit.MILLISECONDS); + isRunning = false; + for(StreamsProviderTask task : provTasks.values()) { + isRunning = isRunning || task.isRunning(); } - } - - public LocalStreamBuilder(LocalRuntimeConfiguration streamConfig) { - this.streamConfig = streamConfig; - this.providers = new HashMap<String, StreamComponent>(); - this.components = new HashMap<String, StreamComponent>(); - this.totalTasks = 0; - this.monitorTasks = 0; - this.futures = new HashMap<>(); - } - - public void prepare() { - this.streamIdentifier = streamConfig.getIdentifier(); - this.streamConfig.setStartedAt(startedAt.getMillis()); - final LocalStreamBuilder self = this; - this.shutdownHook = new Thread() { - @Override - public void run() { - LOGGER.debug("Shutdown hook received. Beginning shutdown"); - self.stopInternal(true); + for(StreamComponent task: components.values()) { + boolean tasksRunning = false; + for(StreamsTask t : task.getStreamsTasks()) { + if(t instanceof BaseStreamsTask) { + tasksRunning = tasksRunning || ((BaseStreamsTask) t).isRunning(); } - }; - this.useDeprecatedMonitors = false; - this.broadcastMonitor = new BroadcastMonitorThread(this.streamConfig.getMonitoring()); - } - - public void setUseDeprecatedMonitors(boolean useDeprecatedMonitors) { - this.useDeprecatedMonitors = useDeprecatedMonitors; - } - - @Override - public StreamBuilder newPerpetualStream(String id, StreamsProvider provider) { - validateId(id); - this.providers.put(id, new StreamComponent(id, provider, true, streamConfig)); - ++this.totalTasks; - if(this.useDeprecatedMonitors && provider instanceof DatumStatusCountable ) - ++this.monitorTasks; - return this; - } - - @Override - public StreamBuilder newReadCurrentStream(String id, StreamsProvider provider) { - validateId(id); - this.providers.put(id, new StreamComponent(id, provider, false, streamConfig)); - ++this.totalTasks; - if(this.useDeprecatedMonitors && provider instanceof DatumStatusCountable ) - ++this.monitorTasks; - return this; - } - - @Override - public StreamBuilder newReadNewStream(String id, StreamsProvider provider, BigInteger sequence) { - validateId(id); - this.providers.put(id, new StreamComponent(id, provider, sequence, streamConfig)); - ++this.totalTasks; - if(this.useDeprecatedMonitors && provider instanceof DatumStatusCountable ) - ++this.monitorTasks; - return this; - } - - @Override - public StreamBuilder newReadRangeStream(String id, StreamsProvider provider, DateTime start, DateTime end) { - validateId(id); - this.providers.put(id, new StreamComponent(id, provider, start, end, streamConfig)); - ++this.totalTasks; - if(this.useDeprecatedMonitors && provider instanceof DatumStatusCountable ) - ++this.monitorTasks; - return this; - } - - @Override - public StreamBuilder setStreamsConfiguration(StreamsConfiguration configuration) { - streamConfig = StreamsJacksonMapper.getInstance().convertValue(configuration, LocalRuntimeConfiguration.class); - return this; - } - - @Override - public StreamsConfiguration getStreamsConfiguration() { - return StreamsJacksonMapper.getInstance().convertValue(streamConfig, StreamsConfiguration.class); + } + isRunning = isRunning || (tasksRunning && task.getInBoundQueue().size() > 0); + } + if(isRunning) { + Uninterruptibles.sleepUninterruptibly(streamConfig.getShutdownCheckInterval(), TimeUnit.MILLISECONDS); + } + } + LOGGER.info("Components are no longer running or timed out"); + } catch (Exception e){ + LOGGER.warn("Runtime exception. Beginning shutdown"); + forcedShutDown = true; + } finally{ + LOGGER.info("Stream has completed, pausing @ {}", System.currentTimeMillis()); + Uninterruptibles.sleepUninterruptibly(streamConfig.getShutdownPauseMs(), TimeUnit.MILLISECONDS); + LOGGER.info("Stream has completed, shutting down @ {}", System.currentTimeMillis()); + stopInternal(forcedShutDown); } - @Override - public StreamBuilder addStreamsProcessor(String id, StreamsProcessor processor, int numTasks, String... inBoundIds) { - validateId(id); - StreamComponent comp = new StreamComponent(id, processor, new ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id, streamIdentifier, startedAt.getMillis()), numTasks, streamConfig); - this.components.put(id, comp); - connectToOtherComponents(inBoundIds, comp); - this.totalTasks += numTasks; - if(this.useDeprecatedMonitors && processor instanceof DatumStatusCountable ) - ++this.monitorTasks; - return this; + } + + private void attachShutdownHandler() { + LOGGER.debug("Attaching shutdown handler"); + Runtime.getRuntime().addShutdownHook(shutdownHook); + } + + private void detachShutdownHandler() { + LOGGER.debug("Detaching shutdown handler"); + Runtime.getRuntime().removeShutdownHook(shutdownHook); + } + + protected void forceShutdown(Map<String, List<StreamsTask>> streamsTasks) { + LOGGER.debug("Shutdown failed. Forcing shutdown"); + for(List<StreamsTask> tasks : streamsTasks.values()) { + for(StreamsTask task : tasks) { + task.stopTask(); + if(task.isWaiting()) { + this.futures.get(task).cancel(true); + } + } } - - @Override - public StreamBuilder addStreamsPersistWriter(String id, StreamsPersistWriter writer, int numTasks, String... inBoundIds) { - validateId(id); - StreamComponent comp = new StreamComponent(id, writer, new ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id, streamIdentifier, startedAt.getMillis()), numTasks, streamConfig); - this.components.put(id, comp); - connectToOtherComponents(inBoundIds, comp); - this.totalTasks += numTasks; - if(this.useDeprecatedMonitors && writer instanceof DatumStatusCountable ) - ++this.monitorTasks; - return this; + this.executor.shutdown(); + this.monitor.shutdown(); + try { + if(!this.executor.awaitTermination(streamConfig.getExecutorShutdownPauseMs(), TimeUnit.MILLISECONDS)){ + this.executor.shutdownNow(); + } + if(!this.monitor.awaitTermination(streamConfig.getMonitorShutdownPauseMs(), TimeUnit.MILLISECONDS)){ + this.monitor.shutdownNow(); + } + }catch (InterruptedException ie) { + this.executor.shutdownNow(); + this.monitor.shutdownNow(); + throw new RuntimeException(ie); } + } - /** - * Runs the data stream in the this JVM and blocks till completion. - */ - @Override - public void start() { - prepare(); - attachShutdownHandler(); - boolean isRunning = true; - this.executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(this.totalTasks, this); - this.monitor = Executors.newCachedThreadPool(); - Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>(); - tasks = new HashMap<String, List<StreamsTask>>(); - boolean forcedShutDown = false; - - try { - if (this.useDeprecatedMonitors) { - monitorThread = new LocalStreamProcessMonitorThread(executor, 10); - this.monitor.submit(monitorThread); - } - setupComponentTasks(tasks); - setupProviderTasks(provTasks); - LOGGER.info("Started stream with {} components", tasks.size()); - while(isRunning) { - Uninterruptibles.sleepUninterruptibly(streamConfig.getShutdownCheckDelay(), TimeUnit.MILLISECONDS); - isRunning = false; - for(StreamsProviderTask task : provTasks.values()) { - isRunning = isRunning || task.isRunning(); - } - for(StreamComponent task: components.values()) { - boolean tasksRunning = false; - for(StreamsTask t : task.getStreamsTasks()) { - if(t instanceof BaseStreamsTask) { - tasksRunning = tasksRunning || ((BaseStreamsTask) t).isRunning(); - } - } - isRunning = isRunning || (tasksRunning && task.getInBoundQueue().size() > 0); - } - if(isRunning) { - Uninterruptibles.sleepUninterruptibly(streamConfig.getShutdownCheckInterval(), TimeUnit.MILLISECONDS); - } - } - LOGGER.info("Components are no longer running or timed out"); - } catch (Exception e){ - LOGGER.warn("Runtime exception. Beginning shutdown"); - forcedShutDown = true; - } finally{ - LOGGER.info("Stream has completed, pausing @ {}", System.currentTimeMillis()); - Uninterruptibles.sleepUninterruptibly(streamConfig.getShutdownPauseMs(), TimeUnit.MILLISECONDS); - LOGGER.info("Stream has completed, shutting down @ {}", System.currentTimeMillis()); - stopInternal(forcedShutDown); - } - + protected void shutdown(Map<String, List<StreamsTask>> streamsTasks) throws InterruptedException { + LOGGER.info("Attempting to shutdown tasks"); + if (this.monitorThread != null) { + this.monitorThread.shutdown(); } - - private void attachShutdownHandler() { - LOGGER.debug("Attaching shutdown handler"); - Runtime.getRuntime().addShutdownHook(shutdownHook); + this.executor.shutdown(); + //complete stream shut down gracfully + for(StreamComponent prov : this.providers.values()) { + shutDownTask(prov, streamsTasks); } - - private void detachShutdownHandler() { - LOGGER.debug("Detaching shutdown handler"); - Runtime.getRuntime().removeShutdownHook(shutdownHook); + //need to make this configurable + if(!this.executor.awaitTermination(streamConfig.getExecutorShutdownWaitMs(), TimeUnit.MILLISECONDS)) { // all threads should have terminated already. + this.executor.shutdownNow(); + this.executor.awaitTermination(streamConfig.getExecutorShutdownWaitMs(), TimeUnit.MILLISECONDS); } - - protected void forceShutdown(Map<String, List<StreamsTask>> streamsTasks) { - LOGGER.debug("Shutdown failed. Forcing shutdown"); - for(List<StreamsTask> tasks : streamsTasks.values()) { - for(StreamsTask task : tasks) { - task.stopTask(); - if(task.isWaiting()) { - this.futures.get(task).cancel(true); - } - } - } - this.executor.shutdown(); - this.monitor.shutdown(); - try { - if(!this.executor.awaitTermination(streamConfig.getExecutorShutdownPauseMs(), TimeUnit.MILLISECONDS)){ - this.executor.shutdownNow(); - } - if(!this.monitor.awaitTermination(streamConfig.getMonitorShutdownPauseMs(), TimeUnit.MILLISECONDS)){ - this.monitor.shutdownNow(); - } - }catch (InterruptedException ie) { - this.executor.shutdownNow(); - this.monitor.shutdownNow(); - throw new RuntimeException(ie); - } + if(!this.monitor.awaitTermination(streamConfig.getMonitorShutdownWaitMs(), TimeUnit.MILLISECONDS)) { // all threads should have terminated already. + this.monitor.shutdownNow(); + this.monitor.awaitTermination(streamConfig.getMonitorShutdownWaitMs(), TimeUnit.MILLISECONDS); } - - protected void shutdown(Map<String, List<StreamsTask>> streamsTasks) throws InterruptedException { - LOGGER.info("Attempting to shutdown tasks"); - if (this.monitorThread != null) { - this.monitorThread.shutdown(); - } - this.executor.shutdown(); - //complete stream shut down gracfully - for(StreamComponent prov : this.providers.values()) { - shutDownTask(prov, streamsTasks); - } - //need to make this configurable - if(!this.executor.awaitTermination(streamConfig.getExecutorShutdownWaitMs(), TimeUnit.MILLISECONDS)) { // all threads should have terminated already. - this.executor.shutdownNow(); - this.executor.awaitTermination(streamConfig.getExecutorShutdownWaitMs(), TimeUnit.MILLISECONDS); - } - if(!this.monitor.awaitTermination(streamConfig.getMonitorShutdownWaitMs(), TimeUnit.MILLISECONDS)) { // all threads should have terminated already. - this.monitor.shutdownNow(); - this.monitor.awaitTermination(streamConfig.getMonitorShutdownWaitMs(), TimeUnit.MILLISECONDS); - } + } + + protected void setupProviderTasks(Map<String, StreamsProviderTask> provTasks) { + for(StreamComponent prov : this.providers.values()) { + StreamsTask task = prov.createConnectedTask(getTimeout()); + task.setStreamConfig(this.streamConfig); + StreamsTaskCounter counter = new StreamsTaskCounter(prov.getId(), streamIdentifier, startedAt.getMillis()); + task.setStreamsTaskCounter(counter); + this.executor.submit(task); + provTasks.put(prov.getId(), (StreamsProviderTask) task); + if(this.useDeprecatedMonitors && prov.isOperationCountable() ) { + this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) prov.getOperation(), 10)); + this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10)); + } } - - protected void setupProviderTasks(Map<String, StreamsProviderTask> provTasks) { - for(StreamComponent prov : this.providers.values()) { - StreamsTask task = prov.createConnectedTask(getTimeout()); - task.setStreamConfig(this.streamConfig); - StreamsTaskCounter counter = new StreamsTaskCounter(prov.getId(), streamIdentifier, startedAt.getMillis()); - task.setStreamsTaskCounter(counter); - this.executor.submit(task); - provTasks.put(prov.getId(), (StreamsProviderTask) task); - if(this.useDeprecatedMonitors && prov.isOperationCountable() ) { - this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) prov.getOperation(), 10)); - this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10)); - } + } + + protected void setupComponentTasks(Map<String, List<StreamsTask>> streamsTasks) { + for(StreamComponent comp : this.components.values()) { + int tasks = comp.getNumTasks(); + List<StreamsTask> compTasks = new LinkedList<StreamsTask>(); + StreamsTaskCounter counter = new StreamsTaskCounter(comp.getId(), streamIdentifier, startedAt.getMillis()); + for(int i=0; i < tasks; ++i) { + StreamsTask task = comp.createConnectedTask(getTimeout()); + task.setStreamsTaskCounter(counter); + task.setStreamConfig(this.streamConfig); + this.futures.put(task, this.executor.submit(task)); + compTasks.add(task); + if(this.useDeprecatedMonitors && comp.isOperationCountable() ) { + this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10)); + this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10)); } + this.monitor.submit(broadcastMonitor); + } + streamsTasks.put(comp.getId(), compTasks); } - - protected void setupComponentTasks(Map<String, List<StreamsTask>> streamsTasks) { - for(StreamComponent comp : this.components.values()) { - int tasks = comp.getNumTasks(); - List<StreamsTask> compTasks = new LinkedList<StreamsTask>(); - StreamsTaskCounter counter = new StreamsTaskCounter(comp.getId(), streamIdentifier, startedAt.getMillis()); - for(int i=0; i < tasks; ++i) { - StreamsTask task = comp.createConnectedTask(getTimeout()); - task.setStreamsTaskCounter(counter); - task.setStreamConfig(this.streamConfig); - this.futures.put(task, this.executor.submit(task)); - compTasks.add(task); - if(this.useDeprecatedMonitors && comp.isOperationCountable() ) { - this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10)); - this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10)); - } - this.monitor.submit(broadcastMonitor); - } - streamsTasks.put(comp.getId(), compTasks); + } + + /** + * Shutsdown the running tasks in sudo depth first search kind of way. Checks that the upstream components have + * finished running before shutting down. Waits till inbound queue is empty to shutdown. + * @param comp StreamComponent to shut down. + * @param streamTasks the list of non-StreamsProvider tasks for this stream. + * @throws InterruptedException + */ + private void shutDownTask(StreamComponent comp, Map<String, List<StreamsTask>> streamTasks) throws InterruptedException { + List<StreamsTask> tasks = streamTasks.get(comp.getId()); + if(tasks != null) { //not a StreamProvider + boolean parentsShutDown = true; + for(StreamComponent parent : comp.getUpStreamComponents()) { + List<StreamsTask> parentTasks = streamTasks.get(parent.getId()); + //if parentTask == null, its a provider and is not running anymore + if(parentTasks != null) { + for(StreamsTask task : parentTasks) { + parentsShutDown = parentsShutDown && !task.isRunning(); + } } - } - - /** - * Shutsdown the running tasks in sudo depth first search kind of way. Checks that the upstream components have - * finished running before shutting down. Waits till inbound queue is empty to shutdown. - * @param comp StreamComponent to shut down. - * @param streamTasks the list of non-StreamsProvider tasks for this stream. - * @throws InterruptedException - */ - private void shutDownTask(StreamComponent comp, Map<String, List<StreamsTask>> streamTasks) throws InterruptedException { - List<StreamsTask> tasks = streamTasks.get(comp.getId()); - if(tasks != null) { //not a StreamProvider - boolean parentsShutDown = true; - for(StreamComponent parent : comp.getUpStreamComponents()) { - List<StreamsTask> parentTasks = streamTasks.get(parent.getId()); - //if parentTask == null, its a provider and is not running anymore - if(parentTasks != null) { - for(StreamsTask task : parentTasks) { - parentsShutDown = parentsShutDown && !task.isRunning(); - } - } - } - if(parentsShutDown) { - for(StreamsTask task : tasks) { - task.stopTask(); - if(task.isWaiting()) { - this.futures.get(task).cancel(true); // no data to process, interrupt block queue - } - } - for(StreamsTask task : tasks) { - int count = 0; - while(count < streamConfig.getTaskTimeoutMs() / 1000 && task.isRunning()) { - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - count++; - } - - if(task.isRunning()) { - LOGGER.warn("Task {} failed to terminate in allotted timeframe", task.toString()); - } - } - } + } + if(parentsShutDown) { + for(StreamsTask task : tasks) { + task.stopTask(); + if(task.isWaiting()) { + this.futures.get(task).cancel(true); // no data to process, interrupt block queue + } } - Collection<StreamComponent> children = comp.getDownStreamComponents(); - if(children != null) { - for(StreamComponent child : comp.getDownStreamComponents()) { - shutDownTask(child, streamTasks); - } + for(StreamsTask task : tasks) { + int count = 0; + while(count < streamConfig.getTaskTimeoutMs() / 1000 && task.isRunning()) { + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + count++; + } + + if(task.isRunning()) { + LOGGER.warn("Task {} failed to terminate in allotted timeframe", task.toString()); + } } + } } - - /** - * NOT IMPLEMENTED. - */ - @Override - public void stop() { - stopInternal(false); - } - - - - protected void stopInternal(boolean systemExiting) { - try { - shutdown(tasks); - } catch (Exception e) { - LOGGER.error("Exception while trying to shutdown Stream: {}", e); - forceShutdown(tasks); - } finally { - try { - if(!systemExiting) { - detachShutdownHandler(); - } - } catch( Throwable e3 ) { - LOGGER.error("StopInternal caught Throwable: {}", e3); - System.exit(1); - } - } + Collection<StreamComponent> children = comp.getDownStreamComponents(); + if(children != null) { + for(StreamComponent child : comp.getDownStreamComponents()) { + shutDownTask(child, streamTasks); + } } - - private void connectToOtherComponents(String[] conntectToIds, StreamComponent toBeConnected) { - for(String id : conntectToIds) { - StreamComponent upStream = null; - if(this.providers.containsKey(id)) { - upStream = this.providers.get(id); - } - else if(this.components.containsKey(id)) { - upStream = this.components.get(id); - } - else { - throw new InvalidStreamException("Cannot connect to id, "+id+", because id does not exist."); - } - upStream.addOutBoundQueue(toBeConnected, toBeConnected.getInBoundQueue()); - toBeConnected.addInboundQueue(upStream); + } + + /** + * NOT IMPLEMENTED. + */ + @Override + public void stop() { + stopInternal(false); + } + + + + protected void stopInternal(boolean systemExiting) { + try { + shutdown(tasks); + } catch (Exception e) { + LOGGER.error("Exception while trying to shutdown Stream: {}", e); + forceShutdown(tasks); + } finally { + try { + if(!systemExiting) { + detachShutdownHandler(); } + } catch( Throwable e3 ) { + LOGGER.error("StopInternal caught Throwable: {}", e3); + System.exit(1); + } } - - private void validateId(String id) { - if(this.providers.containsKey(id) || this.components.containsKey(id)) { - throw new InvalidStreamException("Duplicate id. "+id+" is already assigned to another component"); - } else if(id.contains(":")) { - throw new InvalidStreamException("Invalid character, ':', in component id : "+id); - } + } + + private void connectToOtherComponents(String[] conntectToIds, StreamComponent toBeConnected) { + for(String id : conntectToIds) { + StreamComponent upStream = null; + if(this.providers.containsKey(id)) { + upStream = this.providers.get(id); + } + else if(this.components.containsKey(id)) { + upStream = this.components.get(id); + } + else { + throw new InvalidStreamException("Cannot connect to id, "+id+", because id does not exist."); + } + upStream.addOutBoundQueue(toBeConnected, toBeConnected.getInBoundQueue()); + toBeConnected.addInboundQueue(upStream); } + } - protected int getTimeout() { - //Set the timeout of it is configured, otherwise signal downstream components to use their default - return streamConfig.getProviderTimeoutMs().intValue(); + private void validateId(String id) { + if(this.providers.containsKey(id) || this.components.containsKey(id)) { + throw new InvalidStreamException("Duplicate id. "+id+" is already assigned to another component"); + } else if(id.contains(":")) { + throw new InvalidStreamException("Invalid character, ':', in component id : "+id); } - - private LocalRuntimeConfiguration convertConfiguration(Map<String, Object> streamConfig) { - LocalRuntimeConfiguration config = new LocalRuntimeConfiguration(); - if( streamConfig != null ) { - for( Map.Entry<String, Object> item : streamConfig.entrySet() ) { - config.setAdditionalProperty(item.getKey(), item.getValue()); - } - } - return config; + } + + protected int getTimeout() { + //Set the timeout of it is configured, otherwise signal downstream components to use their default + return streamConfig.getProviderTimeoutMs().intValue(); + } + + private LocalRuntimeConfiguration convertConfiguration(Map<String, Object> streamConfig) { + LocalRuntimeConfiguration config = new LocalRuntimeConfiguration(); + if( streamConfig != null ) { + for( Map.Entry<String, Object> item : streamConfig.entrySet() ) { + config.setAdditionalProperty(item.getKey(), item.getValue()); + } } + return config; + } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java index 9d602cd..75799b7 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java @@ -18,19 +18,30 @@ package org.apache.streams.local.builders; -import com.google.common.collect.Lists; import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.core.*; +import org.apache.streams.core.DatumStatusCountable; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsOperation; +import org.apache.streams.core.StreamsPersistWriter; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.core.StreamsProvider; import org.apache.streams.local.tasks.StreamsPersistWriterTask; import org.apache.streams.local.tasks.StreamsProcessorTask; import org.apache.streams.local.tasks.StreamsProviderTask; import org.apache.streams.local.tasks.StreamsTask; import org.apache.streams.util.SerializationUtil; + +import com.google.common.collect.Lists; import org.joda.time.DateTime; import java.io.Serializable; import java.math.BigInteger; -import java.util.*; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; /** @@ -39,263 +50,263 @@ import java.util.concurrent.BlockingQueue; */ public class StreamComponent implements Serializable { - private static final int START = 1; - private static final int END = 2; + private static final int START = 1; + private static final int END = 2; - private String id; - private Set<StreamComponent> inBound; - private Map<StreamComponent, BlockingQueue<StreamsDatum>> outBound; - private BlockingQueue<StreamsDatum> inQueue; - private StreamsProvider provider; - private StreamsProcessor processor; - private StreamsPersistWriter writer; - private DateTime[] dateRange; - private BigInteger sequence; - private int numTasks = 1; - private boolean perpetual; + private String id; + private Set<StreamComponent> inBound; + private Map<StreamComponent, BlockingQueue<StreamsDatum>> outBound; + private BlockingQueue<StreamsDatum> inQueue; + private StreamsProvider provider; + private StreamsProcessor processor; + private StreamsPersistWriter writer; + private DateTime[] dateRange; + private BigInteger sequence; + private int numTasks = 1; + private boolean perpetual; - private List<StreamsTask> tasks; + private List<StreamsTask> tasks; - private StreamsConfiguration streamConfig; + private StreamsConfiguration streamConfig; - /** - * - * @param id - * @param provider - */ - public StreamComponent(String id, StreamsProvider provider, boolean perpetual, StreamsConfiguration streamConfig) { - this.id = id; - this.provider = provider; - this.perpetual = perpetual; - this.streamConfig = streamConfig; - initializePrivateVariables(); - } + /** + * + * @param id + * @param provider + */ + public StreamComponent(String id, StreamsProvider provider, boolean perpetual, StreamsConfiguration streamConfig) { + this.id = id; + this.provider = provider; + this.perpetual = perpetual; + this.streamConfig = streamConfig; + initializePrivateVariables(); + } - /** - * - * @param id - * @param provider - * @param start - * @param end - */ - public StreamComponent(String id, StreamsProvider provider, DateTime start, DateTime end, StreamsConfiguration streamConfig) { - this.id = id; - this.provider = provider; - this.dateRange = new DateTime[2]; - this.dateRange[START] = start; - this.dateRange[END] = end; - this.streamConfig = streamConfig; - initializePrivateVariables(); - } + /** + * + * @param id + * @param provider + * @param start + * @param end + */ + public StreamComponent(String id, StreamsProvider provider, DateTime start, DateTime end, StreamsConfiguration streamConfig) { + this.id = id; + this.provider = provider; + this.dateRange = new DateTime[2]; + this.dateRange[START] = start; + this.dateRange[END] = end; + this.streamConfig = streamConfig; + initializePrivateVariables(); + } - /** - * - * @param id - * @param provider - * @param sequence - */ - public StreamComponent(String id, StreamsProvider provider, BigInteger sequence, StreamsConfiguration streamConfig) { - this.id = id; - this.provider = provider; - this.sequence = sequence; - this.streamConfig = streamConfig; - } + /** + * + * @param id + * @param provider + * @param sequence + */ + public StreamComponent(String id, StreamsProvider provider, BigInteger sequence, StreamsConfiguration streamConfig) { + this.id = id; + this.provider = provider; + this.sequence = sequence; + this.streamConfig = streamConfig; + } - /** - * - * @param id - * @param processor - * @param inQueue - * @param numTasks - */ - public StreamComponent(String id, StreamsProcessor processor, BlockingQueue<StreamsDatum> inQueue, int numTasks, StreamsConfiguration streamConfig) { - this.id = id; - this.processor = processor; - this.inQueue = inQueue; - this.numTasks = numTasks; - this.streamConfig = streamConfig; - initializePrivateVariables(); - } + /** + * + * @param id + * @param processor + * @param inQueue + * @param numTasks + */ + public StreamComponent(String id, StreamsProcessor processor, BlockingQueue<StreamsDatum> inQueue, int numTasks, StreamsConfiguration streamConfig) { + this.id = id; + this.processor = processor; + this.inQueue = inQueue; + this.numTasks = numTasks; + this.streamConfig = streamConfig; + initializePrivateVariables(); + } - /** - * - * @param id - * @param writer - * @param inQueue - * @param numTasks - */ - public StreamComponent(String id, StreamsPersistWriter writer, BlockingQueue<StreamsDatum> inQueue, int numTasks, StreamsConfiguration streamConfig) { - this.id = id; - this.writer = writer; - this.inQueue = inQueue; - this.numTasks = numTasks; - this.streamConfig = streamConfig; - initializePrivateVariables(); - } + /** + * + * @param id + * @param writer + * @param inQueue + * @param numTasks + */ + public StreamComponent(String id, StreamsPersistWriter writer, BlockingQueue<StreamsDatum> inQueue, int numTasks, StreamsConfiguration streamConfig) { + this.id = id; + this.writer = writer; + this.inQueue = inQueue; + this.numTasks = numTasks; + this.streamConfig = streamConfig; + initializePrivateVariables(); + } - private void initializePrivateVariables() { - this.inBound = new HashSet<StreamComponent>(); - this.outBound = new HashMap<StreamComponent, BlockingQueue<StreamsDatum>>(); - this.tasks = Lists.newArrayList(); - } + private void initializePrivateVariables() { + this.inBound = new HashSet<StreamComponent>(); + this.outBound = new HashMap<StreamComponent, BlockingQueue<StreamsDatum>>(); + this.tasks = Lists.newArrayList(); + } - /** - * Add an outbound queue for this component. The queue should be an inbound queue of a downstream component. - * @param component the component that this supplying their inbound queue - * @param queue the queue to to put post processed/provided datums on - */ - public void addOutBoundQueue(StreamComponent component, BlockingQueue<StreamsDatum> queue) { - this.outBound.put(component, queue); - } + /** + * Add an outbound queue for this component. The queue should be an inbound queue of a downstream component. + * @param component the component that this supplying their inbound queue + * @param queue the queue to to put post processed/provided datums on + */ + public void addOutBoundQueue(StreamComponent component, BlockingQueue<StreamsDatum> queue) { + this.outBound.put(component, queue); + } - /** - * Add a component that supplies data through the inbound queue. - * @param component that supplies data through the inbound queue - */ - public void addInboundQueue(StreamComponent component) { - this.inBound.add(component); - } + /** + * Add a component that supplies data through the inbound queue. + * @param component that supplies data through the inbound queue + */ + public void addInboundQueue(StreamComponent component) { + this.inBound.add(component); + } - /** - * The components that are immediately downstream of this component (aka child nodes) - * @return Collection of child nodes of this component - */ - public Collection<StreamComponent> getDownStreamComponents() { - return this.outBound.keySet(); - } + /** + * The components that are immediately downstream of this component (aka child nodes) + * @return Collection of child nodes of this component + */ + public Collection<StreamComponent> getDownStreamComponents() { + return this.outBound.keySet(); + } - /** - * The components that are immediately upstream of this component (aka parent nodes) - * @return Collection of parent nodes of this component - */ - public Collection<StreamComponent> getUpStreamComponents() { - return this.inBound; - } + /** + * The components that are immediately upstream of this component (aka parent nodes) + * @return Collection of parent nodes of this component + */ + public Collection<StreamComponent> getUpStreamComponents() { + return this.inBound; + } - /** - * The inbound queue for this component - * @return inbound queue - */ - public BlockingQueue<StreamsDatum> getInBoundQueue() { - return this.inQueue; - } + /** + * The inbound queue for this component + * @return inbound queue + */ + public BlockingQueue<StreamsDatum> getInBoundQueue() { + return this.inQueue; + } - /** - * The number of tasks this to run this component - * @return - */ - public int getNumTasks() { - return this.numTasks; - } + /** + * The number of tasks this to run this component + * @return + */ + public int getNumTasks() { + return this.numTasks; + } - /** - * Creates a {@link org.apache.streams.local.tasks.StreamsTask} that is running a clone of this component whose - * inbound and outbound queues are appropriately connected to the parent and child nodes. - * - * @return StreamsTask for this component - * @param timeout The timeout to use in milliseconds for any tasks that support configurable timeout - */ - public StreamsTask createConnectedTask(int timeout) { - StreamsTask task; - if(this.processor != null) { - if(this.numTasks > 1) { - task = new StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor), streamConfig); - task.addInputQueue(this.inQueue); - for(BlockingQueue<StreamsDatum> q : this.outBound.values()) { - task.addOutputQueue(q); - } - } else { - task = new StreamsProcessorTask(this.processor, streamConfig); - task.addInputQueue(this.inQueue); - for(BlockingQueue<StreamsDatum> q : this.outBound.values()) { - task.addOutputQueue(q); - } - } - } - else if(this.writer != null) { - if(this.numTasks > 1) { - task = new StreamsPersistWriterTask((StreamsPersistWriter) SerializationUtil.cloneBySerialization(this.writer), streamConfig); - task.addInputQueue(this.inQueue); - } else { - task = new StreamsPersistWriterTask(this.writer, streamConfig); - task.addInputQueue(this.inQueue); - } - } - else if(this.provider != null) { - StreamsProvider prov; - if(this.numTasks > 1) { - prov = (StreamsProvider)SerializationUtil.cloneBySerialization(this.provider); - } else { - prov = this.provider; - } - if(this.dateRange == null && this.sequence == null) - task = new StreamsProviderTask(prov, this.perpetual, streamConfig); - else if(this.sequence != null) - task = new StreamsProviderTask(prov, this.sequence, streamConfig); - else - task = new StreamsProviderTask(prov, this.dateRange[0], this.dateRange[1], streamConfig); - //Adjust the timeout if necessary - if(timeout != 0) { - ((StreamsProviderTask)task).setTimeout(timeout); - } - for(BlockingQueue<StreamsDatum> q : this.outBound.values()) { - task.addOutputQueue(q); - } - } - else { - throw new InvalidStreamException("Underlying StreamComponoent was NULL."); + /** + * Creates a {@link org.apache.streams.local.tasks.StreamsTask} that is running a clone of this component whose + * inbound and outbound queues are appropriately connected to the parent and child nodes. + * + * @return StreamsTask for this component + * @param timeout The timeout to use in milliseconds for any tasks that support configurable timeout + */ + public StreamsTask createConnectedTask(int timeout) { + StreamsTask task; + if(this.processor != null) { + if(this.numTasks > 1) { + task = new StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor), streamConfig); + task.addInputQueue(this.inQueue); + for(BlockingQueue<StreamsDatum> q : this.outBound.values()) { + task.addOutputQueue(q); } - - if(task != null) { - tasks.add(task); + } else { + task = new StreamsProcessorTask(this.processor, streamConfig); + task.addInputQueue(this.inQueue); + for(BlockingQueue<StreamsDatum> q : this.outBound.values()) { + task.addOutputQueue(q); } - - return task; + } } - - public List<StreamsTask> getStreamsTasks() { - return this.tasks; + else if(this.writer != null) { + if(this.numTasks > 1) { + task = new StreamsPersistWriterTask((StreamsPersistWriter) SerializationUtil.cloneBySerialization(this.writer), streamConfig); + task.addInputQueue(this.inQueue); + } else { + task = new StreamsPersistWriterTask(this.writer, streamConfig); + task.addInputQueue(this.inQueue); + } } - - /** - * The unique of this component - * @return - */ - public String getId() { - return this.id; + else if(this.provider != null) { + StreamsProvider prov; + if(this.numTasks > 1) { + prov = (StreamsProvider)SerializationUtil.cloneBySerialization(this.provider); + } else { + prov = this.provider; + } + if(this.dateRange == null && this.sequence == null) + task = new StreamsProviderTask(prov, this.perpetual, streamConfig); + else if(this.sequence != null) + task = new StreamsProviderTask(prov, this.sequence, streamConfig); + else + task = new StreamsProviderTask(prov, this.dateRange[0], this.dateRange[1], streamConfig); + //Adjust the timeout if necessary + if(timeout != 0) { + ((StreamsProviderTask)task).setTimeout(timeout); + } + for(BlockingQueue<StreamsDatum> q : this.outBound.values()) { + task.addOutputQueue(q); + } } - - @Override - public int hashCode() { - return this.id.hashCode(); + else { + throw new InvalidStreamException("Underlying StreamComponoent was NULL."); } - @Override - public boolean equals(Object o) { - if(o instanceof StreamComponent) - return this.id.equals(((StreamComponent) o).id); - else - return false; + if(task != null) { + tasks.add(task); } - protected StreamsOperation getOperation() { - if(this.processor != null) { - return (StreamsOperation) this.processor; - } - else if(this.writer != null) { - return (StreamsOperation) this.writer; - } - else if(this.provider != null) { - return (StreamsOperation) this.provider; - } - else { - throw new InvalidStreamException("Underlying StreamComponoent was NULL."); - } - } + return task; + } + + public List<StreamsTask> getStreamsTasks() { + return this.tasks; + } + + /** + * The unique of this component + * @return + */ + public String getId() { + return this.id; + } + + @Override + public int hashCode() { + return this.id.hashCode(); + } + + @Override + public boolean equals(Object o) { + if(o instanceof StreamComponent) + return this.id.equals(((StreamComponent) o).id); + else + return false; + } - @Deprecated - protected boolean isOperationCountable() { - return getOperation() instanceof DatumStatusCountable; + protected StreamsOperation getOperation() { + if(this.processor != null) { + return (StreamsOperation) this.processor; } + else if(this.writer != null) { + return (StreamsOperation) this.writer; + } + else if(this.provider != null) { + return (StreamsOperation) this.provider; + } + else { + throw new InvalidStreamException("Underlying StreamComponoent was NULL."); + } + } + + @Deprecated + protected boolean isOperationCountable() { + return getOperation() instanceof DatumStatusCountable; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java index 34d2bcc..e2884d0 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java @@ -17,14 +17,13 @@ */ package org.apache.streams.local.counters; -import net.jcip.annotations.ThreadSafe; import org.apache.streams.local.builders.LocalStreamBuilder; import org.apache.streams.util.ComponentUtils; + +import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.management.*; -import java.lang.management.ManagementFactory; import java.util.concurrent.atomic.AtomicLong; /** @@ -33,56 +32,56 @@ import java.util.concurrent.atomic.AtomicLong; @ThreadSafe public class DatumStatusCounter implements DatumStatusCounterMXBean{ - public static final String NAME_TEMPLATE = "org.apache.streams.local:type=DatumCounter,name=%s,identifier=%s,startedAt=%s"; - private static final Logger LOGGER = LoggerFactory.getLogger(DatumStatusCounter.class); + public static final String NAME_TEMPLATE = "org.apache.streams.local:type=DatumCounter,name=%s,identifier=%s,startedAt=%s"; + private static final Logger LOGGER = LoggerFactory.getLogger(DatumStatusCounter.class); - private AtomicLong failed; - private AtomicLong passed; + private AtomicLong failed; + private AtomicLong passed; - public DatumStatusCounter(String id) { - this(id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1); - } + public DatumStatusCounter(String id) { + this(id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1); + } - public DatumStatusCounter(String id, String streamIdentifier, long startedAt) { - this.failed = new AtomicLong(0); - this.passed = new AtomicLong(0); - ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id, streamIdentifier, startedAt), this); - } + public DatumStatusCounter(String id, String streamIdentifier, long startedAt) { + this.failed = new AtomicLong(0); + this.passed = new AtomicLong(0); + ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id, streamIdentifier, startedAt), this); + } - public void incrementFailedCount() { - this.incrementFailedCount(1); - } + public void incrementFailedCount() { + this.incrementFailedCount(1); + } - public void incrementFailedCount(long delta) { - this.failed.addAndGet(delta); - } + public void incrementFailedCount(long delta) { + this.failed.addAndGet(delta); + } - public void incrementPassedCount() { - this.incrementPassedCount(1); - } + public void incrementPassedCount() { + this.incrementPassedCount(1); + } - public void incrementPassedCount(long delta) { - this.passed.addAndGet(delta); - } + public void incrementPassedCount(long delta) { + this.passed.addAndGet(delta); + } - @Override - public double getFailRate() { - double failed = this.failed.get(); - double passed = this.passed.get(); - if(failed == 0.0 && passed == 0) { - return 0.0; - } - return failed / (passed + failed); + @Override + public double getFailRate() { + double failed = this.failed.get(); + double passed = this.passed.get(); + if(failed == 0.0 && passed == 0) { + return 0.0; } + return failed / (passed + failed); + } - @Override - public long getNumFailed() { - return this.failed.get(); - } + @Override + public long getNumFailed() { + return this.failed.get(); + } - @Override - public long getNumPassed() { - return this.passed.get(); - } + @Override + public long getNumPassed() { + return this.passed.get(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java index 7cc8df4..3a318a8 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java @@ -22,22 +22,22 @@ package org.apache.streams.local.counters; */ public interface DatumStatusCounterMXBean { - /** - * Get number of failed datums - * @return number of failed datums - */ - public long getNumFailed(); + /** + * Get number of failed datums + * @return number of failed datums + */ + public long getNumFailed(); - /** - * Get number of passed datums - * @return number of passed datums - */ - public long getNumPassed(); + /** + * Get number of passed datums + * @return number of passed datums + */ + public long getNumPassed(); - /** - * Get the failure rate. Calculated by num failed divided by (num passed + num failed) - * @return the failure rate - */ - public double getFailRate(); + /** + * Get the failure rate. Calculated by num failed divided by (num passed + num failed) + * @return the failure rate + */ + public double getFailRate(); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java index 9bd5d49..de37f1b 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java @@ -17,16 +17,14 @@ */ package org.apache.streams.local.counters; -import net.jcip.annotations.GuardedBy; -import net.jcip.annotations.ThreadSafe; import org.apache.streams.local.builders.LocalStreamBuilder; import org.apache.streams.util.ComponentUtils; -import org.joda.time.DateTime; + +import net.jcip.annotations.GuardedBy; +import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.management.*; -import java.lang.management.ManagementFactory; import java.util.concurrent.atomic.AtomicLong; /** @@ -35,133 +33,133 @@ import java.util.concurrent.atomic.AtomicLong; @ThreadSafe public class StreamsTaskCounter implements StreamsTaskCounterMXBean { - public static final String NAME_TEMPLATE = "org.apache.streams.local:type=StreamsTaskCounter,name=%s,identifier=%s,startedAt=%s"; - private static final Logger LOGGER = LoggerFactory.getLogger(StreamsTaskCounter.class); - - private AtomicLong emitted; - private AtomicLong received; - private AtomicLong errors; - private AtomicLong totalTime; - @GuardedBy("this") - private volatile long maxTime; - - /** - * - * @param id - */ - public StreamsTaskCounter(String id) { - this(id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1); - } - - /** - * - * @param id - */ - public StreamsTaskCounter(String id, String streamId, long startedAt) { - this.emitted = new AtomicLong(0); - this.received = new AtomicLong(0); - this.errors = new AtomicLong(0); - this.totalTime = new AtomicLong(0); - this.maxTime = -1; - ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id, streamId, startedAt), this); + public static final String NAME_TEMPLATE = "org.apache.streams.local:type=StreamsTaskCounter,name=%s,identifier=%s,startedAt=%s"; + private static final Logger LOGGER = LoggerFactory.getLogger(StreamsTaskCounter.class); + + private AtomicLong emitted; + private AtomicLong received; + private AtomicLong errors; + private AtomicLong totalTime; + @GuardedBy("this") + private volatile long maxTime; + + /** + * + * @param id + */ + public StreamsTaskCounter(String id) { + this(id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1); + } + + /** + * + * @param id + */ + public StreamsTaskCounter(String id, String streamId, long startedAt) { + this.emitted = new AtomicLong(0); + this.received = new AtomicLong(0); + this.errors = new AtomicLong(0); + this.totalTime = new AtomicLong(0); + this.maxTime = -1; + ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id, streamId, startedAt), this); + } + + /** + * Increment emitted count + */ + public void incrementEmittedCount() { + this.incrementEmittedCount(1); + } + + /** + * Increment emitted count + * @param delta + */ + public void incrementEmittedCount(long delta) { + this.emitted.addAndGet(delta); + } + + /** + * Increment error count + */ + public void incrementErrorCount() { + this.incrementErrorCount(1); + } + + /** + * Increment error count + * @param delta + */ + public void incrementErrorCount(long delta) { + this.errors.addAndGet(delta); + } + + /** + * Increment received count + */ + public void incrementReceivedCount() { + this.incrementReceivedCount(1); + } + + /** + * Increment received count + * @param delta + */ + public void incrementReceivedCount(long delta) { + this.received.addAndGet(delta); + } + + /** + * Add the time it takes to process a single datum in milliseconds + * @param processTime + */ + public void addTime(long processTime) { + synchronized (this) { + if(processTime > this.maxTime) { + this.maxTime = processTime; + } } + this.totalTime.addAndGet(processTime); + } - /** - * Increment emitted count - */ - public void incrementEmittedCount() { - this.incrementEmittedCount(1); + @Override + public double getErrorRate() { + if(this.received.get() == 0) { + return 0.0; } - - /** - * Increment emitted count - * @param delta - */ - public void incrementEmittedCount(long delta) { - this.emitted.addAndGet(delta); + return (double) this.errors.get() / (double) this.received.get(); + } + + @Override + public long getNumEmitted() { + return this.emitted.get(); + } + + @Override + public long getNumReceived() { + return this.received.get(); + } + + @Override + public long getNumUnhandledErrors() { + return this.errors.get(); + } + + @Override + public double getAvgTime() { + long rec = this.received.get(); + long emit = this.emitted.get(); + if(rec == 0 && emit == 0 ) { + return 0.0; + } else if( rec == 0) { //provider instance + return this.totalTime.get() / (double) emit; + } else { + return this.totalTime.get() / ((double) this.received.get() - this.errors.get()); } + } - /** - * Increment error count - */ - public void incrementErrorCount() { - this.incrementErrorCount(1); - } - - /** - * Increment error count - * @param delta - */ - public void incrementErrorCount(long delta) { - this.errors.addAndGet(delta); - } - - /** - * Increment received count - */ - public void incrementReceivedCount() { - this.incrementReceivedCount(1); - } - - /** - * Increment received count - * @param delta - */ - public void incrementReceivedCount(long delta) { - this.received.addAndGet(delta); - } - - /** - * Add the time it takes to process a single datum in milliseconds - * @param processTime - */ - public void addTime(long processTime) { - synchronized (this) { - if(processTime > this.maxTime) { - this.maxTime = processTime; - } - } - this.totalTime.addAndGet(processTime); - } - - @Override - public double getErrorRate() { - if(this.received.get() == 0) { - return 0.0; - } - return (double) this.errors.get() / (double) this.received.get(); - } - - @Override - public long getNumEmitted() { - return this.emitted.get(); - } - - @Override - public long getNumReceived() { - return this.received.get(); - } - - @Override - public long getNumUnhandledErrors() { - return this.errors.get(); - } - - @Override - public double getAvgTime() { - long rec = this.received.get(); - long emit = this.emitted.get(); - if(rec == 0 && emit == 0 ) { - return 0.0; - } else if( rec == 0) { //provider instance - return this.totalTime.get() / (double) emit; - } else { - return this.totalTime.get() / ((double) this.received.get() - this.errors.get()); - } - } - - @Override - public long getMaxTime() { - return this.maxTime; - } + @Override + public long getMaxTime() { + return this.maxTime; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java index 8ac2e33..062eb04 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java @@ -22,42 +22,42 @@ package org.apache.streams.local.counters; */ public interface StreamsTaskCounterMXBean { - /** - * Get the error rate of the streams process calculated by the number of errors not handled by the {@link org.apache.streams.local.tasks.StreamsTask} - * divided by the number of datums received. - * @return error rate - */ - public double getErrorRate(); - - /** - * Get the number of {@link org.apache.streams.core.StreamsDatum}s emitted by the streams process - * @return number of emitted datums - */ - public long getNumEmitted(); - - /** - * Get the number of {@link org.apache.streams.core.StreamsDatum}s received by the streams process - * @return number of received datums - */ - public long getNumReceived(); - - /** - * Get the number of errors that the process had to catch because the executing Provider/Processor/Writer did not - * catch and handle the exception - * @return number of handled errors - */ - public long getNumUnhandledErrors(); - - /** - * Returns the average time in milliseconds it takes the task to readCurrent, process, or write to return. - * @return - */ - public double getAvgTime(); - - /** - * Returns the max time in milliseconds it takes the task to readCurrent, process, or write to return. - * @return - */ - public long getMaxTime(); + /** + * Get the error rate of the streams process calculated by the number of errors not handled by the {@link org.apache.streams.local.tasks.StreamsTask} + * divided by the number of datums received. + * @return error rate + */ + public double getErrorRate(); + + /** + * Get the number of {@link org.apache.streams.core.StreamsDatum}s emitted by the streams process + * @return number of emitted datums + */ + public long getNumEmitted(); + + /** + * Get the number of {@link org.apache.streams.core.StreamsDatum}s received by the streams process + * @return number of received datums + */ + public long getNumReceived(); + + /** + * Get the number of errors that the process had to catch because the executing Provider/Processor/Writer did not + * catch and handle the exception + * @return number of handled errors + */ + public long getNumUnhandledErrors(); + + /** + * Returns the average time in milliseconds it takes the task to readCurrent, process, or write to return. + * @return + */ + public double getAvgTime(); + + /** + * Returns the max time in milliseconds it takes the task to readCurrent, process, or write to return. + * @return + */ + public long getMaxTime(); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java index c9cfec4..38bda24 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java @@ -19,10 +19,13 @@ package org.apache.streams.local.executors; import org.apache.streams.local.builders.LocalStreamBuilder; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.*; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * A fixed ThreadPoolExecutor that will shutdown a stream upon a thread ending execution due to an unhandled throwable. @@ -30,35 +33,35 @@ import java.util.concurrent.*; */ public class ShutdownStreamOnUnhandleThrowableThreadPoolExecutor extends ThreadPoolExecutor { - private static final Logger LOGGER = LoggerFactory.getLogger(ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.class); - private LocalStreamBuilder streamBuilder; - private volatile boolean isStoped; + private LocalStreamBuilder streamBuilder; + private volatile boolean isStoped; - /** - * Creates a fixed size thread pool where corePoolSize & maximumPoolSize equal numThreads with an unbounded queue. - * @param numThreads number of threads in pool - * @param streamBuilder streambuilder to call {@link org.apache.streams.core.StreamBuilder#stop()} on upon receiving an unhandled throwable - */ - public ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(int numThreads, LocalStreamBuilder streamBuilder) { - super(numThreads, numThreads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); - this.streamBuilder = streamBuilder; - this.isStoped = false; - } + /** + * Creates a fixed size thread pool where corePoolSize & maximumPoolSize equal numThreads with an unbounded queue. + * @param numThreads number of threads in pool + * @param streamBuilder streambuilder to call {@link org.apache.streams.core.StreamBuilder#stop()} on upon receiving an unhandled throwable + */ + public ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(int numThreads, LocalStreamBuilder streamBuilder) { + super(numThreads, numThreads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); + this.streamBuilder = streamBuilder; + this.isStoped = false; + } - @Override - protected void afterExecute(Runnable r, Throwable t) { - if(t != null) { - LOGGER.error("Runnable, {}, exited with an unhandled throwable! : {}", r.getClass(), t); - LOGGER.error("Attempting to shut down stream."); - synchronized (this) { - if (!this.isStoped) { - this.isStoped = true; - this.streamBuilder.stop(); - } - } - } else { - LOGGER.trace("Runnable, {}, finished executing.", r.getClass()); + @Override + protected void afterExecute(Runnable r, Throwable t) { + if(t != null) { + LOGGER.error("Runnable, {}, exited with an unhandled throwable! : {}", r.getClass(), t); + LOGGER.error("Attempting to shut down stream."); + synchronized (this) { + if (!this.isStoped) { + this.isStoped = true; + this.streamBuilder.stop(); } + } + } else { + LOGGER.trace("Runnable, {}, finished executing.", r.getClass()); } + } }
