This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 76729f8424 Refactoring removeSegment flow in upsert (#13449)
76729f8424 is described below

commit 76729f84247323f1d145da98c8855238f149ca98
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Fri Jun 28 02:00:46 2024 +0530

    Refactoring removeSegment flow in upsert (#13449)
---
 .../upsert/BasePartitionUpsertMetadataManager.java | 17 +++++++++++-
 ...oncurrentMapPartitionUpsertMetadataManager.java | 32 +++++++---------------
 .../pinot/segment/local/upsert/UpsertUtils.java    | 20 ++++++++++++++
 3 files changed, 46 insertions(+), 23 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index b7c3c327e6..b0703d2501 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -77,6 +77,7 @@ import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.apache.pinot.spi.utils.BooleanUtils;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.roaringbitmap.PeekableIntIterator;
@@ -758,7 +759,21 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     }
   }
 
-  protected abstract void removeSegment(IndexSegment segment, 
MutableRoaringBitmap validDocIds);
+  protected void removeSegment(IndexSegment segment, MutableRoaringBitmap 
validDocIds) {
+    try (UpsertUtils.PrimaryKeyReader primaryKeyReader = new 
UpsertUtils.PrimaryKeyReader(segment,
+        _primaryKeyColumns)) {
+      removeSegment(segment, 
UpsertUtils.getPrimaryKeyIterator(primaryKeyReader, validDocIds));
+    } catch (Exception e) {
+      throw new RuntimeException(
+          String.format("Caught exception while removing segment: %s, table: 
%s", segment.getSegmentName(),
+              _tableNameWithType), e);
+    }
+  }
+
+  protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> 
primaryKeyIterator) {
+    throw new UnsupportedOperationException("Both removeSegment(segment, 
validDocID) and "
+        + "removeSegment(segment, pkIterator) are not implemented. Implement 
one of them to support removal.");
+  }
 
   @Override
   public void removeSegment(IndexSegment segment) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 4c54b59135..dfe7be0fc6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -37,7 +37,6 @@ import org.apache.pinot.segment.spi.MutableSegment;
 import 
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
-import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 
@@ -168,27 +167,16 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
   }
 
   @Override
-  protected void removeSegment(IndexSegment segment, MutableRoaringBitmap 
validDocIds) {
-    assert !validDocIds.isEmpty();
-
-    PrimaryKey primaryKey = new PrimaryKey(new 
Object[_primaryKeyColumns.size()]);
-    PeekableIntIterator iterator = validDocIds.getIntIterator();
-    try (
-        UpsertUtils.PrimaryKeyReader primaryKeyReader = new 
UpsertUtils.PrimaryKeyReader(segment, _primaryKeyColumns)) {
-      while (iterator.hasNext()) {
-        primaryKeyReader.getPrimaryKey(iterator.next(), primaryKey);
-        
_primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey,
 _hashFunction),
-            (pk, recordLocation) -> {
-              if (recordLocation.getSegment() == segment) {
-                return null;
-              }
-              return recordLocation;
-            });
-      }
-    } catch (Exception e) {
-      throw new RuntimeException(
-          String.format("Caught exception while removing segment: %s, table: 
%s", segment.getSegmentName(),
-              _tableNameWithType), e);
+  protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> 
primaryKeyIterator) {
+    while (primaryKeyIterator.hasNext()) {
+      PrimaryKey primaryKey = primaryKeyIterator.next();
+      
_primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey,
 _hashFunction),
+          (pk, recordLocation) -> {
+        if (recordLocation.getSegment() == segment) {
+          return null;
+        }
+        return recordLocation;
+      });
     }
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
index b29daab533..bf19aa470c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
@@ -88,6 +88,26 @@ public class UpsertUtils {
     };
   }
 
+  /**
+   * Returns an iterator of {@link PrimaryKey} for the valid documents from 
the segment.
+   */
+  public static Iterator<PrimaryKey> getPrimaryKeyIterator(PrimaryKeyReader 
primaryKeyReader,
+      MutableRoaringBitmap validDocIds) {
+    return new Iterator<>() {
+      private final PeekableIntIterator _docIdIterator = 
validDocIds.getIntIterator();
+
+      @Override
+      public boolean hasNext() {
+        return _docIdIterator.hasNext();
+      }
+
+      @Override
+      public PrimaryKey next() {
+        return primaryKeyReader.getPrimaryKey(_docIdIterator.next());
+      }
+    };
+  }
+
   public static class RecordInfoReader implements Closeable {
     private final PrimaryKeyReader _primaryKeyReader;
     private final ComparisonColumnReader _comparisonColumnReader;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to