Repository: metron Updated Branches: refs/heads/master 4b3ccac36 -> 1277b6c32
METRON-934: Component and task id are missing in the indexing topology Hdfs file names. closes apache/incubator-metron#574 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/1277b6c3 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/1277b6c3 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/1277b6c3 Branch: refs/heads/master Commit: 1277b6c3229ee4b7cb066902ac56e03145c18157 Parents: 4b3ccac Author: dlyle65535 <[email protected]> Authored: Tue May 16 10:25:09 2017 -0400 Committer: cstella <[email protected]> Committed: Tue May 16 10:25:09 2017 -0400 ---------------------------------------------------------------------- .../metron/common/writer/BulkMessageWriter.java | 3 +- .../writer/ElasticsearchWriter.java | 3 +- .../writer/SimpleHbaseEnrichmentWriter.java | 3 +- .../bolt/BulkMessageWriterBoltTest.java | 5 ++- .../apache/metron/parsers/bolt/ParserBolt.java | 2 +- .../apache/metron/parsers/bolt/WriterBolt.java | 2 +- .../metron/parsers/bolt/WriterHandler.java | 5 ++- .../metron/parsers/bolt/ParserBoltTest.java | 15 ++++--- .../metron/parsers/bolt/WriterBoltTest.java | 8 ++-- .../apache/metron/solr/writer/SolrWriter.java | 3 +- .../metron/solr/writer/SolrWriterTest.java | 6 +-- .../org/apache/metron/writer/NoopWriter.java | 3 +- .../metron/writer/WriterToBulkWriter.java | 3 +- .../writer/bolt/BulkMessageWriterBolt.java | 6 ++- .../apache/metron/writer/hdfs/HdfsWriter.java | 4 +- .../metron/writer/hdfs/HdfsWriterTest.java | 47 +++++++++++++------- 16 files changed, 74 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java index 5f427f4..64ba0cc 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java @@ -17,6 +17,7 @@ */ package org.apache.metron.common.writer; +import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.apache.metron.common.configuration.Configurations; import org.apache.metron.common.configuration.EnrichmentConfigurations; @@ -28,7 +29,7 @@ import java.util.Map; public interface BulkMessageWriter<MESSAGE_T> extends AutoCloseable, Serializable { - void init(Map stormConf, WriterConfiguration config) throws Exception; + void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration config) throws Exception; /** * Writes the messages to a particular output (e.g. Elasticsearch). Exceptions trigger failure of the entire batch. http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java index c90bc8c..a643b44 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java @@ -17,6 +17,7 @@ */ package org.apache.metron.elasticsearch.writer; +import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; @@ -57,7 +58,7 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria } @Override - public void init(Map stormConf, WriterConfiguration configurations) { + public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) { Map<String, Object> globalConfiguration = configurations.getGlobalConfig(); Settings.Builder settingsBuilder = Settings.settingsBuilder(); http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/writer/SimpleHbaseEnrichmentWriter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/writer/SimpleHbaseEnrichmentWriter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/writer/SimpleHbaseEnrichmentWriter.java index 343eecd..83c7151 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/writer/SimpleHbaseEnrichmentWriter.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/writer/SimpleHbaseEnrichmentWriter.java @@ -18,6 +18,7 @@ package org.apache.metron.enrichment.writer; +import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; @@ -122,7 +123,7 @@ public class SimpleHbaseEnrichmentWriter extends AbstractWriter implements BulkM } @Override - public void init(Map stormConf, WriterConfiguration configuration) throws Exception { + public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configuration) throws Exception { if(converter == null) { converter = new EnrichmentConverter(); } http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java index 1f33060..79d8285 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java @@ -23,6 +23,7 @@ import org.apache.metron.common.message.MessageGetters; import org.apache.metron.common.writer.BulkWriterResponse; import org.apache.metron.test.utils.UnitTestHelper; import org.apache.metron.writer.BulkWriterComponent; +import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.adrianwalker.multilinestring.Multiline; @@ -127,7 +128,7 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest { bulkMessageWriterBolt.declareOutputFields(declarer); verify(declarer, times(1)).declareStream(eq("error"), argThat(new FieldsMatcher("message"))); Map stormConf = new HashMap(); - doThrow(new Exception()).when(bulkMessageWriter).init(eq(stormConf), any(WriterConfiguration.class)); + doThrow(new Exception()).when(bulkMessageWriter).init(eq(stormConf),any(TopologyContext.class), any(WriterConfiguration.class)); try { bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector); fail("A runtime exception should be thrown when bulkMessageWriter.init throws an exception"); @@ -135,7 +136,7 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest { reset(bulkMessageWriter); when(bulkMessageWriter.getName()).thenReturn("hdfs"); bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector); - verify(bulkMessageWriter, times(1)).init(eq(stormConf), any(WriterConfiguration.class)); + verify(bulkMessageWriter, times(1)).init(eq(stormConf),any(TopologyContext.class), any(WriterConfiguration.class)); tupleList = new ArrayList<>(); for(int i = 0; i < 4; i++) { when(tuple.getValueByField("message")).thenReturn(messageList.get(i)); http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java index e73b836..2c43c23 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java @@ -97,7 +97,7 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable { parser.init(); - writer.init(stormConf, collector, getConfigurations()); + writer.init(stormConf, context, collector, getConfigurations()); SensorParserConfig config = getSensorParserConfig(); if(config != null) { http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java index ef7288b..ef93ba2 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java @@ -55,7 +55,7 @@ public class WriterBolt extends BaseRichBolt { public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; messageGetStrategy = MessageGetters.DEFAULT_JSON_FROM_FIELD.get(); - handler.init(stormConf, collector, configuration); + handler.init(stormConf, context, collector, configuration); } private JSONObject getMessage(Tuple tuple) { http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java index 3273ca7..2192942 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java @@ -20,6 +20,7 @@ package org.apache.metron.parsers.bolt; import org.apache.metron.common.message.MessageGetStrategy; import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.apache.metron.common.configuration.ParserConfigurations; import org.apache.metron.common.configuration.writer.ParserWriterConfiguration; @@ -57,7 +58,7 @@ public class WriterHandler implements Serializable { return isBulk; } - public void init(Map stormConf, OutputCollector collector, ParserConfigurations configurations) { + public void init(Map stormConf, TopologyContext topologyContext, OutputCollector collector, ParserConfigurations configurations) { if(isBulk) { writerTransformer = config -> new ParserWriterConfiguration(config); } @@ -65,7 +66,7 @@ public class WriterHandler implements Serializable { writerTransformer = config -> new SingleBatchConfigurationFacade(new ParserWriterConfiguration(config)); } try { - messageWriter.init(stormConf, writerTransformer.apply(configurations)); + messageWriter.init(stormConf, topologyContext, writerTransformer.apply(configurations)); } catch (Exception e) { throw new IllegalStateException("Unable to initialize message writer", e); } http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java index e924d50..835f17e 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java @@ -24,6 +24,7 @@ import org.apache.metron.common.error.MetronError; import org.apache.metron.test.error.MetronErrorJSONMatcher; import org.apache.metron.test.utils.UnitTestHelper; import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import com.google.common.collect.ImmutableList; import org.apache.metron.common.configuration.writer.ParserWriterConfiguration; @@ -91,7 +92,7 @@ public class ParserBoltTest extends BaseBoltTest { List<JSONObject> records = new ArrayList<>(); @Override - public void init(Map stormConf, WriterConfiguration config) throws Exception { + public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration config) throws Exception { } @@ -297,7 +298,7 @@ public void testImplicitBatchOfOne() throws Exception { parserBolt.setTreeCache(cache); parserBolt.prepare(new HashMap(), topologyContext, outputCollector); verify(parser, times(1)).init(); - verify(batchWriter, times(1)).init(any(), any()); + verify(batchWriter, times(1)).init(any(), any(), any()); when(parser.validate(any())).thenReturn(true); when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject()))); when(filter.emitTuple(any(), any(Context.class))).thenReturn(true); @@ -343,7 +344,7 @@ public void testImplicitBatchOfOne() throws Exception { parserBolt.setTreeCache(cache); parserBolt.prepare(new HashMap(), topologyContext, outputCollector); verify(parser, times(1)).init(); - verify(batchWriter, times(1)).init(any(), any()); + verify(batchWriter, times(1)).init(any(), any(), any()); BulkWriterResponse successResponse = mock(BulkWriterResponse.class); when(successResponse.getSuccesses()).thenReturn(ImmutableList.of(t1)); when(batchWriter.write(any(), any(), any(), any())).thenReturn(successResponse); @@ -380,7 +381,7 @@ public void testImplicitBatchOfOne() throws Exception { parserBolt.setTreeCache(cache); parserBolt.prepare(new HashMap(), topologyContext, outputCollector); verify(parser, times(1)).init(); - verify(batchWriter, times(1)).init(any(), any()); + verify(batchWriter, times(1)).init(any(), any(), any()); when(parser.validate(any())).thenReturn(true); when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject(new HashMap<String, Object>() {{ put("field2", "blah"); @@ -483,7 +484,7 @@ public void testImplicitBatchOfOne() throws Exception { parserBolt.setTreeCache(cache); parserBolt.prepare(new HashMap(), topologyContext, outputCollector); verify(parser, times(1)).init(); - verify(batchWriter, times(1)).init(any(), any()); + verify(batchWriter, times(1)).init(any(), any(), any()); when(parser.validate(any())).thenReturn(true); when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject()))); when(filter.emitTuple(any(), any(Context.class))).thenReturn(true); @@ -522,7 +523,7 @@ public void testImplicitBatchOfOne() throws Exception { parserBolt.setTreeCache(cache); parserBolt.prepare(new HashMap(), topologyContext, outputCollector); verify(parser, times(1)).init(); - verify(batchWriter, times(1)).init(any(), any()); + verify(batchWriter, times(1)).init(any(), any(), any()); when(parser.validate(any())).thenReturn(true); when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject()))); when(filter.emitTuple(any(), any(Context.class))).thenReturn(true); @@ -571,7 +572,7 @@ public void testImplicitBatchOfOne() throws Exception { parserBolt.setTreeCache(cache); parserBolt.prepare(new HashMap(), topologyContext, outputCollector); verify(parser, times(1)).init(); - verify(batchWriter, times(1)).init(any(), any()); + verify(batchWriter, times(1)).init(any(), any(), any()); doThrow(new Exception()).when(batchWriter).write(any(), any(), any(), any()); when(parser.validate(any())).thenReturn(true); http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java index 4511b55..a23c368 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java @@ -92,7 +92,7 @@ public class WriterBoltTest extends BaseBoltTest{ } WriterBolt bolt = new WriterBolt(new WriterHandler(batchWriter), configurations, sensorType); bolt.prepare(new HashMap(), topologyContext, outputCollector); - verify(batchWriter, times(1)).init(any(), any()); + verify(batchWriter, times(1)).init(any(), any(), any()); for(int i = 0;i < 4;++i) { Tuple t = tuples.get(i); bolt.execute(t); @@ -186,7 +186,7 @@ public class WriterBoltTest extends BaseBoltTest{ WriterBolt bolt = new WriterBolt(new WriterHandler(batchWriter), configurations, sensorType); bolt.prepare(new HashMap(), topologyContext, outputCollector); - verify(batchWriter, times(1)).init(any(), any()); + verify(batchWriter, times(1)).init(any(), any(), any()); for(int i = 0;i < 4;++i) { Tuple t = tuples.get(i); @@ -232,7 +232,7 @@ public class WriterBoltTest extends BaseBoltTest{ WriterBolt bolt = new WriterBolt(new WriterHandler(batchWriter), configurations, sensorType); bolt.prepare(new HashMap(), topologyContext, outputCollector); - verify(batchWriter, times(1)).init(any(), any()); + verify(batchWriter, times(1)).init(any(), any(), any()); for(int i = 0;i < 4;++i) { Tuple t = tuples.get(i); @@ -279,7 +279,7 @@ public class WriterBoltTest extends BaseBoltTest{ WriterBolt bolt = new WriterBolt(new WriterHandler(batchWriter), configurations, sensorType); bolt.prepare(new HashMap(), topologyContext, outputCollector); doThrow(new Exception()).when(batchWriter).write(any(), any(), any(), any()); - verify(batchWriter, times(1)).init(any(), any()); + verify(batchWriter, times(1)).init(any(), any(), any()); for(int i = 0;i < 4;++i) { Tuple t = tuples.get(i); bolt.execute(t); http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java index 7180ca9..4e3246b 100644 --- a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java +++ b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java @@ -17,6 +17,7 @@ */ package org.apache.metron.solr.writer; +import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.apache.metron.common.configuration.Configurations; import org.apache.metron.common.configuration.EnrichmentConfigurations; @@ -56,7 +57,7 @@ public class SolrWriter implements BulkMessageWriter<JSONObject>, Serializable { } @Override - public void init(Map stormConf, WriterConfiguration configurations) throws IOException, SolrServerException { + public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) throws IOException, SolrServerException { Map<String, Object> globalConfiguration = configurations.getGlobalConfig(); if(solr == null) solr = new MetronSolrClient((String) globalConfiguration.get("solr.zookeeper")); String collection = getCollection(configurations); http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java index 14e5dcb..a56916f 100644 --- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java +++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java @@ -107,7 +107,7 @@ public class SolrWriterTest { String collection = "metron"; MetronSolrClient solr = Mockito.mock(MetronSolrClient.class); SolrWriter writer = new SolrWriter().withMetronSolrClient(solr); - writer.init(null, new IndexingWriterConfiguration("solr", configurations)); + writer.init(null, null,new IndexingWriterConfiguration("solr", configurations)); verify(solr, times(1)).createCollection(collection, 1, 1); verify(solr, times(1)).setDefaultCollection(collection); @@ -120,7 +120,7 @@ public class SolrWriterTest { globalConfig.put("solr.replicationFactor", replicationFactor); configurations.updateGlobalConfig(globalConfig); writer = new SolrWriter().withMetronSolrClient(solr); - writer.init(null, new IndexingWriterConfiguration("solr", configurations)); + writer.init(null, null, new IndexingWriterConfiguration("solr", configurations)); verify(solr, times(1)).createCollection(collection, numShards, replicationFactor); verify(solr, times(1)).setDefaultCollection(collection); @@ -130,7 +130,7 @@ public class SolrWriterTest { verify(solr, times(0)).commit(collection); writer = new SolrWriter().withMetronSolrClient(solr).withShouldCommit(true); - writer.init(null, new IndexingWriterConfiguration("solr", configurations)); + writer.init(null, null, new IndexingWriterConfiguration("solr", configurations)); writer.write("test", new IndexingWriterConfiguration("solr", configurations), new ArrayList<>(), messages); verify(solr, times(2)).add(argThat(new SolrInputDocumentMatcher(message1.toJSONString().hashCode(), "test", 100, 100.0))); verify(solr, times(2)).add(argThat(new SolrInputDocumentMatcher(message2.toJSONString().hashCode(), "test", 200, 200.0))); http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java index a31f48e..dbdd93d 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java @@ -17,6 +17,7 @@ */ package org.apache.metron.writer; +import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; @@ -126,7 +127,7 @@ public class NoopWriter extends AbstractWriter implements BulkMessageWriter<JSON } @Override - public void init(Map stormConf, WriterConfiguration config) throws Exception { + public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration config) throws Exception { } @Override http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java index 47d9b02..7d7eae5 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java @@ -18,6 +18,7 @@ package org.apache.metron.writer; +import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import com.google.common.collect.Iterables; import org.apache.metron.common.configuration.writer.SingleBatchConfigurationFacade; @@ -40,7 +41,7 @@ public class WriterToBulkWriter<MESSAGE_T> implements BulkMessageWriter<MESSAGE_ this.messageWriter = messageWriter; } @Override - public void init(Map stormConf, WriterConfiguration config) throws Exception { + public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration config) throws Exception { messageWriter.init(); } http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java index 085ca5c..6e0c371 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java @@ -92,8 +92,10 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { configurationTransformation = x -> x; } try { - bulkMessageWriter.init(stormConf - , configurationTransformation.apply(new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())) + bulkMessageWriter.init(stormConf, + context, + configurationTransformation.apply(new IndexingWriterConfiguration(bulkMessageWriter.getName(), + getConfigurations())) ); } catch (Exception e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java index 4800787..a86dfbc 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java @@ -24,6 +24,7 @@ import org.apache.metron.common.dsl.StellarFunctions; import org.apache.metron.common.dsl.VariableResolver; import org.apache.metron.common.stellar.StellarCompiler; import org.apache.metron.common.stellar.StellarProcessor; +import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.writer.BulkMessageWriter; @@ -76,9 +77,10 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable { } @Override - public void init(Map stormConfig, WriterConfiguration configurations) { + public void init(Map stormConfig, TopologyContext topologyContext, WriterConfiguration configurations) { this.stormConfig = stormConfig; this.stellarProcessor = new StellarProcessor(); + this.fileNameFormat.prepare(stormConfig,topologyContext); } http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java index 0a4bdcb..6153ed2 100644 --- a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java +++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java @@ -23,6 +23,7 @@ import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; import org.apache.storm.hdfs.bolt.format.FileNameFormat; +import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.json.simple.JSONObject; import org.junit.Assert; @@ -63,7 +64,7 @@ public class HdfsWriterTest { public void testGetHdfsPathNull() { WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, new IndexingConfigurations()); HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat); - writer.init(new HashMap<String, String>(), config); + writer.init(new HashMap<String, String>(),createTopologyContext(), config); JSONObject message = new JSONObject(); Object result = writer.getHdfsPathExtension(SENSOR_NAME,null, message); @@ -76,7 +77,7 @@ public class HdfsWriterTest { public void testGetHdfsPathEmptyString() { WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, new IndexingConfigurations()); HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat); - writer.init(new HashMap<String, String>(), config); + writer.init(new HashMap<String, String>(), createTopologyContext(), config); JSONObject message = new JSONObject(); Object result = writer.getHdfsPathExtension(SENSOR_NAME, "", message); @@ -89,7 +90,7 @@ public class HdfsWriterTest { public void testGetHdfsPathConstant() { WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, new IndexingConfigurations()); HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat); - writer.init(new HashMap<String, String>(), config); + writer.init(new HashMap<String, String>(), createTopologyContext(), config); JSONObject message = new JSONObject(); Object result = writer.getHdfsPathExtension(SENSOR_NAME, "'new'", message); @@ -102,7 +103,7 @@ public class HdfsWriterTest { public void testGetHdfsPathDirectVariable() { WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, new IndexingConfigurations()); HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat); - writer.init(new HashMap<String, String>(), config); + writer.init(new HashMap<String, String>(), createTopologyContext(), config); JSONObject message = new JSONObject(); message.put("test.key", "test.value"); @@ -116,7 +117,7 @@ public class HdfsWriterTest { public void testGetHdfsPathFormatConstant() { WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, new IndexingConfigurations()); HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat); - writer.init(new HashMap<String, String>(), config); + writer.init(new HashMap<String, String>(), createTopologyContext(), config); JSONObject message = new JSONObject(); Object result = writer.getHdfsPathExtension(SENSOR_NAME, "FORMAT('/test/folder/')", message); @@ -130,7 +131,7 @@ public class HdfsWriterTest { IndexingConfigurations indexingConfig = new IndexingConfigurations(); WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, indexingConfig); HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat); - writer.init(new HashMap<String, String>(), config); + writer.init(new HashMap<String, String>(), createTopologyContext(), config); JSONObject message = new JSONObject(); message.put("test.key", "test.value"); @@ -142,12 +143,22 @@ public class HdfsWriterTest { } @Test + public void testSetsCorrectHdfsFilename() { + IndexingConfigurations indexingConfig = new IndexingConfigurations(); + WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, indexingConfig); + HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat); + writer.init(new HashMap<String, String>(), createTopologyContext(), config); + String filename = writer.fileNameFormat.getName(1,1); + Assert.assertEquals("prefix-Xcom-7-1-1.json", filename); + } + + @Test @SuppressWarnings("unchecked") public void testGetHdfsPathMultipleFunctions() { IndexingConfigurations indexingConfig = new IndexingConfigurations(); WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, indexingConfig); HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat); - writer.init(new HashMap<String, String>(), config); + writer.init(new HashMap<String, String>(), createTopologyContext(), config); JSONObject message = new JSONObject(); message.put("test.key", "test.value"); @@ -169,7 +180,7 @@ public class HdfsWriterTest { IndexingConfigurations indexingConfig = new IndexingConfigurations(); WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, indexingConfig); HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat); - writer.init(new HashMap<String, String>(), config); + writer.init(new HashMap<String, String>(), createTopologyContext(), config); JSONObject message = new JSONObject(); message.put("test.key", "test.value"); @@ -182,7 +193,7 @@ public class HdfsWriterTest { public void testGetHdfsPathNonString() { WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, new IndexingConfigurations()); HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat); - writer.init(new HashMap<String, String>(), config); + writer.init(new HashMap<String, String>(), createTopologyContext(), config); JSONObject message = new JSONObject(); writer.getHdfsPathExtension(SENSOR_NAME, "{'key':'value'}", message); @@ -195,7 +206,7 @@ public class HdfsWriterTest { WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, indexingConfig); HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat) .withMaxOpenFiles(maxFiles); - writer.init(new HashMap<String, String>(), config); + writer.init(new HashMap<String, String>(), createTopologyContext(), config); for(int i = 0; i < maxFiles; i++) { writer.getSourceHandler(SENSOR_NAME, Integer.toString(i)); @@ -209,7 +220,7 @@ public class HdfsWriterTest { WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, indexingConfig); HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat) .withMaxOpenFiles(maxFiles); - writer.init(new HashMap<String, String>(), config); + writer.init(new HashMap<String, String>(), createTopologyContext(), config); for(int i = 0; i < maxFiles+1; i++) { writer.getSourceHandler(SENSOR_NAME, Integer.toString(i)); @@ -226,7 +237,7 @@ public class HdfsWriterTest { HdfsWriter writer = new HdfsWriter().withFileNameFormat(format); IndexingConfigurations indexingConfig = new IndexingConfigurations(); WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, indexingConfig); - writer.init(new HashMap<String, String>(), config); + writer.init(new HashMap<String, String>(), createTopologyContext(), config); JSONObject message = new JSONObject(); message.put("test.key", "test.value"); @@ -270,7 +281,7 @@ public class HdfsWriterTest { .withExtension(".json") .withPrefix("prefix-"); HdfsWriter writer = new HdfsWriter().withFileNameFormat(format); - writer.init(new HashMap<String, String>(), config); + writer.init(new HashMap<String, String>(), createTopologyContext(), config); // These two messages will be routed to the same folder, because test.key is the same JSONObject message = new JSONObject(); @@ -314,7 +325,7 @@ public class HdfsWriterTest { .withExtension(".json") .withPrefix("prefix-"); HdfsWriter writer = new HdfsWriter().withFileNameFormat(format); - writer.init(new HashMap<String, String>(), config); + writer.init(new HashMap<String, String>(), createTopologyContext(), config); // These two messages will be routed to the same folder, because test.key is the same JSONObject message = new JSONObject(); @@ -371,7 +382,7 @@ public class HdfsWriterTest { .withExtension(".json") .withPrefix("prefix-"); HdfsWriter writer = new HdfsWriter().withFileNameFormat(format); - writer.init(new HashMap<String, String>(), config); + writer.init(new HashMap<String, String>(), createTopologyContext(), config); // These two messages will be routed to the same folder, because test.key is the same JSONObject message = new JSONObject(); @@ -408,4 +419,10 @@ public class HdfsWriterTest { indexingConfig.updateSensorIndexingConfig(SENSOR_NAME, sensorIndexingConfig); return new IndexingWriterConfiguration(WRITER_NAME, indexingConfig); } + + private TopologyContext createTopologyContext(){ + Map<Integer, String> taskToComponent = new HashMap<Integer, String>(); + taskToComponent.put(7, "Xcom"); + return new TopologyContext(null, null, taskToComponent, null, null, null, null, null, 7, 6703, null, null, null, null, null, null); + } }
