Jackie-Jiang commented on code in PR #10826:
URL: https://github.com/apache/pinot/pull/10826#discussion_r1224982841


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -662,6 +659,51 @@ public void run() {
       long catchUpTimeMillis = 0L;
       _startTimeMs = now();
       try {
+        // Check READY_TO_CONSUME_DATA before enter consumeLoop in the new 
PartitionConsumer.
+        if (!_isReadyToConsumeData.getAsBoolean()) {
+          do {
+            //noinspection BusyWait
+            
Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
+          } while (!_shouldStop && !endCriteriaReached() && 
!_isReadyToConsumeData.getAsBoolean());
+        }
+
+        if (_tableConfig.getUpsertConfig() != null && 
_tableConfig.getUpsertConfig().isEnableSnapshot()) {
+          // Persist snapshot for sealed segments. We need to guarantee the 
previous segment is already replaced with
+          // the immutable segment, so the snapshot might not be persisted for 
the previous consuming segment.
+          List<SegmentDataManager> allSegments = 
_realtimeTableDataManager.acquireAllSegments();
+          List<SegmentDataManager> allSegmentsForPartition = new ArrayList<>();
+          for (SegmentDataManager segmentDataManager: allSegments) {
+            // release segments not this partition
+            if (_partitionGroupId == new 
LLCSegmentName(segmentDataManager.getSegmentName()).getPartitionGroupId()) {

Review Comment:
   This will break for uploaded segments. Use 
`SegmentUtils.getRealtimeSegmentPartitionId()` to read the partition id.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -662,6 +659,51 @@ public void run() {
       long catchUpTimeMillis = 0L;
       _startTimeMs = now();
       try {
+        // Check READY_TO_CONSUME_DATA before enter consumeLoop in the new 
PartitionConsumer.
+        if (!_isReadyToConsumeData.getAsBoolean()) {
+          do {
+            //noinspection BusyWait
+            
Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
+          } while (!_shouldStop && !endCriteriaReached() && 
!_isReadyToConsumeData.getAsBoolean());
+        }
+
+        if (_tableConfig.getUpsertConfig() != null && 
_tableConfig.getUpsertConfig().isEnableSnapshot()) {
+          // Persist snapshot for sealed segments. We need to guarantee the 
previous segment is already replaced with
+          // the immutable segment, so the snapshot might not be persisted for 
the previous consuming segment.
+          List<SegmentDataManager> allSegments = 
_realtimeTableDataManager.acquireAllSegments();
+          List<SegmentDataManager> allSegmentsForPartition = new ArrayList<>();
+          for (SegmentDataManager segmentDataManager: allSegments) {
+            // release segments not this partition
+            if (_partitionGroupId == new 
LLCSegmentName(segmentDataManager.getSegmentName()).getPartitionGroupId()) {
+              allSegmentsForPartition.add(segmentDataManager);
+            } else {
+              _realtimeTableDataManager.releaseSegment(segmentDataManager);
+            }
+          }
+          // wait if all segments (except the new consuming segment) for this 
partition not sealed completely.
+          // The only consuming segment should be the new consuming segment, 
all the other segments should be persisted.
+          if (allSegmentsForPartition.stream()
+              .filter(segmentDataManager -> 
segmentDataManager.getSegment().getSegmentMetadata().isMutableSegment())

Review Comment:
   To be more robust, you may want to count only the segments where 
`segmentDataManager != this`, in case this segment is somehow not acquired.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -1241,6 +1283,15 @@ private boolean 
catchupToFinalOffset(StreamPartitionMsgOffset endOffset, long ti
     _state = State.CONSUMING_TO_ONLINE;
     _shouldStop = false;
     try {
+      if (!_isReadyToConsumeData.getAsBoolean()) {

Review Comment:
   Remove this part. We don't need to verify this in the catch-up phase.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -662,6 +659,51 @@ public void run() {
       long catchUpTimeMillis = 0L;
       _startTimeMs = now();
       try {
+        // Check READY_TO_CONSUME_DATA before enter consumeLoop in the new 
PartitionConsumer.
+        if (!_isReadyToConsumeData.getAsBoolean()) {
+          do {
+            //noinspection BusyWait
+            
Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
+          } while (!_shouldStop && !endCriteriaReached() && 
!_isReadyToConsumeData.getAsBoolean());
+        }
+
+        if (_tableConfig.getUpsertConfig() != null && 
_tableConfig.getUpsertConfig().isEnableSnapshot()) {
+          // Persist snapshot for sealed segments. We need to guarantee the 
previous segment is already replaced with
+          // the immutable segment, so the snapshot might not be persisted for 
the previous consuming segment.
+          List<SegmentDataManager> allSegments = 
_realtimeTableDataManager.acquireAllSegments();
+          List<SegmentDataManager> allSegmentsForPartition = new ArrayList<>();
+          for (SegmentDataManager segmentDataManager: allSegments) {
+            // release segments not this partition
+            if (_partitionGroupId == new 
LLCSegmentName(segmentDataManager.getSegmentName()).getPartitionGroupId()) {
+              allSegmentsForPartition.add(segmentDataManager);
+            } else {
+              _realtimeTableDataManager.releaseSegment(segmentDataManager);
+            }
+          }
+          // wait if all segments (except the new consuming segment) for this 
partition not sealed completely.
+          // The only consuming segment should be the new consuming segment, 
all the other segments should be persisted.
+          if (allSegmentsForPartition.stream()
+              .filter(segmentDataManager -> 
segmentDataManager.getSegment().getSegmentMetadata().isMutableSegment())
+              .count() > 1) {
+            do {
+              //noinspection BusyWait
+              
Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
+            } while (allSegmentsForPartition.stream()

Review Comment:
   (MAJOR) The result of this check will never change without acquiring the 
segments again.
   We should re-acquire the segments that were previously mutable inside the 
loop, until there is no mutable segment left.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -662,6 +659,51 @@ public void run() {
       long catchUpTimeMillis = 0L;
       _startTimeMs = now();
       try {
+        // Check READY_TO_CONSUME_DATA before enter consumeLoop in the new 
PartitionConsumer.
+        if (!_isReadyToConsumeData.getAsBoolean()) {
+          do {
+            //noinspection BusyWait
+            
Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
+          } while (!_shouldStop && !endCriteriaReached() && 
!_isReadyToConsumeData.getAsBoolean());
+        }
+
+        if (_tableConfig.getUpsertConfig() != null && 
_tableConfig.getUpsertConfig().isEnableSnapshot()) {
+          // Persist snapshot for sealed segments. We need to guarantee the 
previous segment is already replaced with
+          // the immutable segment, so the snapshot might not be persisted for 
the previous consuming segment.
+          List<SegmentDataManager> allSegments = 
_realtimeTableDataManager.acquireAllSegments();

Review Comment:
   We should use a try-finally after acquiring all segments, and release all the 
acquired segments within the finally block, to prevent leaving segments 
unreleased when an exception happens.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -662,6 +659,51 @@ public void run() {
       long catchUpTimeMillis = 0L;
       _startTimeMs = now();
       try {
+        // Check READY_TO_CONSUME_DATA before enter consumeLoop in the new 
PartitionConsumer.
+        if (!_isReadyToConsumeData.getAsBoolean()) {
+          do {
+            //noinspection BusyWait
+            
Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
+          } while (!_shouldStop && !endCriteriaReached() && 
!_isReadyToConsumeData.getAsBoolean());
+        }
+
+        if (_tableConfig.getUpsertConfig() != null && 
_tableConfig.getUpsertConfig().isEnableSnapshot()) {
+          // Persist snapshot for sealed segments. We need to guarantee the 
previous segment is already replaced with
+          // the immutable segment, so the snapshot might not be persisted for 
the previous consuming segment.
+          List<SegmentDataManager> allSegments = 
_realtimeTableDataManager.acquireAllSegments();
+          List<SegmentDataManager> allSegmentsForPartition = new ArrayList<>();
+          for (SegmentDataManager segmentDataManager: allSegments) {
+            // release segments not this partition
+            if (_partitionGroupId == new 
LLCSegmentName(segmentDataManager.getSegmentName()).getPartitionGroupId()) {
+              allSegmentsForPartition.add(segmentDataManager);
+            } else {
+              _realtimeTableDataManager.releaseSegment(segmentDataManager);
+            }
+          }
+          // wait if all segments (except the new consuming segment) for this 
partition not sealed completely.
+          // The only consuming segment should be the new consuming segment, 
all the other segments should be persisted.
+          if (allSegmentsForPartition.stream()
+              .filter(segmentDataManager -> 
segmentDataManager.getSegment().getSegmentMetadata().isMutableSegment())
+              .count() > 1) {
+            do {
+              //noinspection BusyWait
+              
Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
+            } while (allSegmentsForPartition.stream()
+                .filter(segmentDataManager -> 
segmentDataManager.getSegment().getSegmentMetadata().isMutableSegment())
+                .count() > 1);
+          }
+          // Persist snapshot and release all immutable segments for this 
partition.
+          for (SegmentDataManager segmentDataManager: allSegmentsForPartition) 
{
+            if (segmentDataManager.getSegment() instanceof ImmutableSegment) {

Review Comment:
   We should check if it is an `ImmutableSegmentImpl`. `EmptySegment` is also an 
`ImmutableSegment`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to