Repository: metron
Updated Branches:
  refs/heads/master 4b3ccac36 -> 1277b6c32


METRON-934: Component and task id are missing in the indexing topology Hdfs file names. closes apache/incubator-metron#574


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/1277b6c3
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/1277b6c3
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/1277b6c3

Branch: refs/heads/master
Commit: 1277b6c3229ee4b7cb066902ac56e03145c18157
Parents: 4b3ccac
Author: dlyle65535 <[email protected]>
Authored: Tue May 16 10:25:09 2017 -0400
Committer: cstella <[email protected]>
Committed: Tue May 16 10:25:09 2017 -0400

----------------------------------------------------------------------
 .../metron/common/writer/BulkMessageWriter.java |  3 +-
 .../writer/ElasticsearchWriter.java             |  3 +-
 .../writer/SimpleHbaseEnrichmentWriter.java     |  3 +-
 .../bolt/BulkMessageWriterBoltTest.java         |  5 ++-
 .../apache/metron/parsers/bolt/ParserBolt.java  |  2 +-
 .../apache/metron/parsers/bolt/WriterBolt.java  |  2 +-
 .../metron/parsers/bolt/WriterHandler.java      |  5 ++-
 .../metron/parsers/bolt/ParserBoltTest.java     | 15 ++++---
 .../metron/parsers/bolt/WriterBoltTest.java     |  8 ++--
 .../apache/metron/solr/writer/SolrWriter.java   |  3 +-
 .../metron/solr/writer/SolrWriterTest.java      |  6 +--
 .../org/apache/metron/writer/NoopWriter.java    |  3 +-
 .../metron/writer/WriterToBulkWriter.java       |  3 +-
 .../writer/bolt/BulkMessageWriterBolt.java      |  6 ++-
 .../apache/metron/writer/hdfs/HdfsWriter.java   |  4 +-
 .../metron/writer/hdfs/HdfsWriterTest.java      | 47 +++++++++++++-------
 16 files changed, 74 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java
index 5f427f4..64ba0cc 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.common.writer;
 
+import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.apache.metron.common.configuration.Configurations;
 import org.apache.metron.common.configuration.EnrichmentConfigurations;
@@ -28,7 +29,7 @@ import java.util.Map;
 
 public interface BulkMessageWriter<MESSAGE_T> extends AutoCloseable, 
