This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d7c76b9fbb no need for segment locks during segment preloading (#12077)
d7c76b9fbb is described below

commit d7c76b9fbb5a659da0da6276c6d9833ba1894e0d
Author: Xiaobing <[email protected]>
AuthorDate: Thu Nov 30 14:14:54 2023 -0800

    no need for segment locks during segment preloading (#12077)
---
 .../upsert/BasePartitionUpsertMetadataManager.java | 38 +++++++++++-----------
 .../upsert/BaseTableUpsertMetadataManager.java     | 19 +++++------
 ...rrentMapPartitionUpsertMetadataManagerTest.java |  4 +--
 3 files changed, 29 insertions(+), 32 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 0e54d399bf..3e1fd4e178 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -281,7 +281,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
 
     try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
         _comparisonColumns, _deleteRecordColumn)) {
-      addSegment(segment, null, null, UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds), true);
+      addSegmentUnsafe(segment, null, null, UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds), true);
     } catch (Exception e) {
       throw new RuntimeException(
           String.format("Caught exception while preloading segment: %s, table: %s", segmentName, _tableNameWithType),
@@ -303,33 +303,33 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   @VisibleForTesting
   public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
       @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) {
-    addSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, false);
-  }
-
-  @VisibleForTesting
-  public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
-      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
-      boolean isPreloading) {
     String segmentName = segment.getSegmentName();
     Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
     segmentLock.lock();
     try {
-      if (validDocIds == null) {
-        validDocIds = new ThreadSafeMutableRoaringBitmap();
-      }
-      if (queryableDocIds == null && _deleteRecordColumn != null) {
-        queryableDocIds = new ThreadSafeMutableRoaringBitmap();
-      }
-      if (isPreloading) {
-        addSegmentWithoutUpsert(segment, validDocIds, queryableDocIds, recordInfoIterator);
-      } else {
-        addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
-      }
+      addSegmentUnsafe(segment, validDocIds, queryableDocIds, recordInfoIterator, false);
     } finally {
       segmentLock.unlock();
     }
   }
 
+  @VisibleForTesting
+  void addSegmentUnsafe(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
+      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
+      boolean isPreloading) {
+    if (validDocIds == null) {
+      validDocIds = new ThreadSafeMutableRoaringBitmap();
+    }
+    if (queryableDocIds == null && _deleteRecordColumn != null) {
+      queryableDocIds = new ThreadSafeMutableRoaringBitmap();
+    }
+    if (isPreloading) {
+      addSegmentWithoutUpsert(segment, validDocIds, queryableDocIds, recordInfoIterator);
+    } else {
+      addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
+    }
+  }
+
   protected abstract long getNumPrimaryKeys();
 
   protected abstract void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index bc783480b9..6352bc7fcb 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.locks.Lock;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.collections.CollectionUtils;
@@ -41,7 +40,6 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.segment.local.utils.SegmentLocks;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
@@ -255,15 +253,14 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
   @VisibleForTesting
   void preloadSegmentWithSnapshot(String segmentName, IndexLoadingConfig indexLoadingConfig,
       SegmentZKMetadata zkMetadata) {
-    // This method might modify the file on disk. Use segment lock to prevent race condition
-    Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
-    try {
-      segmentLock.lock();
-      // This method checks segment crc and if it has changed, the segment is not loaded.
-      _tableDataManager.tryLoadExistingSegment(segmentName, indexLoadingConfig, zkMetadata);
-    } finally {
-      segmentLock.unlock();
-    }
+    // This method checks segment crc and if it has changed, the segment is not loaded. It might modify the file on
+    // disk, but we don't need to take the segmentLock, because every segment from the current table is processed by
+    // at most one thread from the preloading thread pool. HelixTaskExecutor task threads about to process segments
+    // from the same table are blocked on ConcurrentHashMap lock as in HelixInstanceDataManager.addRealtimeSegment().
+    // In fact, taking segmentLock during segment preloading phase could cause deadlock when HelixTaskExecutor
+    // threads processing other tables have taken the same segmentLock as decided by the hash of table name and
+    // segment name, i.e. due to hash collision.
+    _tableDataManager.tryLoadExistingSegment(segmentName, indexLoadingConfig, zkMetadata);
   }
 
   private File getValidDocIdsSnapshotFile(String segmentName, String segmentTier) {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index b212a8b918..deb783c3e3 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -834,7 +834,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     ImmutableSegmentImpl segment1 =
         mockImmutableSegment(1, validDocIds1, null, getPrimaryKeyList(numRecords, primaryKeys));
     // Preloading segment adds the segment without checking for upsert.
-    upsertMetadataManager.addSegment(segment1, validDocIds1, null,
+    upsertMetadataManager.addSegmentUnsafe(segment1, validDocIds1, null,
         getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator(), true);
 
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
@@ -851,7 +851,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
     ImmutableSegmentImpl segment2 =
         mockImmutableSegment(2, validDocIds2, null, getPrimaryKeyList(numRecords, primaryKeys));
-    upsertMetadataManager.addSegment(segment2, validDocIds2, null,
+    upsertMetadataManager.addSegmentUnsafe(segment2, validDocIds2, null,
         getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator(), true);
 
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to