This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c4e03e3697 Extract common utils used to preload segments for future reuse (#14161)
c4e03e3697 is described below

commit c4e03e369789251f62436cefdc30445e6435429c
Author: Xiaobing <[email protected]>
AuthorDate: Thu Oct 3 19:10:28 2024 -0700

    Extract common utils used to preload segments for future reuse (#14161)
    
    * extract common utils for preloading to reuse for dedup table
    
    * minor refinements
---
 .../upsert/BasePartitionUpsertMetadataManager.java | 148 +++-------------
 .../segment/local/utils/SegmentPreloadUtils.java   | 194 +++++++++++++++++++++
 .../BasePartitionUpsertMetadataManagerTest.java    | 111 +-----------
 .../local/utils/SegmentPreloadUtilsTest.java       | 150 ++++++++++++++++
 4 files changed, 370 insertions(+), 233 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 0c72f26114..72bb861463 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -23,7 +23,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.AtomicDouble;
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -33,7 +32,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -42,18 +40,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.helix.HelixManager;
-import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.Utils;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
-import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.SegmentUtils;
 import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
-import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
@@ -61,20 +54,19 @@ import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
 import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
 import org.apache.pinot.segment.local.utils.HashUtils;
+import org.apache.pinot.segment.local.utils.SegmentPreloadUtils;
 import org.apache.pinot.segment.local.utils.WatermarkUtils;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.MutableSegment;
 import org.apache.pinot.segment.spi.V1Constants;
 import 
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
-import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
 import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.apache.pinot.spi.utils.BooleanUtils;
-import org.apache.pinot.spi.utils.CommonConstants;
 import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 import org.slf4j.Logger;
@@ -247,10 +239,9 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       _serverMetrics.addTimedTableValue(_tableNameWithType, 
ServerTimer.UPSERT_PRELOAD_TIME_MS, duration,
           TimeUnit.MILLISECONDS);
     } catch (Exception e) {
-      // Even if preloading fails, we should continue to complete the 
initialization, so that TableDataManager can be
-      // created. Once TableDataManager is created, no more segment preloading 
would happen, and the normal segment
-      // loading logic would be used. The segments not being preloaded 
successfully here would be loaded via the
-      // normal segment loading logic, the one doing more costly checks on the 
upsert metadata.
+      // We should continue even if preloading fails, so that segments not 
being preloaded successfully can get
+      // loaded via the normal segment loading logic as done on the Helix task 
threads although with more costly
+      // checks on the upsert metadata.
       _logger.warn("Failed to preload segments from partition: {} of table: 
{}, skipping", _partitionId,
           _tableNameWithType, e);
       _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.UPSERT_PRELOAD_FAILURE, 1);
@@ -264,115 +255,20 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     }
   }
 
