This is an automated email from the ASF dual-hosted git repository.

rmerriman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git


The following commit(s) were added to refs/heads/master by this push:
     new d275a95  METRON-1998 Only one sensor is flushed by tick tuple (merrimanr) closes apache/metron#1335
d275a95 is described below

commit d275a958cb5b7bbab533d273dff80bc1e3e65fd0
Author: merrimanr <[email protected]>
AuthorDate: Wed Feb 20 16:11:50 2019 -0600

    METRON-1998 Only one sensor is flushed by tick tuple (merrimanr) closes apache/metron#1335
---
 .../apache/metron/writer/BulkWriterComponent.java  | 13 +++-
 .../metron/writer/BulkWriterComponentTest.java     | 78 ++++++++++++++++++++++
 2 files changed, 88 insertions(+), 3 deletions(-)

diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
index 15e59d3..ad6d4d1 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
@@ -31,6 +31,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.error.MetronError;
@@ -308,7 +310,7 @@ public class BulkWriterComponent<MESSAGE_T> {
     }
     long endTime = System.currentTimeMillis();
     long elapsed = endTime - startTime;
-    LOG.debug("Bulk batch for sensor {} completed in ~{} ns", sensorType, 
elapsed);
+    LOG.debug("Flushed batch successfully; sensorType={}, batchSize={}, 
took={} ms", sensorType, CollectionUtils.size(tupleList), elapsed);
   }
 
   // Flushes all queues older than their batchTimeouts.
