This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c4e03e3697 Extract common utils used to preload segments for future
reuse (#14161)
c4e03e3697 is described below
commit c4e03e369789251f62436cefdc30445e6435429c
Author: Xiaobing <[email protected]>
AuthorDate: Thu Oct 3 19:10:28 2024 -0700
Extract common utils used to preload segments for future reuse (#14161)
* extract common utils for preloading to reuse for dedup table
* minor refinements
---
.../upsert/BasePartitionUpsertMetadataManager.java | 148 +++-------------
.../segment/local/utils/SegmentPreloadUtils.java | 194 +++++++++++++++++++++
.../BasePartitionUpsertMetadataManagerTest.java | 111 +-----------
.../local/utils/SegmentPreloadUtilsTest.java | 150 ++++++++++++++++
4 files changed, 370 insertions(+), 233 deletions(-)
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 0c72f26114..72bb861463 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -23,7 +23,6 @@ import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AtomicDouble;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -33,7 +32,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -42,18 +40,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.helix.HelixManager;
-import org.apache.helix.model.IdealState;
import org.apache.pinot.common.Utils;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
-import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
-import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
@@ -61,20 +54,19 @@ import
org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
import org.apache.pinot.segment.local.utils.HashUtils;
+import org.apache.pinot.segment.local.utils.SegmentPreloadUtils;
import org.apache.pinot.segment.local.utils.WatermarkUtils;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.V1Constants;
import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
-import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.apache.pinot.spi.utils.BooleanUtils;
-import org.apache.pinot.spi.utils.CommonConstants;
import org.roaringbitmap.PeekableIntIterator;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
import org.slf4j.Logger;
@@ -247,10 +239,9 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
_serverMetrics.addTimedTableValue(_tableNameWithType,
ServerTimer.UPSERT_PRELOAD_TIME_MS, duration,
TimeUnit.MILLISECONDS);
} catch (Exception e) {
- // Even if preloading fails, we should continue to complete the
initialization, so that TableDataManager can be
- // created. Once TableDataManager is created, no more segment preloading
would happen, and the normal segment
- // loading logic would be used. The segments not being preloaded
successfully here would be loaded via the
- // normal segment loading logic, the one doing more costly checks on the
upsert metadata.
+ // We should continue even if preloading fails, so that segments not
being preloaded successfully can get
+ // loaded via the normal segment loading logic as done on the Helix task
threads although with more costly
+ // checks on the upsert metadata.
_logger.warn("Failed to preload segments from partition: {} of table:
{}, skipping", _partitionId,
_tableNameWithType, e);
_serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.UPSERT_PRELOAD_FAILURE, 1);
@@ -264,115 +255,20 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
}
}
+ // Keep this hook method for subclasses to extend the preloading logic as
needed.
protected void doPreloadSegments(TableDataManager tableDataManager,
IndexLoadingConfig indexLoadingConfig,
HelixManager helixManager, ExecutorService segmentPreloadExecutor)
throws Exception {
- _logger.info("Preload segments from partition: {} of table: {} for fast
upsert metadata recovery", _partitionId,
- _tableNameWithType);
- String instanceId = getInstanceId(tableDataManager);
- Map<String, Map<String, String>> segmentAssignment =
getSegmentAssignment(helixManager);
- Map<String, SegmentZKMetadata> segmentMetadataMap =
getSegmentsZKMetadata(helixManager);
- List<Future<?>> futures = new ArrayList<>();
- for (Map.Entry<String, Map<String, String>> entry :
segmentAssignment.entrySet()) {
- String segmentName = entry.getKey();
- Map<String, String> instanceStateMap = entry.getValue();
- String state = instanceStateMap.get(instanceId);
- if
(!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)) {
- if (state == null) {
- _logger.debug("Skip segment: {} as it's not assigned to instance:
{}", segmentName, instanceId);
- } else {
- _logger.info("Skip segment: {} as its ideal state: {} is not ONLINE
for instance: {}", segmentName, state,
- instanceId);
- }
- continue;
- }
- SegmentZKMetadata segmentZKMetadata =
segmentMetadataMap.get(segmentName);
- Preconditions.checkState(segmentZKMetadata != null, "Failed to find ZK
metadata for segment: %s, table: %s",
- segmentName, _tableNameWithType);
- Integer partitionId =
SegmentUtils.getRealtimeSegmentPartitionId(segmentName, segmentZKMetadata,
null);
- Preconditions.checkNotNull(partitionId,
- String.format("Failed to get partition id for segment: %s
(upsert-enabled table: %s)", segmentName,
- _tableNameWithType));
- if (partitionId != _partitionId) {
- _logger.debug("Skip segment: {} as its partition: {} is different from
the requested partition: {}",
- segmentName, partitionId, _partitionId);
- continue;
- }
- if (!hasValidDocIdsSnapshot(tableDataManager,
indexLoadingConfig.getTableConfig(), segmentName,
- segmentZKMetadata.getTier())) {
- _logger.info("Skip segment: {} from partition: {} as no validDocIds
snapshot exists", segmentName,
- _partitionId);
- continue;
- }
- futures.add(segmentPreloadExecutor.submit(
- () -> doPreloadSegmentWithSnapshot(tableDataManager, segmentName,
indexLoadingConfig, segmentZKMetadata)));
- }
- try {
- for (Future<?> f : futures) {
- f.get();
- }
- } finally {
- for (Future<?> f : futures) {
- if (!f.isDone()) {
- f.cancel(true);
- }
- }
- }
- _logger.info("Preloaded {} segments from partition: {} of table: {} for
fast upsert metadata recovery",
- futures.size(), _partitionId, _tableNameWithType);
- }
-
- private String getInstanceId(TableDataManager tableDataManager) {
- return tableDataManager.getInstanceDataManagerConfig().getInstanceId();
- }
-
- private static boolean hasValidDocIdsSnapshot(TableDataManager
tableDataManager, TableConfig tableConfig,
- String segmentName, String segmentTier) {
- try {
- File indexDir = tableDataManager.getSegmentDataDir(segmentName,
segmentTier, tableConfig);
- File snapshotFile =
- new File(SegmentDirectoryPaths.findSegmentDirectory(indexDir),
V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME);
- return snapshotFile.exists();
- } catch (Exception e) {
- return false;
- }
- }
-
- @VisibleForTesting
- Map<String, Map<String, String>> getSegmentAssignment(HelixManager
helixManager) {
- IdealState idealState = HelixHelper.getTableIdealState(helixManager,
_tableNameWithType);
- Preconditions.checkState(idealState != null, "Failed to find ideal state
for table: %s", _tableNameWithType);
- return idealState.getRecord().getMapFields();
- }
-
- @VisibleForTesting
- Map<String, SegmentZKMetadata> getSegmentsZKMetadata(HelixManager
helixManager) {
- Map<String, SegmentZKMetadata> segmentMetadataMap = new HashMap<>();
-
ZKMetadataProvider.getSegmentsZKMetadata(helixManager.getHelixPropertyStore(),
_tableNameWithType)
- .forEach(m -> segmentMetadataMap.put(m.getSegmentName(), m));
- return segmentMetadataMap;
- }
-
- @VisibleForTesting
- void doPreloadSegmentWithSnapshot(TableDataManager tableDataManager, String
segmentName,
- IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata
segmentZKMetadata) {
- try {
- _logger.info("Preload segment: {} from partition: {} of table: {}",
segmentName, _partitionId,
- _tableNameWithType);
- // This method checks segment crc and if it has changed, the segment is
not loaded. It might modify the
- // file on disk, but we don't need to take the segmentLock, because
every segment from the current table is
- // processed by at most one thread from the preloading thread pool.
HelixTaskExecutor task threads about to
- // process segments from the same table are blocked on _preloadLock.
- // In fact, taking segmentLock during segment preloading phase could
cause deadlock when HelixTaskExecutor
- // threads processing other tables have taken the same segmentLock as
decided by the hash of table name and
- // segment name, i.e. due to hash collision.
- tableDataManager.tryLoadExistingSegment(segmentZKMetadata,
indexLoadingConfig);
- _logger.info("Preloaded segment: {} from partition: {} of table: {}",
segmentName, _partitionId,
- _tableNameWithType);
- } catch (Exception e) {
- _logger.warn("Failed to preload segment: {} from partition: {} of table:
{}, skipping", segmentName, _partitionId,
- _tableNameWithType, e);
- }
+ TableConfig tableConfig = indexLoadingConfig.getTableConfig();
+ SegmentPreloadUtils.preloadSegments(tableDataManager, _partitionId,
indexLoadingConfig, helixManager,
+ segmentPreloadExecutor, (segmentName, segmentZKMetadata) -> {
+ String tier = segmentZKMetadata.getTier();
+ if (SegmentPreloadUtils.hasValidDocIdsSnapshot(tableDataManager,
tableConfig, segmentName, tier)) {
+ return true;
+ }
+ _logger.info("Skip segment: {} on tier: {} as it has no validDocIds
snapshot", segmentName, tier);
+ return false;
+ });
}
@Override
@@ -821,19 +717,15 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
_logger.info("Skip removing segment: {} because metadata manager is
already stopped", segmentName);
return;
}
- // Skip removing the upsert metadata of segment that has max comparison
value smaller than
- // (largestSeenComparisonValue - TTL), i.e. out of metadata TTL. The
expired metadata is removed while creating
- // new consuming segment in batches.
- boolean skipRemoveMetadata = false;
- if (isOutOfMetadataTTL(segment)) {
- _logger.info("Skip removing segment: {} because it's out of TTL",
segmentName);
- skipRemoveMetadata = true;
- }
if (_enableSnapshot) {
_snapshotLock.readLock().lock();
}
try {
- if (!skipRemoveMetadata) {
+ // Skip removing the upsert metadata of segment that is out of metadata
TTL. The expired metadata is removed
+ // while creating new consuming segment in batches.
+ if (isOutOfMetadataTTL(segment)) {
+ _logger.info("Skip removing segment: {} because it's out of TTL",
segmentName);
+ } else {
doRemoveSegment(segment);
}
_trackedSegments.remove(segment);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPreloadUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPreloadUtils.java
new file mode 100644
index 0000000000..41aa2eb339
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPreloadUtils.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.function.BiPredicate;
+import javax.annotation.Nullable;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.SegmentUtils;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SegmentPreloadUtils {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentPreloadUtils.class);
+
+ private SegmentPreloadUtils() {
+ }
+
+ public static void preloadSegments(TableDataManager tableDataManager, int
partitionId,
+ IndexLoadingConfig indexLoadingConfig, HelixManager helixManager,
ExecutorService segmentPreloadExecutor,
+ @Nullable BiPredicate<String, SegmentZKMetadata> segmentSelector)
+ throws Exception {
+ String tableNameWithType = tableDataManager.getTableName();
+ LOGGER.info("Preload segments from partition: {} of table: {} for fast
metadata recovery", partitionId,
+ tableNameWithType);
+ Map<String, Map<String, String>> segmentAssignment =
getSegmentAssignment(tableNameWithType, helixManager);
+ Map<String, SegmentZKMetadata> segmentMetadataMap =
getSegmentsZKMetadata(tableNameWithType, helixManager);
+ List<String> preloadedSegments =
+ doPreloadSegments(tableDataManager, partitionId, indexLoadingConfig,
segmentAssignment, segmentMetadataMap,
+ segmentPreloadExecutor, segmentSelector);
+ LOGGER.info("Preloaded {} segments from partition: {} of table: {} for
fast metadata recovery",
+ preloadedSegments.size(), partitionId, tableNameWithType);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Preloaded segments: {}", preloadedSegments);
+ }
+ }
+
+ @VisibleForTesting
+ static List<String> doPreloadSegments(TableDataManager tableDataManager, int
partitionId,
+ IndexLoadingConfig indexLoadingConfig, Map<String, Map<String, String>>
segmentAssignment,
+ Map<String, SegmentZKMetadata> segmentMetadataMap, ExecutorService
segmentPreloadExecutor,
+ @Nullable BiPredicate<String, SegmentZKMetadata> segmentSelector)
+ throws ExecutionException, InterruptedException {
+ String tableNameWithType = tableDataManager.getTableName();
+ String instanceId = getInstanceId(tableDataManager);
+ List<String> preloadedSegments = new ArrayList<>();
+ List<Future<?>> futures = new ArrayList<>();
+ for (Map.Entry<String, Map<String, String>> entry :
segmentAssignment.entrySet()) {
+ String segmentName = entry.getKey();
+ Map<String, String> instanceStateMap = entry.getValue();
+ if (!isSegmentOnlineOnRequestedInstance(segmentName, instanceId,
instanceStateMap)) {
+ continue;
+ }
+ SegmentZKMetadata segmentZKMetadata =
segmentMetadataMap.get(segmentName);
+ if (!isSegmentFromRequestedPartition(segmentName, tableNameWithType,
partitionId, segmentZKMetadata)) {
+ continue;
+ }
+ if (segmentSelector != null && !segmentSelector.test(segmentName,
segmentZKMetadata)) {
+ continue;
+ }
+ futures.add(segmentPreloadExecutor.submit(
+ () -> preloadSegment(segmentName, tableDataManager, partitionId,
indexLoadingConfig, segmentZKMetadata)));
+ preloadedSegments.add(segmentName);
+ }
+ waitForSegmentsPreloaded(futures);
+ return preloadedSegments;
+ }
+
+ private static void preloadSegment(String segmentName, TableDataManager
tableDataManager, int partitionId,
+ IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata
segmentZKMetadata) {
+ String tableNameWithType = tableDataManager.getTableName();
+ try {
+ LOGGER.info("Preload segment: {} from partition: {} of table: {}",
segmentName, partitionId, tableNameWithType);
+ // This method checks segment crc, and the segment is not loaded if the
crc has changed.
+ tableDataManager.tryLoadExistingSegment(segmentZKMetadata,
indexLoadingConfig);
+ LOGGER.info("Preloaded segment: {} from partition: {} of table: {}",
segmentName, partitionId, tableNameWithType);
+ } catch (Exception e) {
+ LOGGER.warn("Failed to preload segment: {} from partition: {} of table:
{}, skipping", segmentName, partitionId,
+ tableNameWithType, e);
+ }
+ }
+
+ private static void waitForSegmentsPreloaded(List<Future<?>> futures)
+ throws ExecutionException, InterruptedException {
+ try {
+ for (Future<?> f : futures) {
+ f.get();
+ }
+ } finally {
+ for (Future<?> f : futures) {
+ if (!f.isDone()) {
+ f.cancel(true);
+ }
+ }
+ }
+ }
+
+ public static boolean hasValidDocIdsSnapshot(TableDataManager
tableDataManager, TableConfig tableConfig,
+ String segmentName, String segmentTier) {
+ try {
+ File indexDir = tableDataManager.getSegmentDataDir(segmentName,
segmentTier, tableConfig);
+ File snapshotFile =
+ new File(SegmentDirectoryPaths.findSegmentDirectory(indexDir),
V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME);
+ return snapshotFile.exists();
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ private static boolean isSegmentOnlineOnRequestedInstance(String
segmentName, String instanceId,
+ Map<String, String> instanceStateMap) {
+ String state = instanceStateMap.get(instanceId);
+ if
(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)) {
+ return true;
+ }
+ if (state == null) {
+ LOGGER.debug("Skip segment: {} as it's not assigned to instance: {}",
segmentName, instanceId);
+ } else {
+ LOGGER.info("Skip segment: {} as its ideal state: {} is not ONLINE for
instance: {}", segmentName, state,
+ instanceId);
+ }
+ return false;
+ }
+
+ private static boolean isSegmentFromRequestedPartition(String segmentName,
String tableNameWithType,
+ int requestedPartitionId, SegmentZKMetadata segmentZKMetadata) {
+ Preconditions.checkState(segmentZKMetadata != null, "Failed to find ZK
metadata for segment: %s, table: %s",
+ segmentName, tableNameWithType);
+ Integer partitionId =
SegmentUtils.getRealtimeSegmentPartitionId(segmentName, segmentZKMetadata,
null);
+ Preconditions.checkNotNull(partitionId,
+ String.format("Failed to get partition id for segment: %s from table:
%s", segmentName, tableNameWithType));
+ if (partitionId == requestedPartitionId) {
+ return true;
+ }
+ LOGGER.debug("Skip segment: {} as its partition: {} is different from the
requested partition: {}", segmentName,
+ partitionId, requestedPartitionId);
+ return false;
+ }
+
+ private static String getInstanceId(TableDataManager tableDataManager) {
+ return tableDataManager.getInstanceDataManagerConfig().getInstanceId();
+ }
+
+ private static Map<String, Map<String, String>> getSegmentAssignment(String
tableNameWithType,
+ HelixManager helixManager) {
+ IdealState idealState = HelixHelper.getTableIdealState(helixManager,
tableNameWithType);
+ Preconditions.checkState(idealState != null, "Failed to find ideal state
for table: %s", tableNameWithType);
+ return idealState.getRecord().getMapFields();
+ }
+
+ private static Map<String, SegmentZKMetadata> getSegmentsZKMetadata(String
tableNameWithType,
+ HelixManager helixManager) {
+ Map<String, SegmentZKMetadata> segmentMetadataMap = new HashMap<>();
+
ZKMetadataProvider.getSegmentsZKMetadata(helixManager.getHelixPropertyStore(),
tableNameWithType)
+ .forEach(m -> segmentMetadataMap.put(m.getSegmentName(), m));
+ return segmentMetadataMap;
+ }
+}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
index acb1eebe6d..8881753f42 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
@@ -18,18 +18,14 @@
*/
package org.apache.pinot.segment.local.upsert;
-import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -38,10 +34,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
-import org.apache.helix.HelixManager;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
@@ -54,14 +46,11 @@ import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
-import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
-import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.util.TestUtils;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
import org.testng.annotations.AfterMethod;
@@ -91,110 +80,22 @@ public class BasePartitionUpsertMetadataManagerTest {
@Test
public void testPreloadSegments()
- throws Exception {
+ throws IOException {
String realtimeTableName = "testTable_REALTIME";
- String instanceId = "server01";
- Map<String, Map<String, String>> segmentAssignment = new HashMap<>();
- Map<String, SegmentZKMetadata> segmentMetadataMap = new HashMap<>();
- Set<String> preloadedSegments = new HashSet<>();
- AtomicBoolean wasPreloading = new AtomicBoolean(false);
- TableDataManager tableDataManager = mock(TableDataManager.class);
UpsertContext upsertContext = mock(UpsertContext.class);
when(upsertContext.isSnapshotEnabled()).thenReturn(true);
when(upsertContext.isPreloadEnabled()).thenReturn(true);
+ TableDataManager tableDataManager = mock(TableDataManager.class);
when(upsertContext.getTableDataManager()).thenReturn(tableDataManager);
- DummyPartitionUpsertMetadataManager upsertMetadataManager =
- new DummyPartitionUpsertMetadataManager(realtimeTableName, 0,
upsertContext) {
-
- @Override
- Map<String, Map<String, String>> getSegmentAssignment(HelixManager
helixManager) {
- return segmentAssignment;
- }
-
- @Override
- Map<String, SegmentZKMetadata> getSegmentsZKMetadata(HelixManager
helixManager) {
- return segmentMetadataMap;
- }
-
- @Override
- void doPreloadSegmentWithSnapshot(TableDataManager tableDataManager,
String segmentName,
- IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata
segmentZKMetadata) {
- wasPreloading.set(isPreloading());
- preloadedSegments.add(segmentName);
- }
- };
-
- // Setup mocks for TableConfig and Schema.
- TableConfig tableConfig = mock(TableConfig.class);
- UpsertConfig upsertConfig = new UpsertConfig();
- upsertConfig.setComparisonColumn("ts");
- upsertConfig.setEnablePreload(true);
- upsertConfig.setEnableSnapshot(true);
- when(tableConfig.getUpsertConfig()).thenReturn(upsertConfig);
- when(tableConfig.getTableName()).thenReturn(realtimeTableName);
- Schema schema = mock(Schema.class);
-
when(schema.getPrimaryKeyColumns()).thenReturn(Collections.singletonList("pk"));
IndexLoadingConfig indexLoadingConfig = mock(IndexLoadingConfig.class);
- when(indexLoadingConfig.getTableConfig()).thenReturn(tableConfig);
-
- // Setup mocks for HelixManager.
- HelixManager helixManager = mock(HelixManager.class);
- ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
- when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
-
- // Setup segment assignment. Only ONLINE segments are preloaded.
- segmentAssignment.put("consuming_seg01", ImmutableMap.of(instanceId,
"CONSUMING"));
- segmentAssignment.put("consuming_seg02", ImmutableMap.of(instanceId,
"CONSUMING"));
- segmentAssignment.put("offline_seg01", ImmutableMap.of(instanceId,
"OFFLINE"));
- segmentAssignment.put("offline_seg02", ImmutableMap.of(instanceId,
"OFFLINE"));
- String seg01Name = "testTable__0__1__" + System.currentTimeMillis();
- segmentAssignment.put(seg01Name, ImmutableMap.of(instanceId, "ONLINE"));
- String seg02Name = "testTable__0__2__" + System.currentTimeMillis();
- segmentAssignment.put(seg02Name, ImmutableMap.of(instanceId, "ONLINE"));
- // This segment is skipped as it's not from partition 0.
- String seg03Name = "testTable__1__3__" + System.currentTimeMillis();
- segmentAssignment.put(seg03Name, ImmutableMap.of(instanceId, "ONLINE"));
-
- SegmentZKMetadata zkMetadata = new SegmentZKMetadata(seg01Name);
- zkMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
- segmentMetadataMap.put(seg01Name, zkMetadata);
- zkMetadata = new SegmentZKMetadata(seg02Name);
- zkMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
- segmentMetadataMap.put(seg02Name, zkMetadata);
- zkMetadata = new SegmentZKMetadata(seg03Name);
- zkMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
- segmentMetadataMap.put(seg03Name, zkMetadata);
-
- // Setup mocks to get file path to validDocIds snapshot.
- ExecutorService segmentPreloadExecutor = Executors.newFixedThreadPool(1);
- File tableDataDir = new File(TEMP_DIR, realtimeTableName);
- when(tableDataManager.getHelixManager()).thenReturn(helixManager);
-
when(tableDataManager.getSegmentPreloadExecutor()).thenReturn(segmentPreloadExecutor);
- when(tableDataManager.getTableDataDir()).thenReturn(tableDataDir);
- InstanceDataManagerConfig instanceDataManagerConfig =
mock(InstanceDataManagerConfig.class);
- when(instanceDataManagerConfig.getInstanceId()).thenReturn(instanceId);
-
when(tableDataManager.getInstanceDataManagerConfig()).thenReturn(instanceDataManagerConfig);
-
- // No snapshot file for seg01, so it's skipped.
- File seg01IdxDir = new File(tableDataDir, seg01Name);
- FileUtils.forceMkdir(seg01IdxDir);
- when(tableDataManager.getSegmentDataDir(seg01Name, null,
tableConfig)).thenReturn(seg01IdxDir);
-
- File seg02IdxDir = new File(tableDataDir, seg02Name);
- FileUtils.forceMkdir(seg02IdxDir);
- FileUtils.touch(new File(new File(seg02IdxDir, "v3"),
V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME));
- when(tableDataManager.getSegmentDataDir(seg02Name, null,
tableConfig)).thenReturn(seg02IdxDir);
+
when(indexLoadingConfig.getTableConfig()).thenReturn(mock(TableConfig.class));
- try {
- // If preloading is enabled, the _isPreloading flag is true initially,
until preloading is done.
+ try (DummyPartitionUpsertMetadataManager upsertMetadataManager = new
DummyPartitionUpsertMetadataManager(
+ realtimeTableName, 0, upsertContext)) {
assertTrue(upsertMetadataManager.isPreloading());
upsertMetadataManager.preloadSegments(indexLoadingConfig);
- assertEquals(preloadedSegments.size(), 1);
- assertTrue(preloadedSegments.contains(seg02Name));
- assertTrue(wasPreloading.get());
assertFalse(upsertMetadataManager.isPreloading());
- } finally {
- segmentPreloadExecutor.shutdownNow();
+ upsertMetadataManager.stop();
}
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPreloadUtilsTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPreloadUtilsTest.java
new file mode 100644
index 0000000000..205440f20c
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPreloadUtilsTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixManager;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class SegmentPreloadUtilsTest {
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
"SegmentPreloadUtilsTest");
+
+ @BeforeMethod
+ public void setUp()
+ throws IOException {
+ FileUtils.forceMkdir(TEMP_DIR);
+ }
+
+ @AfterMethod
+ public void tearDown()
+ throws IOException {
+ FileUtils.forceDelete(TEMP_DIR);
+ }
+
+ @Test
+ public void testPreloadSegments()
+ throws Exception {
+ String realtimeTableName = "testTable_REALTIME";
+ String instanceId = "server01";
+ Map<String, Map<String, String>> segmentAssignment = new HashMap<>();
+ Map<String, SegmentZKMetadata> segmentMetadataMap = new HashMap<>();
+ TableDataManager tableDataManager = mock(TableDataManager.class);
+
+ // Setup mocks for TableConfig and Schema.
+ TableConfig tableConfig = mock(TableConfig.class);
+ UpsertConfig upsertConfig = new UpsertConfig();
+ upsertConfig.setComparisonColumn("ts");
+ upsertConfig.setEnablePreload(true);
+ upsertConfig.setEnableSnapshot(true);
+ when(tableConfig.getUpsertConfig()).thenReturn(upsertConfig);
+ when(tableConfig.getTableName()).thenReturn(realtimeTableName);
+ Schema schema = mock(Schema.class);
+
when(schema.getPrimaryKeyColumns()).thenReturn(Collections.singletonList("pk"));
+ IndexLoadingConfig indexLoadingConfig = mock(IndexLoadingConfig.class);
+ when(indexLoadingConfig.getTableConfig()).thenReturn(tableConfig);
+
+ // Setup mocks for HelixManager.
+ HelixManager helixManager = mock(HelixManager.class);
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
+ when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+
+ // Setup segment assignment. Only ONLINE segments are preloaded.
+ segmentAssignment.put("consuming_seg01", ImmutableMap.of(instanceId,
"CONSUMING"));
+ segmentAssignment.put("consuming_seg02", ImmutableMap.of(instanceId,
"CONSUMING"));
+ segmentAssignment.put("offline_seg01", ImmutableMap.of(instanceId,
"OFFLINE"));
+ segmentAssignment.put("offline_seg02", ImmutableMap.of(instanceId,
"OFFLINE"));
+ String seg01Name = "testTable__0__1__" + System.currentTimeMillis();
+ segmentAssignment.put(seg01Name, ImmutableMap.of(instanceId, "ONLINE"));
+ String seg02Name = "testTable__0__2__" + System.currentTimeMillis();
+ segmentAssignment.put(seg02Name, ImmutableMap.of(instanceId, "ONLINE"));
+ // This segment is skipped as it's not from partition 0.
+ String seg03Name = "testTable__1__3__" + System.currentTimeMillis();
+ segmentAssignment.put(seg03Name, ImmutableMap.of(instanceId, "ONLINE"));
+
+ SegmentZKMetadata zkMetadata = new SegmentZKMetadata(seg01Name);
+ zkMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+ segmentMetadataMap.put(seg01Name, zkMetadata);
+ zkMetadata = new SegmentZKMetadata(seg02Name);
+ zkMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+ segmentMetadataMap.put(seg02Name, zkMetadata);
+ zkMetadata = new SegmentZKMetadata(seg03Name);
+ zkMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+ segmentMetadataMap.put(seg03Name, zkMetadata);
+
+ // Setup mocks to get file path to validDocIds snapshot.
+ ExecutorService segmentPreloadExecutor = Executors.newFixedThreadPool(1);
+ File tableDataDir = new File(TEMP_DIR, realtimeTableName);
+ when(tableDataManager.getHelixManager()).thenReturn(helixManager);
+
when(tableDataManager.getSegmentPreloadExecutor()).thenReturn(segmentPreloadExecutor);
+ when(tableDataManager.getTableDataDir()).thenReturn(tableDataDir);
+ InstanceDataManagerConfig instanceDataManagerConfig =
mock(InstanceDataManagerConfig.class);
+ when(instanceDataManagerConfig.getInstanceId()).thenReturn(instanceId);
+
when(tableDataManager.getInstanceDataManagerConfig()).thenReturn(instanceDataManagerConfig);
+
+ // No snapshot file for seg01, so it's skipped.
+ File seg01IdxDir = new File(tableDataDir, seg01Name);
+ FileUtils.forceMkdir(seg01IdxDir);
+ when(tableDataManager.getSegmentDataDir(seg01Name, null,
tableConfig)).thenReturn(seg01IdxDir);
+
+ File seg02IdxDir = new File(tableDataDir, seg02Name);
+ FileUtils.forceMkdir(seg02IdxDir);
+ FileUtils.touch(new File(new File(seg02IdxDir, "v3"),
V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME));
+ when(tableDataManager.getSegmentDataDir(seg02Name, null,
tableConfig)).thenReturn(seg02IdxDir);
+
+ try {
+ List<String> preloadedSegments =
+ SegmentPreloadUtils.doPreloadSegments(tableDataManager, 0,
indexLoadingConfig, segmentAssignment,
+ segmentMetadataMap, segmentPreloadExecutor,
+ (segmentName, segmentZKMetadata) ->
SegmentPreloadUtils.hasValidDocIdsSnapshot(tableDataManager,
+ tableConfig, segmentName, segmentZKMetadata.getTier()));
+ assertEquals(preloadedSegments.size(), 1);
+ assertTrue(preloadedSegments.contains(seg02Name));
+ } finally {
+ segmentPreloadExecutor.shutdownNow();
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]