Repository: incubator-metron
Updated Branches:
  refs/heads/master 339f95ed3 -> 7e21ad3c7


METRON-817 Customise output file path patterns for HDFS indexing (justinleet) 
closes apache/incubator-metron#505


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/7e21ad3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/7e21ad3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/7e21ad3c

Branch: refs/heads/master
Commit: 7e21ad3c79a337bdc8007c4af8bdd3109c1e7332
Parents: 339f95e
Author: justinleet <justinjl...@gmail.com>
Authored: Mon Apr 10 08:07:46 2017 -0400
Committer: leet <l...@apache.org>
Committed: Mon Apr 10 08:07:46 2017 -0400

----------------------------------------------------------------------
 .../configuration/IndexingConfigurations.java   |  13 +
 .../writer/IndexingWriterConfiguration.java     |   2 -
 metron-platform/metron-writer/README.md         |  79 ++++
 .../apache/metron/writer/hdfs/HdfsWriter.java   |  70 +++-
 .../hdfs/PathExtensionFileNameFormat.java       |  48 +++
 .../writer/hdfs/SourceAwareMoveAction.java      |   3 +-
 .../writer/hdfs/SourceFileNameFormat.java       |  48 ---
 .../metron/writer/hdfs/SourceHandler.java       |  74 ++--
 .../writer/hdfs/SourceHandlerCallback.java      |  35 ++
 .../metron/writer/hdfs/SourceHandlerKey.java    |  62 +++
 .../metron/writer/hdfs/HdfsWriterTest.java      | 411 +++++++++++++++++++
 .../hdfs/PathExtensionFileNameFormatTest.java   |  48 +++
 12 files changed, 806 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7e21ad3c/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
