This is an automated email from the ASF dual-hosted git repository.
rmerriman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push:
new d275a95 METRON-1998 Only one sensor is flushed by tick tuple
(merrimanr) closes apache/metron#1335
d275a95 is described below
commit d275a958cb5b7bbab533d273dff80bc1e3e65fd0
Author: merrimanr <[email protected]>
AuthorDate: Wed Feb 20 16:11:50 2019 -0600
METRON-1998 Only one sensor is flushed by tick tuple (merrimanr) closes
apache/metron#1335
---
.../apache/metron/writer/BulkWriterComponent.java | 13 +++-
.../metron/writer/BulkWriterComponentTest.java | 78 ++++++++++++++++++++++
2 files changed, 88 insertions(+), 3 deletions(-)
diff --git
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
index 15e59d3..ad6d4d1 100644
---
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
+++
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
@@ -31,6 +31,8 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.collections.CollectionUtils;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.apache.metron.common.error.MetronError;
@@ -308,7 +310,7 @@ public class BulkWriterComponent<MESSAGE_T> {
}
long endTime = System.currentTimeMillis();
long elapsed = endTime - startTime;
- LOG.debug("Bulk batch for sensor {} completed in ~{} ns", sensorType,
elapsed);
+ LOG.debug("Flushed batch successfully; sensorType={}, batchSize={},
took={} ms", sensorType, CollectionUtils.size(tupleList), elapsed);
}
// Flushes all queues older than their batchTimeouts.
@@ -320,17 +322,22 @@ public class BulkWriterComponent<MESSAGE_T> {
{
// No need to do "all" sensorTypes here, just the ones that have data
batched up.
// Note queues with batchSize == 1 don't get batched, so they never
persist in the sensorTupleMap.
- for (String sensorType : sensorTupleMap.keySet()) {
+ // Sensors are removed from the sensorTupleMap when flushed so we need to
iterate over a copy of sensorTupleMap keys
+ // to avoid a ConcurrentModificationException.
+ for (String sensorType : new HashSet<>(sensorTupleMap.keySet())) {
long[] batchTimeoutInfo = batchTimeoutMap.get(sensorType);
if (batchTimeoutInfo == null //Shouldn't happen, but conservatively
flush if so
|| clock.currentTimeMillis() - batchTimeoutInfo[LAST_CREATE_TIME_MS]
>= batchTimeoutInfo[TIMEOUT_MS]) {
flush(sensorType, bulkMessageWriter, configurations, messageGetStrategy
, sensorTupleMap.get(sensorType),
sensorMessageMap.get(sensorType));
- return;
}
}
}
+ public int getDefaultBatchTimeout() {
+ return defaultBatchTimeout;
+ }
+
/**
* @param defaultBatchTimeout
*/
diff --git
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
index 1a05ba4..754a650 100644
---
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
+++
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
@@ -17,10 +17,13 @@
*/
package org.apache.metron.writer;
+import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -38,6 +41,7 @@ import
org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.apache.metron.common.error.MetronError;
import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.common.system.Clock;
import org.apache.metron.common.utils.ErrorUtils;
import org.apache.metron.common.writer.BulkMessageWriter;
import org.apache.metron.common.writer.BulkWriterResponse;
@@ -270,4 +274,78 @@ public class BulkWriterComponentTest {
verify(collector, times(1)).ack(missingTuple);
verifyNoMoreInteractions(collector);
}
+
+ @Test
+ public void flushTimeoutsShouldFlushAllMessagesAfterDefaultTimeout() throws
Exception {
+ Clock clock = mock(Clock.class);
+ BulkMessageWriter<JSONObject> bulkMessageWriter =
mock(BulkMessageWriter.class);
+ BulkWriterComponent<JSONObject> bulkWriterComponent = new
BulkWriterComponent<>(collector, false, false).withClock(clock);
+ assertEquals(6, bulkWriterComponent.getDefaultBatchTimeout());
+
+ BulkWriterResponse response1 = new BulkWriterResponse();
+ response1.addSuccess(tuple1);
+ when(bulkMessageWriter.write("sensor1", configurations,
Collections.singletonList(tuple1),
Collections.singletonList(message1))).thenReturn(response1);
+ BulkWriterResponse response2 = new BulkWriterResponse();
+ response1.addSuccess(tuple2);
+ when(bulkMessageWriter.write("sensor2", configurations,
Collections.singletonList(tuple2),
Collections.singletonList(message2))).thenReturn(response2);
+
+ when(clock.currentTimeMillis()).thenReturn(0L);
+ bulkWriterComponent.write("sensor1", tuple1, message1, bulkMessageWriter,
configurations, messageGetStrategy);
+ bulkWriterComponent.write("sensor2", tuple2, message2, bulkMessageWriter,
configurations, messageGetStrategy);
+
+ when(clock.currentTimeMillis()).thenReturn(1000L);
+ bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations,
messageGetStrategy);
+ when(clock.currentTimeMillis()).thenReturn(2000L);
+ bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations,
messageGetStrategy);
+ when(clock.currentTimeMillis()).thenReturn(3000L);
+ bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations,
messageGetStrategy);
+ when(clock.currentTimeMillis()).thenReturn(4000L);
+ bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations,
messageGetStrategy);
+ when(clock.currentTimeMillis()).thenReturn(5000L);
+ bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations,
messageGetStrategy);
+ when(clock.currentTimeMillis()).thenReturn(5999L);
+ bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations,
messageGetStrategy);
+ when(clock.currentTimeMillis()).thenReturn(6000L); // triggers timeout
+ bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations,
messageGetStrategy);
+
+ verify(bulkMessageWriter, times(1)).write("sensor1", configurations,
Collections.singletonList(tuple1), Collections.singletonList(message1));
+ verify(bulkMessageWriter, times(1)).write("sensor2", configurations,
Collections.singletonList(tuple2), Collections.singletonList(message2));
+
+ when(clock.currentTimeMillis()).thenReturn(6001L); // no trigger
+ bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations,
messageGetStrategy);
+
+ verifyNoMoreInteractions(bulkMessageWriter);
+ }
+
+ @Test
+ public void flushTimeoutsShouldFlushAllMessagesAfterConfiguredTimeout()
throws Exception {
+ Clock clock = mock(Clock.class);
+ BulkMessageWriter<JSONObject> bulkMessageWriter =
mock(BulkMessageWriter.class);
+ BulkWriterComponent<JSONObject> bulkWriterComponent = new
BulkWriterComponent<>(collector, false, false).withClock(clock);
+ bulkWriterComponent.setDefaultBatchTimeout(1);
+
+ BulkWriterResponse response1 = new BulkWriterResponse();
+ response1.addSuccess(tuple1);
+ when(bulkMessageWriter.write("sensor1", configurations,
Collections.singletonList(tuple1),
Collections.singletonList(message1))).thenReturn(response1);
+ BulkWriterResponse response2 = new BulkWriterResponse();
+ response1.addSuccess(tuple2);
+ when(bulkMessageWriter.write("sensor2", configurations,
Collections.singletonList(tuple2),
Collections.singletonList(message2))).thenReturn(response2);
+
+ when(clock.currentTimeMillis()).thenReturn(0L);
+ bulkWriterComponent.write("sensor1", tuple1, message1, bulkMessageWriter,
configurations, messageGetStrategy);
+ bulkWriterComponent.write("sensor2", tuple2, message2, bulkMessageWriter,
configurations, messageGetStrategy);
+
+ when(clock.currentTimeMillis()).thenReturn(999L);
+ bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations,
messageGetStrategy);
+ when(clock.currentTimeMillis()).thenReturn(1000L); // triggers timeout
+ bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations,
messageGetStrategy);
+
+ verify(bulkMessageWriter, times(1)).write("sensor1", configurations,
Collections.singletonList(tuple1), Collections.singletonList(message1));
+ verify(bulkMessageWriter, times(1)).write("sensor2", configurations,
Collections.singletonList(tuple2), Collections.singletonList(message2));
+
+ when(clock.currentTimeMillis()).thenReturn(1001L); // no trigger
+ bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations,
messageGetStrategy);
+
+ verifyNoMoreInteractions(bulkMessageWriter);
+ }
}