This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 40ac22da06 wait for full segment commit protocol on force commit (#10479)
40ac22da06 is described below

commit 40ac22da06b3f2f7cfc3eb90a0b00afad08e8be8
Author: Johan Adami <[email protected]>
AuthorDate: Wed Mar 29 18:58:02 2023 -0400

    wait for full segment commit protocol on force commit (#10479)
---
 .../core/realtime/SegmentCompletionManager.java    |  3 +-
 .../helix/core/realtime/SegmentCompletionTest.java | 75 ++++++++++++++++++++++
 2 files changed, 76 insertions(+), 2 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 28ad064601..7f5f7185a9 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -1170,8 +1170,7 @@ public class SegmentCompletionManager {
      */
     private boolean isWinnerPicked(String preferredInstance, long now, final 
String stopReason) {
       if ((SegmentCompletionProtocol.REASON_ROW_LIMIT.equals(stopReason)
-          || 
SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP.equals(stopReason)
-          || 
SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED.equals(stopReason))
+          || 
SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP.equals(stopReason))
           && _commitStateMap.size() == 1) {
         _winner = preferredInstance;
         _winningOffset = _commitStateMap.get(preferredInstance);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index 2ada570ac8..4d85223b41 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -659,6 +659,81 @@ public class SegmentCompletionTest {
     Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
   }
 
+  @Test
+  public void testWinnerOnForceCommit()
+      throws Exception {
+    SegmentCompletionProtocol.Response response;
+    Request.Params params;
+    // S1 comes to force commit
+    _segmentCompletionMgr._seconds = 10L;
+    params = new 
Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(_s1Offset.toString())
+        
.withSegmentName(_segmentNameStr).withReason(SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED);
+    response = _segmentCompletionMgr.segmentConsumed(params);
+    // Still need to wait since we haven't hit time limit or heard from all 
servers
+    Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
+
+    // S2 comes with a higher offset 1 second later
+    _segmentCompletionMgr._seconds += 1;
+    params = new 
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
+        
.withSegmentName(_segmentNameStr).withReason(SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED);
+    response = _segmentCompletionMgr.segmentConsumed(params);
+    // Still need to wait since we haven't hit time limit or heard from all 
servers
+    Assert.assertEquals(response.getStatus(), ControllerResponseStatus.HOLD);
+
+    // S3 comes with a lower offset than S2 3 seconds later
+    _segmentCompletionMgr._seconds += 3;
+    params = new 
Request.Params().withInstanceId(S_3).withStreamPartitionMsgOffset(_s3Offset.toString())
+        
.withSegmentName(_segmentNameStr).withReason(SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED);
+    response = _segmentCompletionMgr.segmentConsumed(params);
+    // We've met winner criteria, but it should be S_2 with the highest 
offset. S_3 should catch up.
+    Assert.assertEquals(response.getStatus(), 
ControllerResponseStatus.CATCH_UP);
+
+    // S1 comes back at the same offset
+    _segmentCompletionMgr._seconds += 1;
+    params = new 
Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(_s1Offset.toString())
+        
.withSegmentName(_segmentNameStr).withReason(SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED);
+    response = _segmentCompletionMgr.segmentConsumed(params);
+    // We've met winner criteria, but it should be S2 with the highest offset. 
S1 should catch up.
+    Assert.assertEquals(response.getStatus(), 
ControllerResponseStatus.CATCH_UP);
+
+    // S2 comes back at the same offset
+    _segmentCompletionMgr._seconds += 1;
+    params = new 
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
+        
.withSegmentName(_segmentNameStr).withReason(SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED);
+    response = _segmentCompletionMgr.segmentConsumed(params);
+    // S2 is told to commit
+    Assert.assertEquals(response.getStatus(), ControllerResponseStatus.COMMIT);
+
+    // S2 comes back to commit the segment
+    _segmentCompletionMgr._seconds += 1;
+    params = new 
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
+        .withSegmentName(_segmentNameStr);
+    response = _segmentCompletionMgr.segmentCommitStart(params);
+    Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
+    _segmentCompletionMgr._seconds += 5;
+    params = new 
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
+        .withSegmentName(_segmentNameStr).withSegmentLocation("location");
+    response = _segmentCompletionMgr
+        .segmentCommitEnd(params, true, false, 
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    Assert.assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
+
+    // S3 comes back at the latest offset
+    _segmentCompletionMgr._seconds += 1;
+    params = new 
Request.Params().withInstanceId(S_3).withStreamPartitionMsgOffset(_s2Offset.toString())
+        
.withSegmentName(_segmentNameStr).withReason(SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED);
+    response = _segmentCompletionMgr.segmentConsumed(params);
+    // S3 is told to keep since it caught up
+    Assert.assertEquals(response.getStatus(), ControllerResponseStatus.KEEP);
+
+    // S1 comes back with a higher offset than before, but still not caught up
+    _segmentCompletionMgr._seconds += 1;
+    params = new 
Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(_s3Offset.toString())
+        
.withSegmentName(_segmentNameStr).withReason(SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED);
+    response = _segmentCompletionMgr.segmentConsumed(params);
+    // S1 is told to discard since S2 already uploaded
+    Assert.assertEquals(response.getStatus(), 
ControllerResponseStatus.DISCARD);
+  }
+
   @Test
   public void testWinnerOnRowLimit()
       throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to