This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d91ad73fec Optimize snapshot flow to only snapshot segments which have
updates (#13285)
d91ad73fec is described below
commit d91ad73fecc2989cb24cdfe3cbaf0ad4da469c12
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Wed Jun 12 04:01:59 2024 +0530
Optimize snapshot flow to only snapshot segments which have updates (#13285)
* Optimize the snapshot flow to only snapshot segments that were updated since
the last snapshot
* Enable snapshotting before consumption for partial-upsert tables
* Abort snapshotting if an error is encountered
---
.../realtime/RealtimeSegmentDataManager.java | 19 +++++--
.../upsert/BasePartitionUpsertMetadataManager.java | 58 +++++++++++++++-----
.../BasePartitionUpsertMetadataManagerTest.java | 62 +++++++++++++++++++++-
.../apache/pinot/spi/config/table/TableConfig.java | 5 ++
4 files changed, 128 insertions(+), 16 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 8dc7842f81..44cea7155d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -703,9 +703,22 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
// persisted.
// Take upsert snapshot before starting consuming events
if (_partitionUpsertMetadataManager != null) {
- _partitionUpsertMetadataManager.takeSnapshot();
- // If upsertTTL is enabled, we will remove expired primary keys from
upsertMetadata after taking snapshot.
- _partitionUpsertMetadataManager.removeExpiredPrimaryKeys();
+ if (_tableConfig.getUpsertMetadataTTL() > 0) {
+ // If upsertMetadataTTL is enabled, we will remove expired primary
keys from upsertMetadata
+ // AFTER taking a snapshot. Taking the snapshot first is crucial
to capture the final
+ // state of each key before it exits the TTL window. Out-of-TTL
segments are skipped in
+ // the doAddSegment flow, and the snapshot is used to enableUpsert
on the immutable out-of-TTL segment.
+ // If no snapshot is found, the entire segment is marked as valid
and queryable.
+ _partitionUpsertMetadataManager.takeSnapshot();
+ _partitionUpsertMetadataManager.removeExpiredPrimaryKeys();
+ } else {
+ // We should remove deleted-keys first and then take a snapshot.
This is because the deletedKeysTTL
+ // flow removes keys from the map and updates to remove valid doc
IDs. By taking the snapshot immediately
+ // after this process, we save one commit cycle, ensuring that the
deletion of valid doc IDs is reflected
+ // immediately
+ _partitionUpsertMetadataManager.removeExpiredPrimaryKeys();
+ _partitionUpsertMetadataManager.takeSnapshot();
+ }
}
while (!_state.isFinal()) {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index adb9ac7f0e..3b72ddbb21 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -48,6 +48,7 @@ import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.Utils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerGauge;
@@ -105,8 +106,12 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
// Tracks all the segments managed by this manager (excluding EmptySegment)
protected final Set<IndexSegment> _trackedSegments =
ConcurrentHashMap.newKeySet();
+ // This is to track all the segments where changes took place post last
snapshot
+ // Note: we need not take any _snapshotLock while updating this set as it is
only updated by the upsert thread
+ protected final Set<IndexSegment> _updatedSegmentsSinceLastSnapshot =
ConcurrentHashMap.newKeySet();
// NOTE: We do not persist snapshot on the first consuming segment because
most segments might not be loaded yet
+ // We only do this for Full-Upsert tables, for partial-upsert tables, we
have a check allSegmentsLoaded
protected volatile boolean _gotFirstConsumingSegment = false;
protected final ReadWriteLock _snapshotLock;
@@ -874,8 +879,10 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
if (!_enableSnapshot) {
return;
}
- if (!_gotFirstConsumingSegment) {
- _logger.info("Skip taking snapshot before getting the first consuming
segment");
+ if (_partialUpsertHandler == null && !_gotFirstConsumingSegment) {
+ // We only skip for full-Upsert tables, for partial-upsert tables, we
have a check allSegmentsLoaded in
+ // RealtimeTableDataManager
+ _logger.info("Skip taking snapshot before getting the first consuming
segment for full-upsert table");
return;
}
if (!startOperation()) {
@@ -897,7 +904,6 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
}
}
- // TODO: Consider optimizing it by tracking and persisting only the changed
snapshot
protected void doTakeSnapshot() {
int numTrackedSegments = _trackedSegments.size();
long numPrimaryKeysInSnapshot = 0L;
@@ -916,19 +922,34 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
numConsumingSegments++;
continue;
}
- ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl) segment;
- if (!immutableSegment.hasValidDocIdsSnapshotFile()) {
- segmentsWithoutSnapshot.add(immutableSegment);
+ if (!_updatedSegmentsSinceLastSnapshot.contains(segment)) {
+ // if no updates since last snapshot then skip
continue;
}
- immutableSegment.persistValidDocIdsSnapshot();
- numImmutableSegments++;
- numPrimaryKeysInSnapshot +=
immutableSegment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
+ try {
+ ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl) segment;
+ if (!immutableSegment.hasValidDocIdsSnapshotFile()) {
+ segmentsWithoutSnapshot.add(immutableSegment);
+ continue;
+ }
+ immutableSegment.persistValidDocIdsSnapshot();
+ _updatedSegmentsSinceLastSnapshot.remove(segment);
+ numImmutableSegments++;
+ numPrimaryKeysInSnapshot +=
immutableSegment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
+ } catch (Exception e) {
+ _logger.warn("Caught exception while taking snapshot for segment: {},
skipping", segment.getSegmentName(), e);
+ Utils.rethrowException(e);
+ }
}
for (ImmutableSegmentImpl segment : segmentsWithoutSnapshot) {
- segment.persistValidDocIdsSnapshot();
- numImmutableSegments++;
- numPrimaryKeysInSnapshot +=
segment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
+ try {
+ segment.persistValidDocIdsSnapshot();
+ numImmutableSegments++;
+ numPrimaryKeysInSnapshot +=
segment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
+ } catch (Exception e) {
+ _logger.warn("Caught exception while taking snapshot for segment: {},
skipping", segment.getSegmentName(), e);
+ Utils.rethrowException(e);
+ }
}
_serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
@@ -1112,6 +1133,10 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
// refreshing is done.
doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
}
+ if (_enableSnapshot) {
+ _updatedSegmentsSinceLastSnapshot.add(newSegment);
+ _updatedSegmentsSinceLastSnapshot.add(oldSegment);
+ }
}
}
@@ -1142,6 +1167,9 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
// Batch refresh takes WLock. Do it outside RLock for clarity.
doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
}
+ if (_enableSnapshot) {
+ _updatedSegmentsSinceLastSnapshot.add(segment);
+ }
}
}
@@ -1159,6 +1187,9 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
// Batch refresh takes WLock. Do it outside RLock for clarity.
doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
}
+ if (_enableSnapshot) {
+ _updatedSegmentsSinceLastSnapshot.add(segment);
+ }
}
}
@@ -1183,6 +1214,9 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
// Batch refresh takes WLock. Do it outside RLock for clarity.
doBatchRefreshUpsertView(_upsertViewRefreshIntervalMs);
}
+ if (_enableSnapshot) {
+ _updatedSegmentsSinceLastSnapshot.add(segment);
+ }
}
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
index 0e97621733..b96e1fe8e4 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
@@ -209,6 +209,7 @@ public class BasePartitionUpsertMetadataManagerTest {
ImmutableSegmentImpl seg01 = createImmutableSegment("seg01", segDir01,
segmentsTakenSnapshot);
seg01.enableUpsert(upsertMetadataManager, createValidDocIds(0, 1, 2, 3),
null);
upsertMetadataManager.trackSegment(seg01);
+ upsertMetadataManager.updatedSegmentForSnapshotting(seg01);
// seg01 has a tmp snapshot file, but no snapshot file
FileUtils.touch(new File(segDir01,
V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME + "_tmp"));
@@ -216,6 +217,7 @@ public class BasePartitionUpsertMetadataManagerTest {
ImmutableSegmentImpl seg02 = createImmutableSegment("seg02", segDir02,
segmentsTakenSnapshot);
seg02.enableUpsert(upsertMetadataManager, createValidDocIds(0, 1, 2, 3, 4,
5), null);
upsertMetadataManager.trackSegment(seg02);
+ upsertMetadataManager.updatedSegmentForSnapshotting(seg02);
// seg02 has snapshot file, so its snapshot is taken first.
FileUtils.touch(new File(segDir02,
V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME));
@@ -223,9 +225,12 @@ public class BasePartitionUpsertMetadataManagerTest {
ImmutableSegmentImpl seg03 = createImmutableSegment("seg03", segDir03,
segmentsTakenSnapshot);
seg03.enableUpsert(upsertMetadataManager, createValidDocIds(3, 4, 7),
null);
upsertMetadataManager.trackSegment(seg03);
+ upsertMetadataManager.updatedSegmentForSnapshotting(seg03);
// The mutable segments will be skipped.
- upsertMetadataManager.trackSegment(mock(MutableSegmentImpl.class));
+ MutableSegmentImpl seg04 = mock(MutableSegmentImpl.class);
+ upsertMetadataManager.trackSegment(seg04);
+ upsertMetadataManager.updatedSegmentForSnapshotting(seg04);
upsertMetadataManager.doTakeSnapshot();
assertEquals(segmentsTakenSnapshot.size(), 3);
@@ -244,6 +249,57 @@ public class BasePartitionUpsertMetadataManagerTest {
assertEquals(seg03.loadValidDocIdsFromSnapshot().getCardinality(), 3);
}
+ @Test // Verifies that doTakeSnapshot() persists only segments marked as updated since the last snapshot.
+ public void testTakeSnapshotInOrderBasedOnUpdates()
+ throws IOException {
+ DummyPartitionUpsertMetadataManager upsertMetadataManager =
+ new DummyPartitionUpsertMetadataManager("myTable", 0,
mock(UpsertContext.class));
+
+ List<String> segmentsTakenSnapshot = new ArrayList<>();
+
+ File segDir01 = new File(TEMP_DIR, "seg01");
+ ImmutableSegmentImpl seg01 = createImmutableSegment("seg01", segDir01,
segmentsTakenSnapshot);
+ seg01.enableUpsert(upsertMetadataManager, createValidDocIds(0, 1, 2, 3),
null);
+ upsertMetadataManager.trackSegment(seg01);
+ upsertMetadataManager.updatedSegmentForSnapshotting(seg01);
+ // seg01 has a tmp snapshot file but no real snapshot file, so it is snapshotted after seg02
+ FileUtils.touch(new File(segDir01,
V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME + "_tmp"));
+
+ File segDir02 = new File(TEMP_DIR, "seg02");
+ ImmutableSegmentImpl seg02 = createImmutableSegment("seg02", segDir02,
segmentsTakenSnapshot);
+ seg02.enableUpsert(upsertMetadataManager, createValidDocIds(0, 1, 2, 3, 4,
5), null);
+ upsertMetadataManager.trackSegment(seg02);
+ upsertMetadataManager.updatedSegmentForSnapshotting(seg02);
+ // seg02 already has a snapshot file, so its snapshot is taken first.
+ FileUtils.touch(new File(segDir02,
V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME));
+
+ File segDir03 = new File(TEMP_DIR, "seg03");
+ ImmutableSegmentImpl seg03 = createImmutableSegment("seg03", segDir03,
segmentsTakenSnapshot);
+ seg03.enableUpsert(upsertMetadataManager, createValidDocIds(3, 4, 7),
null);
+ upsertMetadataManager.trackSegment(seg03); // seg03 is tracked but NOT marked as updated, so no snapshot is expected
+
+ // Mutable segments are skipped by doTakeSnapshot() even though seg04 is marked as updated.
+ MutableSegmentImpl seg04 = mock(MutableSegmentImpl.class);
+ upsertMetadataManager.trackSegment(seg04);
+ upsertMetadataManager.updatedSegmentForSnapshotting(seg04);
+
+ upsertMetadataManager.doTakeSnapshot();
+ assertEquals(segmentsTakenSnapshot.size(), 2);
+ // The snapshot of seg02 was taken first, as it's the only segment with
existing snapshot.
+ assertEquals(segmentsTakenSnapshot.get(0), "seg02");
+ // Set is used to track segments internally, so we can't assert the order
of the other segments deterministically,
+ // but exactly seg01 and seg02 should have taken their snapshots (seg03 was never marked as updated).
+ assertTrue(segmentsTakenSnapshot.containsAll(Arrays.asList("seg01",
"seg02")));
+
+ assertEquals(TEMP_DIR.list().length, 3);
+ assertTrue(segDir01.exists());
+ assertEquals(seg01.loadValidDocIdsFromSnapshot().getCardinality(), 4);
+ assertTrue(segDir02.exists());
+ assertEquals(seg02.loadValidDocIdsFromSnapshot().getCardinality(), 6);
+ assertTrue(segDir03.exists());
+ assertNull(seg03.loadValidDocIdsFromSnapshot()); // no snapshot persisted for the non-updated seg03
+ }
+
@Test
public void testConsistencyModeSync()
throws Exception {
@@ -509,6 +565,10 @@ public class BasePartitionUpsertMetadataManagerTest {
_trackedSegments.add(seg);
}
+ public void updatedSegmentForSnapshotting(IndexSegment seg) { // Test hook: marks seg as changed since the last snapshot so doTakeSnapshot() considers it
+ _updatedSegmentsSinceLastSnapshot.add(seg);
+ }
+
@Override
protected long getNumPrimaryKeys() {
return 0;
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index bf2dd611ef..b374855d59 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -389,6 +389,11 @@ public class TableConfig extends BaseJsonConfig {
return _upsertConfig == null ? null : _upsertConfig.getComparisonColumns();
}
+ @JsonIgnore
+ public double getUpsertMetadataTTL() { // Metadata TTL from the upsert config, or 0 when the table has no upsert config
+ return _upsertConfig == null ? 0 : _upsertConfig.getMetadataTTL();
+ }
+
@JsonIgnore
@Nullable
public String getUpsertDeleteRecordColumn() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]