This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch fix-npe-for-minimizeDataMovement in repository https://gitbox.apache.org/repos/asf/pinot.git
commit d9832a4ac443f7cd4fed1f3fa4db55f49507a86a Author: jlli_LinkedIn <[email protected]> AuthorDate: Fri Nov 3 15:48:31 2023 -0700 Fix the NPE in minimizeDataMovement instance assignment strategy --- .../InstanceReplicaGroupPartitionSelector.java | 11 ++- .../InstanceReplicaGroupPartitionSelectorTest.java | 105 +++++++++++++++++++++ 2 files changed, 111 insertions(+), 5 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java index b6c62ac12e..de1e681d17 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java @@ -72,11 +72,17 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive"); Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>(); Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>(); + Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>(); for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) { // Pick one pool for each replica-group based on the table name hash int pool = pools.get((tableNameHash + replicaId) % numPools); poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId); replicaGroupIdToPoolMap.put(replicaId, pool); + + Set<String> candidateInstances = + poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>()); + List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool); + instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName())); } LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap, _tableNameWithType); @@ -132,7 +138,6 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups); - Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>(); Map<Integer, Set<String>> replicaGroupIdToExistingInstancesMap = new TreeMap<>(); // Step 1: find out the replica groups and their existing instances, // so that these instances can be filtered out and won't be chosen for the other replica group. @@ -142,10 +147,6 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele // Skip the replica group if it's no longer needed. continue; } - Set<String> candidateInstances = - poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>()); - List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool); - instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName())); for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) { List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java new file mode 100644 index 0000000000..4f982c45e6 --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.assignment.instance; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.text.StringSubstitutor; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.common.assignment.InstancePartitions; +import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +public class InstanceReplicaGroupPartitionSelectorTest { + + private static final String instanceConfigTemplate = + "{\n" + " \"id\": \"Server_pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + + " \"simpleFields\": {\n" + " \"HELIX_ENABLED\": \"true\",\n" + + " \"HELIX_ENABLED_TIMESTAMP\": \"1688959934305\",\n" + + " \"HELIX_HOST\": \"pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local\",\n" + + " \"HELIX_PORT\": \"8098\",\n" + " \"adminPort\": \"8097\",\n" + " \"grpcPort\": \"8090\",\n" + + " \"queryMailboxPort\": \"46347\",\n" + " \"queryServerPort\": \"45031\",\n" + + " \"shutdownInProgress\": \"false\"\n" + " },\n" + " \"mapFields\": {\n" + + " \"SYSTEM_RESOURCE_INFO\": {\n" + " \"numCores\": \"16\",\n" + + " \"totalMemoryMB\": \"126976\",\n" + " \"maxHeapSizeMB\": \"65536\"\n" + " },\n" + + " \"pool\": {\n" + " \"DefaultTenant_OFFLINE\": \"${pool}\",\n" + + " \"${poolName}\": \"${pool}\",\n" + " \"AllReplicationGroups\": \"1\"\n" + " }\n" + " },\n" + + " \"listFields\": {\n" + " \"TAG_LIST\": [\n" + " \"DefaultTenant_OFFLINE\",\n" + + " \"DefaultTenant_REALTIME\",\n" + " \"${poolName}\",\n" + " \"AllReplicationGroups\"\n" + + " ]\n" + " }\n" + "}"; + + @Test + public void testSelectInstances() throws JsonProcessingException { + ObjectMapper objectMapper = new ObjectMapper(); + String existingPartitionsJson = + " {\n" + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n" + + " \"partitionToInstancesMap\": {\n" + " \"0_0\": [\n" + + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" + + " ]\n" + " }\n" + " }\n"; + InstancePartitions existing = objectMapper.readValue(existingPartitionsJson, InstancePartitions.class); + InstanceReplicaGroupPartitionConfig config = + new InstanceReplicaGroupPartitionConfig(true, 0, 2, 2, 1, 2, true, null); + + InstanceReplicaGroupPartitionSelector selector = + new InstanceReplicaGroupPartitionSelector(config, "tableNameBlah", existing); + + String[] serverNames = {"rg0-0", "rg0-1", "rg1-0", "rg1-1"}; + String[] poolNumbers = {"0", "0", "1", "1"}; + String[] poolNames = {"FirstHalfReplicationGroups", "FirstHalfReplicationGroups", "SecondHalfReplicationGroups", + "SecondHalfReplicationGroups"}; + Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = new HashMap<>(); + + for (int i = 0; i < serverNames.length; i++) { + Map<String, String> valuesMap = new HashMap<>(); + valuesMap.put("serverName", serverNames[i]); + valuesMap.put("pool", poolNumbers[i]); + valuesMap.put("poolName", poolNames[i]); + + StringSubstitutor substitutor = new StringSubstitutor(valuesMap); + String resolvedString = substitutor.replace(instanceConfigTemplate); + + ZNRecord znRecord = objectMapper.readValue(resolvedString, ZNRecord.class); + int poolNumber = Integer.parseInt(poolNumbers[i]); + poolToInstanceConfigsMap.computeIfAbsent(poolNumber, k -> new ArrayList<>()).add(new InstanceConfig(znRecord)); + } + InstancePartitions assignedPartitions = new InstancePartitions("0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE"); + selector.selectInstances(poolToInstanceConfigsMap, assignedPartitions); + + String expectedInstancePartitions = + " {\n" + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n" + + " \"partitionToInstancesMap\": {\n" + " \"0_0\": [\n" + + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" + + " ],\n" + " \"0_1\": [\n" + + " \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n" + + " \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n" + + " ]\n" + " }\n" + " }\n"; + InstancePartitions expectedPartitions = + objectMapper.readValue(expectedInstancePartitions, InstancePartitions.class); + assert assignedPartitions.equals(expectedPartitions); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
