METRON-1594: KafkaWriter is asynchronous and may lose data on node failure (mmiklavc via mmiklavc) closes apache/metron#1045
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/523c38cf Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/523c38cf Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/523c38cf Branch: refs/heads/master Commit: 523c38cf6399e2e3974a51a2cd0fe47e096b0bdf Parents: b6808f7 Author: mmiklavc <michael.miklav...@gmail.com> Authored: Wed Jun 6 15:40:55 2018 -0600 Committer: Michael Miklavcic <michael.miklav...@gmail.com> Committed: Wed Jun 6 15:40:55 2018 -0600 ---------------------------------------------------------------------- metron-analytics/metron-profiler/README.md | 15 ++ .../src/main/flux/profiler/remote.yaml | 3 +- metron-platform/metron-common/README.md | 5 + .../metron/common/bolt/ConfiguredBolt.java | 14 +- .../common/bolt/ConfiguredEnrichmentBolt.java | 11 +- .../common/bolt/ConfiguredIndexingBolt.java | 13 +- .../common/bolt/ConfiguredParserBolt.java | 13 +- .../common/bolt/ConfiguredProfilerBolt.java | 9 +- .../common/configuration/Configurations.java | 7 +- .../configuration/EnrichmentConfigurations.java | 26 +++ .../configuration/IndexingConfigurations.java | 12 +- .../configuration/ParserConfigurations.java | 1 + .../profiler/ProfilerConfigurations.java | 25 +++ .../writer/ConfigurationStrategy.java | 44 +++++ .../writer/ConfigurationsStrategies.java | 144 +++++++++++++++ .../writer/EnrichmentWriterConfiguration.java | 110 ++++++++++++ .../writer/ParserWriterConfiguration.java | 6 +- .../writer/ProfilerWriterConfiguration.java | 109 ++++++++++++ .../configuration/ParserConfigurationsTest.java | 120 +++++++++++++ .../profiler/ProfilerConfigTest.java | 13 +- .../writer/ConfigurationsStrategiesTest.java | 79 +++++++++ .../EnrichmentWriterConfigurationTest.java | 54 ++++++ .../writer/IndexingWriterConfigurationTest.java | 70 ++++++++ .../writer/ParserWriterConfigurationTest.java | 72 ++++++++ .../writer/ProfilerWriterConfigurationTest.java | 54 ++++++ 
.../writer/IndexingWriterConfigurationTest.java | 70 -------- .../writer/ParserWriterConfigurationTest.java | 38 ---- metron-platform/metron-enrichment/README.md | 15 +- .../main/flux/enrichment/remote-splitjoin.yaml | 9 +- .../main/flux/enrichment/remote-unified.yaml | 9 +- .../bolt/BulkMessageWriterBoltTest.java | 64 ++++--- .../src/main/flux/indexing/batch/remote.yaml | 4 +- .../flux/indexing/random_access/remote.yaml | 4 +- metron-platform/metron-parsers/README.md | 6 +- .../apache/metron/parsers/bolt/ParserBolt.java | 108 +++++++++++- .../metron/parsers/bolt/WriterHandler.java | 63 +++++-- .../metron/parsers/bolt/ParserBoltTest.java | 176 +++++++++++++------ .../integration/WriterBoltIntegrationTest.java | 6 +- .../metron/writer/bolt/BatchTimeoutHelper.java | 6 +- .../writer/bolt/BulkMessageWriterBolt.java | 48 ++--- .../apache/metron/writer/kafka/KafkaWriter.java | 92 +++++++--- 41 files changed, 1426 insertions(+), 321 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-analytics/metron-profiler/README.md ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/README.md b/metron-analytics/metron-profiler/README.md index 79cdd44..1a17e10 100644 --- a/metron-analytics/metron-profiler/README.md +++ b/metron-analytics/metron-profiler/README.md @@ -538,6 +538,8 @@ The Profiler runs as an independent Storm topology. The configuration for the P | [`profiler.hbase.batch`](#profilerhbasebatch) | The number of puts that are written to HBase in a single batch. | [`profiler.hbase.flush.interval.seconds`](#profilerhbaseflushintervalseconds) | The maximum number of seconds between batch writes to HBase. | [`topology.kryo.register`](#topologykryoregister) | Storm will use Kryo serialization for these classes. 
+| [`profiler.writer.batchSize`](#profilerwriterbatchsize) | The number of records to batch when writing to Kafka. +| [`profiler.writer.batchTimeout`](#profilerwriterbatchtimeout) | The timeout in ms for batching when writing to Kafka. ### `profiler.input.topic` @@ -852,6 +854,19 @@ More information on accessing profile data can be found in the [Profiler Client] More information on using the [`STATS_*` functions in Stellar can be found here](../../metron-platform/metron-common). +### `profiler.writer.batchSize` + +*Default*: 15 + +The number of records to batch when writing to Kafka. This is managed in the global configuration and does not require a topology restart. + +### `profiler.writer.batchTimeout` + +*Default*: 0 + +The timeout after which a batch will be flushed even if batchSize has not been met. Optional. If unspecified, or set to `0`, it defaults to a system-determined duration which is a fraction of the Storm parameter `topology.message.timeout.secs`. +Ignored if batchSize is `1`, since this disables batching. This is managed in the global configuration and does not require a topology restart. 
+ ## Implementation ## Key Classes http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml index 5e92c62..2f40554 100644 --- a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml +++ b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml @@ -183,8 +183,9 @@ bolts: className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" constructorArgs: - "${kafka.zk}" + - "PROFILER" configMethods: - - name: "withMessageWriter" + - name: "withBulkMessageWriter" args: [ref: "kafkaWriter"] streams: http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/README.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/README.md b/metron-platform/metron-common/README.md index c741e72..dae2e22 100644 --- a/metron-platform/metron-common/README.md +++ b/metron-platform/metron-common/README.md @@ -92,9 +92,14 @@ but a convenient index is provided here: | [`stellar.function.resolver.excludes`](../../metron-stellar/stellar-common#stellarfunctionresolverincludesexcludes) | Stellar | CSV String | N/A | | [`profiler.period.duration`](../../metron-analytics/metron-profiler#profilerperiodduration) | Profiler | Integer | `profiler_period_duration` | | [`profiler.period.duration.units`](../../metron-analytics/metron-profiler#profilerperioddurationunits) | Profiler | String | `profiler_period_units` | +| [`profiler.writer.batchSize`](../../metron-analytics/metron-profiler/#profilerwriterbatchsize) | Profiler | Integer | N/A | +| [`profiler.writer.batchTimeout`](../../metron-analytics/metron-profiler/#profilerwriterbatchtimeout) | Profiler | Integer | N/A | | 
[`update.hbase.table`](../metron-indexing#updatehbasetable) | REST/Indexing | String | `update_hbase_table` | | [`update.hbase.cf`](../metron-indexing#updatehbasecf) | REST/Indexing | String | `update_hbase_cf` | | [`geo.hdfs.file`](../metron-enrichment#geohdfsfile) | Enrichment | String | `geo_hdfs_file` | +| [`enrichment.writer.batchSize`](../metron-enrichment#enrichmentwriterbatchsize) | Enrichment | Integer | N/A | +| [`enrichment.writer.batchTimeout`](../metron-enrichment#enrichmentwriterbatchtimeout) | Enrichment | Integer | N/A | +| [`geo.hdfs.file`](../metron-enrichment#geohdfsfile) | Enrichment | String | `geo_hdfs_file` | | [`source.type.field`](../../metron-interface/metron-alerts#sourcetypefield) | UI | String | N/A | ## Note Configs in Ambari http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java index 6f15746..ef3b2bf 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java @@ -28,6 +28,8 @@ import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.ConfigurationType; import org.apache.metron.common.configuration.Configurations; import org.apache.metron.common.configuration.ConfigurationsUtils; +import org.apache.metron.common.configuration.writer.ConfigurationStrategy; +import org.apache.metron.common.configuration.writer.ConfigurationsStrategies; import org.apache.metron.zookeeper.SimpleEventListener; import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater; import 
org.apache.metron.common.zookeeper.configurations.Reloadable; @@ -43,12 +45,14 @@ public abstract class ConfiguredBolt<CONFIG_T extends Configurations> extends Ba private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private String zookeeperUrl; + private String configurationStrategy; protected CuratorFramework client; protected ZKCache cache; private final CONFIG_T configurations; - public ConfiguredBolt(String zookeeperUrl) { + public ConfiguredBolt(String zookeeperUrl, String configurationStrategy) { this.zookeeperUrl = zookeeperUrl; + this.configurationStrategy = configurationStrategy; this.configurations = createUpdater().defaultConfigurations(); } @@ -68,7 +72,13 @@ public abstract class ConfiguredBolt<CONFIG_T extends Configurations> extends Ba return configurations; } - protected abstract ConfigurationsUpdater<CONFIG_T> createUpdater(); + protected ConfigurationStrategy<CONFIG_T> getConfigurationStrategy() { + return ConfigurationsStrategies.valueOf(configurationStrategy); + } + + protected ConfigurationsUpdater<CONFIG_T> createUpdater() { + return getConfigurationStrategy().createUpdater(this, this::getConfigurations); + } @Override http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java index 54fd7e8..c28ca7b 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java @@ -17,13 +17,8 @@ */ package org.apache.metron.common.bolt; -import 
java.io.IOException; import java.lang.invoke.MethodHandles; -import org.apache.metron.common.configuration.ConfigurationType; -import org.apache.metron.common.configuration.ConfigurationsUtils; import org.apache.metron.common.configuration.EnrichmentConfigurations; -import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater; -import org.apache.metron.common.zookeeper.configurations.EnrichmentUpdater; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,11 +28,7 @@ public abstract class ConfiguredEnrichmentBolt extends ConfiguredBolt<Enrichment public ConfiguredEnrichmentBolt(String zookeeperUrl) { - super(zookeeperUrl); + super(zookeeperUrl, "ENRICHMENT"); } - @Override - protected ConfigurationsUpdater<EnrichmentConfigurations> createUpdater() { - return new EnrichmentUpdater(this, this::getConfigurations); - } } http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java index 09300e4..27e081e 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java @@ -17,26 +17,17 @@ */ package org.apache.metron.common.bolt; -import java.io.IOException; import java.lang.invoke.MethodHandles; -import org.apache.metron.common.configuration.ConfigurationType; -import org.apache.metron.common.configuration.ConfigurationsUtils; import org.apache.metron.common.configuration.IndexingConfigurations; -import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater; -import 
org.apache.metron.common.zookeeper.configurations.IndexingUpdater; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +// TODO delete - no longer used after removing from BulkMessageWriterBolt? public abstract class ConfiguredIndexingBolt extends ConfiguredBolt<IndexingConfigurations> { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public ConfiguredIndexingBolt(String zookeeperUrl) { - super(zookeeperUrl); - } - - @Override - protected ConfigurationsUpdater<IndexingConfigurations> createUpdater() { - return new IndexingUpdater(this, this::getConfigurations); + super(zookeeperUrl, "INDEXING"); } } http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java index 2f13658..1cb4e2e 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java @@ -17,14 +17,9 @@ */ package org.apache.metron.common.bolt; -import java.io.IOException; import java.lang.invoke.MethodHandles; -import org.apache.metron.common.configuration.ConfigurationType; -import org.apache.metron.common.configuration.ConfigurationsUtils; import org.apache.metron.common.configuration.ParserConfigurations; import org.apache.metron.common.configuration.SensorParserConfig; -import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater; -import org.apache.metron.common.zookeeper.configurations.ParserUpdater; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +30,7 @@ public abstract class 
ConfiguredParserBolt extends ConfiguredBolt<ParserConfigur protected final ParserConfigurations configurations = new ParserConfigurations(); private String sensorType; public ConfiguredParserBolt(String zookeeperUrl, String sensorType) { - super(zookeeperUrl); + super(zookeeperUrl, "PARSERS"); this.sensorType = sensorType; } @@ -47,10 +42,4 @@ public abstract class ConfiguredParserBolt extends ConfiguredBolt<ParserConfigur return sensorType; } - - @Override - protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { - return new ParserUpdater(this, this::getConfigurations); - } - } http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java index 90575d0..e4d9f7b 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java @@ -21,8 +21,6 @@ package org.apache.metron.common.bolt; import java.lang.invoke.MethodHandles; import org.apache.metron.common.configuration.profiler.ProfilerConfig; import org.apache.metron.common.configuration.profiler.ProfilerConfigurations; -import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater; -import org.apache.metron.common.zookeeper.configurations.ProfilerUpdater; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,16 +32,11 @@ public abstract class ConfiguredProfilerBolt extends ConfiguredBolt<ProfilerConf private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public ConfiguredProfilerBolt(String 
zookeeperUrl) { - super(zookeeperUrl); + super(zookeeperUrl, "PROFILER"); } protected ProfilerConfig getProfilerConfig() { return getConfigurations().getProfilerConfig(); } - @Override - protected ConfigurationsUpdater<ProfilerConfigurations> createUpdater() { - return new ProfilerUpdater(this, this::getConfigurations); - } - } http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java index af421a9..62cf742 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java @@ -27,8 +27,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.stellar.common.utils.ConversionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,6 +75,11 @@ public class Configurations implements Serializable { getConfigurations().remove(ConfigurationType.GLOBAL.getTypeName()); } + public static <T> T getAs(String key, Map<String, Object> map, T defaultValue, Class<T> clazz) { + return map == null ? 
defaultValue + : ConversionUtils.convert(map.getOrDefault(key, defaultValue), clazz); + } + @Override public boolean equals(Object o) { if (this == o) return true; http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java index dfd7a65..1c723c7 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java @@ -27,6 +27,9 @@ import java.util.ArrayList; import java.util.List; public class EnrichmentConfigurations extends Configurations { + public static final Integer DEFAULT_KAFKA_BATCH_SIZE = 15; + public static final String BATCH_SIZE_CONF = "enrichment.writer.batchSize"; + public static final String BATCH_TIMEOUT_CONF = "enrichment.writer.batchTimeout"; public SensorEnrichmentConfig getSensorEnrichmentConfig(String sensorType) { return (SensorEnrichmentConfig) getConfigurations().get(getKey(sensorType)); @@ -49,6 +52,28 @@ public class EnrichmentConfigurations extends Configurations { getConfigurations().remove(getKey(sensorType)); } + /** + * Pulled from global config. + * Note: enrichment writes out to 1 kafka topic, so it is not pulling this config by sensor. 
+ * + * @return batch size for writing to kafka + * @see org.apache.metron.common.configuration.EnrichmentConfigurations#BATCH_SIZE_CONF + */ + public int getBatchSize() { + return getAs(BATCH_SIZE_CONF, getGlobalConfig(true), DEFAULT_KAFKA_BATCH_SIZE, Integer.class); + } + + /** + * Pulled from global config + * Note: enrichment writes out to 1 kafka topic, so it is not pulling this config by sensor. + * + * @return batch timeout for writing to kafka + * @see org.apache.metron.common.configuration.EnrichmentConfigurations#BATCH_TIMEOUT_CONF + */ + public int getBatchTimeout() { + return getAs(BATCH_TIMEOUT_CONF, getGlobalConfig(true), 0, Integer.class); + } + public List<String> getTypes() { List<String> ret = new ArrayList<>(); for(String keyedSensor : getConfigurations().keySet()) { @@ -62,4 +87,5 @@ public class EnrichmentConfigurations extends Configurations { public static String getKey(String sensorType) { return ConfigurationType.ENRICHMENT.getTypeName() + "." + sensorType; } + } http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java index 5b67aa5..7ecb606 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java @@ -17,13 +17,14 @@ */ package org.apache.metron.common.configuration; -import org.apache.metron.stellar.common.utils.ConversionUtils; -import org.apache.metron.common.utils.JSONUtils; - import java.io.ByteArrayInputStream; import java.io.IOException; 
import java.io.InputStream; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.metron.common.utils.JSONUtils; public class IndexingConfigurations extends Configurations { public static final String BATCH_SIZE_CONF = "batchSize"; @@ -226,7 +227,4 @@ public class IndexingConfigurations extends Configurations { return ret; } - public static <T> T getAs(String key, Map<String, Object> map, T defaultValue, Class<T> clazz) { - return map == null?defaultValue: ConversionUtils.convert(map.getOrDefault(key, defaultValue), clazz); - } } http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java index 72af833..83daf0d 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.List; public class ParserConfigurations extends Configurations { + public static final Integer DEFAULT_KAFKA_BATCH_SIZE = 15; public SensorParserConfig getSensorParserConfig(String sensorType) { return (SensorParserConfig) getConfigurations().get(getKey(sensorType)); http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java ---------------------------------------------------------------------- diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java index f50d770..1348068 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java @@ -29,6 +29,9 @@ import java.io.InputStream; * Used to manage configurations for the Profiler. */ public class ProfilerConfigurations extends Configurations { + public static final Integer DEFAULT_KAFKA_BATCH_SIZE = 15; + public static final String BATCH_SIZE_CONF = "profiler.writer.batchSize"; + public static final String BATCH_TIMEOUT_CONF = "profiler.writer.batchTimeout"; public ProfilerConfig getProfilerConfig() { return (ProfilerConfig) getConfigurations().get(getKey()); @@ -55,4 +58,26 @@ public class ProfilerConfigurations extends Configurations { configurations.remove(getKey()); } + /** + * Pulled from global config. + * Note: profiler writes out to 1 kafka topic, so it is not pulling this config by sensor. + * + * @return batch size for writing to kafka + * @see org.apache.metron.common.configuration.profiler.ProfilerConfigurations#BATCH_SIZE_CONF + */ + public int getBatchSize() { + return getAs(BATCH_SIZE_CONF, getGlobalConfig(true), DEFAULT_KAFKA_BATCH_SIZE, Integer.class); + } + + /** + * Pulled from global config + * Note: profiler writes out to 1 kafka topic, so it is not pulling this config by sensor. 
+ * + * @return batch timeout for writing to kafka + * @see org.apache.metron.common.configuration.profiler.ProfilerConfigurations#BATCH_TIMEOUT_CONF + */ + public int getBatchTimeout() { + return getAs(BATCH_TIMEOUT_CONF, getGlobalConfig(true), 0, Integer.class); + } + } http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ConfigurationStrategy.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ConfigurationStrategy.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ConfigurationStrategy.java new file mode 100644 index 0000000..cd13dc7 --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ConfigurationStrategy.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.common.configuration.writer; + +import java.util.function.Supplier; +import org.apache.metron.common.configuration.Configurations; +import org.apache.metron.common.writer.BulkMessageWriter; +import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater; +import org.apache.metron.common.zookeeper.configurations.Reloadable; + +/** + * Strategy that creates both the writer configuration and the configurations updater + * for one kind of {@link Configurations}. + * + * @param <T> the kind of Configurations handled by the updater this strategy creates + */ +public interface ConfigurationStrategy<T extends Configurations> { + + /** + * Create a specific writer configuration. + * @param writer provided for the underlying creator to access metadata as needed + * @param configs the configurations that the created WriterConfiguration will typically read from + * @return the writer configuration created for the given writer and configs + */ + WriterConfiguration createWriterConfig(BulkMessageWriter writer, Configurations configs); + + /** + * Create the specific config updater for the type of config extending Configurations. + * @param reloadable set up as a callback by the updater + * @param configSupplier supplies config to the updater + * @return updater + */ + ConfigurationsUpdater<T> createUpdater(Reloadable reloadable, Supplier<T> configSupplier); + +} http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ConfigurationsStrategies.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ConfigurationsStrategies.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ConfigurationsStrategies.java new file mode 100644 index 0000000..f22dd01 --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ConfigurationsStrategies.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.common.configuration.writer; + +import java.util.function.Supplier; +import org.apache.metron.common.configuration.Configurations; +import org.apache.metron.common.configuration.EnrichmentConfigurations; +import org.apache.metron.common.configuration.IndexingConfigurations; +import org.apache.metron.common.configuration.ParserConfigurations; +import org.apache.metron.common.configuration.profiler.ProfilerConfigurations; +import org.apache.metron.common.writer.BulkMessageWriter; +import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater; +import org.apache.metron.common.zookeeper.configurations.EnrichmentUpdater; +import org.apache.metron.common.zookeeper.configurations.IndexingUpdater; +import org.apache.metron.common.zookeeper.configurations.ParserUpdater; +import org.apache.metron.common.zookeeper.configurations.ProfilerUpdater; +import org.apache.metron.common.zookeeper.configurations.Reloadable; + +/** + * Strategy pattern implementation that couples factories for WriterConfiguration and + * ConfigurationsUpdater together for a particular type. + * <br> + * <strong>Note:</strong> The enum type definition does not specify generics because + * enums are disallowed from doing this in Java. 
+ */ +public enum ConfigurationsStrategies implements ConfigurationStrategy { + + PARSERS(new ConfigurationStrategy<ParserConfigurations>() { + + @Override + public WriterConfiguration createWriterConfig(BulkMessageWriter writer, + Configurations configs) { + if (configs instanceof ParserConfigurations) { + return new ParserWriterConfiguration((ParserConfigurations) configs); + } else { + throw new IllegalArgumentException( + "Expected config of type ParserConfigurations but found " + configs.getClass()); + } + } + + @Override + public ConfigurationsUpdater<ParserConfigurations> createUpdater(Reloadable reloadable, + Supplier configSupplier) { + return new ParserUpdater(reloadable, configSupplier); + } + }), + + ENRICHMENT(new ConfigurationStrategy() { + + @Override + public WriterConfiguration createWriterConfig(BulkMessageWriter writer, + Configurations configs) { + if (configs instanceof EnrichmentConfigurations) { + return new EnrichmentWriterConfiguration((EnrichmentConfigurations) configs); + } else { + throw new IllegalArgumentException( + "Expected config of type EnrichmentConfigurations but found " + configs.getClass()); + } + } + + @Override + public ConfigurationsUpdater<EnrichmentConfigurations> createUpdater(Reloadable reloadable, + Supplier configSupplier) { + return new EnrichmentUpdater(reloadable, configSupplier); + } + }), + + INDEXING(new ConfigurationStrategy() { + + @Override + public WriterConfiguration createWriterConfig(BulkMessageWriter writer, + Configurations configs) { + if (configs instanceof IndexingConfigurations) { + return new IndexingWriterConfiguration(writer.getName(), (IndexingConfigurations) configs); + } else { + throw new IllegalArgumentException( + "Expected config of type IndexingConfigurations but found " + configs.getClass()); + } + } + + @Override + public ConfigurationsUpdater<IndexingConfigurations> createUpdater(Reloadable reloadable, + Supplier configSupplier) { + return new IndexingUpdater(reloadable, 
configSupplier); + } + }), + + PROFILER(new ConfigurationStrategy() { + + @Override + public WriterConfiguration createWriterConfig(BulkMessageWriter writer, + Configurations configs) { + if (configs instanceof ProfilerConfigurations) { + return new ProfilerWriterConfiguration((ProfilerConfigurations) configs); + } else { + throw new IllegalArgumentException( + "Expected config of type ProfilerConfigurations but found " + configs.getClass()); + } + } + + @Override + public ConfigurationsUpdater createUpdater(Reloadable reloadable, Supplier configSupplier) { + return new ProfilerUpdater(reloadable, configSupplier); + } + }); + + private ConfigurationStrategy<? extends Configurations> strategy; + + ConfigurationsStrategies(ConfigurationStrategy<? extends Configurations> strategy) { + this.strategy = strategy; + } + + @Override + public WriterConfiguration createWriterConfig(BulkMessageWriter writer, Configurations configs) { + return strategy.createWriterConfig(writer, configs); + } + + /** + * Config updater + * @param reloadable callback + * @param configSupplier Supplier provides config of type <? 
extends Configurations> + * @return Config updater + */ + @Override + public ConfigurationsUpdater createUpdater(Reloadable reloadable, Supplier configSupplier) { + return strategy.createUpdater(reloadable, configSupplier); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfiguration.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfiguration.java new file mode 100644 index 0000000..c275a76 --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfiguration.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.metron.common.configuration.writer; + +import static java.util.Arrays.asList; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.metron.common.configuration.EnrichmentConfigurations; + +/** + * Writer configuration for the enrichment topology. Batch size and batch timeout are a couple + * primary values of interest that are used for configuring the enrichment writer. + */ +public class EnrichmentWriterConfiguration implements WriterConfiguration { + + private Optional<EnrichmentConfigurations> config; + + public EnrichmentWriterConfiguration(EnrichmentConfigurations config) { + this.config = Optional.ofNullable(config); + } + + /** + * Batch size for writing. + * @param sensorName n/a + * @return batch size in # messages + */ + @Override + public int getBatchSize(String sensorName) { + return config.orElse(new EnrichmentConfigurations()).getBatchSize(); + } + + /** + * Timeout for this writer. + * @param sensorName n/a + * @return timeout in ms + */ + @Override + public int getBatchTimeout(String sensorName) { + return config.orElse(new EnrichmentConfigurations()).getBatchTimeout(); + } + + /** + * Timeout for this writer. 
+ * @return single item list with this writer's timeout + */ + @Override + public List<Integer> getAllConfiguredTimeouts() { + return asList(getBatchTimeout(null)); + } + + /** + * n/a for enrichment + * @param sensorName n/a + * @return null + */ + @Override + public String getIndex(String sensorName) { + return null; + } + + /** + * Always enabled in enrichment + * @param sensorName n/a + * @return true + */ + @Override + public boolean isEnabled(String sensorName) { + return true; + } + + @Override + public Map<String, Object> getSensorConfig(String sensorName) { + return config.orElse(new EnrichmentConfigurations()).getSensorEnrichmentConfig(sensorName) + .getConfiguration(); + } + + @Override + public Map<String, Object> getGlobalConfig() { + return config.orElse(new EnrichmentConfigurations()).getGlobalConfig(); + } + + @Override + public boolean isDefault(String sensorName) { + return false; + } + + @Override + public String getFieldNameConverter(String sensorName) { + // not applicable + return null; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java index 4603b32..a4f3f66 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java @@ -26,6 +26,10 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +/* + * Note that parsers can also be used for streaming enrichments, which means 
broader scope than + * Kafka alone. + */ public class ParserWriterConfiguration implements WriterConfiguration { private ParserConfigurations config; public ParserWriterConfiguration(ParserConfigurations config) { @@ -38,7 +42,7 @@ public class ParserWriterConfiguration implements WriterConfiguration { && config.getSensorParserConfig(sensorName).getParserConfig() != null ) { Object batchObj = config.getSensorParserConfig(sensorName).getParserConfig().get(IndexingConfigurations.BATCH_SIZE_CONF); - return batchObj == null ? 1 : ConversionUtils.convert(batchObj, Integer.class); + return batchObj == null ? ParserConfigurations.DEFAULT_KAFKA_BATCH_SIZE : ConversionUtils.convert(batchObj, Integer.class); } return 1; } http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ProfilerWriterConfiguration.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ProfilerWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ProfilerWriterConfiguration.java new file mode 100644 index 0000000..fe283bd --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ProfilerWriterConfiguration.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.common.configuration.writer; + +import static java.util.Arrays.asList; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.metron.common.configuration.profiler.ProfilerConfigurations; + +/** + * Writer configuration for the profiler topology. Batch size and batch timeout are a couple + * primary values of interest that are used for configuring the profiler writer. + */ +public class ProfilerWriterConfiguration implements WriterConfiguration { + private Optional<ProfilerConfigurations> config; + + public ProfilerWriterConfiguration(ProfilerConfigurations config) { + this.config = Optional.ofNullable(config); + } + + /** + * Batch size for writing. + * @param sensorName n/a + * @return batch size in # messages + */ + @Override + public int getBatchSize(String sensorName) { + return config.orElse(new ProfilerConfigurations()).getBatchSize(); + } + + /** + * Timeout for this writer. + * @param sensorName n/a + * @return timeout in ms + */ + @Override + public int getBatchTimeout(String sensorName) { + return config.orElse(new ProfilerConfigurations()).getBatchTimeout(); + } + + /** + * Timeout for this writer. 
+ * @return single item list with this writer's timeout + */ + @Override + public List<Integer> getAllConfiguredTimeouts() { + return asList(getBatchTimeout(null)); + } + + /** + * n/a for profiler + * @param sensorName n/a + * @return null + */ + @Override + public String getIndex(String sensorName) { + return null; + } + + /** + * Always enabled in profiler + * @param sensorName n/a + * @return true + */ + @Override + public boolean isEnabled(String sensorName) { + return true; + } + + @Override + public Map<String, Object> getSensorConfig(String sensorName) { + throw new UnsupportedOperationException("Profiler does not have sensor configs"); + } + + @Override + public Map<String, Object> getGlobalConfig() { + return config.orElse(new ProfilerConfigurations()).getGlobalConfig(); + } + + @Override + public boolean isDefault(String sensorName) { + return false; + } + + @Override + public String getFieldNameConverter(String sensorName) { + // not applicable + return null; + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ParserConfigurationsTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ParserConfigurationsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ParserConfigurationsTest.java new file mode 100644 index 0000000..c099aec --- /dev/null +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ParserConfigurationsTest.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.common.configuration; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import org.adrianwalker.multilinestring.Multiline; +import org.junit.Test; + +public class ParserConfigurationsTest { + + /** + * { + * "parserClassName" : "parser-class", + * "filterClassName" : "filter-class", + * "sensorTopic" : "sensor-topic", + * "outputTopic" : "output-topic", + * "errorTopic" : "error-topic", + * "writerClassName" : "writer-class", + * "errorWriterClassName" : "error-writer-class", + * "readMetadata" : true, + * "mergeMetadata" : true, + * "numWorkers" : 40, + * "numAckers" : 40, + * "spoutParallelism" : 40, + * "spoutNumTasks" : 40, + * "parserParallelism" : 40, + * "parserNumTasks" : 40, + * "errorWriterParallelism" : 40, + * "errorWriterNumTasks" : 40, + * "securityProtocol" : "security-protocol", + * "spoutConfig" : { + * "foo" : "bar" + * }, + * "stormConfig" : { + * "storm" : "config" + * }, + * "cacheConfig" : { + * "stellar.cache.maxSize" : 20000 + * }, + * "parserConfig" : { + * "parser" : "config" + * }, + * "fieldTransformations" : [ + * { + * "input" : "input-field", + * "transformation" : "REMOVE" + * } + * ] + * } + */ + 
@Multiline + private static String parserConfig; + + @Test + public void sensorParserConfig_properties_populated_by_JSON_configuration() throws IOException { + ParserConfigurations parserConfigs = new ParserConfigurations(); + parserConfigs.updateSensorParserConfig("test-sensor", parserConfig.getBytes()); + SensorParserConfig actualSensorConfig = parserConfigs.getSensorParserConfig("test-sensor"); + assertThat(actualSensorConfig.getParserClassName(), equalTo("parser-class")); + assertThat(actualSensorConfig.getFilterClassName(), equalTo("filter-class")); + assertThat(actualSensorConfig.getSensorTopic(), equalTo("sensor-topic")); + assertThat(actualSensorConfig.getOutputTopic(), equalTo("output-topic")); + assertThat(actualSensorConfig.getErrorTopic(), equalTo("error-topic")); + assertThat(actualSensorConfig.getWriterClassName(), equalTo("writer-class")); + assertThat(actualSensorConfig.getErrorWriterClassName(), equalTo("error-writer-class")); + assertThat(actualSensorConfig.getReadMetadata(), equalTo(true)); + assertThat(actualSensorConfig.getMergeMetadata(), equalTo(true)); + assertThat(actualSensorConfig.getNumWorkers(), equalTo(40)); + assertThat(actualSensorConfig.getNumAckers(), equalTo(40)); + assertThat(actualSensorConfig.getSpoutParallelism(), equalTo(40)); + assertThat(actualSensorConfig.getSpoutNumTasks(), equalTo(40)); + assertThat(actualSensorConfig.getParserParallelism(), equalTo(40)); + assertThat(actualSensorConfig.getParserNumTasks(), equalTo(40)); + assertThat(actualSensorConfig.getErrorWriterParallelism(), equalTo(40)); + assertThat(actualSensorConfig.getErrorWriterNumTasks(), equalTo(40)); + assertThat(actualSensorConfig.getSecurityProtocol(), equalTo("security-protocol")); + assertThat(actualSensorConfig.getSpoutConfig(), not(new HashMap<>())); + assertThat(actualSensorConfig.getSpoutConfig().get("foo"), equalTo("bar")); + assertThat(actualSensorConfig.getStormConfig(), not(new HashMap<>())); + 
assertThat(actualSensorConfig.getStormConfig().get("storm"), equalTo("config")); + assertThat(actualSensorConfig.getCacheConfig(), not(new HashMap<>())); + assertThat(actualSensorConfig.getCacheConfig().get("stellar.cache.maxSize"), equalTo(20000)); + assertThat(actualSensorConfig.getParserConfig(), not(new HashMap<>())); + assertThat(actualSensorConfig.getParserConfig().get("parser"), equalTo("config")); + assertThat(actualSensorConfig.getFieldTransformations(), not(new ArrayList<>())); + assertThat(actualSensorConfig.getFieldTransformations().get(0), not(nullValue())); + assertThat( + ((FieldTransformer) actualSensorConfig.getFieldTransformations().get(0)).getInput().size(), + equalTo(1)); + assertThat( + ((FieldTransformer) actualSensorConfig.getFieldTransformations().get(0)).getInput().get(0), + equalTo("input-field")); + assertThat(((FieldTransformer) actualSensorConfig.getFieldTransformations().get(0)) + .getTransformation(), equalTo("REMOVE")); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java index 2cbdfb9..4c70a8c 100644 --- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java @@ -19,19 +19,18 @@ */ package org.apache.metron.common.configuration.profiler; -import org.adrianwalker.multilinestring.Multiline; -import org.apache.metron.common.utils.SerDeUtils; -import org.junit.Test; +import static org.junit.Assert.assertEquals; 
+import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.utils.SerDeUtils; +import org.junit.Test; /** * Tests the {@link ProfilerConfig} class. http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/ConfigurationsStrategiesTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/ConfigurationsStrategiesTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/ConfigurationsStrategiesTest.java new file mode 100644 index 0000000..fd498ba --- /dev/null +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/ConfigurationsStrategiesTest.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.common.configuration.writer; + +import static org.apache.metron.common.configuration.writer.ConfigurationsStrategies.ENRICHMENT; +import static org.apache.metron.common.configuration.writer.ConfigurationsStrategies.INDEXING; +import static org.apache.metron.common.configuration.writer.ConfigurationsStrategies.PARSERS; +import static org.apache.metron.common.configuration.writer.ConfigurationsStrategies.PROFILER; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertThat; + +import org.apache.metron.common.configuration.EnrichmentConfigurations; +import org.apache.metron.common.configuration.IndexingConfigurations; +import org.apache.metron.common.configuration.ParserConfigurations; +import org.apache.metron.common.configuration.profiler.ProfilerConfigurations; +import org.apache.metron.common.writer.BulkMessageWriter; +import org.apache.metron.common.zookeeper.configurations.EnrichmentUpdater; +import org.apache.metron.common.zookeeper.configurations.IndexingUpdater; +import org.apache.metron.common.zookeeper.configurations.ParserUpdater; +import org.apache.metron.common.zookeeper.configurations.ProfilerUpdater; +import org.apache.metron.common.zookeeper.configurations.Reloadable; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class ConfigurationsStrategiesTest { + + @Mock + private BulkMessageWriter writer; + @Mock + private Reloadable reloadable; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void strategies_build_writer_configs() { + assertThat(PARSERS.createWriterConfig(writer, new ParserConfigurations()), + instanceOf(ParserWriterConfiguration.class)); + assertThat(ENRICHMENT.createWriterConfig(writer, new EnrichmentConfigurations()), + 
instanceOf(EnrichmentWriterConfiguration.class)); + assertThat(INDEXING.createWriterConfig(writer, new IndexingConfigurations()), + instanceOf(IndexingWriterConfiguration.class)); + assertThat(PROFILER.createWriterConfig(writer, new ProfilerConfigurations()), + instanceOf(ProfilerWriterConfiguration.class)); + } + + @Test + public void strategies_build_updaters() { + assertThat(PARSERS.createUpdater(reloadable, ParserConfigurations::new), + instanceOf(ParserUpdater.class)); + assertThat(ENRICHMENT.createUpdater(reloadable, EnrichmentConfigurations::new), + instanceOf(EnrichmentUpdater.class)); + assertThat(INDEXING.createUpdater(reloadable, IndexingConfigurations::new), + instanceOf(IndexingUpdater.class)); + assertThat(PROFILER.createUpdater(reloadable, ProfilerConfigurations::new), + instanceOf(ProfilerUpdater.class)); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfigurationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfigurationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfigurationTest.java new file mode 100644 index 0000000..b6aae19 --- /dev/null +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfigurationTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.common.configuration.writer; + +import static java.util.Arrays.asList; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.configuration.EnrichmentConfigurations; +import org.junit.Test; + +public class EnrichmentWriterConfigurationTest { + + /** + * { + * "enrichment.writer.batchSize" : 12345, + * "enrichment.writer.batchTimeout" : 555 + * } + */ + @Multiline + private static String globalJson; + + @Test + public void gets_batch_size_and_timeout_from_global_config() throws IOException { + EnrichmentConfigurations configs = new EnrichmentConfigurations(); + configs.updateGlobalConfig(globalJson.getBytes()); + EnrichmentWriterConfiguration writerConfig = new EnrichmentWriterConfiguration(configs); + assertThat("batch timeout should match global config setting", + writerConfig.getBatchTimeout(null), equalTo(555)); + assertThat("list should have single batch timeout matching global config setting", + writerConfig.getAllConfiguredTimeouts(), equalTo(asList(555))); + assertThat("batch size should match global config setting", writerConfig.getBatchSize(null), + equalTo(12345)); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/IndexingWriterConfigurationTest.java ---------------------------------------------------------------------- diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/IndexingWriterConfigurationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/IndexingWriterConfigurationTest.java new file mode 100644 index 0000000..b2b5eca --- /dev/null +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/IndexingWriterConfigurationTest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.metron.common.configuration.writer; + +import org.apache.metron.common.configuration.IndexingConfigurations; +import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration; +import org.junit.Assert; +import org.junit.Test; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; + +import static org.apache.metron.test.bolt.BaseEnrichmentBoltTest.sampleSensorIndexingConfigPath; +import static org.apache.metron.test.bolt.BaseEnrichmentBoltTest.sensorType; + +public class IndexingWriterConfigurationTest { + @Test + public void testDefaultBatchSize() { + IndexingWriterConfiguration config = new IndexingWriterConfiguration("hdfs", + new IndexingConfigurations() + ); + Assert.assertEquals(1, config.getBatchSize("foo")); + } + @Test + public void testDefaultBatchTimeout() { + IndexingWriterConfiguration config = new IndexingWriterConfiguration("hdfs", + new IndexingConfigurations() + ); + Assert.assertEquals(0, config.getBatchTimeout("foo")); + } + @Test + public void testGetAllConfiguredTimeouts() throws FileNotFoundException, IOException { + //default + IndexingWriterConfiguration config = new IndexingWriterConfiguration("hdfs", + new IndexingConfigurations() + ); + Assert.assertEquals(0, config.getAllConfiguredTimeouts().size()); + //non-default + IndexingConfigurations iconfigs = new IndexingConfigurations(); + iconfigs.updateSensorIndexingConfig( + sensorType, new FileInputStream(sampleSensorIndexingConfigPath)); + config = new IndexingWriterConfiguration("elasticsearch", iconfigs); + Assert.assertEquals(1, config.getAllConfiguredTimeouts().size()); + Assert.assertEquals(7, (long)config.getAllConfiguredTimeouts().get(0)); + } + @Test + public void testDefaultIndex() { + IndexingWriterConfiguration config = new IndexingWriterConfiguration("hdfs", + new IndexingConfigurations() + ); + Assert.assertEquals("foo", config.getIndex("foo")); + } +} 
http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/ParserWriterConfigurationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/ParserWriterConfigurationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/ParserWriterConfigurationTest.java new file mode 100644 index 0000000..ebe3d9b --- /dev/null +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/ParserWriterConfigurationTest.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.metron.common.configuration.writer; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.configuration.ParserConfigurations; +import org.junit.Assert; +import org.junit.Test; + +public class ParserWriterConfigurationTest { + + @Test + public void testDefaultBatchSize() { + ParserWriterConfiguration config = new ParserWriterConfiguration( new ParserConfigurations() ); + Assert.assertEquals(1, config.getBatchSize("foo")); + } + + @Test + public void testDefaultIndex() { + ParserWriterConfiguration config = new ParserWriterConfiguration( new ParserConfigurations() ); + Assert.assertEquals("foo", config.getIndex("foo")); + } + + /** + * { + * "parserClassName":"some-parser", + * "sensorTopic":"testtopic", + * "parserConfig": { + * "batchSize" : 5, + * "batchTimeout" : "10000", + * "index" : "modified-index", + * "enabled" : "false" + * } + * } + */ + @Multiline + private static String configJson; + + @Test + public void pulls_writer_configuration_from_parserConfig() throws IOException { + ParserConfigurations parserConfigurations = new ParserConfigurations(); + final String sensorName = "some-sensor"; + parserConfigurations.updateSensorParserConfig("some-sensor", configJson.getBytes()); + ParserWriterConfiguration writerConfiguration = new ParserWriterConfiguration( + parserConfigurations); + assertThat("batch size should match", writerConfiguration.getBatchSize(sensorName), equalTo(5)); + assertThat("batch timeout should match", writerConfiguration.getBatchTimeout(sensorName), equalTo(10000)); + assertThat("index should match", writerConfiguration.getIndex(sensorName), equalTo("modified-index")); + assertThat("enabled should match", writerConfiguration.isEnabled(sensorName), equalTo(false)); + } + +} 
http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/ProfilerWriterConfigurationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/ProfilerWriterConfigurationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/ProfilerWriterConfigurationTest.java new file mode 100644 index 0000000..3639276 --- /dev/null +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/ProfilerWriterConfigurationTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.metron.common.configuration.writer; + +import static java.util.Arrays.asList; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.configuration.profiler.ProfilerConfigurations; +import org.junit.Test; + +public class ProfilerWriterConfigurationTest { + + /** + * { + * "profiler.writer.batchSize" : 12345, + * "profiler.writer.batchTimeout" : 555 + * } + */ + @Multiline + private static String globalJson; + + @Test + public void gets_batch_size_and_timeout_from_global_config() throws IOException { + ProfilerConfigurations configs = new ProfilerConfigurations(); + configs.updateGlobalConfig(globalJson.getBytes()); + ProfilerWriterConfiguration writerConfig = new ProfilerWriterConfiguration(configs); + assertThat("batch timeout should match global config setting", + writerConfig.getBatchTimeout(null), equalTo(555)); + assertThat("list should have single batch timeout matching global config setting", + writerConfig.getAllConfiguredTimeouts(), equalTo(asList(555))); + assertThat("batch size should match global config setting", writerConfig.getBatchSize(null), + equalTo(12345)); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java deleted file mode 100644 index aec215f..0000000 --- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache 
Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.metron.common.writer; - -import org.apache.metron.common.configuration.IndexingConfigurations; -import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration; -import org.junit.Assert; -import org.junit.Test; - -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; - -import static org.apache.metron.test.bolt.BaseEnrichmentBoltTest.sampleSensorIndexingConfigPath; -import static org.apache.metron.test.bolt.BaseEnrichmentBoltTest.sensorType; - -public class IndexingWriterConfigurationTest { - @Test - public void testDefaultBatchSize() { - IndexingWriterConfiguration config = new IndexingWriterConfiguration("hdfs", - new IndexingConfigurations() - ); - Assert.assertEquals(1, config.getBatchSize("foo")); - } - @Test - public void testDefaultBatchTimeout() { - IndexingWriterConfiguration config = new IndexingWriterConfiguration("hdfs", - new IndexingConfigurations() - ); - Assert.assertEquals(0, config.getBatchTimeout("foo")); - } - @Test - public void testGetAllConfiguredTimeouts() throws FileNotFoundException, IOException { - //default - IndexingWriterConfiguration config = new IndexingWriterConfiguration("hdfs", 
- new IndexingConfigurations() - ); - Assert.assertEquals(0, config.getAllConfiguredTimeouts().size()); - //non-default - IndexingConfigurations iconfigs = new IndexingConfigurations(); - iconfigs.updateSensorIndexingConfig( - sensorType, new FileInputStream(sampleSensorIndexingConfigPath)); - config = new IndexingWriterConfiguration("elasticsearch", iconfigs); - Assert.assertEquals(1, config.getAllConfiguredTimeouts().size()); - Assert.assertEquals(7, (long)config.getAllConfiguredTimeouts().get(0)); - } - @Test - public void testDefaultIndex() { - IndexingWriterConfiguration config = new IndexingWriterConfiguration("hdfs", - new IndexingConfigurations() - ); - Assert.assertEquals("foo", config.getIndex("foo")); - } -} http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/ParserWriterConfigurationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/ParserWriterConfigurationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/ParserWriterConfigurationTest.java deleted file mode 100644 index e960e47..0000000 --- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/ParserWriterConfigurationTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.metron.common.writer; - -import org.apache.metron.common.configuration.ParserConfigurations; -import org.apache.metron.common.configuration.writer.ParserWriterConfiguration; -import org.junit.Assert; -import org.junit.Test; - -public class ParserWriterConfigurationTest { - @Test - public void testDefaultBatchSize() { - ParserWriterConfiguration config = new ParserWriterConfiguration( new ParserConfigurations() ); - Assert.assertEquals(1, config.getBatchSize("foo")); - } - - @Test - public void testDefaultIndex() { - ParserWriterConfiguration config = new ParserWriterConfiguration( new ParserConfigurations() ); - Assert.assertEquals("foo", config.getIndex("foo")); - } -} http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-enrichment/README.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/README.md b/metron-platform/metron-enrichment/README.md index cbf8ee8..8a53e71 100644 --- a/metron-platform/metron-enrichment/README.md +++ b/metron-platform/metron-enrichment/README.md @@ -89,7 +89,8 @@ There are two types of configurations at the moment, `global` and ## Global Configuration There are a few enrichments which have independent configurations, such -as from the global config. +as from the global config. You can also configure the enrichment topology's +writer batching settings. Also, see the "[Global Configuration](../metron-common)" section for more discussion of the global config. 
@@ -107,6 +108,18 @@ the topology and used from there. This is lazy, so if this property changes in a running topology, the file will be localized from HDFS upon first time the file is used via the geo enrichment. +### Writer Batching + +#### `enrichment.writer.batchSize` + +The size of the batch that is written to Kafka at once. Defaults to `15` (size of 1 disables batching). + +#### `enrichment.writer.batchTimeout` + +The timeout after which a batch will be flushed even if batchSize has not been met. Optional. +If unspecified, or set to `0`, it defaults to a system-determined duration which is a fraction of the Storm +parameter `topology.message.timeout.secs`. Ignored if batchSize is `1`, since this disables batching. + ## Sensor Enrichment Configuration The sensor specific configuration is intended to configure the http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-splitjoin.yaml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-splitjoin.yaml b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-splitjoin.yaml index fd7ceff..8a1bba1 100644 --- a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-splitjoin.yaml +++ b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-splitjoin.yaml @@ -328,8 +328,9 @@ bolts: className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" constructorArgs: - "${kafka.zk}" + - "ENRICHMENT" configMethods: - - name: "withMessageWriter" + - name: "withBulkMessageWriter" args: - ref: "enrichmentErrorKafkaWriter" @@ -388,8 +389,9 @@ bolts: className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" constructorArgs: - "${kafka.zk}" + - "ENRICHMENT" configMethods: - - name: "withMessageWriter" + - name: "withBulkMessageWriter" args: - ref: "threatIntelErrorKafkaWriter" @@ -398,8 +400,9 @@ bolts: className: 
"org.apache.metron.writer.bolt.BulkMessageWriterBolt" constructorArgs: - "${kafka.zk}" + - "ENRICHMENT" configMethods: - - name: "withMessageWriter" + - name: "withBulkMessageWriter" args: - ref: "kafkaWriter" parallelism: ${kafka.writer.parallelism} http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-unified.yaml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-unified.yaml b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-unified.yaml index d7107d9..45b05cf 100644 --- a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-unified.yaml +++ b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-unified.yaml @@ -295,8 +295,9 @@ bolts: className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" constructorArgs: - "${kafka.zk}" + - "ENRICHMENT" configMethods: - - name: "withMessageWriter" + - name: "withBulkMessageWriter" args: - ref: "enrichmentErrorKafkaWriter" @@ -329,8 +330,9 @@ bolts: className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" constructorArgs: - "${kafka.zk}" + - "ENRICHMENT" configMethods: - - name: "withMessageWriter" + - name: "withBulkMessageWriter" args: - ref: "threatIntelErrorKafkaWriter" @@ -339,8 +341,9 @@ bolts: className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" constructorArgs: - "${kafka.zk}" + - "ENRICHMENT" configMethods: - - name: "withMessageWriter" + - name: "withBulkMessageWriter" args: - ref: "kafkaWriter" parallelism: ${kafka.writer.parallelism} http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java ---------------------------------------------------------------------- diff --git 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java index 52516ac..588fc58 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java @@ -17,9 +17,27 @@ */ package org.apache.metron.enrichment.bolt; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.FileInputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.adrianwalker.multilinestring.Multiline; import org.apache.log4j.Level; import org.apache.metron.common.Constants; +import org.apache.metron.common.configuration.IndexingConfigurations; import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.message.MessageGetStrategy; import org.apache.metron.common.message.MessageGetters; @@ -43,19 +61,6 @@ import org.mockito.ArgumentMatcher; import org.mockito.Matchers; import org.mockito.Mock; -import java.io.FileInputStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.argThat; -import static 
org.mockito.Matchers.eq; -import static org.mockito.Mockito.*; - public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest { protected class MessageListMatcher extends ArgumentMatcher<List<JSONObject>> { @@ -119,9 +124,9 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest { @Test public void testSourceTypeMissing() throws Exception { - // setup the bolt - BulkMessageWriterBolt bulkMessageWriterBolt = new BulkMessageWriterBolt("zookeeperUrl") + BulkMessageWriterBolt<IndexingConfigurations> bulkMessageWriterBolt = new BulkMessageWriterBolt<IndexingConfigurations>( + "zookeeperUrl", "INDEXING") .withBulkMessageWriter(bulkMessageWriter) .withMessageGetter(MessageGetters.JSON_FROM_FIELD.name()) .withMessageGetterField("message"); @@ -148,9 +153,11 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest { @Test public void testFlushOnBatchSize() throws Exception { - BulkMessageWriterBolt bulkMessageWriterBolt = new BulkMessageWriterBolt("zookeeperUrl") - .withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name()) - .withMessageGetterField("message"); + BulkMessageWriterBolt<IndexingConfigurations> bulkMessageWriterBolt = new BulkMessageWriterBolt<IndexingConfigurations>( + "zookeeperUrl", "INDEXING") + .withBulkMessageWriter(bulkMessageWriter) + .withMessageGetter(MessageGetters.JSON_FROM_FIELD.name()) + .withMessageGetterField("message"); bulkMessageWriterBolt.setCuratorFramework(client); bulkMessageWriterBolt.setZKCache(cache); bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType, @@ -207,9 +214,12 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest { @Test public void testFlushOnBatchTimeout() throws Exception { FakeClock clock = new FakeClock(); - BulkMessageWriterBolt bulkMessageWriterBolt = new BulkMessageWriterBolt("zookeeperUrl") - .withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name()) 
- .withMessageGetterField("message").withBatchTimeoutDivisor(3); + BulkMessageWriterBolt<IndexingConfigurations> bulkMessageWriterBolt = new BulkMessageWriterBolt<IndexingConfigurations>( + "zookeeperUrl", "INDEXING") + .withBulkMessageWriter(bulkMessageWriter) + .withMessageGetter(MessageGetters.JSON_FROM_FIELD.name()) + .withMessageGetterField("message") + .withBatchTimeoutDivisor(3); bulkMessageWriterBolt.setCuratorFramework(client); bulkMessageWriterBolt.setZKCache(cache); bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType, @@ -251,9 +261,11 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest { @Test public void testFlushOnTickTuple() throws Exception { FakeClock clock = new FakeClock(); - BulkMessageWriterBolt bulkMessageWriterBolt = new BulkMessageWriterBolt("zookeeperUrl") - .withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name()) - .withMessageGetterField("message"); + BulkMessageWriterBolt<IndexingConfigurations> bulkMessageWriterBolt = new BulkMessageWriterBolt<IndexingConfigurations>( + "zookeeperUrl", "INDEXING") + .withBulkMessageWriter(bulkMessageWriter) + .withMessageGetter(MessageGetters.JSON_FROM_FIELD.name()) + .withMessageGetterField("message"); bulkMessageWriterBolt.setCuratorFramework(client); bulkMessageWriterBolt.setZKCache(cache); bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType @@ -309,7 +321,8 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest { FakeClock clock = new FakeClock(); // setup the bolt - BulkMessageWriterBolt bolt = new BulkMessageWriterBolt("zookeeperUrl") + BulkMessageWriterBolt<IndexingConfigurations> bolt = new BulkMessageWriterBolt<IndexingConfigurations>( + "zookeeperUrl", "INDEXING") .withBulkMessageWriter(bulkMessageWriter) .withMessageGetter(MessageGetters.JSON_FROM_POSITION.name()) .withMessageGetterField("message"); @@ -331,4 +344,5 @@ public class BulkMessageWriterBoltTest 
extends BaseEnrichmentBoltTest { verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), any()); verify(outputCollector, times(1)).ack(tuple); } + }