Merged apache master
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/377fb897 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/377fb897 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/377fb897 Branch: refs/heads/STREAMS-49 Commit: 377fb89717f34e6e109cf2655d6e3e335a494499 Parents: 015fade 5c2e832 Author: Ryan Ebanks <[email protected]> Authored: Fri Oct 31 16:32:16 2014 -0500 Committer: Ryan Ebanks <[email protected]> Committed: Fri Oct 31 16:32:16 2014 -0500 ---------------------------------------------------------------------- streams-contrib/streams-amazon-aws/pom.xml | 2 +- .../org/apache/streams/s3/S3Configurator.java | 55 ++++----- .../org/apache/streams/s3/S3PersistReader.java | 11 +- .../org/apache/streams/s3/S3PersistWriter.java | 10 +- .../org/apache/streams/s3/S3Configuration.json | 18 ++- .../streams-persist-elasticsearch/pom.xml | 47 +++++++ .../ElasticsearchMetadataUtil.java | 106 ++++++++++++++++ .../ElasticsearchPersistDeleter.java | 6 +- .../ElasticsearchPersistUpdater.java | 6 +- .../ElasticsearchPersistWriter.java | 47 +------ .../DatumFromMetadataAsDocumentProcessor.java | 122 +++++++++++++++++++ .../processor/DatumFromMetadataProcessor.java | 103 ++++++++++++++++ .../processor/DocumentToMetadataProcessor.java | 100 +++++++++++++++ .../ElasticsearchConfiguration.json | 3 +- .../test/TestDatumFromMetadataProcessor.java | 81 ++++++++++++ .../test/TestDocumentToMetadataProcessor.java | 63 ++++++++++ .../test/TestElasticsearchPersistWriter.java | 70 +++++++++++ .../google-gplus/pom.xml | 2 +- .../processor/GooglePlusTypeConverter.java | 91 ++++++++++++++ .../util/GPlusPersonDeserializer.java | 107 ++++++++++++++++ .../serializer/util/GooglePlusActivityUtil.java | 115 +++++++++++++++++ .../google/gplus/GooglePlusPersonSerDeTest.java | 97 +++++++++++++++ .../processor/GooglePlusTypeConverterTest.java | 98 +++++++++++++++ .../test/resources/google_plus_person_jsons.txt | 2 + .../provider/TwitterTimelineProvider.java | 14 ++- .../provider/TwitterTimelineProviderTest.java | 39 ++++++ streams-runtimes/streams-runtime-local/pom.xml | 6 +- .../local/counters/DatumStatusCounter.java | 15 +-- .../local/counters/StreamsTaskCounter.java | 10 +- .../local/tasks/StreamsProcessorTask.java | 3 +- .../local/builders/LocalStreamBuilderTest.java | 12 ++ ...nhandledThrowableThreadPoolExecutorTest.java | 11 ++ .../queues/ThroughputQueueMulitThreadTest.java | 10 ++ .../queues/ThroughputQueueSingleThreadTest.java | 11 ++ .../streams/local/tasks/BasicTasksTest.java | 15 +-- .../local/tasks/StreamsProviderTaskTest.java | 10 ++ .../tests/TestComponentsLocalStream.java | 13 +- .../tests/TestExpectedDatumsPersitWriter.java | 13 +- .../component/tests/TestFileReaderProvider.java | 12 +- .../org/apache/streams/util/ComponentUtils.java | 33 +++++ 40 files changed, 1455 insertions(+), 134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/377fb897/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java ---------------------------------------------------------------------- diff --cc streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java index 8d66847,1bb565d..33c5827 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java @@@ -119,15 -117,11 +119,14 @@@ public class StreamsProcessorTask exten Thread.currentThread().interrupt(); } if(datum != null) { + this.counter.incrementReceivedCount(); try { + long startTime = System.currentTimeMillis(); List<StreamsDatum> output = this.processor.process(datum); + this.counter.addTime(System.currentTimeMillis() - startTime); if(output != null) { for(StreamsDatum outDatum : output) { - super.addToOutgoingQueue(datum); - this.counter.incrementEmittedCount(); + super.addToOutgoingQueue(outDatum); statusCounter.incrementStatus(DatumStatus.SUCCESS); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/377fb897/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/377fb897/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java ---------------------------------------------------------------------- diff --cc streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java index f62250d,2d28602..a0e28cd --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java @@@ -42,18 -38,12 +43,13 @@@ import static org.junit.Assert.* public class BasicTasksTest { + private static final String MBEAN_ID = "test_bean"; - - /** - * Remove registered mbeans from previous tests - * @throws Exception - */ @After - public void unregisterMXBean() throws Exception { + public void removeLocalMBeans() { try { - ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(StreamsTaskCounter.NAME_TEMPLATE, MBEAN_ID))); - } catch (InstanceNotFoundException ife) { - //No-op + ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local"); + } catch (Exception e) { + //No op. proceed to next test } }