+  // Keep this hook method for subclasses to extend the preloading logic as 
needed.
   protected void doPreloadSegments(TableDataManager tableDataManager, 
IndexLoadingConfig indexLoadingConfig,
       HelixManager helixManager, ExecutorService segmentPreloadExecutor)
       throws Exception {
-    _logger.info("Preload segments from partition: {} of table: {} for fast 
upsert metadata recovery", _partitionId,
-        _tableNameWithType);
-    String instanceId = getInstanceId(tableDataManager);
-    Map<String, Map<String, String>> segmentAssignment = 
getSegmentAssignment(helixManager);
-    Map<String, SegmentZKMetadata> segmentMetadataMap = 
getSegmentsZKMetadata(helixManager);
-    List<Future<?>> futures = new ArrayList<>();
-    for (Map.Entry<String, Map<String, String>> entry : 
segmentAssignment.entrySet()) {
-      String segmentName = entry.getKey();
-      Map<String, String> instanceStateMap = entry.getValue();
-      String state = instanceStateMap.get(instanceId);
-      if 
(!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)) {
-        if (state == null) {
-          _logger.debug("Skip segment: {} as it's not assigned to instance: 
{}", segmentName, instanceId);
-        } else {
-          _logger.info("Skip segment: {} as its ideal state: {} is not ONLINE 
for instance: {}", segmentName, state,
-              instanceId);
-        }
-        continue;
-      }
-      SegmentZKMetadata segmentZKMetadata = 
segmentMetadataMap.get(segmentName);
-      Preconditions.checkState(segmentZKMetadata != null, "Failed to find ZK 
metadata for segment: %s, table: %s",
-          segmentName, _tableNameWithType);
-      Integer partitionId = 
SegmentUtils.getRealtimeSegmentPartitionId(segmentName, segmentZKMetadata, 
null);
-      Preconditions.checkNotNull(partitionId,
-          String.format("Failed to get partition id for segment: %s 
(upsert-enabled table: %s)", segmentName,
-              _tableNameWithType));
-      if (partitionId != _partitionId) {
-        _logger.debug("Skip segment: {} as its partition: {} is different from 
the requested partition: {}",
-            segmentName, partitionId, _partitionId);
-        continue;
-      }
-      if (!hasValidDocIdsSnapshot(tableDataManager, 
indexLoadingConfig.getTableConfig(), segmentName,
-          segmentZKMetadata.getTier())) {
-        _logger.info("Skip segment: {} from partition: {} as no validDocIds 
snapshot exists", segmentName,
-            _partitionId);
-        continue;
-      }
-      futures.add(segmentPreloadExecutor.submit(
-          () -> doPreloadSegmentWithSnapshot(tableDataManager, segmentName, 
indexLoadingConfig, segmentZKMetadata)));
-    }
-    try {
-      for (Future<?> f : futures) {
-        f.get();
-      }
-    } finally {
-      for (Future<?> f : futures) {
-        if (!f.isDone()) {
-          f.cancel(true);
-        }
-      }
-    }
-    _logger.info("Preloaded {} segments from partition: {} of table: {} for 
fast upsert metadata recovery",
-        futures.size(), _partitionId, _tableNameWithType);
-  }
-
-  private String getInstanceId(TableDataManager tableDataManager) {
-    return tableDataManager.getInstanceDataManagerConfig().getInstanceId();
-  }
-
-  private static boolean hasValidDocIdsSnapshot(TableDataManager 
tableDataManager, TableConfig tableConfig,
-      String segmentName, String segmentTier) {
-    try {
-      File indexDir = tableDataManager.getSegmentDataDir(segmentName, 
segmentTier, tableConfig);
-      File snapshotFile =
-          new File(SegmentDirectoryPaths.findSegmentDirectory(indexDir), 
V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME);
-      return snapshotFile.exists();
-    } catch (Exception e) {
-      return false;
-    }
-  }
-
-  @VisibleForTesting
-  Map<String, Map<String, String>> getSegmentAssignment(HelixManager 
helixManager) {
-    IdealState idealState = HelixHelper.getTableIdealState(helixManager, 
_tableNameWithType);
-    Preconditions.checkState(idealState != null, "Failed to find ideal state 
for table: %s", _tableNameWithType);
-    return idealState.getRecord().getMapFields();
-  }
-
-  @VisibleForTesting
-  Map<String, SegmentZKMetadata> getSegmentsZKMetadata(HelixManager 
helixManager) {
-    Map<String, SegmentZKMetadata> segmentMetadataMap = new HashMap<>();
-    
ZKMetadataProvider.getSegmentsZKMetadata(helixManager.getHelixPropertyStore(), 
_tableNameWithType)
-        .forEach(m -> segmentMetadataMap.put(m.getSegmentName(), m));
-    return segmentMetadataMap;
-  }
-
-  @VisibleForTesting
-  void doPreloadSegmentWithSnapshot(TableDataManager tableDataManager, String 
segmentName,
-      IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata 
segmentZKMetadata) {
-    try {
-      _logger.info("Preload segment: {} from partition: {} of table: {}", 
segmentName, _partitionId,
-          _tableNameWithType);
-      // This method checks segment crc and if it has changed, the segment is 
not loaded. It might modify the
-      // file on disk, but we don't need to take the segmentLock, because 
every segment from the current table is
-      // processed by at most one thread from the preloading thread pool. 
HelixTaskExecutor task threads about to
-      // process segments from the same table are blocked on _preloadLock.
-      // In fact, taking segmentLock during segment preloading phase could 
cause deadlock when HelixTaskExecutor
-      // threads processing other tables have taken the same segmentLock as 
decided by the hash of table name and
-      // segment name, i.e. due to hash collision.
-      tableDataManager.tryLoadExistingSegment(segmentZKMetadata, 
indexLoadingConfig);
-      _logger.info("Preloaded segment: {} from partition: {} of table: {}", 
segmentName, _partitionId,
-          _tableNameWithType);
-    } catch (Exception e) {
-      _logger.warn("Failed to preload segment: {} from partition: {} of table: 
{}, skipping", segmentName, _partitionId,
-          _tableNameWithType, e);
-    }
+    TableConfig tableConfig = indexLoadingConfig.getTableConfig();
+    SegmentPreloadUtils.preloadSegments(tableDataManager, _partitionId, 
indexLoadingConfig, helixManager,
+        segmentPreloadExecutor, (segmentName, segmentZKMetadata) -> {
+          String tier = segmentZKMetadata.getTier();
+          if (SegmentPreloadUtils.hasValidDocIdsSnapshot(tableDataManager, 
tableConfig, segmentName, tier)) {
+            return true;
+          }
+          _logger.info("Skip segment: {} on tier: {} as it has no validDocIds 
snapshot", segmentName, tier);
+          return false;
+        });
   }
 
   @Override
@@ -821,19 +717,15 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       _logger.info("Skip removing segment: {} because metadata manager is 
already stopped", segmentName);
       return;
     }
-    // Skip removing the upsert metadata of segment that has max comparison 
value smaller than
-    // (largestSeenComparisonValue - TTL), i.e. out of metadata TTL. The 
expired metadata is removed while creating
-    // new consuming segment in batches.
-    boolean skipRemoveMetadata = false;
-    if (isOutOfMetadataTTL(segment)) {
-      _logger.info("Skip removing segment: {} because it's out of TTL", 
segmentName);
-      skipRemoveMetadata = true;
-    }
     if (_enableSnapshot) {
       _snapshotLock.readLock().lock();
     }
     try {
-      if (!skipRemoveMetadata) {
+      // Skip removing the upsert metadata of segment that is out of metadata 
TTL. The expired metadata is removed
+      // while creating new consuming segment in batches.
+      if (isOutOfMetadataTTL(segment)) {
+        _logger.info("Skip removing segment: {} because it's out of TTL", 
segmentName);
+      } else {
         doRemoveSegment(segment);
       }
       _trackedSegments.remove(segment);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPreloadUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPreloadUtils.java
new file mode 100644
index 0000000000..41aa2eb339
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPreloadUtils.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.function.BiPredicate;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.SegmentUtils;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SegmentPreloadUtils {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentPreloadUtils.class);
+
+  private SegmentPreloadUtils() {
+  }
+
+  public static void preloadSegments(TableDataManager tableDataManager, int 
partitionId,
+      IndexLoadingConfig indexLoadingConfig, HelixManager helixManager, 
ExecutorService segmentPreloadExecutor,
+      @Nullable BiPredicate<String, SegmentZKMetadata> segmentSelector)
+      throws Exception {
+    String tableNameWithType = tableDataManager.getTableName();
+    LOGGER.info("Preload segments from partition: {} of table: {} for fast 
metadata recovery", partitionId,
+        tableNameWithType);
+    Map<String, Map<String, String>> segmentAssignment = 
getSegmentAssignment(tableNameWithType, helixManager);
+    Map<String, SegmentZKMetadata> segmentMetadataMap = 
getSegmentsZKMetadata(tableNameWithType, helixManager);
+    List<String> preloadedSegments =
+        doPreloadSegments(tableDataManager, partitionId, indexLoadingConfig, 
segmentAssignment, segmentMetadataMap,
+            segmentPreloadExecutor, segmentSelector);
+    LOGGER.info("Preloaded {} segments from partition: {} of table: {} for 
fast metadata recovery",
+        preloadedSegments.size(), partitionId, tableNameWithType);
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Preloaded segments: {}", preloadedSegments);
+    }
+  }
+
+  @VisibleForTesting
+  static List<String> doPreloadSegments(TableDataManager tableDataManager, int 
partitionId,
+      IndexLoadingConfig indexLoadingConfig, Map<String, Map<String, String>> 
segmentAssignment,
+      Map<String, SegmentZKMetadata> segmentMetadataMap, ExecutorService 
segmentPreloadExecutor,
+      @Nullable BiPredicate<String, SegmentZKMetadata> segmentSelector)
+      throws ExecutionException, InterruptedException {
+    String tableNameWithType = tableDataManager.getTableName();
+    String instanceId = getInstanceId(tableDataManager);
+    List<String> preloadedSegments = new ArrayList<>();
+    List<Future<?>> futures = new ArrayList<>();
+    for (Map.Entry<String, Map<String, String>> entry : 
segmentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> instanceStateMap = entry.getValue();
+      if (!isSegmentOnlineOnRequestedInstance(segmentName, instanceId, 
instanceStateMap)) {
+        continue;
+      }
+      SegmentZKMetadata segmentZKMetadata = 
segmentMetadataMap.get(segmentName);
+      if (!isSegmentFromRequestedPartition(segmentName, tableNameWithType, 
partitionId, segmentZKMetadata)) {
+        continue;
+      }
+      if (segmentSelector != null && !segmentSelector.test(segmentName, 
segmentZKMetadata)) {
+        continue;
+      }
+      futures.add(segmentPreloadExecutor.submit(
+          () -> preloadSegment(segmentName, tableDataManager, partitionId, 
indexLoadingConfig, segmentZKMetadata)));
+      preloadedSegments.add(segmentName);
+    }
+    waitForSegmentsPreloaded(futures);
+    return preloadedSegments;
+  }
+
+  private static void preloadSegment(String segmentName, TableDataManager 
tableDataManager, int partitionId,
+      IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata 
segmentZKMetadata) {
+    String tableNameWithType = tableDataManager.getTableName();
+    try {
+      LOGGER.info("Preload segment: {} from partition: {} of table: {}", 
segmentName, partitionId, tableNameWithType);
+      // This method checks segment crc, and the segment is not loaded if the 
crc has changed.
+      tableDataManager.tryLoadExistingSegment(segmentZKMetadata, 
indexLoadingConfig);
+      LOGGER.info("Preloaded segment: {} from partition: {} of table: {}", 
segmentName, partitionId, tableNameWithType);
+    } catch (Exception e) {
+      LOGGER.warn("Failed to preload segment: {} from partition: {} of table: 
{}, skipping", segmentName, partitionId,
+          tableNameWithType, e);
+    }
+  }
+
+  private static void waitForSegmentsPreloaded(List<Future<?>> futures)
+      throws ExecutionException, InterruptedException {
+    try {
+      for (Future<?> f : futures) {
+        f.get();
+      }
+    } finally {
+      for (Future<?> f : futures) {
+        if (!f.isDone()) {
+          f.cancel(true);
+        }
+      }
+    }
+  }
+
+  public static boolean hasValidDocIdsSnapshot(TableDataManager 
tableDataManager, TableConfig tableConfig,
+      String segmentName, String segmentTier) {
+    try {
+      File indexDir = tableDataManager.getSegmentDataDir(segmentName, 
segmentTier, tableConfig);
+      File snapshotFile =
+          new File(SegmentDirectoryPaths.findSegmentDirectory(indexDir), 
V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME);
+      return snapshotFile.exists();
+    } catch (Exception e) {
+      return false;
+    }
+  }
+
+  private static boolean isSegmentOnlineOnRequestedInstance(String 
segmentName, String instanceId,
+      Map<String, String> instanceStateMap) {
+    String state = instanceStateMap.get(instanceId);
+    if 
(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)) {
+      return true;
+    }
+    if (state == null) {
+      LOGGER.debug("Skip segment: {} as it's not assigned to instance: {}", 
segmentName, instanceId);
+    } else {
+      LOGGER.info("Skip segment: {} as its ideal state: {} is not ONLINE for 
instance: {}", segmentName, state,
+          instanceId);
+    }
+    return false;
+  }
+
+  private static boolean isSegmentFromRequestedPartition(String segmentName, 
String tableNameWithType,
+      int requestedPartitionId, SegmentZKMetadata segmentZKMetadata) {
+    Preconditions.checkState(segmentZKMetadata != null, "Failed to find ZK 
metadata for segment: %s, table: %s",
+        segmentName, tableNameWithType);
+    Integer partitionId = 
SegmentUtils.getRealtimeSegmentPartitionId(segmentName, segmentZKMetadata, 
null);
+    Preconditions.checkNotNull(partitionId,
+        String.format("Failed to get partition id for segment: %s from table: 
%s", segmentName, tableNameWithType));
+    if (partitionId == requestedPartitionId) {
+      return true;
+    }
+    LOGGER.debug("Skip segment: {} as its partition: {} is different from the 
requested partition: {}", segmentName,
+        partitionId, requestedPartitionId);
+    return false;
+  }
+
+  private static String getInstanceId(TableDataManager tableDataManager) {
+    return tableDataManager.getInstanceDataManagerConfig().getInstanceId();
+  }
+
+  private static Map<String, Map<String, String>> getSegmentAssignment(String 
tableNameWithType,
+      HelixManager helixManager) {
+    IdealState idealState = HelixHelper.getTableIdealState(helixManager, 
tableNameWithType);
+    Preconditions.checkState(idealState != null, "Failed to find ideal state 
for table: %s", tableNameWithType);
+    return idealState.getRecord().getMapFields();
+  }
+
+  private static Map<String, SegmentZKMetadata> getSegmentsZKMetadata(String 
tableNameWithType,
+      HelixManager helixManager) {
+    Map<String, SegmentZKMetadata> segmentMetadataMap = new HashMap<>();
+    
ZKMetadataProvider.getSegmentsZKMetadata(helixManager.getHelixPropertyStore(), 
tableNameWithType)
+        .forEach(m -> segmentMetadataMap.put(m.getSegmentName(), m));
+    return segmentMetadataMap;
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
index acb1eebe6d..8881753f42 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
@@ -18,18 +18,14 @@
  */
 package org.apache.pinot.segment.local.upsert;
 
-import com.google.common.collect.ImmutableMap;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -38,10 +34,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
-import org.apache.helix.HelixManager;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
@@ -54,14 +46,11 @@ import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import 
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
-import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
-import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.util.TestUtils;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 import org.testng.annotations.AfterMethod;
@@ -91,110 +80,22 @@ public class BasePartitionUpsertMetadataManagerTest {
 
   @Test
   public void testPreloadSegments()
-      throws Exception {
+      throws IOException {
     String realtimeTableName = "testTable_REALTIME";
-    String instanceId = "server01";
-    Map<String, Map<String, String>> segmentAssignment = new HashMap<>();
-    Map<String, SegmentZKMetadata> segmentMetadataMap = new HashMap<>();
-    Set<String> preloadedSegments = new HashSet<>();
-    AtomicBoolean wasPreloading = new AtomicBoolean(false);
-    TableDataManager tableDataManager = mock(TableDataManager.class);
     UpsertContext upsertContext = mock(UpsertContext.class);
     when(upsertContext.isSnapshotEnabled()).thenReturn(true);
     when(upsertContext.isPreloadEnabled()).thenReturn(true);
+    TableDataManager tableDataManager = mock(TableDataManager.class);
     when(upsertContext.getTableDataManager()).thenReturn(tableDataManager);
-    DummyPartitionUpsertMetadataManager upsertMetadataManager =
-        new DummyPartitionUpsertMetadataManager(realtimeTableName, 0, 
upsertContext) {
-
-          @Override
-          Map<String, Map<String, String>> getSegmentAssignment(HelixManager 
helixManager) {
-            return segmentAssignment;
-          }
-
-          @Override
-          Map<String, SegmentZKMetadata> getSegmentsZKMetadata(HelixManager 
helixManager) {
-            return segmentMetadataMap;
-          }
-
-          @Override
-          void doPreloadSegmentWithSnapshot(TableDataManager tableDataManager, 
String segmentName,
-              IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata 
segmentZKMetadata) {
-            wasPreloading.set(isPreloading());
-            preloadedSegments.add(segmentName);
-          }
-        };
-
-    // Setup mocks for TableConfig and Schema.
-    TableConfig tableConfig = mock(TableConfig.class);
-    UpsertConfig upsertConfig = new UpsertConfig();
-    upsertConfig.setComparisonColumn("ts");
-    upsertConfig.setEnablePreload(true);
-    upsertConfig.setEnableSnapshot(true);
-    when(tableConfig.getUpsertConfig()).thenReturn(upsertConfig);
-    when(tableConfig.getTableName()).thenReturn(realtimeTableName);
-    Schema schema = mock(Schema.class);
-    
when(schema.getPrimaryKeyColumns()).thenReturn(Collections.singletonList("pk"));
     IndexLoadingConfig indexLoadingConfig = mock(IndexLoadingConfig.class);
-    when(indexLoadingConfig.getTableConfig()).thenReturn(tableConfig);
-
-    // Setup mocks for HelixManager.
-    HelixManager helixManager = mock(HelixManager.class);
-    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
-    when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
-
-    // Setup segment assignment. Only ONLINE segments are preloaded.
-    segmentAssignment.put("consuming_seg01", ImmutableMap.of(instanceId, 
"CONSUMING"));
-    segmentAssignment.put("consuming_seg02", ImmutableMap.of(instanceId, 
"CONSUMING"));
-    segmentAssignment.put("offline_seg01", ImmutableMap.of(instanceId, 
"OFFLINE"));
-    segmentAssignment.put("offline_seg02", ImmutableMap.of(instanceId, 
"OFFLINE"));
-    String seg01Name = "testTable__0__1__" + System.currentTimeMillis();
-    segmentAssignment.put(seg01Name, ImmutableMap.of(instanceId, "ONLINE"));
-    String seg02Name = "testTable__0__2__" + System.currentTimeMillis();
-    segmentAssignment.put(seg02Name, ImmutableMap.of(instanceId, "ONLINE"));
-    // This segment is skipped as it's not from partition 0.
-    String seg03Name = "testTable__1__3__" + System.currentTimeMillis();
-    segmentAssignment.put(seg03Name, ImmutableMap.of(instanceId, "ONLINE"));
-
-    SegmentZKMetadata zkMetadata = new SegmentZKMetadata(seg01Name);
-    zkMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
-    segmentMetadataMap.put(seg01Name, zkMetadata);
-    zkMetadata = new SegmentZKMetadata(seg02Name);
-    zkMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
-    segmentMetadataMap.put(seg02Name, zkMetadata);
-    zkMetadata = new SegmentZKMetadata(seg03Name);
-    zkMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
-    segmentMetadataMap.put(seg03Name, zkMetadata);
-
-    // Setup mocks to get file path to validDocIds snapshot.
-    ExecutorService segmentPreloadExecutor = Executors.newFixedThreadPool(1);
-    File tableDataDir = new File(TEMP_DIR, realtimeTableName);
-    when(tableDataManager.getHelixManager()).thenReturn(helixManager);
-    
when(tableDataManager.getSegmentPreloadExecutor()).thenReturn(segmentPreloadExecutor);
-    when(tableDataManager.getTableDataDir()).thenReturn(tableDataDir);
-    InstanceDataManagerConfig instanceDataManagerConfig = 
mock(InstanceDataManagerConfig.class);
-    when(instanceDataManagerConfig.getInstanceId()).thenReturn(instanceId);
-    
when(tableDataManager.getInstanceDataManagerConfig()).thenReturn(instanceDataManagerConfig);
-
-    // No snapshot file for seg01, so it's skipped.
-    File seg01IdxDir = new File(tableDataDir, seg01Name);
-    FileUtils.forceMkdir(seg01IdxDir);
-    when(tableDataManager.getSegmentDataDir(seg01Name, null, 
tableConfig)).thenReturn(seg01IdxDir);
-
-    File seg02IdxDir = new File(tableDataDir, seg02Name);
-    FileUtils.forceMkdir(seg02IdxDir);
-    FileUtils.touch(new File(new File(seg02IdxDir, "v3"), 
V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME));
-    when(tableDataManager.getSegmentDataDir(seg02Name, null, 
tableConfig)).thenReturn(seg02IdxDir);
+    
when(indexLoadingConfig.getTableConfig()).thenReturn(mock(TableConfig.class));
 
-    try {
-      // If preloading is enabled, the _isPreloading flag is true initially, 
until preloading is done.
+    try (DummyPartitionUpsertMetadataManager upsertMetadataManager = new 
DummyPartitionUpsertMetadataManager(
+        realtimeTableName, 0, upsertContext)) {
       assertTrue(upsertMetadataManager.isPreloading());
       upsertMetadataManager.preloadSegments(indexLoadingConfig);
-      assertEquals(preloadedSegments.size(), 1);
-      assertTrue(preloadedSegments.contains(seg02Name));
-      assertTrue(wasPreloading.get());
       assertFalse(upsertMetadataManager.isPreloading());
-    } finally {
-      segmentPreloadExecutor.shutdownNow();
+      upsertMetadataManager.stop();
     }
   }
 
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPreloadUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPreloadUtilsTest.java
new file mode 100644
index 0000000000..205440f20c
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPreloadUtilsTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixManager;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class SegmentPreloadUtilsTest {
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), 
"SegmentPreloadUtilsTest");
+
+  @BeforeMethod
+  public void setUp()
+      throws IOException {
+    FileUtils.forceMkdir(TEMP_DIR);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws IOException {
+    FileUtils.forceDelete(TEMP_DIR);
+  }
+
+  @Test
+  public void testPreloadSegments()
+      throws Exception {
+    String realtimeTableName = "testTable_REALTIME";
+    String instanceId = "server01";
+    Map<String, Map<String, String>> segmentAssignment = new HashMap<>();
+    Map<String, SegmentZKMetadata> segmentMetadataMap = new HashMap<>();
+    TableDataManager tableDataManager = mock(TableDataManager.class);
+
+    // Setup mocks for TableConfig and Schema.
+    TableConfig tableConfig = mock(TableConfig.class);
+    UpsertConfig upsertConfig = new UpsertConfig();
+    upsertConfig.setComparisonColumn("ts");
+    upsertConfig.setEnablePreload(true);
+    upsertConfig.setEnableSnapshot(true);
+    when(tableConfig.getUpsertConfig()).thenReturn(upsertConfig);
+    when(tableConfig.getTableName()).thenReturn(realtimeTableName);
+    Schema schema = mock(Schema.class);
+    
when(schema.getPrimaryKeyColumns()).thenReturn(Collections.singletonList("pk"));
+    IndexLoadingConfig indexLoadingConfig = mock(IndexLoadingConfig.class);
+    when(indexLoadingConfig.getTableConfig()).thenReturn(tableConfig);
+
+    // Setup mocks for HelixManager.
+    HelixManager helixManager = mock(HelixManager.class);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
+    when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+
+    // Setup segment assignment. Only ONLINE segments are preloaded.
+    segmentAssignment.put("consuming_seg01", ImmutableMap.of(instanceId, 
"CONSUMING"));
+    segmentAssignment.put("consuming_seg02", ImmutableMap.of(instanceId, 
"CONSUMING"));
+    segmentAssignment.put("offline_seg01", ImmutableMap.of(instanceId, 
"OFFLINE"));
+    segmentAssignment.put("offline_seg02", ImmutableMap.of(instanceId, 
"OFFLINE"));
+    String seg01Name = "testTable__0__1__" + System.currentTimeMillis();
+    segmentAssignment.put(seg01Name, ImmutableMap.of(instanceId, "ONLINE"));
+    String seg02Name = "testTable__0__2__" + System.currentTimeMillis();
+    segmentAssignment.put(seg02Name, ImmutableMap.of(instanceId, "ONLINE"));
+    // This segment is skipped as it's not from partition 0.
+    String seg03Name = "testTable__1__3__" + System.currentTimeMillis();
+    segmentAssignment.put(seg03Name, ImmutableMap.of(instanceId, "ONLINE"));
+
+    SegmentZKMetadata zkMetadata = new SegmentZKMetadata(seg01Name);
+    zkMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    segmentMetadataMap.put(seg01Name, zkMetadata);
+    zkMetadata = new SegmentZKMetadata(seg02Name);
+    zkMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    segmentMetadataMap.put(seg02Name, zkMetadata);
+    zkMetadata = new SegmentZKMetadata(seg03Name);
+    zkMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    segmentMetadataMap.put(seg03Name, zkMetadata);
+
+    // Setup mocks to get file path to validDocIds snapshot.
+    ExecutorService segmentPreloadExecutor = Executors.newFixedThreadPool(1);
+    File tableDataDir = new File(TEMP_DIR, realtimeTableName);
+    when(tableDataManager.getHelixManager()).thenReturn(helixManager);
+    
when(tableDataManager.getSegmentPreloadExecutor()).thenReturn(segmentPreloadExecutor);
+    when(tableDataManager.getTableDataDir()).thenReturn(tableDataDir);
+    InstanceDataManagerConfig instanceDataManagerConfig = 
mock(InstanceDataManagerConfig.class);
+    when(instanceDataManagerConfig.getInstanceId()).thenReturn(instanceId);
+    
when(tableDataManager.getInstanceDataManagerConfig()).thenReturn(instanceDataManagerConfig);
+
+    // No snapshot file for seg01, so it's skipped.
+    File seg01IdxDir = new File(tableDataDir, seg01Name);
+    FileUtils.forceMkdir(seg01IdxDir);
+    when(tableDataManager.getSegmentDataDir(seg01Name, null, 
tableConfig)).thenReturn(seg01IdxDir);
+
+    File seg02IdxDir = new File(tableDataDir, seg02Name);
+    FileUtils.forceMkdir(seg02IdxDir);
+    FileUtils.touch(new File(new File(seg02IdxDir, "v3"), 
V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME));
+    when(tableDataManager.getSegmentDataDir(seg02Name, null, 
tableConfig)).thenReturn(seg02IdxDir);
+
+    try {
+      List<String> preloadedSegments =
+          SegmentPreloadUtils.doPreloadSegments(tableDataManager, 0, 
indexLoadingConfig, segmentAssignment,
+              segmentMetadataMap, segmentPreloadExecutor,
+              (segmentName, segmentZKMetadata) -> 
SegmentPreloadUtils.hasValidDocIdsSnapshot(tableDataManager,
+                  tableConfig, segmentName, segmentZKMetadata.getTier()));
+      assertEquals(preloadedSegments.size(), 1);
+      assertTrue(preloadedSegments.contains(seg02Name));
+    } finally {
+      segmentPreloadExecutor.shutdownNow();
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to