Jackie-Jiang commented on code in PR #10826:
URL: https://github.com/apache/pinot/pull/10826#discussion_r1214856674
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -232,7 +236,7 @@ public void deleteSegmentFile() {
// consuming.
private final AtomicBoolean _acquiredConsumerSemaphore;
private final ServerMetrics _serverMetrics;
- private final BooleanSupplier _isReadyToConsumeData;
+ private BooleanSupplier _isReadyToConsumeData;
Review Comment:
Revert this
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -208,8 +208,39 @@ protected void doInit() {
_tableUpsertMetadataManager =
TableUpsertMetadataManagerFactory.create(tableConfig, schema, this, _serverMetrics);
}
- // For dedup and partial-upsert, need to wait for all segments loaded before starting consuming data
- if (isDedupEnabled() || isPartialUpsertEnabled()) {
+ if (tableConfig.getUpsertConfig() != null && tableConfig.getUpsertConfig().isEnableSnapshot()
Review Comment:
I don't think we need the change in this file
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -1426,6 +1430,20 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
try {
_partitionGroupConsumerSemaphore.acquire();
_acquiredConsumerSemaphore.set(true);
+ if (_tableConfig.getUpsertConfig() != null && _tableConfig.getUpsertConfig().isEnableSnapshot()) {
+ // persist snapshot for all sealed segments
+ // TODO: Use a semaphore to guarantee all the segments are sealed before persisting snapshot.
Review Comment:
Please expand this comment. Right now we cannot guarantee that the previous
consuming segment has already been replaced with the immutable segment, so the
snapshot might not be persisted for that segment.
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java:
##########
@@ -159,7 +159,7 @@ public void deleteValidDocIdsSnapshot() {
}
}
- private File getValidDocIdsSnapshotFile() {
+ public File getValidDocIdsSnapshotFile() {
Review Comment:
Do we need to expose the internal file? IMO we should use
`loadValidDocIdsFromSnapshot()` to access the snapshot instead.
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -1426,6 +1430,20 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
try {
_partitionGroupConsumerSemaphore.acquire();
_acquiredConsumerSemaphore.set(true);
+ if (_tableConfig.getUpsertConfig() != null && _tableConfig.getUpsertConfig().isEnableSnapshot()) {
+ // persist snapshot for all sealed segments
+ // TODO: Use a semaphore to guarantee all the segments are sealed before persisting snapshot.
+ List<SegmentDataManager> allSegments = _realtimeTableDataManager.acquireAllSegments();
+ for (SegmentDataManager segmentDataManager: allSegments) {
+ if (segmentDataManager.getSegment() instanceof ImmutableSegment) {
+ MutableRoaringBitmap validDocIds = new MutableRoaringBitmap();
Review Comment:
We don't want to initialize `validDocIds` with an empty bitmap. If the valid
doc ids don't exist (not sure if that is possible), we shouldn't persist an
empty snapshot.
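
A minimal sketch of the guard I have in mind. It uses `java.util.BitSet` as a
stand-in for the roaring bitmap, and `SnapshotGuardSketch`, `PERSISTED` and
`persistIfPresent` are hypothetical names for illustration, not Pinot APIs:

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

public class SnapshotGuardSketch {
  // Hypothetical sink that records what would have been written to disk.
  public static final List<BitSet> PERSISTED = new ArrayList<>();

  // Persists the valid doc ids only when they exist.
  // Returns true when a snapshot was recorded, false when it was skipped.
  public static boolean persistIfPresent(BitSet validDocIds) {
    if (validDocIds == null) {
      // Nothing is tracked for this segment: skip rather than write an empty snapshot.
      return false;
    }
    // Copy so later mutations of the live bitmap do not change the snapshot.
    PERSISTED.add((BitSet) validDocIds.clone());
    return true;
  }

  public static void main(String[] args) {
    BitSet present = new BitSet();
    present.set(0);
    present.set(3);
    System.out.println(persistIfPresent(present));
    System.out.println(persistIfPresent(null));
  }
}
```

The point is only that a missing bitmap and an empty bitmap are handled
differently; how the missing case is detected in the real code is up to you.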
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -1426,6 +1430,20 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
try {
_partitionGroupConsumerSemaphore.acquire();
_acquiredConsumerSemaphore.set(true);
+ if (_tableConfig.getUpsertConfig() != null && _tableConfig.getUpsertConfig().isEnableSnapshot()) {
+ // persist snapshot for all sealed segments
+ // TODO: Use a semaphore to guarantee all the segments are sealed before persisting snapshot.
+ List<SegmentDataManager> allSegments = _realtimeTableDataManager.acquireAllSegments();
+ for (SegmentDataManager segmentDataManager: allSegments) {
Review Comment:
We don't want to persist snapshots for all segments, only for the segments
within this partition.
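
A rough sketch of the filtering, assuming LLC-style segment names of the form
`table__partitionId__sequence__creationTime`. `PartitionFilterSketch` and its
methods are made up for illustration and are not Pinot APIs; the real code
would work on `SegmentDataManager` instances rather than plain names:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionFilterSketch {
  // Extracts the partition id from an LLC-style name.
  // Returns -1 when the name does not follow the assumed four-part format.
  public static int partitionIdOf(String segmentName) {
    String[] parts = segmentName.split("__");
    if (parts.length != 4) {
      return -1;
    }
    try {
      return Integer.parseInt(parts[1]);
    } catch (NumberFormatException e) {
      return -1;
    }
  }

  // Keeps only the segments that belong to the given partition.
  public static List<String> segmentsInPartition(List<String> segmentNames, int partitionId) {
    List<String> result = new ArrayList<>();
    for (String name : segmentNames) {
      if (partitionIdOf(name) == partitionId) {
        result.add(name);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> names = Arrays.asList(
        "myTable__0__1__20230601T0000Z",
        "myTable__1__1__20230601T0000Z",
        "myTable__0__2__20230602T0000Z");
    System.out.println(segmentsInPartition(names, 0));
  }
}
```

Names that do not match the assumed format are simply skipped here; whether
that is the right behavior for uploaded segments is a separate question.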
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -1426,6 +1430,20 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
try {
_partitionGroupConsumerSemaphore.acquire();
Review Comment:
cc @ankitsultana This might be the root cause of #10552. If for some reason
two segments are going through the OFFLINE -> CONSUMING state transition at
the same time (one segment just committed) and the newer segment acquires this
semaphore first, the older segment will block here and never finish its
OFFLINE -> CONSUMING state transition.
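
To illustrate the suspected blocking, here is a small standalone sketch. It
is not the Pinot code: `ConsumerSemaphoreSketch` and `olderSegmentCanAcquire`
are hypothetical names, and a timed `tryAcquire` stands in for the untimed
`acquire()` so that the example terminates instead of hanging:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class ConsumerSemaphoreSketch {
  // Simulates two segments of one partition competing for a single permit.
  // Returns true if the older segment gets the permit within the timeout.
  public static boolean olderSegmentCanAcquire(boolean newerReleases, long timeoutMs) {
    Semaphore partitionSemaphore = new Semaphore(1);

    // The newer segment wins the race and takes the only permit.
    partitionSemaphore.acquireUninterruptibly();
    if (newerReleases) {
      partitionSemaphore.release();
    }

    // The older segment now tries to take the permit. With an untimed
    // acquire() this call would block for as long as the permit is held.
    try {
      return partitionSemaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(olderSegmentCanAcquire(false, 50)); // prints "false"
    System.out.println(olderSegmentCanAcquire(true, 50));  // prints "true"
  }
}
```

If the newer segment keeps consuming, it holds the permit indefinitely, which
would match the stuck state transition described above.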