STREAMS-197 | Merged apache master and handle merge conflict
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f9588905 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f9588905 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f9588905 Branch: refs/heads/master Commit: f9588905deeaa4bed0e3bad0945c626202e4c211 Parents: 0006a04 2c71550 Author: Ryan Ebanks <[email protected]> Authored: Tue Oct 21 18:00:34 2014 -0500 Committer: Ryan Ebanks <[email protected]> Committed: Tue Oct 21 18:00:34 2014 -0500 ---------------------------------------------------------------------- .../streams-persist-elasticsearch/pom.xml | 47 +++++++ .../ElasticsearchMetadataUtil.java | 106 ++++++++++++++++ .../ElasticsearchPersistDeleter.java | 6 +- .../ElasticsearchPersistUpdater.java | 6 +- .../ElasticsearchPersistWriter.java | 47 +------ .../DatumFromMetadataAsDocumentProcessor.java | 122 +++++++++++++++++++ .../processor/DatumFromMetadataProcessor.java | 103 ++++++++++++++++ .../processor/DocumentToMetadataProcessor.java | 100 +++++++++++++++ .../ElasticsearchConfiguration.json | 3 +- .../test/TestDatumFromMetadataProcessor.java | 81 ++++++++++++ .../test/TestDocumentToMetadataProcessor.java | 63 ++++++++++ .../test/TestElasticsearchPersistWriter.java | 70 +++++++++++ .../provider/TwitterTimelineProvider.java | 14 ++- .../provider/TwitterTimelineProviderTest.java | 39 ++++++ .../local/tasks/StreamsProcessorTask.java | 2 +- .../local/builders/LocalStreamBuilderTest.java | 12 ++ ...nhandledThrowableThreadPoolExecutorTest.java | 11 ++ .../queues/ThroughputQueueMulitThreadTest.java | 10 ++ .../queues/ThroughputQueueSingleThreadTest.java | 11 ++ .../streams/local/tasks/BasicTasksTest.java | 10 ++ .../local/tasks/StreamsProviderTaskTest.java | 10 ++ .../tests/TestComponentsLocalStream.java | 13 +- .../tests/TestExpectedDatumsPersitWriter.java | 13 +- .../component/tests/TestFileReaderProvider.java | 12 +- .../org/apache/streams/util/ComponentUtils.java | 18 ++- 25 files changed, 868 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f9588905/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java ---------------------------------------------------------------------- diff --cc streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java index bb65c4c,9f73560..169fe91 --- a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java +++ b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java @@@ -108,19 -110,17 +110,33 @@@ public class ComponentUtils } /** + * Removes all mbeans registered undered a specific domain. Made specificly to clean up at unit tests + * @param domain + */ + public static void removeAllMBeansOfDomain(String domain) throws Exception { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + domain = domain.endsWith(":") ? domain : domain+":"; + ObjectName objectName = new ObjectName(domain+"*"); + Set<ObjectName> mbeanNames = mbs.queryNames(objectName, null); + for(ObjectName name : mbeanNames) { + mbs.unregisterMBean(name); + } + } + ++ /** + * Attempts to register an object with local MBeanServer. Throws runtime exception on errors. + * @param name name to register bean with + * @param mbean mbean to register + */ + public static <V> void registerLocalMBean(String name, V mbean) { + try { + ObjectName objectName = new ObjectName(name); + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + mbs.registerMBean(mbean, objectName); + } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) { + LOGGER.error("Failed to register MXBean : {}", e); + throw new RuntimeException(e); + } + } + }
