http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-indexing/src/main/flux/indexing/batch/remote.yaml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/flux/indexing/batch/remote.yaml b/metron-platform/metron-indexing/src/main/flux/indexing/batch/remote.yaml index 3087dd9..919654c 100644 --- a/metron-platform/metron-indexing/src/main/flux/indexing/batch/remote.yaml +++ b/metron-platform/metron-indexing/src/main/flux/indexing/batch/remote.yaml @@ -134,6 +134,7 @@ bolts: className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" constructorArgs: - "${kafka.zk}" + - "INDEXING" configMethods: - name: "withBulkMessageWriter" args: @@ -147,8 +148,9 @@ bolts: className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" constructorArgs: - "${kafka.zk}" + - "INDEXING" configMethods: - - name: "withMessageWriter" + - name: "withBulkMessageWriter" args: - ref: "kafkaWriter"
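The Flux change above passes a second constructorArg naming the configuration strategy ("INDEXING") and corrects the HDFS writer bolt to use withBulkMessageWriter. For readers wiring the bolt programmatically rather than through Flux, here is a minimal sketch of the equivalent Java, under the assumption that IndexingConfigurations is the CONFIG_T type for indexing topologies; the zkQuorum value and writer instance are illustrative placeholders, not part of the patch:

import org.apache.metron.common.configuration.IndexingConfigurations;
import org.apache.metron.common.writer.BulkMessageWriter;
import org.apache.metron.writer.bolt.BulkMessageWriterBolt;
import org.json.simple.JSONObject;

public class IndexingBoltWiring {
  // "INDEXING" selects the configuration strategy, matching the new second
  // constructorArg in the YAML above; zkQuorum and writer are placeholders.
  public static BulkMessageWriterBolt<IndexingConfigurations> indexingBolt(
      String zkQuorum, BulkMessageWriter<JSONObject> writer) {
    return new BulkMessageWriterBolt<IndexingConfigurations>(zkQuorum, "INDEXING")
        .withBulkMessageWriter(writer);
  }
}

In Flux, those same two constructor arguments and the withBulkMessageWriter configMethod appear directly in the bolt definition, as in the YAML diff above.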
http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-indexing/src/main/flux/indexing/random_access/remote.yaml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/flux/indexing/random_access/remote.yaml b/metron-platform/metron-indexing/src/main/flux/indexing/random_access/remote.yaml index 429ba45..9aa2b24 100644 --- a/metron-platform/metron-indexing/src/main/flux/indexing/random_access/remote.yaml +++ b/metron-platform/metron-indexing/src/main/flux/indexing/random_access/remote.yaml @@ -106,6 +106,7 @@ bolts: className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" constructorArgs: - "${kafka.zk}" + - "INDEXING" configMethods: - name: "withBulkMessageWriter" args: @@ -119,8 +120,9 @@ bolts: className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" constructorArgs: - "${kafka.zk}" + - "INDEXING" configMethods: - - name: "withMessageWriter" + - name: "withBulkMessageWriter" args: - ref: "kafkaWriter" http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-parsers/README.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md index e8b2896..8254baf 100644 --- a/metron-platform/metron-parsers/README.md +++ b/metron-platform/metron-parsers/README.md @@ -161,7 +161,11 @@ Example Stellar Filter which includes messages which contain the `field1` fiel then it is assumed to be a regex and will match any topic matching the pattern (e.g. `/bro.*/` would match `bro_cust0`, `bro_cust1` and `bro_cust2`) * `readMetadata` : Boolean indicating whether to read metadata or not (`false` by default). See below for a discussion about metadata. * `mergeMetadata` : Boolean indicating whether to merge metadata with the message or not (`false` by default). See below for a discussion about metadata. -* `parserConfig` : A JSON Map representing the parser implementation specific configuration. +* `parserConfig` : A JSON Map representing the parser implementation specific configuration. Batch sizing and timeout settings for the writer are also configured here. + * `batchSize` : Integer indicating the number of records to batch together before sending to the writer (defaults to `15`). + * `batchTimeout` : The timeout after which a batch will be flushed even if `batchSize` has not been met. Optional. + If unspecified, or set to `0`, it defaults to a system-determined duration which is a fraction of the Storm + parameter `topology.message.timeout.secs`. Ignored if `batchSize` is `1`, since this disables batching. * `fieldTransformations` : An array of complex objects representing the transformations to be done on the message generated from the parser before writing out to the kafka topic. * `spoutParallelism` : The kafka spout parallelism (default to `1`). This can be overridden on the command line. * `spoutNumTasks` : The number of tasks for the spout (default to `1`). This can be overridden on the command line.
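Putting the new README knobs together, a sensor parser config stored in Zookeeper could tune writer batching as in the sketch below. The sensor, class, and values are illustrative only; note that a `batchTimeout` larger than what the topology allows is clamped by BatchTimeoutHelper, which derives the maximum from the Storm `topology.message.timeout.secs` parameter, as described in the diff hunks that follow:

{
  "parserClassName" : "org.apache.metron.parsers.GrokParser",
  "sensorTopic" : "squid",
  "outputTopic" : "enrichments",
  "parserConfig" : {
    "batchSize" : 100,
    "batchTimeout" : 6
  }
}

Setting `batchSize` to `1`, as the integration tests later in this commit do, effectively disables batching and makes `batchTimeout` irrelevant.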
http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java index dd59355..0e9b48f 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java @@ -19,6 +19,7 @@ package org.apache.metron.parsers.bolt; import static org.apache.metron.common.Constants.METADATA_PREFIX; +import com.github.benmanes.caffeine.cache.Cache; import java.io.IOException; import java.io.Serializable; import java.lang.invoke.MethodHandles; @@ -30,15 +31,15 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.function.Function; import java.util.stream.Collectors; - -import com.github.benmanes.caffeine.cache.Cache; import org.apache.commons.lang3.StringUtils; import org.apache.metron.common.Constants; import org.apache.metron.common.bolt.ConfiguredParserBolt; import org.apache.metron.common.configuration.FieldTransformer; import org.apache.metron.common.configuration.FieldValidator; import org.apache.metron.common.configuration.SensorParserConfig; +import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.error.MetronError; import org.apache.metron.common.message.MessageGetStrategy; import org.apache.metron.common.message.MessageGetters; @@ -50,11 +51,15 @@ import org.apache.metron.parsers.interfaces.MessageParser; import org.apache.metron.stellar.common.CachingStellarProcessor; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.stellar.dsl.StellarFunctions; +import org.apache.metron.writer.WriterToBulkWriter; +import org.apache.metron.writer.bolt.BatchTimeoutHelper; +import org.apache.storm.Config; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; +import org.apache.storm.utils.TupleUtils; import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,6 +76,10 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable { private Context stellarContext; private transient MessageGetStrategy messageGetStrategy; private transient Cache<CachingStellarProcessor.Key, Object> cache; + private int requestedTickFreqSecs; + private int defaultBatchTimeout; + private int batchTimeoutDivisor = 1; + public ParserBolt( String zookeeperUrl , String sensorType , MessageParser<JSONObject> parser @@ -88,10 +97,85 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable { return this; } + /** + * If this ParserBolt is in a topology where it is daisy-chained with + * other queuing Writers, then the max amount of time it takes for a tuple + * to clear the whole topology is the sum of all the batchTimeouts for all the + * daisy-chained Writers. In the common case where each Writer is using the default + * batchTimeout, it is then necessary to divide that batchTimeout by the number of + * daisy-chained Writers. 
There are no examples of daisy-chained batching Writers + * in the current Metron topologies, but the feature is available as a "fluent"-style + * mutator if needed. It would be used in the parser topology builder. + * Default value, if not otherwise set, is 1. + * + * If non-default batchTimeouts are configured for some components, the administrator + * may want to take this behavior into account. + * + * @param batchTimeoutDivisor the number of daisy-chained batching Writers to divide the default batchTimeout by + * @return ParserBolt + */ + public ParserBolt withBatchTimeoutDivisor(int batchTimeoutDivisor) { + if (batchTimeoutDivisor <= 0) { + throw new IllegalArgumentException(String.format("batchTimeoutDivisor must be positive. Value provided was %s", batchTimeoutDivisor)); + } + this.batchTimeoutDivisor = batchTimeoutDivisor; + return this; + } + + /** + * Used only for unit testing + * @param defaultBatchTimeout + */ + protected void setDefaultBatchTimeout(int defaultBatchTimeout) { + this.defaultBatchTimeout = defaultBatchTimeout; + } + + /** + * Used only for unit testing + */ + public int getDefaultBatchTimeout() { + return defaultBatchTimeout; + } + public MessageParser<JSONObject> getParser() { return parser; } + /** + * This method is called by TopologyBuilder.createTopology() to obtain topology and + * bolt specific configuration parameters. We use it primarily to configure how often + * a tick tuple will be sent to our bolt. + * @return conf topology and bolt specific configuration parameters + */ + @Override + public Map<String, Object> getComponentConfiguration() { + // This is called long before prepare(), so do some of the same stuff as prepare() does, + // to get the valid WriterConfiguration. But don't store any non-serializable objects, + // else Storm will throw a runtime error. + Function<WriterConfiguration, WriterConfiguration> configurationXform; + if(writer.isWriterToBulkWriter()) { + configurationXform = WriterToBulkWriter.TRANSFORMATION; + } + else { + configurationXform = x -> x; + } + WriterConfiguration writerconf = configurationXform + .apply(getConfigurationStrategy().createWriterConfig(writer.getBulkMessageWriter(), getConfigurations())); + + BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(writerconf::getAllConfiguredTimeouts, batchTimeoutDivisor); + this.requestedTickFreqSecs = timeoutHelper.getRecommendedTickInterval(); + //And while we've got BatchTimeoutHelper handy, capture the defaultBatchTimeout for writerComponent. + this.defaultBatchTimeout = timeoutHelper.getDefaultBatchTimeout(); + + Map<String, Object> conf = super.getComponentConfiguration(); + if (conf == null) { + conf = new HashMap<String, Object>(); + } + conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, requestedTickFreqSecs); + LOG.info("Requesting " + Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS + " set to " + Integer.toString(requestedTickFreqSecs)); + return conf; + } + @SuppressWarnings("unchecked") @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { @@ -114,6 +198,15 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable { parser.init(); writer.init(stormConf, context, collector, getConfigurations()); + if (defaultBatchTimeout == 0) { + //This means getComponentConfiguration was never called to initialize defaultBatchTimeout, + //probably because we are in a unit test scenario. So calculate it here.
+ WriterConfiguration writerConfig = getConfigurationStrategy() + .createWriterConfig(writer.getBulkMessageWriter(), getConfigurations()); + BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(writerConfig::getAllConfiguredTimeouts, batchTimeoutDivisor); + defaultBatchTimeout = timeoutHelper.getDefaultBatchTimeout(); + } + writer.setDefaultBatchTimeout(defaultBatchTimeout); SensorParserConfig config = getSensorParserConfig(); if(config != null) { @@ -173,6 +266,17 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable { @SuppressWarnings("unchecked") @Override public void execute(Tuple tuple) { + if (TupleUtils.isTick(tuple)) { + try { + writer.flush(getConfigurations(), messageGetStrategy); + } catch (Exception e) { + throw new RuntimeException( + "This should have been caught in the writerHandler. If you see this, file a JIRA", e); + } finally { + collector.ack(tuple); + } + return; + } byte[] originalMessage = (byte[]) messageGetStrategy.get(tuple); SensorParserConfig sensorParserConfig = getSensorParserConfig(); try { http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java index 2192942..3916dea 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java @@ -18,31 +18,37 @@ package org.apache.metron.parsers.bolt; -import org.apache.metron.common.message.MessageGetStrategy; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.tuple.Tuple; +import java.io.Serializable; +import java.lang.invoke.MethodHandles; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.function.Function; import org.apache.metron.common.configuration.ParserConfigurations; -import org.apache.metron.common.configuration.writer.ParserWriterConfiguration; +import org.apache.metron.common.configuration.writer.ConfigurationStrategy; +import org.apache.metron.common.configuration.writer.ConfigurationsStrategies; import org.apache.metron.common.configuration.writer.SingleBatchConfigurationFacade; import org.apache.metron.common.configuration.writer.WriterConfiguration; +import org.apache.metron.common.message.MessageGetStrategy; import org.apache.metron.common.writer.BulkMessageWriter; import org.apache.metron.common.writer.MessageWriter; import org.apache.metron.writer.BulkWriterComponent; import org.apache.metron.writer.WriterToBulkWriter; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Tuple; import org.json.simple.JSONObject; - -import java.io.Serializable; -import java.util.Collection; -import java.util.HashSet; -import java.util.Map; -import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class WriterHandler implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private BulkMessageWriter<JSONObject> messageWriter; private transient BulkWriterComponent<JSONObject> 
writerComponent; private transient Function<ParserConfigurations, WriterConfiguration> writerTransformer; private boolean isBulk = false; + private ConfigurationStrategy configStrategy = ConfigurationsStrategies.PARSERS; + public WriterHandler(MessageWriter<JSONObject> writer) { isBulk = false; messageWriter = new WriterToBulkWriter<>(writer); @@ -53,17 +59,24 @@ public class WriterHandler implements Serializable { messageWriter = writer; } - public boolean handleAck() { return isBulk; } + public boolean isWriterToBulkWriter() { + return messageWriter instanceof WriterToBulkWriter; + } + + public BulkMessageWriter getBulkMessageWriter() { + return messageWriter; + } + public void init(Map stormConf, TopologyContext topologyContext, OutputCollector collector, ParserConfigurations configurations) { if(isBulk) { - writerTransformer = config -> new ParserWriterConfiguration(config); + writerTransformer = config -> configStrategy.createWriterConfig(messageWriter, config); } else { - writerTransformer = config -> new SingleBatchConfigurationFacade(new ParserWriterConfiguration(config)); + writerTransformer = config -> new SingleBatchConfigurationFacade(configStrategy.createWriterConfig(messageWriter, config)); } try { messageWriter.init(stormConf, topologyContext, writerTransformer.apply(configurations)); @@ -87,7 +100,29 @@ public class WriterHandler implements Serializable { writerComponent.write(sensorType, tuple, message, messageWriter, writerTransformer.apply(configurations), messageGetStrategy); } + public void flush(ParserConfigurations configurations, MessageGetStrategy messageGetStrategy) + throws Exception { + if (!(messageWriter instanceof WriterToBulkWriter)) { + //WriterToBulkWriter doesn't allow batching, so no need to flush on Tick. + LOG.debug("Flushing message queues older than their batchTimeouts"); + writerComponent.flushTimeouts(messageWriter, writerTransformer.apply(configurations), + messageGetStrategy); + } + } + public void errorAll(String sensorType, Throwable e, MessageGetStrategy messageGetStrategy) { writerComponent.errorAll(sensorType, e, messageGetStrategy); } + + /** + * Sets batch timeout on the underlying component + * @param defaultBatchTimeout + */ + public void setDefaultBatchTimeout(int defaultBatchTimeout) { + if (writerComponent == null) { + throw new UnsupportedOperationException("Must call init prior to calling this method."); + } + writerComponent.setDefaultBatchTimeout(defaultBatchTimeout); + } + } http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java index 6439b2b..15ce735 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java @@ -17,46 +17,59 @@ */ package org.apache.metron.parsers.bolt; -import org.apache.curator.framework.CuratorFramework; -import org.apache.metron.common.Constants; -import org.apache.metron.common.configuration.*; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.argThat; +import static org.mockito.Mockito.doThrow; +import static 
org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; -import org.apache.metron.common.error.MetronError; -import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater; -import org.apache.metron.test.error.MetronErrorJSONMatcher; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.tuple.Tuple; import com.google.common.collect.ImmutableList; -import org.apache.metron.common.configuration.writer.ParserWriterConfiguration; -import org.apache.metron.common.configuration.writer.WriterConfiguration; -import org.apache.metron.stellar.dsl.Context; -import org.apache.metron.common.writer.BulkMessageWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.adrianwalker.multilinestring.Multiline; +import org.apache.curator.framework.CuratorFramework; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.metron.common.Constants; +import org.apache.metron.common.configuration.ConfigurationType; +import org.apache.metron.common.configuration.IndexingConfigurations; import org.apache.metron.common.configuration.ParserConfigurations; import org.apache.metron.common.configuration.SensorParserConfig; +import org.apache.metron.common.configuration.writer.ParserWriterConfiguration; +import org.apache.metron.common.configuration.writer.WriterConfiguration; +import org.apache.metron.common.error.MetronError; +import org.apache.metron.common.writer.BulkMessageWriter; import org.apache.metron.common.writer.BulkWriterResponse; +import org.apache.metron.common.writer.MessageWriter; +import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater; import org.apache.metron.parsers.BasicParser; -import org.apache.metron.test.bolt.BaseBoltTest; import org.apache.metron.parsers.interfaces.MessageFilter; import org.apache.metron.parsers.interfaces.MessageParser; -import org.apache.metron.common.writer.MessageWriter; +import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.test.bolt.BaseBoltTest; +import org.apache.metron.test.error.MetronErrorJSONMatcher; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Tuple; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.junit.Assert; import org.junit.Test; import org.mockito.Mock; -import java.io.IOException; -import java.util.*; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.*; - public class ParserBoltTest extends BaseBoltTest { @Mock @@ -285,33 +298,6 @@ public class ParserBoltTest extends BaseBoltTest { parserBolt.execute(tuple); verify(outputCollector, times(1)).reportError(any(Throwable.class)); } -@Test -public void testImplicitBatchOfOne() throws Exception { - - String sensorType = "yaf"; - - ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) { - @Override - protected ConfigurationsUpdater<ParserConfigurations> 
createUpdater() { - return ParserBoltTest.createUpdater(); - } - }; - - parserBolt.setCuratorFramework(client); - parserBolt.setZKCache(cache); - parserBolt.prepare(new HashMap(), topologyContext, outputCollector); - verify(parser, times(1)).init(); - verify(batchWriter, times(1)).init(any(), any(), any()); - when(parser.validate(any())).thenReturn(true); - when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject()))); - when(filter.emitTuple(any(), any(Context.class))).thenReturn(true); - BulkWriterResponse response = new BulkWriterResponse(); - response.addSuccess(t1); - when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(Collections.singleton(t1)), any())).thenReturn(response); - parserBolt.withMessageFilter(filter); - parserBolt.execute(t1); - verify(outputCollector, times(1)).ack(t1); -} /** { @@ -343,7 +329,7 @@ public void testImplicitBatchOfOne() throws Exception { } @Override protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { - return ParserBoltTest.createUpdater(); + return ParserBoltTest.createUpdater(Optional.of(1)); } }; @@ -459,7 +445,7 @@ public void testImplicitBatchOfOne() throws Exception { @Override protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { - return ParserBoltTest.createUpdater(); + return ParserBoltTest.createUpdater(Optional.of(1)); } }; @@ -474,13 +460,14 @@ public void testImplicitBatchOfOne() throws Exception { } @Test - public void testBatchOfOne() throws Exception { + public void testDefaultBatchSize() throws Exception { String sensorType = "yaf"; ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) { @Override protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { + // this uses default batch size return ParserBoltTest.createUpdater(); } }; @@ -494,12 +481,93 @@ public void testImplicitBatchOfOne() throws Exception { when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject()))); when(filter.emitTuple(any(), any(Context.class))).thenReturn(true); BulkWriterResponse response = new BulkWriterResponse(); + Tuple[] uniqueTuples = new Tuple[ParserConfigurations.DEFAULT_KAFKA_BATCH_SIZE]; + for (int i=0; i < uniqueTuples.length; i++) { + uniqueTuples[i] = mock(Tuple.class); + response.addSuccess(uniqueTuples[i]); + } + when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(new HashSet<>(Arrays.asList(uniqueTuples))), any())).thenReturn(response); + parserBolt.withMessageFilter(filter); + for (Tuple tuple : uniqueTuples) { + parserBolt.execute(tuple); + } + for (Tuple uniqueTuple : uniqueTuples) { + verify(outputCollector, times(1)).ack(uniqueTuple); + } + } + + @Test + public void testLessRecordsThanDefaultBatchSize() throws Exception { + + String sensorType = "yaf"; + + ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) { + @Override + protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { + // this uses default batch size + return ParserBoltTest.createUpdater(); + } + }; + + parserBolt.setCuratorFramework(client); + parserBolt.setZKCache(cache); + parserBolt.prepare(new HashMap(), topologyContext, outputCollector); + verify(parser, times(1)).init(); + verify(batchWriter, times(1)).init(any(), any(), any()); + when(parser.validate(any())).thenReturn(true); + when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject()))); + 
when(filter.emitTuple(any(), any(Context.class))).thenReturn(true); + int oneLessThanDefaultBatchSize = ParserConfigurations.DEFAULT_KAFKA_BATCH_SIZE - 1; + BulkWriterResponse response = new BulkWriterResponse(); + Tuple[] uniqueTuples = new Tuple[oneLessThanDefaultBatchSize]; + for (int i=0; i < uniqueTuples.length; i++) { + uniqueTuples[i] = mock(Tuple.class); + response.addSuccess(uniqueTuples[i]); + } + parserBolt.withMessageFilter(filter); + for (Tuple tuple : uniqueTuples) { + parserBolt.execute(tuple); + } + // should have no acking yet - batch size not fulfilled + verify(outputCollector, never()).ack(any(Tuple.class)); + response.addSuccess(t1); // used to achieve count in final verify + Iterable<Tuple> tuples = new HashSet(Arrays.asList(uniqueTuples)) {{ + add(t1); + }}; + when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(tuples), any())).thenReturn(response); + // meet batch size requirement and now it should ack + parserBolt.execute(t1); + verify(outputCollector, times(ParserConfigurations.DEFAULT_KAFKA_BATCH_SIZE)).ack(any(Tuple.class)); + } + + @Test + public void testBatchOfOne() throws Exception { + + String sensorType = "yaf"; + + ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) { + @Override + protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { + return ParserBoltTest.createUpdater(Optional.of(1)); + } + }; + + parserBolt.setCuratorFramework(client); + parserBolt.setZKCache(cache); + parserBolt.prepare(new HashMap(), topologyContext, outputCollector); + verify(parser, times(1)).init(); + verify(batchWriter, times(1)).init(any(), any(), any()); + when(parser.validate(any())).thenReturn(true); + when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject()))); + when(filter.emitTuple(any(), any(Context.class))).thenReturn(true); + BulkWriterResponse response = new BulkWriterResponse(); response.addSuccess(t1); when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(Collections.singleton(t1)), any())).thenReturn(response); parserBolt.withMessageFilter(filter); parserBolt.execute(t1); verify(outputCollector, times(1)).ack(t1); } + @Test public void testBatchOfFive() throws Exception { http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java index d565147..dfadfdc 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java @@ -94,6 +94,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { * "outputTopic": "output", * "errorTopic": "parser_error", * "parserConfig": { + * "batchSize" : 1, * "columns" : { * "action" : 0, * "dummy" : 1 @@ -246,7 +247,10 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { * "parserClassName":"org.apache.metron.writers.integration.WriterBoltIntegrationTest$EmptyObjectParser", * "sensorTopic":"emptyobjectparser", * "outputTopic": "enrichments", - * 
"errorTopic": "parser_error" + * "errorTopic": "parser_error", + * "parserConfig": { + * "batchSize" : 1 + * } * } */ @Multiline http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java index 22440ad..195e010 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java @@ -57,7 +57,7 @@ public class BatchTimeoutHelper { protected int minBatchTimeoutRequestedSecs; //min of all sensorType configured batchTimeout requests protected int recommendedTickIntervalSecs; //the answer - BatchTimeoutHelper( Supplier<List<Integer>> listAllConfiguredTimeouts + public BatchTimeoutHelper( Supplier<List<Integer>> listAllConfiguredTimeouts , int batchTimeoutDivisor ) { @@ -130,7 +130,7 @@ public class BatchTimeoutHelper { * @return the max batchTimeout allowed, in seconds * Guaranteed positive number. */ - protected int getDefaultBatchTimeout() { + public int getDefaultBatchTimeout() { if (!initialized) {this.init();} return maxBatchTimeoutAllowedSecs; } @@ -160,7 +160,7 @@ public class BatchTimeoutHelper { * @return the recommended TickInterval to request, in seconds * Guaranteed positive number. */ - protected int getRecommendedTickInterval() { + public int getRecommendedTickInterval() { if (!initialized) {this.init();} // Remember that parameter settings in the CLI override parameter settings set by the Storm component. 
// We shouldn't have to deal with this in the Metron environment, but just in case, http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java index 1d8f0c6..4bb3888 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java @@ -17,10 +17,15 @@ */ package org.apache.metron.writer.bolt; +import static org.apache.storm.utils.TupleUtils.isTick; + import com.google.common.collect.ImmutableList; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; import org.apache.metron.common.Constants; -import org.apache.metron.common.bolt.ConfiguredIndexingBolt; -import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration; +import org.apache.metron.common.bolt.ConfiguredBolt; +import org.apache.metron.common.configuration.Configurations; import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.message.MessageGetStrategy; import org.apache.metron.common.message.MessageGetters; @@ -40,13 +45,7 @@ import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.Map; -import java.util.function.Function; - -import static org.apache.storm.utils.TupleUtils.isTick; - -public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { +public class BulkMessageWriterBolt<CONFIG_T extends Configurations> extends ConfiguredBolt<CONFIG_T> { private static final Logger LOG = LoggerFactory .getLogger(BulkMessageWriterBolt.class); @@ -61,26 +60,26 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { private int defaultBatchTimeout; private int batchTimeoutDivisor = 1; - public BulkMessageWriterBolt(String zookeeperUrl) { - super(zookeeperUrl); + public BulkMessageWriterBolt(String zookeeperUrl, String configurationStrategy) { + super(zookeeperUrl, configurationStrategy); } - public BulkMessageWriterBolt withBulkMessageWriter(BulkMessageWriter<JSONObject > bulkMessageWriter) { + public BulkMessageWriterBolt<CONFIG_T> withBulkMessageWriter(BulkMessageWriter<JSONObject> bulkMessageWriter) { this.bulkMessageWriter = bulkMessageWriter; return this; } - public BulkMessageWriterBolt withMessageWriter(MessageWriter<JSONObject> messageWriter) { + public BulkMessageWriterBolt<CONFIG_T> withMessageWriter(MessageWriter<JSONObject> messageWriter) { this.bulkMessageWriter = new WriterToBulkWriter<>(messageWriter); return this; } - public BulkMessageWriterBolt withMessageGetter(String messageGetStrategyType) { + public BulkMessageWriterBolt<CONFIG_T> withMessageGetter(String messageGetStrategyType) { this.messageGetStrategyType = messageGetStrategyType; return this; } - public BulkMessageWriterBolt withMessageGetterField(String messageGetField) { + public BulkMessageWriterBolt<CONFIG_T> withMessageGetterField(String messageGetField) { this.messageGetField = messageGetField; return this; } @@ -103,7 +102,7 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { * @param batchTimeoutDivisor 
* @return BulkMessageWriterBolt */ - public BulkMessageWriterBolt withBatchTimeoutDivisor(int batchTimeoutDivisor) { + public BulkMessageWriterBolt<CONFIG_T> withBatchTimeoutDivisor(int batchTimeoutDivisor) { if (batchTimeoutDivisor <= 0) { throw new IllegalArgumentException(String.format("batchTimeoutDivisor must be positive. Value provided was %s", batchTimeoutDivisor)); } @@ -133,6 +132,7 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { public void setWriterComponent(BulkWriterComponent<JSONObject> component) { writerComponent = component; } + /** * This method is called by TopologyBuilder.createTopology() to obtain topology and * bolt specific configuration parameters. We use it primarily to configure how often @@ -151,8 +151,8 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { else { configurationXform = x -> x; } - WriterConfiguration writerconf = configurationXform.apply( - new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())); + WriterConfiguration writerconf = configurationXform + .apply(getConfigurationStrategy().createWriterConfig(bulkMessageWriter, getConfigurations())); BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(writerconf::getAllConfiguredTimeouts, batchTimeoutDivisor); this.requestedTickFreqSecs = timeoutHelper.getRecommendedTickInterval(); @@ -187,8 +187,8 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { configurationTransformation = x -> x; } try { - WriterConfiguration writerconf = configurationTransformation.apply( - new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())); + WriterConfiguration writerconf = configurationTransformation + .apply(getConfigurationStrategy().createWriterConfig(bulkMessageWriter, getConfigurations())); if (defaultBatchTimeout == 0) { //This means getComponentConfiguration was never called to initialize defaultBatchTimeout, //probably because we are in a unit test scenario. So calculate it here. @@ -219,8 +219,8 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { //WriterToBulkWriter doesn't allow batching, so no need to flush on Tick. 
LOG.debug("Flushing message queues older than their batchTimeouts"); getWriterComponent().flushTimeouts(bulkMessageWriter, configurationTransformation.apply( - new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())) - , messageGetStrategy); + getConfigurationStrategy().createWriterConfig(bulkMessageWriter, getConfigurations())), + messageGetStrategy); } } catch(Exception e) { @@ -247,8 +247,8 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { } LOG.trace("Writing enrichment message: {}", message); - WriterConfiguration writerConfiguration = configurationTransformation.apply( - new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations())); + WriterConfiguration writerConfiguration = configurationTransformation + .apply(getConfigurationStrategy().createWriterConfig(bulkMessageWriter, getConfigurations())); if (writerConfiguration.isDefault(sensorType)) { //want to warn, but not fail the tuple http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java index c4e3998..efb2418 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java @@ -17,25 +17,32 @@ */ package org.apache.metron.writer.kafka; -import org.apache.storm.tuple.Tuple; import com.google.common.base.Joiner; +import java.io.Serializable; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.Future; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.InterruptException; import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.writer.WriterConfiguration; -import org.apache.metron.common.writer.MessageWriter; -import org.apache.metron.stellar.common.utils.ConversionUtils; import org.apache.metron.common.utils.KafkaUtils; import org.apache.metron.common.utils.StringUtils; +import org.apache.metron.common.writer.BulkMessageWriter; +import org.apache.metron.common.writer.BulkWriterResponse; +import org.apache.metron.stellar.common.utils.ConversionUtils; import org.apache.metron.writer.AbstractWriter; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Tuple; import org.json.simple.JSONObject; -import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; - -public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObject>, Serializable { +public class KafkaWriter extends AbstractWriter implements BulkMessageWriter<JSONObject>, Serializable { public enum Configurations { BROKER("kafka.brokerUrl") ,KEY_SERIALIZER("kafka.keySerializer") @@ -60,6 +67,15 @@ public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObj return null; } } + + /** + * Default batch size in bytes. Note, we don't want to expose this to clients. 
End users should + * set the writer batchSize setting via the BulkWriterComponent. + * + * @see ProducerConfig#BATCH_SIZE_DOC + * @see <a href="https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.4/bk_kafka-component-guide/content/kafka-producer-settings.html">https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.4/bk_kafka-component-guide/content/kafka-producer-settings.html</a> + */ + private static final int DEFAULT_BATCH_SIZE = 1_024 * 64; // 64 kilobytes private String brokerUrl; private String keySerializer = "org.apache.kafka.common.serialization.StringSerializer"; private String valueSerializer = "org.apache.kafka.common.serialization.StringSerializer"; @@ -156,33 +172,69 @@ public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObj } } + @Override + public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration config) + throws Exception { + if(this.zkQuorum != null && this.brokerUrl == null) { + try { + this.brokerUrl = Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(this.zkQuorum)); + } catch (Exception e) { + throw new IllegalStateException("Cannot read kafka brokers from zookeeper and you didn't specify them, giving up!", e); + } + } + this.kafkaProducer = new KafkaProducer<>(createProducerConfigs()); + } + public Map<String, Object> createProducerConfigs() { Map<String, Object> producerConfig = new HashMap<>(); producerConfig.put("bootstrap.servers", brokerUrl); producerConfig.put("key.serializer", keySerializer); producerConfig.put("value.serializer", valueSerializer); producerConfig.put("request.required.acks", requiredAcks); + producerConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, DEFAULT_BATCH_SIZE); producerConfig.putAll(producerConfigs == null?new HashMap<>():producerConfigs); producerConfig = KafkaUtils.INSTANCE.normalizeProtocol(producerConfig); return producerConfig; } @Override - public void init() { - if(this.zkQuorum != null && this.brokerUrl == null) { + public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, + Iterable<Tuple> tuples, List<JSONObject> messages) { + BulkWriterResponse writerResponse = new BulkWriterResponse(); + List<Map.Entry<Tuple, Future>> results = new ArrayList<>(); + int i = 0; + for (Tuple tuple : tuples) { + JSONObject message = messages.get(i++); + String jsonMessage; try { - this.brokerUrl = Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(this.zkQuorum)); - } catch (Exception e) { - throw new IllegalStateException("Cannot read kafka brokers from zookeeper and you didn't specify them, giving up!", e); + jsonMessage = message.toJSONString(); + } catch (Throwable t) { + writerResponse.addError(t, tuple); + continue; } + Future future = kafkaProducer + .send(new ProducerRecord<String, String>(kafkaTopic, jsonMessage)); + // we want to manage the batching + results.add(new AbstractMap.SimpleEntry<>(tuple, future)); } - this.kafkaProducer = new KafkaProducer<>(createProducerConfigs()); - } - @SuppressWarnings("unchecked") - @Override - public void write(String sourceType, WriterConfiguration configurations, Tuple tuple, JSONObject message) throws Exception { - kafkaProducer.send(new ProducerRecord<String, String>(kafkaTopic, message.toJSONString())); + try { + // ensures all Future.isDone() == true + kafkaProducer.flush(); + } catch (InterruptException e) { + writerResponse.addAllErrors(e, tuples); + return writerResponse; + } + + for (Map.Entry<Tuple, Future> kv : results) { + try { + kv.getValue().get(); + 
writerResponse.addSuccess(kv.getKey()); + } catch (Exception e) { + writerResponse.addError(e, kv.getKey()); + } + } + return writerResponse; } @Override
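The diff is truncated at this point, but the new KafkaWriter.write() shown above follows the standard Kafka producer send-then-flush pattern: enqueue every record asynchronously, call flush() once so that every Future is resolved, then inspect each Future to attribute success or failure to its originating tuple. A self-contained sketch of that pattern, with a placeholder topic and producer (this is an illustration of the technique, not Metron code):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendFlushCheck {
  // Send every message asynchronously, flush once, then resolve each Future.
  // After flush() returns, every Future is complete, so get() does not block.
  public static List<String> failedMessages(KafkaProducer<String, String> producer,
                                            String topic, List<String> messages) {
    List<Future<RecordMetadata>> futures = new ArrayList<>();
    for (String message : messages) {
      futures.add(producer.send(new ProducerRecord<>(topic, message)));
    }
    producer.flush();
    List<String> failed = new ArrayList<>();
    for (int i = 0; i < futures.size(); i++) {
      try {
        futures.get(i).get();
      } catch (Exception e) {
        // per-message attribution, as BulkWriterResponse.addError does per tuple
        failed.add(messages.get(i));
      }
    }
    return failed;
  }
}

This is what lets the writer manage batching itself (hence the fixed ProducerConfig.BATCH_SIZE_CONFIG above) while still reporting a per-tuple BulkWriterResponse to the BulkWriterComponent.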