http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java index c5c1294..1f33060 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java @@ -18,6 +18,8 @@ package org.apache.metron.enrichment.bolt; import org.apache.log4j.Level; +import org.apache.metron.common.message.MessageGetStrategy; +import org.apache.metron.common.message.MessageGetters; import org.apache.metron.common.writer.BulkWriterResponse; import org.apache.metron.test.utils.UnitTestHelper; import org.apache.metron.writer.BulkWriterComponent; @@ -112,9 +114,13 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest { @Mock private BulkMessageWriter<JSONObject> bulkMessageWriter; + @Mock + private MessageGetStrategy messageGetStrategy; + @Test public void test() throws Exception { - BulkMessageWriterBolt bulkMessageWriterBolt = new BulkMessageWriterBolt("zookeeperUrl").withBulkMessageWriter(bulkMessageWriter); + BulkMessageWriterBolt bulkMessageWriterBolt = new BulkMessageWriterBolt("zookeeperUrl") + .withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name()).withMessageGetterField("message"); bulkMessageWriterBolt.setCuratorFramework(client); bulkMessageWriterBolt.setTreeCache(cache); bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType, new FileInputStream(sampleSensorIndexingConfigPath));
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java index b0076a4..90322fe 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java @@ -17,18 +17,22 @@ */ package org.apache.metron.enrichment.bolt; -import org.apache.log4j.Level; -import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase; -import org.apache.metron.test.utils.UnitTestHelper; -import org.apache.storm.tuple.Values; +import com.google.common.cache.CacheLoader; import com.google.common.collect.ImmutableMap; import org.adrianwalker.multilinestring.Multiline; +import org.apache.log4j.Level; import org.apache.metron.TestConstants; -import org.apache.metron.test.bolt.BaseEnrichmentBoltTest; -import org.apache.metron.enrichment.configuration.Enrichment; +import org.apache.metron.common.Constants; +import org.apache.metron.common.configuration.ConfigurationsUtils; import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; +import org.apache.metron.common.error.MetronError; +import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase; +import org.apache.metron.enrichment.configuration.Enrichment; import org.apache.metron.enrichment.interfaces.EnrichmentAdapter; -import org.apache.metron.common.configuration.ConfigurationsUtils; +import org.apache.metron.test.bolt.BaseEnrichmentBoltTest; +import org.apache.metron.test.error.MetronErrorJSONMatcher; +import org.apache.metron.test.utils.UnitTestHelper; +import org.apache.storm.tuple.Values; import org.hamcrest.Description; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; @@ -43,12 +47,16 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.HashMap; +import java.util.HashSet; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class GenericEnrichmentBoltTest extends BaseEnrichmentBoltTest { @@ -194,7 +202,10 @@ public class GenericEnrichmentBoltTest extends BaseEnrichmentBoltTest { UnitTestHelper.setLog4jLevel(GenericEnrichmentBolt.class, Level.FATAL); genericEnrichmentBolt.execute(tuple); UnitTestHelper.setLog4jLevel(GenericEnrichmentBolt.class, Level.ERROR); - verify(outputCollector, times(1)).emit(eq("error"), any(Values.class)); + MetronError error = new MetronError() + .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR) + .withThrowable(new Exception("Could not parse binary stream to JSON")); + verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject()))); when(tuple.getStringByField("key")).thenReturn(key); when(tuple.getValueByField("message")).thenReturn(originalMessage); when(enrichmentAdapter.enrich(any())).thenReturn(new JSONObject()); @@ -217,6 +228,19 @@ public class GenericEnrichmentBoltTest extends BaseEnrichmentBoltTest { verify(enrichmentAdapter, times(1)).logAccess(cacheKey2); verify(outputCollector, times(1)).emit(eq(enrichmentType), argThat(new EnrichedMessageMatcher(key, enrichedMessage))); - + reset(outputCollector); + genericEnrichmentBolt.cache.invalidateAll(); + when(enrichmentAdapter.enrich(cacheKey1)).thenReturn(null); + genericEnrichmentBolt.execute(tuple); + error = new MetronError() + .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR) + .withErrorFields(new HashSet<String>() {{ add("field1"); }}) + .addRawMessage(new JSONObject() {{ + put("field1", "value1"); + put("field2", "value2"); + put("source.type", "test"); + }}) + .withThrowable(new CacheLoader.InvalidCacheLoadException("CacheLoader returned null for key CacheKey{field='field1', value='value1'}.")); + verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject()))); } } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java index 5b06d33..9f12fcd 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java @@ -17,10 +17,14 @@ */ package org.apache.metron.enrichment.bolt; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.tuple.Values; +import com.google.common.cache.LoadingCache; import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.Constants; +import org.apache.metron.common.error.MetronError; import org.apache.metron.test.bolt.BaseEnrichmentBoltTest; +import org.apache.metron.test.error.MetronErrorJSONMatcher; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Values; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; @@ -31,11 +35,13 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -89,7 +95,7 @@ public class JoinBoltTest extends BaseEnrichmentBoltTest { } @Test - public void test() { + public void test() throws Exception { StandAloneJoinBolt joinBolt = new StandAloneJoinBolt("zookeeperUrl"); joinBolt.setCuratorFramework(client); joinBolt.setTreeCache(cache); @@ -126,5 +132,16 @@ public class JoinBoltTest extends BaseEnrichmentBoltTest { joinBolt.execute(tuple); verify(outputCollector, times(1)).emit(eq("message"), any(tuple.getClass()), eq(new Values(key, joinedMessage))); verify(outputCollector, times(1)).ack(tuple); + + joinBolt.cache = mock(LoadingCache.class); + when(joinBolt.cache.get(key)).thenThrow(new ExecutionException(new Exception("join exception"))); + joinBolt.execute(tuple); + + MetronError error = new MetronError() + .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR) + .withMessage("Joining problem: {}") + .withThrowable(new ExecutionException(new Exception("join exception"))) + .addRawMessage(new JSONObject()); + verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject()))); } } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java index 7306c64..e012c55 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java @@ -76,6 +76,7 @@ import java.util.stream.Stream; import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.*; public class EnrichmentIntegrationTest extends BaseIntegrationTest { + private static final String ERROR_TOPIC = "enrichment_error"; private static final String SRC_IP = "ip_src_addr"; private static final String DST_IP = "ip_dst_addr"; private static final String MALICIOUS_IP_TYPE = "malicious_ip"; @@ -139,13 +140,13 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { setProperty("enrichment.simple.hbase.table", enrichmentsTableName); setProperty("enrichment.simple.hbase.cf", cf); setProperty("enrichment.output.topic", Constants.INDEXING_TOPIC); - setProperty("enrichment.error.topic", Constants.ENRICHMENT_ERROR_TOPIC); + setProperty("enrichment.error.topic", ERROR_TOPIC); }}; final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties); final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{ add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1)); add(new KafkaComponent.Topic(Constants.INDEXING_TOPIC, 1)); - add(new KafkaComponent.Topic(Constants.ENRICHMENT_ERROR_TOPIC, 1)); + add(new KafkaComponent.Topic(ERROR_TOPIC, 1)); }}); String globalConfigStr = null; { @@ -201,15 +202,14 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { fluxComponent.submitTopology(); kafkaComponent.writeMessages(Constants.ENRICHMENT_TOPIC, inputMessages); - ProcessorResult<List<Map<String, Object>>> result = runner.process(getProcessor()); - // We expect failures, so we don't care if result returned failure or not - List<Map<String, Object>> docs = result.getResult(); + ProcessorResult<Map<String, List<Map<String, Object>>>> result = runner.process(getProcessor()); + Map<String,List<Map<String, Object>>> outputMessages = result.getResult(); + List<Map<String, Object>> docs = outputMessages.get(Constants.INDEXING_TOPIC); Assert.assertEquals(inputMessages.size(), docs.size()); validateAll(docs); - - List<byte[]> errors = result.getProcessErrors(); + List<Map<String, Object>> errors = outputMessages.get(ERROR_TOPIC); Assert.assertEquals(inputMessages.size(), errors.size()); - validateErrors(result.getProcessErrors()); + validateErrors(errors); } finally { runner.stop(); } @@ -234,10 +234,12 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { } } - protected void validateErrors(List<byte[]> errors) { - for(byte[] error : errors) { - // Don't reconstruct the entire message, just ensure it contains the known error message inside. - Assert.assertTrue(new String(error).contains(ErrorEnrichmentBolt.TEST_ERROR_MESSAGE)); + protected void validateErrors(List<Map<String, Object>> errors) { + for(Map<String, Object> error : errors) { + Assert.assertEquals("Test throwing error from ErrorEnrichmentBolt", error.get(Constants.ErrorFields.MESSAGE.getName())); + Assert.assertEquals("java.lang.IllegalStateException: Test throwing error from ErrorEnrichmentBolt", error.get(Constants.ErrorFields.EXCEPTION.getName())); + Assert.assertEquals(Constants.ErrorType.ENRICHMENT_ERROR.getType(), error.get(Constants.ErrorFields.ERROR_TYPE.getName())); + Assert.assertEquals("{\"rawMessage\":\"Error Test Raw Message String\"}", error.get(Constants.ErrorFields.RAW_MESSAGE.getName())); } } @@ -504,39 +506,47 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { return ret; } + private static List<Map<String, Object>> loadMessages(List<byte[]> outputMessages) { + List<Map<String, Object>> tmp = new ArrayList<>(); + Iterables.addAll(tmp + , Iterables.transform(outputMessages + , message -> { + try { + return new HashMap<>(JSONUtils.INSTANCE.load(new String(message) + , new TypeReference<Map<String, Object>>() {} + ) + ); + } catch (Exception ex) { + throw new IllegalStateException(ex); + } + } + ) + ); + return tmp; + } @SuppressWarnings("unchecked") - private Processor<List<Map<String, Object>>> getProcessor() { + private KafkaProcessor<Map<String,List<Map<String, Object>>>> getProcessor(){ - KafkaProcessor<List<Map<String, Object>>> kafkaProcessor = new KafkaProcessor<>().withKafkaComponentName("kafka") + return new KafkaProcessor<>() + .withKafkaComponentName("kafka") .withReadTopic(Constants.INDEXING_TOPIC) - .withErrorTopic(Constants.ENRICHMENT_ERROR_TOPIC) - .withInvalidTopic(Constants.INVALID_STREAM) + .withErrorTopic(ERROR_TOPIC) .withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() { @Nullable @Override public Boolean apply(@Nullable KafkaMessageSet messageSet) { - // this test is written to return 10 errors and 10 messages - // we can just check when the messages match here - // if they do then we are good - return messageSet.getMessages().size() == inputMessages.size(); + return (messageSet.getMessages().size() == inputMessages.size()) && (messageSet.getErrors().size() == inputMessages.size()); } }) - .withProvideResult(new Function<KafkaMessageSet , List<Map<String, Object>>>() { + .withProvideResult(new Function<KafkaMessageSet,Map<String,List<Map<String, Object>>>>(){ @Nullable @Override - public List<Map<String, Object>> apply(@Nullable KafkaMessageSet messageSet) { - List<Map<String,Object>> docs = new ArrayList<>(); - for (byte[] message : messageSet.getMessages()) { - try { - docs.add(JSONUtils.INSTANCE.load(new String(message), new TypeReference<Map<String, Object>>() { - })); - } catch (IOException e) { - throw new IllegalStateException(e.getMessage(), e); - } - } - return docs; + public Map<String,List<Map<String, Object>>> apply(@Nullable KafkaMessageSet messageSet) { + return new HashMap<String, List<Map<String, Object>>>() {{ + put(Constants.INDEXING_TOPIC, loadMessages(messageSet.getMessages())); + put(ERROR_TOPIC, loadMessages(messageSet.getErrors())); + }}; } }); - return kafkaProcessor; } } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-indexing/README.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/README.md b/metron-platform/metron-indexing/README.md index 5296ea0..f243eef 100644 --- a/metron-platform/metron-indexing/README.md +++ b/metron-platform/metron-indexing/README.md @@ -24,7 +24,7 @@ and sent to * An indexing bolt configured to write to either elasticsearch or Solr * An indexing bolt configured to write to HDFS under `/apps/metron/enrichment/indexed` -Errors during indexing are sent to a kafka queue called `index_errors` +By default, errors during indexing are sent back into the `indexing` kafka queue so that they can be indexed and archived. ##Sensor Indexing Configuration The sensor specific configuration is intended to configure the http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/error.json ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/error.json b/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/error.json new file mode 100644 index 0000000..4c9d786 --- /dev/null +++ b/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/error.json @@ -0,0 +1,17 @@ +{ + "hdfs" : { + "index": "error", + "batchSize": 5, + "enabled" : true + }, + "elasticsearch" : { + "index": "error", + "batchSize": 5, + "enabled" : true + }, + "solr" : { + "index": "error", + "batchSize": 5, + "enabled" : true + } +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml b/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml index 8bf8f48..3e329f4 100644 --- a/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml +++ b/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml @@ -101,7 +101,8 @@ bolts: - ref: "indexWriter" - name: "withMessageGetter" args: - - "RAW" + - "DEFAULT_JSON_FROM_POSITION" + - id: "hdfsIndexingBolt" className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" constructorArgs: @@ -112,7 +113,7 @@ bolts: - ref: "hdfsWriter" - name: "withMessageGetter" args: - - "RAW" + - "DEFAULT_JSON_FROM_POSITION" - id: "indexingErrorBolt" className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java index ae04e43..60cd1d1 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java @@ -44,7 +44,6 @@ import org.junit.Assert; import org.junit.Test; import javax.annotation.Nullable; -import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.util.*; @@ -53,6 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.metron.common.configuration.ConfigurationsUtils.getClient; public abstract class IndexingIntegrationTest extends BaseIntegrationTest { + protected static final String ERROR_TOPIC = "indexing_error"; protected String hdfsDir = "target/indexingIntegrationTest/hdfs"; protected String sampleParsedPath = TestConstants.SAMPLE_DATA_PARSED_PATH + "TestExampleParsed"; protected String fluxPath = "../metron-indexing/src/main/flux/indexing/remote.yaml"; @@ -125,7 +125,7 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest { setProperty("indexing.workers", "1"); setProperty("indexing.executors", "0"); setProperty("index.input.topic", Constants.INDEXING_TOPIC); - setProperty("index.error.topic", Constants.INDEXING_ERROR_TOPIC); + setProperty("index.error.topic", ERROR_TOPIC); setProperty("index.date.format", dateFormat); //HDFS settings @@ -138,7 +138,7 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest { final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties); final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{ add(new KafkaComponent.Topic(Constants.INDEXING_TOPIC, 1)); - add(new KafkaComponent.Topic(Constants.INDEXING_ERROR_TOPIC, 1)); + add(new KafkaComponent.Topic(ERROR_TOPIC, 1)); }}); List<Map<String, Object>> inputDocs = new ArrayList<>(); for(byte[] b : inputMessages) { http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ProcessorResult.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ProcessorResult.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ProcessorResult.java index 3eb4e8f..c309686 100644 --- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ProcessorResult.java +++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ProcessorResult.java @@ -24,7 +24,6 @@ public class ProcessorResult<T> { public static class Builder<T>{ T result; List<byte[]> processErrors; - List<byte[]> processInvalids; public Builder(){} @@ -38,25 +37,18 @@ public class ProcessorResult<T> { return this; } - public Builder withProcessInvalids(List<byte[]> processInvalids){ - this.processInvalids = processInvalids; - return this; - } - public ProcessorResult<T> build(){ - return new ProcessorResult<T>(result,processErrors,processInvalids); + return new ProcessorResult<T>(result,processErrors); } } T result; List<byte[]> processErrors; - List<byte[]> processInvalids; @SuppressWarnings("unchecked") - public ProcessorResult(T result,List<byte[]> processErrors, List<byte[]> processInvalids){ + public ProcessorResult(T result,List<byte[]> processErrors){ this.result = result; this.processErrors = processErrors == null ? new ArrayList() : processErrors; - this.processInvalids = processInvalids == null ? new ArrayList() : processInvalids; } public T getResult(){ @@ -67,12 +59,8 @@ public class ProcessorResult<T> { return processErrors; } - public List<byte[]> getProcessInvalids(){ - return processInvalids; - } - public boolean failed(){ - return processErrors.size() > 0 || processInvalids.size() > 0; + return processErrors.size() > 0; } public void getBadResults(StringBuffer buffer){ @@ -84,10 +72,5 @@ public class ProcessorResult<T> { buffer.append(new String(outputMessage)); } buffer.append("\n"); - buffer.append(String.format("%d Invalid Messages", processInvalids.size())); - for (byte[] outputMessage : processInvalids) { - buffer.append(new String(outputMessage)); - } - buffer.append("\n"); } } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaMessageSet.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaMessageSet.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaMessageSet.java index 4227933..683fe6a 100644 --- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaMessageSet.java +++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaMessageSet.java @@ -23,12 +23,10 @@ import java.util.List; public class KafkaMessageSet{ public List<byte[]> messages; public List<byte[]> errors; - public List<byte[]> invalids; - public KafkaMessageSet(List<byte[]> messages, List<byte[]> errors, List<byte[]> invalids) { + public KafkaMessageSet(List<byte[]> messages, List<byte[]> errors) { this.messages = messages; this.errors = errors; - this.invalids = invalids; } @@ -38,7 +36,4 @@ public class KafkaMessageSet{ public List<byte[]> getErrors() { return errors; } - public List<byte[]> getInvalids() { - return invalids; - } } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaProcessor.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaProcessor.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaProcessor.java index 6fdbbf4..63f073d 100644 --- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaProcessor.java +++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaProcessor.java @@ -30,10 +30,8 @@ public class KafkaProcessor<T> implements Processor<T> { private String kafkaComponentName; private String readTopic; private String errorTopic; - private String invalidTopic; private List<byte[]> messages = new LinkedList<>(); private List<byte[]> errors = new LinkedList<>(); - private List<byte[]> invalids = new LinkedList<>(); public KafkaProcessor(){} public KafkaProcessor withKafkaComponentName(String name){ @@ -48,10 +46,6 @@ public class KafkaProcessor<T> implements Processor<T> { this.errorTopic = topicName; return this; } - public KafkaProcessor withInvalidTopic(String topicName){ - this.invalidTopic = topicName; - return this; - } public KafkaProcessor withValidateReadMessages(Function<KafkaMessageSet, Boolean> validate){ this.validateReadMessages = validate; return this; @@ -68,25 +62,19 @@ public class KafkaProcessor<T> implements Processor<T> { KafkaComponent kafkaComponent = runner.getComponent(kafkaComponentName, KafkaComponent.class); LinkedList<byte[]> outputMessages = new LinkedList<>(kafkaComponent.readMessages(readTopic)); LinkedList<byte[]> outputErrors = null; - LinkedList<byte[]> outputInvalids = null; if (errorTopic != null) { outputErrors = new LinkedList<>(kafkaComponent.readMessages(errorTopic)); } - if (invalidTopic != null) { - outputInvalids = new LinkedList<>(kafkaComponent.readMessages(invalidTopic)); - } - Boolean validated = validateReadMessages.apply(new KafkaMessageSet(outputMessages,outputErrors,outputInvalids)); + Boolean validated = validateReadMessages.apply(new KafkaMessageSet(outputMessages,outputErrors)); if(validated == null){ validated = false; } if(validated){ messages.addAll(outputMessages); errors.addAll(outputErrors); - invalids.addAll(outputInvalids); outputMessages.clear(); outputErrors.clear(); - outputInvalids.clear(); return ReadinessState.READY; } return ReadinessState.NOT_READY; @@ -94,7 +82,7 @@ public class KafkaProcessor<T> implements Processor<T> { @SuppressWarnings("unchecked") public ProcessorResult<T> getResult(){ ProcessorResult.Builder<T> builder = new ProcessorResult.Builder(); - return builder.withResult(provideResult.apply(new KafkaMessageSet(messages,errors,invalids))).withProcessErrors(errors).withProcessInvalids(invalids).build(); + return builder.withResult(provideResult.apply(new KafkaMessageSet(messages,errors))).withProcessErrors(errors).build(); } } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-parsers/README.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md index 3c4310d..2cf9bbf 100644 --- a/metron-platform/metron-parsers/README.md +++ b/metron-platform/metron-parsers/README.md @@ -30,7 +30,7 @@ Data flows through the parser bolt via kafka and into the `enrichments` topology in kafka. Errors are collected with the context of the error (e.g. stacktrace) and original message causing the error and sent to an `error` queue. Invalid messages as determined by global validation -functions are sent to an `invalid` queue. +functions are also treated as errors and sent to an `error` queue. ##Message Format @@ -277,9 +277,6 @@ usage: start_parser_topology.sh -ewp,--error_writer_p <PARALLELISM_HINT> Error Writer Parallelism Hint -h,--help This screen - -iwnt,--invalid_writer_num_tasks <NUM_TASKS> Invalid Writer Num Tasks - -iwp,--invalid_writer_p <PARALLELISM_HINT> Invalid Message Writer - Parallelism Hint -k,--kafka <BROKER_URL> Kafka Broker URL -mt,--message_timeout <TIMEOUT_IN_SECS> Message Timeout in Seconds -mtp,--max_task_parallelism <MAX_TASK> Max task parallelism @@ -365,9 +362,6 @@ be customized by modifying the arguments sent to this utility. * The Error Message Writer Bolt * `--error_writer_num_tasks` : The number of tasks for the error writer bolt * `--error_writer_p` : The parallelism hint for the error writer bolt -* The Invalid Message Writer Bolt - * `--invalid_writer_num_tasks` : The number of tasks for the error writer bolt - * `--invalid_writer_p` : The parallelism hint for the error writer bolt Finally, if workers and executors are new to you, the following might be of use to you: * [Understanding the Parallelism of a Storm Topology](http://www.michael-noll.com/blog/2012/10/16/understanding-the-parallelism-of-a-storm-topology/) http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java index 20c15ce..0b1ea3d 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java @@ -18,29 +18,38 @@ package org.apache.metron.parsers.bolt; import org.apache.commons.lang3.StringUtils; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.Values; import org.apache.metron.common.Constants; import org.apache.metron.common.bolt.ConfiguredParserBolt; +import org.apache.metron.common.configuration.FieldTransformer; import org.apache.metron.common.configuration.FieldValidator; import org.apache.metron.common.configuration.SensorParserConfig; import org.apache.metron.common.dsl.Context; import org.apache.metron.common.dsl.StellarFunctions; -import org.apache.metron.parsers.filters.Filters; -import org.apache.metron.common.configuration.FieldTransformer; +import org.apache.metron.common.error.MetronError; +import org.apache.metron.common.message.MessageGetStrategy; +import org.apache.metron.common.message.MessageGetters; import org.apache.metron.common.utils.ErrorUtils; +import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase; +import org.apache.metron.parsers.filters.Filters; import org.apache.metron.parsers.interfaces.MessageFilter; import org.apache.metron.parsers.interfaces.MessageParser; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; public class ParserBolt extends ConfiguredParserBolt implements Serializable { @@ -51,6 +60,7 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable { private MessageFilter<JSONObject> filter; private WriterHandler writer; private org.apache.metron.common.dsl.Context stellarContext; + private transient MessageGetStrategy messageGetStrategy; public ParserBolt( String zookeeperUrl , String sensorType , MessageParser<JSONObject> parser @@ -72,6 +82,7 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable { @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { super.prepare(stormConf, context, collector); + messageGetStrategy = MessageGetters.DEFAULT_BYTES_FROM_POSITION.get(); this.collector = collector; initializeStellar(); if(getSensorParserConfig() != null && filter == null) { @@ -109,7 +120,7 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable { @SuppressWarnings("unchecked") @Override public void execute(Tuple tuple) { - byte[] originalMessage = tuple.getBinary(0); + byte[] originalMessage = (byte[]) messageGetStrategy.get(tuple); SensorParserConfig sensorParserConfig = getSensorParserConfig(); try { //we want to ack the tuple in the situation where we have are not doing a bulk write @@ -128,12 +139,22 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable { } if (parser.validate(message) && (filter == null || filter.emitTuple(message, stellarContext))) { numWritten++; - if(!isGloballyValid(message, fieldValidations)) { - message.put(Constants.SENSOR_TYPE, getSensorType()+ ".invalid"); - collector.emit(Constants.INVALID_STREAM, new Values(message)); + List<FieldValidator> failedValidators = getFailedValidators(message, fieldValidations); + if(failedValidators.size() > 0) { + MetronError error = new MetronError() + .withErrorType(Constants.ErrorType.PARSER_INVALID) + .withSensorType(getSensorType()) + .addRawMessage(message); + Set<String> errorFields = failedValidators.stream() + .flatMap(fieldValidator -> fieldValidator.getInput().stream()) + .collect(Collectors.toSet()); + if (!errorFields.isEmpty()) { + error.withErrorFields(errorFields); + } + ErrorUtils.handleError(collector, error); } else { - writer.write(getSensorType(), tuple, message, getConfigurations()); + writer.write(getSensorType(), tuple, message, getConfigurations(), messageGetStrategy); } } } @@ -145,28 +166,28 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable { collector.ack(tuple); } } catch (Throwable ex) { - ErrorUtils.handleError( collector - , ex - , Constants.ERROR_STREAM - , Optional.of(getSensorType()) - , Optional.ofNullable(originalMessage) - ); + MetronError error = new MetronError() + .withErrorType(Constants.ErrorType.PARSER_ERROR) + .withThrowable(ex) + .withSensorType(getSensorType()) + .addRawMessage(originalMessage); + ErrorUtils.handleError(collector, error); collector.ack(tuple); } } - private boolean isGloballyValid(JSONObject input, List<FieldValidator> validators) { + private List<FieldValidator> getFailedValidators(JSONObject input, List<FieldValidator> validators) { + List<FieldValidator> failedValidators = new ArrayList<>(); for(FieldValidator validator : validators) { if(!validator.isValid(input, getConfigurations().getGlobalConfig(), stellarContext)) { - return false; + failedValidators.add(validator); } } - return true; + return failedValidators; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declareStream(Constants.INVALID_STREAM, new Fields("message")); declarer.declareStream(Constants.ERROR_STREAM, new Fields("message")); } } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java index 8eb0656..ef7288b 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java @@ -18,23 +18,27 @@ package org.apache.metron.parsers.bolt; +import org.apache.metron.common.Constants; +import org.apache.metron.common.configuration.ParserConfigurations; +import org.apache.metron.common.error.MetronError; +import org.apache.metron.common.message.MessageGetStrategy; +import org.apache.metron.common.message.MessageGetters; +import org.apache.metron.common.utils.ErrorUtils; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; -import org.apache.metron.common.Constants; -import org.apache.metron.common.configuration.ParserConfigurations; -import org.apache.metron.common.utils.ErrorUtils; import org.json.simple.JSONObject; import java.util.Map; -import java.util.Optional; public class WriterBolt extends BaseRichBolt { private WriterHandler handler; private ParserConfigurations configuration; private String sensorType; + private Constants.ErrorType errorType = Constants.ErrorType.DEFAULT_ERROR; + private transient MessageGetStrategy messageGetStrategy; private transient OutputCollector collector; public WriterBolt(WriterHandler handler, ParserConfigurations configuration, String sensorType) { this.handler = handler; @@ -42,9 +46,15 @@ public class WriterBolt extends BaseRichBolt { this.sensorType = sensorType; } + public WriterBolt withErrorType(Constants.ErrorType errorType) { + this.errorType = errorType; + return this; + } + @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; + messageGetStrategy = MessageGetters.DEFAULT_JSON_FROM_FIELD.get(); handler.init(stormConf, collector, configuration); } @@ -65,18 +75,18 @@ public class WriterBolt extends BaseRichBolt { public void execute(Tuple tuple) { JSONObject message = null; try { - message = (JSONObject)((JSONObject) tuple.getValueByField("message")).clone(); - handler.write(sensorType, tuple, message, configuration); + message = (JSONObject) messageGetStrategy.get(tuple); + handler.write(sensorType, tuple, message, configuration, messageGetStrategy); if(!handler.handleAck()) { collector.ack(tuple); } } catch (Throwable e) { - ErrorUtils.handleError( collector - , e - , Constants.ERROR_STREAM - , Optional.of(sensorType) - , Optional.ofNullable(message) - ); + MetronError error = new MetronError() + .withErrorType(errorType) + .withThrowable(e) + .withSensorType(sensorType) + .addRawMessage(message); + ErrorUtils.handleError(collector, error); collector.ack(tuple); } } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java index 38425b5..3273ca7 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java @@ -18,6 +18,7 @@ package org.apache.metron.parsers.bolt; +import org.apache.metron.common.message.MessageGetStrategy; import org.apache.storm.task.OutputCollector; import org.apache.storm.tuple.Tuple; import org.apache.metron.common.configuration.ParserConfigurations; @@ -80,11 +81,12 @@ public class WriterHandler implements Serializable { , Tuple tuple , JSONObject message , ParserConfigurations configurations + , MessageGetStrategy messageGetStrategy ) throws Exception { - writerComponent.write(sensorType, tuple, message, messageWriter, writerTransformer.apply(configurations)); + writerComponent.write(sensorType, tuple, message, messageWriter, writerTransformer.apply(configurations), messageGetStrategy); } - public void errorAll(String sensorType, Throwable e) { - writerComponent.errorAll(sensorType, e); + public void errorAll(String sensorType, Throwable e, MessageGetStrategy messageGetStrategy) { + writerComponent.errorAll(sensorType, e, messageGetStrategy); } } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java index 5f1927e..aeac33c 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java @@ -57,8 +57,6 @@ public class ParserTopologyBuilder { * @param spoutNumTasks Number of tasks for the spout * @param parserParallelism Parallelism hint for the parser bolt * @param parserNumTasks Number of tasks for the parser bolt - * @param invalidWriterParallelism Parallelism hint for the bolt that handles invalid data - * @param invalidWriterNumTasks Number of tasks for the bolt that handles invalid data * @param errorWriterParallelism Parallelism hint for the bolt that handles errors * @param errorWriterNumTasks Number of tasks for the bolt that handles errors * @param kafkaSpoutConfig Configuration options for the kafka spout @@ -73,8 +71,6 @@ public class ParserTopologyBuilder { int spoutNumTasks, int parserParallelism, int parserNumTasks, - int invalidWriterParallelism, - int invalidWriterNumTasks, int errorWriterParallelism, int errorWriterNumTasks, EnumMap<SpoutConfigOptions, Object> kafkaSpoutConfig @@ -104,14 +100,6 @@ public class ParserTopologyBuilder { .shuffleGrouping("parserBolt", Constants.ERROR_STREAM); } - // create the invalid bolt, if needed - if (invalidWriterNumTasks > 0) { - WriterBolt invalidBolt = createInvalidBolt(brokerUrl, sensorType, configs, parserConfig); - builder.setBolt("invalidMessageWriter", invalidBolt, invalidWriterParallelism) - .setNumTasks(invalidWriterNumTasks) - .shuffleGrouping("parserBolt", Constants.INVALID_STREAM); - } - return builder; } @@ -162,29 +150,6 @@ public class ParserTopologyBuilder { } /** - * Create a bolt that handles invalid messages. - * - * @param brokerUrl The Kafka Broker URL - * @param sensorType Type of sensor that is being consumed. - * @param configs - * @param parserConfig - * @return A Storm bolt that handles invalid messages. - */ - private static WriterBolt createInvalidBolt(String brokerUrl, String sensorType, ParserConfigurations configs, SensorParserConfig parserConfig) { - - // create writer - if not configured uses a sensible default - AbstractWriter writer = parserConfig.getErrorWriterClassName() == null - ? new KafkaWriter(brokerUrl).withTopic(Constants.DEFAULT_PARSER_INVALID_TOPIC).withConfigPrefix("invalid") - : ReflectionUtils.createInstance(parserConfig.getWriterClassName()); - writer.configure(sensorType, new ParserWriterConfiguration(configs)); - - // create a writer handler - WriterHandler writerHandler = createWriterHandler(writer); - - return new WriterBolt(writerHandler, configs, sensorType); - } - - /** * Create a bolt that handles error messages. * * @param brokerUrl Kafka Broker URL @@ -197,14 +162,14 @@ public class ParserTopologyBuilder { // create writer - if not configured uses a sensible default AbstractWriter writer = parserConfig.getErrorWriterClassName() == null - ? new KafkaWriter(brokerUrl).withTopic(Constants.DEFAULT_PARSER_ERROR_TOPIC).withConfigPrefix("error") + ? new KafkaWriter(brokerUrl).withTopic((String) configs.getGlobalConfig().get("parser.error.topic")).withConfigPrefix("error") : ReflectionUtils.createInstance(parserConfig.getWriterClassName()); writer.configure(sensorType, new ParserWriterConfiguration(configs)); // create a writer handler WriterHandler writerHandler = createWriterHandler(writer); - return new WriterBolt(writerHandler, configs, sensorType); + return new WriterBolt(writerHandler, configs, sensorType).withErrorType(Constants.ErrorType.PARSER_ERROR); } /** http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java index 5ea561c..2bf484e 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java @@ -302,8 +302,6 @@ public class ParserTopologyCLI { spoutNumTasks, parserParallelism, parserNumTasks, - invalidParallelism, - invalidNumTasks, errorParallelism, errorNumTasks, spoutConfig http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java index 6b00998..b3e15b2 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java @@ -17,8 +17,11 @@ */ package org.apache.metron.parsers.bolt; +import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.*; +import org.apache.metron.common.error.MetronError; +import org.apache.metron.test.error.MetronErrorJSONMatcher; import org.apache.metron.test.utils.UnitTestHelper; import org.apache.storm.task.OutputCollector; import org.apache.storm.tuple.Tuple; @@ -151,6 +154,64 @@ public class ParserBoltTest extends BaseBoltTest { verify(parser, times(0)).validate(any()); verify(writer, times(0)).write(eq(sensorType), any(ParserWriterConfiguration.class), eq(tuple), any()); verify(outputCollector, times(1)).ack(tuple); + + MetronError error = new MetronError() + .withErrorType(Constants.ErrorType.PARSER_ERROR) + .withThrowable(new NullPointerException()) + .withSensorType(sensorType) + .addRawMessage(sampleBinary); + verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject()))); + } + + @Test + public void testInvalid() throws Exception { + String sensorType = "yaf"; + ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) { + @Override + protected ParserConfigurations defaultConfigurations() { + return new ParserConfigurations() { + @Override + public SensorParserConfig getSensorParserConfig(String sensorType) { + return new SensorParserConfig() { + @Override + public Map<String, Object> getParserConfig() { + return new HashMap<String, Object>() {{ + }}; + } + + + }; + } + }; + } + + }; + + buildGlobalConfig(parserBolt); + + parserBolt.setCuratorFramework(client); + parserBolt.setTreeCache(cache); + parserBolt.prepare(new HashMap(), topologyContext, outputCollector); + byte[] sampleBinary = "some binary message".getBytes(); + + when(tuple.getBinary(0)).thenReturn(sampleBinary); + JSONObject parsedMessage = new JSONObject(); + parsedMessage.put("field", "invalidValue"); + List<JSONObject> messageList = new ArrayList<>(); + messageList.add(parsedMessage); + when(parser.parseOptional(sampleBinary)).thenReturn(Optional.of(messageList)); + when(parser.validate(parsedMessage)).thenReturn(true); + parserBolt.execute(tuple); + + MetronError error = new MetronError() + .withErrorType(Constants.ErrorType.PARSER_INVALID) + .withSensorType(sensorType) + .withErrorFields(new HashSet<String>() {{ add("field"); }}) + .addRawMessage(new JSONObject(){{ + put("field", "invalidValue"); + put("source.type", "yaf"); + }}); + verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject()))); } @Test @@ -531,6 +592,16 @@ public void testImplicitBatchOfOne() throws Exception { } + protected void buildGlobalConfig(ParserBolt parserBolt) { + HashMap<String, Object> globalConfig = new HashMap<>(); + Map<String, Object> fieldValidation = new HashMap<>(); + fieldValidation.put("input", Arrays.asList("field")); + fieldValidation.put("validation", "STELLAR"); + fieldValidation.put("config", new HashMap<String, String>(){{ put("condition", "field != 'invalidValue'"); }}); + globalConfig.put("fieldValidations", Arrays.asList(fieldValidation)); + parserBolt.getConfigurations().updateGlobalConfig(globalConfig); + } + private static void writeNonBatch(OutputCollector collector, ParserBolt bolt, Tuple t) { bolt.execute(t); } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java index 4693829..4511b55 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java @@ -17,22 +17,23 @@ */ package org.apache.metron.parsers.bolt; + import org.apache.log4j.Level; +import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.IndexingConfigurations; -import org.apache.metron.test.utils.UnitTestHelper; -import org.apache.metron.writer.BulkWriterComponent; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.tuple.Tuple; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import org.apache.metron.common.configuration.ParserConfigurations; import org.apache.metron.common.configuration.SensorParserConfig; -import org.apache.metron.common.configuration.writer.ParserWriterConfiguration; +import org.apache.metron.common.error.MetronError; import org.apache.metron.common.writer.BulkMessageWriter; -import org.apache.metron.common.writer.MessageWriter; import org.apache.metron.common.writer.BulkWriterResponse; +import org.apache.metron.common.writer.MessageWriter; import org.apache.metron.test.bolt.BaseBoltTest; +import org.apache.metron.test.error.MetronErrorJSONMatcher; +import org.apache.metron.test.utils.UnitTestHelper; +import org.apache.metron.writer.BulkWriterComponent; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Tuple; import org.json.simple.JSONObject; import org.junit.Test; import org.mockito.Mock; @@ -44,7 +45,12 @@ import java.util.Map; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.argThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class WriterBoltTest extends BaseBoltTest{ @Mock @@ -143,16 +149,24 @@ public class WriterBoltTest extends BaseBoltTest{ ParserConfigurations configurations = getConfigurations(1); String sensorType = "test"; Tuple t = mock(Tuple.class); + when(t.toString()).thenReturn("tuple"); when(t.getValueByField(eq("message"))).thenReturn(new JSONObject()); WriterBolt bolt = new WriterBolt(new WriterHandler(writer), configurations, sensorType); bolt.prepare(new HashMap(), topologyContext, outputCollector); - doThrow(new Exception()).when(writer).write(any(), any(), any(), any()); + doThrow(new Exception("write error")).when(writer).write(any(), any(), any(), any()); verify(writer, times(1)).init(); bolt.execute(t); verify(outputCollector, times(1)).ack(t); verify(writer, times(1)).write(eq(sensorType), any(), any(), any()); verify(outputCollector, times(1)).reportError(any()); verify(outputCollector, times(0)).fail(any()); + + MetronError error = new MetronError() + .withErrorType(Constants.ErrorType.DEFAULT_ERROR) + .withThrowable(new IllegalStateException("Unhandled bulk errors in response: {java.lang.Exception: write error=[tuple]}")) + .withSensorType(sensorType) + .addRawMessage(new JSONObject()); + verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject()))); } @Test http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java index 4ba1c43..a170a2c 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java @@ -37,6 +37,7 @@ import javax.annotation.Nullable; import java.util.*; public abstract class ParserIntegrationTest extends BaseIntegrationTest { + protected static final String ERROR_TOPIC = "parser_error"; protected List<byte[]> inputMessages; @Test public void test() throws Exception { @@ -47,8 +48,7 @@ public abstract class ParserIntegrationTest extends BaseIntegrationTest { final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{ add(new KafkaComponent.Topic(sensorType, 1)); add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1)); - add(new KafkaComponent.Topic(Constants.INVALID_STREAM,1)); - add(new KafkaComponent.Topic(Constants.ERROR_STREAM,1)); + add(new KafkaComponent.Topic(ERROR_TOPIC,1)); }}); topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList()); @@ -115,13 +115,12 @@ public abstract class ParserIntegrationTest extends BaseIntegrationTest { return new KafkaProcessor<>() .withKafkaComponentName("kafka") .withReadTopic(Constants.ENRICHMENT_TOPIC) - .withErrorTopic(Constants.ERROR_STREAM) - .withInvalidTopic(Constants.INVALID_STREAM) + .withErrorTopic(ERROR_TOPIC) .withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() { @Nullable @Override public Boolean apply(@Nullable KafkaMessageSet messageSet) { - return (messageSet.getMessages().size() + messageSet.getErrors().size() + messageSet.getInvalids().size()) == inputMessages.size(); + return (messageSet.getMessages().size() + messageSet.getErrors().size() == inputMessages.size()); } }) .withProvideResult(new Function<KafkaMessageSet,List<byte[]>>(){ http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java index 80b6ebd..73d3827 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java @@ -34,6 +34,7 @@ public class ParserTopologyComponent implements InMemoryComponent { private Properties topologyProperties; private String brokerUrl; private String sensorType; + private SpoutConfig.Offset offset = SpoutConfig.Offset.BEGINNING; private LocalCluster stormCluster; public static class Builder { @@ -64,15 +65,17 @@ public class ParserTopologyComponent implements InMemoryComponent { this.sensorType = sensorType; } + public void setOffset(SpoutConfig.Offset offset) { + this.offset = offset; + } + @Override public void start() throws UnableToStartException { try { TopologyBuilder topologyBuilder = ParserTopologyBuilder.build(topologyProperties.getProperty("kafka.zk") , brokerUrl , sensorType - , SpoutConfig.Offset.BEGINNING - , 1 - , 1 + , offset , 1 , 1 , 1 http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java index f37b1fc..7476bcf 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java @@ -15,8 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.metron.writers.integration; + import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Function; import com.google.common.collect.Iterables; @@ -37,6 +37,8 @@ import org.apache.metron.parsers.csv.CSVParser; import org.apache.metron.parsers.integration.components.ParserTopologyComponent; import org.apache.metron.test.utils.UnitTestHelper; import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; import org.junit.Assert; import org.junit.Test; @@ -45,6 +47,8 @@ import java.io.IOException; import java.util.*; public class WriterBoltIntegrationTest extends BaseIntegrationTest { + private static final String ERROR_TOPIC = "parser_error"; + public static class MockValidator implements FieldValidation{ @Override @@ -65,7 +69,8 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { { "validation" : "org.apache.metron.writers.integration.WriterBoltIntegrationTest$MockValidator" } - ] + ], + "parser.error.topic":"parser_error" } */ @Multiline @@ -88,7 +93,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { public static String parserConfig; @Test - public void test() throws UnableToStartException, IOException { + public void test() throws UnableToStartException, IOException, ParseException { UnitTestHelper.setLog4jLevel(CSVParser.class, org.apache.log4j.Level.FATAL); final String sensorType = "dummy"; final List<byte[]> inputMessages = new ArrayList<byte[]>() {{ @@ -100,8 +105,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties); final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{ add(new KafkaComponent.Topic(sensorType, 1)); - add(new KafkaComponent.Topic(Constants.DEFAULT_PARSER_ERROR_TOPIC, 1)); - add(new KafkaComponent.Topic(Constants.DEFAULT_PARSER_INVALID_TOPIC, 1)); + add(new KafkaComponent.Topic(ERROR_TOPIC, 1)); add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1)); }}); topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList()); @@ -131,17 +135,20 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { kafkaComponent.writeMessages(sensorType, inputMessages); ProcessorResult<Map<String, List<JSONObject>>> result = runner.process(getProcessor()); Map<String,List<JSONObject>> outputMessages = result.getResult(); - Assert.assertEquals(3, outputMessages.size()); + Assert.assertEquals(2, outputMessages.size()); Assert.assertEquals(1, outputMessages.get(Constants.ENRICHMENT_TOPIC).size()); Assert.assertEquals("valid", outputMessages.get(Constants.ENRICHMENT_TOPIC).get(0).get("action")); - Assert.assertEquals(1, outputMessages.get(Constants.DEFAULT_PARSER_ERROR_TOPIC).size()); - Assert.assertEquals("error", outputMessages.get(Constants.DEFAULT_PARSER_ERROR_TOPIC).get(0).get("rawMessage")); - Assert.assertTrue(Arrays.equals(listToBytes(outputMessages.get(Constants.DEFAULT_PARSER_ERROR_TOPIC).get(0).get("rawMessage_bytes")) - , "error".getBytes() - ) - ); - Assert.assertEquals(1, outputMessages.get(Constants.DEFAULT_PARSER_INVALID_TOPIC).size()); - Assert.assertEquals("invalid", outputMessages.get(Constants.DEFAULT_PARSER_INVALID_TOPIC).get(0).get("action")); + Assert.assertEquals(2, outputMessages.get(ERROR_TOPIC).size()); + JSONObject invalidMessage = outputMessages.get(ERROR_TOPIC).get(0); + Assert.assertEquals(Constants.ErrorType.PARSER_INVALID.getType(), invalidMessage.get(Constants.ErrorFields.ERROR_TYPE.getName())); + JSONObject rawMessage = JSONUtils.INSTANCE.load((String) invalidMessage.get(Constants.ErrorFields.RAW_MESSAGE.getName()), JSONObject.class); + Assert.assertEquals("foo", rawMessage.get("dummy")); + Assert.assertEquals("invalid", rawMessage.get("action")); + JSONObject errorMessage = outputMessages.get(ERROR_TOPIC).get(1); + Assert.assertEquals(Constants.ErrorType.PARSER_ERROR.getType(), errorMessage.get(Constants.ErrorFields.ERROR_TYPE.getName())); + Assert.assertEquals("error", errorMessage.get(Constants.ErrorFields.RAW_MESSAGE.getName())); + // It's unclear if we need a rawMessageBytes field so commenting out for now + //Assert.assertTrue(Arrays.equals(listToBytes(errorMessage.get(Constants.ErrorFields.RAW_MESSAGE_BYTES.getName())), "error".getBytes())); } finally { if(runner != null) { @@ -182,13 +189,12 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { return new KafkaProcessor<>() .withKafkaComponentName("kafka") .withReadTopic(Constants.ENRICHMENT_TOPIC) - .withErrorTopic(Constants.DEFAULT_PARSER_ERROR_TOPIC) - .withInvalidTopic(Constants.DEFAULT_PARSER_INVALID_TOPIC) + .withErrorTopic(ERROR_TOPIC) .withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() { @Nullable @Override public Boolean apply(@Nullable KafkaMessageSet messageSet) { - return (messageSet.getMessages().size() == 1) && (messageSet.getErrors().size() == 1) && (messageSet.getInvalids().size() ==1); + return (messageSet.getMessages().size() == 1) && (messageSet.getErrors().size() == 2); } }) .withProvideResult(new Function<KafkaMessageSet,Map<String,List<JSONObject>>>(){ @@ -197,8 +203,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest { public Map<String,List<JSONObject>> apply(@Nullable KafkaMessageSet messageSet) { return new HashMap<String, List<JSONObject>>() {{ put(Constants.ENRICHMENT_TOPIC, loadMessages(messageSet.getMessages())); - put(Constants.DEFAULT_PARSER_ERROR_TOPIC, loadMessages(messageSet.getErrors())); - put(Constants.DEFAULT_PARSER_INVALID_TOPIC, loadMessages(messageSet.getInvalids())); + put(ERROR_TOPIC, loadMessages(messageSet.getErrors())); }}; } }); http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-solr/src/main/config/solr.properties ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/src/main/config/solr.properties b/metron-platform/metron-solr/src/main/config/solr.properties index 8489d8e..d8dd25f 100644 --- a/metron-platform/metron-solr/src/main/config/solr.properties +++ b/metron-platform/metron-solr/src/main/config/solr.properties @@ -26,7 +26,7 @@ kafka.start=WHERE_I_LEFT_OFF ##### Indexing ##### index.input.topic=indexing -index.error.topic=indexing_error +index.error.topic=indexing writer.class.name=org.apache.metron.solr.writer.SolrWriter ##### Metrics ##### http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java index abf3a8a..c209ef3 100644 --- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java +++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java @@ -18,14 +18,17 @@ package org.apache.metron.solr.integration; import com.google.common.base.Function; -import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.Configurations; import org.apache.metron.common.configuration.ConfigurationsUtils; import org.apache.metron.common.interfaces.FieldNameConverter; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.enrichment.integration.utils.SampleUtil; import org.apache.metron.indexing.integration.IndexingIntegrationTest; -import org.apache.metron.integration.*; +import org.apache.metron.integration.ComponentRunner; +import org.apache.metron.integration.InMemoryComponent; +import org.apache.metron.integration.Processor; +import org.apache.metron.integration.ProcessorResult; +import org.apache.metron.integration.ReadinessState; import org.apache.metron.integration.components.KafkaComponent; import org.apache.metron.integration.components.ZKServerComponent; import org.apache.metron.solr.integration.components.SolrComponent; @@ -89,7 +92,7 @@ public class SolrIndexingIntegrationTest extends IndexingIntegrationTest { throw new IllegalStateException("Unable to retrieve indexed documents.", e); } if (docs.size() < inputMessages.size() || docs.size() != docsFromDisk.size()) { - errors = kafkaComponent.readMessages(Constants.INDEXING_ERROR_TOPIC); + errors = kafkaComponent.readMessages(ERROR_TOPIC); if(errors.size() > 0){ return ReadinessState.READY; } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/error/MetronErrorJSONMatcher.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/error/MetronErrorJSONMatcher.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/error/MetronErrorJSONMatcher.java new file mode 100644 index 0000000..ad24283 --- /dev/null +++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/error/MetronErrorJSONMatcher.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.test.error; + +import org.apache.storm.tuple.Values; +import org.json.simple.JSONObject; +import org.mockito.ArgumentMatcher; + +public class MetronErrorJSONMatcher extends ArgumentMatcher<Values> { + + private JSONObject expected; + + public MetronErrorJSONMatcher(JSONObject expected) { + this.expected = expected; + } + + @Override + public boolean matches(Object o) { + Values values = (Values) o; + JSONObject actual = (JSONObject) values.get(0); + actual.remove("timestamp"); + expected.remove("timestamp"); + actual.remove("stack"); + expected.remove("stack"); + return actual.equals(expected); + } +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java index 124ffd3..0a9e514 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java @@ -18,18 +18,26 @@ package org.apache.metron.writer; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.tuple.Tuple; import com.google.common.collect.Iterables; import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.writer.WriterConfiguration; +import org.apache.metron.common.error.MetronError; +import org.apache.metron.common.message.MessageGetStrategy; +import org.apache.metron.common.utils.ErrorUtils; import org.apache.metron.common.writer.BulkMessageWriter; import org.apache.metron.common.writer.BulkWriterResponse; -import org.apache.metron.common.utils.ErrorUtils; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.tuple.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; public class BulkWriterComponent<MESSAGE_T> { public static final Logger LOG = LoggerFactory @@ -60,18 +68,23 @@ public class BulkWriterComponent<MESSAGE_T> { commit(response.getSuccesses()); } - public void error(Throwable e, Iterable<Tuple> tuples) { + public void error(String sensorType, Throwable e, Iterable<Tuple> tuples, MessageGetStrategy messageGetStrategy) { tuples.forEach(t -> collector.ack(t)); + MetronError error = new MetronError() + .withSensorType(sensorType) + .withErrorType(Constants.ErrorType.INDEXING_ERROR) + .withThrowable(e); if(!Iterables.isEmpty(tuples)) { LOG.error("Failing " + Iterables.size(tuples) + " tuples", e); - ErrorUtils.handleError(collector, e, Constants.ERROR_STREAM); } + tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t))); + ErrorUtils.handleError(collector, error); } - public void error(BulkWriterResponse errors) { + public void error(String sensorType, BulkWriterResponse errors, MessageGetStrategy messageGetStrategy) { Map<Throwable, Collection<Tuple>> errorMap = errors.getErrors(); for(Map.Entry<Throwable, Collection<Tuple>> entry : errorMap.entrySet()) { - error(entry.getKey(), entry.getValue()); + error(sensorType, entry.getKey(), entry.getValue(), messageGetStrategy); } } @@ -80,24 +93,25 @@ public class BulkWriterComponent<MESSAGE_T> { } - public void errorAll(Throwable e) { - for(Map.Entry<String, Collection<Tuple>> kv : sensorTupleMap.entrySet()) { - error(e, kv.getValue()); - sensorTupleMap.remove(kv.getKey()); - sensorMessageMap.remove(kv.getKey()); + public void errorAll(Throwable e, MessageGetStrategy messageGetStrategy) { + for(String key : new HashSet<>(sensorTupleMap.keySet())) { + errorAll(key, e, messageGetStrategy); } } - public void errorAll(String sensorType, Throwable e) { - error(e, Optional.ofNullable(sensorTupleMap.get(sensorType)).orElse(new ArrayList<>())); + public void errorAll(String sensorType, Throwable e, MessageGetStrategy messageGetStrategy) { + Collection<Tuple> tuples = Optional.ofNullable(sensorTupleMap.get(sensorType)).orElse(new ArrayList<>()); + error(sensorType, e, tuples, messageGetStrategy); sensorTupleMap.remove(sensorType); sensorMessageMap.remove(sensorType); } + public void write( String sensorType , Tuple tuple , MESSAGE_T message , BulkMessageWriter<MESSAGE_T> bulkMessageWriter , WriterConfiguration configurations + , MessageGetStrategy messageGetStrategy ) throws Exception { if(!configurations.isEnabled(sensorType)) { @@ -129,13 +143,13 @@ public class BulkWriterComponent<MESSAGE_T> { } if(handleError) { - error(response); + error(sensorType, response, messageGetStrategy); } else if (response.hasErrors()) { throw new IllegalStateException("Unhandled bulk errors in response: " + response.getErrors()); } } catch (Throwable e) { if(handleError) { - error(e, tupleList); + error(sensorType, e, tupleList, messageGetStrategy); } else { throw e; http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java index 66c4c73..085ca5c 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java @@ -17,27 +17,27 @@ */ package org.apache.metron.writer.bolt; -import org.apache.metron.common.bolt.ConfiguredIndexingBolt; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; import org.apache.metron.common.Constants; +import org.apache.metron.common.bolt.ConfiguredIndexingBolt; import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration; import org.apache.metron.common.configuration.writer.WriterConfiguration; -import org.apache.metron.common.writer.MessageWriter; +import org.apache.metron.common.message.MessageGetStrategy; +import org.apache.metron.common.message.MessageGetters; import org.apache.metron.common.utils.MessageUtils; import org.apache.metron.common.writer.BulkMessageWriter; +import org.apache.metron.common.writer.MessageWriter; import org.apache.metron.writer.BulkWriterComponent; import org.apache.metron.writer.WriterToBulkWriter; -import org.apache.metron.writer.message.MessageGetter; -import org.apache.metron.writer.message.MessageGetters; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.Map; import java.util.function.Function; public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { @@ -46,8 +46,9 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { .getLogger(BulkMessageWriterBolt.class); private BulkMessageWriter<JSONObject> bulkMessageWriter; private BulkWriterComponent<JSONObject> writerComponent; - private String messageGetterStr = MessageGetters.NAMED.name(); - private transient MessageGetter messageGetter = null; + private String messageGetStrategyType = MessageGetters.DEFAULT_JSON_FROM_FIELD.name(); + private String messageGetField; + private transient MessageGetStrategy messageGetStrategy; private transient OutputCollector collector; private transient Function<WriterConfiguration, WriterConfiguration> configurationTransformation; public BulkMessageWriterBolt(String zookeeperUrl) { @@ -64,8 +65,13 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { return this; } - public BulkMessageWriterBolt withMessageGetter(String messageGetter) { - this.messageGetterStr = messageGetter; + public BulkMessageWriterBolt withMessageGetter(String messageGetStrategyType) { + this.messageGetStrategyType = messageGetStrategyType; + return this; + } + + public BulkMessageWriterBolt withMessageGetterField(String messageGetField) { + this.messageGetField = messageGetField; return this; } @@ -74,7 +80,11 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { this.writerComponent = new BulkWriterComponent<>(collector); this.collector = collector; super.prepare(stormConf, context, collector); - messageGetter = MessageGetters.valueOf(messageGetterStr); + if (messageGetField != null) { + messageGetStrategy = MessageGetters.valueOf(messageGetStrategyType).get(messageGetField); + } else { + messageGetStrategy = MessageGetters.valueOf(messageGetStrategyType).get(); + } if(bulkMessageWriter instanceof WriterToBulkWriter) { configurationTransformation = WriterToBulkWriter.TRANSFORMATION; } @@ -93,7 +103,7 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { @SuppressWarnings("unchecked") @Override public void execute(Tuple tuple) { - JSONObject message = messageGetter.getMessage(tuple); + JSONObject message = (JSONObject) messageGetStrategy.get(tuple); String sensorType = MessageUtils.getSensorType(message); try { @@ -107,6 +117,7 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt { , message , bulkMessageWriter , writerConfiguration + , messageGetStrategy ); LOG.trace("Writing enrichment message: {}", message); } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/MessageGetter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/MessageGetter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/MessageGetter.java deleted file mode 100644 index 99c825f..0000000 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/MessageGetter.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.metron.writer.message; - -import org.apache.storm.tuple.Tuple; -import org.json.simple.JSONObject; - -public interface MessageGetter { - JSONObject getMessage(Tuple t); -}