This is an automated email from the ASF dual-hosted git repository.
rmerriman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push:
new 2f2c802 METRON-2026 Remove Storm dependency from metron-common
(merrimanr) closes apache/metron#1351
2f2c802 is described below
commit 2f2c80244b34e8182f72e6f4dc98a2940194eaa0
Author: merrimanr <[email protected]>
AuthorDate: Mon Apr 15 16:40:51 2019 -0500
METRON-2026 Remove Storm dependency from metron-common (merrimanr) closes
apache/metron#1351
---
.../metron/profiler/storm/ProfileSplitterBolt.java | 2 +-
.../metron-common-storm/pom.xml | 89 ++++++++++++++++++++++
.../metron/storm}/common/bolt/ConfiguredBolt.java | 2 +-
.../common/bolt/ConfiguredEnrichmentBolt.java | 2 +-
.../storm}/common/bolt/ConfiguredIndexingBolt.java | 2 +-
.../storm}/common/bolt/ConfiguredParserBolt.java | 2 +-
.../storm}/common/bolt/ConfiguredProfilerBolt.java | 2 +-
.../storm}/common/message/BytesFromPosition.java | 2 +-
.../storm}/common/message/JSONFromField.java | 2 +-
.../common/message/JSONFromFieldByReference.java | 2 +-
.../storm}/common/message/JSONFromPosition.java | 2 +-
.../storm}/common/message/MessageGetStrategy.java | 2 +-
.../storm}/common/message/MessageGetters.java | 2 +-
.../storm}/common/message/ObjectFromField.java | 2 +-
.../common/message/metadata/RawMessageUtil.java} | 65 ++++++----------
.../storm/common/utils/StormErrorUtils.java} | 32 +++++---
.../storm}/common/bolt/BaseConfiguredBoltTest.java | 2 +-
.../common/bolt/ConfiguredEnrichmentBoltTest.java | 16 ++--
.../common/bolt/ConfiguredParserBoltTest.java | 13 ++--
.../storm}/common/message/MessageGettersTest.java | 2 +-
.../message/metadata/RawMessageUtilTest.java | 9 ++-
.../metron/storm}/common/utils/ErrorUtilsTest.java | 5 +-
metron-platform/metron-common-streaming/pom.xml | 33 ++++++++
metron-platform/metron-common/pom.xml | 75 ++----------------
.../apache/metron/common/error/MetronError.java | 4 +-
.../common/message/metadata/MetadataUtil.java | 53 -------------
.../common/message/metadata/RawMessageUtil.java | 59 --------------
.../org/apache/metron/common/utils/ErrorUtils.java | 20 -----
.../metron/common/writer/BulkMessageWriter.java | 3 +-
.../elasticsearch/writer/ElasticsearchWriter.java | 3 +-
.../writer/ElasticsearchWriterTest.java | 18 ++---
.../metron/enrichment/bolt/EnrichmentJoinBolt.java | 2 +-
.../enrichment/bolt/GenericEnrichmentBolt.java | 10 +--
.../apache/metron/enrichment/bolt/JoinBolt.java | 10 +--
.../apache/metron/enrichment/bolt/SplitBolt.java | 2 +-
.../enrichment/bolt/ThreatIntelJoinBolt.java | 2 +-
.../enrichment/bolt/UnifiedEnrichmentBolt.java | 12 +--
.../enrichment/bolt/EnrichmentJoinBoltTest.java | 2 +-
.../metron/enrichment/bolt/JoinBoltTest.java | 2 +-
.../enrichment/bolt/ThreatIntelJoinBoltTest.java | 2 +-
.../metron-parsing/metron-parsing-storm/pom.xml | 2 +-
.../org/apache/metron/parsers/bolt/ParserBolt.java | 14 ++--
.../org/apache/metron/parsers/bolt/WriterBolt.java | 8 +-
.../apache/metron/parsers/bolt/WriterHandler.java | 4 +-
.../apache/metron/parsers/bolt/ParserBoltTest.java | 2 +-
.../apache/metron/parsers/bolt/WriterBoltTest.java | 8 +-
.../integration/validation/StormParserDriver.java | 2 +-
.../org/apache/metron/solr/writer/SolrWriter.java | 3 +-
.../schema/SchemaValidationIntegrationTest.java | 2 +-
.../apache/metron/solr/writer/SolrWriterTest.java | 4 +-
metron-platform/metron-writer/pom.xml | 2 +-
.../org/apache/metron/writer/AckTuplesPolicy.java | 2 +-
.../java/org/apache/metron/writer/NoopWriter.java | 3 +-
.../apache/metron/writer/WriterToBulkWriter.java | 3 +-
.../metron/writer/bolt/BulkMessageWriterBolt.java | 16 ++--
.../writer/hbase/SimpleHbaseEnrichmentWriter.java | 3 +-
.../org/apache/metron/writer/hdfs/HdfsWriter.java | 7 +-
.../apache/metron/writer/kafka/KafkaWriter.java | 3 +-
.../apache/metron/writer/AckTuplesPolicyTest.java | 2 +-
.../metron/writer/BulkWriterComponentTest.java | 1 -
.../writer/bolt/BulkMessageWriterBoltTest.java | 12 ++-
.../apache/metron/writer/hdfs/HdfsWriterTest.java | 51 ++++++++-----
metron-platform/pom.xml | 1 +
63 files changed, 331 insertions(+), 395 deletions(-)
diff --git
a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileSplitterBolt.java
b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileSplitterBolt.java
index ef58ad9..2161910 100644
---
a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileSplitterBolt.java
+++
b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileSplitterBolt.java
@@ -20,7 +20,7 @@
package org.apache.metron.profiler.storm;
-import org.apache.metron.common.bolt.ConfiguredProfilerBolt;
+import org.apache.metron.storm.common.bolt.ConfiguredProfilerBolt;
import org.apache.metron.common.configuration.profiler.ProfilerConfig;
import org.apache.metron.profiler.DefaultMessageRouter;
import org.apache.metron.profiler.MessageRoute;
diff --git
a/metron-platform/metron-common-streaming/metron-common-storm/pom.xml
b/metron-platform/metron-common-streaming/metron-common-storm/pom.xml
new file mode 100644
index 0000000..a063163
--- /dev/null
+++ b/metron-platform/metron-common-streaming/metron-common-storm/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software
+ Foundation (ASF) under one or more contributor license agreements. See
the
+ NOTICE file distributed with this work for additional information
regarding
+ copyright ownership. The ASF licenses this file to You under the Apache
License,
+ Version 2.0 (the "License"); you may not use this file except in
compliance
+ with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License
for
+ the specific language governing permissions and limitations under the
License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-common-streaming</artifactId>
+ <version>0.7.1</version>
+ </parent>
+ <artifactId>metron-common-storm</artifactId>
+ <name>metron-common-storm</name>
+ <description>Components common to Storm</description>
+ <url>https://metron.apache.org/</url>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ <commons.config.version>1.10</commons.config.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-common</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${global_storm_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <groupId>org.apache.logging.log4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>flux-core</artifactId>
+ <version>${global_flux_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-client</artifactId>
+ <version>${global_curator_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>${global_curator_version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-test-utilities</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredBolt.java
similarity index 99%
rename from
metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
rename to
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredBolt.java
index 221edad..a437574 100644
---
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
+++
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredBolt.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.metron.common.bolt;
+package org.apache.metron.storm.common.bolt;
import java.lang.invoke.MethodHandles;
import java.util.Map;
diff --git
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredEnrichmentBolt.java
similarity index 97%
rename from
metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
rename to
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredEnrichmentBolt.java
index 2e03a36..bde85bb 100644
---
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
+++
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredEnrichmentBolt.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.metron.common.bolt;
+package org.apache.metron.storm.common.bolt;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
diff --git
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredIndexingBolt.java
similarity index 96%
rename from
metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
rename to
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredIndexingBolt.java
index 27e081e..2d650f0 100644
---
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
+++
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredIndexingBolt.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.metron.common.bolt;
+package org.apache.metron.storm.common.bolt;
import java.lang.invoke.MethodHandles;
import org.apache.metron.common.configuration.IndexingConfigurations;
diff --git
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredParserBolt.java
similarity index 97%
rename from
metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
rename to
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredParserBolt.java
index 17b614b..5074271 100644
---
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
+++
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredParserBolt.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.metron.common.bolt;
+package org.apache.metron.storm.common.bolt;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
diff --git
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredProfilerBolt.java
similarity index 97%
rename from
metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
rename to
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredProfilerBolt.java
index e4d9f7b..b4207c2 100644
---
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
+++
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/bolt/ConfiguredProfilerBolt.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.metron.common.bolt;
+package org.apache.metron.storm.common.bolt;
import java.lang.invoke.MethodHandles;
diff --git
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/BytesFromPosition.java
similarity index 96%
rename from
metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java
rename to
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/BytesFromPosition.java
index 56c6490..b47f43c 100644
---
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java
+++
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/BytesFromPosition.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.metron.common.message;
+package org.apache.metron.storm.common.message;
import org.apache.storm.tuple.Tuple;
diff --git
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromField.java
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/JSONFromField.java
similarity index 96%
rename from
metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromField.java
rename to
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/JSONFromField.java
index 39fe9dd..b8df105 100644
---
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromField.java
+++
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/JSONFromField.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.metron.common.message;
+package org.apache.metron.storm.common.message;
import org.apache.storm.tuple.Tuple;
import org.json.simple.JSONObject;
diff --git
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromFieldByReference.java
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/JSONFromFieldByReference.java
similarity index 96%
rename from
metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromFieldByReference.java
rename to
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/JSONFromFieldByReference.java
index a0d4b7d..1868bc7 100644
---
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromFieldByReference.java
+++
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/JSONFromFieldByReference.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.metron.common.message;
+package org.apache.metron.storm.common.message;
import org.apache.storm.tuple.Tuple;
import org.json.simple.JSONObject;
diff --git
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/JSONFromPosition.java
similarity index 97%
rename from
metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java
rename to
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/JSONFromPosition.java
index 15f0447..b81373e 100644
---
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java
+++
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/JSONFromPosition.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.metron.common.message;
+package org.apache.metron.storm.common.message;
import org.apache.commons.io.Charsets;
import org.apache.storm.tuple.Tuple;
diff --git
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetStrategy.java
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/MessageGetStrategy.java
similarity index 95%
rename from
metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetStrategy.java
rename to
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/MessageGetStrategy.java
index 0595ce1..dfe8fff 100644
---
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetStrategy.java
+++
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/MessageGetStrategy.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.metron.common.message;
+package org.apache.metron.storm.common.message;
import org.apache.storm.tuple.Tuple;
diff --git
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/MessageGetters.java
similarity index 98%
rename from
metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java
rename to
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/MessageGetters.java
index 46bb406..36ad1c9 100644
---
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java
+++
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/MessageGetters.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.metron.common.message;
+package org.apache.metron.storm.common.message;
import org.apache.metron.stellar.common.utils.ConversionUtils;
diff --git
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/ObjectFromField.java
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/ObjectFromField.java
similarity index 96%
copy from
metron-platform/metron-common/src/main/java/org/apache/metron/common/message/ObjectFromField.java
copy to
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/ObjectFromField.java
index 120c09c..dced7d6 100644
---
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/ObjectFromField.java
+++
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/ObjectFromField.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.metron.common.message;
+package org.apache.metron.storm.common.message;
import org.apache.storm.tuple.Tuple;
diff --git
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/MetadataUtil.java
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/metadata/RawMessageUtil.java
similarity index 67%
copy from
metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/MetadataUtil.java
copy to
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/metadata/RawMessageUtil.java
index 3034ddd..afd3a87 100644
---
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/MetadataUtil.java
+++
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/message/metadata/RawMessageUtil.java
@@ -15,9 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.metron.common.message.metadata;
+package org.apache.metron.storm.common.message.metadata;
import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.common.message.metadata.MetadataUtil;
+import org.apache.metron.common.message.metadata.RawMessage;
+import org.apache.metron.common.message.metadata.RawMessageStrategy;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
@@ -29,51 +32,29 @@ import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Map;
-/**
- * Captures some common utility methods around metadata manipulation.
- */
-public enum MetadataUtil {
+public enum RawMessageUtil {
+
INSTANCE;
private static final Logger LOG =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- /**
- * The default metadata prefix.
- */
- public static final String METADATA_PREFIX = "metron.metadata";
- /**
- * The config key for defining the prefix.
- */
- public static final String METADATA_PREFIX_CONFIG = "metadataPrefix";
- static final int KEY_INDEX = 1;
- /**
- * Return the prefix that we want to use for metadata keys. This comes from
the config and is defaulted to
- * 'metron.metadata'.
- *
- * @param config The rawMessageStrategyConfig
- * @return the prefix for metadata keys
- */
- public String getMetadataPrefix(Map<String, Object> config) {
- String prefix = (String) config.getOrDefault(METADATA_PREFIX_CONFIG,
METADATA_PREFIX);
- if(StringUtils.isEmpty(prefix)) {
- return null;
- }
- return prefix;
- }
+ static final int KEY_INDEX = 1;
/**
- * Take a field and prefix it with the metadata key.
- *
- * @param prefix The metadata prefix to use (e.g. 'foo')
- * @param key The key name (e.g. my_field)
- * @return The prefixed key separated by a . (e.g. foo.my_field)
+ * Extract the raw message given the strategy, the tuple and the metadata
configs.
+ * @param strategy The {@link RawMessageStrategy} to use for extraction
+ * @param t The tuple to pull the message from
+ * @param rawMessage The raw message in bytes
+ * @param readMetadata True if read metadata, false otherwise
+ * @param config The config to use during extraction
+ * @return The resulting {@link RawMessage}
*/
- public String prefixKey(String prefix, String key) {
- if(StringUtils.isEmpty(prefix)) {
- return key;
- }
- else {
- return prefix + "." + key;
+ public RawMessage getRawMessage(RawMessageStrategy strategy, Tuple t, byte[]
rawMessage, boolean readMetadata, Map<String, Object> config) {
+ Map<String, Object> metadata = new HashMap<>();
+ if(readMetadata) {
+ String prefix = MetadataUtil.INSTANCE.getMetadataPrefix(config);
+ metadata = extractMetadata(prefix, t);
}
+ return strategy.get(metadata, rawMessage, readMetadata, config);
}
/**
@@ -101,7 +82,7 @@ public enum MetadataUtil {
String envMetadataFieldName = tupleFields.get(i);
Object envMetadataFieldValue = t.getValue(i);
if (!StringUtils.isEmpty(envMetadataFieldName) && envMetadataFieldValue
!= null) {
- metadata.put(prefixKey(prefix, envMetadataFieldName),
envMetadataFieldValue);
+ metadata.put(MetadataUtil.INSTANCE.prefixKey(prefix,
envMetadataFieldName), envMetadataFieldValue);
}
}
byte[] keyObj = t.getBinary(KEY_INDEX);
@@ -111,7 +92,7 @@ public enum MetadataUtil {
if (!StringUtils.isEmpty(keyStr)) {
Map<String, Object> rawMetadata = JSONUtils.INSTANCE.load(keyStr,
JSONUtils.MAP_SUPPLIER);
for (Map.Entry<String, Object> kv : rawMetadata.entrySet()) {
- metadata.put(prefixKey(prefix, kv.getKey()), kv.getValue());
+ metadata.put(MetadataUtil.INSTANCE.prefixKey(prefix, kv.getKey()),
kv.getValue());
}
}
@@ -122,4 +103,6 @@ public enum MetadataUtil {
}
return metadata;
}
+
+
}
diff --git
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/ObjectFromField.java
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/utils/StormErrorUtils.java
similarity index 51%
rename from
metron-platform/metron-common/src/main/java/org/apache/metron/common/message/ObjectFromField.java
rename to
metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/utils/StormErrorUtils.java
index 120c09c..cd7f93d 100644
---
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/ObjectFromField.java
+++
b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/common/utils/StormErrorUtils.java
@@ -15,22 +15,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.metron.common.message;
-import org.apache.storm.tuple.Tuple;
+package org.apache.metron.storm.common.utils;
-public class ObjectFromField implements MessageGetStrategy {
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.error.MetronError;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Values;
- private String fieldValue = "message";
+import java.util.Optional;
- public ObjectFromField() {};
+public class StormErrorUtils {
+ /**
+ * Handles a {@link MetronError} that occurs.
+ *
+ * @param collector The Storm output collector being reported to
+ * @param error The error that occurred
+ */
+ public static void handleError(OutputCollector collector, MetronError error)
+ {
+ collector.emit(Constants.ERROR_STREAM, new Values(error.getJSONObject()));
+ Optional<Throwable> throwable = error.getThrowable();
+ if (throwable.isPresent()) {
+ collector.reportError(throwable.get());
+ }
- public ObjectFromField(String fieldValue) {
- this.fieldValue = fieldValue;
- }
-
- @Override
- public Object get(Tuple tuple) {
- return tuple.getValueByField(fieldValue);
}
}
diff --git
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/BaseConfiguredBoltTest.java
b/metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/bolt/BaseConfiguredBoltTest.java
similarity index 97%
rename from
metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/BaseConfiguredBoltTest.java
rename to
metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/bolt/BaseConfiguredBoltTest.java
index f9901cd..ac2d1e8 100644
---
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/BaseConfiguredBoltTest.java
+++
b/metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/bolt/BaseConfiguredBoltTest.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.metron.common.bolt;
+package org.apache.metron.storm.common.bolt;
import org.apache.metron.test.bolt.BaseBoltTest;
import org.junit.Assert;
diff --git
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBoltTest.java
b/metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/bolt/ConfiguredEnrichmentBoltTest.java
similarity index 91%
rename from
metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBoltTest.java
rename to
metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/bolt/ConfiguredEnrichmentBoltTest.java
index 44612cd..256415a 100644
---
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBoltTest.java
+++
b/metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/bolt/ConfiguredEnrichmentBoltTest.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.metron.common.bolt;
+package org.apache.metron.storm.common.bolt;
import org.apache.log4j.Level;
import org.apache.metron.test.utils.UnitTestHelper;
@@ -37,6 +37,10 @@ import java.util.Set;
public class ConfiguredEnrichmentBoltTest extends BaseConfiguredBoltTest {
+ private static final String sampleConfigPath = "../" +
TestConstants.SAMPLE_CONFIG_PATH;
+ private static final String enrichmentsConfigPath = "../" +
TestConstants.ENRICHMENTS_CONFIGS_PATH;
+ private static final String parserConfigsPath = "../" +
TestConstants.PARSER_CONFIGS_PATH;
+
private Set<String> enrichmentConfigurationTypes = new HashSet<>();
private String zookeeperUrl;
@@ -64,15 +68,15 @@ public class ConfiguredEnrichmentBoltTest extends
BaseConfiguredBoltTest {
public void setupConfiguration() throws Exception {
TestingServer testZkServer = new TestingServer(true);
this.zookeeperUrl = testZkServer.getConnectString();
- byte[] globalConfig =
ConfigurationsUtils.readGlobalConfigFromFile(TestConstants.SAMPLE_CONFIG_PATH);
+ byte[] globalConfig =
ConfigurationsUtils.readGlobalConfigFromFile(sampleConfigPath);
ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig,
zookeeperUrl);
enrichmentConfigurationTypes.add(ConfigurationType.GLOBAL.getTypeName());
- Map<String, byte[]> sensorEnrichmentConfigs =
ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(TestConstants.ENRICHMENTS_CONFIGS_PATH);
+ Map<String, byte[]> sensorEnrichmentConfigs =
ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(enrichmentsConfigPath);
for (String sensorType : sensorEnrichmentConfigs.keySet()) {
ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType,
sensorEnrichmentConfigs.get(sensorType), zookeeperUrl);
enrichmentConfigurationTypes.add(sensorType);
}
- Map<String, byte[]> sensorParserConfigs =
ConfigurationsUtils.readSensorParserConfigsFromFile(TestConstants.PARSER_CONFIGS_PATH);
+ Map<String, byte[]> sensorParserConfigs =
ConfigurationsUtils.readSensorParserConfigsFromFile(parserConfigsPath);
for (String sensorType : sensorParserConfigs.keySet()) {
ConfigurationsUtils.writeSensorParserConfigToZookeeper(sensorType,
sensorParserConfigs.get(sensorType), zookeeperUrl);
}
@@ -90,8 +94,8 @@ public class ConfiguredEnrichmentBoltTest extends
BaseConfiguredBoltTest {
UnitTestHelper.setLog4jLevel(ConfiguredBolt.class, Level.ERROR);
configsUpdated = new HashSet<>();
-
sampleConfigurations.updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(TestConstants.SAMPLE_CONFIG_PATH));
- Map<String, byte[]> sensorEnrichmentConfigs =
ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(TestConstants.ENRICHMENTS_CONFIGS_PATH);
+
sampleConfigurations.updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(sampleConfigPath));
+ Map<String, byte[]> sensorEnrichmentConfigs =
ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(enrichmentsConfigPath);
for (String sensorType : sensorEnrichmentConfigs.keySet()) {
sampleConfigurations.updateSensorEnrichmentConfig(sensorType,
sensorEnrichmentConfigs.get(sensorType));
}
diff --git
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
b/metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/bolt/ConfiguredParserBoltTest.java
similarity index 94%
rename from
metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
rename to
metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/bolt/ConfiguredParserBoltTest.java
index 3deba78..66668d1 100644
---
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
+++
b/metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/bolt/ConfiguredParserBoltTest.java
@@ -15,11 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.metron.common.bolt;
+package org.apache.metron.storm.common.bolt;
import org.apache.log4j.Level;
import org.apache.metron.common.configuration.FieldValidator;
-import org.apache.metron.common.field.validation.FieldValidation;
import org.apache.metron.test.utils.UnitTestHelper;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
@@ -67,14 +66,14 @@ public class ConfiguredParserBoltTest extends
BaseConfiguredBoltTest {
public void setupConfiguration() throws Exception {
TestingServer testZkServer = new TestingServer(true);
this.zookeeperUrl = testZkServer.getConnectString();
- byte[] globalConfig =
ConfigurationsUtils.readGlobalConfigFromFile(TestConstants.SAMPLE_CONFIG_PATH);
+ byte[] globalConfig = ConfigurationsUtils.readGlobalConfigFromFile("../" +
TestConstants.SAMPLE_CONFIG_PATH);
ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig,
zookeeperUrl);
parserConfigurationTypes.add(ConfigurationType.GLOBAL.getTypeName());
- Map<String, byte[]> sensorEnrichmentConfigs =
ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(TestConstants.ENRICHMENTS_CONFIGS_PATH);
+ Map<String, byte[]> sensorEnrichmentConfigs =
ConfigurationsUtils.readSensorEnrichmentConfigsFromFile("../" +
TestConstants.ENRICHMENTS_CONFIGS_PATH);
for (String sensorType : sensorEnrichmentConfigs.keySet()) {
ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType,
sensorEnrichmentConfigs.get(sensorType), zookeeperUrl);
}
- Map<String, byte[]> sensorParserConfigs =
ConfigurationsUtils.readSensorParserConfigsFromFile(TestConstants.PARSER_CONFIGS_PATH);
+ Map<String, byte[]> sensorParserConfigs =
ConfigurationsUtils.readSensorParserConfigsFromFile("../" +
TestConstants.PARSER_CONFIGS_PATH);
for (String sensorType : sensorParserConfigs.keySet()) {
ConfigurationsUtils.writeSensorParserConfigToZookeeper(sensorType,
sensorParserConfigs.get(sensorType), zookeeperUrl);
parserConfigurationTypes.add(sensorType);
@@ -93,8 +92,8 @@ public class ConfiguredParserBoltTest extends
BaseConfiguredBoltTest {
UnitTestHelper.setLog4jLevel(ConfiguredBolt.class, Level.ERROR);
configsUpdated = new HashSet<>();
-
sampleConfigurations.updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(TestConstants.SAMPLE_CONFIG_PATH));
- Map<String, byte[]> sensorParserConfigs =
ConfigurationsUtils.readSensorParserConfigsFromFile(TestConstants.PARSER_CONFIGS_PATH);
+
sampleConfigurations.updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile("../"
+ TestConstants.SAMPLE_CONFIG_PATH));
+ Map<String, byte[]> sensorParserConfigs =
ConfigurationsUtils.readSensorParserConfigsFromFile("../" +
TestConstants.PARSER_CONFIGS_PATH);
for (String sensorType : sensorParserConfigs.keySet()) {
sampleConfigurations.updateSensorParserConfig(sensorType,
sensorParserConfigs.get(sensorType));
}
diff --git
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/message/MessageGettersTest.java
b/metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/message/MessageGettersTest.java
similarity index 99%
rename from
metron-platform/metron-common/src/test/java/org/apache/metron/common/message/MessageGettersTest.java
rename to
metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/message/MessageGettersTest.java
index ea7583a..aa3c0a0 100644
---
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/message/MessageGettersTest.java
+++
b/metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/message/MessageGettersTest.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.metron.common.message;
+package org.apache.metron.storm.common.message;
import org.apache.storm.tuple.Tuple;
import org.json.simple.JSONObject;
diff --git
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/message/metadata/RawMessageUtilTest.java
b/metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/message/metadata/RawMessageUtilTest.java
similarity index 96%
rename from
metron-platform/metron-common/src/test/java/org/apache/metron/common/message/metadata/RawMessageUtilTest.java
rename to
metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/message/metadata/RawMessageUtilTest.java
index e5fd80f..cf51f02 100644
---
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/message/metadata/RawMessageUtilTest.java
+++
b/metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/message/metadata/RawMessageUtilTest.java
@@ -15,18 +15,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.metron.common.message.metadata;
+package org.apache.metron.storm.common.message.metadata;
-import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.metron.common.Constants;
+import org.apache.metron.common.message.metadata.EnvelopedRawMessageStrategy;
+import org.apache.metron.common.message.metadata.RawMessage;
+import org.apache.metron.common.message.metadata.RawMessageStrategies;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.json.simple.JSONObject;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.metron.common.message.metadata.MetadataUtil;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -54,7 +57,7 @@ public class RawMessageUtilTest {
}
when(t.getFields()).thenReturn(f);
-
when(t.getBinary(eq(MetadataUtil.KEY_INDEX))).thenReturn(metadata.getBytes());
+
when(t.getBinary(eq(RawMessageUtil.KEY_INDEX))).thenReturn(metadata.getBytes());
return t;
}
diff --git
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ErrorUtilsTest.java
b/metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/utils/ErrorUtilsTest.java
similarity index 95%
rename from
metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ErrorUtilsTest.java
rename to
metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/utils/ErrorUtilsTest.java
index 77ea9da..037a06c 100644
---
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ErrorUtilsTest.java
+++
b/metron-platform/metron-common-streaming/metron-common-storm/src/test/java/org/apache/metron/storm/common/utils/ErrorUtilsTest.java
@@ -15,10 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.metron.common.utils;
+package org.apache.metron.storm.common.utils;
import org.apache.metron.common.Constants;
import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.utils.ErrorUtils;
import org.apache.metron.test.error.MetronErrorJSONMatcher;
import org.apache.storm.task.OutputCollector;
import org.junit.Rule;
@@ -79,7 +80,7 @@ public class ErrorUtilsTest {
MetronError error = new MetronError().withMessage("error
message").withThrowable(e);
OutputCollector collector = mock(OutputCollector.class);
- ErrorUtils.handleError(collector, error);
+ StormErrorUtils.handleError(collector, error);
verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new
MetronErrorJSONMatcher(error.getJSONObject())));
verify(collector, times(1)).reportError(any());
}
diff --git a/metron-platform/metron-common-streaming/pom.xml
b/metron-platform/metron-common-streaming/pom.xml
new file mode 100644
index 0000000..4746b9e
--- /dev/null
+++ b/metron-platform/metron-common-streaming/pom.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>metron-common-streaming</artifactId>
+ <packaging>pom</packaging>
+ <name>metron-common-streaming</name>
+ <parent>
+ <artifactId>metron-platform</artifactId>
+ <groupId>org.apache.metron</groupId>
+ <version>0.7.1</version>
+ </parent>
+ <description>Common modules for Streaming platforms</description>
+ <url>https://metron.apache.org/</url>
+ <scm>
+
<connection>scm:git:https://gitbox.apache.org/repos/asf/metron.git</connection>
+
<developerConnection>scm:git:https://gitbox.apache.org/repos/asf/metron.git</developerConnection>
+ <tag>HEAD</tag>
+ <url>https://gitbox.apache.org/repos/asf/metron.git</url>
+ </scm>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ </license>
+ </licenses>
+ <modules>
+ <module>metron-common-storm</module>
+ </modules>
+</project>
\ No newline at end of file
diff --git a/metron-platform/metron-common/pom.xml
b/metron-platform/metron-common/pom.xml
index 4735044..6835933 100644
--- a/metron-platform/metron-common/pom.xml
+++ b/metron-platform/metron-common/pom.xml
@@ -29,6 +29,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<commons.config.version>1.10</commons.config.version>
<guava_version>${global_guava_version}</guava_version>
+ <asm.version>5.0.3</asm.version>
</properties>
<repositories>
<repository>
@@ -85,34 +86,6 @@
<version>${global_antlr_version}</version>
</dependency>
<dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${global_storm_version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-api</artifactId>
- </exclusion>
- <exclusion>
- <artifactId>servlet-api</artifactId>
- <groupId>javax.servlet</groupId>
- </exclusion>
- <exclusion>
- <artifactId>log4j-over-slf4j</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>log4j-slf4j-impl</artifactId>
- <groupId>org.apache.logging.log4j</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>${global_kafka_version}</version>
@@ -218,18 +191,6 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-common</artifactId>
- <version>${global_hbase_version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- </exclusion>
- </exclusions>
- <scope>provided</scope>
- </dependency>
- <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>${global_hadoop_version}</version>
@@ -257,33 +218,9 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
- <version>${global_hbase_version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-auth</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
<exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
+ <groupId>asm</groupId>
+ <artifactId>asm</artifactId>
</exclusion>
</exclusions>
</dependency>
@@ -303,9 +240,9 @@
<version>${global_jackson_version}</version>
</dependency>
<dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>flux-core</artifactId>
- <version>${global_flux_version}</version>
+ <groupId>org.ow2.asm</groupId>
+ <artifactId>asm</artifactId>
+ <version>${asm.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
diff --git
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
index df21c3b..46fe3b2 100644
---
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
+++
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
@@ -23,6 +23,7 @@ import static org.apache.metron.common.Constants.ErrorFields;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -34,7 +35,6 @@ import java.util.Set;
import java.util.UUID;
import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.metron.common.Constants;
import org.apache.metron.common.Constants.ErrorType;
import org.apache.metron.common.utils.HashUtils;
@@ -180,7 +180,7 @@ public class MetronError {
// It's unclear if we need a rawMessageBytes field so commenting out
for now
//String rawMessageBytesField = rawMessages.size() == 1 ?
ErrorFields.RAW_MESSAGE_BYTES.getName() :
ErrorFields.RAW_MESSAGE_BYTES.getName() + "_" + i;
if(rawMessage instanceof byte[]) {
- errorMessage.put(rawMessageField,
Bytes.toString((byte[])rawMessage));
+ errorMessage.put(rawMessageField, new String((byte[])rawMessage,
Charset.forName("UTF-8")));
//errorMessage.put(rawMessageBytesField,
com.google.common.primitives.Bytes.asList((byte[])rawMessage));
} else if (rawMessage instanceof JSONObject) {
JSONObject rawMessageJSON = (JSONObject) rawMessage;
diff --git
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/MetadataUtil.java
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/MetadataUtil.java
index 3034ddd..628ef50 100644
---
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/MetadataUtil.java
+++
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/MetadataUtil.java
@@ -18,15 +18,10 @@
package org.apache.metron.common.message.metadata;
import org.apache.commons.lang3.StringUtils;
-import org.apache.metron.common.utils.JSONUtils;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
import java.util.Map;
/**
@@ -43,7 +38,6 @@ public enum MetadataUtil {
* The config key for defining the prefix.
*/
public static final String METADATA_PREFIX_CONFIG = "metadataPrefix";
- static final int KEY_INDEX = 1;
/**
* Return the prefix that we want to use for metadata keys. This comes from
the config and is defaulted to
@@ -75,51 +69,4 @@ public enum MetadataUtil {
return prefix + "." + key;
}
}
-
- /**
- * Default extraction of metadata. This handles looking in the normal
places for metadata
- * <ul>
- * <li>The kafka key</li>
- * <li>The tuple fields outside of the value (e.g. the topic)</li>
- * </ul>
- *
- * <p>In addition to extracting the metadata into a map, it applies the
appropriate prefix (as configured in the rawMessageStrategyConfig).
- * @param prefix The prefix of the metadata keys
- * @param t The tuple to get metadata from
- * @return A map containing the metadata
- */
- public Map<String, Object> extractMetadata(String prefix, Tuple t) {
- Map<String, Object> metadata = new HashMap<>();
- if(t == null) {
- return metadata;
- }
- Fields tupleFields = t.getFields();
- if(tupleFields == null) {
- return metadata;
- }
- for (int i = 2; i < tupleFields.size(); ++i) {
- String envMetadataFieldName = tupleFields.get(i);
- Object envMetadataFieldValue = t.getValue(i);
- if (!StringUtils.isEmpty(envMetadataFieldName) && envMetadataFieldValue
!= null) {
- metadata.put(prefixKey(prefix, envMetadataFieldName),
envMetadataFieldValue);
- }
- }
- byte[] keyObj = t.getBinary(KEY_INDEX);
- String keyStr = null;
- try {
- keyStr = keyObj == null ? null : new String(keyObj);
- if (!StringUtils.isEmpty(keyStr)) {
- Map<String, Object> rawMetadata = JSONUtils.INSTANCE.load(keyStr,
JSONUtils.MAP_SUPPLIER);
- for (Map.Entry<String, Object> kv : rawMetadata.entrySet()) {
- metadata.put(prefixKey(prefix, kv.getKey()), kv.getValue());
- }
-
- }
- } catch (IOException e) {
- String reason = "Unable to parse metadata; expected JSON Map: " +
(keyStr == null ? "NON-STRING!" : keyStr);
- LOG.error(reason, e);
- throw new IllegalStateException(reason, e);
- }
- return metadata;
- }
}
diff --git
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/RawMessageUtil.java
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/RawMessageUtil.java
deleted file mode 100644
index 3bd9915..0000000
---
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/metadata/RawMessageUtil.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.common.message.metadata;
-
-import com.google.common.base.Joiner;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.utils.JSONUtils;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
-import java.util.Map;
-
-public enum RawMessageUtil {
-
- INSTANCE;
-
-
- /**
- * Extract the raw message given the strategy, the tuple and the metadata
configs.
- * @param strategy The {@link RawMessageStrategy} to use for extraction
- * @param t The tuple to pull the message from
- * @param rawMessage The raw message in bytes
- * @param readMetadata True if read metadata, false otherwise
- * @param config The config to use during extraction
- * @return The resulting {@link RawMessage}
- */
- public RawMessage getRawMessage(RawMessageStrategy strategy, Tuple t, byte[]
rawMessage, boolean readMetadata, Map<String, Object> config) {
- Map<String, Object> metadata = new HashMap<>();
- if(readMetadata) {
- String prefix = MetadataUtil.INSTANCE.getMetadataPrefix(config);
- metadata = MetadataUtil.INSTANCE.extractMetadata(prefix, t);
- }
- return strategy.get(metadata, rawMessage, readMetadata, config);
- }
-
-
-}
diff --git
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
index e3ad306..bd666c3 100644
---
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
+++
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
@@ -26,10 +26,6 @@ import java.lang.management.ThreadMXBean;
import java.util.Optional;
import java.util.function.Function;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.error.MetronError;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,22 +86,6 @@ public class ErrorUtils {
}
/**
- * Handles a {@link MetronError} that occurs.
- *
- * @param collector The Storm output collector being reported to
- * @param error The error that occurred
- */
- public static void handleError(OutputCollector collector, MetronError error)
- {
- collector.emit(Constants.ERROR_STREAM, new Values(error.getJSONObject()));
- Optional<Throwable> throwable = error.getThrowable();
- if (throwable.isPresent()) {
- collector.reportError(throwable.get());
- }
-
- }
-
- /**
* Generates a string version of a thread dump.
*
* @return String of the thread dump
diff --git
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java
index 7f92d37..51bc599 100644
---
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java
+++
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java
@@ -17,7 +17,6 @@
*/
package org.apache.metron.common.writer;
-import org.apache.storm.task.TopologyContext;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import java.io.Serializable;
@@ -26,7 +25,7 @@ import java.util.Map;
public interface BulkMessageWriter<MESSAGE_T> extends AutoCloseable,
Serializable {
- void init(Map stormConf, TopologyContext topologyContext,
WriterConfiguration config) throws Exception;
+ void init(Map stormConf, WriterConfiguration config) throws Exception;
/**
* Writes the messages to a particular output (e.g. Elasticsearch). A
response is returned with successful and failed message ids.
diff --git
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
index 7d14c11..b7814b6 100644
---
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
+++
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
@@ -33,7 +33,6 @@ import
org.apache.metron.elasticsearch.client.ElasticsearchClient;
import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
import org.apache.metron.stellar.common.utils.ConversionUtils;
-import org.apache.storm.task.TopologyContext;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,7 +74,7 @@ public class ElasticsearchWriter implements
BulkMessageWriter<JSONObject>, Seria
private SimpleDateFormat dateFormat;
@Override
- public void init(Map stormConf, TopologyContext topologyContext,
WriterConfiguration configurations) {
+ public void init(Map stormConf, WriterConfiguration configurations) {
Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration);
diff --git
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
index 3d6a3fa..ba5cfe0 100644
---
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
+++
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
@@ -25,7 +25,6 @@ import org.apache.metron.common.writer.BulkWriterResponse;
import org.apache.metron.common.writer.MessageId;
import org.apache.metron.elasticsearch.bulk.BulkDocumentWriter;
import org.apache.metron.elasticsearch.bulk.BulkDocumentWriterResults;
-import org.apache.storm.task.TopologyContext;
import org.json.simple.JSONObject;
import org.junit.Before;
import org.junit.Test;
@@ -47,13 +46,10 @@ import static org.mockito.Mockito.when;
public class ElasticsearchWriterTest {
Map stormConf;
- TopologyContext topologyContext;
WriterConfiguration writerConfiguration;
@Before
public void setup() {
- topologyContext = mock(TopologyContext.class);
-
writerConfiguration = mock(WriterConfiguration.class);
when(writerConfiguration.getGlobalConfig()).thenReturn(globals());
@@ -74,7 +70,7 @@ public class ElasticsearchWriterTest {
// attempt to write
ElasticsearchWriter esWriter = new ElasticsearchWriter();
esWriter.setDocumentWriter(docWriter);
- esWriter.init(stormConf, topologyContext, writerConfiguration);
+ esWriter.init(stormConf, writerConfiguration);
BulkWriterResponse response = esWriter.write("bro",
writerConfiguration, messages);
// response should only contain successes
@@ -98,7 +94,7 @@ public class ElasticsearchWriterTest {
// attempt to write
ElasticsearchWriter esWriter = new ElasticsearchWriter();
esWriter.setDocumentWriter(docWriter);
- esWriter.init(stormConf, topologyContext, writerConfiguration);
+ esWriter.init(stormConf, writerConfiguration);
BulkWriterResponse response = esWriter.write("bro",
writerConfiguration, messages);
// response should only contain successes
@@ -123,7 +119,7 @@ public class ElasticsearchWriterTest {
// attempt to write
ElasticsearchWriter esWriter = new ElasticsearchWriter();
esWriter.setDocumentWriter(docWriter);
- esWriter.init(stormConf, topologyContext, writerConfiguration);
+ esWriter.init(stormConf, writerConfiguration);
BulkWriterResponse response = esWriter.write("bro",
writerConfiguration, messages);
// the writer response should only contain failures
@@ -151,7 +147,7 @@ public class ElasticsearchWriterTest {
// attempt to write
ElasticsearchWriter esWriter = new ElasticsearchWriter();
esWriter.setDocumentWriter(docWriter);
- esWriter.init(stormConf, topologyContext, writerConfiguration);
+ esWriter.init(stormConf, writerConfiguration);
BulkWriterResponse response = esWriter.write("bro",
writerConfiguration, messages);
// the writer response should only contain failures
@@ -180,7 +176,7 @@ public class ElasticsearchWriterTest {
// attempt to write
ElasticsearchWriter esWriter = new ElasticsearchWriter();
esWriter.setDocumentWriter(docWriter);
- esWriter.init(stormConf, topologyContext, writerConfiguration);
+ esWriter.init(stormConf, writerConfiguration);
BulkWriterResponse response = esWriter.write("bro",
writerConfiguration, messages);
// response should contain some successes and some failures
@@ -214,7 +210,7 @@ public class ElasticsearchWriterTest {
// attempt to write
ElasticsearchWriter esWriter = new ElasticsearchWriter();
esWriter.setDocumentWriter(docWriter);
- esWriter.init(stormConf, topologyContext, writerConfiguration);
+ esWriter.init(stormConf, writerConfiguration);
BulkWriterResponse response = esWriter.write("bro",
writerConfiguration, messages);
// response should only contain successes
@@ -239,7 +235,7 @@ public class ElasticsearchWriterTest {
// attempt to write
ElasticsearchWriter esWriter = new ElasticsearchWriter();
esWriter.setDocumentWriter(docWriter);
- esWriter.init(stormConf, topologyContext, writerConfiguration);
+ esWriter.init(stormConf, writerConfiguration);
BulkWriterResponse response = esWriter.write("bro",
writerConfiguration, messages);
// response should only contain successes
diff --git
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
index fd5f874..671e6b8 100644
---
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
+++
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
@@ -17,7 +17,7 @@
*/
package org.apache.metron.enrichment.bolt;
-import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
import org.apache.storm.task.TopologyContext;
import com.google.common.base.Joiner;
import
org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
diff --git
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index 55c7b66..f8a4223 100644
---
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -28,18 +28,18 @@ import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.metron.common.Constants;
-import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
+import org.apache.metron.storm.common.bolt.ConfiguredEnrichmentBolt;
import org.apache.metron.common.configuration.ConfigurationType;
import
org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
import org.apache.metron.common.error.MetronError;
import org.apache.metron.common.performance.PerformanceLogger;
-import org.apache.metron.common.utils.ErrorUtils;
import org.apache.metron.enrichment.cache.CacheKey;
import org.apache.metron.enrichment.configuration.Enrichment;
import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
import org.apache.metron.enrichment.utils.EnrichmentUtils;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.apache.metron.storm.common.utils.StormErrorUtils;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -214,7 +214,7 @@ public class GenericEnrichmentBolt extends
ConfiguredEnrichmentBolt {
.withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
.withMessage("Unable to find SensorEnrichmentConfig for
sourceType: " + sourceType)
.addRawMessage(rawMessage);
- ErrorUtils.handleError(collector, metronError);
+ StormErrorUtils.handleError(collector, metronError);
continue;
}
config.getConfiguration().putIfAbsent(STELLAR_CONTEXT_CONF,
stellarContext);
@@ -239,7 +239,7 @@ public class GenericEnrichmentBolt extends
ConfiguredEnrichmentBolt {
.withThrowable(e)
.withErrorFields(new HashSet() {{ add(field); }})
.addRawMessage(rawMessage);
- ErrorUtils.handleError(collector, metronError);
+ StormErrorUtils.handleError(collector, metronError);
continue;
}
}
@@ -268,7 +268,7 @@ public class GenericEnrichmentBolt extends
ConfiguredEnrichmentBolt {
.withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
.withThrowable(e)
.addRawMessage(rawMessage);
- ErrorUtils.handleError(collector, error);
+ StormErrorUtils.handleError(collector, error);
}
@Override
diff --git
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
index a9263fb..ac6a1cf 100644
---
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
+++
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
@@ -30,12 +30,12 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.metron.common.Constants;
-import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
+import org.apache.metron.storm.common.bolt.ConfiguredEnrichmentBolt;
import org.apache.metron.common.error.MetronError;
-import org.apache.metron.common.message.MessageGetStrategy;
-import org.apache.metron.common.message.MessageGetters;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetters;
import org.apache.metron.common.performance.PerformanceLogger;
-import org.apache.metron.common.utils.ErrorUtils;
+import org.apache.metron.storm.common.utils.StormErrorUtils;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -169,7 +169,7 @@ public abstract class JoinBolt<V> extends
ConfiguredEnrichmentBolt {
.withMessage("Joining problem: " + message)
.withThrowable(e)
.addRawMessage(message);
- ErrorUtils.handleError(collector, error);
+ StormErrorUtils.handleError(collector, error);
collector.ack(tuple);
}
perfLog.log("execute", "key={}, elapsed time to run execute", key);
diff --git
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
index de69ad4..48cec0b 100644
---
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
+++
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
@@ -20,7 +20,7 @@ package org.apache.metron.enrichment.bolt;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
+import org.apache.metron.storm.common.bolt.ConfiguredEnrichmentBolt;
import org.apache.metron.common.performance.PerformanceLogger;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
diff --git
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
index 00c23ff..2b19375 100644
---
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
+++
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
@@ -23,7 +23,7 @@ import java.util.Map;
import org.apache.metron.common.configuration.ConfigurationType;
import
org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler;
-import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
import org.apache.metron.common.utils.MessageUtils;
import org.apache.metron.enrichment.adapters.maxmind.asn.GeoLiteAsnDatabase;
import org.apache.metron.enrichment.adapters.maxmind.geo.GeoLiteCityDatabase;
diff --git
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/UnifiedEnrichmentBolt.java
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/UnifiedEnrichmentBolt.java
index d9f57eb..dd49de2 100644
---
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/UnifiedEnrichmentBolt.java
+++
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/UnifiedEnrichmentBolt.java
@@ -20,14 +20,13 @@ package org.apache.metron.enrichment.bolt;
import static org.apache.metron.common.Constants.STELLAR_CONTEXT_CONF;
import org.apache.metron.common.Constants;
-import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
+import org.apache.metron.storm.common.bolt.ConfiguredEnrichmentBolt;
import org.apache.metron.common.configuration.ConfigurationType;
import
org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
import org.apache.metron.common.error.MetronError;
-import org.apache.metron.common.message.MessageGetStrategy;
-import org.apache.metron.common.message.MessageGetters;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetters;
import org.apache.metron.common.performance.PerformanceLogger;
-import org.apache.metron.common.utils.ErrorUtils;
import org.apache.metron.common.utils.MessageUtils;
import org.apache.metron.enrichment.adapters.maxmind.asn.GeoLiteAsnDatabase;
import org.apache.metron.enrichment.adapters.maxmind.geo.GeoLiteCityDatabase;
@@ -41,6 +40,7 @@ import
org.apache.metron.enrichment.parallel.ConcurrencyContext;
import org.apache.metron.enrichment.parallel.WorkerPoolStrategies;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.apache.metron.storm.common.utils.StormErrorUtils;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -285,7 +285,7 @@ public class UnifiedEnrichmentBolt extends
ConfiguredEnrichmentBolt {
.withMessage(t.getValue().getMessage())
.withThrowable(t.getValue())
.addRawMessage(t.getKey());
- ErrorUtils.handleError(collector, error);
+ StormErrorUtils.handleError(collector, error);
}
} catch (Exception e) {
//If something terrible and unexpected happens then we want to send an
error along, but this
@@ -296,7 +296,7 @@ public class UnifiedEnrichmentBolt extends
ConfiguredEnrichmentBolt {
.withMessage(e.getMessage())
.withThrowable(e)
.addRawMessage(message);
- ErrorUtils.handleError(collector, error);
+ StormErrorUtils.handleError(collector, error);
}
finally {
collector.ack(input);
diff --git
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
index 9fc8947..ed623f3 100644
---
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
+++
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
@@ -26,7 +26,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
import org.apache.storm.tuple.Tuple;
import org.json.simple.JSONObject;
diff --git
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
index bc7dace..8bdf409 100644
---
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
+++
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
@@ -21,7 +21,7 @@ import com.github.benmanes.caffeine.cache.LoadingCache;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.metron.common.Constants;
import org.apache.metron.common.error.MetronError;
-import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
import org.apache.metron.test.error.MetronErrorJSONMatcher;
import org.apache.storm.task.TopologyContext;
diff --git
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
index a05c843..aeafede 100644
---
a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
+++
b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
@@ -31,7 +31,7 @@ import java.util.Map;
import org.adrianwalker.multilinestring.Multiline;
import
org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
import
org.apache.metron.common.configuration.enrichment.threatintel.ThreatTriageConfig;
-import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.enrichment.adapters.maxmind.asn.GeoLiteAsnDatabase;
import org.apache.metron.enrichment.adapters.maxmind.geo.GeoLiteCityDatabase;
diff --git a/metron-platform/metron-parsing/metron-parsing-storm/pom.xml
b/metron-platform/metron-parsing/metron-parsing-storm/pom.xml
index 72770c8..c936ef9 100644
--- a/metron-platform/metron-parsing/metron-parsing-storm/pom.xml
+++ b/metron-platform/metron-parsing/metron-parsing-storm/pom.xml
@@ -33,7 +33,7 @@
<!-- Metron dependencies -->
<dependency>
<groupId>org.apache.metron</groupId>
- <artifactId>metron-common</artifactId>
+ <artifactId>metron-common-storm</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
diff --git
a/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
b/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index 21aa087..2837fcb 100644
---
a/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++
b/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -30,18 +30,18 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.metron.common.Constants;
-import org.apache.metron.common.bolt.ConfiguredParserBolt;
+import org.apache.metron.storm.common.bolt.ConfiguredParserBolt;
import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.apache.metron.common.error.MetronError;
-import org.apache.metron.common.message.MessageGetStrategy;
-import org.apache.metron.common.message.MessageGetters;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetters;
import org.apache.metron.common.message.metadata.RawMessage;
-import org.apache.metron.common.message.metadata.RawMessageUtil;
-import org.apache.metron.common.utils.ErrorUtils;
+import org.apache.metron.storm.common.message.metadata.RawMessageUtil;
import org.apache.metron.common.utils.MessageUtils;
import org.apache.metron.common.writer.BulkMessage;
+import org.apache.metron.storm.common.utils.StormErrorUtils;
import org.apache.metron.writer.AckTuplesPolicy;
import org.apache.metron.parsers.ParserRunner;
import org.apache.metron.parsers.ParserRunnerResults;
@@ -255,7 +255,7 @@ public class ParserBolt extends ConfiguredParserBolt
implements Serializable {
, sensorParserConfig.getRawMessageStrategyConfig()
);
ParserRunnerResults<JSONObject> parserRunnerResults =
parserRunner.execute(sensorType, rawMessage, parserConfigurations);
- parserRunnerResults.getErrors().forEach(error ->
ErrorUtils.handleError(collector, error));
+ parserRunnerResults.getErrors().forEach(error ->
StormErrorUtils.handleError(collector, error));
WriterHandler writer = sensorToWriterMap.get(sensorType);
int numWritten = 0;
@@ -326,7 +326,7 @@ public class ParserBolt extends ConfiguredParserBolt
implements Serializable {
.withThrowable(ex)
.withSensorType(Collections.singleton(sensorType))
.addRawMessage(originalMessage);
- ErrorUtils.handleError(collector, error);
+ StormErrorUtils.handleError(collector, error);
}
@Override
diff --git
a/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
b/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
index 6a174f3..1ee6464 100644
---
a/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
+++
b/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
@@ -21,11 +21,11 @@ package org.apache.metron.parsers.bolt;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.error.MetronError;
-import org.apache.metron.common.message.MessageGetStrategy;
-import org.apache.metron.common.message.MessageGetters;
-import org.apache.metron.common.utils.ErrorUtils;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetters;
import org.apache.metron.common.utils.MessageUtils;
import org.apache.metron.common.writer.BulkMessage;
+import org.apache.metron.storm.common.utils.StormErrorUtils;
import org.apache.metron.writer.AckTuplesPolicy;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
@@ -94,7 +94,7 @@ public class WriterBolt extends BaseRichBolt {
.withThrowable(e)
.withSensorType(Collections.singleton(sensorType))
.addRawMessage(message);
- ErrorUtils.handleError(collector, error);
+ StormErrorUtils.handleError(collector, error);
collector.ack(tuple);
}
}
diff --git
a/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
b/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
index 434db45..4f5ebe9 100644
---
a/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
+++
b/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
@@ -27,7 +27,7 @@ import
org.apache.metron.common.configuration.writer.ConfigurationStrategy;
import org.apache.metron.common.configuration.writer.ConfigurationsStrategies;
import
org.apache.metron.common.configuration.writer.SingleBatchConfigurationFacade;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
import org.apache.metron.common.writer.BulkMessageWriter;
import org.apache.metron.common.writer.BulkMessage;
import org.apache.metron.common.writer.MessageWriter;
@@ -75,7 +75,7 @@ public class WriterHandler implements Serializable {
writerTransformer = config -> new
SingleBatchConfigurationFacade(configStrategy.createWriterConfig(messageWriter,
config));
}
try {
- messageWriter.init(stormConf, topologyContext,
writerTransformer.apply(configurations));
+ messageWriter.init(stormConf, writerTransformer.apply(configurations));
} catch (Exception e) {
throw new IllegalStateException("Unable to initialize message writer",
e);
}
diff --git
a/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
b/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index 90c882f..4240f7a 100644
---
a/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++
b/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -22,7 +22,7 @@ import
org.apache.metron.common.configuration.IndexingConfigurations;
import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.common.error.MetronError;
-import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
import org.apache.metron.common.message.metadata.RawMessage;
import org.apache.metron.common.writer.BulkMessage;
import org.apache.metron.writer.AckTuplesPolicy;
diff --git
a/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
b/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
index 68fc15f..e16c5e9 100644
---
a/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
+++
b/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
@@ -106,7 +106,7 @@ public class WriterBoltTest extends BaseBoltTest{
}
bolt.prepare(new HashMap(), topologyContext, outputCollector);
- verify(batchWriter, times(1)).init(any(), any(), any());
+ verify(batchWriter, times(1)).init(any(), any());
for(int i = 0;i < 4;++i) {
Tuple t = tuples.get(i);
bolt.execute(t);
@@ -206,7 +206,7 @@ public class WriterBoltTest extends BaseBoltTest{
bolt.prepare(new HashMap(), topologyContext, outputCollector);
- verify(batchWriter, times(1)).init(any(), any(), any());
+ verify(batchWriter, times(1)).init(any(), any());
for(int i = 0;i < 4;++i) {
Tuple t = tuples.get(i);
@@ -264,7 +264,7 @@ public class WriterBoltTest extends BaseBoltTest{
when(errorTuple.getValueByField(eq("message"))).thenReturn(errorMessage);
bolt.prepare(new HashMap(), topologyContext, outputCollector);
- verify(batchWriter, times(1)).init(any(), any(), any());
+ verify(batchWriter, times(1)).init(any(), any());
for(int i = 0;i < 4;++i) {
Tuple t = tuples.get(i);
@@ -316,7 +316,7 @@ public class WriterBoltTest extends BaseBoltTest{
bolt.prepare(new HashMap(), topologyContext, outputCollector);
doThrow(new Exception()).when(batchWriter).write(any(), any(), any());
- verify(batchWriter, times(1)).init(any(), any(), any());
+ verify(batchWriter, times(1)).init(any(), any());
for(int i = 0;i < 4;++i) {
Tuple t = tuples.get(i);
bolt.execute(t);
diff --git
a/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/integration/validation/StormParserDriver.java
b/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/integration/validation/StormParserDriver.java
index 66b6c2b..0d46d73 100644
---
a/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/integration/validation/StormParserDriver.java
+++
b/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/integration/validation/StormParserDriver.java
@@ -60,7 +60,7 @@ public class StormParserDriver extends ParserDriver {
}
@Override
- public void init(Map stormConf, TopologyContext topologyContext,
WriterConfiguration config) throws Exception {
+ public void init(Map stormConf, WriterConfiguration config) throws
Exception {
}
diff --git
a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
index 6fb2b35..b23a517 100644
---
a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
+++
b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
@@ -50,7 +50,6 @@ import
org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
-import org.apache.storm.task.TopologyContext;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -153,7 +152,7 @@ public class SolrWriter implements
BulkMessageWriter<JSONObject>, Serializable {
}
@Override
- public void init(Map stormConf, TopologyContext topologyContext,
WriterConfiguration configurations) throws IOException, SolrServerException {
+ public void init(Map stormConf, WriterConfiguration configurations) throws
IOException, SolrServerException {
Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
initializeFromGlobalConfig(globalConfiguration);
LOG.info("Initializing SOLR writer: {}", zookeeperUrl);
diff --git
a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/schema/SchemaValidationIntegrationTest.java
b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/schema/SchemaValidationIntegrationTest.java
index a4908ad..ddc7801 100644
---
a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/schema/SchemaValidationIntegrationTest.java
+++
b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/schema/SchemaValidationIntegrationTest.java
@@ -157,7 +157,7 @@ public class SchemaValidationIntegrationTest {
}
};
- solrWriter.init(null, null, writerConfig);
+ solrWriter.init(null, writerConfig);
BulkWriterResponse response = solrWriter.write(sensorType, writerConfig,
messages);
Assert.assertTrue(response.getErrors().isEmpty());
diff --git
a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
index 0f8dab1..515f2f8 100644
---
a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
+++
b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
@@ -121,7 +121,7 @@ public class SolrWriterTest {
String collection = "metron";
MetronSolrClient solr = Mockito.mock(MetronSolrClient.class);
SolrWriter writer = new SolrWriter().withMetronSolrClient(solr);
- writer.init(null, null,new IndexingWriterConfiguration("solr",
configurations));
+ writer.init(null,new IndexingWriterConfiguration("solr", configurations));
verify(solr, times(1)).setDefaultCollection(collection);
collection = "metron2";
@@ -129,7 +129,7 @@ public class SolrWriterTest {
globalConfig.put("solr.collection", collection);
configurations.updateGlobalConfig(globalConfig);
writer = new SolrWriter().withMetronSolrClient(solr);
- writer.init(null, null, new IndexingWriterConfiguration("solr",
configurations));
+ writer.init(null, new IndexingWriterConfiguration("solr", configurations));
verify(solr, times(1)).setDefaultCollection(collection);
writer.write("test", new IndexingWriterConfiguration("solr",
configurations), messages);
diff --git a/metron-platform/metron-writer/pom.xml
b/metron-platform/metron-writer/pom.xml
index 2376bff..2205cec 100644
--- a/metron-platform/metron-writer/pom.xml
+++ b/metron-platform/metron-writer/pom.xml
@@ -219,7 +219,7 @@
</dependency>
<dependency>
<groupId>org.apache.metron</groupId>
- <artifactId>metron-common</artifactId>
+ <artifactId>metron-common-storm</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
diff --git
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/AckTuplesPolicy.java
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/AckTuplesPolicy.java
index 64685bf..2543309 100644
---
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/AckTuplesPolicy.java
+++
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/AckTuplesPolicy.java
@@ -20,7 +20,7 @@ package org.apache.metron.writer;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.apache.metron.common.error.MetronError;
-import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
import org.apache.metron.common.writer.BulkMessage;
import org.apache.metron.common.writer.BulkWriterResponse;
import org.apache.metron.common.writer.MessageId;
diff --git
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
index 01def5d..c81076a 100644
---
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
+++
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
@@ -19,7 +19,6 @@ package org.apache.metron.writer;
import org.apache.metron.common.writer.BulkMessage;
import org.apache.metron.common.writer.MessageId;
-import org.apache.storm.task.TopologyContext;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
@@ -130,7 +129,7 @@ public class NoopWriter extends AbstractWriter implements
BulkMessageWriter<JSON
}
@Override
- public void init(Map stormConf, TopologyContext topologyContext,
WriterConfiguration config) throws Exception {
+ public void init(Map stormConf, WriterConfiguration config) throws Exception
{
}
@Override
diff --git
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
index 709875a..6b8bb2f 100644
---
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
+++
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
@@ -20,7 +20,6 @@ package org.apache.metron.writer;
import org.apache.metron.common.writer.BulkMessage;
import org.apache.metron.common.writer.MessageId;
-import org.apache.storm.task.TopologyContext;
import com.google.common.collect.Iterables;
import
org.apache.metron.common.configuration.writer.SingleBatchConfigurationFacade;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
@@ -44,7 +43,7 @@ public class WriterToBulkWriter<MESSAGE_T> implements
BulkMessageWriter<MESSAGE_
this.messageWriter = messageWriter;
}
@Override
- public void init(Map stormConf, TopologyContext topologyContext,
WriterConfiguration config) throws Exception {
+ public void init(Map stormConf, WriterConfiguration config) throws Exception
{
messageWriter.init();
}
diff --git
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
index c9215e3..8d38c60 100644
---
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
+++
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
@@ -29,21 +29,22 @@ import java.util.function.Function;
import com.google.common.collect.Iterables;
import org.apache.metron.common.Constants;
-import org.apache.metron.common.bolt.ConfiguredBolt;
+import org.apache.metron.storm.common.bolt.ConfiguredBolt;
import org.apache.metron.common.configuration.Configurations;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.apache.metron.common.error.MetronError;
-import org.apache.metron.common.message.MessageGetStrategy;
-import org.apache.metron.common.message.MessageGetters;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetters;
import org.apache.metron.common.system.Clock;
-import org.apache.metron.common.utils.ErrorUtils;
import org.apache.metron.common.utils.MessageUtils;
import org.apache.metron.common.writer.BulkMessageWriter;
import org.apache.metron.common.writer.BulkMessage;
import org.apache.metron.common.writer.MessageWriter;
+import org.apache.metron.storm.common.utils.StormErrorUtils;
import org.apache.metron.writer.AckTuplesPolicy;
import org.apache.metron.writer.BulkWriterComponent;
import org.apache.metron.writer.WriterToBulkWriter;
+import org.apache.metron.writer.hdfs.HdfsWriter;
import org.apache.storm.Config;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
@@ -235,7 +236,10 @@ public class BulkMessageWriterBolt<CONFIG_T extends
Configurations> extends Conf
BulkWriterComponent<JSONObject> bulkWriterComponent = new
BulkWriterComponent<>(maxBatchTimeout);
bulkWriterComponent.addFlushPolicy(ackTuplesPolicy);
setWriterComponent(bulkWriterComponent);
- bulkMessageWriter.init(stormConf, context, writerconf);
+ bulkMessageWriter.init(stormConf, writerconf);
+ if (bulkMessageWriter instanceof HdfsWriter) {
+ ((HdfsWriter) bulkMessageWriter).initFileNameFormat(context);
+ }
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -362,7 +366,7 @@ public class BulkMessageWriterBolt<CONFIG_T extends
Configurations> extends Conf
.withErrorType(Constants.ErrorType.INDEXING_ERROR)
.withThrowable(e);
collector.ack(tuple);
- ErrorUtils.handleError(collector, error);
+ StormErrorUtils.handleError(collector, error);
}
@Override
diff --git
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hbase/SimpleHbaseEnrichmentWriter.java
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hbase/SimpleHbaseEnrichmentWriter.java
index 50b11e1..d7b2cb9 100644
---
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hbase/SimpleHbaseEnrichmentWriter.java
+++
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hbase/SimpleHbaseEnrichmentWriter.java
@@ -20,7 +20,6 @@ package org.apache.metron.writer.hbase;
import org.apache.metron.common.writer.BulkMessage;
import org.apache.metron.common.writer.MessageId;
-import org.apache.storm.task.TopologyContext;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.hadoop.conf.Configuration;
@@ -178,7 +177,7 @@ public class SimpleHbaseEnrichmentWriter extends
AbstractWriter implements BulkM
}
@Override
- public void init(Map stormConf, TopologyContext topologyContext,
WriterConfiguration configuration) throws Exception {
+ public void init(Map stormConf, WriterConfiguration configuration) throws
Exception {
if(converter == null) {
converter = new EnrichmentConverter();
}
diff --git
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
index 9e6827b..d251602 100644
---
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
+++
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
@@ -88,10 +88,9 @@ public class HdfsWriter implements
BulkMessageWriter<JSONObject>, Serializable {
}
@Override
- public void init(Map stormConfig, TopologyContext topologyContext,
WriterConfiguration configurations) {
+ public void init(Map stormConfig, WriterConfiguration configurations) {
this.stormConfig = stormConfig;
this.stellarProcessor = new StellarProcessor();
- this.fileNameFormat.prepare(stormConfig,topologyContext);
if(syncPolicy != null) {
//if the user has specified the sync policy, we don't want to override
their wishes.
LOG.debug("Using user specified sync policy {}",
syncPolicy.getClass().getSimpleName());
@@ -104,6 +103,10 @@ public class HdfsWriter implements
BulkMessageWriter<JSONObject>, Serializable {
}
}
+ public void initFileNameFormat(TopologyContext topologyContext) {
+ this.fileNameFormat.prepare(stormConfig,topologyContext);
+ }
+
@Override
public BulkWriterResponse write(String sensorType, WriterConfiguration
configurations, List<BulkMessage<JSONObject>> messages) throws Exception {
BulkWriterResponse response = new BulkWriterResponse();
diff --git
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
index 78a27fd..a313057 100644
---
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
+++
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
@@ -44,7 +44,6 @@ import org.apache.metron.common.writer.BulkWriterResponse;
import org.apache.metron.common.writer.MessageId;
import org.apache.metron.stellar.common.utils.ConversionUtils;
import org.apache.metron.writer.AbstractWriter;
-import org.apache.storm.task.TopologyContext;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -200,7 +199,7 @@ public class KafkaWriter extends AbstractWriter implements
BulkMessageWriter<JSO
}
@Override
- public void init(Map stormConf, TopologyContext topologyContext,
WriterConfiguration config)
+ public void init(Map stormConf, WriterConfiguration config)
throws Exception {
if(this.zkQuorum != null && this.brokerUrl == null) {
try {
diff --git
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/AckTuplesPolicyTest.java
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/AckTuplesPolicyTest.java
index 1d970cb..04efb3d 100644
---
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/AckTuplesPolicyTest.java
+++
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/AckTuplesPolicyTest.java
@@ -19,7 +19,7 @@ package org.apache.metron.writer;
import org.apache.metron.common.Constants;
import org.apache.metron.common.error.MetronError;
-import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.storm.common.message.MessageGetStrategy;
import org.apache.metron.common.writer.BulkWriterResponse;
import org.apache.metron.common.writer.MessageId;
import org.apache.metron.test.error.MetronErrorJSONMatcher;
diff --git
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
index 16f3b4f..e94608f 100644
---
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
+++
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
@@ -18,7 +18,6 @@
package org.apache.metron.writer;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
diff --git
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/bolt/BulkMessageWriterBoltTest.java
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/bolt/BulkMessageWriterBoltTest.java
index df80296..0fd784f 100644
---
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/bolt/BulkMessageWriterBoltTest.java
+++
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/bolt/BulkMessageWriterBoltTest.java
@@ -42,7 +42,7 @@ import org.apache.log4j.Level;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.IndexingConfigurations;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.common.message.MessageGetters;
+import org.apache.metron.storm.common.message.MessageGetters;
import org.apache.metron.common.system.FakeClock;
import org.apache.metron.common.writer.BulkMessageWriter;
import org.apache.metron.common.writer.BulkMessage;
@@ -51,7 +51,6 @@ import org.apache.metron.common.writer.MessageId;
import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
import org.apache.metron.test.utils.UnitTestHelper;
import org.apache.metron.writer.BulkWriterComponent;
-import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.json.simple.JSONObject;
@@ -163,7 +162,7 @@ public class BulkMessageWriterBoltTest extends
BaseEnrichmentBoltTest {
bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType,
new FileInputStream(sampleSensorIndexingConfigPath));
{
- doThrow(new
Exception()).when(bulkMessageWriter).init(eq(stormConf),any(TopologyContext.class),
any(WriterConfiguration.class));
+ doThrow(new Exception()).when(bulkMessageWriter).init(eq(stormConf),
any(WriterConfiguration.class));
try {
bulkMessageWriterBolt.prepare(stormConf, topologyContext,
outputCollector);
fail("A runtime exception should be thrown when bulkMessageWriter.init
throws an exception");
@@ -173,7 +172,7 @@ public class BulkMessageWriterBoltTest extends
BaseEnrichmentBoltTest {
{
when(bulkMessageWriter.getName()).thenReturn("hdfs");
bulkMessageWriterBolt.prepare(stormConf, topologyContext,
outputCollector);
- verify(bulkMessageWriter,
times(1)).init(eq(stormConf),any(TopologyContext.class),
any(WriterConfiguration.class));
+ verify(bulkMessageWriter, times(1)).init(eq(stormConf),
any(WriterConfiguration.class));
}
{
for(int i = 0; i < 4; i++) {
@@ -222,7 +221,7 @@ public class BulkMessageWriterBoltTest extends
BaseEnrichmentBoltTest {
Map stormConf = new HashMap();
when(bulkMessageWriter.getName()).thenReturn("elasticsearch");
bulkMessageWriterBolt.prepare(stormConf, topologyContext,
outputCollector, clock);
- verify(bulkMessageWriter,
times(1)).init(eq(stormConf),any(TopologyContext.class),
any(WriterConfiguration.class));
+ verify(bulkMessageWriter, times(1)).init(eq(stormConf),
any(WriterConfiguration.class));
}
{
int batchTimeout = bulkMessageWriterBolt.getMaxBatchTimeout();
@@ -260,8 +259,7 @@ public class BulkMessageWriterBoltTest extends
BaseEnrichmentBoltTest {
Map stormConf = new HashMap();
when(bulkMessageWriter.getName()).thenReturn("elasticsearch");
bulkMessageWriterBolt.prepare(stormConf, topologyContext,
outputCollector, clock);
- verify(bulkMessageWriter,
times(1)).init(eq(stormConf),any(TopologyContext.class)
- , any(WriterConfiguration.class));
+ verify(bulkMessageWriter, times(1)).init(eq(stormConf),
any(WriterConfiguration.class));
}
{
int batchTimeout = bulkMessageWriterBolt.getMaxBatchTimeout();
diff --git
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
index 88ba4fd..e48c8e2 100644
---
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
+++
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
@@ -76,7 +76,8 @@ public class HdfsWriterTest {
public void testGetHdfsPathNull() {
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME,
new IndexingConfigurations());
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
- writer.init(new HashMap<String, String>(),createTopologyContext(), config);
+ writer.init(new HashMap<String, String>(), config);
+ writer.initFileNameFormat(createTopologyContext());
JSONObject message = new JSONObject();
Object result = writer.getHdfsPathExtension(SENSOR_NAME,null, message);
@@ -89,7 +90,8 @@ public class HdfsWriterTest {
public void testGetHdfsPathEmptyString() {
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME,
new IndexingConfigurations());
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
- writer.init(new HashMap<String, String>(), createTopologyContext(),
config);
+ writer.init(new HashMap<String, String>(), config);
+ writer.initFileNameFormat(createTopologyContext());
JSONObject message = new JSONObject();
Object result = writer.getHdfsPathExtension(SENSOR_NAME, "", message);
@@ -102,7 +104,8 @@ public class HdfsWriterTest {
public void testGetHdfsPathConstant() {
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME,
new IndexingConfigurations());
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
- writer.init(new HashMap<String, String>(), createTopologyContext(),
config);
+ writer.init(new HashMap<String, String>(), config);
+ writer.initFileNameFormat(createTopologyContext());
JSONObject message = new JSONObject();
Object result = writer.getHdfsPathExtension(SENSOR_NAME, "'new'", message);
@@ -115,7 +118,8 @@ public class HdfsWriterTest {
public void testGetHdfsPathDirectVariable() {
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME,
new IndexingConfigurations());
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
- writer.init(new HashMap<String, String>(), createTopologyContext(),
config);
+ writer.init(new HashMap<String, String>(), config);
+ writer.initFileNameFormat(createTopologyContext());
JSONObject message = new JSONObject();
message.put("test.key", "test.value");
@@ -129,7 +133,8 @@ public class HdfsWriterTest {
public void testGetHdfsPathFormatConstant() {
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME,
new IndexingConfigurations());
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
- writer.init(new HashMap<String, String>(), createTopologyContext(),
config);
+ writer.init(new HashMap<String, String>(), config);
+ writer.initFileNameFormat(createTopologyContext());
JSONObject message = new JSONObject();
Object result = writer.getHdfsPathExtension(SENSOR_NAME,
"FORMAT('/test/folder/')", message);
@@ -143,7 +148,8 @@ public class HdfsWriterTest {
IndexingConfigurations indexingConfig = new IndexingConfigurations();
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME,
indexingConfig);
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
- writer.init(new HashMap<String, String>(), createTopologyContext(),
config);
+ writer.init(new HashMap<String, String>(), config);
+ writer.initFileNameFormat(createTopologyContext());
JSONObject message = new JSONObject();
message.put("test.key", "test.value");
@@ -159,7 +165,8 @@ public class HdfsWriterTest {
IndexingConfigurations indexingConfig = new IndexingConfigurations();
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME,
indexingConfig);
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
- writer.init(new HashMap<String, String>(), createTopologyContext(),
config);
+ writer.init(new HashMap<String, String>(), config);
+ writer.initFileNameFormat(createTopologyContext());
String filename = writer.fileNameFormat.getName(1,1);
Assert.assertEquals("prefix-Xcom-7-1-1.json", filename);
writer.close();
@@ -171,7 +178,8 @@ public class HdfsWriterTest {
IndexingConfigurations indexingConfig = new IndexingConfigurations();
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME,
indexingConfig);
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
- writer.init(new HashMap<String, String>(), createTopologyContext(),
config);
+ writer.init(new HashMap<String, String>(), config);
+ writer.initFileNameFormat(createTopologyContext());
JSONObject message = new JSONObject();
message.put("test.key", "test.value");
@@ -193,7 +201,8 @@ public class HdfsWriterTest {
IndexingConfigurations indexingConfig = new IndexingConfigurations();
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME,
indexingConfig);
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
- writer.init(new HashMap<String, String>(), createTopologyContext(),
config);
+ writer.init(new HashMap<String, String>(), config);
+ writer.initFileNameFormat(createTopologyContext());
JSONObject message = new JSONObject();
message.put("test.key", "test.value");
@@ -206,7 +215,8 @@ public class HdfsWriterTest {
public void testGetHdfsPathNonString() {
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME,
new IndexingConfigurations());
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
- writer.init(new HashMap<String, String>(), createTopologyContext(),
config);
+ writer.init(new HashMap<String, String>(), config);
+ writer.initFileNameFormat(createTopologyContext());
JSONObject message = new JSONObject();
writer.getHdfsPathExtension(SENSOR_NAME, "{'key':'value'}", message);
@@ -220,7 +230,8 @@ public class HdfsWriterTest {
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME,
indexingConfig);
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat)
.withMaxOpenFiles(maxFiles);
- writer.init(new HashMap<String, String>(), createTopologyContext(),
config);
+ writer.init(new HashMap<String, String>(), config);
+ writer.initFileNameFormat(createTopologyContext());
for(int i = 0; i < maxFiles; i++) {
writer.getSourceHandler(SENSOR_NAME, Integer.toString(i), null);
@@ -235,7 +246,8 @@ public class HdfsWriterTest {
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME,
indexingConfig);
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat)
.withMaxOpenFiles(maxFiles);
- writer.init(new HashMap<String, String>(), createTopologyContext(),
config);
+ writer.init(new HashMap<String, String>(), config);
+ writer.initFileNameFormat(createTopologyContext());
for(int i = 0; i < maxFiles+1; i++) {
writer.getSourceHandler(SENSOR_NAME, Integer.toString(i), null);
@@ -253,7 +265,8 @@ public class HdfsWriterTest {
HdfsWriter writer = new HdfsWriter().withFileNameFormat(format);
IndexingConfigurations indexingConfig = new IndexingConfigurations();
WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME,
indexingConfig);
- writer.init(new HashMap<String, String>(), createTopologyContext(),
config);
+ writer.init(new HashMap<String, String>(), config);
+ writer.initFileNameFormat(createTopologyContext());
JSONObject message = new JSONObject();
message.put("test.key", "test.value");
@@ -296,7 +309,8 @@ public class HdfsWriterTest {
.withExtension(".json")
.withPrefix("prefix-");
HdfsWriter writer = new HdfsWriter().withFileNameFormat(format);
- writer.init(new HashMap<String, String>(), createTopologyContext(),
config);
+ writer.init(new HashMap<String, String>(), config);
+ writer.initFileNameFormat(createTopologyContext());
// These two messages will be routed to the same folder, because test.key
is the same
JSONObject message = new JSONObject();
@@ -339,7 +353,8 @@ public class HdfsWriterTest {
.withExtension(".json")
.withPrefix("prefix-");
HdfsWriter writer = new HdfsWriter().withFileNameFormat(format);
- writer.init(new HashMap<String, String>(), createTopologyContext(),
config);
+ writer.init(new HashMap<String, String>(), config);
+ writer.initFileNameFormat(createTopologyContext());
// These two messages will be routed to the same folder, because test.key
is the same
JSONObject message = new JSONObject();
@@ -395,7 +410,8 @@ public class HdfsWriterTest {
.withExtension(".json")
.withPrefix("prefix-");
HdfsWriter writer = new HdfsWriter().withFileNameFormat(format);
- writer.init(new HashMap<String, String>(), createTopologyContext(),
config);
+ writer.init(new HashMap<String, String>(), config);
+ writer.initFileNameFormat(createTopologyContext());
// These two messages will be routed to the same folder, because test.key
is the same
JSONObject message = new JSONObject();
@@ -428,7 +444,8 @@ public class HdfsWriterTest {
String function = "FORMAT('test-%s/%s', test.key, test.key)";
WriterConfiguration config = buildWriterConfiguration(function);
HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
- writer.init(new HashMap<String, String>(), createTopologyContext(),
config);
+ writer.init(new HashMap<String, String>(), config);
+ writer.initFileNameFormat(createTopologyContext());
JSONObject message = new JSONObject();
message.put("test.key", "test.value");
diff --git a/metron-platform/pom.xml b/metron-platform/pom.xml
index af563cf..5fd4f29 100644
--- a/metron-platform/pom.xml
+++ b/metron-platform/pom.xml
@@ -62,6 +62,7 @@
<module>metron-zookeeper</module>
<module>metron-parsing</module>
<module>metron-hbase-server</module>
+ <module>metron-common-streaming</module>
</modules>
<dependencies>
<dependency>