Repository: incubator-metron Updated Branches: refs/heads/master 339f95ed3 -> 7e21ad3c7
METRON-817 Customise output file path patterns for HDFS indexing (justinleet) closes apache/incubator-metron#505 Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/7e21ad3c Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/7e21ad3c Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/7e21ad3c Branch: refs/heads/master Commit: 7e21ad3c79a337bdc8007c4af8bdd3109c1e7332 Parents: 339f95e Author: justinleet <justinjl...@gmail.com> Authored: Mon Apr 10 08:07:46 2017 -0400 Committer: leet <l...@apache.org> Committed: Mon Apr 10 08:07:46 2017 -0400 ---------------------------------------------------------------------- .../configuration/IndexingConfigurations.java | 13 + .../writer/IndexingWriterConfiguration.java | 2 - metron-platform/metron-writer/README.md | 79 ++++ .../apache/metron/writer/hdfs/HdfsWriter.java | 70 +++- .../hdfs/PathExtensionFileNameFormat.java | 48 +++ .../writer/hdfs/SourceAwareMoveAction.java | 3 +- .../writer/hdfs/SourceFileNameFormat.java | 48 --- .../metron/writer/hdfs/SourceHandler.java | 74 ++-- .../writer/hdfs/SourceHandlerCallback.java | 35 ++ .../metron/writer/hdfs/SourceHandlerKey.java | 62 +++ .../metron/writer/hdfs/HdfsWriterTest.java | 411 +++++++++++++++++++ .../hdfs/PathExtensionFileNameFormatTest.java | 48 +++ 12 files changed, 806 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7e21ad3c/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java index 5f7998b..dee97d0 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java @@ -31,6 +31,7 @@ public class IndexingConfigurations extends Configurations { public static final String BATCH_SIZE_CONF = "batchSize"; public static final String ENABLED_CONF = "enabled"; public static final String INDEX_CONF = "index"; + public static final String OUTPUT_PATH_FUNCTION_CONF = "outputPathFunction"; public Map<String, Object> getSensorIndexingConfig(String sensorType, String writerName) { Map<String, Object> ret = (Map<String, Object>) configurations.get(getKey(sensorType)); @@ -84,6 +85,10 @@ public class IndexingConfigurations extends Configurations { return isEnabled(getSensorIndexingConfig(sensorName, writerName)); } + public String getOutputPathFunction(String sensorName, String writerName) { + return getOutputPathFunction(getSensorIndexingConfig(sensorName, writerName), sensorName); + } + public static boolean isEnabled(Map<String, Object> conf) { return getAs( ENABLED_CONF ,conf @@ -108,6 +113,14 @@ public class IndexingConfigurations extends Configurations { ); } + public static String getOutputPathFunction(Map<String, Object> conf, String sensorName) { + return getAs(OUTPUT_PATH_FUNCTION_CONF + ,conf + , "" + , String.class + ); + } + public static Map<String, Object> setEnabled(Map<String, Object> conf, boolean enabled) { Map<String, Object> ret = 
conf == null?new HashMap<>():conf; ret.put(ENABLED_CONF, enabled); http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7e21ad3c/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java index 7fca9c2..b6b2a4b 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java @@ -18,9 +18,7 @@ package org.apache.metron.common.configuration.writer; -import org.apache.metron.common.configuration.EnrichmentConfigurations; import org.apache.metron.common.configuration.IndexingConfigurations; -import org.apache.metron.common.utils.ConversionUtils; import java.util.Map; import java.util.Optional; http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7e21ad3c/metron-platform/metron-writer/README.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/README.md b/metron-platform/metron-writer/README.md new file mode 100644 index 0000000..16c6686 --- /dev/null +++ b/metron-platform/metron-writer/README.md @@ -0,0 +1,79 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + --> + +# Writer + +## Introduction +The writer module provides utilities for writing to outside components from within Storm, including managing bulk writing. An implementation for writing to HDFS is included in this module. Other writers can be found in their own modules. + +## HDFS Writer +The HDFS writer included here expands on what Storm provides in several ways, such as customizing how files are synced to HDFS and how they are rotated. In addition, the writer allows users to define output paths based on the fields in the provided JSON message. These paths are defined using Stellar. + +To manage the output path, a base path argument is provided in the Flux file via the FileNameFormat, as follows: ``` + - id: "fileNameFormat" + className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat" + configMethods: + - name: "withPrefix" + args: + - "enrichment-" + - name: "withExtension" + args: + - ".json" + - name: "withPath" + args: + - "/apps/metron/" +``` +This means that all output will land in `/apps/metron/`. With no further adjustment, it will be `/apps/metron/<sensor>/`.
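For instance, with two sensors named `bro` and `snort` (sensor names assumed purely for illustration), the default layout under the base path above would simply contain one folder per sensor:
```
/apps/metron/bro/
/apps/metron/snort/
```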
+However, by modifying the sensor's JSON config, it is possible to provide additional pathing based on the message itself. + +For instance, +``` +{ + "index": "bro", + "batchSize": 5, + "outputPathFunction": "FORMAT('uid-%s', uid)" +} +``` +will land data in `/apps/metron/uid-<uid>/`. + +For example, if the data contains uids 1, 3, and 5, there will be three output folders in HDFS: +``` +/apps/metron/uid-1/ +/apps/metron/uid-3/ +/apps/metron/uid-5/ +``` + +The Stellar function must return a String, but is not limited to FORMAT functions. Other functions, such as `TO_LOWER`, `TO_UPPER`, etc., are all available for use. Typically, it's preferable to do nontrivial transformations as part of enrichment and simply reference the output here. + +If no Stellar function is provided, the writer defaults to using the sensor name as the folder, as above. + +A caveat is that the writer only allows a certain number of files to be open at once. HdfsWriter has a function `withMaxOpenFiles` that allows this limit to be set; the default is 500. This can be set in Flux: +``` + - id: "hdfsWriter" + className: "org.apache.metron.writer.hdfs.HdfsWriter" + configMethods: + - name: "withFileNameFormat" + args: + - ref: "fileNameFormat" + - name: "withRotationPolicy" + args: + - ref: "hdfsRotationPolicy" + - name: "withMaxOpenFiles" + args: + - 500 +``` + http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7e21ad3c/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java index 4f6b4bb..4800787 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java @@ -17,8 +17,14 @@ */ package org.apache.metron.writer.hdfs; +import org.apache.metron.common.configuration.IndexingConfigurations; +import org.apache.metron.common.dsl.Context; +import org.apache.metron.common.dsl.MapVariableResolver; +import org.apache.metron.common.dsl.StellarFunctions; +import org.apache.metron.common.dsl.VariableResolver; +import org.apache.metron.common.stellar.StellarCompiler; +import org.apache.metron.common.stellar.StellarProcessor; import org.apache.storm.tuple.Tuple; -import org.apache.metron.common.configuration.EnrichmentConfigurations; import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.writer.BulkMessageWriter; import org.apache.metron.common.writer.BulkWriterResponse; @@ -39,8 +45,12 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable { FileRotationPolicy rotationPolicy = new NoRotationPolicy(); SyncPolicy syncPolicy = new CountSyncPolicy(1); //sync every time, duh.
FileNameFormat fileNameFormat; - Map<String, SourceHandler> sourceHandlerMap = new HashMap<>(); + Map<SourceHandlerKey, SourceHandler> sourceHandlerMap = new HashMap<>(); + int maxOpenFiles = 500; + transient StellarProcessor stellarProcessor; transient Map stormConfig; + + public HdfsWriter withFileNameFormat(FileNameFormat fileNameFormat){ this.fileNameFormat = fileNameFormat; return this; @@ -60,9 +70,15 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable { return this; } + public HdfsWriter withMaxOpenFiles(int maxOpenFiles) { + this.maxOpenFiles = maxOpenFiles; + return this; + } + @Override public void init(Map stormConfig, WriterConfiguration configurations) { this.stormConfig = stormConfig; + this.stellarProcessor = new StellarProcessor(); } @@ -74,10 +90,20 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable { ) throws Exception { BulkWriterResponse response = new BulkWriterResponse(); - SourceHandler handler = getSourceHandler(configurations.getIndex(sourceType)); + // Currently treating all the messages in a group for pass/failure. try { - handler.handle(messages); - } catch(Exception e) { + // Messages can all result in different HDFS paths, because of Stellar Expressions, so we'll need to iterate through + for(JSONObject message : messages) { + Map<String, Object> val = configurations.getSensorConfig(sourceType); + String path = getHdfsPathExtension( + sourceType, + (String)configurations.getSensorConfig(sourceType).getOrDefault(IndexingConfigurations.OUTPUT_PATH_FUNCTION_CONF, ""), + message + ); + SourceHandler handler = getSourceHandler(sourceType, path); + handler.handle(message); + } + } catch (Exception e) { response.addAllErrors(e, tuples); } @@ -85,6 +111,21 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable { return response; } + public String getHdfsPathExtension(String sourceType, String stellarFunction, JSONObject message) { + // If no function is provided, just use the sourceType directly + if(stellarFunction == null || stellarFunction.trim().isEmpty()) { + return sourceType; + } + + //processor is a StellarProcessor(); + VariableResolver resolver = new MapVariableResolver(message); + Object objResult = stellarProcessor.parse(stellarFunction, resolver, StellarFunctions.FUNCTION_RESOLVER(), Context.EMPTY_CONTEXT()); + if(objResult != null && !(objResult instanceof String)) { + throw new IllegalArgumentException("Stellar Function <" + stellarFunction + "> did not return a String value. Returned: " + objResult); + } + return objResult == null ? 
"" : (String)objResult; + } + @Override public String getName() { return "hdfs"; @@ -95,12 +136,23 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable { for(SourceHandler handler : sourceHandlerMap.values()) { handler.close(); } + // Everything is closed, so just clear it + sourceHandlerMap.clear(); } - private synchronized SourceHandler getSourceHandler(String sourceType) throws IOException { - SourceHandler ret = sourceHandlerMap.get(sourceType); + + synchronized SourceHandler getSourceHandler(String sourceType, String stellarResult) throws IOException { + SourceHandlerKey key = new SourceHandlerKey(sourceType, stellarResult); + SourceHandler ret = sourceHandlerMap.get(key); if(ret == null) { - ret = new SourceHandler(rotationActions, rotationPolicy, syncPolicy, new SourceFileNameFormat(sourceType, fileNameFormat), stormConfig); - sourceHandlerMap.put(sourceType, ret); + if(sourceHandlerMap.size() >= maxOpenFiles) { + throw new IllegalStateException("Too many HDFS files open!"); + } + ret = new SourceHandler(rotationActions, + rotationPolicy, + syncPolicy, + new PathExtensionFileNameFormat(key.getStellarResult(), fileNameFormat), + new SourceHandlerCallback(sourceHandlerMap, key)); + sourceHandlerMap.put(key, ret); } return ret; } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7e21ad3c/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/PathExtensionFileNameFormat.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/PathExtensionFileNameFormat.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/PathExtensionFileNameFormat.java new file mode 100644 index 0000000..9fe4b9b --- /dev/null +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/PathExtensionFileNameFormat.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.metron.writer.hdfs; + +import org.apache.storm.task.TopologyContext; +import org.apache.storm.hdfs.bolt.format.FileNameFormat; + +import java.util.Map; + +public class PathExtensionFileNameFormat implements FileNameFormat { + FileNameFormat delegate; + String pathExtension; + public PathExtensionFileNameFormat(String pathExtension, FileNameFormat delegate) { + this.delegate = delegate; + this.pathExtension = pathExtension; + } + + @Override + public void prepare(Map map, TopologyContext topologyContext) { + this.delegate.prepare(map, topologyContext); + } + + @Override + public String getName(long rotation, long l1) { + return delegate.getName(rotation, l1); + } + + @Override + public String getPath() { + return delegate.getPath() + "/" + pathExtension; + } +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7e21ad3c/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java index 1c345b4..7a6f2b5 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java @@ -26,7 +26,7 @@ import org.apache.storm.hdfs.common.rotation.RotationAction; import java.io.IOException; public class SourceAwareMoveAction implements RotationAction{ - private static final Logger LOG = Logger.getLogger(SourceHandler.class); + private static final Logger LOG = Logger.getLogger(SourceAwareMoveAction.class); private String destination; public SourceAwareMoveAction toDestination(String destDir){ @@ -43,6 +43,5 @@ public class SourceAwareMoveAction implements RotationAction{ Path destPath = new Path(new Path(destination, getSource(filePath)), filePath.getName()); LOG.info("Moving file " + filePath + " to " + destPath); boolean success = fileSystem.rename(filePath, destPath); - return; } } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7e21ad3c/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java deleted file mode 100644 index fe9e3e3..0000000 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.metron.writer.hdfs; - -import org.apache.storm.task.TopologyContext; -import org.apache.storm.hdfs.bolt.format.FileNameFormat; - -import java.util.Map; - -public class SourceFileNameFormat implements FileNameFormat { - FileNameFormat delegate; - String sourceType; - public SourceFileNameFormat(String sourceType, FileNameFormat delegate) { - this.delegate = delegate; - this.sourceType = sourceType; - } - - @Override - public void prepare(Map map, TopologyContext topologyContext) { - this.delegate.prepare(map, topologyContext); - } - - @Override - public String getName(long l, long l1) { - return delegate.getName(l, l1); - } - - @Override - public String getPath() { - return delegate.getPath() + "/" + sourceType; - } -} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7e21ad3c/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java index ba3f96c..fa6d8da 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java @@ -21,18 +21,14 @@ package org.apache.metron.writer.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; -import org.apache.hadoop.hdfs.util.MD5FileUtils; -import org.apache.hadoop.io.MD5Hash; import org.apache.log4j.Logger; import org.apache.storm.hdfs.bolt.format.FileNameFormat; import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy; import org.apache.storm.hdfs.bolt.sync.SyncPolicy; import org.apache.storm.hdfs.common.rotation.RotationAction; -import org.apache.storm.hdfs.common.security.HdfsSecurityUtil; import org.json.simple.JSONObject; import java.io.FileOutputStream; @@ -45,6 +41,7 @@ public class SourceHandler { FileRotationPolicy rotationPolicy; SyncPolicy syncPolicy; FileNameFormat fileNameFormat; + SourceHandlerCallback cleanupCallback; private long offset = 0; private int rotation = 0; private transient FSDataOutputStream out; @@ -56,42 +53,44 @@ public class SourceHandler { , FileRotationPolicy rotationPolicy , SyncPolicy syncPolicy , FileNameFormat fileNameFormat - , Map config - ) throws IOException { + , SourceHandlerCallback cleanupCallback) throws IOException { this.rotationActions = rotationActions; this.rotationPolicy = rotationPolicy; this.syncPolicy = syncPolicy; this.fileNameFormat = fileNameFormat; - initialize(config); + initialize(); } public void handle(List<JSONObject> messages) throws Exception{ - for(JSONObject message : messages) { - byte[] bytes = 
(message.toJSONString() + "\n").getBytes(); - synchronized (this.writeLock) { - out.write(bytes); - this.offset += bytes.length; - - if (this.syncPolicy.mark(null, this.offset)) { - if (this.out instanceof HdfsDataOutputStream) { - ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)); - } else { - this.out.hsync(); - } - this.syncPolicy.reset(); + handle(message); + } + } + + protected void handle(JSONObject message) throws IOException { + byte[] bytes = (message.toJSONString() + "\n").getBytes(); + synchronized (this.writeLock) { + out.write(bytes); + this.offset += bytes.length; + + if (this.syncPolicy.mark(null, this.offset)) { + if (this.out instanceof HdfsDataOutputStream) { + ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)); + } else { + this.out.hsync(); } + this.syncPolicy.reset(); } + } - if (this.rotationPolicy.mark(null, this.offset)) { - rotateOutputFile(); // synchronized - this.offset = 0; - this.rotationPolicy.reset(); - } + if (this.rotationPolicy.mark(null, this.offset)) { + rotateOutputFile(); // synchronized + this.offset = 0; + this.rotationPolicy.reset(); } } - private void initialize(Map config) throws IOException { + private void initialize() throws IOException { this.fs = FileSystem.get(new Configuration()); this.currentFile = createOutputFile(); if(this.rotationPolicy instanceof TimedRotationPolicy){ @@ -116,6 +115,8 @@ public class SourceHandler { long start = System.currentTimeMillis(); synchronized (this.writeLock) { closeOutputFile(); + // Want to use the callback to make sure we have an accurate count of open files. + cleanupCallback(); this.rotation++; Path newFile = createOutputFile(); @@ -146,12 +147,33 @@ public class SourceHandler { this.out.close(); } + private void cleanupCallback() { + this.cleanupCallback.removeKey(); + } public void close() { try { closeOutputFile(); + // Don't call cleanup, to avoid HashMap's ConcurrentModificationException while iterating } catch (IOException e) { throw new RuntimeException("Unable to close output file.", e); } } + + @Override + public String toString() { + return "SourceHandler{" + + "rotationActions=" + rotationActions + + ", rotationPolicy=" + rotationPolicy + + ", syncPolicy=" + syncPolicy + + ", fileNameFormat=" + fileNameFormat + + ", offset=" + offset + + ", rotation=" + rotation + + ", out=" + out + + ", writeLock=" + writeLock + + ", rotationTimer=" + rotationTimer + + ", fs=" + fs + + ", currentFile=" + currentFile + + '}'; + } } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7e21ad3c/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerCallback.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerCallback.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerCallback.java new file mode 100644 index 0000000..bfd1daf --- /dev/null +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerCallback.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.writer.hdfs; + +import java.util.Map; + +public class SourceHandlerCallback { + Map<SourceHandlerKey, SourceHandler> sourceHandlerMap; + SourceHandlerKey key; + SourceHandlerCallback(Map<SourceHandlerKey, SourceHandler> sourceHandlerMap, SourceHandlerKey key) { + this.sourceHandlerMap = sourceHandlerMap; + this.key = key; + } + + public void removeKey() { + sourceHandlerMap.remove(key); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7e21ad3c/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerKey.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerKey.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerKey.java new file mode 100644 index 0000000..6bf0917 --- /dev/null +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerKey.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.writer.hdfs; + +class SourceHandlerKey { + private String sourceType; + private String stellarResult; + + SourceHandlerKey(String sourceType, String stellarResult) { + this.sourceType = sourceType; + this.stellarResult = stellarResult; + } + + public String getSourceType() { + return sourceType; + } + + public String getStellarResult() { + return stellarResult; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SourceHandlerKey that = (SourceHandlerKey) o; + + if (sourceType != null ? !sourceType.equals(that.sourceType) : that.sourceType != null) { + return false; + } + return stellarResult != null ? stellarResult.equals(that.stellarResult) : that.stellarResult == null; + } + + @Override + public int hashCode() { + int result = sourceType != null ? sourceType.hashCode() : 0; + result = 31 * result + (stellarResult != null ? 
stellarResult.hashCode() : 0); + return result; + } +} + http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7e21ad3c/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java new file mode 100644 index 0000000..0a4bdcb --- /dev/null +++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.writer.hdfs; + +import org.apache.metron.common.configuration.IndexingConfigurations; +import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration; +import org.apache.metron.common.configuration.writer.WriterConfiguration; +import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; +import org.apache.storm.hdfs.bolt.format.FileNameFormat; +import org.apache.storm.tuple.Tuple; +import org.json.simple.JSONObject; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.*; + +// Suppress ConstantConditions to avoid NPE warnings that only would occur on test failure anyway +@SuppressWarnings("ConstantConditions") +public class HdfsWriterTest { + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + private static final String SENSOR_NAME = "sensor"; + private static final String WRITER_NAME = "writerName"; + + private File folder; + private FileNameFormat testFormat; + + @Before + public void setup() throws IOException { + // Ensure each test has a unique folder to work with. 
+ folder = tempFolder.newFolder(); + testFormat = new DefaultFileNameFormat() + .withPath(folder.toString()) + .withExtension(".json") + .withPrefix("prefix-"); + } + + @Test + @SuppressWarnings("unchecked") + public void testGetHdfsPathNull() { + WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, new IndexingConfigurations()); + HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat); + writer.init(new HashMap<String, String>(), config); + + JSONObject message = new JSONObject(); + Object result = writer.getHdfsPathExtension(SENSOR_NAME,null, message); + writer.close(); + Assert.assertEquals(SENSOR_NAME, result); + } + + @Test + @SuppressWarnings("unchecked") + public void testGetHdfsPathEmptyString() { + WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, new IndexingConfigurations()); + HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat); + writer.init(new HashMap<String, String>(), config); + + JSONObject message = new JSONObject(); + Object result = writer.getHdfsPathExtension(SENSOR_NAME, "", message); + writer.close(); + Assert.assertEquals(SENSOR_NAME, result); + } + + @Test + @SuppressWarnings("unchecked") + public void testGetHdfsPathConstant() { + WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, new IndexingConfigurations()); + HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat); + writer.init(new HashMap<String, String>(), config); + + JSONObject message = new JSONObject(); + Object result = writer.getHdfsPathExtension(SENSOR_NAME, "'new'", message); + writer.close(); + Assert.assertEquals("new", result); + } + + @Test + @SuppressWarnings("unchecked") + public void testGetHdfsPathDirectVariable() { + WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, new IndexingConfigurations()); + HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat); + writer.init(new HashMap<String, String>(), config); + + JSONObject message = new JSONObject(); + message.put("test.key", "test.value"); + Object result = writer.getHdfsPathExtension(SENSOR_NAME, "test.key", message); + writer.close(); + Assert.assertEquals("test.value", result); + } + + @Test + @SuppressWarnings("unchecked") + public void testGetHdfsPathFormatConstant() { + WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, new IndexingConfigurations()); + HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat); + writer.init(new HashMap<String, String>(), config); + + JSONObject message = new JSONObject(); + Object result = writer.getHdfsPathExtension(SENSOR_NAME, "FORMAT('/test/folder/')", message); + writer.close(); + Assert.assertEquals("/test/folder/", result); + } + + @Test + @SuppressWarnings("unchecked") + public void testGetHdfsPathFormatVariable() { + IndexingConfigurations indexingConfig = new IndexingConfigurations(); + WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, indexingConfig); + HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat); + writer.init(new HashMap<String, String>(), config); + + JSONObject message = new JSONObject(); + message.put("test.key", "test.value"); + message.put("test.key.2", "test.value.2"); + message.put("test.key.3", "test.value.3"); + Object result = writer.getHdfsPathExtension(SENSOR_NAME,"FORMAT('%s/%s/%s', test.key, test.key.2, test.key.3)", message); + writer.close(); + Assert.assertEquals("test.value/test.value.2/test.value.3", result); + } + + @Test + 
@SuppressWarnings("unchecked") + public void testGetHdfsPathMultipleFunctions() { + IndexingConfigurations indexingConfig = new IndexingConfigurations(); + WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, indexingConfig); + HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat); + writer.init(new HashMap<String, String>(), config); + + JSONObject message = new JSONObject(); + message.put("test.key", "test.value"); + message.put("test.key.2", "test.value.2"); + Object result = writer.getHdfsPathExtension(SENSOR_NAME, "FORMAT('%s', test.key)", message); + Assert.assertEquals("test.value", result); + + result = writer.getHdfsPathExtension(SENSOR_NAME, "FORMAT('%s/%s', test.key, test.key.2)", message); + Assert.assertEquals("test.value/test.value.2", result); + + result = writer.getHdfsPathExtension(SENSOR_NAME, "FORMAT('%s', test.key)", message); + writer.close(); + Assert.assertEquals("test.value", result); + } + + @Test + @SuppressWarnings("unchecked") + public void testGetHdfsPathStringReturned() { + IndexingConfigurations indexingConfig = new IndexingConfigurations(); + WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, indexingConfig); + HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat); + writer.init(new HashMap<String, String>(), config); + + JSONObject message = new JSONObject(); + message.put("test.key", "test.value"); + Object result = writer.getHdfsPathExtension(SENSOR_NAME, "TO_UPPER(FORMAT(MAP_GET('key', {'key': 'AbC%s'}), test.key))", message); + writer.close(); + Assert.assertEquals("ABCTEST.VALUE", result); + } + + @Test(expected=IllegalArgumentException.class) + public void testGetHdfsPathNonString() { + WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, new IndexingConfigurations()); + HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat); + writer.init(new HashMap<String, String>(), config); + + JSONObject message = new JSONObject(); + writer.getHdfsPathExtension(SENSOR_NAME, "{'key':'value'}", message); + } + + @Test + public void testGetSourceHandlerOpenFilesMax() throws IOException { + int maxFiles = 2; + IndexingConfigurations indexingConfig = new IndexingConfigurations(); + WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, indexingConfig); + HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat) + .withMaxOpenFiles(maxFiles); + writer.init(new HashMap<String, String>(), config); + + for(int i = 0; i < maxFiles; i++) { + writer.getSourceHandler(SENSOR_NAME, Integer.toString(i)); + } + } + + @Test(expected=IllegalStateException.class) + public void testGetSourceHandlerOpenFilesOverMax() throws IOException { + int maxFiles = 2; + IndexingConfigurations indexingConfig = new IndexingConfigurations(); + WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, indexingConfig); + HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat) + .withMaxOpenFiles(maxFiles); + writer.init(new HashMap<String, String>(), config); + + for(int i = 0; i < maxFiles+1; i++) { + writer.getSourceHandler(SENSOR_NAME, Integer.toString(i)); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testWriteNoOutputFunction() throws Exception { + FileNameFormat format = new DefaultFileNameFormat() + .withPath(folder.toString()) + .withExtension(".json") + .withPrefix("prefix-"); + HdfsWriter writer = new HdfsWriter().withFileNameFormat(format); + IndexingConfigurations indexingConfig = new 
IndexingConfigurations(); + WriterConfiguration config = new IndexingWriterConfiguration(WRITER_NAME, indexingConfig); + writer.init(new HashMap<String, String>(), config); + + JSONObject message = new JSONObject(); + message.put("test.key", "test.value"); + message.put("test.key2", "test.value2"); + JSONObject message2 = new JSONObject(); + message2.put("test.key", "test.value3"); + message2.put("test.key2", "test.value2"); + ArrayList<JSONObject> messages = new ArrayList<>(); + messages.add(message); + messages.add(message2); + + ArrayList<Tuple> tuples = new ArrayList<>(); + + writer.write(SENSOR_NAME, config, tuples, messages); + writer.close(); + + ArrayList<String> expected = new ArrayList<>(); + expected.add(message.toJSONString()); + expected.add(message2.toJSONString()); + Collections.sort(expected); + + // Default to just putting it in the base folder + the sensor name + File outputFolder = new File(folder.getAbsolutePath() + "/" + SENSOR_NAME); + Assert.assertTrue(outputFolder.exists() && outputFolder.isDirectory()); + Assert.assertEquals(1, outputFolder.listFiles().length); + + for(File file : outputFolder.listFiles()) { + List<String> lines = Files.readAllLines(file.toPath()); + Collections.sort(lines); + Assert.assertEquals(expected, lines); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testWriteSingleFile() throws Exception { + String function = "FORMAT('test-%s/%s', test.key, test.key)"; + WriterConfiguration config = buildWriterConfiguration(function); + FileNameFormat format = new DefaultFileNameFormat() + .withPath(folder.toString()) + .withExtension(".json") + .withPrefix("prefix-"); + HdfsWriter writer = new HdfsWriter().withFileNameFormat(format); + writer.init(new HashMap<String, String>(), config); + + // These two messages will be routed to the same folder, because test.key is the same + JSONObject message = new JSONObject(); + message.put("test.key", "test.value"); + message.put("test.key2", "test.value2"); + JSONObject message2 = new JSONObject(); + message2.put("test.key", "test.value"); + message2.put("test.key3", "test.value2"); + ArrayList<JSONObject> messages = new ArrayList<>(); + messages.add(message); + messages.add(message2); + + ArrayList<Tuple> tuples = new ArrayList<>(); + + writer.write(SENSOR_NAME, config, tuples, messages); + writer.close(); + + ArrayList<String> expected = new ArrayList<>(); + expected.add(message.toJSONString()); + expected.add(message2.toJSONString()); + Collections.sort(expected); + + File outputFolder = new File(folder.getAbsolutePath() + "/test-test.value/test.value/"); + Assert.assertTrue(outputFolder.exists() && outputFolder.isDirectory()); + Assert.assertEquals(1, outputFolder.listFiles().length); + + for(File file : outputFolder.listFiles()) { + List<String> lines = Files.readAllLines(file.toPath()); + Collections.sort(lines); + Assert.assertEquals(expected, lines); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testWriteMultipleFiles() throws Exception { + String function = "FORMAT('test-%s/%s', test.key, test.key)"; + WriterConfiguration config = buildWriterConfiguration(function); + FileNameFormat format = new DefaultFileNameFormat() + .withPath(folder.toString()) + .withExtension(".json") + .withPrefix("prefix-"); + HdfsWriter writer = new HdfsWriter().withFileNameFormat(format); + writer.init(new HashMap<String, String>(), config); + + // These two messages will be routed to different folders, because their test.key values differ + JSONObject message = new JSONObject(); +
message.put("test.key", "test.value"); + message.put("test.key2", "test.value2"); + JSONObject message2 = new JSONObject(); + message2.put("test.key", "test.value2"); + message2.put("test.key3", "test.value3"); + ArrayList<JSONObject> messages = new ArrayList<>(); + messages.add(message); + messages.add(message2); + + ArrayList<Tuple> tuples = new ArrayList<>(); + + writer.write(SENSOR_NAME, config, tuples, messages); + writer.close(); + + ArrayList<String> expected1 = new ArrayList<>(); + expected1.add(message.toJSONString()); + Collections.sort(expected1); + + File outputFolder1 = new File(folder.getAbsolutePath() + "/test-test.value/test.value/"); + Assert.assertTrue(outputFolder1.exists() && outputFolder1.isDirectory()); + Assert.assertEquals(1, outputFolder1.listFiles().length); + + for(File file : outputFolder1.listFiles()) { + List<String> lines = Files.readAllLines(file.toPath()); + Collections.sort(lines); + Assert.assertEquals(expected1, lines); + } + + ArrayList<String> expected2 = new ArrayList<>(); + expected2.add(message2.toJSONString()); + Collections.sort(expected2); + + File outputFolder2 = new File(folder.getAbsolutePath() + "/test-test.value2/test.value2/"); + Assert.assertTrue(outputFolder2.exists() && outputFolder2.isDirectory()); + Assert.assertEquals(1, outputFolder2.listFiles().length); + + for(File file : outputFolder2.listFiles()) { + List<String> lines = Files.readAllLines(file.toPath()); + Collections.sort(lines); + Assert.assertEquals(expected2, lines); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testWriteSingleFileWithNull() throws Exception { + String function = "FORMAT('test-%s/%s', test.key, test.key)"; + WriterConfiguration config = buildWriterConfiguration(function); + FileNameFormat format = new DefaultFileNameFormat() + .withPath(folder.toString()) + .withExtension(".json") + .withPrefix("prefix-"); + HdfsWriter writer = new HdfsWriter().withFileNameFormat(format); + writer.init(new HashMap<String, String>(), config); + + // These two messages will be routed to the same folder, because test.key is the same + JSONObject message = new JSONObject(); + message.put("test.key2", "test.value2"); + ArrayList<JSONObject> messages = new ArrayList<>(); + messages.add(message); + + ArrayList<Tuple> tuples = new ArrayList<>(); + + writer.write(SENSOR_NAME, config, tuples, messages); + writer.close(); + + ArrayList<String> expected = new ArrayList<>(); + expected.add(message.toJSONString()); + Collections.sort(expected); + + File outputFolder = new File(folder.getAbsolutePath() + "/test-null/null/"); + Assert.assertTrue(outputFolder.exists() && outputFolder.isDirectory()); + Assert.assertEquals(1, outputFolder.listFiles().length); + + for(File file : outputFolder.listFiles()) { + List<String> lines = Files.readAllLines(file.toPath()); + Collections.sort(lines); + Assert.assertEquals(expected, lines); + } + } + + protected WriterConfiguration buildWriterConfiguration(String function) { + IndexingConfigurations indexingConfig = new IndexingConfigurations(); + Map<String, Object> sensorIndexingConfig = new HashMap<>(); + Map<String, Object> writerIndexingConfig = new HashMap<>(); + writerIndexingConfig.put(IndexingConfigurations.OUTPUT_PATH_FUNCTION_CONF, function); + sensorIndexingConfig.put(WRITER_NAME, writerIndexingConfig); + indexingConfig.updateSensorIndexingConfig(SENSOR_NAME, sensorIndexingConfig); + return new IndexingWriterConfiguration(WRITER_NAME, indexingConfig); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7e21ad3c/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/PathExtensionFileNameFormatTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/PathExtensionFileNameFormatTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/PathExtensionFileNameFormatTest.java new file mode 100644 index 0000000..9825311 --- /dev/null +++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/PathExtensionFileNameFormatTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.writer.hdfs; + +import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; +import org.apache.storm.hdfs.bolt.format.FileNameFormat; +import org.junit.Assert; +import org.junit.Test; + +public class PathExtensionFileNameFormatTest { + + private static final String PATH = "/apps/metron"; + private static final String EXTENSION = ".json"; + private static final String PATH_EXTENSION = "field_result"; + + @Test + public void testGetPath() { + FileNameFormat delegate = new DefaultFileNameFormat().withExtension(EXTENSION).withPath(PATH); + FileNameFormat sourceFormat = new PathExtensionFileNameFormat(PATH_EXTENSION, delegate); + String actual = sourceFormat.getPath(); + String expected = PATH + "/" + PATH_EXTENSION; + Assert.assertEquals(expected, actual); + } + + @Test + public void testGetPathEmptyPathExtension() { + FileNameFormat delegate = new DefaultFileNameFormat().withExtension(EXTENSION).withPath(PATH); + FileNameFormat sourceFormat = new PathExtensionFileNameFormat("", delegate); + String actual = sourceFormat.getPath(); + Assert.assertEquals(PATH + "/", actual); + } +}
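To tie the pieces of this patch together, here is a minimal end-to-end sketch (hypothetical, not part of the commit; it assumes metron-common and storm-hdfs are on the classpath) of how a message's fields become the HDFS output path: the configured Stellar expression is evaluated against the message just as HdfsWriter.getHdfsPathExtension does, and the resulting string is appended to the base path by PathExtensionFileNameFormat, as getSourceHandler does when it builds a SourceHandler.
```java
import org.apache.metron.common.dsl.Context;
import org.apache.metron.common.dsl.MapVariableResolver;
import org.apache.metron.common.dsl.StellarFunctions;
import org.apache.metron.common.dsl.VariableResolver;
import org.apache.metron.common.stellar.StellarProcessor;
import org.apache.metron.writer.hdfs.PathExtensionFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.json.simple.JSONObject;

public class OutputPathResolutionSketch {
  @SuppressWarnings("unchecked")
  public static void main(String[] args) {
    // A message as it would arrive at HdfsWriter.write()
    JSONObject message = new JSONObject();
    message.put("uid", "1");

    // Evaluate the README's example expression against the message, mirroring getHdfsPathExtension()
    String outputPathFunction = "FORMAT('uid-%s', uid)";
    VariableResolver resolver = new MapVariableResolver(message);
    Object pathExtension = new StellarProcessor().parse(
        outputPathFunction, resolver, StellarFunctions.FUNCTION_RESOLVER(), Context.EMPTY_CONTEXT());

    // Append the Stellar result to the base path, mirroring what getSourceHandler() wires into SourceHandler
    FileNameFormat base = new DefaultFileNameFormat()
        .withPrefix("enrichment-")
        .withExtension(".json")
        .withPath("/apps/metron");
    FileNameFormat perMessage = new PathExtensionFileNameFormat((String) pathExtension, base);

    // Prints the base path with "/uid-1" appended, matching the /apps/metron/uid-1/ layout from the README
    System.out.println(perMessage.getPath());
  }
}
```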