METRON-1829 Large Error Message Causes Slow Search Performance (merrimanr) closes apache/metron#1239
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/d44a3925 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/d44a3925 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/d44a3925 Branch: refs/heads/feature/METRON-1090-stellar-assignment Commit: d44a392567e35022bfb35a787b55aff3447ac60e Parents: 2531c3e Author: merrimanr <[email protected]> Authored: Mon Oct 22 08:43:10 2018 -0500 Committer: rmerriman <[email protected]> Committed: Mon Oct 22 08:43:10 2018 -0500 ---------------------------------------------------------------------- .../bolt/BulkMessageWriterBoltTest.java | 2 +- metron-platform/metron-writer/pom.xml | 6 ++ .../metron/writer/BulkWriterComponent.java | 33 ++++++----- .../writer/bolt/BulkMessageWriterBolt.java | 2 +- .../metron/writer/BulkWriterComponentTest.java | 61 +++++++++++++++----- 5 files changed, 76 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/d44a3925/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java index 588fc58..083628c 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java @@ -207,7 +207,7 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest { } UnitTestHelper.setLog4jLevel(BulkWriterComponent.class, Level.ERROR); verify(outputCollector, times(5)).ack(tuple); - verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), any(Values.class)); + verify(outputCollector, times(5)).emit(eq(Constants.ERROR_STREAM), any(Values.class)); verify(outputCollector, times(1)).reportError(any(Throwable.class)); } http://git-wip-us.apache.org/repos/asf/metron/blob/d44a3925/metron-platform/metron-writer/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/pom.xml b/metron-platform/metron-writer/pom.xml index e845516..6d08093 100644 --- a/metron-platform/metron-writer/pom.xml +++ b/metron-platform/metron-writer/pom.xml @@ -207,6 +207,12 @@ <artifactId>metron-common</artifactId> <version>${project.parent.version}</version> </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-test-utilities</artifactId> + <version>${project.parent.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/metron/blob/d44a3925/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java index 7678584..68585c5 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java @@ -40,6 +40,7 @@ import org.apache.metron.common.writer.BulkMessageWriter; import org.apache.metron.common.writer.BulkWriterResponse; import org.apache.storm.task.OutputCollector; import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -118,12 +119,18 @@ public class BulkWriterComponent<MESSAGE_T> { public void error(String sensorType, Throwable e, Iterable<Tuple> tuples, MessageGetStrategy messageGetStrategy) { LOG.error(format("Failing %d tuple(s); sensorType=%s", Iterables.size(tuples), sensorType), e); - MetronError error = new MetronError() - .withSensorType(Collections.singleton(sensorType)) - .withErrorType(Constants.ErrorType.INDEXING_ERROR) - .withThrowable(e); - tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t))); - handleError(tuples, error); + tuples.forEach(t -> { + MetronError error = new MetronError() + .withSensorType(Collections.singleton(sensorType)) + .withErrorType(Constants.ErrorType.INDEXING_ERROR) + .withThrowable(e) + .addRawMessage(messageGetStrategy.get(t)); + collector.emit(Constants.ERROR_STREAM, new Values(error.getJSONObject())); + collector.ack(t); + }); + // there is only one error to report for all of the failed tuples + collector.reportError(e); + } /** @@ -133,24 +140,24 @@ public class BulkWriterComponent<MESSAGE_T> { * <p>Without a valid message, the JSON message cannot be added to the error. * * @param e The exception that occurred. - * @param tuples The tuples to error that may not contain valid messages. + * @param tuple The tuple to error that may not contain a valid message. */ - public void error(Throwable e, Iterable<Tuple> tuples) { - LOG.error(format("Failing %d tuple(s)", Iterables.size(tuples)), e); + public void error(Throwable e, Tuple tuple) { + LOG.error("Failing tuple", e); MetronError error = new MetronError() .withErrorType(Constants.ErrorType.INDEXING_ERROR) .withThrowable(e); - handleError(tuples, error); + handleError(tuple, error); } /** * Errors a set of tuples. * - * @param tuples The tuples to error. + * @param tuple The tuple to error. * @param error */ - private void handleError(Iterable<Tuple> tuples, MetronError error) { - tuples.forEach(t -> collector.ack(t)); + private void handleError(Tuple tuple, MetronError error) { + collector.ack(tuple); ErrorUtils.handleError(collector, error); } http://git-wip-us.apache.org/repos/asf/metron/blob/d44a3925/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java index 4bb3888..590ab8c 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java @@ -311,7 +311,7 @@ public class BulkMessageWriterBolt<CONFIG_T extends Configurations> extends Conf LOG.debug("Unable to extract message from tuple; expected valid JSON"); getWriterComponent().error( new Exception("Unable to extract message from tuple; expected valid JSON"), - ImmutableList.of(tuple) + tuple ); } http://git-wip-us.apache.org/repos/asf/metron/blob/d44a3925/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java index c389854..decf3a5 100644 --- a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java +++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java @@ -18,10 +18,12 @@ package org.apache.metron.writer; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import static org.powermock.api.mockito.PowerMockito.mockStatic; import static org.powermock.api.mockito.PowerMockito.verifyStatic; @@ -36,8 +38,10 @@ import org.apache.metron.common.message.MessageGetStrategy; import org.apache.metron.common.utils.ErrorUtils; import org.apache.metron.common.writer.BulkMessageWriter; import org.apache.metron.common.writer.BulkWriterResponse; +import org.apache.metron.test.error.MetronErrorJSONMatcher; import org.apache.storm.task.OutputCollector; import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import org.json.simple.JSONObject; import org.junit.Before; import org.junit.Rule; @@ -127,9 +131,12 @@ public class BulkWriterComponentTest { @Test public void writeShouldProperlyHandleWriterErrors() throws Exception { Throwable e = new Exception("test exception"); - MetronError error = new MetronError() + MetronError expectedError1 = new MetronError() .withSensorType(Collections.singleton(sensorType)) - .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Arrays.asList(message1, message2)); + .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message1)); + MetronError expectedError2 = new MetronError() + .withSensorType(Collections.singleton(sensorType)) + .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message2)); BulkWriterResponse response = new BulkWriterResponse(); response.addAllErrors(e, tupleList); @@ -139,8 +146,14 @@ public class BulkWriterComponentTest { bulkWriterComponent.write(sensorType, tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy); bulkWriterComponent.write(sensorType, tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy); - verifyStatic(times(1)); - ErrorUtils.handleError(collector, error); + verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM), + new Values(argThat(new MetronErrorJSONMatcher(expectedError1.getJSONObject())))); + verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM), + new Values(argThat(new MetronErrorJSONMatcher(expectedError2.getJSONObject())))); + verify(collector, times(1)).ack(tuple1); + verify(collector, times(1)).ack(tuple2); + verify(collector, times(1)).reportError(e); + verifyNoMoreInteractions(collector); } @Test @@ -161,9 +174,16 @@ public class BulkWriterComponentTest { @Test public void writeShouldProperlyHandleWriterException() throws Exception { Throwable e = new Exception("test exception"); - MetronError error = new MetronError() + MetronError expectedError1 = new MetronError() + .withSensorType(Collections.singleton(sensorType)) + .withErrorType(Constants.ErrorType.INDEXING_ERROR) + .withThrowable(e) + .withRawMessages(Collections.singletonList(message1)); + MetronError expectedError2 = new MetronError() .withSensorType(Collections.singleton(sensorType)) - .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Arrays.asList(message1, message2)); + .withErrorType(Constants.ErrorType.INDEXING_ERROR) + .withThrowable(e) + .withRawMessages(Collections.singletonList(message2)); BulkWriterResponse response = new BulkWriterResponse(); response.addAllErrors(e, tupleList); @@ -173,8 +193,14 @@ public class BulkWriterComponentTest { bulkWriterComponent.write(sensorType, tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy); bulkWriterComponent.write(sensorType, tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy); - verifyStatic(times(1)); - ErrorUtils.handleError(collector, error); + verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM), + new Values(argThat(new MetronErrorJSONMatcher(expectedError1.getJSONObject())))); + verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM), + new Values(argThat(new MetronErrorJSONMatcher(expectedError2.getJSONObject())))); + verify(collector, times(1)).ack(tuple1); + verify(collector, times(1)).ack(tuple2); + verify(collector, times(1)).reportError(e); + verifyNoMoreInteractions(collector); } @Test @@ -182,19 +208,28 @@ public class BulkWriterComponentTest { Throwable e = new Exception("test exception"); MetronError error1 = new MetronError() .withSensorType(Collections.singleton("sensor1")) - .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message1)); + .withErrorType(Constants.ErrorType.INDEXING_ERROR) + .withThrowable(e) + .withRawMessages(Collections.singletonList(message1)); MetronError error2 = new MetronError() .withSensorType(Collections.singleton("sensor2")) - .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message2)); + .withErrorType(Constants.ErrorType.INDEXING_ERROR) + .withThrowable(e) + .withRawMessages(Collections.singletonList(message2)); BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(collector); bulkWriterComponent.write("sensor1", tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy); bulkWriterComponent.write("sensor2", tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy); bulkWriterComponent.errorAll(e, messageGetStrategy); - verifyStatic(times(1)); - ErrorUtils.handleError(collector, error1); - ErrorUtils.handleError(collector, error2); + verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM), + new Values(argThat(new MetronErrorJSONMatcher(error1.getJSONObject())))); + verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM), + new Values(argThat(new MetronErrorJSONMatcher(error2.getJSONObject())))); + verify(collector, times(1)).ack(tuple1); + verify(collector, times(1)).ack(tuple2); + verify(collector, times(2)).reportError(e); + verifyNoMoreInteractions(collector); bulkWriterComponent.write("sensor1", tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy); verify(bulkMessageWriter, times(0)).write(sensorType, configurations, Collections.singletonList(tuple1), Collections.singletonList(message1));
