This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 76729f8424 Refactoring removeSegment flow in upsert (#13449)
76729f8424 is described below
commit 76729f84247323f1d145da98c8855238f149ca98
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Fri Jun 28 02:00:46 2024 +0530
Refactoring removeSegment flow in upsert (#13449)
---
.../upsert/BasePartitionUpsertMetadataManager.java | 17 +++++++++++-
...oncurrentMapPartitionUpsertMetadataManager.java | 32 +++++++---------------
.../pinot/segment/local/upsert/UpsertUtils.java | 20 ++++++++++++++
3 files changed, 46 insertions(+), 23 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index b7c3c327e6..b0703d2501 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -77,6 +77,7 @@ import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.apache.pinot.spi.utils.BooleanUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.roaringbitmap.PeekableIntIterator;
@@ -758,7 +759,21 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
}
}
- protected abstract void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds);
+ protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) {
+ try (UpsertUtils.PrimaryKeyReader primaryKeyReader = new UpsertUtils.PrimaryKeyReader(segment,
+ _primaryKeyColumns)) {
+ removeSegment(segment, UpsertUtils.getPrimaryKeyIterator(primaryKeyReader, validDocIds));
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Caught exception while removing segment: %s, table: %s", segment.getSegmentName(),
+ _tableNameWithType), e);
+ }
+ }
+
+ protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> primaryKeyIterator) {
+ throw new UnsupportedOperationException("Both removeSegment(segment, validDocID) and "
+ + "removeSegment(segment, pkIterator) are not implemented. Implement one of them to support removal.");
+ }
@Override
public void removeSegment(IndexSegment segment) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 4c54b59135..dfe7be0fc6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -37,7 +37,6 @@ import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
-import org.roaringbitmap.PeekableIntIterator;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
@@ -168,27 +167,16 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
}
@Override
- protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) {
- assert !validDocIds.isEmpty();
-
- PrimaryKey primaryKey = new PrimaryKey(new Object[_primaryKeyColumns.size()]);
- PeekableIntIterator iterator = validDocIds.getIntIterator();
- try (
- UpsertUtils.PrimaryKeyReader primaryKeyReader = new UpsertUtils.PrimaryKeyReader(segment, _primaryKeyColumns)) {
- while (iterator.hasNext()) {
- primaryKeyReader.getPrimaryKey(iterator.next(), primaryKey);
- _primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey, _hashFunction),
- (pk, recordLocation) -> {
- if (recordLocation.getSegment() == segment) {
- return null;
- }
- return recordLocation;
- });
- }
- } catch (Exception e) {
- throw new RuntimeException(
- String.format("Caught exception while removing segment: %s, table: %s", segment.getSegmentName(),
- _tableNameWithType), e);
+ protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> primaryKeyIterator) {
+ while (primaryKeyIterator.hasNext()) {
+ PrimaryKey primaryKey = primaryKeyIterator.next();
+ _primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey, _hashFunction),
+ (pk, recordLocation) -> {
+ if (recordLocation.getSegment() == segment) {
+ return null;
+ }
+ return recordLocation;
+ });
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
index b29daab533..bf19aa470c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
@@ -88,6 +88,26 @@ public class UpsertUtils {
};
}
+ /**
+ * Returns an iterator of {@link PrimaryKey} for the valid documents from the segment.
+ */
+ public static Iterator<PrimaryKey> getPrimaryKeyIterator(PrimaryKeyReader primaryKeyReader,
+ MutableRoaringBitmap validDocIds) {
+ return new Iterator<>() {
+ private final PeekableIntIterator _docIdIterator = validDocIds.getIntIterator();
+
+ @Override
+ public boolean hasNext() {
+ return _docIdIterator.hasNext();
+ }
+
+ @Override
+ public PrimaryKey next() {
+ return primaryKeyReader.getPrimaryKey(_docIdIterator.next());
+ }
+ };
+ }
+
public static class RecordInfoReader implements Closeable {
private final PrimaryKeyReader _primaryKeyReader;
private final ComparisonColumnReader _comparisonColumnReader;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]