index 5f7998b..dee97d0 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
@@ -31,6 +31,7 @@ public class IndexingConfigurations extends Configurations {
   public static final String BATCH_SIZE_CONF = "batchSize";
   public static final String ENABLED_CONF = "enabled";
   public static final String INDEX_CONF = "index";
+  public static final String OUTPUT_PATH_FUNCTION_CONF = "outputPathFunction";
 
   public Map<String, Object> getSensorIndexingConfig(String sensorType, String 
writerName) {
     Map<String, Object> ret = (Map<String, Object>) 
configurations.get(getKey(sensorType));
@@ -84,6 +85,10 @@ public class IndexingConfigurations extends Configurations {
     return isEnabled(getSensorIndexingConfig(sensorName, writerName));
   }
 
+  public String getOutputPathFunction(String sensorName, String writerName) {
+    return getOutputPathFunction(getSensorIndexingConfig(sensorName, 
writerName), sensorName);
+  }
+
   public static boolean isEnabled(Map<String, Object> conf) {
     return getAs( ENABLED_CONF
                  ,conf
@@ -108,6 +113,14 @@ public class IndexingConfigurations extends Configurations 
{
                 );
   }
 
+  public static String getOutputPathFunction(Map<String, Object> conf, String 
sensorName) {
+    return getAs(OUTPUT_PATH_FUNCTION_CONF
+            ,conf
+            , ""
+            , String.class
+    );
+  }
+
   public static Map<String, Object> setEnabled(Map<String, Object> conf, 
boolean enabled) {
     Map<String, Object> ret = conf == null?new HashMap<>():conf;
     ret.put(ENABLED_CONF, enabled);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7e21ad3c/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java
index 7fca9c2..b6b2a4b 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java
@@ -18,9 +18,7 @@
 
 package org.apache.metron.common.configuration.writer;
 
-import org.apache.metron.common.configuration.EnrichmentConfigurations;
 import org.apache.metron.common.configuration.IndexingConfigurations;
-import org.apache.metron.common.utils.ConversionUtils;
 
 import java.util.Map;
 import java.util.Optional;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7e21ad3c/metron-platform/metron-writer/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/README.md 
b/metron-platform/metron-writer/README.md
new file mode 100644
index 0000000..16c6686
--- /dev/null
+++ b/metron-platform/metron-writer/README.md
@@ -0,0 +1,79 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+ -->
+
+# Writer
+
+## Introduction
+The writer module provides some utilities for writing to outside components 
from within Storm, including support for bulk writing.  An implementation is 
included for writing to HDFS in this module. Other writers can be found in 
their own modules.
+
+## HDFS Writer
+The HDFS writer included here expands on what Storm provides in several ways. 
It supports customizing the HDFS sync policy, rotation policy, etc. In addition, 
the writer allows users to define output paths based on the fields in the 
provided JSON message, using a Stellar expression.
+
+To manage the output path, a base path is provided in the Flux file via the 
FileNameFormat, as follows:
+```
+    -   id: "fileNameFormat"
+        className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat"
+        configMethods:
+            -   name: "withPrefix"
+                args:
+                    - "enrichment-"
+            -   name: "withExtension"
+                args:
+                  - ".json"
+            -   name: "withPath"
+                args:
+                    - "/apps/metron/"
+```
+This means that all output will land under `/apps/metron/`.  With no further 
adjustment, each sensor's output goes to `/apps/metron/<sensor>/`.
+However, by modifying the sensor's JSON config, it is possible to provide 
additional pathing based on the message itself.
+
+E.g.
+```
+{
+  "index": "bro",
+  "batchSize": 5,
+  "outputPathFunction": "FORMAT('uid-%s', uid)"
+}
+```
+will land data in `/apps/metron/uid-<uid>/`.
+
+For example, if the data contains uids 1, 3, and 5, there will be 3 output 
folders in HDFS:
+```
+/apps/metron/uid-1/
+/apps/metron/uid-3/
+/apps/metron/uid-5/
+```
+
+The Stellar function must return a String; any other non-null result fails the 
whole batch being written. It is not limited to the FORMAT function: other 
functions, such as `TO_LOWER` and `TO_UPPER`, are also available. Typically, 
it's preferable to do nontrivial transformations as part of enrichment and 
simply reference the resulting field here.
+
+If no Stellar function is provided, output defaults to a folder named after the 
sensor, as above.
+
+A caveat is that the writer only allows a limited number of files to be open 
at once; exceeding the limit fails the write.  HdfsWriter has a method 
`withMaxOpenFiles` that sets this limit.  The default is 500.  It can be set in Flux:
+```
+    -   id: "hdfsWriter"
+        className: "org.apache.metron.writer.hdfs.HdfsWriter"
+        configMethods:
+            -   name: "withFileNameFormat"
+                args:
+                    - ref: "fileNameFormat"
+            -   name: "withRotationPolicy"
+                args:
+                    - ref: "hdfsRotationPolicy"
+            -   name: "withMaxOpenFiles"
+                args: [500]
+```
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7e21ad3c/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
index 4f6b4bb..4800787 100644
--- 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
@@ -17,8 +17,14 @@
  */
 package org.apache.metron.writer.hdfs;
 
+import org.apache.metron.common.configuration.IndexingConfigurations;
+import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.MapVariableResolver;
+import org.apache.metron.common.dsl.StellarFunctions;
+import org.apache.metron.common.dsl.VariableResolver;
+import org.apache.metron.common.stellar.StellarCompiler;
+import org.apache.metron.common.stellar.StellarProcessor;
 import org.apache.storm.tuple.Tuple;
-import org.apache.metron.common.configuration.EnrichmentConfigurations;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkWriterResponse;
@@ -39,8 +45,12 @@ public class HdfsWriter implements 
BulkMessageWriter<JSONObject>, Serializable {
   FileRotationPolicy rotationPolicy = new NoRotationPolicy();
   SyncPolicy syncPolicy = new CountSyncPolicy(1); //sync every time, duh.
   FileNameFormat fileNameFormat;
-  Map<String, SourceHandler> sourceHandlerMap = new HashMap<>();
+  Map<SourceHandlerKey, SourceHandler> sourceHandlerMap = new HashMap<>();
+  int maxOpenFiles = 500;
+  transient StellarProcessor stellarProcessor;
   transient Map stormConfig;
+
+
   public HdfsWriter withFileNameFormat(FileNameFormat fileNameFormat){
     this.fileNameFormat = fileNameFormat;
     return this;
@@ -60,9 +70,15 @@ public class HdfsWriter implements 
BulkMessageWriter<JSONObject>, Serializable {
     return this;
   }
 
+  public HdfsWriter withMaxOpenFiles(int maxOpenFiles) {
+    this.maxOpenFiles = maxOpenFiles;
+    return this;
+  }
+
   @Override
   public void init(Map stormConfig, WriterConfiguration configurations) {
     this.stormConfig = stormConfig;
+    this.stellarProcessor = new StellarProcessor();
   }
 
 
@@ -74,10 +90,20 @@ public class HdfsWriter implements 
BulkMessageWriter<JSONObject>, Serializable {
                    ) throws Exception
   {
     BulkWriterResponse response = new BulkWriterResponse();
-    SourceHandler handler = 
getSourceHandler(configurations.getIndex(sourceType));
+    // Currently treating all the messages in a group for pass/failure.
     try {
-      handler.handle(messages);
-    } catch(Exception e) {
+      // Messages can all result in different HDFS paths, because of Stellar 
Expressions, so we'll need to iterate through
+      for(JSONObject message : messages) {
+        Map<String, Object> val = configurations.getSensorConfig(sourceType);
+        String path = getHdfsPathExtension(
+                sourceType,
+                
(String)configurations.getSensorConfig(sourceType).getOrDefault(IndexingConfigurations.OUTPUT_PATH_FUNCTION_CONF,
 ""),
+                message
+        );
+        SourceHandler handler = getSourceHandler(sourceType, path);
+        handler.handle(message);
+      }
+    } catch (Exception e) {
       response.addAllErrors(e, tuples);
     }
 
@@ -85,6 +111,21 @@ public class HdfsWriter implements 
BulkMessageWriter<JSONObject>, Serializable {
     return response;
   }
 
+  public String getHdfsPathExtension(String sourceType, String 
stellarFunction, JSONObject message) {
+    // If no function is provided, just use the sourceType directly
+    if(stellarFunction == null || stellarFunction.trim().isEmpty()) {
+      return sourceType;
+    }
+
+    //processor is a StellarProcessor();
+    VariableResolver resolver = new MapVariableResolver(message);
+    Object objResult = stellarProcessor.parse(stellarFunction, resolver, 
StellarFunctions.FUNCTION_RESOLVER(), Context.EMPTY_CONTEXT());
+    if(objResult != null && !(objResult instanceof String)) {
+      throw new IllegalArgumentException("Stellar Function <" + 
stellarFunction + "> did not return a String value. Returned: " + objResult);
+    }
+    return objResult == null ? "" : (String)objResult;
+  }
+
   @Override
   public String getName() {
     return "hdfs";
@@ -95,12 +136,23 @@ public class HdfsWriter implements 
BulkMessageWriter<JSONObject>, Serializable {
     for(SourceHandler handler : sourceHandlerMap.values()) {
       handler.close();
     }
+    // Everything is closed, so just clear it
+    sourceHandlerMap.clear();
   }
-  private synchronized SourceHandler getSourceHandler(String sourceType) 
throws IOException {
-    SourceHandler ret = sourceHandlerMap.get(sourceType);
+
+  synchronized SourceHandler getSourceHandler(String sourceType, String 
stellarResult) throws IOException {
+    SourceHandlerKey key = new SourceHandlerKey(sourceType, stellarResult);
+    SourceHandler ret = sourceHandlerMap.get(key);
     if(ret == null) {
-      ret = new SourceHandler(rotationActions, rotationPolicy, syncPolicy, new 
SourceFileNameFormat(sourceType, fileNameFormat), stormConfig);
-      sourceHandlerMap.put(sourceType, ret);
+      if(sourceHandlerMap.size() >= maxOpenFiles) {
+        throw new IllegalStateException("Too many HDFS files open!");
+      }
+      ret = new SourceHandler(rotationActions,
+                              rotationPolicy,
+                              syncPolicy,
+                              new 
PathExtensionFileNameFormat(key.getStellarResult(), fileNameFormat),
+                              new SourceHandlerCallback(sourceHandlerMap, 
key));
+      sourceHandlerMap.put(key, ret);
     }
     return ret;
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7e21ad3c/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/PathExtensionFileNameFormat.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/PathExtensionFileNameFormat.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/PathExtensionFileNameFormat.java
new file mode 100644
index 0000000..9fe4b9b
--- /dev/null
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/PathExtensionFileNameFormat.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer.hdfs;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+
+import java.util.Map;
+
+public class PathExtensionFileNameFormat implements FileNameFormat {
+  FileNameFormat delegate;
+  String pathExtension;
+  public PathExtensionFileNameFormat(String pathExtension, FileNameFormat 
delegate) {
+    this.delegate = delegate;
+    this.pathExtension = pathExtension;
+  }
+
+  @Override
+  public void prepare(Map map, TopologyContext topologyContext) {
+    this.delegate.prepare(map, topologyContext);
+  }
+
+  @Override
+  public String getName(long rotation, long l1) {
+    return delegate.getName(rotation, l1);
+  }
+
+  @Override
+  public String getPath() {
+    return delegate.getPath() + "/" + pathExtension;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7e21ad3c/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java
index 1c345b4..7a6f2b5 100644
--- 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java
@@ -26,7 +26,7 @@ import org.apache.storm.hdfs.common.rotation.RotationAction;
 import java.io.IOException;
 
 public class SourceAwareMoveAction implements RotationAction{
-  private static final Logger LOG = Logger.getLogger(SourceHandler.class);
+  private static final Logger LOG = 
Logger.getLogger(SourceAwareMoveAction.class);
   private String destination;
 
   public SourceAwareMoveAction toDestination(String destDir){
@@ -43,6 +43,5 @@ public class SourceAwareMoveAction implements RotationAction{
     Path destPath = new Path(new Path(destination, getSource(filePath)), 
filePath.getName());
     LOG.info("Moving file " + filePath + " to " + destPath);
     boolean success = fileSystem.rename(filePath, destPath);
-    return;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7e21ad3c/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
deleted file mode 100644
index fe9e3e3..0000000
--- 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.writer.hdfs;
-
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.hdfs.bolt.format.FileNameFormat;
-
-import java.util.Map;
-
-public class SourceFileNameFormat implements FileNameFormat {
-  FileNameFormat delegate;
-  String sourceType;
-  public SourceFileNameFormat(String sourceType, FileNameFormat delegate) {
-    this.delegate = delegate;
-    this.sourceType = sourceType;
-  }
-
-  @Override
-  public void prepare(Map map, TopologyContext topologyContext) {
-    this.delegate.prepare(map, topologyContext);
-  }
-
-  @Override
-  public String getName(long l, long l1) {
-    return delegate.getName(l, l1);
-  }
-
-  @Override
-  public String getPath() {
-    return delegate.getPath() + "/" + sourceType;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7e21ad3c/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
index ba3f96c..fa6d8da 100644
--- 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
@@ -21,18 +21,14 @@ package org.apache.metron.writer.hdfs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
-import org.apache.hadoop.hdfs.util.MD5FileUtils;
-import org.apache.hadoop.io.MD5Hash;
 import org.apache.log4j.Logger;
 import org.apache.storm.hdfs.bolt.format.FileNameFormat;
 import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
 import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
 import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
 import org.apache.storm.hdfs.common.rotation.RotationAction;
-import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
 import org.json.simple.JSONObject;
 
 import java.io.FileOutputStream;
@@ -45,6 +41,7 @@ public class SourceHandler {
   FileRotationPolicy rotationPolicy;
   SyncPolicy syncPolicy;
   FileNameFormat fileNameFormat;
+  SourceHandlerCallback cleanupCallback;
   private long offset = 0;
   private int rotation = 0;
   private transient FSDataOutputStream out;
@@ -56,42 +53,44 @@ public class SourceHandler {
                       , FileRotationPolicy rotationPolicy
                       , SyncPolicy syncPolicy
                       , FileNameFormat fileNameFormat
-                      , Map config
-                      ) throws IOException {
+                      , SourceHandlerCallback cleanupCallback) throws 
IOException {
     this.rotationActions = rotationActions;
     this.rotationPolicy = rotationPolicy;
     this.syncPolicy = syncPolicy;
     this.fileNameFormat = fileNameFormat;
-    initialize(config);
+    initialize();
   }
 
   public void handle(List<JSONObject> messages) throws Exception{
-
     for(JSONObject message : messages) {
-      byte[] bytes = (message.toJSONString() + "\n").getBytes();
-      synchronized (this.writeLock) {
-        out.write(bytes);
-        this.offset += bytes.length;
-
-        if (this.syncPolicy.mark(null, this.offset)) {
-          if (this.out instanceof HdfsDataOutputStream) {
-            ((HdfsDataOutputStream) 
this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
-          } else {
-            this.out.hsync();
-          }
-          this.syncPolicy.reset();
+      handle(message);
+    }
+  }
+
+  protected void handle(JSONObject message) throws IOException {
+    byte[] bytes = (message.toJSONString() + "\n").getBytes();
+    synchronized (this.writeLock) {
+      out.write(bytes);
+      this.offset += bytes.length;
+
+      if (this.syncPolicy.mark(null, this.offset)) {
+        if (this.out instanceof HdfsDataOutputStream) {
+          ((HdfsDataOutputStream) 
this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
+        } else {
+          this.out.hsync();
         }
+        this.syncPolicy.reset();
       }
+    }
 
-      if (this.rotationPolicy.mark(null, this.offset)) {
-        rotateOutputFile(); // synchronized
-        this.offset = 0;
-        this.rotationPolicy.reset();
-      }
+    if (this.rotationPolicy.mark(null, this.offset)) {
+      rotateOutputFile(); // synchronized
+      this.offset = 0;
+      this.rotationPolicy.reset();
     }
   }
 
-  private void initialize(Map config) throws IOException {
+  private void initialize() throws IOException {
     this.fs = FileSystem.get(new Configuration());
     this.currentFile = createOutputFile();
     if(this.rotationPolicy instanceof TimedRotationPolicy){
@@ -116,6 +115,8 @@ public class SourceHandler {
     long start = System.currentTimeMillis();
     synchronized (this.writeLock) {
       closeOutputFile();
+      // Want to use the callback to make sure we have an accurate count of 
open files.
+      cleanupCallback();
       this.rotation++;
 
       Path newFile = createOutputFile();
@@ -146,12 +147,33 @@ public class SourceHandler {
     this.out.close();
   }
 
+  private void cleanupCallback() {
+    this.cleanupCallback.removeKey();
+  }
 
   public void close() {
     try {
       closeOutputFile();
+      // Don't call cleanup, to avoid HashMap's 
ConcurrentModificationException while iterating
     } catch (IOException e) {
       throw new RuntimeException("Unable to close output file.", e);
     }
   }
+
+  @Override
+  public String toString() {
+    return "SourceHandler{" +
+            "rotationActions=" + rotationActions +
+            ", rotationPolicy=" + rotationPolicy +
+            ", syncPolicy=" + syncPolicy +
+            ", fileNameFormat=" + fileNameFormat +
+            ", offset=" + offset +
+            ", rotation=" + rotation +
+            ", out=" + out +
+            ", writeLock=" + writeLock +
+            ", rotationTimer=" + rotationTimer +
+            ", fs=" + fs +
+            ", currentFile=" + currentFile +
+            '}';
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7e21ad3c/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerCallback.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerCallback.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerCallback.java
new file mode 100644
index 0000000..bfd1daf
--- /dev/null
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerCallback.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer.hdfs;
+
+import java.util.Map;
+
+public class SourceHandlerCallback {
+  Map<SourceHandlerKey, SourceHandler> sourceHandlerMap;
+  SourceHandlerKey key;
+  SourceHandlerCallback(Map<SourceHandlerKey, SourceHandler> sourceHandlerMap, 
SourceHandlerKey key) {
+    this.sourceHandlerMap = sourceHandlerMap;
+    this.key = key;
+  }
+
+  public void removeKey() {
+    sourceHandlerMap.remove(key);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7e21ad3c/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerKey.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerKey.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerKey.java
new file mode 100644
index 0000000..6bf0917
--- /dev/null
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerKey.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer.hdfs;
+
+class SourceHandlerKey {
+  private String sourceType;
+  private String stellarResult;
+
+  SourceHandlerKey(String sourceType, String stellarResult) {
+    this.sourceType = sourceType;
+    this.stellarResult = stellarResult;
+  }
+
+  public String getSourceType() {
+    return sourceType;
+  }
+
+  public String getStellarResult() {
+    return stellarResult;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    SourceHandlerKey that = (SourceHandlerKey) o;
+
+    if (sourceType != null ? !sourceType.equals(that.sourceType) : 
that.sourceType != null) {
+      return false;
+    }
+    return stellarResult != null ? stellarResult.equals(that.stellarResult) : 
that.stellarResult == null;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = sourceType != null ? sourceType.hashCode() : 0;
+    result = 31 * result + (stellarResult != null ? stellarResult.hashCode() : 
0);
+    return result;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7e21ad3c/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
 
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
new file mode 100644
index 0000000..0a4bdcb
--- /dev/null
+++ 
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer.hdfs;
+
+import org.apache.metron.common.configuration.IndexingConfigurations;
+import 
org.apache.metron.common.configuration.writer.IndexingWriterConfiguration;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.apache.storm.tuple.Tuple;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.*;
+
+// Suppress ConstantConditions to avoid NPE warnings that only would occur on 
test failure anyway
+@SuppressWarnings("ConstantConditions")
+public class HdfsWriterTest {
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  private static final String SENSOR_NAME = "sensor";
+  private static final String WRITER_NAME = "writerName";
+
+  private File folder;
+  private FileNameFormat testFormat;
+
+  @Before
+  public void setup() throws IOException {
+    // Ensure each test has a unique folder to work with.
+    folder = tempFolder.newFolder();
+    testFormat = new DefaultFileNameFormat()
+            .withPath(folder.toString())
+            .withExtension(".json")
+            .withPrefix("prefix-");
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testGetHdfsPathNull() {
+    WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
new IndexingConfigurations());
+    HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
+    writer.init(new HashMap<String, String>(), config);
+
+    JSONObject message = new JSONObject();
+    Object result = writer.getHdfsPathExtension(SENSOR_NAME,null, message);
+    writer.close();
+    Assert.assertEquals(SENSOR_NAME, result);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testGetHdfsPathEmptyString() {
+    WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
new IndexingConfigurations());
+    HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
+    writer.init(new HashMap<String, String>(), config);
+
+    JSONObject message = new JSONObject();
+    Object result = writer.getHdfsPathExtension(SENSOR_NAME, "", message);
+    writer.close();
+    Assert.assertEquals(SENSOR_NAME, result);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testGetHdfsPathConstant() {
+    WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
new IndexingConfigurations());
+    HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
+    writer.init(new HashMap<String, String>(), config);
+
+    JSONObject message = new JSONObject();
+    Object result = writer.getHdfsPathExtension(SENSOR_NAME, "'new'", message);
+    writer.close();
+    Assert.assertEquals("new", result);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testGetHdfsPathDirectVariable() {
+    WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
new IndexingConfigurations());
+    HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
+    writer.init(new HashMap<String, String>(), config);
+
+    JSONObject message = new JSONObject();
+    message.put("test.key", "test.value");
+    Object result = writer.getHdfsPathExtension(SENSOR_NAME, "test.key", 
message);
+    writer.close();
+    Assert.assertEquals("test.value", result);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testGetHdfsPathFormatConstant() {
+    WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
new IndexingConfigurations());
+    HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
+    writer.init(new HashMap<String, String>(), config);
+
+    JSONObject message = new JSONObject();
+    Object result = writer.getHdfsPathExtension(SENSOR_NAME, 
"FORMAT('/test/folder/')", message);
+    writer.close();
+    Assert.assertEquals("/test/folder/", result);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testGetHdfsPathFormatVariable() {
+    IndexingConfigurations indexingConfig = new IndexingConfigurations();
+    WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
indexingConfig);
+    HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
+    writer.init(new HashMap<String, String>(), config);
+
+    JSONObject message = new JSONObject();
+    message.put("test.key", "test.value");
+    message.put("test.key.2", "test.value.2");
+    message.put("test.key.3", "test.value.3");
+    Object result = 
writer.getHdfsPathExtension(SENSOR_NAME,"FORMAT('%s/%s/%s', test.key, 
test.key.2, test.key.3)", message);
+    writer.close();
+    Assert.assertEquals("test.value/test.value.2/test.value.3", result);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testGetHdfsPathMultipleFunctions() {
+    IndexingConfigurations indexingConfig = new IndexingConfigurations();
+    WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
indexingConfig);
+    HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
+    writer.init(new HashMap<String, String>(), config);
+
+    JSONObject message = new JSONObject();
+    message.put("test.key", "test.value");
+    message.put("test.key.2", "test.value.2");
+    Object result = writer.getHdfsPathExtension(SENSOR_NAME, "FORMAT('%s', 
test.key)", message);
+    Assert.assertEquals("test.value", result);
+
+    result = writer.getHdfsPathExtension(SENSOR_NAME, "FORMAT('%s/%s', 
test.key, test.key.2)", message);
+    Assert.assertEquals("test.value/test.value.2", result);
+
+    result = writer.getHdfsPathExtension(SENSOR_NAME, "FORMAT('%s', 
test.key)", message);
+    writer.close();
+    Assert.assertEquals("test.value", result);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testGetHdfsPathStringReturned() {
+    IndexingConfigurations indexingConfig = new IndexingConfigurations();
+    WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
indexingConfig);
+    HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
+    writer.init(new HashMap<String, String>(), config);
+
+    JSONObject message = new JSONObject();
+    message.put("test.key", "test.value");
+    Object result = writer.getHdfsPathExtension(SENSOR_NAME, 
"TO_UPPER(FORMAT(MAP_GET('key', {'key': 'AbC%s'}), test.key))", message);
+    writer.close();
+    Assert.assertEquals("ABCTEST.VALUE", result);
+  }
+
+  @Test(expected=IllegalArgumentException.class)
+  public void testGetHdfsPathNonString() {
+    WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
new IndexingConfigurations());
+    HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
+    writer.init(new HashMap<String, String>(), config);
+
+    JSONObject message = new JSONObject();
+    writer.getHdfsPathExtension(SENSOR_NAME, "{'key':'value'}", message);
+  }
+
+  @Test
+  public void testGetSourceHandlerOpenFilesMax() throws IOException {
+    int maxFiles = 2;
+    IndexingConfigurations indexingConfig = new IndexingConfigurations();
+    WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
indexingConfig);
+    HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat)
+            .withMaxOpenFiles(maxFiles);
+    writer.init(new HashMap<String, String>(), config);
+
+    for(int i = 0; i < maxFiles; i++) {
+      writer.getSourceHandler(SENSOR_NAME, Integer.toString(i));
+    }
+  }
+
+  @Test(expected=IllegalStateException.class)
+  public void testGetSourceHandlerOpenFilesOverMax() throws IOException {
+    int maxFiles = 2;
+    IndexingConfigurations indexingConfig = new IndexingConfigurations();
+    WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
indexingConfig);
+    HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat)
+                                        .withMaxOpenFiles(maxFiles);
+    writer.init(new HashMap<String, String>(), config);
+
+    for(int i = 0; i < maxFiles+1; i++) {
+      writer.getSourceHandler(SENSOR_NAME, Integer.toString(i));
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testWriteNoOutputFunction() throws Exception {
+    FileNameFormat format = new DefaultFileNameFormat()
+            .withPath(folder.toString())
+            .withExtension(".json")
+            .withPrefix("prefix-");
+    HdfsWriter writer = new HdfsWriter().withFileNameFormat(format);
+    IndexingConfigurations indexingConfig = new IndexingConfigurations();
+    WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, 
indexingConfig);
+    writer.init(new HashMap<String, String>(), config);
+
+    JSONObject message = new JSONObject();
+    message.put("test.key", "test.value");
+    message.put("test.key2", "test.value2");
+    JSONObject message2 = new JSONObject();
+    message2.put("test.key", "test.value3");
+    message2.put("test.key2", "test.value2");
+    ArrayList<JSONObject> messages = new ArrayList<>();
+    messages.add(message);
+    messages.add(message2);
+
+    ArrayList<Tuple> tuples = new ArrayList<>();
+
+    writer.write(SENSOR_NAME, config, tuples, messages);
+    writer.close();
+
+    ArrayList<String> expected = new ArrayList<>();
+    expected.add(message.toJSONString());
+    expected.add(message2.toJSONString());
+    Collections.sort(expected);
+
+    // Default to just putting it in the base folder + the sensor name
+    File outputFolder = new File(folder.getAbsolutePath() + "/" + SENSOR_NAME);
+    Assert.assertTrue(outputFolder.exists() && outputFolder.isDirectory());
+    Assert.assertEquals(1, outputFolder.listFiles().length);
+
+    for(File file : outputFolder.listFiles()) {
+      List<String> lines = Files.readAllLines(file.toPath());
+      Collections.sort(lines);
+      Assert.assertEquals(expected, lines);
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testWriteSingleFile() throws Exception {
+    String function = "FORMAT('test-%s/%s', test.key, test.key)";
+    WriterConfiguration config = buildWriterConfiguration(function);
+    FileNameFormat format = new DefaultFileNameFormat()
+            .withPath(folder.toString())
+            .withExtension(".json")
+            .withPrefix("prefix-");
+    HdfsWriter writer = new HdfsWriter().withFileNameFormat(format);
+    writer.init(new HashMap<String, String>(), config);
+
+    // These two messages will be routed to the same folder, because test.key 
is the same
+    JSONObject message = new JSONObject();
+    message.put("test.key", "test.value");
+    message.put("test.key2", "test.value2");
+    JSONObject message2 = new JSONObject();
+    message2.put("test.key", "test.value");
+    message2.put("test.key3", "test.value2");
+    ArrayList<JSONObject> messages = new ArrayList<>();
+    messages.add(message);
+    messages.add(message2);
+
+    ArrayList<Tuple> tuples = new ArrayList<>();
+
+    writer.write(SENSOR_NAME, config, tuples, messages);
+    writer.close();
+
+    ArrayList<String> expected = new ArrayList<>();
+    expected.add(message.toJSONString());
+    expected.add(message2.toJSONString());
+    Collections.sort(expected);
+
+    File outputFolder = new File(folder.getAbsolutePath() + 
"/test-test.value/test.value/");
+    Assert.assertTrue(outputFolder.exists() && outputFolder.isDirectory());
+    Assert.assertEquals(1, outputFolder.listFiles().length);
+
+    for(File file : outputFolder.listFiles()) {
+      List<String> lines = Files.readAllLines(file.toPath());
+      Collections.sort(lines);
+      Assert.assertEquals(expected, lines);
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testWriteMultipleFiles() throws Exception {
+    String function = "FORMAT('test-%s/%s', test.key, test.key)";
+    WriterConfiguration config = buildWriterConfiguration(function);
+    FileNameFormat format = new DefaultFileNameFormat()
+            .withPath(folder.toString())
+            .withExtension(".json")
+            .withPrefix("prefix-");
+    HdfsWriter writer = new HdfsWriter().withFileNameFormat(format);
+    writer.init(new HashMap<String, String>(), config);
+
+    // These two messages will be routed to the same folder, because test.key 
is the same
+    JSONObject message = new JSONObject();
+    message.put("test.key", "test.value");
+    message.put("test.key2", "test.value2");
+    JSONObject message2 = new JSONObject();
+    message2.put("test.key", "test.value2");
+    message2.put("test.key3", "test.value3");
+    ArrayList<JSONObject> messages = new ArrayList<>();
+    messages.add(message);
+    messages.add(message2);
+
+    ArrayList<Tuple> tuples = new ArrayList<>();
+
+    writer.write(SENSOR_NAME, config, tuples, messages);
+    writer.close();
+
+    ArrayList<String> expected1 = new ArrayList<>();
+    expected1.add(message.toJSONString());
+    Collections.sort(expected1);
+
+    File outputFolder1 = new File(folder.getAbsolutePath() + 
"/test-test.value/test.value/");
+    Assert.assertTrue(outputFolder1.exists() && outputFolder1.isDirectory());
+    Assert.assertEquals(1, outputFolder1.listFiles().length);
+
+    for(File file : outputFolder1.listFiles()) {
+      List<String> lines = Files.readAllLines(file.toPath());
+      Collections.sort(lines);
+      Assert.assertEquals(expected1, lines);
+    }
+
+    ArrayList<String> expected2 = new ArrayList<>();
+    expected2.add(message2.toJSONString());
+    Collections.sort(expected2);
+
+    File outputFolder2 = new File(folder.getAbsolutePath() + 
"/test-test.value2/test.value2/");
+    Assert.assertTrue(outputFolder2.exists() && outputFolder2.isDirectory());
+    Assert.assertEquals(1, outputFolder2.listFiles().length);
+
+    for(File file : outputFolder2.listFiles()) {
+      List<String> lines = Files.readAllLines(file.toPath());
+      Collections.sort(lines);
+      Assert.assertEquals(expected2, lines);
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testWriteSingleFileWithNull() throws Exception {
+    String function = "FORMAT('test-%s/%s', test.key, test.key)";
+    WriterConfiguration config = buildWriterConfiguration(function);
+    FileNameFormat format = new DefaultFileNameFormat()
+            .withPath(folder.toString())
+            .withExtension(".json")
+            .withPrefix("prefix-");
+    HdfsWriter writer = new HdfsWriter().withFileNameFormat(format);
+    writer.init(new HashMap<String, String>(), config);
+
+    // These two messages will be routed to the same folder, because test.key 
is the same
+    JSONObject message = new JSONObject();
+    message.put("test.key2", "test.value2");
+    ArrayList<JSONObject> messages = new ArrayList<>();
+    messages.add(message);
+
+    ArrayList<Tuple> tuples = new ArrayList<>();
+
+    writer.write(SENSOR_NAME, config, tuples, messages);
+    writer.close();
+
+    ArrayList<String> expected = new ArrayList<>();
+    expected.add(message.toJSONString());
+    Collections.sort(expected);
+
+    File outputFolder = new File(folder.getAbsolutePath() + 
"/test-null/null/");
+    Assert.assertTrue(outputFolder.exists() && outputFolder.isDirectory());
+    Assert.assertEquals(1, outputFolder.listFiles().length);
+
+    for(File file : outputFolder.listFiles()) {
+      List<String> lines = Files.readAllLines(file.toPath());
+      Collections.sort(lines);
+      Assert.assertEquals(expected, lines);
+    }
+  }
+
+  protected WriterConfiguration buildWriterConfiguration(String function) {
+    IndexingConfigurations indexingConfig = new IndexingConfigurations();
+    Map<String, Object> sensorIndexingConfig = new HashMap<>();
+    Map<String, Object> writerIndexingConfig = new HashMap<>();
+    writerIndexingConfig.put(IndexingConfigurations.OUTPUT_PATH_FUNCTION_CONF, 
function);
+    sensorIndexingConfig.put(WRITER_NAME, writerIndexingConfig);
+    indexingConfig.updateSensorIndexingConfig(SENSOR_NAME, 
sensorIndexingConfig);
+    return new IndexingWriterConfiguration(WRITER_NAME, indexingConfig);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7e21ad3c/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/PathExtensionFileNameFormatTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/PathExtensionFileNameFormatTest.java
 
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/PathExtensionFileNameFormatTest.java
new file mode 100644
index 0000000..9825311
--- /dev/null
+++ 
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/PathExtensionFileNameFormatTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer.hdfs;
+
+import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PathExtensionFileNameFormatTest {
+
+  private static final String PATH = "/apps/metron";
+  private static final String EXTENSION = ".json";
+  private static final String PATH_EXTENSION = "field_result";
+
+  @Test
+  public void testGetPath() {
+    FileNameFormat delegate = new 
DefaultFileNameFormat().withExtension(EXTENSION).withPath(PATH);
+    FileNameFormat sourceFormat = new 
PathExtensionFileNameFormat(PATH_EXTENSION, delegate);
+    String actual = sourceFormat.getPath();
+    String expected = PATH + "/" + PATH_EXTENSION;
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testGetPathEmptyPathExtension() {
+    FileNameFormat delegate = new 
DefaultFileNameFormat().withExtension(EXTENSION).withPath(PATH);
+    FileNameFormat sourceFormat = new PathExtensionFileNameFormat("", 
delegate);
+    String actual = sourceFormat.getPath();
+    Assert.assertEquals(PATH + "/", actual);
+  }
+}

Reply via email to