This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2a443eeeaf try to acquire segmentLock before taking segment snapshot (#14179)
2a443eeeaf is described below
commit 2a443eeeaf88eff4614e0f6243e92e6c70f9943f
Author: Xiaobing <[email protected]>
AuthorDate: Tue Oct 8 12:39:08 2024 -0700
try to acquire segmentLock before taking segment snapshot (#14179)
* try to acquire segmentLock before taking segment snapshot
* refine to ensure snapshots kept on disk are disjoint
* fix tests and add tests
---
.../upsert/BasePartitionUpsertMetadataManager.java | 66 +++++++++++----
.../BasePartitionUpsertMetadataManagerTest.java | 95 ++++++++++++++++++++++
2 files changed, 147 insertions(+), 14 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 72bb861463..e7542b8933 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -40,7 +40,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.helix.HelixManager;
-import org.apache.pinot.common.Utils;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
@@ -103,7 +102,7 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
// Helix threads. Otherwise, segments might be missed by the consuming
thread when taking snapshots, which takes
// snapshotLock WLock and clear the tracking set to avoid keeping segment
object references around.
// Skip mutableSegments as only immutable segments are for taking snapshots.
- protected final Set<ImmutableSegment> _updatedSegmentsSinceLastSnapshot =
ConcurrentHashMap.newKeySet();
+ protected final Set<IndexSegment> _updatedSegmentsSinceLastSnapshot =
ConcurrentHashMap.newKeySet();
// NOTE: We do not persist snapshot on the first consuming segment because
most segments might not be loaded yet
// We only do this for Full-Upsert tables, for partial-upsert tables, we
have a check allSegmentsLoaded
@@ -879,6 +878,8 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
// segments. Because the valid docs as tracked by the existing validDocIds
snapshots can only get less. That no
// overlap of valid docs among segments with snapshots is required by the
preloading to work correctly.
Set<ImmutableSegmentImpl> segmentsWithoutSnapshot = new HashSet<>();
+ TableDataManager tableDataManager = _context.getTableDataManager();
+ boolean isSegmentSkipped = false;
for (IndexSegment segment : _trackedSegments) {
if (!(segment instanceof ImmutableSegmentImpl)) {
numConsumingSegments++;
@@ -889,6 +890,25 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
numUnchangedSegments++;
continue;
}
+ // Try to acquire the segmentLock when taking snapshot for the segment
because the segment directory can be
+ // modified, e.g. a new snapshot file can be added to the directory. If
not taking the lock, the Helix task
+ // thread replacing the segment could fail. For example, we found
FileUtils.cleanDirectory() failed due to
+ // DirectoryNotEmptyException because a new snapshot file got added into
the segment directory just between two
+ // major cleanup steps in the cleanDirectory() method.
+ String segmentName = segment.getSegmentName();
+ Lock segmentLock = tableDataManager.getSegmentLock(segmentName);
+ boolean locked = segmentLock.tryLock();
+ if (!locked) {
+ // Try to get the segmentLock in a non-blocking manner to avoid
deadlock. The Helix task thread takes
+ // segmentLock first and then the snapshot RLock when replacing a
segment. However, the consuming thread has
+ // already acquired the snapshot WLock when reaching here, and if it
has to wait for segmentLock, it may
+ // enter deadlock with the Helix task threads waiting for snapshot
RLock.
+ // If we can't get the segmentLock, we'd better skip taking snapshot
for this tracked segment, because its
+ // validDocIds might become stale or wrong when the segment is being
processed by another thread right now.
+ _logger.warn("Could not get segmentLock to take snapshot for segment:
{}, skipping", segmentName);
+ isSegmentSkipped = true;
+ continue;
+ }
try {
ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl) segment;
if (!immutableSegment.hasValidDocIdsSnapshotFile()) {
@@ -896,24 +916,42 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
continue;
}
immutableSegment.persistValidDocIdsSnapshot();
+ _updatedSegmentsSinceLastSnapshot.remove(segment);
numImmutableSegments++;
numPrimaryKeysInSnapshot +=
immutableSegment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
} catch (Exception e) {
- _logger.warn("Caught exception while taking snapshot for segment: {},
skipping", segment.getSegmentName(), e);
- Utils.rethrowException(e);
+ _logger.warn("Caught exception while taking snapshot for segment: {},
skipping", segmentName, e);
+ isSegmentSkipped = true;
+ } finally {
+ segmentLock.unlock();
}
}
- for (ImmutableSegmentImpl segment : segmentsWithoutSnapshot) {
- try {
- segment.persistValidDocIdsSnapshot();
- numImmutableSegments++;
- numPrimaryKeysInSnapshot +=
segment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
- } catch (Exception e) {
- _logger.warn("Caught exception while taking snapshot for segment: {},
skipping", segment.getSegmentName(), e);
- Utils.rethrowException(e);
+ // If we have skipped any segments in the previous for-loop, we should
skip the next for-loop, basically to not
+ // add new snapshot files on disk. This ensures all the validDocIds
snapshots kept on disk are still disjoint
+ // with each other, although some of them may have become stale, i.e.
tracking more valid docs than expected.
+ if (!isSegmentSkipped) {
+ for (ImmutableSegmentImpl segment : segmentsWithoutSnapshot) {
+ String segmentName = segment.getSegmentName();
+ Lock segmentLock = tableDataManager.getSegmentLock(segmentName);
+ boolean locked = segmentLock.tryLock();
+ if (!locked) {
+ _logger.warn("Could not get segmentLock to take snapshot for
segment: {} w/o snapshot, skipping",
+ segmentName);
+ continue;
+ }
+ try {
+ segment.persistValidDocIdsSnapshot();
+ _updatedSegmentsSinceLastSnapshot.remove(segment);
+ numImmutableSegments++;
+ numPrimaryKeysInSnapshot +=
segment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
+ } catch (Exception e) {
+ _logger.warn("Caught exception while taking snapshot for segment: {}
w/o snapshot, skipping", segmentName, e);
+ } finally {
+ segmentLock.unlock();
+ }
}
}
- _updatedSegmentsSinceLastSnapshot.clear();
+ _updatedSegmentsSinceLastSnapshot.retainAll(_trackedSegments);
// Persist TTL watermark after taking snapshots if TTL is enabled, so that
segments out of TTL can be loaded with
// updated validDocIds bitmaps. If the TTL watermark is persisted first,
segments out of TTL may get loaded with
// stale bitmaps or even no bitmap snapshots to use.
@@ -1085,7 +1123,7 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
if (_enableSnapshot && segment instanceof ImmutableSegment) {
_snapshotLock.readLock().lock();
try {
- _updatedSegmentsSinceLastSnapshot.add((ImmutableSegment) segment);
+ _updatedSegmentsSinceLastSnapshot.add(segment);
} finally {
_snapshotLock.readLock().unlock();
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
index 8881753f42..aee3a778a9 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
@@ -32,6 +32,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metrics.ServerMetrics;
@@ -57,6 +59,7 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.*;
@@ -104,6 +107,9 @@ public class BasePartitionUpsertMetadataManagerTest {
throws IOException {
UpsertContext upsertContext = mock(UpsertContext.class);
when(upsertContext.isSnapshotEnabled()).thenReturn(true);
+ TableDataManager tdm = mock(TableDataManager.class);
+ when(upsertContext.getTableDataManager()).thenReturn(tdm);
+ when(tdm.getSegmentLock(anyString())).thenReturn(new ReentrantLock());
DummyPartitionUpsertMetadataManager upsertMetadataManager =
new DummyPartitionUpsertMetadataManager("myTable", 0, upsertContext);
@@ -149,11 +155,97 @@ public class BasePartitionUpsertMetadataManagerTest {
assertEquals(seg03.loadValidDocIdsFromSnapshot().getCardinality(), 3);
}
+ @Test
+ public void testSkipTakeSnapshotUponRaceCondition()
+ throws IOException {
+ UpsertContext upsertContext = mock(UpsertContext.class);
+ when(upsertContext.isSnapshotEnabled()).thenReturn(true);
+ TableDataManager tdm = mock(TableDataManager.class);
+ when(upsertContext.getTableDataManager()).thenReturn(tdm);
+ Map<String, Lock> segmentLocks = new HashMap<>();
+ segmentLocks.put("seg01", new ReentrantLock());
+ segmentLocks.put("seg02", new ReentrantLock());
+ segmentLocks.put("seg03", new ReentrantLock());
+ segmentLocks.put("seg04", new ReentrantLock());
+ when(tdm.getSegmentLock(anyString())).thenAnswer(invocation -> {
+ String segmentName = invocation.getArgument(0);
+ return segmentLocks.get(segmentName);
+ });
+ DummyPartitionUpsertMetadataManager upsertMetadataManager =
+ new DummyPartitionUpsertMetadataManager("myTable", 0, upsertContext);
+
+ List<String> segmentsTakenSnapshot = new ArrayList<>();
+
+ File segDir01 = new File(TEMP_DIR, "seg01");
+ ImmutableSegmentImpl seg01 = createImmutableSegment("seg01", segDir01,
segmentsTakenSnapshot);
+ seg01.enableUpsert(upsertMetadataManager, createValidDocIds(0, 1, 2, 3),
null);
+ upsertMetadataManager.addSegment(seg01);
+ // seg01 has a tmp snapshot file, but no snapshot file
+ FileUtils.touch(new File(segDir01,
V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME + "_tmp"));
+
+ File segDir02 = new File(TEMP_DIR, "seg02");
+ ImmutableSegmentImpl seg02 = createImmutableSegment("seg02", segDir02,
segmentsTakenSnapshot);
+ seg02.enableUpsert(upsertMetadataManager, createValidDocIds(0, 1, 2, 3, 4,
5), null);
+ upsertMetadataManager.addSegment(seg02);
+ // seg02 has snapshot file, so its snapshot is taken first.
+ FileUtils.touch(new File(segDir02,
V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME));
+
+ File segDir03 = new File(TEMP_DIR, "seg03");
+ ImmutableSegmentImpl seg03 = createImmutableSegment("seg03", segDir03,
segmentsTakenSnapshot);
+ seg03.enableUpsert(upsertMetadataManager, createValidDocIds(3, 4, 7),
null);
+ upsertMetadataManager.addSegment(seg03);
+
+ // The mutable segments will be skipped.
+ MutableSegmentImpl seg04 = mock(MutableSegmentImpl.class);
+ upsertMetadataManager.addRecord(seg04, mock(RecordInfo.class));
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ // seg02 is the only segment with existing snapshot file, so skipping it
should skip other segments w/o snapshots.
+ Lock seg02Lock = segmentLocks.get("seg02");
+ AtomicBoolean seg02Locked = new AtomicBoolean(false);
+ ReentrantLock holderLock = new ReentrantLock();
+ holderLock.lock();
+ executor.submit(() -> {
+ seg02Lock.lock();
+ seg02Locked.set(true);
+ // Block this thread a while to test if snapshots are skipped.
+ holderLock.lock();
+ seg02Lock.unlock();
+ });
+ // Make sure the bg thread has acquired the seg02Lock before testing to
avoid flakiness.
+ TestUtils.waitForCondition(aVoid -> seg02Locked.get(), 1000L, "Failed to
acquire seg02Lock in time");
+ // Since seg02 is skipped, no snapshots would be taken.
+ upsertMetadataManager.doTakeSnapshot();
+ assertEquals(segmentsTakenSnapshot.size(), 0);
+
+ // Unblock the bg thread so that it releases the segmentLock.
+ holderLock.unlock();
+ // Acquire the segmentLock once, in case the bg thread is not running in
time and causes flakiness.
+ seg02Lock.lock();
+ seg02Lock.unlock();
+ // Now the segmentLock can be acquired for sure, and snapshots should be
taken.
+ upsertMetadataManager.doTakeSnapshot();
+ assertEquals(segmentsTakenSnapshot.size(), 3);
+ assertTrue(segDir01.exists());
+ assertEquals(seg01.loadValidDocIdsFromSnapshot().getCardinality(), 4);
+ assertTrue(segDir02.exists());
+ assertEquals(seg02.loadValidDocIdsFromSnapshot().getCardinality(), 6);
+ assertTrue(segDir03.exists());
+ assertEquals(seg03.loadValidDocIdsFromSnapshot().getCardinality(), 3);
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
@Test
public void testTakeSnapshotInOrderBasedOnUpdates()
throws IOException {
UpsertContext upsertContext = mock(UpsertContext.class);
when(upsertContext.isSnapshotEnabled()).thenReturn(true);
+ TableDataManager tdm = mock(TableDataManager.class);
+ when(upsertContext.getTableDataManager()).thenReturn(tdm);
+ when(tdm.getSegmentLock(anyString())).thenReturn(new ReentrantLock());
DummyPartitionUpsertMetadataManager upsertMetadataManager =
new DummyPartitionUpsertMetadataManager("myTable", 0, upsertContext);
@@ -207,6 +299,9 @@ public class BasePartitionUpsertMetadataManagerTest {
throws IOException {
UpsertContext upsertContext = mock(UpsertContext.class);
when(upsertContext.isSnapshotEnabled()).thenReturn(true);
+ TableDataManager tdm = mock(TableDataManager.class);
+ when(upsertContext.getTableDataManager()).thenReturn(tdm);
+ when(tdm.getSegmentLock(anyString())).thenReturn(new ReentrantLock());
DummyPartitionUpsertMetadataManager upsertMetadataManager =
new DummyPartitionUpsertMetadataManager("myTable", 0, upsertContext);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]