Repository: metron
Updated Branches:
  refs/heads/master c1c212117 -> be0307659
METRON-955: Make the default sync policy for HDFS Writer be based on the batch size closes apache/incubator-metron#589 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/be030765 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/be030765 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/be030765 Branch: refs/heads/master Commit: be03076599544f2baedb1b3010ad50625b2f32ae Parents: c1c2121 Author: cstella <ceste...@gmail.com> Authored: Tue May 16 16:50:04 2017 -0400 Committer: cstella <ceste...@gmail.com> Committed: Tue May 16 16:50:04 2017 -0400 ---------------------------------------------------------------------- .../apache/metron/writer/hdfs/HdfsWriter.java | 22 ++++++++++++----- .../metron/writer/hdfs/SourceHandler.java | 14 +++++------ .../metron/writer/hdfs/SyncPolicyCreator.java | 25 ++++++++++++++++++++ .../metron/writer/hdfs/HdfsWriterTest.java | 4 ++-- 4 files changed, 50 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/be030765/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java index a86dfbc..e0ab502 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java @@ -40,16 +40,18 @@ import org.json.simple.JSONObject; import java.io.IOException; import java.io.Serializable; import java.util.*; +import java.util.function.Function; public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable { List<RotationAction> 
rotationActions = new ArrayList<>(); FileRotationPolicy rotationPolicy = new NoRotationPolicy(); - SyncPolicy syncPolicy = new CountSyncPolicy(1); //sync every time, duh. + SyncPolicy syncPolicy; FileNameFormat fileNameFormat; Map<SourceHandlerKey, SourceHandler> sourceHandlerMap = new HashMap<>(); int maxOpenFiles = 500; transient StellarProcessor stellarProcessor; transient Map stormConfig; + transient SyncPolicyCreator syncPolicyCreator; public HdfsWriter withFileNameFormat(FileNameFormat fileNameFormat){ @@ -81,6 +83,14 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable { this.stormConfig = stormConfig; this.stellarProcessor = new StellarProcessor(); this.fileNameFormat.prepare(stormConfig,topologyContext); + if(syncPolicy != null) { + //if the user has specified the sync policy, we don't want to override their wishes. + syncPolicyCreator = (source,config) -> syncPolicy; + } + else { + //if the user has not, then we want to have the sync policy depend on the batch size. + syncPolicyCreator = (source, config) -> new CountSyncPolicy(config == null?1:config.getBatchSize(source)); + } } @@ -92,18 +102,18 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable { ) throws Exception { BulkWriterResponse response = new BulkWriterResponse(); + // Currently treating all the messages in a group for pass/failure. 
try { // Messages can all result in different HDFS paths, because of Stellar Expressions, so we'll need to iterate through for(JSONObject message : messages) { - Map<String, Object> val = configurations.getSensorConfig(sourceType); String path = getHdfsPathExtension( sourceType, (String)configurations.getSensorConfig(sourceType).getOrDefault(IndexingConfigurations.OUTPUT_PATH_FUNCTION_CONF, ""), message ); - SourceHandler handler = getSourceHandler(sourceType, path); - handler.handle(message); + SourceHandler handler = getSourceHandler(sourceType, path, configurations); + handler.handle(message, sourceType, configurations, syncPolicyCreator); } } catch (Exception e) { response.addAllErrors(e, tuples); @@ -142,7 +152,7 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable { sourceHandlerMap.clear(); } - synchronized SourceHandler getSourceHandler(String sourceType, String stellarResult) throws IOException { + synchronized SourceHandler getSourceHandler(String sourceType, String stellarResult, WriterConfiguration config) throws IOException { SourceHandlerKey key = new SourceHandlerKey(sourceType, stellarResult); SourceHandler ret = sourceHandlerMap.get(key); if(ret == null) { @@ -151,7 +161,7 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable { } ret = new SourceHandler(rotationActions, rotationPolicy, - syncPolicy, + syncPolicyCreator.create(sourceType, config), new PathExtensionFileNameFormat(key.getStellarResult(), fileNameFormat), new SourceHandlerCallback(sourceHandlerMap, key)); sourceHandlerMap.put(key, ret); http://git-wip-us.apache.org/repos/asf/metron/blob/be030765/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java index fa6d8da..d895465 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java @@ -24,9 +24,11 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.log4j.Logger; +import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.storm.hdfs.bolt.format.FileNameFormat; import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy; +import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; import org.apache.storm.hdfs.bolt.sync.SyncPolicy; import org.apache.storm.hdfs.common.rotation.RotationAction; import org.json.simple.JSONObject; @@ -34,6 +36,7 @@ import org.json.simple.JSONObject; import java.io.FileOutputStream; import java.io.IOException; import java.util.*; +import java.util.function.Function; public class SourceHandler { private static final Logger LOG = Logger.getLogger(SourceHandler.class); @@ -61,13 +64,8 @@ public class SourceHandler { initialize(); } - public void handle(List<JSONObject> messages) throws Exception{ - for(JSONObject message : messages) { - handle(message); - } - } - protected void handle(JSONObject message) throws IOException { + protected void handle(JSONObject message, String sensor, WriterConfiguration config, SyncPolicyCreator syncPolicyCreator) throws IOException { byte[] bytes = (message.toJSONString() + "\n").getBytes(); synchronized (this.writeLock) { out.write(bytes); @@ -79,7 +77,9 @@ public class SourceHandler { } else { this.out.hsync(); } - this.syncPolicy.reset(); + //recreate the sync policy for the next batch just in case something changed in the config + //and the sync policy depends on the 
config. + this.syncPolicy = syncPolicyCreator.create(sensor, config); } } http://git-wip-us.apache.org/repos/asf/metron/blob/be030765/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SyncPolicyCreator.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SyncPolicyCreator.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SyncPolicyCreator.java new file mode 100644 index 0000000..9b7d205 --- /dev/null +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SyncPolicyCreator.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.writer.hdfs; + +import org.apache.metron.common.configuration.writer.WriterConfiguration; +import org.apache.storm.hdfs.bolt.sync.SyncPolicy; + +public interface SyncPolicyCreator { + SyncPolicy create(String sensor, WriterConfiguration config); +} http://git-wip-us.apache.org/repos/asf/metron/blob/be030765/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java index 6153ed2..0118a15 100644 --- a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java +++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java @@ -209,7 +209,7 @@ public class HdfsWriterTest { writer.init(new HashMap<String, String>(), createTopologyContext(), config); for(int i = 0; i < maxFiles; i++) { - writer.getSourceHandler(SENSOR_NAME, Integer.toString(i)); + writer.getSourceHandler(SENSOR_NAME, Integer.toString(i), null); } } @@ -223,7 +223,7 @@ public class HdfsWriterTest { writer.init(new HashMap<String, String>(), createTopologyContext(), config); for(int i = 0; i < maxFiles+1; i++) { - writer.getSourceHandler(SENSOR_NAME, Integer.toString(i)); + writer.getSourceHandler(SENSOR_NAME, Integer.toString(i), null); } }