Repository: incubator-streams Updated Branches: refs/heads/springcleaning adb43b295 -> ab5165ab7
adding platform-level status counters debugging data leak Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ab5165ab Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ab5165ab Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ab5165ab Branch: refs/heads/springcleaning Commit: ab5165ab7f7247fdad27586aa589e560b12a5ef7 Parents: adb43b2 Author: Steve Blackmon <sblack...@w2odigital.com> Authored: Mon Mar 24 15:21:14 2014 -0500 Committer: Steve Blackmon <sblack...@w2odigital.com> Committed: Mon Mar 24 15:21:14 2014 -0500 ---------------------------------------------------------------------- .../streams/hdfs/WebHdfsPersistReader.java | 33 ++++++++++++-------- .../streams/hdfs/WebHdfsPersistReaderTask.java | 23 +++++++++----- .../streams-provider-twitter/pom.xml | 2 +- .../provider/TwitterStreamConfigurator.java | 2 ++ .../twitter/provider/TwitterStreamProvider.java | 22 ++++++++++--- .../com/twitter/TwitterStreamConfiguration.json | 22 +++++++++++++ .../src/main/resources/reference.conf | 3 +- .../apache/streams/core/DatumStatusCounter.java | 25 ++++++++++----- streams-runtimes/streams-runtime-local/pom.xml | 17 ++++++++++ .../local/builders/LocalStreamBuilder.java | 25 +++++++++++++-- .../streams/local/builders/StreamComponent.java | 24 +++++++++++--- .../streams/local/tasks/BaseStreamsTask.java | 1 + .../tasks/LocalStreamProcessMonitorThread.java | 2 +- .../local/tasks/StreamsProviderTask.java | 16 +++++----- 14 files changed, 170 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java index b0d9904..3a6ff29 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java @@ -1,8 +1,6 @@ package org.apache.streams.hdfs; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; import com.google.common.collect.Queues; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -11,9 +9,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsPersistReader; -import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.core.*; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,7 +19,6 @@ import java.math.BigInteger; import java.net.URI; import java.net.URISyntaxException; import java.security.PrivilegedExceptionAction; -import java.util.Collection; import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -32,7 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue; /** * Created by sblackmon on 2/28/14. */ -public class WebHdfsPersistReader implements StreamsPersistReader { +public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCountable { public final static String STREAMS_ID = "WebHdfsPersistReader"; @@ -52,6 +47,9 @@ public class WebHdfsPersistReader implements StreamsPersistReader { private ExecutorService executor; + protected DatumStatusCounter countersTotal = new DatumStatusCounter(); + protected DatumStatusCounter countersCurrent = new DatumStatusCounter(); + public WebHdfsPersistReader(HdfsReaderConfiguration hdfsConfiguration) { this.hdfsConfiguration = hdfsConfiguration; } @@ -130,7 +128,8 @@ public class WebHdfsPersistReader implements StreamsPersistReader { } catch (IOException e) { e.printStackTrace(); } - persistQueue = new LinkedBlockingQueue<StreamsDatum>(10000); + persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000)); + //persistQueue = Queues.synchronizedQueue(new ConcurrentLinkedQueue()); executor = Executors.newSingleThreadExecutor(); } @@ -154,12 +153,16 @@ public class WebHdfsPersistReader implements StreamsPersistReader { @Override public StreamsResultSet readCurrent() { - Collection<StreamsDatum> currentIterator = Lists.newArrayList(); - Iterators.addAll(currentIterator, persistQueue.iterator()); - - StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator)); + StreamsResultSet current; - persistQueue.clear(); + synchronized( WebHdfsPersistReader.class ) { + current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue)); + current.setCounter(new DatumStatusCounter()); + current.getCounter().add(countersCurrent); + countersTotal.add(countersCurrent); + countersCurrent = new DatumStatusCounter(); + persistQueue.clear(); + } return current; } @@ -174,4 +177,8 @@ public class WebHdfsPersistReader implements StreamsPersistReader { return null; } + @Override + public DatumStatusCounter getDatumStatusCounter() { + return countersTotal; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java index 95a8ef6..b04350e 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java @@ -2,6 +2,7 @@ package org.apache.streams.hdfs; import com.google.common.base.Strings; import org.apache.hadoop.fs.FileStatus; +import org.apache.streams.core.DatumStatus; import org.apache.streams.core.StreamsDatum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,19 +41,16 @@ public class WebHdfsPersistReaderTask implements Runnable { try { line = bufferedReader.readLine(); if( !Strings.isNullOrEmpty(line) ) { + reader.countersCurrent.incrementAttempt(); String[] fields = line.split(Character.toString(reader.DELIMITER)); StreamsDatum entry = new StreamsDatum(fields[3], fields[0]); - boolean success; - do { - success = reader.persistQueue.offer(entry); - Thread.yield(); - } - while( success == false ); - + write( entry ); + reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS); } } catch (Exception e) { e.printStackTrace(); LOGGER.warn(e.getMessage()); + reader.countersCurrent.incrementStatus(DatumStatus.FAIL); } } while( !Strings.isNullOrEmpty(line) ); LOGGER.info("Finished Processing " + fileStatus.getPath().getName()); @@ -67,4 +65,15 @@ public class WebHdfsPersistReaderTask implements Runnable { } + private void write( StreamsDatum entry ) { + boolean success; + do { + synchronized( WebHdfsPersistReader.class ) { + success = reader.persistQueue.offer(entry); + } + Thread.yield(); + } + while( !success ); + } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-contrib/streams-provider-twitter/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml index 9a12bbc..aec397b 100644 --- a/streams-contrib/streams-provider-twitter/pom.xml +++ b/streams-contrib/streams-provider-twitter/pom.xml @@ -63,7 +63,7 @@ <dependency> <groupId>com.twitter</groupId> <artifactId>hbc-core</artifactId> - <version>1.4.2</version> + <version>2.0.0</version> </dependency> <dependency> <groupId>org.twitter4j</groupId> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java index 7bb7048..2ae8d59 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java @@ -45,6 +45,8 @@ public class TwitterStreamConfigurator { twitterStreamConfiguration.setFilterLevel(twitter.getString("filter-level")); twitterStreamConfiguration.setEndpoint(twitter.getString("endpoint")); + twitterStreamConfiguration.setWith(twitter.getString("with")); + twitterStreamConfiguration.setReplies(twitter.getString("replies")); twitterStreamConfiguration.setJsonStoreEnabled("true"); twitterStreamConfiguration.setIncludeEntities("true"); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java index b0b4cf4..e9ce10e 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java @@ -13,6 +13,7 @@ import com.twitter.hbc.core.endpoint.StreamingEndpoint; import com.twitter.hbc.core.processor.StringDelimitedProcessor; import com.twitter.hbc.httpclient.BasicClient; import com.twitter.hbc.httpclient.auth.Authentication; +import com.twitter.hbc.httpclient.auth.BasicAuth; import com.twitter.hbc.httpclient.auth.OAuth1; import com.typesafe.config.Config; import org.apache.streams.config.StreamsConfigurator; @@ -143,10 +144,23 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable { else return; - Authentication auth = new OAuth1(config.getOauth().getConsumerKey(), - config.getOauth().getConsumerSecret(), - config.getOauth().getAccessToken(), - config.getOauth().getAccessTokenSecret()); + Authentication auth; + if( config.getOauth() != null ) { + auth = new OAuth1(config.getOauth().getConsumerKey(), + config.getOauth().getConsumerSecret(), + config.getOauth().getAccessToken(), + config.getOauth().getAccessTokenSecret()); + } else if( config.getBasicauth() != null ) { + auth = new BasicAuth( + config.getBasicauth().getUsername(), + config.getBasicauth().getPassword() + ); + } else { + return; + } + + endpoint.addPostParameter("with", config.getWith()); + endpoint.addPostParameter("replies", config.getReplies()); client = new ClientBuilder() .name("apache/streams/streams-contrib/streams-provider-twitter") http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json index 087f8fd..c1a0d0c 100644 --- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json +++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json @@ -38,6 +38,14 @@ "type": "string", "description": "Setting this parameter to one of none, low, or medium will set the minimum value of the filter_level Tweet attribute required to be included in the stream" }, + "with": { + "type": "string", + "description": "Typically following or user" + }, + "replies": { + "type": "string", + "description": "Set to all, to see all @replies" + }, "follow": { "type": "array", "description": "A list of user IDs, indicating the users whose Tweets should be delivered on the stream", @@ -74,6 +82,20 @@ "type": "string" } } + }, + "basicauth": { + "type": "object", + "dynamic": "true", + "javaType" : "org.apache.streams.twitter.TwitterBasicAuthConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "username": { + "type": "string" + }, + "password": { + "type": "string" + } + } } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf b/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf index 49555fc..d437db8 100644 --- a/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf +++ b/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf @@ -8,5 +8,6 @@ twitter { oauth { appName = "Apache Streams" } - + with = "user" + replies = "all" } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java ---------------------------------------------------------------------- diff --git a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java index 7798fcd..8730d73 100644 --- a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java +++ b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java @@ -2,40 +2,51 @@ package org.apache.streams.core; public class DatumStatusCounter { + private volatile int attempted = 0; private volatile int success = 0; private volatile int fail = 0; private volatile int partial = 0; - private volatile int recordsEmitted = 0; + private volatile int emitted = 0; + public int getAttempted() { return this.attempted; } public int getSuccess() { return this.success; } public int getFail() { return this.fail; } public int getPartial() { return this.partial; } - public int getEmitted() { return this.recordsEmitted; } + public int getEmitted() { return this.emitted; } + + public DatumStatusCounter() { + } public void add(DatumStatusCounter datumStatusCounter) { + this.attempted += datumStatusCounter.getAttempted(); this.success += datumStatusCounter.getSuccess(); this.partial = datumStatusCounter.getPartial(); this.fail += datumStatusCounter.getFail(); - this.recordsEmitted += datumStatusCounter.getEmitted(); + this.emitted += datumStatusCounter.getEmitted(); + } + + public void incrementAttempt() { + this.attempted += 1; } - public void add(DatumStatus workStatus) { + public synchronized void incrementStatus(DatumStatus workStatus) { // add this to the record counter switch(workStatus) { case SUCCESS: this.success++; break; case PARTIAL: this.partial++; break; case FAIL: this.fail++; break; } - this.recordsEmitted += 1; + this.emitted += 1; } @Override public String toString() { return "DatumStatusCounter{" + - "success=" + success + + "attempted=" + attempted + + ", success=" + success + ", fail=" + fail + ", partial=" + partial + - ", recordsEmitted=" + recordsEmitted + + ", emitted=" + emitted + '}'; } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-runtimes/streams-runtime-local/pom.xml ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/pom.xml b/streams-runtimes/streams-runtime-local/pom.xml index fa64225..d9f6d51 100644 --- a/streams-runtimes/streams-runtime-local/pom.xml +++ b/streams-runtimes/streams-runtime-local/pom.xml @@ -32,6 +32,18 @@ <dependencies> <dependency> + <groupId>org.jsonschema2pojo</groupId> + <artifactId>jsonschema2pojo-core</artifactId> + <type>jar</type> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + + <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-core</artifactId> <version>0.1-SNAPSHOT</version> @@ -41,6 +53,11 @@ <artifactId>streams-util</artifactId> <version>0.1-SNAPSHOT</version> </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-pojo</artifactId> + <version>0.1-SNAPSHOT</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java index 444c2e1..d570573 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java @@ -2,6 +2,7 @@ package org.apache.streams.local.builders; import org.apache.streams.core.*; import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread; +import org.apache.streams.local.tasks.StatusCounterMonitorThread; import org.apache.streams.local.tasks.StreamsProviderTask; import org.apache.streams.local.tasks.StreamsTask; import org.apache.streams.util.SerializationUtil; @@ -28,6 +29,7 @@ public class LocalStreamBuilder implements StreamBuilder { private ExecutorService executor; private ExecutorService monitor; private int totalTasks; + private int monitorTasks; private LocalStreamProcessMonitorThread monitorThread; /** @@ -64,6 +66,7 @@ public class LocalStreamBuilder implements StreamBuilder { this.components = new HashMap<String, StreamComponent>(); this.streamConfig = streamConfig; this.totalTasks = 0; + this.monitorTasks = 0; } @Override @@ -71,6 +74,8 @@ public class LocalStreamBuilder implements StreamBuilder { validateId(id); this.providers.put(id, new StreamComponent(id, provider, true)); ++this.totalTasks; + if( provider instanceof DatumStatusCountable ) + ++this.monitorTasks; return this; } @@ -79,6 +84,8 @@ public class LocalStreamBuilder implements StreamBuilder { validateId(id); this.providers.put(id, new StreamComponent(id, provider, false)); ++this.totalTasks; + if( provider instanceof DatumStatusCountable ) + ++this.monitorTasks; return this; } @@ -87,6 +94,8 @@ public class LocalStreamBuilder implements StreamBuilder { validateId(id); this.providers.put(id, new StreamComponent(id, provider, sequence)); ++this.totalTasks; + if( provider instanceof DatumStatusCountable ) + ++this.monitorTasks; return this; } @@ -95,6 +104,8 @@ public class LocalStreamBuilder implements StreamBuilder { validateId(id); this.providers.put(id, new StreamComponent(id, provider, start, end)); ++this.totalTasks; + if( provider instanceof DatumStatusCountable ) + ++this.monitorTasks; return this; } @@ -105,6 +116,8 @@ public class LocalStreamBuilder implements StreamBuilder { this.components.put(id, comp); connectToOtherComponents(inBoundIds, comp); this.totalTasks += numTasks; + if( processor instanceof DatumStatusCountable ) + ++this.monitorTasks; return this; } @@ -115,6 +128,8 @@ public class LocalStreamBuilder implements StreamBuilder { this.components.put(id, comp); connectToOtherComponents(inBoundIds, comp); this.totalTasks += numTasks; + if( writer instanceof DatumStatusCountable ) + ++this.monitorTasks; return this; } @@ -125,11 +140,11 @@ public class LocalStreamBuilder implements StreamBuilder { public void start() { boolean isRunning = true; this.executor = Executors.newFixedThreadPool(this.totalTasks); - this.monitor = Executors.newSingleThreadExecutor(); + this.monitor = Executors.newFixedThreadPool(this.monitorTasks+1); Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>(); Map<String, List<StreamsTask>> streamsTasks = new HashMap<String, List<StreamsTask>>(); - monitorThread = new LocalStreamProcessMonitorThread(this.monitor, 1000); try { + monitorThread = new LocalStreamProcessMonitorThread(executor, 10); this.monitor.submit(monitorThread); for(StreamComponent comp : this.components.values()) { int tasks = comp.getNumTasks(); @@ -139,6 +154,8 @@ public class LocalStreamBuilder implements StreamBuilder { task.setStreamConfig(this.streamConfig); this.executor.submit(task); compTasks.add(task); + if( comp.isOperationCountable() ) + this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable)comp.getOperation(), 10)); } streamsTasks.put(comp.getId(), compTasks); } @@ -147,6 +164,10 @@ public class LocalStreamBuilder implements StreamBuilder { task.setStreamConfig(this.streamConfig); this.executor.submit(task); provTasks.put(prov.getId(), (StreamsProviderTask) task); + if( prov.isOperationCountable() ) { + this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) prov.getOperation(), 10)); + this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10)); + } } while(isRunning) { isRunning = false; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java index ecfb22d..6319ba8 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java @@ -1,9 +1,6 @@ package org.apache.streams.local.builders; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsPersistWriter; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.core.StreamsProvider; +import org.apache.streams.core.*; import org.apache.streams.local.tasks.StreamsPersistWriterTask; import org.apache.streams.local.tasks.StreamsProcessorTask; import org.apache.streams.local.tasks.StreamsProviderTask; @@ -226,4 +223,23 @@ public class StreamComponent { else return false; } + + protected StreamsOperation getOperation() { + if(this.processor != null) { + return (StreamsOperation) this.processor; + } + else if(this.writer != null) { + return (StreamsOperation) this.writer; + } + else if(this.provider != null) { + return (StreamsOperation) this.provider; + } + else { + throw new InvalidStreamException("Underlying StreamComponoent was NULL."); + } + } + + protected boolean isOperationCountable() { + return getOperation() instanceof DatumStatusCountable; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java index 5f2620b..b9af0fd 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java @@ -145,4 +145,5 @@ public abstract class BaseStreamsTask implements StreamsTask { } return this.inIndex; } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java index 1325fd6..0b254b6 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java @@ -46,7 +46,7 @@ public class LocalStreamProcessMonitorThread implements Runnable String usedMemory = humanReadableByteCount(memoryUsage.getUsed(), true); - LOGGER.info("[monitor] Used Memory: {}, Max: {}", + LOGGER.debug("[monitor] Used Memory: {}, Max: {}", usedMemory, maxMemory); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab5165ab/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java index b4c929d..7b6792f 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java @@ -1,9 +1,6 @@ package org.apache.streams.local.tasks; -import org.apache.streams.core.DatumStatusCounter; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProvider; -import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.core.*; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,10 +13,15 @@ import java.util.concurrent.atomic.AtomicBoolean; /** * */ -public class StreamsProviderTask extends BaseStreamsTask { +public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusCountable { private final static Logger LOGGER = LoggerFactory.getLogger(StreamsProviderTask.class); + @Override + public DatumStatusCounter getDatumStatusCounter() { + return this.statusCounter; + } + private static enum Type { PERPETUAL, READ_CURRENT, @@ -41,7 +43,7 @@ public class StreamsProviderTask extends BaseStreamsTask { private AtomicBoolean isRunning; private int zeros = 0; - private DatumStatusCounter statusCounter; + private DatumStatusCounter statusCounter = new DatumStatusCounter(); /** * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readCurrent()} @@ -119,7 +121,7 @@ public class StreamsProviderTask extends BaseStreamsTask { zeros = 0; if( resultSet.getCounter() != null ) { LOGGER.debug(resultSet.getCounter().toString()); - statusCounter.add(resultSet.getCounter()); + this.statusCounter.add(resultSet.getCounter()); } } flushResults(resultSet);