This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch winedepot-0.10 in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 5fcb4124379d4cfe7f3dc817270abe0abff0bbbb Author: Xiaotian (Jackie) Jiang <[email protected]> AuthorDate: Thu Aug 15 17:29:41 2019 -0700 [Instance Assignment] Plug in instance assignment (#4516) - Add instance assignment APIs including: - GET/PUT/DELETE instance partitions - Assign instances for a table - Replace instance in instance partitions - Assign instances if necessary while creating/updating table config - Backward-compatible, no change to the cluster if instance assignment is not configured --- .../instance/InstanceAssignmentConfigUtils.java | 84 ++--- .../pinot/common/metadata/ZKMetadataProvider.java | 8 - .../PinotInstanceAssignmentRestletResource.java | 301 ++++++++++++++++++ .../helix/ControllerRequestURLBuilder.java | 38 ++- .../helix/core/PinotHelixResourceManager.java | 183 +++++------ .../helix/core/assignment/InstancePartitions.java | 8 +- .../core/assignment/InstancePartitionsUtils.java | 56 +++- .../instance/InstanceAssignmentDriver.java | 6 +- .../instance/InstanceReplicaPartitionSelector.java | 7 +- ...PinotInstanceAssignmentRestletResourceTest.java | 339 +++++++++++++++++++++ .../instance/InstanceAssignmentTest.java | 93 +----- .../ReplicaGroupRebalanceStrategyTest.java | 4 +- .../sharding/SegmentAssignmentStrategyTest.java | 72 ----- .../apache/pinot/core/util/ReplicationUtils.java | 35 --- 14 files changed, 902 insertions(+), 332 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceAssignmentConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceAssignmentConfigUtils.java index f094527..446f107 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceAssignmentConfigUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceAssignmentConfigUtils.java @@ -33,8 +33,40 @@ public class InstanceAssignmentConfigUtils { private InstanceAssignmentConfigUtils() { } + /** + * Returns whether the instance assignment is allowed for the given table config. + */ + public static boolean allowInstanceAssignment(TableConfig tableConfig, + InstancePartitionsType instancePartitionsType) { + TableType tableType = tableConfig.getTableType(); + Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap = + tableConfig.getInstanceAssignmentConfigMap(); + switch (instancePartitionsType) { + // Allow OFFLINE instance assignment if the OFFLINE table has it configured or (for backward-compatibility) is + // using replica-group segment assignment + case OFFLINE: + return tableType == TableType.OFFLINE && ((instanceAssignmentConfigMap != null && instanceAssignmentConfigMap + .containsKey(InstancePartitionsType.OFFLINE)) + || AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY + .equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentAssignmentStrategy())); + // Allow CONSUMING/COMPLETED instance assignment if the REALTIME table has it configured + case CONSUMING: + case COMPLETED: + return tableType == TableType.REALTIME && (instanceAssignmentConfigMap != null && instanceAssignmentConfigMap + .containsKey(instancePartitionsType)); + default: + throw new IllegalStateException(); + } + } + + /** + * Extracts or generates default instance assignment config from the given table config. + */ public static InstanceAssignmentConfig getInstanceAssignmentConfig(TableConfig tableConfig, InstancePartitionsType instancePartitionsType) { + Preconditions.checkState(allowInstanceAssignment(tableConfig, instancePartitionsType), + "Instance assignment is not allowed for the given table config"); + // Use the instance assignment config from the table config if it exists Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap(); @@ -42,49 +74,33 @@ public class InstanceAssignmentConfigUtils { if (instanceAssignmentConfigMap.containsKey(instancePartitionsType)) { return instanceAssignmentConfigMap.get(instancePartitionsType); } - // Use the CONSUMING config for COMPLETED segments if COMPLETED config does not exist - if (instancePartitionsType == InstancePartitionsType.COMPLETED && instanceAssignmentConfigMap - .containsKey(InstancePartitionsType.CONSUMING)) { - return instanceAssignmentConfigMap.get(InstancePartitionsType.CONSUMING); - } } // Generate default instance assignment config if it does not exist + // Only allow default config for OFFLINE table with replica-group segment assignment for backward-compatibility InstanceAssignmentConfig instanceAssignmentConfig = new InstanceAssignmentConfig(); InstanceTagPoolConfig tagPoolConfig = new InstanceTagPoolConfig(); - String serverTag = - TagNameUtils.getServerTagFromTableConfigAndInstancePartitionsType(tableConfig, instancePartitionsType); - tagPoolConfig.setTag(serverTag); + tagPoolConfig.setTag(TagNameUtils.getOfflineTagForTenant(tableConfig.getTenantConfig().getServer())); instanceAssignmentConfig.setTagPoolConfig(tagPoolConfig); InstanceReplicaPartitionConfig replicaPartitionConfig = new InstanceReplicaPartitionConfig(); - SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig(); - if (AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY - .equalsIgnoreCase(validationConfig.getSegmentAssignmentStrategy())) { - replicaPartitionConfig.setReplicaGroupBased(true); - if (tableConfig.getTableType() == TableType.OFFLINE) { - replicaPartitionConfig.setNumReplicas(validationConfig.getReplicationNumber()); - ReplicaGroupStrategyConfig replicaGroupStrategyConfig = - tableConfig.getValidationConfig().getReplicaGroupStrategyConfig(); - Preconditions - .checkState(replicaGroupStrategyConfig != null, "Failed to find the replica-group strategy config"); - String partitionColumn = replicaGroupStrategyConfig.getPartitionColumn(); - if (partitionColumn != null) { - int numPartitions = - tableConfig.getIndexingConfig().getSegmentPartitionConfig().getNumPartitions(partitionColumn); - Preconditions.checkState(numPartitions > 0, "Number of partitions for column: %s is not properly configured", - partitionColumn); - replicaPartitionConfig.setNumPartitions(numPartitions); - replicaPartitionConfig.setNumInstancesPerPartition(replicaGroupStrategyConfig.getNumInstancesPerPartition()); - } else { - // If partition column is not configured, use replicaGroupStrategyConfig.getNumInstancesPerPartition() as - // number of instances per replica for backward-compatibility - replicaPartitionConfig.setNumInstancesPerReplica(replicaGroupStrategyConfig.getNumInstancesPerPartition()); - } - } else { - replicaPartitionConfig.setNumReplicas(validationConfig.getReplicasPerPartitionNumber()); - } + replicaPartitionConfig.setReplicaGroupBased(true); + SegmentsValidationAndRetentionConfig segmentConfig = tableConfig.getValidationConfig(); + replicaPartitionConfig.setNumReplicas(segmentConfig.getReplicationNumber()); + ReplicaGroupStrategyConfig replicaGroupStrategyConfig = segmentConfig.getReplicaGroupStrategyConfig(); + Preconditions.checkState(replicaGroupStrategyConfig != null, "Failed to find the replica-group strategy config"); + String partitionColumn = replicaGroupStrategyConfig.getPartitionColumn(); + if (partitionColumn != null) { + int numPartitions = tableConfig.getIndexingConfig().getSegmentPartitionConfig().getNumPartitions(partitionColumn); + Preconditions.checkState(numPartitions > 0, "Number of partitions for column: %s is not properly configured", + partitionColumn); + replicaPartitionConfig.setNumPartitions(numPartitions); + replicaPartitionConfig.setNumInstancesPerPartition(replicaGroupStrategyConfig.getNumInstancesPerPartition()); + } else { + // If partition column is not configured, use replicaGroupStrategyConfig.getNumInstancesPerPartition() as + // number of instances per replica for backward-compatibility + replicaPartitionConfig.setNumInstancesPerReplica(replicaGroupStrategyConfig.getNumInstancesPerPartition()); } instanceAssignmentConfig.setReplicaPartitionConfig(replicaPartitionConfig); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java index 2a3ad60..f8baf67 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java @@ -136,14 +136,6 @@ public class ZKMetadataProvider { } } - public static void removeInstancePartitionAssignmentFromPropertyStore(ZkHelixPropertyStore<ZNRecord> propertyStore, - String tableNameWithType) { - String propertyStorePath = constructPropertyStorePathForInstancePartitions(tableNameWithType); - if (propertyStore.exists(propertyStorePath, AccessOption.PERSISTENT)) { - propertyStore.remove(propertyStorePath, AccessOption.PERSISTENT); - } - } - public static boolean setOfflineSegmentZKMetadata(ZkHelixPropertyStore<ZNRecord> propertyStore, String offlineTableName, OfflineSegmentZKMetadata offlineSegmentZKMetadata, int expectedVersion) { // NOTE: Helix will throw ZkBadVersionException if version does not match diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java new file mode 100644 index 0000000..9f8d48f --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java @@ -0,0 +1,301 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api.resources; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import javax.annotation.Nullable; +import javax.inject.Inject; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.common.config.TableNameBuilder; +import org.apache.pinot.common.config.instance.InstanceAssignmentConfigUtils; +import org.apache.pinot.common.utils.CommonConstants.Helix.TableType; +import org.apache.pinot.common.utils.InstancePartitionsType; +import org.apache.pinot.common.utils.JsonUtils; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.assignment.InstancePartitions; +import org.apache.pinot.controller.helix.core.assignment.InstancePartitionsUtils; +import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +@Api(tags = Constants.TABLE_TAG) +@Path("/") +public class PinotInstanceAssignmentRestletResource { + private static final Logger LOGGER = LoggerFactory.getLogger(PinotInstanceAssignmentRestletResource.class); + + @Inject + PinotHelixResourceManager _resourceManager; + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/tables/{tableName}/instancePartitions") + @ApiOperation(value = "Get the instance partitions") + public Map<InstancePartitionsType, InstancePartitions> getInstancePartitions( + @ApiParam(value = "Name of the table") @PathParam("tableName") String tableName, + @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type") @Nullable InstancePartitionsType instancePartitionsType) { + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = new TreeMap<>(); + + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName); + if (tableType != TableType.REALTIME) { + if (instancePartitionsType == InstancePartitionsType.OFFLINE || instancePartitionsType == null) { + InstancePartitions offlineInstancePartitions = InstancePartitionsUtils + .fetchInstancePartitions(_resourceManager.getPropertyStore(), + InstancePartitionsType.OFFLINE.getInstancePartitionsName(rawTableName)); + if (offlineInstancePartitions != null) { + instancePartitionsMap.put(InstancePartitionsType.OFFLINE, offlineInstancePartitions); + } + } + } + if (tableType != TableType.OFFLINE) { + if (instancePartitionsType == InstancePartitionsType.CONSUMING || instancePartitionsType == null) { + InstancePartitions consumingInstancePartitions = InstancePartitionsUtils + .fetchInstancePartitions(_resourceManager.getPropertyStore(), + InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName)); + if (consumingInstancePartitions != null) { + instancePartitionsMap.put(InstancePartitionsType.CONSUMING, consumingInstancePartitions); + } + } + if (instancePartitionsType == InstancePartitionsType.COMPLETED || instancePartitionsType == null) { + InstancePartitions completedInstancePartitions = InstancePartitionsUtils + .fetchInstancePartitions(_resourceManager.getPropertyStore(), + InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName)); + if (completedInstancePartitions != null) { + instancePartitionsMap.put(InstancePartitionsType.COMPLETED, completedInstancePartitions); + } + } + } + + if (instancePartitionsMap.isEmpty()) { + throw new ControllerApplicationException(LOGGER, "Failed to find the instance partitions", + Response.Status.NOT_FOUND); + } else { + return instancePartitionsMap; + } + } + + @POST + @Produces(MediaType.APPLICATION_JSON) + @Path("/tables/{tableName}/assignInstances") + @ApiOperation(value = "Assign server instances to a table") + public Map<InstancePartitionsType, InstancePartitions> assignInstances( + @ApiParam(value = "Name of the table") @PathParam("tableName") String tableName, + @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type") @Nullable InstancePartitionsType instancePartitionsType, + @ApiParam(value = "Whether to do dry-run") @DefaultValue("false") @QueryParam("dryRun") boolean dryRun) { + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = new TreeMap<>(); + List<InstanceConfig> instanceConfigs = _resourceManager.getAllHelixInstanceConfigs(); + + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName); + if (tableType != TableType.REALTIME && (instancePartitionsType == InstancePartitionsType.OFFLINE + || instancePartitionsType == null)) { + TableConfig offlineTableConfig = _resourceManager.getOfflineTableConfig(tableName); + if (offlineTableConfig != null) { + try { + if (InstanceAssignmentConfigUtils + .allowInstanceAssignment(offlineTableConfig, InstancePartitionsType.OFFLINE)) { + instancePartitionsMap.put(InstancePartitionsType.OFFLINE, new InstanceAssignmentDriver(offlineTableConfig) + .assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs)); + } + } catch (IllegalStateException e) { + throw new ControllerApplicationException(LOGGER, "Caught IllegalStateException", Response.Status.BAD_REQUEST, + e); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, "Caught exception while calculating the instance partitions", + Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + } + if (tableType != TableType.OFFLINE && instancePartitionsType != InstancePartitionsType.OFFLINE) { + TableConfig realtimeTableConfig = _resourceManager.getRealtimeTableConfig(tableName); + if (realtimeTableConfig != null) { + try { + InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(realtimeTableConfig); + if (instancePartitionsType == InstancePartitionsType.CONSUMING || instancePartitionsType == null) { + if (InstanceAssignmentConfigUtils + .allowInstanceAssignment(realtimeTableConfig, InstancePartitionsType.CONSUMING)) { + instancePartitionsMap.put(InstancePartitionsType.CONSUMING, + instanceAssignmentDriver.assignInstances(InstancePartitionsType.CONSUMING, instanceConfigs)); + } + } + if (instancePartitionsType == InstancePartitionsType.COMPLETED || instancePartitionsType == null) { + if (InstanceAssignmentConfigUtils + .allowInstanceAssignment(realtimeTableConfig, InstancePartitionsType.COMPLETED)) { + instancePartitionsMap.put(InstancePartitionsType.COMPLETED, + instanceAssignmentDriver.assignInstances(InstancePartitionsType.COMPLETED, instanceConfigs)); + } + } + } catch (IllegalStateException e) { + throw new ControllerApplicationException(LOGGER, "Caught IllegalStateException", Response.Status.BAD_REQUEST, + e); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, "Caught exception while calculating the instance partitions", + Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + } + + if (instancePartitionsMap.isEmpty()) { + throw new ControllerApplicationException(LOGGER, "Failed to find the instance assignment config", + Response.Status.NOT_FOUND); + } + + if (!dryRun) { + for (InstancePartitions instancePartitions : instancePartitionsMap.values()) { + persistInstancePartitionsHelper(instancePartitions); + } + } + + return instancePartitionsMap; + } + + private void persistInstancePartitionsHelper(InstancePartitions instancePartitions) { + try { + LOGGER.info("Persisting instance partitions: {} to the property store", instancePartitions.getName()); + InstancePartitionsUtils.persistInstancePartitions(_resourceManager.getPropertyStore(), instancePartitions); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, "Caught Exception while persisting the instance partitions", + Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + @PUT + @Produces(MediaType.APPLICATION_JSON) + @Path("/tables/{tableName}/instancePartitions") + @ApiOperation(value = "Create/update the instance partitions") + public Map<InstancePartitionsType, InstancePartitions> setInstancePartitions( + @ApiParam(value = "Name of the table") @PathParam("tableName") String tableName, String instancePartitionsStr) { + InstancePartitions instancePartitions; + try { + instancePartitions = JsonUtils.stringToObject(instancePartitionsStr, InstancePartitions.class); + } catch (IOException e) { + throw new ControllerApplicationException(LOGGER, "Failed to deserialize the instance partitions", + Response.Status.BAD_REQUEST); + } + + String instancePartitionsName = instancePartitions.getName(); + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName); + if (tableType != TableType.REALTIME) { + if (InstancePartitionsType.OFFLINE.getInstancePartitionsName(rawTableName).equals(instancePartitionsName)) { + persistInstancePartitionsHelper(instancePartitions); + return Collections.singletonMap(InstancePartitionsType.OFFLINE, instancePartitions); + } + } + if (tableType != TableType.OFFLINE) { + if (InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName).equals(instancePartitionsName)) { + persistInstancePartitionsHelper(instancePartitions); + return Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions); + } + if (InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName).equals(instancePartitionsName)) { + persistInstancePartitionsHelper(instancePartitions); + return Collections.singletonMap(InstancePartitionsType.COMPLETED, instancePartitions); + } + } + + throw new ControllerApplicationException(LOGGER, "Instance partitions cannot be applied to the table", + Response.Status.BAD_REQUEST); + } + + @DELETE + @Produces(MediaType.APPLICATION_JSON) + @Path("/tables/{tableName}/instancePartitions") + @ApiOperation(value = "Remove the instance partitions") + public SuccessResponse removeInstancePartitions( + @ApiParam(value = "Name of the table") @PathParam("tableName") String tableName, + @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type") @Nullable InstancePartitionsType instancePartitionsType) { + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName); + if (tableType != TableType.REALTIME && (instancePartitionsType == InstancePartitionsType.OFFLINE + || instancePartitionsType == null)) { + removeInstancePartitionsHelper(InstancePartitionsType.OFFLINE.getInstancePartitionsName(rawTableName)); + } + if (tableType != TableType.OFFLINE) { + if (instancePartitionsType == InstancePartitionsType.CONSUMING || instancePartitionsType == null) { + removeInstancePartitionsHelper(InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName)); + } + if (instancePartitionsType == InstancePartitionsType.COMPLETED || instancePartitionsType == null) { + removeInstancePartitionsHelper(InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName)); + } + } + return new SuccessResponse("Instance partitions removed"); + } + + private void removeInstancePartitionsHelper(String instancePartitionsName) { + try { + LOGGER.info("Removing instance partitions: {} from the property store", instancePartitionsName); + InstancePartitionsUtils.removeInstancePartitions(_resourceManager.getPropertyStore(), instancePartitionsName); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, "Caught Exception while removing the instance partitions", + Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + @POST + @Produces(MediaType.APPLICATION_JSON) + @Path("/tables/{tableName}/replaceInstance") + @ApiOperation(value = "Replace an instance in the instance partitions") + public Map<InstancePartitionsType, InstancePartitions> replaceInstance( + @ApiParam(value = "Name of the table") @PathParam("tableName") String tableName, + @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type") @Nullable InstancePartitionsType instancePartitionsType, + @ApiParam(value = "Old instance to be replaced", required = true) @QueryParam("oldInstanceId") String oldInstanceId, + @ApiParam(value = "New instance to replace with", required = true) @QueryParam("newInstanceId") String newInstanceId) { + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = + getInstancePartitions(tableName, instancePartitionsType); + Iterator<InstancePartitions> iterator = instancePartitionsMap.values().iterator(); + while (iterator.hasNext()) { + InstancePartitions instancePartitions = iterator.next(); + boolean oldInstanceFound = false; + Map<String, List<String>> partitionToInstancesMap = instancePartitions.getPartitionToInstancesMap(); + for (List<String> instances : partitionToInstancesMap.values()) { + oldInstanceFound |= Collections.replaceAll(instances, oldInstanceId, newInstanceId); + } + if (oldInstanceFound) { + persistInstancePartitionsHelper(instancePartitions); + } else { + iterator.remove(); + } + } + if (instancePartitionsMap.isEmpty()) { + throw new ControllerApplicationException(LOGGER, "Failed to find the old instance", Response.Status.NOT_FOUND); + } else { + return instancePartitionsMap; + } + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java index 6dc7b40..d638392 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java @@ -18,8 +18,9 @@ */ package org.apache.pinot.controller.helix; -import org.apache.avro.reflect.Nullable; +import javax.annotation.Nullable; import org.apache.commons.lang.StringUtils; +import org.apache.pinot.common.utils.InstancePartitionsType; import org.apache.pinot.common.utils.StringUtil; import org.apache.pinot.common.utils.URIUtils; @@ -202,4 +203,39 @@ public class ControllerRequestURLBuilder { public String forSegmentListAPI(String tableName) { return StringUtil.join("/", _baseUrl, "segments", tableName); } + + public String forInstancePartitions(String tableName, @Nullable InstancePartitionsType instancePartitionsType) { + String url = StringUtil.join("/", _baseUrl, "tables", tableName, "instancePartitions"); + if (instancePartitionsType != null) { + url += "?type=" + instancePartitionsType; + } + return url; + } + + public String forInstanceAssign(String tableName, @Nullable InstancePartitionsType instancePartitionsType, + boolean dryRun) { + String url = StringUtil.join("/", _baseUrl, "tables", tableName, "assignInstances"); + if (instancePartitionsType != null) { + url += "?type=" + instancePartitionsType; + if (dryRun) { + url += "&dryRun=true"; + } + } else { + if (dryRun) { + url += "?dryRun=true"; + } + } + return url; + } + + public String forInstanceReplace(String tableName, @Nullable InstancePartitionsType instancePartitionsType, + String oldInstanceId, String newInstanceId) { + String url = + StringUtil.join("/", _baseUrl, "tables", tableName, "replaceInstance") + "?oldInstanceId=" + oldInstanceId + + "&newInstanceId=" + newInstanceId; + if (instancePartitionsType != null) { + url += "&type=" + instancePartitionsType; + } + return url; + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index d4579de..c7fb073 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -72,6 +72,7 @@ import org.apache.pinot.common.config.TableNameBuilder; import org.apache.pinot.common.config.TagNameUtils; import org.apache.pinot.common.config.Tenant; import org.apache.pinot.common.config.TenantConfig; +import org.apache.pinot.common.config.instance.InstanceAssignmentConfigUtils; import org.apache.pinot.common.data.Schema; import org.apache.pinot.common.exception.InvalidConfigException; import org.apache.pinot.common.exception.TableNotFoundException; @@ -83,14 +84,14 @@ import org.apache.pinot.common.metadata.instance.InstanceZKMetadata; import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata; import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; -import org.apache.pinot.common.partition.ReplicaGroupPartitionAssignment; -import org.apache.pinot.common.partition.ReplicaGroupPartitionAssignmentGenerator; import org.apache.pinot.common.restlet.resources.RebalanceResult; import org.apache.pinot.common.segment.SegmentMetadata; import org.apache.pinot.common.utils.CommonConstants.Helix; import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.BrokerOnlineOfflineStateModel; import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel; import org.apache.pinot.common.utils.CommonConstants.Helix.TableType; +import org.apache.pinot.common.utils.InstancePartitionsType; +import org.apache.pinot.common.utils.JsonUtils; import org.apache.pinot.common.utils.SchemaUtils; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.common.utils.helix.PinotHelixPropertyStoreZnRecordProvider; @@ -98,16 +99,17 @@ import org.apache.pinot.common.utils.retry.RetryPolicies; import org.apache.pinot.common.utils.retry.RetryPolicy; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.api.resources.StateType; +import org.apache.pinot.controller.helix.core.assignment.InstancePartitions; +import org.apache.pinot.controller.helix.core.assignment.InstancePartitionsUtils; +import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; import org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy; import org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategyFactory; import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategy; -import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategyEnum; import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategyFactory; import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils; import org.apache.pinot.controller.helix.starter.HelixConfig; import org.apache.pinot.core.realtime.stream.StreamConfig; -import org.apache.pinot.core.util.ReplicationUtils; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1024,8 +1026,9 @@ public class PinotHelixResourceManager { validateTableTenantConfig(tableConfig); String tableNameWithType = tableConfig.getTableName(); - TableType tableType = tableConfig.getTableType(); SegmentsValidationAndRetentionConfig segmentsConfig = tableConfig.getValidationConfig(); + + TableType tableType = tableConfig.getTableType(); switch (tableType) { case OFFLINE: // existing tooling relies on this check not existing for realtime table (to migrate to LLC) @@ -1040,14 +1043,16 @@ public class PinotHelixResourceManager { _enableBatchMessageMode); LOGGER.info("adding table via the admin"); _helixAdmin.addResource(_helixClusterName, tableNameWithType, offlineIdealState); - LOGGER.info("successfully added the table : " + tableNameWithType + " to the cluster"); // lets add table configs ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableNameWithType, tableConfig.toZNRecord()); - // Update replica group partition assignment to the property store if applicable - updateReplicaGroupPartitionAssignment(tableConfig); + // Assign instances + assignInstances(tableConfig, true); + + LOGGER.info("Successfully added table: {}", tableNameWithType); break; + case REALTIME: IndexingConfig indexingConfig = tableConfig.getIndexingConfig(); verifyIndexingConfig(tableNameWithType, indexingConfig); @@ -1067,11 +1072,6 @@ public class PinotHelixResourceManager { // lets add table configs ZKMetadataProvider.setRealtimeTableConfig(_propertyStore, tableNameWithType, tableConfig.toZNRecord()); - // Update replica group partition assignment to the property store if applicable - if (ReplicationUtils.setupRealtimeReplicaGroups(tableConfig)) { - updateReplicaGroupPartitionAssignment(tableConfig); - } - /* * PinotRealtimeSegmentManager sets up watches on table and segment path. When a table gets created, * it expects the INSTANCE path in propertystore to be set up so that it can get the group ID and @@ -1086,10 +1086,14 @@ public class PinotHelixResourceManager { */ ensureRealtimeClusterIsSetUp(tableConfig, tableNameWithType, indexingConfig); + // Assign instances + assignInstances(tableConfig, true); + LOGGER.info("Successfully added or updated the table {} ", tableNameWithType); break; + default: - throw new InvalidTableConfigException("UnSupported table type: " + tableType); + throw new InvalidTableConfigException("Unsupported table type: " + tableType); } String brokerTenantName = TagNameUtils.getBrokerTagForTenant(tableConfig.getTenantConfig().getBroker()); @@ -1132,43 +1136,6 @@ public class PinotHelixResourceManager { } } - /** - * Update replica group partition assignment in the property store - * - * @param tableConfig a table config - */ - private void updateReplicaGroupPartitionAssignment(TableConfig tableConfig) { - String tableNameWithType = tableConfig.getTableName(); - String assignmentStrategy = tableConfig.getValidationConfig().getSegmentAssignmentStrategy(); - // We create replica group partition assignment and write to property store if new table config - // has the replica group config. - if (assignmentStrategy != null && SegmentAssignmentStrategyEnum.valueOf(assignmentStrategy) - == SegmentAssignmentStrategyEnum.ReplicaGroupSegmentAssignmentStrategy) { - ReplicaGroupPartitionAssignmentGenerator partitionAssignmentGenerator = - new ReplicaGroupPartitionAssignmentGenerator(_propertyStore); - - // Create the new replica group partition assignment if there is none in the property store. - // This will create the replica group partition assignment and write to the property store in 2 cases: - // 1. when we create the table with replica group segment assignment - // 2. when we update the table config with replica group segment assignment from another assignment strategy - if (partitionAssignmentGenerator.getReplicaGroupPartitionAssignment(tableNameWithType) == null) { - TableType tableType = tableConfig.getTableType(); - List<String> servers; - if (tableType.equals(TableType.OFFLINE)) { - OfflineTagConfig offlineTagConfig = new OfflineTagConfig(tableConfig); - servers = getInstancesWithTag(offlineTagConfig.getOfflineServerTag()); - } else { - RealtimeTagConfig realtimeTagConfig = new RealtimeTagConfig(tableConfig); - servers = getInstancesWithTag(realtimeTagConfig.getConsumingServerTag()); - } - int numReplicas = ReplicationUtils.getReplication(tableConfig); - ReplicaGroupPartitionAssignment partitionAssignment = partitionAssignmentGenerator - .buildReplicaGroupPartitionAssignment(tableNameWithType, tableConfig, numReplicas, servers); - partitionAssignmentGenerator.writeReplicaGroupPartitionAssignment(partitionAssignment); - } - } - } - public static class InvalidTableConfigException extends RuntimeException { public InvalidTableConfigException(String message) { super(message); @@ -1268,6 +1235,35 @@ public class PinotHelixResourceManager { } } + private void assignInstances(TableConfig tableConfig, boolean override) + throws IOException { + String tableNameWithType = tableConfig.getTableName(); + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + + List<InstancePartitionsType> instancePartitionsTypesToAssign = new ArrayList<>(); + for (InstancePartitionsType instancePartitionsType : InstancePartitionsType.values()) { + if (InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, instancePartitionsType)) { + if (override || InstancePartitionsUtils + .fetchInstancePartitions(_propertyStore, instancePartitionsType.getInstancePartitionsName(rawTableName)) + == null) { + instancePartitionsTypesToAssign.add(instancePartitionsType); + } + } + } + + if (!instancePartitionsTypesToAssign.isEmpty()) { + LOGGER.info("Assigning {} instances to table: {}", instancePartitionsTypesToAssign, tableNameWithType); + InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(tableConfig); + List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs(); + for (InstancePartitionsType instancePartitionsType : instancePartitionsTypesToAssign) { + InstancePartitions instancePartitions = + instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs); + InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions); + LOGGER.info("Persisted instance partitions: {}", JsonUtils.objectToString(instancePartitions)); + } + } + } + /** * Validate the table config and update it * @throws IOException @@ -1284,34 +1280,42 @@ public class PinotHelixResourceManager { public void setExistingTableConfig(TableConfig tableConfig) throws IOException { String tableNameWithType = tableConfig.getTableName(); + SegmentsValidationAndRetentionConfig segmentsConfig = tableConfig.getValidationConfig(); + TableType tableType = tableConfig.getTableType(); - if (tableType == TableType.REALTIME) { - IndexingConfig indexingConfig = tableConfig.getIndexingConfig(); - verifyIndexingConfig(tableNameWithType, indexingConfig); - // Update replica group partition assignment to the property store if applicable - if (ReplicationUtils.setupRealtimeReplicaGroups(tableConfig)) { - updateReplicaGroupPartitionAssignment(tableConfig); - } - ZKMetadataProvider.setRealtimeTableConfig(_propertyStore, tableNameWithType, tableConfig.toZNRecord()); - ensureRealtimeClusterIsSetUp(tableConfig, tableNameWithType, indexingConfig); - } else if (tableType == TableType.OFFLINE) { - // Update replica group partition assignment to the property store if applicable - updateReplicaGroupPartitionAssignment(tableConfig); - - ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableNameWithType, tableConfig.toZNRecord()); - IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType); - final String configReplication = tableConfig.getValidationConfig().getReplication(); - if (configReplication != null && !tableConfig.getValidationConfig().getReplication() - .equals(idealState.getReplicas())) { - HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, new Function<IdealState, IdealState>() { - @Nullable - @Override - public IdealState apply(@Nullable IdealState idealState) { - idealState.setReplicas(configReplication); - return idealState; - } - }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f)); - } + switch (tableType) { + case OFFLINE: + ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableNameWithType, tableConfig.toZNRecord()); + + // Update IdealState replication + IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType); + String replicationConfigured = segmentsConfig.getReplication(); + if (!idealState.getReplicas().equals(replicationConfigured)) { + HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, is -> { + assert is != null; + is.setReplicas(replicationConfigured); + return is; + }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f)); + } + + // Assign instances + assignInstances(tableConfig, false); + + break; + + case REALTIME: + IndexingConfig indexingConfig = tableConfig.getIndexingConfig(); + verifyIndexingConfig(tableNameWithType, indexingConfig); + ZKMetadataProvider.setRealtimeTableConfig(_propertyStore, tableNameWithType, tableConfig.toZNRecord()); + ensureRealtimeClusterIsSetUp(tableConfig, tableNameWithType, indexingConfig); + + // Assign instances + assignInstances(tableConfig, false); + + break; + + default: + throw new InvalidTableConfigException("Unsupported table type: " + tableType); } } @@ -1396,9 +1400,11 @@ public class PinotHelixResourceManager { ZKMetadataProvider.removeResourceConfigFromPropertyStore(_propertyStore, offlineTableName); LOGGER.info("Deleting table {}: Removed table config", offlineTableName); - // Remove replica group partition assignment - ZKMetadataProvider.removeInstancePartitionAssignmentFromPropertyStore(_propertyStore, offlineTableName); - LOGGER.info("Deleting table {}: Removed replica group partition assignment", offlineTableName); + // Remove instance partitions + InstancePartitionsUtils.removeInstancePartitions(_propertyStore, + InstancePartitionsType.OFFLINE.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableName))); + LOGGER.info("Deleting table {}: Removed instance partitions", offlineTableName); + LOGGER.info("Deleting table {}: Finish", offlineTableName); } @@ -1430,11 +1436,15 @@ public class PinotHelixResourceManager { ZKMetadataProvider.removeResourceConfigFromPropertyStore(_propertyStore, realtimeTableName); LOGGER.info("Deleting table {}: Removed table config", realtimeTableName); - // Remove replica group partition assignment - ZKMetadataProvider.removeInstancePartitionAssignmentFromPropertyStore(_propertyStore, realtimeTableName); - LOGGER.info("Deleting table {}: Removed replica group partition assignment", realtimeTableName); + // Remove instance partitions + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + InstancePartitionsUtils.removeInstancePartitions(_propertyStore, + InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName)); + InstancePartitionsUtils.removeInstancePartitions(_propertyStore, + InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName)); + LOGGER.info("Deleting table {}: Removed instance partitions", realtimeTableName); - // Remove groupId/PartitionId mapping for HLC table + // Remove groupId/partitionId mapping for HLC table if (instancesForTable != null) { for (String instance : instancesForTable) { InstanceZKMetadata instanceZKMetadata = ZKMetadataProvider.getInstanceZKMetadata(_propertyStore, instance); @@ -1444,7 +1454,8 @@ public class PinotHelixResourceManager { } } } - LOGGER.info("Deleting table {}: Removed replica group partition assignment", realtimeTableName); + LOGGER.info("Deleting table {}: Removed groupId/partitionId mapping for HLC table", realtimeTableName); + LOGGER.info("Deleting table {}: Finish", realtimeTableName); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java index 6d9a35f..4ed659e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java @@ -18,7 +18,10 @@ */ package org.apache.pinot.controller.helix.core.assignment; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -32,6 +35,7 @@ import org.apache.helix.ZNRecord; * list of server instances, and is persisted under the ZK path: {@code <cluster>/PROPERTYSTORE/INSTANCE_PARTITIONS}. * <p>The segment assignment will be based on the instance partitions of the table. */ +@JsonIgnoreProperties(ignoreUnknown = true) public class InstancePartitions { private static final char PARTITION_REPLICA_SEPARATOR = '_'; @@ -46,7 +50,9 @@ public class InstancePartitions { _partitionToInstancesMap = new TreeMap<>(); } - private InstancePartitions(String name, Map<String, List<String>> partitionToInstancesMap) { + @JsonCreator + private InstancePartitions(@JsonProperty(value = "name", required = true) String name, + @JsonProperty(value = "partitionToInstancesMap", required = true) Map<String, List<String>> partitionToInstancesMap) { _name = name; _partitionToInstancesMap = partitionToInstancesMap; for (String key : partitionToInstancesMap.keySet()) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitionsUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitionsUtils.java index be2c99d..8e74340 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitionsUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitionsUtils.java @@ -20,10 +20,13 @@ package org.apache.pinot.controller.helix.core.assignment; import java.util.Collections; import java.util.List; +import javax.annotation.Nullable; +import org.I0Itec.zkclient.exception.ZkException; import org.apache.helix.AccessOption; import org.apache.helix.HelixManager; import org.apache.helix.ZNRecord; import org.apache.helix.store.HelixPropertyStore; +import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.config.TableNameBuilder; import org.apache.pinot.common.config.TagNameUtils; @@ -45,14 +48,23 @@ public class InstancePartitionsUtils { public static InstancePartitions fetchOrComputeInstancePartitions(HelixManager helixManager, TableConfig tableConfig, InstancePartitionsType instancePartitionsType) { String tableNameWithType = tableConfig.getTableName(); - String instancePartitionsName = - instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)); + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + String instancePartitionsName = instancePartitionsType.getInstancePartitionsName(rawTableName); // Fetch the instance partitions from property store if exists - String path = ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(instancePartitionsName); - ZNRecord znRecord = helixManager.getHelixPropertyStore().get(path, null, AccessOption.PERSISTENT); - if (znRecord != null) { - return InstancePartitions.fromZNRecord(znRecord); + ZkHelixPropertyStore<ZNRecord> propertyStore = helixManager.getHelixPropertyStore(); + InstancePartitions instancePartitions = fetchInstancePartitions(propertyStore, instancePartitionsName); + if (instancePartitions != null) { + return instancePartitions; + } + + // Use the CONSUMING instance partitions for COMPLETED segments if exists + if (instancePartitionsType == InstancePartitionsType.COMPLETED) { + InstancePartitions consumingInstancePartitions = fetchInstancePartitions(propertyStore, + InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName)); + if (consumingInstancePartitions != null) { + return consumingInstancePartitions; + } } // Compute the instance partitions (for backward compatible) @@ -64,19 +76,41 @@ public class InstancePartitionsUtils { instances.sort(null); int numInstances = instances.size(); Collections.rotate(instances, -(Math.abs(tableNameWithType.hashCode()) % numInstances)); - InstancePartitions instancePartitions = new InstancePartitions(instancePartitionsName); + instancePartitions = new InstancePartitions(instancePartitionsName); instancePartitions.setInstances(0, 0, instances); return instancePartitions; } /** + * Fetches the instance partitions from Helix property store. + */ + @Nullable + public static InstancePartitions fetchInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore, + String instancePartitionsName) { + String path = ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(instancePartitionsName); + ZNRecord znRecord = propertyStore.get(path, null, AccessOption.PERSISTENT); + return znRecord != null ? InstancePartitions.fromZNRecord(znRecord) : null; + } + + /** * Persists the instance partitions to Helix property store. - * - * @return true if the instance partitions was successfully persisted, false otherwise */ - public static boolean persistInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore, + public static void persistInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore, InstancePartitions instancePartitions) { String path = ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(instancePartitions.getName()); - return propertyStore.set(path, instancePartitions.toZNRecord(), AccessOption.PERSISTENT); + if (!propertyStore.set(path, instancePartitions.toZNRecord(), AccessOption.PERSISTENT)) { + throw new ZkException("Failed to persist instance partitions: " + instancePartitions.getName()); + } + } + + /** + * Removes the instance partitions from Helix property store. + */ + public static void removeInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore, + String instancePartitionsName) { + String path = ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(instancePartitionsName); + if (!propertyStore.remove(path, AccessOption.PERSISTENT)) { + throw new ZkException("Failed to remove instance partitions: " + instancePartitionsName); + } } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java index a1fbc40..a715834 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.common.config.TableNameBuilder; import org.apache.pinot.common.config.instance.InstanceAssignmentConfig; import org.apache.pinot.common.config.instance.InstanceAssignmentConfigUtils; import org.apache.pinot.common.config.instance.InstanceConstraintConfig; @@ -80,6 +81,9 @@ public class InstanceAssignmentDriver { Preconditions.checkState(replicaPartitionConfig != null, "Instance replica/partition config is missing"); InstanceReplicaPartitionSelector replicaPartitionSelector = new InstanceReplicaPartitionSelector(replicaPartitionConfig, tableNameWithType); - return replicaPartitionSelector.selectInstances(poolToInstanceConfigsMap); + InstancePartitions instancePartitions = new InstancePartitions( + instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType))); + replicaPartitionSelector.selectInstances(poolToInstanceConfigsMap, instancePartitions); + return instancePartitions; } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaPartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaPartitionSelector.java index 8f6976c..90c35fb 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaPartitionSelector.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaPartitionSelector.java @@ -51,7 +51,8 @@ public class InstanceReplicaPartitionSelector { * Selects instances based on the replica and partition config, and stores the result into the given instance * partitions. */ - public InstancePartitions selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap) { + public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap, + InstancePartitions instancePartitions) { int numPools = poolToInstanceConfigsMap.size(); Preconditions.checkState(numPools != 0, "No pool qualified for selection"); @@ -61,8 +62,6 @@ public class InstanceReplicaPartitionSelector { LOGGER.info("Starting instance replica/partition selection for table: {} with hash: {} from pools: {}", _tableNameWithType, tableNameHash, pools); - InstancePartitions instancePartitions = new InstancePartitions(_tableNameWithType); - if (_replicaPartitionConfig.isReplicaGroupBased()) { // Replica-group based selection @@ -183,7 +182,5 @@ public class InstanceReplicaPartitionSelector { // Set the instances as partition 0 replica 0 instancePartitions.setInstances(0, 0, instancesToSelect); } - - return instancePartitions; } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java new file mode 100644 index 0000000..9d6e7d7 --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java @@ -0,0 +1,339 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api; + +import com.fasterxml.jackson.core.type.TypeReference; +import java.io.FileNotFoundException; +import java.util.Collections; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.common.config.TableNameBuilder; +import org.apache.pinot.common.config.TagNameUtils; +import org.apache.pinot.common.config.Tenant; +import org.apache.pinot.common.config.instance.InstanceAssignmentConfig; +import org.apache.pinot.common.config.instance.InstanceReplicaPartitionConfig; +import org.apache.pinot.common.config.instance.InstanceTagPoolConfig; +import org.apache.pinot.common.data.FieldSpec.DataType; +import org.apache.pinot.common.data.Schema; +import org.apache.pinot.common.utils.CommonConstants.Helix.TableType; +import org.apache.pinot.common.utils.InstancePartitionsType; +import org.apache.pinot.common.utils.JsonUtils; +import org.apache.pinot.common.utils.TenantRole; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.helix.ControllerTest; +import org.apache.pinot.controller.helix.core.assignment.InstancePartitions; +import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + + +public class PinotInstanceAssignmentRestletResourceTest extends ControllerTest { + private static final int NUM_SERVER_INSTANCES = 3; + private static final String TENANT_NAME = "testTenant"; + private static final String RAW_TABLE_NAME = "testTable"; + private static final String TIME_COLUMN_NAME = "daysSinceEpoch"; + + @BeforeClass + public void setUp() + throws Exception { + startZk(); + ControllerConf config = getDefaultControllerConfiguration(); + config.setTenantIsolationEnabled(false); + startController(config); + addFakeBrokerInstancesToAutoJoinHelixCluster(1, false); + addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVER_INSTANCES, false); + + // Create broker and server tenant + Tenant brokerTenant = new Tenant.TenantBuilder(TENANT_NAME).setRole(TenantRole.BROKER).setTotalInstances(1).build(); + _helixResourceManager.createBrokerTenant(brokerTenant); + Tenant serverTenant = + new Tenant.TenantBuilder(TENANT_NAME).setRole(TenantRole.SERVER).setOfflineInstances(1).setRealtimeInstances(1) + .build(); + _helixResourceManager.createServerTenant(serverTenant); + } + + @Test + public void testInstanceAssignment() + throws Exception { + Schema schema = + new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).addTime(TIME_COLUMN_NAME, TimeUnit.DAYS, DataType.INT) + .build(); + _helixResourceManager.addOrUpdateSchema(schema); + TableConfig offlineTableConfig = + new TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setBrokerTenant(TENANT_NAME) + .setServerTenant(TENANT_NAME).build(); + _helixResourceManager.addTable(offlineTableConfig); + TableConfig realtimeTableConfig = + new TableConfig.Builder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setBrokerTenant(TENANT_NAME) + .setServerTenant(TENANT_NAME).setLLC(true) + .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build(); + _helixResourceManager.addTable(realtimeTableConfig); + + // There should be no instance partitions + try { + getInstancePartitionsMap(); + fail(); + } catch (FileNotFoundException e) { + // Expected + } + + // Assign instances should fail + try { + sendPostRequest(_controllerRequestURLBuilder.forInstanceAssign(RAW_TABLE_NAME, null, true), null); + fail(); + } catch (FileNotFoundException e) { + // Expected + } + + // Add OFFLINE instance assignment config to the OFFLINE table config + InstanceAssignmentConfig offlineInstanceAssignmentConfig = new InstanceAssignmentConfig(); + InstanceTagPoolConfig offlineInstanceTagPoolConfig = new InstanceTagPoolConfig(); + offlineInstanceTagPoolConfig.setTag(TagNameUtils.getOfflineTagForTenant(TENANT_NAME)); + offlineInstanceAssignmentConfig.setTagPoolConfig(offlineInstanceTagPoolConfig); + offlineInstanceAssignmentConfig.setReplicaPartitionConfig(new InstanceReplicaPartitionConfig()); + offlineTableConfig.setInstanceAssignmentConfigMap( + Collections.singletonMap(InstancePartitionsType.OFFLINE, offlineInstanceAssignmentConfig)); + _helixResourceManager.setExistingTableConfig(offlineTableConfig); + + // OFFLINE instance partitions should be generated + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = getInstancePartitionsMap(); + assertEquals(instancePartitionsMap.size(), 1); + InstancePartitions offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE); + assertNotNull(offlineInstancePartitions); + assertEquals(offlineInstancePartitions.getNumReplicas(), 1); + assertEquals(offlineInstancePartitions.getNumPartitions(), 1); + assertEquals(offlineInstancePartitions.getInstances(0, 0).size(), 1); + String offlineInstanceId = offlineInstancePartitions.getInstances(0, 0).get(0); + + // Add CONSUMING instance assignment config to the REALTIME table config + InstanceAssignmentConfig consumingInstanceAssignmentConfig = new InstanceAssignmentConfig(); + InstanceTagPoolConfig realtimeInstanceTagPoolConfig = new InstanceTagPoolConfig(); + realtimeInstanceTagPoolConfig.setTag(TagNameUtils.getRealtimeTagForTenant(TENANT_NAME)); + consumingInstanceAssignmentConfig.setTagPoolConfig(realtimeInstanceTagPoolConfig); + consumingInstanceAssignmentConfig.setReplicaPartitionConfig(new InstanceReplicaPartitionConfig()); + realtimeTableConfig.setInstanceAssignmentConfigMap( + Collections.singletonMap(InstancePartitionsType.CONSUMING, consumingInstanceAssignmentConfig)); + _helixResourceManager.setExistingTableConfig(realtimeTableConfig); + + // CONSUMING instance partitions should be generated + instancePartitionsMap = getInstancePartitionsMap(); + assertEquals(instancePartitionsMap.size(), 2); + offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE); + assertNotNull(offlineInstancePartitions); + assertEquals(offlineInstancePartitions.getNumReplicas(), 1); + assertEquals(offlineInstancePartitions.getNumPartitions(), 1); + assertEquals(offlineInstancePartitions.getInstances(0, 0), Collections.singletonList(offlineInstanceId)); + InstancePartitions consumingInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING); + assertNotNull(consumingInstancePartitions); + assertEquals(consumingInstancePartitions.getNumReplicas(), 1); + assertEquals(consumingInstancePartitions.getNumPartitions(), 1); + assertEquals(consumingInstancePartitions.getInstances(0, 0).size(), 1); + String realtimeInstanceId = consumingInstancePartitions.getInstances(0, 0).get(0); + + // Use OFFLINE instance assignment config as the COMPLETED instance assignment config + realtimeTableConfig + .setInstanceAssignmentConfigMap(new TreeMap<InstancePartitionsType, InstanceAssignmentConfig>() {{ + put(InstancePartitionsType.CONSUMING, consumingInstanceAssignmentConfig); + put(InstancePartitionsType.COMPLETED, offlineInstanceAssignmentConfig); + }}); + _helixResourceManager.setExistingTableConfig(realtimeTableConfig); + + // COMPLETED instance partitions should be generated + instancePartitionsMap = getInstancePartitionsMap(); + assertEquals(instancePartitionsMap.size(), 3); + offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE); + assertNotNull(offlineInstancePartitions); + assertEquals(offlineInstancePartitions.getNumReplicas(), 1); + assertEquals(offlineInstancePartitions.getNumPartitions(), 1); + assertEquals(offlineInstancePartitions.getInstances(0, 0), Collections.singletonList(offlineInstanceId)); + consumingInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING); + assertNotNull(consumingInstancePartitions); + assertEquals(consumingInstancePartitions.getNumReplicas(), 1); + assertEquals(consumingInstancePartitions.getNumPartitions(), 1); + assertEquals(consumingInstancePartitions.getInstances(0, 0), Collections.singletonList(realtimeInstanceId)); + InstancePartitions completedInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.COMPLETED); + assertEquals(completedInstancePartitions.getNumReplicas(), 1); + assertEquals(completedInstancePartitions.getNumPartitions(), 1); + assertEquals(completedInstancePartitions.getInstances(0, 0), Collections.singletonList(offlineInstanceId)); + + // Test fetching instance partitions by table name with type suffix + instancePartitionsMap = deserializeInstancePartitionsMap(sendGetRequest(_controllerRequestURLBuilder + .forInstancePartitions(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME), null))); + assertEquals(instancePartitionsMap.size(), 1); + assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE)); + instancePartitionsMap = deserializeInstancePartitionsMap(sendGetRequest(_controllerRequestURLBuilder + .forInstancePartitions(TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME), null))); + assertEquals(instancePartitionsMap.size(), 2); + assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.CONSUMING)); + assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED)); + + // Test fetching instance partitions by table name and instance partitions type + for (InstancePartitionsType instancePartitionsType : InstancePartitionsType.values()) { + instancePartitionsMap = deserializeInstancePartitionsMap( + sendGetRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME, instancePartitionsType))); + assertEquals(instancePartitionsMap.size(), 1); + assertEquals(instancePartitionsMap.get(instancePartitionsType).getName(), + instancePartitionsType.getInstancePartitionsName(RAW_TABLE_NAME)); + } + + // Remove the instance partitions for both OFFLINE and REALTIME table + sendDeleteRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME, null)); + try { + getInstancePartitionsMap(); + fail(); + } catch (FileNotFoundException e) { + // Expected + } + + // Assign instances without instance partitions type (dry run) + instancePartitionsMap = deserializeInstancePartitionsMap( + sendPostRequest(_controllerRequestURLBuilder.forInstanceAssign(RAW_TABLE_NAME, null, true), null)); + assertEquals(instancePartitionsMap.size(), 3); + offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE); + assertNotNull(offlineInstancePartitions); + assertEquals(offlineInstancePartitions.getNumReplicas(), 1); + assertEquals(offlineInstancePartitions.getNumPartitions(), 1); + assertEquals(offlineInstancePartitions.getInstances(0, 0), Collections.singletonList(offlineInstanceId)); + consumingInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING); + assertNotNull(consumingInstancePartitions); + assertEquals(consumingInstancePartitions.getNumReplicas(), 1); + assertEquals(consumingInstancePartitions.getNumPartitions(), 1); + assertEquals(consumingInstancePartitions.getInstances(0, 0), Collections.singletonList(realtimeInstanceId)); + completedInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.COMPLETED); + assertEquals(completedInstancePartitions.getNumReplicas(), 1); + assertEquals(completedInstancePartitions.getNumPartitions(), 1); + assertEquals(completedInstancePartitions.getInstances(0, 0), Collections.singletonList(offlineInstanceId)); + + // Instance partitions should not be persisted + try { + getInstancePartitionsMap(); + fail(); + } catch (FileNotFoundException e) { + // Expected + } + + // Assign instances for both OFFLINE and REALTIME table + sendPostRequest(_controllerRequestURLBuilder.forInstanceAssign(RAW_TABLE_NAME, null, false), null); + + // Instance partitions should be persisted + instancePartitionsMap = getInstancePartitionsMap(); + assertEquals(instancePartitionsMap.size(), 3); + + // Remove the instance partitions for REALTIME table + sendDeleteRequest(_controllerRequestURLBuilder + .forInstancePartitions(TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME), null)); + instancePartitionsMap = getInstancePartitionsMap(); + assertEquals(instancePartitionsMap.size(), 1); + assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE)); + + // Assign instances for COMPLETED segments + instancePartitionsMap = deserializeInstancePartitionsMap(sendPostRequest( + _controllerRequestURLBuilder.forInstanceAssign(RAW_TABLE_NAME, InstancePartitionsType.COMPLETED, false), null)); + assertEquals(instancePartitionsMap.size(), 1); + assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED)); + + // There should be OFFLINE and COMPLETED instance partitions persisted + instancePartitionsMap = getInstancePartitionsMap(); + assertEquals(instancePartitionsMap.size(), 2); + assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE)); + assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED)); + + // Replace OFFLINE instance with REALTIME instance for COMPLETED instance partitions + instancePartitionsMap = deserializeInstancePartitionsMap(sendPostRequest(_controllerRequestURLBuilder + .forInstanceReplace(RAW_TABLE_NAME, InstancePartitionsType.COMPLETED, offlineInstanceId, realtimeInstanceId), + null)); + assertEquals(instancePartitionsMap.size(), 1); + assertEquals(instancePartitionsMap.get(InstancePartitionsType.COMPLETED).getInstances(0, 0), + Collections.singletonList(realtimeInstanceId)); + + // Replace the instance again using REALTIME table name (old instance does not exist) + try { + sendPostRequest(_controllerRequestURLBuilder + .forInstanceReplace(TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME), null, offlineInstanceId, + realtimeInstanceId), null); + fail(); + } catch (FileNotFoundException e) { + // Expected + } + + // Post the CONSUMING instance partitions + instancePartitionsMap = deserializeInstancePartitionsMap( + sendPutRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME, null), + JsonUtils.objectToString(consumingInstancePartitions))); + assertEquals(instancePartitionsMap.size(), 1); + assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING).getInstances(0, 0), + Collections.singletonList(realtimeInstanceId)); + + // OFFLINE instance partitions should have OFFLINE instance, CONSUMING and COMPLETED instance partitions should have + // REALTIME instance + instancePartitionsMap = getInstancePartitionsMap(); + assertEquals(instancePartitionsMap.size(), 3); + assertEquals(instancePartitionsMap.get(InstancePartitionsType.OFFLINE).getInstances(0, 0), + Collections.singletonList(offlineInstanceId)); + assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING).getInstances(0, 0), + Collections.singletonList(realtimeInstanceId)); + assertEquals(instancePartitionsMap.get(InstancePartitionsType.COMPLETED).getInstances(0, 0), + Collections.singletonList(realtimeInstanceId)); + + // Delete the OFFLINE table + _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME); + instancePartitionsMap = getInstancePartitionsMap(); + assertEquals(instancePartitionsMap.size(), 2); + assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.CONSUMING)); + assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED)); + + // Delete the REALTIME table + _helixResourceManager.deleteRealtimeTable(RAW_TABLE_NAME); + try { + getInstancePartitionsMap(); + fail(); + } catch (FileNotFoundException e) { + // Expected + } + } + + private Map<InstancePartitionsType, InstancePartitions> getInstancePartitionsMap() + throws Exception { + return deserializeInstancePartitionsMap( + sendGetRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME, null))); + } + + private Map<InstancePartitionsType, InstancePartitions> deserializeInstancePartitionsMap( + String instancePartitionsMapString) + throws Exception { + return JsonUtils.stringToObject(instancePartitionsMapString, + new TypeReference<Map<InstancePartitionsType, InstancePartitions>>() { + }); + } + + @AfterClass + public void tearDown() { + stopFakeInstances(); + stopController(); + stopZk(); + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java index 8639b00..fec7a73 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java @@ -29,8 +29,8 @@ import org.apache.pinot.common.config.ReplicaGroupStrategyConfig; import org.apache.pinot.common.config.SegmentPartitionConfig; import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.config.TagNameUtils; -import org.apache.pinot.common.config.TagOverrideConfig; import org.apache.pinot.common.config.instance.InstanceAssignmentConfig; +import org.apache.pinot.common.config.instance.InstanceAssignmentConfigUtils; import org.apache.pinot.common.config.instance.InstanceReplicaPartitionConfig; import org.apache.pinot.common.config.instance.InstanceTagPoolConfig; import org.apache.pinot.common.utils.CommonConstants.Helix.TableType; @@ -40,6 +40,7 @@ import org.apache.pinot.controller.helix.core.assignment.InstancePartitions; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.fail; @@ -47,34 +48,9 @@ public class InstanceAssignmentTest { private static final String RAW_TABLE_NAME = "myTable"; private static final String TENANT_NAME = "tenant"; private static final String OFFLINE_TAG = TagNameUtils.getOfflineTagForTenant(TENANT_NAME); - private static final String REALTIME_TAG = TagNameUtils.getRealtimeTagForTenant(TENANT_NAME); private static final String SERVER_INSTANCE_ID_PREFIX = "Server_localhost_"; @Test - public void testDefaultOfflineNonReplicaGroup() { - TableConfig tableConfig = - new TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setServerTenant(TENANT_NAME).build(); - InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig); - int numInstances = 10; - List<InstanceConfig> instanceConfigs = new ArrayList<>(numInstances); - for (int i = 0; i < numInstances; i++) { - InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i); - instanceConfig.addTag(OFFLINE_TAG); - instanceConfigs.add(instanceConfig); - } - - // All instances should be assigned as replica 0 partition 0 - InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); - assertEquals(instancePartitions.getNumReplicas(), 1); - assertEquals(instancePartitions.getNumPartitions(), 1); - List<String> expectedInstances = new ArrayList<>(numInstances); - for (int i = 0; i < numInstances; i++) { - expectedInstances.add(SERVER_INSTANCE_ID_PREFIX + i); - } - assertEquals(instancePartitions.getInstances(0, 0), expectedInstances); - } - - @Test public void testDefaultOfflineReplicaGroup() { int numReplicas = 3; TableConfig tableConfig = @@ -149,53 +125,6 @@ public class InstanceAssignmentTest { } @Test - public void testRealtimeTagOverride() { - // Override COMPLETED tag to use OFFLINE tag - TagOverrideConfig tagOverrideConfig = new TagOverrideConfig(); - tagOverrideConfig.setRealtimeConsuming(REALTIME_TAG); - tagOverrideConfig.setRealtimeCompleted(OFFLINE_TAG); - TableConfig tableConfig = - new TableConfig.Builder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTagOverrideConfig(tagOverrideConfig) - .build(); - InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig); - int numInstances = 10; - List<InstanceConfig> instanceConfigs = new ArrayList<>(numInstances); - for (int i = 0; i < numInstances; i++) { - InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i); - if (i % 2 == 0) { - instanceConfig.addTag(REALTIME_TAG); - } else { - instanceConfig.addTag(OFFLINE_TAG); - } - instanceConfigs.add(instanceConfig); - } - - // All instances with REALTIME tag should be assigned as replica 0 partition 0 for CONSUMING instance partitions - InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.CONSUMING, instanceConfigs); - assertEquals(instancePartitions.getNumReplicas(), 1); - assertEquals(instancePartitions.getNumPartitions(), 1); - List<String> expectedInstances = new ArrayList<>(numInstances); - for (int i = 0; i < numInstances; i++) { - if (i % 2 == 0) { - expectedInstances.add(SERVER_INSTANCE_ID_PREFIX + i); - } - } - assertEquals(instancePartitions.getInstances(0, 0), expectedInstances); - - // All instances with OFFLINE tag should be assigned as replica 0 partition 0 for COMPLETED instance partitions - instancePartitions = driver.assignInstances(InstancePartitionsType.COMPLETED, instanceConfigs); - assertEquals(instancePartitions.getNumReplicas(), 1); - assertEquals(instancePartitions.getNumPartitions(), 1); - expectedInstances = new ArrayList<>(numInstances); - for (int i = 0; i < numInstances; i++) { - if (i % 2 != 0) { - expectedInstances.add(SERVER_INSTANCE_ID_PREFIX + i); - } - } - assertEquals(instancePartitions.getInstances(0, 0), expectedInstances); - } - - @Test public void testPoolBased() { // 10 instances in 2 pools, each with 5 instances int numInstances = 10; @@ -329,10 +258,7 @@ public class InstanceAssignmentTest { @Test public void testIllegalConfig() { - InstanceAssignmentConfig assignmentConfig = new InstanceAssignmentConfig(); - TableConfig tableConfig = new TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) - .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, assignmentConfig)) - .build(); + TableConfig tableConfig = new TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig); int numInstances = 10; @@ -342,6 +268,19 @@ public class InstanceAssignmentTest { instanceConfigs.add(instanceConfig); } + // No instance assignment config + assertFalse(InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, InstancePartitionsType.OFFLINE)); + try { + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); + fail(); + } catch (IllegalStateException e) { + assertEquals(e.getMessage(), "Instance assignment is not allowed for the given table config"); + } + + InstanceAssignmentConfig assignmentConfig = new InstanceAssignmentConfig(); + tableConfig + .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, assignmentConfig)); + // No instance tag/pool config try { driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceStrategyTest.java index 1a5d017..41bdc8d 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceStrategyTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceStrategyTest.java @@ -217,9 +217,11 @@ public class ReplicaGroupRebalanceStrategyTest extends ControllerTest { } // Test removing a replica group + // NOTE: ReplicaGroupRebalanceSegmentStrategy is able to remove a replica-group only when all the instances in the + // replica are removed _helixAdmin.removeInstanceTag(getHelixClusterName(), "Server_localhost_0", OFFLINE_TENENT_NAME); - _helixAdmin.removeInstanceTag(getHelixClusterName(), "Server_localhost_1", OFFLINE_TENENT_NAME); _helixAdmin.removeInstanceTag(getHelixClusterName(), "Server_localhost_2", OFFLINE_TENENT_NAME); + _helixAdmin.removeInstanceTag(getHelixClusterName(), "Server_localhost_4", OFFLINE_TENENT_NAME); targetNumInstancePerPartition = 3; targetNumReplicaGroup = 2; diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java index 92ea2af..9f35b9b 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java @@ -206,78 +206,6 @@ public class SegmentAssignmentStrategyTest extends ControllerTest { Assert.assertTrue(partitionAssignment == null); } - /** - * Tests to check creation/deletion of replica group znode when table is created/updated/deleted - */ - @Test - public void testRealtimeReplicaGroupPartitionAssignment() { - String rawTableName = TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT; - String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(rawTableName); - - Schema schema = new Schema.SchemaBuilder().setSchemaName(rawTableName).build(); - _helixResourceManager.addOrUpdateSchema(schema); - - Map<String, String> streamConfigMap = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap(); - - // Adding a table without replica group - TableConfig tableConfig = new TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME) - .setTableName(TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT) - .setSegmentAssignmentStrategy("RandomAssignmentStrategy").setNumReplicas(NUM_REPLICAS) - .setStreamConfigs(streamConfigMap).setLLC(true).build(); - try { - _helixResourceManager.addTable(tableConfig); - } catch (Exception e) { - // ignore - } - - // Check that partition assignment does not exist - ReplicaGroupPartitionAssignment partitionAssignment = - _partitionAssignmentGenerator.getReplicaGroupPartitionAssignment(tableNameWithType); - - Assert.assertNull(partitionAssignment); - - // Update table config with replica group config - int numInstancesPerPartition = 5; - ReplicaGroupStrategyConfig replicaGroupStrategyConfig = new ReplicaGroupStrategyConfig(); - replicaGroupStrategyConfig.setNumInstancesPerPartition(numInstancesPerPartition); - replicaGroupStrategyConfig.setMirrorAssignmentAcrossReplicaGroups(true); - - TableConfig replicaGroupTableConfig = new TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME) - .setTableName(TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT).setNumReplicas(NUM_REPLICAS) - .setSegmentAssignmentStrategy("ReplicaGroupSegmentAssignmentStrategy").setStreamConfigs(streamConfigMap) - .setLLC(true).build(); - - replicaGroupTableConfig.getValidationConfig().setReplicaGroupStrategyConfig(replicaGroupStrategyConfig); - - // Check that the replica group partition assignment is created - try { - _helixResourceManager.setExistingTableConfig(replicaGroupTableConfig); - } catch (Exception e) { - // ignore - } - partitionAssignment = _partitionAssignmentGenerator.getReplicaGroupPartitionAssignment(tableNameWithType); - Assert.assertNotNull(partitionAssignment); - - // After table deletion, check that the replica group partition assignment is deleted - _helixResourceManager.deleteRealtimeTable(rawTableName); - partitionAssignment = _partitionAssignmentGenerator.getReplicaGroupPartitionAssignment(tableNameWithType); - Assert.assertNull(partitionAssignment); - - // Create a table with replica group - try { - _helixResourceManager.addTable(replicaGroupTableConfig); - } catch (Exception e) { - // ignore - } - partitionAssignment = _partitionAssignmentGenerator.getReplicaGroupPartitionAssignment(tableNameWithType); - Assert.assertNotNull(partitionAssignment); - - // Check that the replica group partition assignment is deleted - _helixResourceManager.deleteRealtimeTable(rawTableName); - partitionAssignment = _partitionAssignmentGenerator.getReplicaGroupPartitionAssignment(tableNameWithType); - Assert.assertNull(partitionAssignment); - } - @Test public void testTableLevelAndMirroringReplicaGroupSegmentAssignmentStrategy() throws Exception { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/ReplicationUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/ReplicationUtils.java index 2c37800..5b0bd28 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/util/ReplicationUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/util/ReplicationUtils.java @@ -54,39 +54,4 @@ public class ReplicationUtils { } return false; } - - /** - * Returns the {@link SegmentsValidationAndRetentionConfig::getReplicasPerPartitionNumber} if it is eligible for use, - * else returns the {@link SegmentsValidationAndRetentionConfig::getReplicationNumber} - * - * The reason we have replication as well as replicasPerPartition is, "replication" is used in HLC to support 'split' kafka topics - * For example, if replication is 3, and we have 6 realtime servers, each server will half of the events in the topic - * (see @link PinotTableIdealStateBuilder::setupInstanceConfigForHighLevelConsumer} - * ReplicasPerPartition is used in LLC as the replication. - * We need to keep both, as we could be operating in dual mode during migrations from HLC to LLC - */ - public static int getReplication(TableConfig tableConfig) { - - SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig(); - return useReplicasPerPartition(tableConfig) ? validationConfig.getReplicasPerPartitionNumber() : validationConfig - .getReplicationNumber(); - } - - /** - * Check if replica groups setup is necessary for realtime - * - * Replica groups is supported when only LLC is present. - * We do not want znode being created or failures in some validations, when attempting to setup replica groups in HLC. - * As a result, Replica groups cannot be used during migrations from HLC to LLC. - * In such a scenario, migrate to LLC completely, and then enable replica groups - * - */ - public static boolean setupRealtimeReplicaGroups(TableConfig tableConfig) { - TableType tableType = tableConfig.getTableType(); - if (tableType.equals(TableType.REALTIME)) { - StreamConfig streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs()); - return streamConfig.hasLowLevelConsumerType() && !streamConfig.hasHighLevelConsumerType(); - } - return false; - } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
