mcvsubbu commented on a change in pull request #4990: Enhance TableRebalancer to be able to rebalance table under any condition URL: https://github.com/apache/incubator-pinot/pull/4990#discussion_r368332729
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java ########## @@ -175,67 +166,127 @@ public RebalanceResult rebalance(TableConfig tableConfig, Configuration rebalanc if (currentAssignment.equals(targetAssignment)) { LOGGER.info("Table: {} is already balanced", tableNameWithType); if (reassignInstances) { - return new RebalanceResult(RebalanceResult.Status.DONE, "Instance reassigned, table is already balanced", - instancePartitionsMap, targetAssignment); + if (dryRun) { + return new RebalanceResult(RebalanceResult.Status.DONE, + "Instance reassigned in dry-run mode, table is already balanced", instancePartitionsMap, + targetAssignment); + } else { + return new RebalanceResult(RebalanceResult.Status.DONE, "Instance reassigned, table is already balanced", + instancePartitionsMap, targetAssignment); + } } else { return new RebalanceResult(RebalanceResult.Status.NO_OP, "Table is already balanced", instancePartitionsMap, targetAssignment); } } if (dryRun) { - LOGGER.info("Rebalance table: {} in dry-run mode, returning the target assignment", tableNameWithType); + LOGGER.info("Rebalancing table: {} in dry-run mode, returning the target assignment", tableNameWithType); return new RebalanceResult(RebalanceResult.Status.DONE, "Dry-run mode", instancePartitionsMap, targetAssignment); } - int minAvailableReplicas; - if (rebalanceConfig.getBoolean(RebalanceConfigConstants.DOWNTIME, RebalanceConfigConstants.DEFAULT_DOWNTIME)) { - minAvailableReplicas = 0; + if (!downtime && !currentIdealState.isEnabled()) { + LOGGER.warn("Table: {} is disabled, rebalancing it with downtime", tableNameWithType); + downtime = true; + } + + if (downtime) { LOGGER.info("Rebalancing table: {} with downtime", tableNameWithType); - } else { - minAvailableReplicas = rebalanceConfig.getInt(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME, - RebalanceConfigConstants.DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME); - 
int numCurrentReplicas = currentAssignment.values().iterator().next().size(); - int numTargetReplicas = targetAssignment.values().iterator().next().size(); - // Use the smaller one to determine the min available replicas - int numReplicas = Math.min(numCurrentReplicas, numTargetReplicas); - if (minAvailableReplicas > 0) { - if (minAvailableReplicas >= numReplicas) { - LOGGER.warn( - "Illegal config for min available replicas: {} for table: {}, must be less than number of replicas (current: {}, target: {})", - minAvailableReplicas, tableNameWithType, numCurrentReplicas, numTargetReplicas); - return new RebalanceResult(RebalanceResult.Status.FAILED, "Illegal min available replicas config", + + while (true) { + // Reuse current IdealState to update the IdealState in cluster + ZNRecord idealStateRecord = currentIdealState.getRecord(); + idealStateRecord.setMapFields(targetAssignment); + currentIdealState.setNumPartitions(targetAssignment.size()); + currentIdealState.setReplicas(Integer.toString(targetAssignment.values().iterator().next().size())); + + // Check version and update IdealState + try { + Preconditions.checkState(_helixDataAccessor.getBaseDataAccessor() + .set(idealStatePropertyKey.getPath(), idealStateRecord, idealStateRecord.getVersion(), + AccessOption.PERSISTENT), "Failed to update IdealState"); + LOGGER.info("Finished rebalancing table: {} in {}ms.", tableNameWithType, + System.currentTimeMillis() - startTimeMs); + return new RebalanceResult(RebalanceResult.Status.DONE, "Success", instancePartitionsMap, targetAssignment); + } catch (ZkBadVersionException e) { + LOGGER.info("IdealState version changed for table: {}, re-calculating the target assignment", + tableNameWithType); + try { + IdealState idealState = _helixDataAccessor.getProperty(idealStatePropertyKey); + // IdealState might be null if table got deleted, throwing exception to abort the rebalance + Preconditions.checkState(idealState != null, "Failed to find the IdealState"); + currentIdealState 
= idealState; + currentAssignment = currentIdealState.getRecord().getMapFields(); + targetAssignment = + segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, rebalanceConfig); + } catch (Exception e1) { + LOGGER.error("Caught exception while re-calculating the target assignment for table: {}", tableNameWithType, + e1); + return new RebalanceResult(RebalanceResult.Status.FAILED, + "Caught exception while re-calculating the target assignment: " + e1, instancePartitionsMap, + targetAssignment); + } + } catch (Exception e) { + LOGGER.error("Caught exception while updating IdealState for table: {}", tableNameWithType, e); + return new RebalanceResult(RebalanceResult.Status.FAILED, "Caught exception while updating IdealState: " + e, instancePartitionsMap, targetAssignment); } } - // If min available replicas is negative, treat it as max unavailable replicas - if (minAvailableReplicas < 0) { - minAvailableReplicas = Math.max(numReplicas + minAvailableReplicas, 0); + } + + // Calculate the min available replicas for no-downtime rebalance + int minAvailableReplicas = rebalanceConfig.getInt(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME, + RebalanceConfigConstants.DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME); + int numCurrentReplicas = currentAssignment.values().iterator().next().size(); + int numTargetReplicas = targetAssignment.values().iterator().next().size(); + // Use the smaller one to determine the min available replicas + int numReplicas = Math.min(numCurrentReplicas, numTargetReplicas); + if (minAvailableReplicas > 0) { + if (minAvailableReplicas >= numReplicas) { + LOGGER.warn( + "Illegal config for min available replicas: {} for table: {}, must be less than number of replicas (current: {}, target: {})", + minAvailableReplicas, tableNameWithType, numCurrentReplicas, numTargetReplicas); + return new RebalanceResult(RebalanceResult.Status.FAILED, "Illegal min available replicas config", + instancePartitionsMap, 
targetAssignment); } - LOGGER.info("Rebalancing table: {} with min available replicas: {}", tableNameWithType, minAvailableReplicas); } + // If min available replicas is negative, treat it as max unavailable replicas + if (minAvailableReplicas < 0) { + minAvailableReplicas = Math.max(numReplicas + minAvailableReplicas, 0); + } + LOGGER.info("Rebalancing table: {} with min available replicas: {}", tableNameWithType, minAvailableReplicas); int expectedVersion = currentIdealState.getRecord().getVersion(); while (true) { // Wait for ExternalView to converge before updating the next IdealState + IdealState idealState; try { - IdealState idealState = waitForExternalViewToConverge(tableNameWithType); - LOGGER.info("ExternalView converged for table: {}", tableNameWithType); - if (idealState.getRecord().getVersion() != expectedVersion) { - LOGGER.info( - "IdealState version changed while waiting for ExternalView to converge for table: {}, re-calculating the target assignment", - tableNameWithType); + idealState = waitForExternalViewToConverge(tableNameWithType); Review comment: Should we pass the expected version into `waitForExternalViewToConverge`, so that if the IdealState changes for some other reason, we stop waiting for convergence and execute the logic below instead? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org