This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a443eeeaf try to acquire segmentLock before taking segment snapshot (#14179)
2a443eeeaf is described below

commit 2a443eeeaf88eff4614e0f6243e92e6c70f9943f
Author: Xiaobing <[email protected]>
AuthorDate: Tue Oct 8 12:39:08 2024 -0700

    try to acquire segmentLock before taking segment snapshot (#14179)
    
    * try to acquire segmentLock before taking segment snapshot
    
    * refine to ensure snapshots kept on disk are disjoint
    
    * fix tests and add tests
---
 .../upsert/BasePartitionUpsertMetadataManager.java | 66 +++++++++++----
 .../BasePartitionUpsertMetadataManagerTest.java    | 95 ++++++++++++++++++++++
 2 files changed, 147 insertions(+), 14 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 72bb861463..e7542b8933 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -40,7 +40,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.helix.HelixManager;
-import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -103,7 +102,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   // Helix threads. Otherwise, segments might be missed by the consuming 
thread when taking snapshots, which takes
   // snapshotLock WLock and clear the tracking set to avoid keeping segment 
object references around.
   // Skip mutableSegments as only immutable segments are for taking snapshots.
-  protected final Set<ImmutableSegment> _updatedSegmentsSinceLastSnapshot = 
ConcurrentHashMap.newKeySet();
+  protected final Set<IndexSegment> _updatedSegmentsSinceLastSnapshot = 
ConcurrentHashMap.newKeySet();
 
   // NOTE: We do not persist snapshot on the first consuming segment because 
most segments might not be loaded yet
   // We only do this for Full-Upsert tables, for partial-upsert tables, we 
have a check allSegmentsLoaded
@@ -879,6 +878,8 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     // segments. Because the valid docs as tracked by the existing validDocIds 
snapshots can only get less. That no
     // overlap of valid docs among segments with snapshots is required by the 
preloading to work correctly.
     Set<ImmutableSegmentImpl> segmentsWithoutSnapshot = new HashSet<>();
+    TableDataManager tableDataManager = _context.getTableDataManager();
+    boolean isSegmentSkipped = false;
     for (IndexSegment segment : _trackedSegments) {
       if (!(segment instanceof ImmutableSegmentImpl)) {
         numConsumingSegments++;
@@ -889,6 +890,25 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
         numUnchangedSegments++;
         continue;
       }
+      // Try to acquire the segmentLock when taking snapshot for the segment 
because the segment directory can be
+      // modified, e.g. a new snapshot file can be added to the directory. If 
not taking the lock, the Helix task
+      // thread replacing the segment could fail. For example, we found 
FileUtils.cleanDirectory() failed due to
+      // DirectoryNotEmptyException because a new snapshot file got added into 
the segment directory just between two
+      // major cleanup steps in the cleanDirectory() method.
+      String segmentName = segment.getSegmentName();
+      Lock segmentLock = tableDataManager.getSegmentLock(segmentName);
+      boolean locked = segmentLock.tryLock();
+      if (!locked) {
+        // Try to get the segmentLock in a non-blocking manner to avoid 
deadlock. The Helix task thread takes
+        // segmentLock first and then the snapshot RLock when replacing a 
segment. However, the consuming thread has
+        // already acquired the snapshot WLock when reaching here, and if it 
has to wait for segmentLock, it may
+        // enter deadlock with the Helix task threads waiting for snapshot 
RLock.
+        // If we can't get the segmentLock, we'd better skip taking snapshot 
for this tracked segment, because its
+        // validDocIds might become stale or wrong when the segment is being 
processed by another thread right now.
+        _logger.warn("Could not get segmentLock to take snapshot for segment: 
{}, skipping", segmentName);
+        isSegmentSkipped = true;
+        continue;
+      }
       try {
         ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl) segment;
         if (!immutableSegment.hasValidDocIdsSnapshotFile()) {
@@ -896,24 +916,42 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
           continue;
         }
         immutableSegment.persistValidDocIdsSnapshot();
+        _updatedSegmentsSinceLastSnapshot.remove(segment);
         numImmutableSegments++;
         numPrimaryKeysInSnapshot += 
immutableSegment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
       } catch (Exception e) {
-        _logger.warn("Caught exception while taking snapshot for segment: {}, 
skipping", segment.getSegmentName(), e);
-        Utils.rethrowException(e);
+        _logger.warn("Caught exception while taking snapshot for segment: {}, 
skipping", segmentName, e);
+        isSegmentSkipped = true;
+      } finally {
+        segmentLock.unlock();
       }
     }
-    for (ImmutableSegmentImpl segment : segmentsWithoutSnapshot) {
-      try {
-        segment.persistValidDocIdsSnapshot();
-        numImmutableSegments++;
-        numPrimaryKeysInSnapshot += 
segment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
-      } catch (Exception e) {
-        _logger.warn("Caught exception while taking snapshot for segment: {}, 
skipping", segment.getSegmentName(), e);
-        Utils.rethrowException(e);
+    // If we have skipped any segments in the previous for-loop, we should 
skip the next for-loop, basically to not
+    // add new snapshot files on disk. This ensures all the validDocIds 
snapshots kept on disk are still disjoint
+    // with each other, although some of them may have become stale, i.e. 
tracking more valid docs than expected.
+    if (!isSegmentSkipped) {
+      for (ImmutableSegmentImpl segment : segmentsWithoutSnapshot) {
+        String segmentName = segment.getSegmentName();
+        Lock segmentLock = tableDataManager.getSegmentLock(segmentName);
+        boolean locked = segmentLock.tryLock();
+        if (!locked) {
+          _logger.warn("Could not get segmentLock to take snapshot for 
segment: {} w/o snapshot, skipping",
+              segmentName);
+          continue;
+        }
+        try {
+          segment.persistValidDocIdsSnapshot();
+          _updatedSegmentsSinceLastSnapshot.remove(segment);
+          numImmutableSegments++;
+          numPrimaryKeysInSnapshot += 
segment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
+        } catch (Exception e) {
+          _logger.warn("Caught exception while taking snapshot for segment: {} 
w/o snapshot, skipping", segmentName, e);
+        } finally {
+          segmentLock.unlock();
+        }
       }
     }
-    _updatedSegmentsSinceLastSnapshot.clear();
+    _updatedSegmentsSinceLastSnapshot.retainAll(_trackedSegments);
     // Persist TTL watermark after taking snapshots if TTL is enabled, so that 
segments out of TTL can be loaded with
     // updated validDocIds bitmaps. If the TTL watermark is persisted first, 
segments out of TTL may get loaded with
     // stale bitmaps or even no bitmap snapshots to use.
@@ -1085,7 +1123,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     if (_enableSnapshot && segment instanceof ImmutableSegment) {
       _snapshotLock.readLock().lock();
       try {
-        _updatedSegmentsSinceLastSnapshot.add((ImmutableSegment) segment);
+        _updatedSegmentsSinceLastSnapshot.add(segment);
       } finally {
         _snapshotLock.readLock().unlock();
       }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
index 8881753f42..aee3a778a9 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
@@ -32,6 +32,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -57,6 +59,7 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.*;
@@ -104,6 +107,9 @@ public class BasePartitionUpsertMetadataManagerTest {
       throws IOException {
     UpsertContext upsertContext = mock(UpsertContext.class);
     when(upsertContext.isSnapshotEnabled()).thenReturn(true);
+    TableDataManager tdm = mock(TableDataManager.class);
+    when(upsertContext.getTableDataManager()).thenReturn(tdm);
+    when(tdm.getSegmentLock(anyString())).thenReturn(new ReentrantLock());
     DummyPartitionUpsertMetadataManager upsertMetadataManager =
         new DummyPartitionUpsertMetadataManager("myTable", 0, upsertContext);
 
@@ -149,11 +155,97 @@ public class BasePartitionUpsertMetadataManagerTest {
     assertEquals(seg03.loadValidDocIdsFromSnapshot().getCardinality(), 3);
   }
 
+  @Test
+  public void testSkipTakeSnapshotUponRaceCondition()
+      throws IOException {
+    UpsertContext upsertContext = mock(UpsertContext.class);
+    when(upsertContext.isSnapshotEnabled()).thenReturn(true);
+    TableDataManager tdm = mock(TableDataManager.class);
+    when(upsertContext.getTableDataManager()).thenReturn(tdm);
+    Map<String, Lock> segmentLocks = new HashMap<>();
+    segmentLocks.put("seg01", new ReentrantLock());
+    segmentLocks.put("seg02", new ReentrantLock());
+    segmentLocks.put("seg03", new ReentrantLock());
+    segmentLocks.put("seg04", new ReentrantLock());
+    when(tdm.getSegmentLock(anyString())).thenAnswer(invocation -> {
+      String segmentName = invocation.getArgument(0);
+      return segmentLocks.get(segmentName);
+    });
+    DummyPartitionUpsertMetadataManager upsertMetadataManager =
+        new DummyPartitionUpsertMetadataManager("myTable", 0, upsertContext);
+
+    List<String> segmentsTakenSnapshot = new ArrayList<>();
+
+    File segDir01 = new File(TEMP_DIR, "seg01");
+    ImmutableSegmentImpl seg01 = createImmutableSegment("seg01", segDir01, 
segmentsTakenSnapshot);
+    seg01.enableUpsert(upsertMetadataManager, createValidDocIds(0, 1, 2, 3), 
null);
+    upsertMetadataManager.addSegment(seg01);
+    // seg01 has a tmp snapshot file, but no snapshot file
+    FileUtils.touch(new File(segDir01, 
V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME + "_tmp"));
+
+    File segDir02 = new File(TEMP_DIR, "seg02");
+    ImmutableSegmentImpl seg02 = createImmutableSegment("seg02", segDir02, 
segmentsTakenSnapshot);
+    seg02.enableUpsert(upsertMetadataManager, createValidDocIds(0, 1, 2, 3, 4, 
5), null);
+    upsertMetadataManager.addSegment(seg02);
+    // seg02 has snapshot file, so its snapshot is taken first.
+    FileUtils.touch(new File(segDir02, 
V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME));
+
+    File segDir03 = new File(TEMP_DIR, "seg03");
+    ImmutableSegmentImpl seg03 = createImmutableSegment("seg03", segDir03, 
segmentsTakenSnapshot);
+    seg03.enableUpsert(upsertMetadataManager, createValidDocIds(3, 4, 7), 
null);
+    upsertMetadataManager.addSegment(seg03);
+
+    // The mutable segments will be skipped.
+    MutableSegmentImpl seg04 = mock(MutableSegmentImpl.class);
+    upsertMetadataManager.addRecord(seg04, mock(RecordInfo.class));
+
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    try {
+      // seg02 is the only segment with existing snapshot file, so skipping it 
should skip other segments w/o snapshots.
+      Lock seg02Lock = segmentLocks.get("seg02");
+      AtomicBoolean seg02Locked = new AtomicBoolean(false);
+      ReentrantLock holderLock = new ReentrantLock();
+      holderLock.lock();
+      executor.submit(() -> {
+        seg02Lock.lock();
+        seg02Locked.set(true);
+        // Block this thread a while to test if snapshots are skipped.
+        holderLock.lock();
+        seg02Lock.unlock();
+      });
+      // Make sure the bg thread has acquired the seg02Lock before testing to 
avoid flakiness.
+      TestUtils.waitForCondition(aVoid -> seg02Locked.get(), 1000L, "Failed to 
acquire seg02Lock in time");
+      // Since seg02 is skipped, no snapshots would be taken.
+      upsertMetadataManager.doTakeSnapshot();
+      assertEquals(segmentsTakenSnapshot.size(), 0);
+
+      // Unblock the bg thread so that it releases the segmentLock.
+      holderLock.unlock();
+      // Acquire the segmentLock once, in case the bg thread is not running in 
time and causes flakiness.
+      seg02Lock.lock();
+      seg02Lock.unlock();
+      // Now the segmentLock can be acquired for sure, and snapshots should be 
taken.
+      upsertMetadataManager.doTakeSnapshot();
+      assertEquals(segmentsTakenSnapshot.size(), 3);
+      assertTrue(segDir01.exists());
+      assertEquals(seg01.loadValidDocIdsFromSnapshot().getCardinality(), 4);
+      assertTrue(segDir02.exists());
+      assertEquals(seg02.loadValidDocIdsFromSnapshot().getCardinality(), 6);
+      assertTrue(segDir03.exists());
+      assertEquals(seg03.loadValidDocIdsFromSnapshot().getCardinality(), 3);
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
   @Test
   public void testTakeSnapshotInOrderBasedOnUpdates()
       throws IOException {
     UpsertContext upsertContext = mock(UpsertContext.class);
     when(upsertContext.isSnapshotEnabled()).thenReturn(true);
+    TableDataManager tdm = mock(TableDataManager.class);
+    when(upsertContext.getTableDataManager()).thenReturn(tdm);
+    when(tdm.getSegmentLock(anyString())).thenReturn(new ReentrantLock());
     DummyPartitionUpsertMetadataManager upsertMetadataManager =
         new DummyPartitionUpsertMetadataManager("myTable", 0, upsertContext);
 
@@ -207,6 +299,9 @@ public class BasePartitionUpsertMetadataManagerTest {
       throws IOException {
     UpsertContext upsertContext = mock(UpsertContext.class);
     when(upsertContext.isSnapshotEnabled()).thenReturn(true);
+    TableDataManager tdm = mock(TableDataManager.class);
+    when(upsertContext.getTableDataManager()).thenReturn(tdm);
+    when(tdm.getSegmentLock(anyString())).thenReturn(new ReentrantLock());
     DummyPartitionUpsertMetadataManager upsertMetadataManager =
         new DummyPartitionUpsertMetadataManager("myTable", 0, upsertContext);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to