Jackie-Jiang commented on code in PR #10826:
URL: https://github.com/apache/pinot/pull/10826#discussion_r1214856674


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -232,7 +236,7 @@ public void deleteSegmentFile() {
   // consuming.
   private final AtomicBoolean _acquiredConsumerSemaphore;
   private final ServerMetrics _serverMetrics;
-  private final BooleanSupplier _isReadyToConsumeData;
+  private BooleanSupplier _isReadyToConsumeData;

Review Comment:
   Revert this



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -208,8 +208,39 @@ protected void doInit() {
       _tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(tableConfig, schema, this, _serverMetrics);
     }
 
-    // For dedup and partial-upsert, need to wait for all segments loaded before starting consuming data
-    if (isDedupEnabled() || isPartialUpsertEnabled()) {
+    if (tableConfig.getUpsertConfig() != null && tableConfig.getUpsertConfig().isEnableSnapshot()

Review Comment:
   I don't think we need the change in this file



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -1426,6 +1430,20 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
     try {
       _partitionGroupConsumerSemaphore.acquire();
       _acquiredConsumerSemaphore.set(true);
+      if (_tableConfig.getUpsertConfig() != null && _tableConfig.getUpsertConfig().isEnableSnapshot()) {
+        // persist snapshot for all sealed segments
+        // TODO: Use a semaphore to guarantee all the segments are sealed before persisting snapshot.

Review Comment:
   Please add more description here. Right now we cannot guarantee that the 
previous consuming segment has already been replaced with the immutable 
segment, so the snapshot might not be persisted for that segment.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java:
##########
@@ -159,7 +159,7 @@ public void deleteValidDocIdsSnapshot() {
     }
   }
 
-  private File getValidDocIdsSnapshotFile() {
+  public File getValidDocIdsSnapshotFile() {

Review Comment:
   Do we need to expose the internal file? IMO we should use 
`loadValidDocIdsFromSnapshot()` to access it



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -1426,6 +1430,20 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
     try {
       _partitionGroupConsumerSemaphore.acquire();
       _acquiredConsumerSemaphore.set(true);
+      if (_tableConfig.getUpsertConfig() != null && _tableConfig.getUpsertConfig().isEnableSnapshot()) {
+        // persist snapshot for all sealed segments
+        // TODO: Use a semaphore to guarantee all the segments are sealed before persisting snapshot.
+        List<SegmentDataManager> allSegments = _realtimeTableDataManager.acquireAllSegments();
+        for (SegmentDataManager segmentDataManager: allSegments) {
+          if (segmentDataManager.getSegment() instanceof ImmutableSegment) {
+            MutableRoaringBitmap validDocIds = new MutableRoaringBitmap();

Review Comment:
   We don't want to initialize `validDocIds` with an empty bitmap. If the valid 
doc ids don't exist (not sure whether that is possible), we shouldn't persist 
an empty snapshot.
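
A minimal sketch of the guard suggested above, assuming a missing bitmap is represented as `null`. `SnapshotSketch`, `Segment`, and `segmentsToSnapshot` are hypothetical stand-ins, and `java.util.BitSet` stands in for the roaring bitmap so the example runs without Pinot on the classpath. A bitmap that exists but is empty is still persisted; only a missing one is skipped.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical stand-ins for illustration; these are not Pinot classes.
final class SnapshotSketch {

  /** Simplified view of a sealed segment. validDocIds is null when none exist. */
  static final class Segment {
    final String name;
    final BitSet validDocIds; // stand-in for the roaring bitmap; may be null

    Segment(String name, BitSet validDocIds) {
      this.name = name;
      this.validDocIds = validDocIds;
    }
  }

  /** Returns the names of the segments whose snapshot would be persisted. */
  static List<String> segmentsToSnapshot(List<Segment> segments) {
    List<String> persisted = new ArrayList<>();
    for (Segment segment : segments) {
      if (segment.validDocIds == null) {
        // No valid doc ids for this segment: skip it rather than
        // fabricating an empty bitmap and persisting that.
        continue;
      }
      persisted.add(segment.name);
    }
    return persisted;
  }
}
```

The point of the design is that "no bitmap" and "empty bitmap" stay distinguishable, so a later load never mistakes a missing snapshot for a segment with zero valid docs.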



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -1426,6 +1430,20 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
     try {
       _partitionGroupConsumerSemaphore.acquire();
       _acquiredConsumerSemaphore.set(true);
+      if (_tableConfig.getUpsertConfig() != null && _tableConfig.getUpsertConfig().isEnableSnapshot()) {
+        // persist snapshot for all sealed segments
+        // TODO: Use a semaphore to guarantee all the segments are sealed before persisting snapshot.
+        List<SegmentDataManager> allSegments = _realtimeTableDataManager.acquireAllSegments();
+        for (SegmentDataManager segmentDataManager: allSegments) {

Review Comment:
   We don't want to persist snapshots for all segments, only for the segments 
within this partition.
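
A rough sketch of restricting the loop to one partition, assuming segment names follow the low-level-consumer convention `table__partitionId__sequence__creationTime`. `PartitionFilterSketch` and both methods are hypothetical helpers written for this example; real code would rely on Pinot's own segment-name parsing rather than splitting strings by hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;

// Hypothetical helper for illustration; not part of Pinot.
final class PartitionFilterSketch {

  /**
   * Extracts the partition id from a name of the assumed form
   * table__partitionId__sequence__creationTime. Returns empty for any
   * name that does not match (for example an uploaded segment).
   */
  static OptionalInt partitionIdOf(String segmentName) {
    String[] parts = segmentName.split("__");
    if (parts.length != 4) {
      return OptionalInt.empty();
    }
    try {
      return OptionalInt.of(Integer.parseInt(parts[1]));
    } catch (NumberFormatException e) {
      return OptionalInt.empty();
    }
  }

  /** Keeps only the segment names that belong to the given partition. */
  static List<String> segmentsInPartition(List<String> segmentNames, int partitionId) {
    List<String> matching = new ArrayList<>();
    for (String name : segmentNames) {
      OptionalInt id = partitionIdOf(name);
      if (id.isPresent() && id.getAsInt() == partitionId) {
        matching.add(name);
      }
    }
    return matching;
  }
}
```

With a filter like this, the consuming segment for partition 0 would only touch snapshots of partition 0 segments and leave other partitions to their own consumers.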



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -1426,6 +1430,20 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
     try {
       _partitionGroupConsumerSemaphore.acquire();

Review Comment:
   cc @ankitsultana This might be the root cause of #10552. If for some reason 
2 segments are going through the OFFLINE -> CONSUMING state transition (one 
segment just committed), and the newer segment acquired this lock, the older 
segment will block here and never finish the OFFLINE -> CONSUMING state 
transition.
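
A small sketch of the blocking scenario described above, assuming one single-permit semaphore per partition. `ConsumerSemaphoreSketch` and `olderSegmentCanAcquire` are hypothetical names, not Pinot code. A bounded `tryAcquire` replaces the unbounded `acquire()` from the diff so the example terminates instead of hanging.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the suspected hang; not Pinot code.
final class ConsumerSemaphoreSketch {

  /**
   * Simulates two segments of the same partition competing for the permit.
   * The newer segment takes it first. Returns true if the older segment can
   * obtain the permit within timeoutMs.
   */
  static boolean olderSegmentCanAcquire(boolean newerSegmentReleases, long timeoutMs) {
    Semaphore partitionSemaphore = new Semaphore(1);

    // The newer segment wins the race and holds the only permit.
    partitionSemaphore.acquireUninterruptibly();
    if (newerSegmentReleases) {
      partitionSemaphore.release();
    }

    try {
      // With a plain acquire() the older segment would wait here for as
      // long as the newer segment keeps consuming.
      return partitionSemaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

Calling `olderSegmentCanAcquire(false, 50)` models the case where the newer segment never gives up the permit, so the older segment times out; with an unbounded wait it would stay stuck in the state transition.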



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

