This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ce017c00d4 Minor improvement to upsert preload (#11694)
ce017c00d4 is described below
commit ce017c00d406cdb56c712d85e4010babd90772a0
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Sep 27 12:11:53 2023 -0700
Minor improvement to upsert preload (#11694)
---
.../upsert/BaseTableUpsertMetadataManager.java | 21 +++++---
...oncurrentMapTableUpsertMetadataManagerTest.java | 61 +++++++++-------------
2 files changed, 39 insertions(+), 43 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 2fecc4fbe5..9de2349007 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -161,13 +161,14 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
throws Exception {
LOGGER.info("Preload segments from table: {} for fast upsert metadata
recovery", _tableNameWithType);
onPreloadStart();
- IdealState idealState = HelixHelper.getTableIdealState(_helixManager,
_tableNameWithType);
ZkHelixPropertyStore<ZNRecord> propertyStore =
_helixManager.getHelixPropertyStore();
String instanceId = getInstanceId();
IndexLoadingConfig indexLoadingConfig = createIndexLoadingConfig();
+ Map<String, Map<String, String>> segmentAssignment =
getSegmentAssignment();
List<Future<?>> futures = new ArrayList<>();
- for (String segmentName : idealState.getPartitionSet()) {
- Map<String, String> instanceStateMap =
idealState.getInstanceStateMap(segmentName);
+ for (Map.Entry<String, Map<String, String>> entry :
segmentAssignment.entrySet()) {
+ String segmentName = entry.getKey();
+ Map<String, String> instanceStateMap = entry.getValue();
String state = instanceStateMap.get(instanceId);
if
(!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)) {
LOGGER.info("Skip segment: {} as its ideal state: {} is not ONLINE",
segmentName, state);
@@ -208,18 +209,26 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
protected void onPreloadFinish() {
}
- private String getInstanceId() {
+ @VisibleForTesting
+ String getInstanceId() {
InstanceDataManagerConfig instanceDataManagerConfig =
_tableDataManager.getTableDataManagerConfig().getInstanceDataManagerConfig();
return instanceDataManagerConfig.getInstanceId();
}
@VisibleForTesting
- protected IndexLoadingConfig createIndexLoadingConfig() {
+ IndexLoadingConfig createIndexLoadingConfig() {
return new
IndexLoadingConfig(_tableDataManager.getTableDataManagerConfig().getInstanceDataManagerConfig(),
_tableConfig, _schema);
}
+ @VisibleForTesting
+ Map<String, Map<String, String>> getSegmentAssignment() {
+ IdealState idealState = HelixHelper.getTableIdealState(_helixManager,
_tableNameWithType);
+ Preconditions.checkState(idealState != null, "Failed to find ideal state
for table: %s", _tableNameWithType);
+ return idealState.getRecord().getMapFields();
+ }
+
private void preloadSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig,
ZkHelixPropertyStore<ZNRecord> propertyStore) {
LOGGER.info("Preload segment: {} from table: {}", segmentName,
_tableNameWithType);
@@ -237,7 +246,7 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
}
@VisibleForTesting
- protected void preloadSegmentWithSnapshot(String segmentName,
IndexLoadingConfig indexLoadingConfig,
+ void preloadSegmentWithSnapshot(String segmentName, IndexLoadingConfig
indexLoadingConfig,
SegmentZKMetadata zkMetadata) {
// This method might modify the file on disk. Use segment lock to prevent
race condition
Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType,
segmentName);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java
index ee1d6ffbd2..526e0920c3 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java
@@ -29,20 +29,15 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.FileUtils;
-import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.spi.V1Constants;
-import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.Schema;
@@ -53,7 +48,6 @@ import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -117,21 +111,35 @@ public class ConcurrentMapTableUpsertMetadataManagerTest {
@Test
public void testPreloadOnlineSegments()
throws Exception {
+ String instanceId = "server01";
+ Map<String, Map<String, String>> segmentAssignment = new HashMap<>();
Set<String> preloadedSegments = new HashSet<>();
AtomicBoolean wasPreloading = new AtomicBoolean(false);
ConcurrentMapTableUpsertMetadataManager mgr = new
ConcurrentMapTableUpsertMetadataManager() {
+
+ @Override
+ String getInstanceId() {
+ return instanceId;
+ }
+
@Override
- protected IndexLoadingConfig createIndexLoadingConfig() {
+ IndexLoadingConfig createIndexLoadingConfig() {
return mock(IndexLoadingConfig.class);
}
@Override
- protected void preloadSegmentWithSnapshot(String segmentName,
IndexLoadingConfig indexLoadingConfig,
+ Map<String, Map<String, String>> getSegmentAssignment() {
+ return segmentAssignment;
+ }
+
+ @Override
+ void preloadSegmentWithSnapshot(String segmentName, IndexLoadingConfig
indexLoadingConfig,
SegmentZKMetadata zkMetadata) {
wasPreloading.set(isPreloading());
preloadedSegments.add(segmentName);
}
};
+
// Setup mocks for TableConfig and Schema.
String tableNameWithType = "myTable_REALTIME";
TableConfig tableConfig = mock(TableConfig.class);
@@ -146,38 +154,16 @@ public class ConcurrentMapTableUpsertMetadataManagerTest {
// Setup mocks for HelixManager.
HelixManager helixManager = mock(HelixManager.class);
- IdealState idealState = mock(IdealState.class);
- HelixDataAccessor dataAccessor = mock(HelixDataAccessor.class);
- PropertyKey.Builder keyBuilder = mock(PropertyKey.Builder.class);
ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
- PropertyKey propKey = mock(PropertyKey.class);
- when(helixManager.getHelixDataAccessor()).thenReturn(dataAccessor);
when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
- when(dataAccessor.keyBuilder()).thenReturn(keyBuilder);
- when(keyBuilder.idealStates(anyString())).thenReturn(propKey);
- when(dataAccessor.getProperty(propKey)).thenReturn(idealState);
- // Setup mocks to return the instanceId.
- String instanceId = "server01";
- TableDataManager tableDataManager = mock(TableDataManager.class);
- TableDataManagerConfig tdmc = mock(TableDataManagerConfig.class);
- InstanceDataManagerConfig idmc = mock(InstanceDataManagerConfig.class);
- when(tableDataManager.getTableDataManagerConfig()).thenReturn(tdmc);
- when(tdmc.getInstanceDataManagerConfig()).thenReturn(idmc);
- when(idmc.getInstanceId()).thenReturn(instanceId);
-
- // Only ONLINE segments are preloaded.
- Map<String, Map<String, String>> segStates = new HashMap<>();
- segStates.put("consuming_seg01", ImmutableMap.of(instanceId, "CONSUMING"));
- segStates.put("consuming_seg02", ImmutableMap.of(instanceId, "CONSUMING"));
- segStates.put("online_seg01", ImmutableMap.of(instanceId, "ONLINE"));
- segStates.put("online_seg02", ImmutableMap.of(instanceId, "ONLINE"));
- segStates.put("offline_seg01", ImmutableMap.of(instanceId, "OFFLINE"));
- segStates.put("offline_seg02", ImmutableMap.of(instanceId, "OFFLINE"));
- when(idealState.getPartitionSet()).thenReturn(segStates.keySet());
- for (String segName : segStates.keySet()) {
- when(idealState.getInstanceStateMap(segName)).thenReturn(segStates.get(segName));
- }
+ // Setup segment assignment. Only ONLINE segments are preloaded.
+ segmentAssignment.put("consuming_seg01", ImmutableMap.of(instanceId,
"CONSUMING"));
+ segmentAssignment.put("consuming_seg02", ImmutableMap.of(instanceId,
"CONSUMING"));
+ segmentAssignment.put("online_seg01", ImmutableMap.of(instanceId,
"ONLINE"));
+ segmentAssignment.put("online_seg02", ImmutableMap.of(instanceId,
"ONLINE"));
+ segmentAssignment.put("offline_seg01", ImmutableMap.of(instanceId,
"OFFLINE"));
+ segmentAssignment.put("offline_seg02", ImmutableMap.of(instanceId,
"OFFLINE"));
// Setup mocks to get file path to validDocIds snapshot.
SegmentZKMetadata realtimeSegmentZKMetadata = new
SegmentZKMetadata("online_seg01");
@@ -192,6 +178,7 @@ public class ConcurrentMapTableUpsertMetadataManagerTest {
anyInt())).thenReturn(realtimeSegmentZKMetadata.toZNRecord());
// No snapshot file for online_seg01, so it's skipped.
+ TableDataManager tableDataManager = mock(TableDataManager.class);
File seg01IdxDir = new File(TEMP_DIR, "online_seg01");
FileUtils.forceMkdir(seg01IdxDir);
when(tableDataManager.getSegmentDataDir("online_seg01", null,
tableConfig)).thenReturn(seg01IdxDir);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]