This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 40ac22da06 wait for full segment commit protocol on force commit
(#10479)
40ac22da06 is described below
commit 40ac22da06b3f2f7cfc3eb90a0b00afad08e8be8
Author: Johan Adami <[email protected]>
AuthorDate: Wed Mar 29 18:58:02 2023 -0400
wait for full segment commit protocol on force commit (#10479)
---
.../core/realtime/SegmentCompletionManager.java | 3 +-
.../helix/core/realtime/SegmentCompletionTest.java | 75 ++++++++++++++++++++++
2 files changed, 76 insertions(+), 2 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 28ad064601..7f5f7185a9 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -1170,8 +1170,7 @@ public class SegmentCompletionManager {
*/
private boolean isWinnerPicked(String preferredInstance, long now, final
String stopReason) {
if ((SegmentCompletionProtocol.REASON_ROW_LIMIT.equals(stopReason)
- ||
SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP.equals(stopReason)
- ||
SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED.equals(stopReason))
+ ||
SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP.equals(stopReason))
&& _commitStateMap.size() == 1) {
_winner = preferredInstance;
_winningOffset = _commitStateMap.get(preferredInstance);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index 2ada570ac8..4d85223b41 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -659,6 +659,81 @@ public class SegmentCompletionTest {
Assert.assertEquals(response.getStatus(),
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
}
+ @Test
+ public void testWinnerOnForceCommit()
+ throws Exception {
+ SegmentCompletionProtocol.Response response;
+ Request.Params params;
+ // S1 comes to force commit
+ _segmentCompletionMgr._seconds = 10L;
+ params = new
Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(_s1Offset.toString())
+
.withSegmentName(_segmentNameStr).withReason(SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED);
+ response = _segmentCompletionMgr.segmentConsumed(params);
+ // Still need to wait since we haven't hit time limit or heard from all
servers
+ Assert.assertEquals(response.getStatus(),
SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
+
+ // S2 comes with a higher offset 1 second later
+ _segmentCompletionMgr._seconds += 1;
+ params = new
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
+
.withSegmentName(_segmentNameStr).withReason(SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED);
+ response = _segmentCompletionMgr.segmentConsumed(params);
+ // Still need to wait since we haven't hit time limit or heard from all
servers
+ Assert.assertEquals(response.getStatus(), ControllerResponseStatus.HOLD);
+
+ // S3 comes with a lower offset than S2 3 seconds later
+ _segmentCompletionMgr._seconds += 3;
+ params = new
Request.Params().withInstanceId(S_3).withStreamPartitionMsgOffset(_s3Offset.toString())
+
.withSegmentName(_segmentNameStr).withReason(SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED);
+ response = _segmentCompletionMgr.segmentConsumed(params);
+ // We've met winner criteria, but it should be S_2 with the highest
offset. S_3 should catch up.
+ Assert.assertEquals(response.getStatus(),
ControllerResponseStatus.CATCH_UP);
+
+ // S1 comes back at the same offset
+ _segmentCompletionMgr._seconds += 1;
+ params = new
Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(_s1Offset.toString())
+
.withSegmentName(_segmentNameStr).withReason(SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED);
+ response = _segmentCompletionMgr.segmentConsumed(params);
+ // We've met winner criteria, but it should be S2 with the highest offset.
S1 should catch up.
+ Assert.assertEquals(response.getStatus(),
ControllerResponseStatus.CATCH_UP);
+
+ // S2 comes back at the same offset
+ _segmentCompletionMgr._seconds += 1;
+ params = new
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
+
.withSegmentName(_segmentNameStr).withReason(SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED);
+ response = _segmentCompletionMgr.segmentConsumed(params);
+ // S2 is told to commit
+ Assert.assertEquals(response.getStatus(), ControllerResponseStatus.COMMIT);
+
+ // S2 comes back to commit the segment
+ _segmentCompletionMgr._seconds += 1;
+ params = new
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
+ .withSegmentName(_segmentNameStr);
+ response = _segmentCompletionMgr.segmentCommitStart(params);
+ Assert.assertEquals(response.getStatus(),
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
+ _segmentCompletionMgr._seconds += 5;
+ params = new
Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString())
+ .withSegmentName(_segmentNameStr).withSegmentLocation("location");
+ response = _segmentCompletionMgr
+ .segmentCommitEnd(params, true, false,
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+ Assert.assertEquals(response.getStatus(),
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
+
+ // S3 comes back at the latest offset
+ _segmentCompletionMgr._seconds += 1;
+ params = new
Request.Params().withInstanceId(S_3).withStreamPartitionMsgOffset(_s2Offset.toString())
+
.withSegmentName(_segmentNameStr).withReason(SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED);
+ response = _segmentCompletionMgr.segmentConsumed(params);
+ // S3 is told to keep since it caught up
+ Assert.assertEquals(response.getStatus(), ControllerResponseStatus.KEEP);
+
+ // S1 comes back with a higher offset than before, but still not caught up
+ _segmentCompletionMgr._seconds += 1;
+ params = new
Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(_s3Offset.toString())
+
.withSegmentName(_segmentNameStr).withReason(SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED);
+ response = _segmentCompletionMgr.segmentConsumed(params);
+ // S1 is told to discard since S2 already uploaded
+ Assert.assertEquals(response.getStatus(),
ControllerResponseStatus.DISCARD);
+ }
+
@Test
public void testWinnerOnRowLimit()
throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]