@@ -320,17 +322,22 @@ public class BulkWriterComponent<MESSAGE_T> {
   {
     // No need to do "all" sensorTypes here, just the ones that have data 
batched up.
     // Note queues with batchSize == 1 don't get batched, so they never 
persist in the sensorTupleMap.
-    for (String sensorType : sensorTupleMap.keySet()) {
+    // Sensors are removed from the sensorTupleMap when flushed so we need to 
iterate over a copy of sensorTupleMap keys
+    // to avoid a ConcurrentModificationException.
+    for (String sensorType : new HashSet<>(sensorTupleMap.keySet())) {
       long[] batchTimeoutInfo = batchTimeoutMap.get(sensorType);
       if (batchTimeoutInfo == null  //Shouldn't happen, but conservatively 
flush if so
           || clock.currentTimeMillis() - batchTimeoutInfo[LAST_CREATE_TIME_MS] 
>= batchTimeoutInfo[TIMEOUT_MS]) {
         flush(sensorType, bulkMessageWriter, configurations, messageGetStrategy
                    , sensorTupleMap.get(sensorType), 
sensorMessageMap.get(sensorType));
-        return;
       }
     }
   }
 
+  public int getDefaultBatchTimeout() {
+    return defaultBatchTimeout;
+  }
+
   /**
    * @param defaultBatchTimeout
    */
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
index 1a05ba4..754a650 100644
--- a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
@@ -17,10 +17,13 @@
  */
 package org.apache.metron.writer;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -38,6 +41,7 @@ import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.error.MetronError;
 import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.common.system.Clock;
 import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkWriterResponse;
@@ -270,4 +274,78 @@ public class BulkWriterComponentTest {
     verify(collector, times(1)).ack(missingTuple);
     verifyNoMoreInteractions(collector);
   }
+
+  @Test
+  public void flushTimeoutsShouldFlushAllMessagesAfterDefaultTimeout() throws 
Exception {
+    Clock clock = mock(Clock.class);
+    BulkMessageWriter<JSONObject> bulkMessageWriter = 
mock(BulkMessageWriter.class);
+    BulkWriterComponent<JSONObject> bulkWriterComponent = new 
BulkWriterComponent<>(collector, false, false).withClock(clock);
+    assertEquals(6, bulkWriterComponent.getDefaultBatchTimeout());
+
+    BulkWriterResponse response1 = new BulkWriterResponse();
+    response1.addSuccess(tuple1);
+    when(bulkMessageWriter.write("sensor1", configurations, 
Collections.singletonList(tuple1), 
Collections.singletonList(message1))).thenReturn(response1);
+    BulkWriterResponse response2 = new BulkWriterResponse();
+    response1.addSuccess(tuple2);
+    when(bulkMessageWriter.write("sensor2", configurations, 
Collections.singletonList(tuple2), 
Collections.singletonList(message2))).thenReturn(response2);
+
+    when(clock.currentTimeMillis()).thenReturn(0L);
+    bulkWriterComponent.write("sensor1", tuple1, message1, bulkMessageWriter, 
configurations, messageGetStrategy);
+    bulkWriterComponent.write("sensor2", tuple2, message2, bulkMessageWriter, 
configurations, messageGetStrategy);
+
+    when(clock.currentTimeMillis()).thenReturn(1000L);
+    bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations, 
messageGetStrategy);
+    when(clock.currentTimeMillis()).thenReturn(2000L);
+    bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations, 
messageGetStrategy);
+    when(clock.currentTimeMillis()).thenReturn(3000L);
+    bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations, 
messageGetStrategy);
+    when(clock.currentTimeMillis()).thenReturn(4000L);
+    bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations, 
messageGetStrategy);
+    when(clock.currentTimeMillis()).thenReturn(5000L);
+    bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations, 
messageGetStrategy);
+    when(clock.currentTimeMillis()).thenReturn(5999L);
+    bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations, 
messageGetStrategy);
+    when(clock.currentTimeMillis()).thenReturn(6000L); // triggers timeout
+    bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations, 
messageGetStrategy);
+
+    verify(bulkMessageWriter, times(1)).write("sensor1", configurations, 
Collections.singletonList(tuple1), Collections.singletonList(message1));
+    verify(bulkMessageWriter, times(1)).write("sensor2", configurations, 
Collections.singletonList(tuple2), Collections.singletonList(message2));
+
+    when(clock.currentTimeMillis()).thenReturn(6001L); // no trigger
+    bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations, 
messageGetStrategy);
+
+    verifyNoMoreInteractions(bulkMessageWriter);
+  }
+
+  @Test
+  public void flushTimeoutsShouldFlushAllMessagesAfterConfiguredTimeout() 
throws Exception {
+    Clock clock = mock(Clock.class);
+    BulkMessageWriter<JSONObject> bulkMessageWriter = 
mock(BulkMessageWriter.class);
+    BulkWriterComponent<JSONObject> bulkWriterComponent = new 
BulkWriterComponent<>(collector, false, false).withClock(clock);
+    bulkWriterComponent.setDefaultBatchTimeout(1);
+
+    BulkWriterResponse response1 = new BulkWriterResponse();
+    response1.addSuccess(tuple1);
+    when(bulkMessageWriter.write("sensor1", configurations, 
Collections.singletonList(tuple1), 
Collections.singletonList(message1))).thenReturn(response1);
+    BulkWriterResponse response2 = new BulkWriterResponse();
+    response1.addSuccess(tuple2);
+    when(bulkMessageWriter.write("sensor2", configurations, 
Collections.singletonList(tuple2), 
Collections.singletonList(message2))).thenReturn(response2);
+
+    when(clock.currentTimeMillis()).thenReturn(0L);
+    bulkWriterComponent.write("sensor1", tuple1, message1, bulkMessageWriter, 
configurations, messageGetStrategy);
+    bulkWriterComponent.write("sensor2", tuple2, message2, bulkMessageWriter, 
configurations, messageGetStrategy);
+
+    when(clock.currentTimeMillis()).thenReturn(999L);
+    bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations, 
messageGetStrategy);
+    when(clock.currentTimeMillis()).thenReturn(1000L); // triggers timeout
+    bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations, 
messageGetStrategy);
+
+    verify(bulkMessageWriter, times(1)).write("sensor1", configurations, 
Collections.singletonList(tuple1), Collections.singletonList(message1));
+    verify(bulkMessageWriter, times(1)).write("sensor2", configurations, 
Collections.singletonList(tuple2), Collections.singletonList(message2));
+
+    when(clock.currentTimeMillis()).thenReturn(1001L); // no trigger
+    bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations, 
messageGetStrategy);
+
+    verifyNoMoreInteractions(bulkMessageWriter);
+  }
 }

Reply via email to