Repository: metron
Updated Branches:
  refs/heads/master cf7043c59 -> 3d6b049b8
METRON-1044 Disabled writers are not acking messages (merrimanr) closes apache/metron#654

Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/3d6b049b
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/3d6b049b
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/3d6b049b

Branch: refs/heads/master
Commit: 3d6b049b8799235b580a275024c18f6092fc8942
Parents: cf7043c
Author: merrimanr <[email protected]>
Authored: Thu Jul 20 15:42:55 2017 -0500
Committer: merrimanr <[email protected]>
Committed: Thu Jul 20 15:42:55 2017 -0500

----------------------------------------------------------------------
 .../java/org/apache/metron/writer/BulkWriterComponent.java | 1 +
 .../org/apache/metron/writer/BulkWriterComponentTest.java | 9 +++++++++
 2 files changed, 10 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/metron/blob/3d6b049b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
index 0a9e514..54a758b 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
@@ -115,6 +115,7 @@ public class BulkWriterComponent<MESSAGE_T> {
                    ) throws Exception
   {
     if(!configurations.isEnabled(sensorType)) {
+      collector.ack(tuple);
       return;
     }
     int batchSize = configurations.getBatchSize(sensorType);

http://git-wip-us.apache.org/repos/asf/metron/blob/3d6b049b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
index c560b30..0264b3d 100644
--- a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
@@ -44,6 +44,7 @@ import java.util.List;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -113,6 +114,14 @@ public class BulkWriterComponentTest {
 
     verify(collector, times(1)).ack(tuple1);
     verify(collector, times(1)).ack(tuple2);
+
+    // A disabled writer should still ack
+    Tuple disabledTuple = mock(Tuple.class);
+    String disabledSensorType = "disabled";
+    when(configurations.isEnabled(disabledSensorType)).thenReturn(false);
+    bulkWriterComponent.write(disabledSensorType, disabledTuple, message2, bulkMessageWriter, configurations, messageGetStrategy);
+    verify(collector, times(1)).ack(disabledTuple);
+
     verifyStatic(times(0));
     ErrorUtils.handleError(eq(collector), any(MetronError.class));
   }
