This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1211f07ed9 Allow committing empty segment when some messages are fetched but all filtered out (#9089)
1211f07ed9 is described below

commit 1211f07ed9602e130da0e83368fcc465519f0fcc
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Jul 22 21:30:58 2022 -0700

    Allow committing empty segment when some messages are fetched but all filtered out (#9089)
---
 .../realtime/LLRealtimeSegmentDataManager.java     | 10 ++++++----
 .../realtime/LLRealtimeSegmentDataManagerTest.java | 23 ++++++++++------------
 2 files changed, 16 insertions(+), 17 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index fdfe31377e..bc0e6a5932 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -243,6 +243,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
 
   // Segment end criteria
   private volatile long _consumeEndTime = 0;
+  private volatile boolean _hasMessagesFetched = false;
   private volatile boolean _endOfPartitionGroup = false;
   private volatile boolean _forceCommitMessageReceived = false;
   private StreamPartitionMsgOffset _finalOffset; // Used when we want to catch 
up to this one
@@ -302,7 +303,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
         //   - partition group is ended
         //   - force commit message has been received
         if (now >= _consumeEndTime) {
-          if (_realtimeSegment.getNumDocsIndexed() == 0) {
+          if (!_hasMessagesFetched) {
             _segmentLogger.info("No events came in, extending time by {} 
hours", TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
             _consumeEndTime += 
TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
             return false;
@@ -573,12 +574,13 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
       streamMessageCount++;
     }
     updateCurrentDocumentCountMetrics();
-    if (streamMessageCount != 0) {
-      if (_segmentLogger.isDebugEnabled()) {
+    if (messagesAndOffsets.getUnfilteredMessageCount() > 0) {
+      _hasMessagesFetched = true;
+      if (streamMessageCount > 0 && _segmentLogger.isDebugEnabled()) {
         _segmentLogger.debug("Indexed {} messages ({} messages read from 
stream) current offset {}",
             indexedMessageCount, streamMessageCount, _currentOffset);
       }
-    } else if (messagesAndOffsets.getUnfilteredMessageCount() == 0) {
+    } else {
       if (_segmentLogger.isDebugEnabled()) {
         _segmentLogger.debug("empty batch received - sleeping for {}ms", 
idlePipeSleepTimeMillis);
       }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 1e8d70d331..50cf2a842f 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -27,6 +27,7 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
@@ -41,7 +42,6 @@ import 
org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
-import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
 import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
 import org.apache.pinot.segment.local.segment.creator.Fixtures;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
@@ -512,12 +512,12 @@ public class LLRealtimeSegmentDataManagerTest {
       FakeLLRealtimeSegmentDataManager segmentDataManager = 
createFakeSegmentManager();
       segmentDataManager._state.set(segmentDataManager, 
LLRealtimeSegmentDataManager.State.INITIAL_CONSUMING);
       Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached());
+      // We should still get false because there is no messages fetched
       _timeNow += Fixtures.MAX_TIME_FOR_SEGMENT_CLOSE_MS + 1;
-      // We should still get false, since the number of records in the 
realtime segment is 0
       Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached());
-      replaceRealtimeSegment(segmentDataManager, 10);
-      // Now we can test when we are far ahead in time
-      _timeNow += Fixtures.MAX_TIME_FOR_SEGMENT_CLOSE_MS;
+      // Once there are messages fetched, and the time exceeds the extended 
hour, we should get true
+      setHasMessagesFetched(segmentDataManager, true);
+      _timeNow += TimeUnit.HOURS.toMillis(1);
       Assert.assertTrue(segmentDataManager.invokeEndCriteriaReached());
       Assert.assertEquals(segmentDataManager.getStopReason(), 
SegmentCompletionProtocol.REASON_TIME_LIMIT);
       segmentDataManager.destroy();
@@ -579,14 +579,11 @@ public class LLRealtimeSegmentDataManagerTest {
     }
   }
 
-  // Replace the realtime segment with a mock that returns numDocs for raw doc 
count.
-  private void replaceRealtimeSegment(FakeLLRealtimeSegmentDataManager 
segmentDataManager, int numDocs)
+  private void setHasMessagesFetched(FakeLLRealtimeSegmentDataManager 
segmentDataManager, boolean hasMessagesFetched)
       throws Exception {
-    MutableSegmentImpl mockSegmentImpl = mock(MutableSegmentImpl.class);
-    when(mockSegmentImpl.getNumDocsIndexed()).thenReturn(numDocs);
-    Field segmentImpl = 
LLRealtimeSegmentDataManager.class.getDeclaredField("_realtimeSegment");
-    segmentImpl.setAccessible(true);
-    segmentImpl.set(segmentDataManager, mockSegmentImpl);
+    Field field = 
LLRealtimeSegmentDataManager.class.getDeclaredField("_hasMessagesFetched");
+    field.setAccessible(true);
+    field.set(segmentDataManager, hasMessagesFetched);
   }
 
   // If commit fails, make sure that we do not re-build the segment when we 
try to commit again.
@@ -743,7 +740,7 @@ public class LLRealtimeSegmentDataManagerTest {
     public Field _state;
     public Field _shouldStop;
     public Field _stopReason;
-    private Field _streamMsgOffsetFactory;
+    private final Field _streamMsgOffsetFactory;
     public LinkedList<LongMsgOffset> _consumeOffsets = new LinkedList<>();
     public LinkedList<SegmentCompletionProtocol.Response> _responses = new 
LinkedList<>();
     public boolean _commitSegmentCalled = false;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to