Serializable {
 
-  void init(Map stormConf, WriterConfiguration config) throws Exception;
+  void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration config) throws Exception;
 
   /**
   * Writes the messages to a particular output (e.g. Elasticsearch). 
Exceptions trigger failure of the entire batch.

http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
index c90bc8c..a643b44 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.elasticsearch.writer;
 
+import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
@@ -57,7 +58,7 @@ public class ElasticsearchWriter implements 
BulkMessageWriter<JSONObject>, Seria
   }
 
   @Override
-  public void init(Map stormConf, WriterConfiguration configurations) {
+  public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) {
     Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
 
     Settings.Builder settingsBuilder = Settings.settingsBuilder();

http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/writer/SimpleHbaseEnrichmentWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/writer/SimpleHbaseEnrichmentWriter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/writer/SimpleHbaseEnrichmentWriter.java
index 343eecd..83c7151 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/writer/SimpleHbaseEnrichmentWriter.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/writer/SimpleHbaseEnrichmentWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.metron.enrichment.writer;
 
+import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
@@ -122,7 +123,7 @@ public class SimpleHbaseEnrichmentWriter extends 
AbstractWriter implements BulkM
   }
 
   @Override
-  public void init(Map stormConf, WriterConfiguration configuration) throws Exception {
+  public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configuration) throws Exception {
     if(converter == null) {
       converter = new EnrichmentConverter();
     }

http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
index 1f33060..79d8285 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
@@ -23,6 +23,7 @@ import org.apache.metron.common.message.MessageGetters;
 import org.apache.metron.common.writer.BulkWriterResponse;
 import org.apache.metron.test.utils.UnitTestHelper;
 import org.apache.metron.writer.BulkWriterComponent;
+import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
 import org.adrianwalker.multilinestring.Multiline;
@@ -127,7 +128,7 @@ public class BulkMessageWriterBoltTest extends 
BaseEnrichmentBoltTest {
     bulkMessageWriterBolt.declareOutputFields(declarer);
     verify(declarer, times(1)).declareStream(eq("error"), argThat(new 
FieldsMatcher("message")));
     Map stormConf = new HashMap();
-    doThrow(new Exception()).when(bulkMessageWriter).init(eq(stormConf), any(WriterConfiguration.class));
+    doThrow(new Exception()).when(bulkMessageWriter).init(eq(stormConf),any(TopologyContext.class), any(WriterConfiguration.class));
     try {
       bulkMessageWriterBolt.prepare(stormConf, topologyContext, 
outputCollector);
       fail("A runtime exception should be thrown when bulkMessageWriter.init 
throws an exception");
@@ -135,7 +136,7 @@ public class BulkMessageWriterBoltTest extends 
BaseEnrichmentBoltTest {
     reset(bulkMessageWriter);
     when(bulkMessageWriter.getName()).thenReturn("hdfs");
     bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector);
-    verify(bulkMessageWriter, times(1)).init(eq(stormConf), any(WriterConfiguration.class));
+    verify(bulkMessageWriter, times(1)).init(eq(stormConf),any(TopologyContext.class), any(WriterConfiguration.class));
     tupleList = new ArrayList<>();
     for(int i = 0; i < 4; i++) {
       when(tuple.getValueByField("message")).thenReturn(messageList.get(i));

http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index e73b836..2c43c23 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -97,7 +97,7 @@ public class ParserBolt extends ConfiguredParserBolt 
implements Serializable {
 
     parser.init();
 
-    writer.init(stormConf, collector, getConfigurations());
+    writer.init(stormConf, context, collector, getConfigurations());
 
     SensorParserConfig config = getSensorParserConfig();
     if(config != null) {

http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
index ef7288b..ef93ba2 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
@@ -55,7 +55,7 @@ public class WriterBolt extends BaseRichBolt {
   public void prepare(Map stormConf, TopologyContext context, OutputCollector 
collector) {
     this.collector = collector;
     messageGetStrategy = MessageGetters.DEFAULT_JSON_FROM_FIELD.get();
-    handler.init(stormConf, collector, configuration);
+    handler.init(stormConf, context, collector, configuration);
   }
 
   private JSONObject getMessage(Tuple tuple) {

http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
index 3273ca7..2192942 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
@@ -20,6 +20,7 @@ package org.apache.metron.parsers.bolt;
 
 import org.apache.metron.common.message.MessageGetStrategy;
 import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
@@ -57,7 +58,7 @@ public class WriterHandler implements Serializable {
     return isBulk;
   }
 
-  public void init(Map stormConf, OutputCollector collector, ParserConfigurations configurations) {
+  public void init(Map stormConf, TopologyContext topologyContext, OutputCollector collector, ParserConfigurations configurations) {
     if(isBulk) {
       writerTransformer = config -> new ParserWriterConfiguration(config);
     }
@@ -65,7 +66,7 @@ public class WriterHandler implements Serializable {
       writerTransformer = config -> new SingleBatchConfigurationFacade(new 
ParserWriterConfiguration(config));
     }
     try {
-      messageWriter.init(stormConf, writerTransformer.apply(configurations));
+      messageWriter.init(stormConf, topologyContext, writerTransformer.apply(configurations));
     } catch (Exception e) {
       throw new IllegalStateException("Unable to initialize message writer", 
e);
     }

http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index e924d50..835f17e 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -24,6 +24,7 @@ import org.apache.metron.common.error.MetronError;
 import org.apache.metron.test.error.MetronErrorJSONMatcher;
 import org.apache.metron.test.utils.UnitTestHelper;
 import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import com.google.common.collect.ImmutableList;
 import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
@@ -91,7 +92,7 @@ public class ParserBoltTest extends BaseBoltTest {
     List<JSONObject> records = new ArrayList<>();
 
     @Override
-    public void init(Map stormConf, WriterConfiguration config) throws 
Exception {
+    public void init(Map stormConf, TopologyContext topologyContext, 
WriterConfiguration config) throws Exception {
 
     }
 
@@ -297,7 +298,7 @@ public void testImplicitBatchOfOne() throws Exception {
   parserBolt.setTreeCache(cache);
   parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
   verify(parser, times(1)).init();
-  verify(batchWriter, times(1)).init(any(), any());
+  verify(batchWriter, times(1)).init(any(), any(), any());
   when(parser.validate(any())).thenReturn(true);
   
when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new 
JSONObject())));
   when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
@@ -343,7 +344,7 @@ public void testImplicitBatchOfOne() throws Exception {
     parserBolt.setTreeCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
     verify(parser, times(1)).init();
-    verify(batchWriter, times(1)).init(any(), any());
+    verify(batchWriter, times(1)).init(any(), any(), any());
     BulkWriterResponse successResponse = mock(BulkWriterResponse.class);
     when(successResponse.getSuccesses()).thenReturn(ImmutableList.of(t1));
     when(batchWriter.write(any(), any(), any(), 
any())).thenReturn(successResponse);
@@ -380,7 +381,7 @@ public void testImplicitBatchOfOne() throws Exception {
     parserBolt.setTreeCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
     verify(parser, times(1)).init();
-    verify(batchWriter, times(1)).init(any(), any());
+    verify(batchWriter, times(1)).init(any(), any(), any());
     when(parser.validate(any())).thenReturn(true);
     
when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new 
JSONObject(new HashMap<String, Object>() {{
       put("field2", "blah");
@@ -483,7 +484,7 @@ public void testImplicitBatchOfOne() throws Exception {
     parserBolt.setTreeCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
     verify(parser, times(1)).init();
-    verify(batchWriter, times(1)).init(any(), any());
+    verify(batchWriter, times(1)).init(any(), any(), any());
     when(parser.validate(any())).thenReturn(true);
     
when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new 
JSONObject())));
     when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
@@ -522,7 +523,7 @@ public void testImplicitBatchOfOne() throws Exception {
     parserBolt.setTreeCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
     verify(parser, times(1)).init();
-    verify(batchWriter, times(1)).init(any(), any());
+    verify(batchWriter, times(1)).init(any(), any(), any());
     when(parser.validate(any())).thenReturn(true);
     
when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new 
JSONObject())));
     when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
@@ -571,7 +572,7 @@ public void testImplicitBatchOfOne() throws Exception {
     parserBolt.setTreeCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
     verify(parser, times(1)).init();
-    verify(batchWriter, times(1)).init(any(), any());
+    verify(batchWriter, times(1)).init(any(), any(), any());
 
     doThrow(new Exception()).when(batchWriter).write(any(), any(), any(), 
any());
     when(parser.validate(any())).thenReturn(true);

http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
index 4511b55..a23c368 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
@@ -92,7 +92,7 @@ public class WriterBoltTest extends BaseBoltTest{
     }
     WriterBolt bolt = new WriterBolt(new WriterHandler(batchWriter), 
configurations, sensorType);
     bolt.prepare(new HashMap(), topologyContext, outputCollector);
-    verify(batchWriter, times(1)).init(any(), any());
+    verify(batchWriter, times(1)).init(any(), any(), any());
     for(int i = 0;i < 4;++i) {
       Tuple t = tuples.get(i);
       bolt.execute(t);
@@ -186,7 +186,7 @@ public class WriterBoltTest extends BaseBoltTest{
 
     WriterBolt bolt = new WriterBolt(new WriterHandler(batchWriter), 
configurations, sensorType);
     bolt.prepare(new HashMap(), topologyContext, outputCollector);
-    verify(batchWriter, times(1)).init(any(), any());
+    verify(batchWriter, times(1)).init(any(), any(), any());
 
     for(int i = 0;i < 4;++i) {
       Tuple t = tuples.get(i);
@@ -232,7 +232,7 @@ public class WriterBoltTest extends BaseBoltTest{
 
     WriterBolt bolt = new WriterBolt(new WriterHandler(batchWriter), 
configurations, sensorType);
     bolt.prepare(new HashMap(), topologyContext, outputCollector);
-    verify(batchWriter, times(1)).init(any(), any());
+    verify(batchWriter, times(1)).init(any(), any(), any());
 
     for(int i = 0;i < 4;++i) {
       Tuple t = tuples.get(i);
@@ -279,7 +279,7 @@ public class WriterBoltTest extends BaseBoltTest{
     WriterBolt bolt = new WriterBolt(new WriterHandler(batchWriter), 
configurations, sensorType);
     bolt.prepare(new HashMap(), topologyContext, outputCollector);
     doThrow(new Exception()).when(batchWriter).write(any(), any(), any(), 
any());
-    verify(batchWriter, times(1)).init(any(), any());
+    verify(batchWriter, times(1)).init(any(), any(), any());
     for(int i = 0;i < 4;++i) {
       Tuple t = tuples.get(i);
       bolt.execute(t);

http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
index 7180ca9..4e3246b 100644
--- a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
+++ b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.solr.writer;
 
+import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.apache.metron.common.configuration.Configurations;
 import org.apache.metron.common.configuration.EnrichmentConfigurations;
@@ -56,7 +57,7 @@ public class SolrWriter implements 
BulkMessageWriter<JSONObject>, Serializable {
   }
 
   @Override
-  public void init(Map stormConf, WriterConfiguration configurations) throws IOException, SolrServerException {
+  public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) throws IOException, SolrServerException {
     Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
     if(solr == null) solr = new MetronSolrClient((String) 
globalConfiguration.get("solr.zookeeper"));
     String collection = getCollection(configurations);

http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
index 14e5dcb..a56916f 100644
--- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
+++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
@@ -107,7 +107,7 @@ public class SolrWriterTest {
     String collection = "metron";
     MetronSolrClient solr = Mockito.mock(MetronSolrClient.class);
     SolrWriter writer = new SolrWriter().withMetronSolrClient(solr);
-    writer.init(null, new IndexingWriterConfiguration("solr", configurations));
+    writer.init(null, null,new IndexingWriterConfiguration("solr", 
configurations));
     verify(solr, times(1)).createCollection(collection, 1, 1);
     verify(solr, times(1)).setDefaultCollection(collection);
 
@@ -120,7 +120,7 @@ public class SolrWriterTest {
     globalConfig.put("solr.replicationFactor", replicationFactor);
     configurations.updateGlobalConfig(globalConfig);
     writer = new SolrWriter().withMetronSolrClient(solr);
-    writer.init(null, new IndexingWriterConfiguration("solr", configurations));
+    writer.init(null, null, new IndexingWriterConfiguration("solr", 
configurations));
     verify(solr, times(1)).createCollection(collection, numShards, 
replicationFactor);
     verify(solr, times(1)).setDefaultCollection(collection);
 
@@ -130,7 +130,7 @@ public class SolrWriterTest {
     verify(solr, times(0)).commit(collection);
 
     writer = new 
SolrWriter().withMetronSolrClient(solr).withShouldCommit(true);
-    writer.init(null, new IndexingWriterConfiguration("solr", configurations));
+    writer.init(null, null, new IndexingWriterConfiguration("solr", 
configurations));
     writer.write("test", new IndexingWriterConfiguration("solr", 
configurations), new ArrayList<>(), messages);
     verify(solr, times(2)).add(argThat(new 
SolrInputDocumentMatcher(message1.toJSONString().hashCode(), "test", 100, 
100.0)));
     verify(solr, times(2)).add(argThat(new 
SolrInputDocumentMatcher(message2.toJSONString().hashCode(), "test", 200, 
200.0)));

http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
index a31f48e..dbdd93d 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.writer;
 
+import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
@@ -126,7 +127,7 @@ public class NoopWriter extends AbstractWriter implements 
BulkMessageWriter<JSON
   }
 
   @Override
-  public void init(Map stormConf, WriterConfiguration config) throws Exception {
+  public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration config) throws Exception {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
index 47d9b02..7d7eae5 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.metron.writer;
 
+import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import com.google.common.collect.Iterables;
 import 
org.apache.metron.common.configuration.writer.SingleBatchConfigurationFacade;
@@ -40,7 +41,7 @@ public class WriterToBulkWriter<MESSAGE_T> implements 
BulkMessageWriter<MESSAGE_
     this.messageWriter = messageWriter;
   }
   @Override
-  public void init(Map stormConf, WriterConfiguration config) throws Exception {
+  public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration config) throws Exception {
     messageWriter.init();
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
index 085ca5c..6e0c371 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
@@ -92,8 +92,10 @@ public class BulkMessageWriterBolt extends 
ConfiguredIndexingBolt {
       configurationTransformation = x -> x;
     }
     try {
-      bulkMessageWriter.init(stormConf
-                            , configurationTransformation.apply(new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))
+      bulkMessageWriter.init(stormConf,
+                             context,
+                             configurationTransformation.apply(new IndexingWriterConfiguration(bulkMessageWriter.getName(),
+                             getConfigurations()))
                             );
     } catch (Exception e) {
       throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
index 4800787..a86dfbc 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
@@ -24,6 +24,7 @@ import org.apache.metron.common.dsl.StellarFunctions;
 import org.apache.metron.common.dsl.VariableResolver;
 import org.apache.metron.common.stellar.StellarCompiler;
 import org.apache.metron.common.stellar.StellarProcessor;
+import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.writer.BulkMessageWriter;
@@ -76,9 +77,10 @@ public class HdfsWriter implements 
BulkMessageWriter<JSONObject>, Serializable {
   }
 
   @Override
-  public void init(Map stormConfig, WriterConfiguration configurations) {
+  public void init(Map stormConfig, TopologyContext topologyContext, WriterConfiguration configurations) {
     this.stormConfig = stormConfig;
     this.stellarProcessor = new StellarProcessor();
+    this.fileNameFormat.prepare(stormConfig,topologyContext);
   }
 
 

http://git-wip-us.apache.org/repos/asf/metron/blob/1277b6c3/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
index 0a4bdcb..6153ed2 100644
--- a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
@@ -23,6 +23,7 @@ import 
org.apache.metron.common.configuration.writer.IndexingWriterConfiguration
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
 import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
 import org.junit.Assert;
@@ -63,7 +64,7 @@ public class HdfsWriterTest {
   public void testGetHdfsPathNull() {
     WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
new IndexingConfigurations());
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
-    writer.init(new HashMap<String, String>(), config);
+    writer.init(new HashMap<String, String>(),createTopologyContext(), config);
 
     JSONObject message = new JSONObject();
     Object result = writer.getHdfsPathExtension(SENSOR_NAME,null, message);
@@ -76,7 +77,7 @@ public class HdfsWriterTest {
   public void testGetHdfsPathEmptyString() {
     WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
new IndexingConfigurations());
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
-    writer.init(new HashMap<String, String>(), config);
+    writer.init(new HashMap<String, String>(), createTopologyContext(), 
config);
 
     JSONObject message = new JSONObject();
     Object result = writer.getHdfsPathExtension(SENSOR_NAME, "", message);
@@ -89,7 +90,7 @@ public class HdfsWriterTest {
   public void testGetHdfsPathConstant() {
     WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
new IndexingConfigurations());
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
-    writer.init(new HashMap<String, String>(), config);
+    writer.init(new HashMap<String, String>(), createTopologyContext(), 
config);
 
     JSONObject message = new JSONObject();
     Object result = writer.getHdfsPathExtension(SENSOR_NAME, "'new'", message);
@@ -102,7 +103,7 @@ public class HdfsWriterTest {
   public void testGetHdfsPathDirectVariable() {
     WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
new IndexingConfigurations());
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
-    writer.init(new HashMap<String, String>(), config);
+    writer.init(new HashMap<String, String>(), createTopologyContext(), 
config);
 
     JSONObject message = new JSONObject();
     message.put("test.key", "test.value");
@@ -116,7 +117,7 @@ public class HdfsWriterTest {
   public void testGetHdfsPathFormatConstant() {
     WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
new IndexingConfigurations());
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
-    writer.init(new HashMap<String, String>(), config);
+    writer.init(new HashMap<String, String>(), createTopologyContext(), 
config);
 
     JSONObject message = new JSONObject();
     Object result = writer.getHdfsPathExtension(SENSOR_NAME, 
"FORMAT('/test/folder/')", message);
@@ -130,7 +131,7 @@ public class HdfsWriterTest {
     IndexingConfigurations indexingConfig = new IndexingConfigurations();
     WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
indexingConfig);
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
-    writer.init(new HashMap<String, String>(), config);
+    writer.init(new HashMap<String, String>(), createTopologyContext(), 
config);
 
     JSONObject message = new JSONObject();
     message.put("test.key", "test.value");
@@ -142,12 +143,22 @@ public class HdfsWriterTest {
   }
 
   @Test
+  public void testSetsCorrectHdfsFilename() {
+    IndexingConfigurations indexingConfig = new IndexingConfigurations();
+    WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
indexingConfig);
+    HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
+    writer.init(new HashMap<String, String>(), createTopologyContext(), 
config);
+    String filename = writer.fileNameFormat.getName(1,1);
+    Assert.assertEquals("prefix-Xcom-7-1-1.json", filename);
+  }
+
+  @Test
   @SuppressWarnings("unchecked")
   public void testGetHdfsPathMultipleFunctions() {
     IndexingConfigurations indexingConfig = new IndexingConfigurations();
     WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
indexingConfig);
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
-    writer.init(new HashMap<String, String>(), config);
+    writer.init(new HashMap<String, String>(), createTopologyContext(), 
config);
 
     JSONObject message = new JSONObject();
     message.put("test.key", "test.value");
@@ -169,7 +180,7 @@ public class HdfsWriterTest {
     IndexingConfigurations indexingConfig = new IndexingConfigurations();
     WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
indexingConfig);
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
-    writer.init(new HashMap<String, String>(), config);
+    writer.init(new HashMap<String, String>(), createTopologyContext(),  
config);
 
     JSONObject message = new JSONObject();
     message.put("test.key", "test.value");
@@ -182,7 +193,7 @@ public class HdfsWriterTest {
   public void testGetHdfsPathNonString() {
     WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
new IndexingConfigurations());
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
-    writer.init(new HashMap<String, String>(), config);
+    writer.init(new HashMap<String, String>(), createTopologyContext(),  
config);
 
     JSONObject message = new JSONObject();
     writer.getHdfsPathExtension(SENSOR_NAME, "{'key':'value'}", message);
@@ -195,7 +206,7 @@ public class HdfsWriterTest {
     WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
indexingConfig);
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat)
             .withMaxOpenFiles(maxFiles);
-    writer.init(new HashMap<String, String>(), config);
+    writer.init(new HashMap<String, String>(), createTopologyContext(),  
config);
 
     for(int i = 0; i < maxFiles; i++) {
       writer.getSourceHandler(SENSOR_NAME, Integer.toString(i));
@@ -209,7 +220,7 @@ public class HdfsWriterTest {
     WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
indexingConfig);
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat)
                                         .withMaxOpenFiles(maxFiles);
-    writer.init(new HashMap<String, String>(), config);
+    writer.init(new HashMap<String, String>(), createTopologyContext(),  
config);
 
     for(int i = 0; i < maxFiles+1; i++) {
       writer.getSourceHandler(SENSOR_NAME, Integer.toString(i));
@@ -226,7 +237,7 @@ public class HdfsWriterTest {
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(format);
     IndexingConfigurations indexingConfig = new IndexingConfigurations();
     WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
indexingConfig);
-    writer.init(new HashMap<String, String>(), config);
+    writer.init(new HashMap<String, String>(), createTopologyContext(), 
config);
 
     JSONObject message = new JSONObject();
     message.put("test.key", "test.value");
@@ -270,7 +281,7 @@ public class HdfsWriterTest {
             .withExtension(".json")
             .withPrefix("prefix-");
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(format);
-    writer.init(new HashMap<String, String>(), config);
+    writer.init(new HashMap<String, String>(), createTopologyContext(),  
config);
 
     // These two messages will be routed to the same folder, because test.key 
is the same
     JSONObject message = new JSONObject();
@@ -314,7 +325,7 @@ public class HdfsWriterTest {
             .withExtension(".json")
             .withPrefix("prefix-");
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(format);
-    writer.init(new HashMap<String, String>(), config);
+    writer.init(new HashMap<String, String>(), createTopologyContext(),  
config);
 
     // These two messages will be routed to the same folder, because test.key 
is the same
     JSONObject message = new JSONObject();
@@ -371,7 +382,7 @@ public class HdfsWriterTest {
             .withExtension(".json")
             .withPrefix("prefix-");
     HdfsWriter writer = new HdfsWriter().withFileNameFormat(format);
-    writer.init(new HashMap<String, String>(), config);
+    writer.init(new HashMap<String, String>(), createTopologyContext(), 
config);
 
     // These two messages will be routed to the same folder, because test.key 
is the same
     JSONObject message = new JSONObject();
@@ -408,4 +419,10 @@ public class HdfsWriterTest {
     indexingConfig.updateSensorIndexingConfig(SENSOR_NAME, 
sensorIndexingConfig);
     return new IndexingWriterConfiguration(WRITER_NAME, indexingConfig);
   }
+
+  private TopologyContext createTopologyContext(){
+      Map<Integer, String> taskToComponent = new HashMap<Integer, String>();
+      taskToComponent.put(7, "Xcom");
+      return new TopologyContext(null, null, taskToComponent, null, null, 
null, null, null, 7, 6703, null, null, null, null, null, null);
+  }
 }

Reply via email to