This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5f220b398c Add support for performing pre-checks for TableRebalance
(#15029)
5f220b398c is described below
commit 5f220b398cda1fde87a286492e05521d48923634
Author: Sonam Mandal <[email protected]>
AuthorDate: Fri Feb 14 21:47:03 2025 -0800
Add support for performing pre-checks for TableRebalance (#15029)
* Add support for performing pre-checks for TableRebalance
---
.../pinot/controller/BaseControllerStarter.java | 6 +-
.../apache/pinot/controller/ControllerConf.java | 11 ++
.../api/resources/PinotSegmentRestletResource.java | 2 +-
.../api/resources/PinotTableRestletResource.java | 14 +-
.../helix/core/PinotHelixResourceManager.java | 31 ++++-
.../core/rebalance/DefaultRebalancePreChecker.java | 142 +++++++++++++++++++++
.../helix/core/rebalance/RebalanceConfig.java | 23 +++-
.../helix/core/rebalance/RebalancePreChecker.java | 31 +++++
.../core/rebalance/RebalancePreCheckerFactory.java | 42 ++++++
.../helix/core/rebalance/RebalanceResult.java | 11 +-
.../helix/core/rebalance/TableRebalancer.java | 70 ++++++----
.../rebalance/tenant/DefaultTenantRebalancer.java | 4 +-
.../rebalance/tenant/TenantRebalanceResult.java | 2 +-
.../util/ServerSegmentMetadataReader.java | 24 +++-
.../pinot/controller/util/TableMetadataReader.java | 35 ++++-
.../TableRebalancerClusterStatelessTest.java | 15 ++-
.../tests/OfflineClusterIntegrationTest.java | 119 +++++++++++++++++
17 files changed, 529 insertions(+), 53 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index a0ce503d41..e95e0f5d87 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -252,11 +252,11 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
// queries)
FunctionRegistry.init();
_adminApp = createControllerAdminApp();
+ // This executor service is used to do async tasks from multiget util or
table rebalancing.
+ _executorService =
createExecutorService(_config.getControllerExecutorNumThreads(),
"async-task-thread-%d");
// Do not use this before the invocation of {@link
PinotHelixResourceManager::start()}, which happens in {@link
// ControllerStarter::start()}
_helixResourceManager = createHelixResourceManager();
- // This executor service is used to do async tasks from multiget util or
table rebalancing.
- _executorService =
createExecutorService(_config.getControllerExecutorNumThreads(),
"async-task-thread-%d");
_tenantRebalanceExecutorService =
createExecutorService(_config.getControllerExecutorRebalanceNumThreads(),
"tenant-rebalance-thread-%d");
_tenantRebalancer = new DefaultTenantRebalancer(_helixResourceManager,
_tenantRebalanceExecutorService);
@@ -324,7 +324,7 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
* @return A new instance of PinotHelixResourceManager.
*/
protected PinotHelixResourceManager createHelixResourceManager() {
- return new PinotHelixResourceManager(_config);
+ return new PinotHelixResourceManager(_config, _executorService);
}
public PinotHelixResourceManager getHelixResourceManager() {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 2957ca81fd..5ea1fd20cf 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -293,6 +293,7 @@ public class ControllerConf extends PinotConfiguration {
public static final String ACCESS_CONTROL_USERNAME =
"access.control.init.username";
public static final String ACCESS_CONTROL_PASSWORD =
"access.control.init.password";
public static final String LINEAGE_MANAGER_CLASS =
"controller.lineage.manager.class";
+ public static final String REBALANCE_PRE_CHECKER_CLASS =
"controller.rebalance.pre.checker.class";
// Amount of the time the segment can take from the beginning of upload to
the end of upload. Used when parallel push
// protection is enabled. If the upload does not finish within the timeout,
next upload can override the previous one.
private static final String SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS =
"controller.segment.upload.timeoutInMillis";
@@ -316,6 +317,8 @@ public class ControllerConf extends PinotConfiguration {
private static final String DEFAULT_ACCESS_CONTROL_PASSWORD = "admin";
private static final String DEFAULT_LINEAGE_MANAGER =
"org.apache.pinot.controller.helix.core.lineage.DefaultLineageManager";
+ private static final String DEFAULT_REBALANCE_PRE_CHECKER =
+
"org.apache.pinot.controller.helix.core.rebalance.DefaultRebalancePreChecker";
private static final long DEFAULT_SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS =
600_000L; // 10 minutes
private static final int DEFAULT_MIN_NUM_CHARS_IN_IS_TO_TURN_ON_COMPRESSION
= -1;
private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS =
64;
@@ -952,6 +955,14 @@ public class ControllerConf extends PinotConfiguration {
setProperty(LINEAGE_MANAGER_CLASS, lineageModifierClass);
}
+ /**
+ * Returns the class name configured under {@code REBALANCE_PRE_CHECKER_CLASS}
+ * ("controller.rebalance.pre.checker.class"); {@code DEFAULT_REBALANCE_PRE_CHECKER} is passed to
+ * {@code getProperty} as the default value.
+ */
+ public String getRebalancePreCheckerClass() {
+ return getProperty(REBALANCE_PRE_CHECKER_CLASS,
DEFAULT_REBALANCE_PRE_CHECKER);
+ }
+
+ /**
+ * Sets the property {@code REBALANCE_PRE_CHECKER_CLASS} ("controller.rebalance.pre.checker.class") to the given
+ * class name.
+ */
+ public void setRebalancePreCheckerClass(String rebalancePreCheckerClass) {
+ setProperty(REBALANCE_PRE_CHECKER_CLASS, rebalancePreCheckerClass);
+ }
+
public long getSegmentUploadTimeoutInMillis() {
return getProperty(SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS,
DEFAULT_SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 37e365bc7f..600c75b718 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -948,7 +948,7 @@ public class PinotSegmentRestletResource {
new TableMetadataReader(_executor, _connectionManager,
_pinotHelixResourceManager);
Map<String, JsonNode> needReloadMetadata =
tableMetadataReader.getServerCheckSegmentsReloadMetadata(tableNameWithType,
- _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+ _controllerConf.getServerAdminRequestTimeoutSeconds() *
1000).getServerReloadJsonResponses();
boolean needReload =
needReloadMetadata.values().stream().anyMatch(value ->
value.get("needReload").booleanValue());
Map<String, ServerSegmentsReloadCheckResponse> serverResponses = new
HashMap<>();
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 638849df46..74361a62a8 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -604,6 +604,8 @@ public class PinotTableRestletResource {
@ApiParam(value = "OFFLINE|REALTIME", required = true)
@QueryParam("type") String tableTypeStr,
@ApiParam(value = "Whether to rebalance table in dry-run mode")
@DefaultValue("false") @QueryParam("dryRun")
boolean dryRun,
+ @ApiParam(value = "Whether to enable pre-checks for table, must be in
dry-run mode to enable")
+ @DefaultValue("false") @QueryParam("preChecks") boolean preChecks,
@ApiParam(value = "Whether to reassign instances before reassigning
segments") @DefaultValue("false")
@QueryParam("reassignInstances") boolean reassignInstances,
@ApiParam(value = "Whether to reassign CONSUMING segments for real-time
table") @DefaultValue("false")
@@ -644,6 +646,7 @@ public class PinotTableRestletResource {
String tableNameWithType = constructTableNameWithType(tableName,
tableTypeStr);
RebalanceConfig rebalanceConfig = new RebalanceConfig();
rebalanceConfig.setDryRun(dryRun);
+ rebalanceConfig.setPreChecks(preChecks);
rebalanceConfig.setReassignInstances(reassignInstances);
rebalanceConfig.setIncludeConsuming(includeConsuming);
rebalanceConfig.setBootstrap(bootstrap);
@@ -663,8 +666,9 @@ public class PinotTableRestletResource {
String rebalanceJobId =
TableRebalancer.createUniqueRebalanceJobIdentifier();
try {
- if (dryRun || downtime) {
- // For dry-run or rebalance with downtime, directly return the
rebalance result as it should return immediately
+ if (dryRun || preChecks || downtime) {
+ // For dry-run, preChecks or rebalance with downtime, directly return
the rebalance result as it should return
+ // immediately
return _pinotHelixResourceManager.rebalanceTable(tableNameWithType,
rebalanceConfig, rebalanceJobId, false);
} else {
// Make a dry-run first to get the target assignment
@@ -682,7 +686,8 @@ public class PinotTableRestletResource {
} catch (Throwable t) {
String errorMsg = String.format("Caught exception/error while
rebalancing table: %s", tableNameWithType);
LOGGER.error(errorMsg, t);
- return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED, errorMsg, null, null, null);
+ return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED, errorMsg, null, null, null,
+ null);
}
});
boolean isJobIdPersisted = waitForRebalanceToPersist(
@@ -702,7 +707,8 @@ public class PinotTableRestletResource {
return new RebalanceResult(dryRunResult.getJobId(),
RebalanceResult.Status.IN_PROGRESS,
"In progress, check controller logs for updates",
dryRunResult.getInstanceAssignment(),
- dryRunResult.getTierInstanceAssignment(),
dryRunResult.getSegmentAssignment());
+ dryRunResult.getTierInstanceAssignment(),
dryRunResult.getSegmentAssignment(),
+ dryRunResult.getPreChecksResult());
} else {
// If dry-run failed or is no-op, return the dry-run result
return dryRunResult;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 1f983ce671..748f9d3c28 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -48,6 +48,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -151,6 +152,8 @@ import
org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
+import org.apache.pinot.controller.helix.core.rebalance.RebalancePreChecker;
+import
org.apache.pinot.controller.helix.core.rebalance.RebalancePreCheckerFactory;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
@@ -237,10 +240,12 @@ public class PinotHelixResourceManager {
private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
private TableCache _tableCache;
private final LineageManager _lineageManager;
+ private final RebalancePreChecker _rebalancePreChecker;
public PinotHelixResourceManager(String zkURL, String helixClusterName,
@Nullable String dataDir,
boolean isSingleTenantCluster, boolean enableBatchMessageMode, int
deletedSegmentsRetentionInDays,
- boolean enableTieredSegmentAssignment, LineageManager lineageManager) {
+ boolean enableTieredSegmentAssignment, LineageManager lineageManager,
RebalancePreChecker rebalancePreChecker,
+ @Nullable ExecutorService executorService) {
_helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
_helixClusterName = helixClusterName;
_dataDir = dataDir;
@@ -263,13 +268,24 @@ public class PinotHelixResourceManager {
_lineageUpdaterLocks[i] = new Object();
}
_lineageManager = lineageManager;
+ _rebalancePreChecker = rebalancePreChecker;
+ _rebalancePreChecker.init(this, executorService);
+ }
+
+ /**
+ * Builds the resource manager from the controller config, delegating to the full constructor. The lineage manager
+ * and the rebalance pre-checker are created through their factories from the config, and the given executor
+ * service (may be null) is forwarded to the full constructor.
+ */
+ public PinotHelixResourceManager(ControllerConf controllerConf, @Nullable
ExecutorService executorService) {
+ this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(),
controllerConf.getDataDir(),
+ controllerConf.tenantIsolationEnabled(),
controllerConf.getEnableBatchMessageMode(),
+ controllerConf.getDeletedSegmentsRetentionInDays(),
controllerConf.tieredSegmentAssignmentEnabled(),
+ LineageManagerFactory.create(controllerConf),
+
RebalancePreCheckerFactory.create(controllerConf.getRebalancePreCheckerClass()),
executorService);
}
public PinotHelixResourceManager(ControllerConf controllerConf) {
this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(),
controllerConf.getDataDir(),
controllerConf.tenantIsolationEnabled(),
controllerConf.getEnableBatchMessageMode(),
controllerConf.getDeletedSegmentsRetentionInDays(),
controllerConf.tieredSegmentAssignmentEnabled(),
- LineageManagerFactory.create(controllerConf));
+ LineageManagerFactory.create(controllerConf),
+
RebalancePreCheckerFactory.create(controllerConf.getRebalancePreCheckerClass()),
null);
}
/**
@@ -423,6 +439,15 @@ public class PinotHelixResourceManager {
return _lineageManager;
}
+ /**
+ * Get the rebalance pre-checker
+ *
+ * @return rebalance pre-checker
+ */
+ public RebalancePreChecker getRebalancePreChecker() {
+ return _rebalancePreChecker;
+ }
+
/**
* Instance related APIs
*/
@@ -3587,7 +3612,7 @@ public class PinotHelixResourceManager {
tierToSegmentsMap = updateTargetTier(rebalanceJobId, tableNameWithType,
tableConfig);
}
TableRebalancer tableRebalancer =
- new TableRebalancer(_helixZkManager, zkBasedTableRebalanceObserver,
_controllerMetrics);
+ new TableRebalancer(_helixZkManager, zkBasedTableRebalanceObserver,
_controllerMetrics, _rebalancePreChecker);
return tableRebalancer.rebalance(tableConfig, rebalanceConfig,
rebalanceJobId, tierToSegmentsMap);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
new file mode 100644
index 0000000000..945a98e48e
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.TableMetadataReader;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Default {@link RebalancePreChecker} implementation.
+ *
+ * <p>{@link #check} reports two results, keyed by {@link #NEEDS_RELOAD_STATUS} and
+ * {@link #IS_MINIMIZE_DATA_MOVEMENT}. The exception types handled inside the individual checks are logged and
+ * reflected in the returned values instead of being propagated to the caller.
+ */
+public class DefaultRebalancePreChecker implements RebalancePreChecker {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRebalancePreChecker.class);
+
+  public static final String NEEDS_RELOAD_STATUS = "needsReloadStatus";
+  public static final String IS_MINIMIZE_DATA_MOVEMENT = "isMinimizeDataMovement";
+
+  // Timeout handed to the server reload-check request issued through TableMetadataReader
+  private static final int SERVER_RELOAD_CHECK_TIMEOUT_MS = 30_000;
+
+  private PinotHelixResourceManager _pinotHelixResourceManager;
+  private ExecutorService _executorService;
+
+  /**
+   * Stores the collaborators used by the checks.
+   *
+   * @param pinotHelixResourceManager resource manager handed to the {@link TableMetadataReader}
+   * @param executorService executor used to query servers; when null the reload check is skipped
+   */
+  @Override
+  public void init(PinotHelixResourceManager pinotHelixResourceManager, @Nullable ExecutorService executorService) {
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _executorService = executorService;
+  }
+
+  /**
+   * Runs all pre-checks for the given table.
+   *
+   * @param rebalanceJobId rebalance job id, used for logging only
+   * @param tableNameWithType table name with type suffix
+   * @param tableConfig table config of the table being rebalanced
+   * @return map with {@link #NEEDS_RELOAD_STATUS} set to "true", "false" or "error" (status could not be determined)
+   *         and {@link #IS_MINIMIZE_DATA_MOVEMENT} set to "true" or "false"
+   */
+  @Override
+  public Map<String, String> check(String rebalanceJobId, String tableNameWithType, TableConfig tableConfig) {
+    LOGGER.info("Start pre-checks for table: {} with rebalanceJobId: {}", tableNameWithType, rebalanceJobId);
+
+    Map<String, String> preCheckResult = new HashMap<>();
+    // Check for reload status
+    Boolean needsReload = checkReloadNeededOnServers(rebalanceJobId, tableNameWithType);
+    preCheckResult.put(NEEDS_RELOAD_STATUS, needsReload == null ? "error" : String.valueOf(needsReload));
+    // Check whether minimizeDataMovement is set in TableConfig
+    boolean isMinimizeDataMovement = checkIsMinimizeDataMovement(rebalanceJobId, tableNameWithType, tableConfig);
+    preCheckResult.put(IS_MINIMIZE_DATA_MOVEMENT, String.valueOf(isMinimizeDataMovement));
+
+    LOGGER.info("End pre-checks for table: {} with rebalanceJobId: {}", tableNameWithType, rebalanceJobId);
+    return preCheckResult;
+  }
+
+  /**
+   * Checks if the current segments on any servers needs a reload (table config or schema change that hasn't been
+   * applied yet). This check does not guarantee that the segments in deep store are up to date.
+   * TODO: Add an API to check for whether segments in deep store are up to date with the table configs and schema
+   *       and add a pre-check here to call that API.
+   *
+   * @return {@code true} if any server response reports "needReload"; {@code false} if no response reports it and
+   *         there were no failed responses; {@code null} if the status could not be determined (no executor
+   *         service, failed server responses without a positive answer, or the fetch threw)
+   */
+  private Boolean checkReloadNeededOnServers(String rebalanceJobId, String tableNameWithType) {
+    LOGGER.info("Fetching whether reload is needed for table: {} with rebalanceJobId: {}", tableNameWithType,
+        rebalanceJobId);
+    if (_executorService == null) {
+      LOGGER.warn("Executor service is null, skipping needsReload check for table: {} rebalanceJobId: {}",
+          tableNameWithType, rebalanceJobId);
+      return null;
+    }
+    Boolean needsReload = null;
+    try (PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager()) {
+      TableMetadataReader metadataReader =
+          new TableMetadataReader(_executorService, connectionManager, _pinotHelixResourceManager);
+      TableMetadataReader.TableReloadJsonResponse needsReloadMetadataPair =
+          metadataReader.getServerCheckSegmentsReloadMetadata(tableNameWithType, SERVER_RELOAD_CHECK_TIMEOUT_MS);
+      Map<String, JsonNode> needsReloadMetadata = needsReloadMetadataPair.getServerReloadJsonResponses();
+      int failedResponses = needsReloadMetadataPair.getNumFailedResponses();
+      LOGGER.info("Received {} needs reload responses and {} failed responses from servers for table: {} with "
+          + "rebalanceJobId: {}", needsReloadMetadata.size(), failedResponses, tableNameWithType, rebalanceJobId);
+      // Use path() rather than get(): a response without the "needReload" field then reads as false instead of
+      // throwing a NullPointerException, which is not caught below and would escape the pre-check
+      needsReload = needsReloadMetadata.values().stream().anyMatch(value -> value.path("needReload").booleanValue());
+      if (needsReload) {
+        return needsReload;
+      }
+      if (failedResponses > 0) {
+        // A "false" answer is not trustworthy when some servers did not respond, so report it as undetermined
+        LOGGER.warn("Received {} failed responses from servers and needsReload is false from returned responses, "
+            + "check needsReload status manually", failedResponses);
+        needsReload = null;
+      }
+    } catch (InvalidConfigException | IOException e) {
+      LOGGER.warn("Caught exception while trying to fetch reload status from servers", e);
+    }
+
+    return needsReload;
+  }
+
+  /**
+   * Checks if minimize data movement is set for the given table in the TableConfig.
+   *
+   * <p>OFFLINE tables use the OFFLINE instance assignment config. Other tables use the CONSUMING config and,
+   * when completed-segment relocation is enabled, additionally require the COMPLETED config to have it set.
+   *
+   * @return the configured value, or {@code false} if fetching an instance assignment config throws
+   *         {@link IllegalStateException}
+   */
+  private boolean checkIsMinimizeDataMovement(String rebalanceJobId, String tableNameWithType,
+      TableConfig tableConfig) {
+    LOGGER.info("Checking whether minimizeDataMovement is set for table: {} with rebalanceJobId: {}", tableNameWithType,
+        rebalanceJobId);
+    try {
+      if (tableConfig.getTableType() == TableType.OFFLINE) {
+        InstanceAssignmentConfig instanceAssignmentConfig =
+            InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfig, InstancePartitionsType.OFFLINE);
+        return instanceAssignmentConfig.isMinimizeDataMovement();
+      } else {
+        InstanceAssignmentConfig instanceAssignmentConfigConsuming =
+            InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfig, InstancePartitionsType.CONSUMING);
+        // For REALTIME tables need to check for both CONSUMING and COMPLETED segments if relocation is enabled
+        if (!InstanceAssignmentConfigUtils.shouldRelocateCompletedSegments(tableConfig)) {
+          return instanceAssignmentConfigConsuming.isMinimizeDataMovement();
+        }
+
+        InstanceAssignmentConfig instanceAssignmentConfigCompleted =
+            InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfig, InstancePartitionsType.COMPLETED);
+        return instanceAssignmentConfigConsuming.isMinimizeDataMovement()
+            && instanceAssignmentConfigCompleted.isMinimizeDataMovement();
+      }
+    } catch (IllegalStateException e) {
+      LOGGER.warn("Error while trying to fetch instance assignment config, assuming minimizeDataMovement is false", e);
+      return false;
+    }
+  }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
index b6b471cfbc..e5bd39f0ec 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
@@ -34,6 +34,12 @@ public class RebalanceConfig {
@ApiModelProperty(example = "false")
private boolean _dryRun = false;
+ // Whether to perform pre-checks for rebalance. This only returns the status
of each pre-check and does not fail
+ // rebalance
+ @JsonProperty("preChecks")
+ @ApiModelProperty(example = "false")
+ private boolean _preChecks = false;
+
// Whether to reassign instances before reassigning segments
@JsonProperty("reassignInstances")
@ApiModelProperty(example = "false")
@@ -118,6 +124,14 @@ public class RebalanceConfig {
_dryRun = dryRun;
}
+ /**
+ * Returns whether pre-checks are enabled in this rebalance config.
+ */
+ public boolean isPreChecks() {
+ return _preChecks;
+ }
+
+ /**
+ * Enables or disables pre-checks in this rebalance config.
+ */
+ public void setPreChecks(boolean preChecks) {
+ _preChecks = preChecks;
+ }
+
public boolean isReassignInstances() {
return _reassignInstances;
}
@@ -232,10 +246,10 @@ public class RebalanceConfig {
@Override
public String toString() {
- return "RebalanceConfig{" + "_dryRun=" + _dryRun + ", _reassignInstances="
+ _reassignInstances
- + ", _includeConsuming=" + _includeConsuming + ", _bootstrap=" +
_bootstrap + ", _downtime=" + _downtime
- + ", _minAvailableReplicas=" + _minAvailableReplicas + ",
_bestEfforts=" + _bestEfforts
- + ", _externalViewCheckIntervalInMs=" + _externalViewCheckIntervalInMs
+ return "RebalanceConfig{" + "_dryRun=" + _dryRun + ", preChecks=" +
_preChecks + ", _reassignInstances="
+ + _reassignInstances + ", _includeConsuming=" + _includeConsuming + ",
_bootstrap=" + _bootstrap
+ + ", _downtime=" + _downtime + ", _minAvailableReplicas=" +
_minAvailableReplicas + ", _bestEfforts="
+ + _bestEfforts + ", _externalViewCheckIntervalInMs=" +
_externalViewCheckIntervalInMs
+ ", _externalViewStabilizationTimeoutInMs=" +
_externalViewStabilizationTimeoutInMs + ", _updateTargetTier="
+ _updateTargetTier + ", _heartbeatIntervalInMs=" +
_heartbeatIntervalInMs + ", _heartbeatTimeoutInMs="
+ _heartbeatTimeoutInMs + ", _maxAttempts=" + _maxAttempts + ",
_retryInitialDelayInMs="
@@ -245,6 +259,7 @@ public class RebalanceConfig {
public static RebalanceConfig copy(RebalanceConfig cfg) {
RebalanceConfig rc = new RebalanceConfig();
rc._dryRun = cfg._dryRun;
+ rc._preChecks = cfg._preChecks;
rc._reassignInstances = cfg._reassignInstances;
rc._includeConsuming = cfg._includeConsuming;
rc._bootstrap = cfg._bootstrap;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java
new file mode 100644
index 0000000000..54f15e8b33
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.config.table.TableConfig;
+
+
+/**
+ * Pluggable pre-checks for a table rebalance. An implementation is first given its collaborators through
+ * {@code init} and is then asked to produce check results through {@code check}.
+ */
+public interface RebalancePreChecker {
+ /**
+ * Provides the resource manager and an optional executor service to the pre-checker.
+ *
+ * @param pinotHelixResourceManager the resource manager the pre-checker may use
+ * @param executorService executor service the pre-checker may use; may be null
+ */
+ void init(PinotHelixResourceManager pinotHelixResourceManager, @Nullable
ExecutorService executorService);
+ /**
+ * Runs the pre-checks for a table.
+ *
+ * @param rebalanceJobId id of the rebalance job the checks are run for
+ * @param tableNameWithType table name with type suffix
+ * @param tableConfig table config of the table
+ * @return string-to-string map of check results; the keys and values are defined by the implementation
+ */
+ Map<String, String> check(String rebalanceJobId, String tableNameWithType,
TableConfig tableConfig);
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreCheckerFactory.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreCheckerFactory.java
new file mode 100644
index 0000000000..286942dc78
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreCheckerFactory.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Factory that instantiates a {@link RebalancePreChecker} from a fully qualified class name.
+ */
+public class RebalancePreCheckerFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RebalancePreCheckerFactory.class);
+
+  private RebalancePreCheckerFactory() {
+  }
+
+  /**
+   * Creates a pre-checker through the no-arg constructor of the named class. The returned instance has not had
+   * {@link RebalancePreChecker#init} called on it.
+   *
+   * @param rebalancePreCheckerClassName fully qualified name of a class implementing {@link RebalancePreChecker}
+   * @return a new instance of the named class
+   * @throws RuntimeException if the class cannot be loaded, instantiated or cast; the message names the class and
+   *         the original failure is attached as the cause
+   */
+  public static RebalancePreChecker create(String rebalancePreCheckerClassName) {
+    try {
+      LOGGER.info("Trying to create rebalance pre-checker object for class: {}", rebalancePreCheckerClassName);
+      // Class#newInstance() is deprecated; go through the no-arg constructor explicitly
+      return (RebalancePreChecker) Class.forName(rebalancePreCheckerClassName)
+          .getDeclaredConstructor().newInstance();
+    } catch (Exception e) {
+      String errMsg = String.format("Failed to create rebalance pre-checker for class: %s",
+          rebalancePreCheckerClassName);
+      LOGGER.error(errMsg, e);
+      // Keep the descriptive message on the thrown exception instead of discarding it
+      throw new RuntimeException(errMsg, e);
+    }
+  }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
index 2be7fc7753..2b81d8d78b 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
@@ -40,6 +40,8 @@ public class RebalanceResult {
@JsonInclude(JsonInclude.Include.NON_NULL)
private final Map<String, Map<String, String>> _segmentAssignment;
private final String _description;
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private final Map<String, String> _preChecksResult;
@JsonCreator
public RebalanceResult(@JsonProperty(value = "jobId", required = true)
String jobId,
@@ -47,13 +49,15 @@ public class RebalanceResult {
@JsonProperty(value = "description", required = true) String description,
@JsonProperty("instanceAssignment") @Nullable
Map<InstancePartitionsType, InstancePartitions> instanceAssignment,
@JsonProperty("tierInstanceAssignment") @Nullable Map<String,
InstancePartitions> tierInstanceAssignment,
- @JsonProperty("segmentAssignment") @Nullable Map<String, Map<String,
String>> segmentAssignment) {
+ @JsonProperty("segmentAssignment") @Nullable Map<String, Map<String,
String>> segmentAssignment,
+ @JsonProperty("preChecksResult") @Nullable Map<String, String>
preChecksResult) {
_jobId = jobId;
_status = status;
_description = description;
_instanceAssignment = instanceAssignment;
_tierInstanceAssignment = tierInstanceAssignment;
_segmentAssignment = segmentAssignment;
+ _preChecksResult = preChecksResult;
}
@JsonProperty
@@ -86,6 +90,11 @@ public class RebalanceResult {
return _segmentAssignment;
}
+ /**
+ * Returns the pre-checks result map passed to the constructor; it may be null, since the constructor parameter
+ * is nullable.
+ */
+ @JsonProperty
+ public Map<String, String> getPreChecksResult() {
+ return _preChecksResult;
+ }
+
public enum Status {
// FAILED if the job has ended with known exceptions;
// ABORTED if the job is stopped by others but retry is still allowed;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index facd904f21..5dc7c4d8ea 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -119,9 +119,10 @@ public class TableRebalancer {
private final HelixDataAccessor _helixDataAccessor;
private final TableRebalanceObserver _tableRebalanceObserver;
private final ControllerMetrics _controllerMetrics;
+ private final RebalancePreChecker _rebalancePreChecker;
public TableRebalancer(HelixManager helixManager, @Nullable
TableRebalanceObserver tableRebalanceObserver,
- @Nullable ControllerMetrics controllerMetrics) {
+ @Nullable ControllerMetrics controllerMetrics, @Nullable
RebalancePreChecker rebalancePreChecker) {
_helixManager = helixManager;
if (tableRebalanceObserver != null) {
_tableRebalanceObserver = tableRebalanceObserver;
@@ -130,10 +131,11 @@ public class TableRebalancer {
}
_helixDataAccessor = helixManager.getHelixDataAccessor();
_controllerMetrics = controllerMetrics;
+ _rebalancePreChecker = rebalancePreChecker;
}
public TableRebalancer(HelixManager helixManager) {
- this(helixManager, null, null);
+ this(helixManager, null, null, null);
}
public static String createUniqueRebalanceJobIdentifier() {
@@ -173,6 +175,7 @@ public class TableRebalancer {
rebalanceJobId = createUniqueRebalanceJobIdentifier();
}
boolean dryRun = rebalanceConfig.isDryRun();
+ boolean preChecks = rebalanceConfig.isPreChecks();
boolean reassignInstances = rebalanceConfig.isReassignInstances();
boolean includeConsuming = rebalanceConfig.isIncludeConsuming();
boolean bootstrap = rebalanceConfig.isBootstrap();
@@ -186,13 +189,29 @@ public class TableRebalancer {
&&
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
tableConfig.getRoutingConfig().getInstanceSelectorType());
LOGGER.info(
- "Start rebalancing table: {} with dryRun: {}, reassignInstances: {},
includeConsuming: {}, bootstrap: {}, "
- + "downtime: {}, minReplicasToKeepUpForNoDowntime: {},
enableStrictReplicaGroup: {}, lowDiskMode: {}, "
- + "bestEfforts: {}, externalViewCheckIntervalInMs: {},
externalViewStabilizationTimeoutInMs: {}",
- tableNameWithType, dryRun, reassignInstances, includeConsuming,
bootstrap, downtime,
+ "Start rebalancing table: {} with dryRun: {}, preChecks: {},
reassignInstances: {}, includeConsuming: {}, "
+ + "bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime:
{}, enableStrictReplicaGroup: {}, "
+ + "lowDiskMode: {}, bestEfforts: {},
externalViewCheckIntervalInMs: {}, "
+ + "externalViewStabilizationTimeoutInMs: {}",
+ tableNameWithType, dryRun, preChecks, reassignInstances,
includeConsuming, bootstrap, downtime,
minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup,
lowDiskMode, bestEfforts,
externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs);
+ // Perform pre-checks if enabled
+ Map<String, String> preChecksResult = null;
+ if (preChecks) {
+ if (!dryRun) {
+ // Dry-run must be enabled to run pre-checks
+ String errorMsg = String.format("Pre-checks can only be enabled in
dry-run mode, not triggering rebalance for "
+ + "table: %s with rebalanceJobId: %s", tableNameWithType,
rebalanceJobId);
+ LOGGER.error(errorMsg);
+ return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED, errorMsg, null, null, null, null);
+ }
+ if (_rebalancePreChecker != null) {
+ preChecksResult = _rebalancePreChecker.check(rebalanceJobId,
tableNameWithType, tableConfig);
+ }
+ }
+
// Fetch ideal state
PropertyKey idealStatePropertyKey =
_helixDataAccessor.keyBuilder().idealStates(tableNameWithType);
IdealState currentIdealState;
@@ -203,21 +222,21 @@ public class TableRebalancer {
"For rebalanceId: %s, caught exception while fetching IdealState for
table: %s, aborting the rebalance",
rebalanceJobId, tableNameWithType), e);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
- "Caught exception while fetching IdealState: " + e, null, null,
null);
+ "Caught exception while fetching IdealState: " + e, null, null,
null, preChecksResult);
}
if (currentIdealState == null) {
onReturnFailure(
String.format("For rebalanceId: %s, cannot find the IdealState for
table: %s, aborting the rebalance",
rebalanceJobId, tableNameWithType), null);
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED, "Cannot find the IdealState for table",
- null, null, null);
+ null, null, null, preChecksResult);
}
if (!currentIdealState.isEnabled() && !downtime) {
onReturnFailure(String.format(
"For rebalanceId: %s, cannot rebalance disabled table: %s without
downtime, aborting the rebalance",
rebalanceJobId, tableNameWithType), null);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
- "Cannot rebalance disabled table without downtime", null, null,
null);
+ "Cannot rebalance disabled table without downtime", null, null,
null, preChecksResult);
}
LOGGER.info("For rebalanceId: {}, processing instance partitions for
table: {}", rebalanceJobId, tableNameWithType);
@@ -235,7 +254,7 @@ public class TableRebalancer {
"For rebalanceId: %s, caught exception while fetching/calculating
instance partitions for table: %s, "
+ "aborting the rebalance", rebalanceJobId, tableNameWithType),
e);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
- "Caught exception while fetching/calculating instance partitions: "
+ e, null, null, null);
+ "Caught exception while fetching/calculating instance partitions: "
+ e, null, null, null, preChecksResult);
}
// Calculate instance partitions for tiers if configured
@@ -253,7 +272,8 @@ public class TableRebalancer {
"For rebalanceId: %s, caught exception while fetching/calculating
tier instance partitions for table: %s, "
+ "aborting the rebalance", rebalanceJobId, tableNameWithType),
e);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
- "Caught exception while fetching/calculating tier instance
partitions: " + e, null, null, null);
+ "Caught exception while fetching/calculating tier instance
partitions: " + e, null, null, null,
+ preChecksResult);
}
LOGGER.info("For rebalanceId: {}, calculating the target assignment for
table: {}", rebalanceJobId,
@@ -271,7 +291,7 @@ public class TableRebalancer {
+ "rebalance", rebalanceJobId, tableNameWithType), e);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
"Caught exception while calculating target assignment: " + e,
instancePartitionsMap,
- tierToInstancePartitionsMap, null);
+ tierToInstancePartitionsMap, null, preChecksResult);
}
boolean segmentAssignmentUnchanged =
currentAssignment.equals(targetAssignment);
@@ -286,19 +306,19 @@ public class TableRebalancer {
String.format("For rebalanceId: %s, instance unchanged and table:
%s is already balanced", rebalanceJobId,
tableNameWithType));
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.NO_OP, "Table is already balanced",
- instancePartitionsMap, tierToInstancePartitionsMap,
targetAssignment);
+ instancePartitionsMap, tierToInstancePartitionsMap,
targetAssignment, preChecksResult);
} else {
if (dryRun) {
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.DONE,
"Instance reassigned in dry-run mode, table is already
balanced", instancePartitionsMap,
- tierToInstancePartitionsMap, targetAssignment);
+ tierToInstancePartitionsMap, targetAssignment, preChecksResult);
} else {
_tableRebalanceObserver.onSuccess(
String.format("For rebalanceId: %s, instance reassigned but
table: %s is already balanced",
rebalanceJobId, tableNameWithType));
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.DONE,
"Instance reassigned, table is already balanced",
instancePartitionsMap, tierToInstancePartitionsMap,
- targetAssignment);
+ targetAssignment, preChecksResult);
}
}
}
@@ -307,7 +327,7 @@ public class TableRebalancer {
LOGGER.info("For rebalanceId: {}, rebalancing table: {} in dry-run mode,
returning the target assignment",
rebalanceJobId, tableNameWithType);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE,
"Dry-run mode", instancePartitionsMap,
- tierToInstancePartitionsMap, targetAssignment);
+ tierToInstancePartitionsMap, targetAssignment, preChecksResult);
}
if (downtime) {
@@ -332,14 +352,14 @@ public class TableRebalancer {
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE,
"Success with downtime (replaced IdealState with the target
segment assignment, ExternalView might not "
+ "reach the target segment assignment yet)",
instancePartitionsMap, tierToInstancePartitionsMap,
- targetAssignment);
+ targetAssignment, preChecksResult);
} catch (Exception e) {
onReturnFailure(String.format(
"For rebalanceId: %s, caught exception while updating IdealState
for table: %s, aborting the rebalance",
rebalanceJobId, tableNameWithType), e);
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED,
"Caught exception while updating IdealState: " + e,
instancePartitionsMap, tierToInstancePartitionsMap,
- targetAssignment);
+ targetAssignment, preChecksResult);
}
}
@@ -371,7 +391,7 @@ public class TableRebalancer {
minReplicasToKeepUpForNoDowntime, tableNameWithType, numReplicas),
null);
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED,
"Illegal min available replicas config", instancePartitionsMap,
tierToInstancePartitionsMap,
- targetAssignment);
+ targetAssignment, preChecksResult);
}
minAvailableReplicas = minReplicasToKeepUpForNoDowntime;
} else {
@@ -412,12 +432,12 @@ public class TableRebalancer {
if (_tableRebalanceObserver.isStopped()) {
return new RebalanceResult(rebalanceJobId,
_tableRebalanceObserver.getStopStatus(),
"Caught exception while waiting for ExternalView to converge: "
+ e, instancePartitionsMap,
- tierToInstancePartitionsMap, targetAssignment);
+ tierToInstancePartitionsMap, targetAssignment, preChecksResult);
}
_tableRebalanceObserver.onError(errorMsg);
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED,
"Caught exception while waiting for ExternalView to converge: " +
e, instancePartitionsMap,
- tierToInstancePartitionsMap, targetAssignment);
+ tierToInstancePartitionsMap, targetAssignment, preChecksResult);
}
// Re-calculate the target assignment if IdealState changed while
waiting for ExternalView to converge
@@ -466,7 +486,7 @@ public class TableRebalancer {
+ "aborting the rebalance", rebalanceJobId,
tableNameWithType), e);
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED,
"Caught exception while re-calculating the target assignment:
" + e, instancePartitionsMap,
- tierToInstancePartitionsMap, targetAssignment);
+ tierToInstancePartitionsMap, targetAssignment,
preChecksResult);
}
} else {
LOGGER.info("For rebalanceId:{}, no state change found for segments
to be moved, "
@@ -490,7 +510,7 @@ public class TableRebalancer {
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE,
"Success with minAvailableReplicas: " + minAvailableReplicas
+ " (both IdealState and ExternalView should reach the target
segment assignment)",
- instancePartitionsMap, tierToInstancePartitionsMap,
targetAssignment);
+ instancePartitionsMap, tierToInstancePartitionsMap,
targetAssignment, preChecksResult);
}
// Record change of current ideal state and the new target
@@ -499,7 +519,7 @@ public class TableRebalancer {
if (_tableRebalanceObserver.isStopped()) {
return new RebalanceResult(rebalanceJobId,
_tableRebalanceObserver.getStopStatus(),
"Rebalance has stopped already before updating the IdealState",
instancePartitionsMap,
- tierToInstancePartitionsMap, targetAssignment);
+ tierToInstancePartitionsMap, targetAssignment, preChecksResult);
}
Map<String, Map<String, String>> nextAssignment =
getNextAssignment(currentAssignment, targetAssignment,
minAvailableReplicas, enableStrictReplicaGroup,
@@ -530,7 +550,7 @@ public class TableRebalancer {
+ "aborting the rebalance", rebalanceJobId, tableNameWithType), e);
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED,
"Caught exception while updating IdealState: " + e,
instancePartitionsMap, tierToInstancePartitionsMap,
- targetAssignment);
+ targetAssignment, preChecksResult);
}
segmentsToMonitor = new HashSet<>(segmentsToMove);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
index ec55c38b95..e115476529 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
@@ -60,7 +60,7 @@ public class DefaultTenantRebalancer implements
TenantRebalancer {
false));
} catch (TableNotFoundException exception) {
rebalanceResult.put(table, new RebalanceResult(null,
RebalanceResult.Status.FAILED, exception.getMessage(),
- null, null, null));
+ null, null, null, null));
}
});
if (config.isDryRun()) {
@@ -71,7 +71,7 @@ public class DefaultTenantRebalancer implements
TenantRebalancer {
if (result.getStatus() == RebalanceResult.Status.DONE) {
rebalanceResult.put(table, new RebalanceResult(result.getJobId(),
RebalanceResult.Status.IN_PROGRESS,
"In progress, check controller task status for the",
result.getInstanceAssignment(),
- result.getTierInstanceAssignment(),
result.getSegmentAssignment()));
+ result.getTierInstanceAssignment(),
result.getSegmentAssignment(), result.getPreChecksResult()));
}
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
index 57c17054d7..c96c25b250 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
@@ -36,7 +36,7 @@ public class TenantRebalanceResult {
_rebalanceTableResults = new HashMap<>();
rebalanceTableResults.forEach((table, result) -> {
_rebalanceTableResults.put(table, new
RebalanceResult(result.getJobId(), result.getStatus(),
- result.getDescription(), null, null, null));
+ result.getDescription(), null, null, null, null));
});
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
index 8dde7f08fe..0376b90dac 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
@@ -222,8 +222,8 @@ public class ServerSegmentMetadataReader {
* This method will return metadata of all the servers along with need
reload flag.
* In future additional details like segments list can also be added
*/
- public List<String> getCheckReloadSegmentsFromServer(String
tableNameWithType, Set<String> serverInstances,
- BiMap<String, String> endpoints, int timeoutMs) {
+ public TableReloadResponse getCheckReloadSegmentsFromServer(String
tableNameWithType,
+ Set<String> serverInstances, BiMap<String, String> endpoints, int
timeoutMs) {
LOGGER.debug("Checking if reload is needed on segments from servers for
table {}.", tableNameWithType);
List<String> serverURLs = new ArrayList<>();
for (String serverInstance : serverInstances) {
@@ -250,7 +250,7 @@ public class ServerSegmentMetadataReader {
}
LOGGER.debug("Retrieved metadata of reload check from servers.");
- return serversNeedReloadResponses;
+ return new TableReloadResponse(serviceResponse._failedResponseCount,
serversNeedReloadResponses);
}
/**
@@ -505,4 +505,22 @@ public class ServerSegmentMetadataReader {
tableNameWithType = URLEncoder.encode(tableNameWithType,
StandardCharsets.UTF_8);
return String.format("%s/tables/%s/segments/isStale", endpoint,
tableNameWithType);
}
+
+ public class TableReloadResponse {
+ private int _numFailedResponses;
+ private List<String> _serverReloadResponses;
+
+ TableReloadResponse(int numFailedResponses, List<String>
serverReloadResponses) {
+ _numFailedResponses = numFailedResponses;
+ _serverReloadResponses = serverReloadResponses;
+ }
+
+ public int getNumFailedResponses() {
+ return _numFailedResponses;
+ }
+
+ public List<String> getServerReloadResponses() {
+ return _serverReloadResponses;
+ }
+ }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
index 48f53577a8..c6ba7aa59f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
@@ -59,19 +59,26 @@ public class TableMetadataReader {
_pinotHelixResourceManager = helixResourceManager;
}
- public Map<String, JsonNode> getServerCheckSegmentsReloadMetadata(String
tableNameWithType, int timeoutMs)
+ /**
+ * Check if segments need a reload on any servers
+ * @return a TableReloadJsonResponse holding: a) the number of failed
responses, b) the reload responses keyed by server instance id
+ */
+ public TableReloadJsonResponse getServerCheckSegmentsReloadMetadata(String
tableNameWithType,
+ int timeoutMs)
throws InvalidConfigException, IOException {
- List<String> segmentsMetadata = getReloadCheckResponses(tableNameWithType,
timeoutMs);
+ ServerSegmentMetadataReader.TableReloadResponse segmentsMetadataPair =
+ getReloadCheckResponses(tableNameWithType, timeoutMs);
+ List<String> segmentsMetadata =
segmentsMetadataPair.getServerReloadResponses();
Map<String, JsonNode> response = new HashMap<>();
for (String segmentMetadata : segmentsMetadata) {
JsonNode responseJson = JsonUtils.stringToJsonNode(segmentMetadata);
response.put(responseJson.get("instanceId").asText(), responseJson);
}
- return response;
+ return new
TableReloadJsonResponse(segmentsMetadataPair.getNumFailedResponses(), response);
}
- public List<String> getReloadCheckResponses(String tableNameWithType, int
timeoutMs)
- throws InvalidConfigException {
+ public ServerSegmentMetadataReader.TableReloadResponse
getReloadCheckResponses(String tableNameWithType,
+ int timeoutMs) throws InvalidConfigException {
TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
List<String> serverInstances =
_pinotHelixResourceManager.getServerInstancesForTable(tableNameWithType,
tableType);
Set<String> serverInstanceSet = new HashSet<>(serverInstances);
@@ -213,4 +220,22 @@ public class TableMetadataReader {
return
serverSegmentMetadataReader.getStaleSegmentsFromServer(tableNameWithType,
serverInstanceSet, endpoints,
timeoutMs);
}
+
+ public class TableReloadJsonResponse {
+ private int _numFailedResponses;
+ private Map<String, JsonNode> _serverReloadJsonResponses;
+
+ TableReloadJsonResponse(int numFailedResponses, Map<String, JsonNode>
serverReloadJsonResponses) {
+ _numFailedResponses = numFailedResponses;
+ _serverReloadJsonResponses = serverReloadJsonResponses;
+ }
+
+ public int getNumFailedResponses() {
+ return _numFailedResponses;
+ }
+
+ public Map<String, JsonNode> getServerReloadJsonResponses() {
+ return _serverReloadJsonResponses;
+ }
+ }
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index c5ac9e82bd..de83ec6eee 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
@@ -90,7 +91,9 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX +
i, true);
}
- TableRebalancer tableRebalancer = new TableRebalancer(_helixManager);
+ DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
+ preChecker.init(_helixResourceManager, Executors.newFixedThreadPool(10));
+ TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null,
null, preChecker);
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
@@ -137,8 +140,18 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
// Rebalance in dry-run mode
RebalanceConfig rebalanceConfig = new RebalanceConfig();
rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setPreChecks(true);
rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+ Map<String, String> preCheckResult = rebalanceResult.getPreChecksResult();
+ assertNotNull(preCheckResult);
+ assertEquals(preCheckResult.size(), 2);
+
assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS));
+
assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT));
+ // Sending requests to the servers should fail for all of them, so the
needs-reload status should be set to "error" to indicate that a
+ // manual check is needed
+
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS),
"error");
+
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT),
"false");
// All servers should be assigned to the table
instanceAssignment = rebalanceResult.getInstanceAssignment();
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 054d8393cf..6750a7340d 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -34,10 +34,13 @@ import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
@@ -61,10 +64,16 @@ import
org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import
org.apache.pinot.controller.helix.core.rebalance.DefaultRebalancePreChecker;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import
org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
+import org.apache.pinot.server.starter.helix.BaseServerStarter;
import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
@@ -73,6 +82,10 @@ import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
+import org.apache.pinot.spi.config.table.assignment.InstanceConstraintConfig;
+import
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
@@ -161,6 +174,9 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
// Once this value is set, assert that table size always gets back to this
value after removing the added indices.
private long _tableSize;
+ private PinotHelixResourceManager _resourceManager;
+ private TableRebalancer _tableRebalancer;
+
protected int getNumBrokers() {
return NUM_BROKERS;
}
@@ -268,6 +284,11 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
waitForAllDocsLoaded(600_000L);
_tableSize = getTableSize(getTableName());
+
+ _resourceManager = _controllerStarter.getHelixResourceManager();
+ DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
+ preChecker.init(_helixResourceManager, Executors.newFixedThreadPool(10));
+ _tableRebalancer = new
TableRebalancer(_resourceManager.getHelixZkManager(), null, null, preChecker);
}
private void reloadAllSegments(String testQuery, boolean forceDownload, long
numTotalDocs)
@@ -777,6 +798,104 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
}, 60_000L, "Failed to execute query");
}
+ @Test
+ public void testRebalancePreChecks()
+ throws Exception {
+ // setup the rebalance config
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+
+ TableConfig tableConfig = getOfflineTableConfig();
+
+ // Ensure pre-check status is null if not enabled
+ RebalanceResult rebalanceResult = _tableRebalancer.rebalance(tableConfig,
rebalanceConfig, null);
+ assertNull(rebalanceResult.getPreChecksResult());
+
+ // Enable pre-checks, nothing is set
+ rebalanceConfig.setPreChecks(true);
+ rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ checkRebalancePreCheckStatus(rebalanceResult,
RebalanceResult.Status.NO_OP, false, false);
+
+ // Enable minimizeDataMovement
+
tableConfig.setInstanceAssignmentConfigMap(createInstanceAssignmentConfigMap());
+ rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ checkRebalancePreCheckStatus(rebalanceResult,
RebalanceResult.Status.NO_OP, true, false);
+
+ // Undo minimizeDataMovement, update the table config to add a column to
bloom filter
+ tableConfig.getIndexingConfig().getBloomFilterColumns().add("Quarter");
+ tableConfig.setInstanceAssignmentConfigMap(null);
+ updateTableConfig(tableConfig);
+ rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ checkRebalancePreCheckStatus(rebalanceResult,
RebalanceResult.Status.NO_OP, false, true);
+
+ // Undo tableConfig change
+ tableConfig.getIndexingConfig().getBloomFilterColumns().remove("Quarter");
+ updateTableConfig(tableConfig);
+ rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ checkRebalancePreCheckStatus(rebalanceResult,
RebalanceResult.Status.NO_OP, false, false);
+
+ // Add a schema change
+ Schema schema = createSchema();
+ schema.addField(new MetricFieldSpec("NewAddedIntMetric", DataType.INT, 1));
+ updateSchema(schema);
+ rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ checkRebalancePreCheckStatus(rebalanceResult,
RebalanceResult.Status.NO_OP, false, true);
+
+ // Keep schema change and update table config to add minimizeDataMovement
+
tableConfig.setInstanceAssignmentConfigMap(createInstanceAssignmentConfigMap());
+ rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ checkRebalancePreCheckStatus(rebalanceResult,
RebalanceResult.Status.NO_OP, true, true);
+
+ // Add a new server (to force change in instance assignment) and enable
reassignInstances
+ BaseServerStarter serverStarter1 = startOneServer(NUM_SERVERS);
+ rebalanceConfig.setReassignInstances(true);
+ tableConfig.setInstanceAssignmentConfigMap(null);
+ rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.DONE,
false, true);
+
+ // Disable dry-run
+ rebalanceConfig.setDryRun(false);
+ rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertNull(rebalanceResult.getPreChecksResult());
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
+
+ // Stop the added server
+ serverStarter1.stop();
+ TestUtils.waitForCondition(aVoid ->
_resourceManager.dropInstance(serverStarter1.getInstanceId()).isSuccessful(),
+ 60_000L, "Failed to drop added server");
+ }
+
+ private void checkRebalancePreCheckStatus(RebalanceResult rebalanceResult,
RebalanceResult.Status expectedStatus,
+ boolean expectedMinimizeDataMovement, boolean expectedNeedsReloadStatus)
{
+ assertEquals(rebalanceResult.getStatus(), expectedStatus);
+ Map<String, String> preChecksResult = rebalanceResult.getPreChecksResult();
+ assertNotNull(preChecksResult);
+ assertEquals(preChecksResult.size(), 2);
+
assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT));
+
assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS));
+
assertEquals(preChecksResult.get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT),
+ String.valueOf(expectedMinimizeDataMovement));
+
assertEquals(preChecksResult.get(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS),
+ String.valueOf(expectedNeedsReloadStatus));
+ }
+
+ private Map<String, InstanceAssignmentConfig>
createInstanceAssignmentConfigMap() {
+ InstanceTagPoolConfig instanceTagPoolConfig =
+ new InstanceTagPoolConfig("tag", false, 1, null);
+ List<String> constraints = new ArrayList<>();
+ constraints.add("constraints1");
+ InstanceConstraintConfig instanceConstraintConfig = new
InstanceConstraintConfig(constraints);
+ InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig =
+ new InstanceReplicaGroupPartitionConfig(true, 1, 1,
+ 1, 1, 1, true,
+ null);
+ InstanceAssignmentConfig instanceAssignmentConfig = new
InstanceAssignmentConfig(instanceTagPoolConfig,
+ instanceConstraintConfig, instanceReplicaGroupPartitionConfig, null,
true);
+ Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = new
HashMap<>();
+ instanceAssignmentConfigMap.put("OFFLINE", instanceAssignmentConfig);
+ return instanceAssignmentConfigMap;
+ }
+
@Test(dataProvider = "useBothQueryEngines")
public void testRegexpReplace(boolean useMultiStageQueryEngine)
throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]