This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch clean-up-singleton in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 4b5df6ccd1ccd4febb19f8601cc78ae3c6fe5130 Author: jackjlli <[email protected]> AuthorDate: Wed Apr 10 16:16:51 2019 -0700 Remove singleton for PinotLLCRealtimeSegmentManager and SegmentCompletionManager --- .../apache/pinot/controller/ControllerStarter.java | 51 +++-- .../resources/LLCSegmentCompletionHandlers.java | 19 +- .../helix/core/PinotHelixResourceManager.java | 124 ++++++------ .../helix/core/PinotTableIdealStateBuilder.java | 27 +-- .../realtime/PinotLLCRealtimeSegmentManager.java | 28 +-- .../core/realtime/SegmentCompletionManager.java | 71 +++---- .../controller/helix/PinotControllerModeTest.java | 9 + .../helix/core/realtime/SegmentCompletionTest.java | 214 +++++++++++---------- 8 files changed, 265 insertions(+), 278 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java index 10c1c00..0e59158 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java @@ -55,6 +55,7 @@ import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTaskScheduler; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; import org.apache.pinot.controller.helix.core.realtime.PinotRealtimeSegmentManager; +import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager; import org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategyFactory; import org.apache.pinot.controller.helix.core.relocation.RealtimeSegmentRelocator; import org.apache.pinot.controller.helix.core.retention.RetentionManager; @@ -107,6 +108,8 @@ public class ControllerStarter { private ControllerPeriodicTaskScheduler _controllerPeriodicTaskScheduler; private PinotHelixTaskResourceManager _helixTaskResourceManager; private PinotRealtimeSegmentManager _realtimeSegmentsManager; + private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager; + private SegmentCompletionManager _segmentCompletionManager; private ControllerLeadershipManager _controllerLeadershipManager; private List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbackList; @@ -191,15 +194,16 @@ public class ControllerStarter { LOGGER.error("Invalid mode: " + _controllerMode); } - ServiceStatus.setServiceStatusCallback(new ServiceStatus.MultipleCallbackServiceStatusCallback(_serviceStatusCallbackList)); + ServiceStatus.setServiceStatusCallback( + new ServiceStatus.MultipleCallbackServiceStatusCallback(_serviceStatusCallbackList)); _controllerMetrics.initializeGlobalMeters(); } private void setUpHelixController() { // Register and connect instance as Helix controller. LOGGER.info("Starting Helix controller"); - _helixControllerManager = HelixSetupUtils - .setup(_helixClusterName, _helixZkURL, _instanceId, _isUpdateStateModel, _enableBatchMessageMode); + _helixControllerManager = HelixSetupUtils.setup(_helixClusterName, _helixZkURL, _instanceId, _isUpdateStateModel, + _enableBatchMessageMode); // Emit helix controller metrics _controllerMetrics.addCallbackGauge("helix.connected", () -> _helixControllerManager.isConnected() ? 1L : 0L); @@ -241,7 +245,15 @@ public class ControllerStarter { // Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager LOGGER.info("Starting realtime segment manager"); - PinotLLCRealtimeSegmentManager.create(_helixResourceManager, _config, _controllerMetrics, _controllerLeadershipManager); + + _pinotLLCRealtimeSegmentManager = + new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics, + _controllerLeadershipManager); + // TODO: Need to put this inside HelixResourceManager when ControllerLeadershipManager is removed. + _helixResourceManager.setPinotLLCRealtimeSegmentManager(_pinotLLCRealtimeSegmentManager); + _segmentCompletionManager = new SegmentCompletionManager(helixParticipantManager, _pinotLLCRealtimeSegmentManager, _controllerMetrics, _config.getSegmentCommitTimeoutSeconds(), _controllerLeadershipManager); + + _realtimeSegmentsManager = new PinotRealtimeSegmentManager(_helixResourceManager, _controllerLeadershipManager); _realtimeSegmentsManager.start(_controllerMetrics); @@ -279,6 +291,7 @@ public class ControllerStarter { bind(_config).to(ControllerConf.class); bind(_helixResourceManager).to(PinotHelixResourceManager.class); bind(_helixTaskResourceManager).to(PinotHelixTaskResourceManager.class); + bind(_segmentCompletionManager).to(SegmentCompletionManager.class); bind(_taskManager).to(PinotTaskManager.class); bind(connectionManager).to(HttpConnectionManager.class); bind(_executorService).to(Executor.class); @@ -400,7 +413,7 @@ public class ControllerStarter { _controllerMetrics); periodicTasks.add(_offlineSegmentIntervalChecker); _realtimeSegmentValidationManager = new RealtimeSegmentValidationManager(_config, _helixResourceManager, - PinotLLCRealtimeSegmentManager.getInstance(), new ValidationMetrics(_metricsRegistry), _controllerMetrics); + _pinotLLCRealtimeSegmentManager, new ValidationMetrics(_metricsRegistry), _controllerMetrics); periodicTasks.add(_realtimeSegmentValidationManager); _brokerResourceValidationManager = new BrokerResourceValidationManager(_config, _helixResourceManager, _controllerMetrics); @@ -414,18 +427,18 @@ public class ControllerStarter { } public void stop() { - switch (_controllerMode) { - case DUAL: - stopPinotController(); - stopHelixController(); - break; - case PINOT_ONLY: - stopPinotController(); - break; - case HELIX_ONLY: - stopHelixController(); - break; - } + switch (_controllerMode) { + case DUAL: + stopPinotController(); + stopHelixController(); + break; + case PINOT_ONLY: + stopPinotController(); + break; + case HELIX_ONLY: + stopHelixController(); + break; + } } private void stopHelixController() { @@ -441,7 +454,7 @@ public class ControllerStarter { // Stop PinotLLCSegmentManager before stopping Jersey API. It is possible that stopping Jersey API // may interrupt the handlers waiting on an I/O. - PinotLLCRealtimeSegmentManager.getInstance().stop(); + _pinotLLCRealtimeSegmentManager.stop(); LOGGER.info("Closing PinotFS classes"); PinotFSFactory.shutdown(); @@ -465,7 +478,7 @@ public class ControllerStarter { } public boolean isPinotOnlyModeSupported() { - return false; + return true; } public MetricsRegistry getMetricsRegistry() { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java index 19e2123..bbd70b3 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java @@ -75,6 +75,9 @@ public class LLCSegmentCompletionHandlers { @Inject ControllerConf _controllerConf; + @Inject + SegmentCompletionManager _segmentCompletionManager; + @VisibleForTesting public static String getScheme() { return SCHEME; @@ -104,7 +107,7 @@ public class LLCSegmentCompletionHandlers { .withExtraTimeSec(extraTimeSec); LOGGER.info("Processing extendBuildTime:{}", requestParams.toString()); - SegmentCompletionProtocol.Response response = SegmentCompletionManager.getInstance().extendBuildTime(requestParams); + SegmentCompletionProtocol.Response response = _segmentCompletionManager.extendBuildTime(requestParams); final String responseStr = response.toJsonString(); LOGGER.info("Response to extendBuildTime:{}", responseStr); @@ -130,7 +133,7 @@ public class LLCSegmentCompletionHandlers { .withMemoryUsedBytes(memoryUsedBytes).withNumRows(numRows); LOGGER.info("Processing segmentConsumed:{}", requestParams.toString()); - SegmentCompletionProtocol.Response response = SegmentCompletionManager.getInstance().segmentConsumed(requestParams); + SegmentCompletionProtocol.Response response = _segmentCompletionManager.segmentConsumed(requestParams); final String responseStr = response.toJsonString(); LOGGER.info("Response to segmentConsumed:{}", responseStr); return responseStr; @@ -153,7 +156,7 @@ public class LLCSegmentCompletionHandlers { LOGGER.info("Processing segmentStoppedConsuming:{}", requestParams.toString()); SegmentCompletionProtocol.Response response = - SegmentCompletionManager.getInstance().segmentStoppedConsuming(requestParams); + _segmentCompletionManager.segmentStoppedConsuming(requestParams); final String responseStr = response.toJsonString(); LOGGER.info("Response to segmentStoppedConsuming:{}", responseStr); return responseStr; @@ -183,7 +186,7 @@ public class LLCSegmentCompletionHandlers { LOGGER.info("Processing segmentCommitStart:{}", requestParams.toString()); SegmentCompletionProtocol.Response response = - SegmentCompletionManager.getInstance().segmentCommitStart(requestParams); + _segmentCompletionManager.segmentCommitStart(requestParams); final String responseStr = response.toJsonString(); LOGGER.info("Response to segmentCommitStart:{}", responseStr); return responseStr; @@ -230,7 +233,7 @@ public class LLCSegmentCompletionHandlers { CommittingSegmentDescriptor committingSegmentDescriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams, segmentMetadata); - SegmentCompletionProtocol.Response response = SegmentCompletionManager.getInstance() + SegmentCompletionProtocol.Response response = _segmentCompletionManager .segmentCommitEnd(requestParams, isSuccess, isSplitCommit, committingSegmentDescriptor); final String responseStr = response.toJsonString(); LOGGER.info("Response to segmentCommitEnd:{}", responseStr); @@ -255,7 +258,7 @@ public class LLCSegmentCompletionHandlers { .withNumRows(numRows).withMemoryUsedBytes(memoryUsedBytes); LOGGER.info("Processing segmentCommit:{}", requestParams.toString()); - final SegmentCompletionManager segmentCompletionManager = SegmentCompletionManager.getInstance(); + final SegmentCompletionManager segmentCompletionManager = _segmentCompletionManager; SegmentCompletionProtocol.Response response = segmentCompletionManager.segmentCommitStart(requestParams); CommittingSegmentDescriptor committingSegmentDescriptor = @@ -297,7 +300,7 @@ public class LLCSegmentCompletionHandlers { // check for existing segment file and remove it. So, the block cannot be removed altogether. // For now, we live with these corner cases. Once we have split-commit enabled and working, this code will no longer // be used. - synchronized (SegmentCompletionManager.getInstance()) { + synchronized (_segmentCompletionManager) { if (pinotFS.exists(segmentFileURI)) { LOGGER.warn("Segment file {} exists. Replacing with upload from {} for segment {}", segmentFileURI.toString(), instanceId, segmentName); @@ -406,7 +409,7 @@ public class LLCSegmentCompletionHandlers { LOGGER.error("Segment metadata extraction failure for segment {}", segmentName); return SegmentCompletionProtocol.RESP_FAILED.toJsonString(); } - SegmentCompletionProtocol.Response response = SegmentCompletionManager.getInstance() + SegmentCompletionProtocol.Response response = _segmentCompletionManager .segmentCommitEnd(requestParams, isSuccess, isSplitCommit, CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams, segmentMetadata)); final String responseStr = response.toJsonString(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 1ad19fc..4216554 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -137,6 +137,7 @@ public class PinotHelixResourceManager { private ZkCacheBaseDataAccessor<ZNRecord> _cacheInstanceConfigsDataAccessor; private Builder _keyBuilder; private SegmentDeletionManager _segmentDeletionManager; + private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager; private TableRebalancer _tableRebalancer; public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String helixClusterName, @@ -253,9 +254,8 @@ public class PinotHelixResourceManager { * Register and connect to Helix cluster as PARTICIPANT role. */ private HelixManager registerAndConnectAsHelixParticipant() { - HelixManager helixManager = HelixManagerFactory - .getZKHelixManager(_helixClusterName, CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + _instanceId, - InstanceType.PARTICIPANT, _helixZkURL); + HelixManager helixManager = HelixManagerFactory.getZKHelixManager(_helixClusterName, + CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + _instanceId, InstanceType.PARTICIPANT, _helixZkURL); try { helixManager.connect(); return helixManager; @@ -525,8 +525,8 @@ public class PinotHelixResourceManager { // Segment is in error and we don't consider error state as different from target, therefore continue } else { // Will try to read data every 500 ms, only if external view not updated. - Uninterruptibles - .sleepUninterruptibly(DEFAULT_EXTERNAL_VIEW_UPDATE_RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); + Uninterruptibles.sleepUninterruptibly(DEFAULT_EXTERNAL_VIEW_UPDATE_RETRY_INTERVAL_MILLIS, + TimeUnit.MILLISECONDS); continue deadlineLoop; } } @@ -536,8 +536,8 @@ public class PinotHelixResourceManager { return true; } else { // Segment doesn't exist in EV, wait for a little bit - Uninterruptibles - .sleepUninterruptibly(DEFAULT_EXTERNAL_VIEW_UPDATE_RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); + Uninterruptibles.sleepUninterruptibly(DEFAULT_EXTERNAL_VIEW_UPDATE_RETRY_INTERVAL_MILLIS, + TimeUnit.MILLISECONDS); } } @@ -579,8 +579,7 @@ public class PinotHelixResourceManager { return PinotResourceManagerResponse.SUCCESS; } - public PinotResourceManagerResponse rebuildBrokerResourceFromHelixTags(String tableNameWithType) - throws Exception { + public PinotResourceManagerResponse rebuildBrokerResourceFromHelixTags(String tableNameWithType) throws Exception { TableConfig tableConfig; try { tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); @@ -602,8 +601,8 @@ public class PinotHelixResourceManager { IdealState brokerIdealState = HelixHelper.getBrokerIdealStates(_helixAdmin, _helixClusterName); Set<String> brokerInstancesInIdealState = brokerIdealState.getInstanceSet(tableNameWithType); if (brokerInstancesInIdealState.equals(brokerInstances)) { - return PinotResourceManagerResponse - .success("Broker resource is not rebuilt because ideal state is the same for table: " + tableNameWithType); + return PinotResourceManagerResponse.success( + "Broker resource is not rebuilt because ideal state is the same for table: " + tableNameWithType); } // Update ideal state with the new broker instances @@ -639,8 +638,8 @@ public class PinotHelixResourceManager { tableIdealState.setPartitionState(tableNameWithType, instanceName, BrokerOnlineOfflineStateModel.ONLINE); } } - _helixAdmin - .setResourceIdealState(_helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, tableIdealState); + _helixAdmin.setResourceIdealState(_helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, + tableIdealState); } private PinotResourceManagerResponse scaleDownBroker(Tenant tenant, String brokerTenantTag, @@ -788,8 +787,8 @@ public class PinotHelixResourceManager { public boolean isServerTenantDeletable(String tenantName) { Set<String> taggedInstances = new HashSet<>( HelixHelper.getInstancesWithTag(_helixZkManager, TagNameUtils.getOfflineTagForTenant(tenantName))); - taggedInstances - .addAll(HelixHelper.getInstancesWithTag(_helixZkManager, TagNameUtils.getRealtimeTagForTenant(tenantName))); + taggedInstances.addAll( + HelixHelper.getInstancesWithTag(_helixZkManager, TagNameUtils.getRealtimeTagForTenant(tenantName))); for (String resourceName : getAllResources()) { if (!TableNameBuilder.isTableResource(resourceName)) { continue; @@ -812,9 +811,8 @@ public class PinotHelixResourceManager { for (String instanceName : instancesInCluster) { InstanceConfig config = _helixDataAccessor.getProperty(_keyBuilder.instanceConfig(instanceName)); for (String tag : config.getTags()) { - if (tag.equals(CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE) || tag - .equals(CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE) || tag - .equals(CommonConstants.Minion.UNTAGGED_INSTANCE)) { + if (tag.equals(CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE) || tag.equals( + CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE) || tag.equals(CommonConstants.Minion.UNTAGGED_INSTANCE)) { continue; } TenantRole tenantRole; @@ -838,9 +836,8 @@ public class PinotHelixResourceManager { for (String instanceName : instancesInCluster) { InstanceConfig config = _helixDataAccessor.getProperty(_keyBuilder.instanceConfig(instanceName)); for (String tag : config.getTags()) { - if (tag.equals(CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE) || tag - .equals(CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE) || tag - .equals(CommonConstants.Minion.UNTAGGED_INSTANCE)) { + if (tag.equals(CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE) || tag.equals( + CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE) || tag.equals(CommonConstants.Minion.UNTAGGED_INSTANCE)) { continue; } TenantRole tenantRole; @@ -1027,9 +1024,8 @@ public class PinotHelixResourceManager { } public List<String> getSchemaNames() { - return _propertyStore - .getChildNames(PinotHelixPropertyStoreZnRecordProvider.forSchema(_propertyStore).getRelativePath(), - AccessOption.PERSISTENT); + return _propertyStore.getChildNames( + PinotHelixPropertyStoreZnRecordProvider.forSchema(_propertyStore).getRelativePath(), AccessOption.PERSISTENT); } /** @@ -1037,8 +1033,7 @@ public class PinotHelixResourceManager { * @throws InvalidTableConfigException * @throws TableAlreadyExistsException for offline tables only if the table already exists */ - public void addTable(@Nonnull TableConfig tableConfig) - throws IOException { + public void addTable(@Nonnull TableConfig tableConfig) throws IOException { final String tableNameWithType = tableConfig.getTableName(); TenantConfig tenantConfig; @@ -1107,9 +1102,8 @@ public class PinotHelixResourceManager { } // now lets build an ideal state LOGGER.info("building empty ideal state for table : " + tableNameWithType); - final IdealState offlineIdealState = PinotTableIdealStateBuilder - .buildEmptyIdealStateFor(tableNameWithType, Integer.parseInt(segmentsConfig.getReplication()), - _enableBatchMessageMode); + final IdealState offlineIdealState = PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, + Integer.parseInt(segmentsConfig.getReplication()), _enableBatchMessageMode); LOGGER.info("adding table via the admin"); _helixAdmin.addResource(_helixClusterName, tableNameWithType, offlineIdealState); LOGGER.info("successfully added the table : " + tableNameWithType + " to the cluster"); @@ -1215,6 +1209,10 @@ public class PinotHelixResourceManager { } } + public void setPinotLLCRealtimeSegmentManager(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager) { + _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager; + } + private void ensureRealtimeClusterIsSetUp(TableConfig config, String realtimeTableName, IndexingConfig indexingConfig) { StreamConfig streamConfig = new StreamConfig(indexingConfig.getStreamConfigs()); @@ -1234,7 +1232,7 @@ public class PinotHelixResourceManager { // Only high-level consumer specified in the config. createHelixEntriesForHighLevelConsumer(config, realtimeTableName, idealState); // Clean up any LLC table if they are present - PinotLLCRealtimeSegmentManager.getInstance().cleanupLLC(realtimeTableName); + _pinotLLCRealtimeSegmentManager.cleanupLLC(realtimeTableName); } } @@ -1243,8 +1241,8 @@ public class PinotHelixResourceManager { // Will either create idealstate entry, or update the IS entry with new segments // (unless there are low-level segments already present) if (ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName).isEmpty()) { - PinotTableIdealStateBuilder - .buildLowLevelRealtimeIdealStateFor(realtimeTableName, config, idealState, _enableBatchMessageMode); + PinotTableIdealStateBuilder.buildLowLevelRealtimeIdealStateFor(_pinotLLCRealtimeSegmentManager, + realtimeTableName, config, idealState, _enableBatchMessageMode); LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName); } else { LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName); @@ -1255,9 +1253,8 @@ public class PinotHelixResourceManager { private void createHelixEntriesForHighLevelConsumer(TableConfig config, String realtimeTableName, IdealState idealState) { if (idealState == null) { - idealState = PinotTableIdealStateBuilder - .buildInitialHighLevelRealtimeIdealStateFor(realtimeTableName, config, _helixZkManager, _propertyStore, - _enableBatchMessageMode); + idealState = PinotTableIdealStateBuilder.buildInitialHighLevelRealtimeIdealStateFor(realtimeTableName, config, + _helixZkManager, _propertyStore, _enableBatchMessageMode); LOGGER.info("Adding helix resource with empty HLC IdealState for {}", realtimeTableName); _helixAdmin.addResource(_helixClusterName, realtimeTableName, idealState); } else { @@ -1272,8 +1269,7 @@ public class PinotHelixResourceManager { } } - public void setExistingTableConfig(TableConfig config, String tableNameWithType, TableType type) - throws IOException { + public void setExistingTableConfig(TableConfig config, String tableNameWithType, TableType type) throws IOException { if (type == TableType.REALTIME) { ZKMetadataProvider.setRealtimeTableConfig(_propertyStore, tableNameWithType, config.toZNRecord()); ensureRealtimeClusterIsSetUp(config, tableNameWithType, config.getIndexingConfig()); @@ -1284,7 +1280,8 @@ public class PinotHelixResourceManager { ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableNameWithType, config.toZNRecord()); IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType); final String configReplication = config.getValidationConfig().getReplication(); - if (configReplication != null && !config.getValidationConfig().getReplication() + if (configReplication != null && !config.getValidationConfig() + .getReplication() .equals(idealState.getReplicas())) { HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, new Function<IdealState, IdealState>() { @Nullable @@ -1298,8 +1295,7 @@ public class PinotHelixResourceManager { } } - public void updateMetadataConfigFor(String tableName, TableType type, TableCustomConfig newConfigs) - throws Exception { + public void updateMetadataConfigFor(String tableName, TableType type, TableCustomConfig newConfigs) throws Exception { String tableNameWithType = TableNameBuilder.forType(type).tableNameWithType(tableName); TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); if (tableConfig == null) { @@ -1310,8 +1306,7 @@ public class PinotHelixResourceManager { } public void updateSegmentsValidationAndRetentionConfigFor(String tableName, TableType type, - SegmentsValidationAndRetentionConfig newConfigs) - throws Exception { + SegmentsValidationAndRetentionConfig newConfigs) throws Exception { String tableNameWithType = TableNameBuilder.forType(type).tableNameWithType(tableName); TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); if (tableConfig == null) { @@ -1321,8 +1316,7 @@ public class PinotHelixResourceManager { setExistingTableConfig(tableConfig, tableNameWithType, type); } - public void updateIndexingConfigFor(String tableName, TableType type, IndexingConfig newConfigs) - throws Exception { + public void updateIndexingConfigFor(String tableName, TableType type, IndexingConfig newConfigs) throws Exception { String tableNameWithType = TableNameBuilder.forType(type).tableNameWithType(tableName); TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); if (tableConfig == null) { @@ -1449,8 +1443,8 @@ public class PinotHelixResourceManager { } } - return (status) ? PinotResourceManagerResponse - .success("Table " + tableName + " enabled (reset success = " + resetSuccessful + ")") + return (status) ? PinotResourceManagerResponse.success( + "Table " + tableName + " enabled (reset success = " + resetSuccessful + ")") : PinotResourceManagerResponse.success("Table " + tableName + " disabled"); } @@ -1654,8 +1648,8 @@ public class PinotHelixResourceManager { if (customConfig != null) { Map<String, String> customConfigMap = customConfig.getCustomConfigs(); if (customConfigMap != null) { - if (customConfigMap.containsKey(TableCustomConfig.MESSAGE_BASED_REFRESH_KEY) && !Boolean - .valueOf(customConfigMap.get(TableCustomConfig.MESSAGE_BASED_REFRESH_KEY))) { + if (customConfigMap.containsKey(TableCustomConfig.MESSAGE_BASED_REFRESH_KEY) && !Boolean.valueOf( + customConfigMap.get(TableCustomConfig.MESSAGE_BASED_REFRESH_KEY))) { return false; } } @@ -1727,11 +1721,10 @@ public class PinotHelixResourceManager { Preconditions.checkNotNull(offlineTableConfig); int numReplicas = Integer.parseInt(offlineTableConfig.getValidationConfig().getReplication()); String serverTenant = TagNameUtils.getOfflineTagForTenant(offlineTableConfig.getTenantConfig().getServer()); - SegmentAssignmentStrategy segmentAssignmentStrategy = SegmentAssignmentStrategyFactory - .getSegmentAssignmentStrategy(offlineTableConfig.getValidationConfig().getSegmentAssignmentStrategy()); - return segmentAssignmentStrategy - .getAssignedInstances(_helixZkManager, _helixAdmin, _propertyStore, _helixClusterName, segmentMetadata, - numReplicas, serverTenant); + SegmentAssignmentStrategy segmentAssignmentStrategy = SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy( + offlineTableConfig.getValidationConfig().getSegmentAssignmentStrategy()); + return segmentAssignmentStrategy.getAssignedInstances(_helixZkManager, _helixAdmin, _propertyStore, + _helixClusterName, segmentMetadata, numReplicas, serverTenant); } /** @@ -1796,9 +1789,9 @@ public class PinotHelixResourceManager { LOGGER.info("Wait until segment - " + segmentName + " to be OFFLINE in ExternalView"); if (!ifExternalViewChangeReflectedForState(tableName, segmentName, "OFFLINE", _externalViewOnlineToOfflineTimeoutMillis, false)) { - LOGGER - .error("External view for segment {} did not reflect the ideal state of OFFLINE within the {} ms time limit", - segmentName, _externalViewOnlineToOfflineTimeoutMillis); + LOGGER.error( + "External view for segment {} did not reflect the ideal state of OFFLINE within the {} ms time limit", + segmentName, _externalViewOnlineToOfflineTimeoutMillis); return false; } @@ -1817,8 +1810,8 @@ public class PinotHelixResourceManager { // Check that the ideal state has been written to ZK updatedIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableName); instanceStateMap = updatedIdealState.getInstanceStateMap(segmentName); - LOGGER - .info("Found {} instances for segment '{}', after updating ideal state", instanceStateMap.size(), segmentName); + LOGGER.info("Found {} instances for segment '{}', after updating ideal state", instanceStateMap.size(), + segmentName); for (String state : instanceStateMap.values()) { if (!"ONLINE".equals(state)) { LOGGER.error("Failed to write ONLINE ideal state!"); @@ -1966,8 +1959,8 @@ public class PinotHelixResourceManager { Map<String, String> instanceStateMap = updatedIdealState.getInstanceStateMap(segmentName); for (String state : instanceStateMap.values()) { if (!status.equals(state)) { - return PinotResourceManagerResponse - .failure("Failed to update ideal state when setting status " + status + " for segment " + segmentName); + return PinotResourceManagerResponse.failure( + "Failed to update ideal state when setting status " + status + " for segment " + segmentName); } } @@ -1977,8 +1970,8 @@ public class PinotHelixResourceManager { } } - return (externalViewUpdateSuccessful) ? PinotResourceManagerResponse - .success("Segments " + segments + " now " + status) + return (externalViewUpdateSuccessful) ? PinotResourceManagerResponse.success( + "Segments " + segments + " now " + status) : PinotResourceManagerResponse.failure("Timed out. External view not completely updated"); } @@ -2106,8 +2099,8 @@ public class PinotHelixResourceManager { IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, resource); for (String partition : idealState.getPartitionSet()) { if (idealState.getInstanceSet(partition).contains(instanceName)) { - return PinotResourceManagerResponse - .failure("Instance " + instanceName + " exists in ideal state for " + resource); + return PinotResourceManagerResponse.failure( + "Instance " + instanceName + " exists in ideal state for " + resource); } } } @@ -2186,8 +2179,7 @@ public class PinotHelixResourceManager { @Nonnull public RebalanceResult rebalanceTable(final String rawTableName, TableType tableType, - Configuration rebalanceUserConfig) - throws InvalidConfigException, TableNotFoundException { + Configuration rebalanceUserConfig) throws InvalidConfigException, TableNotFoundException { TableConfig tableConfig = getTableConfig(rawTableName, tableType); if (tableConfig == null) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java index 0e311e6..f21d4cb 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java @@ -64,9 +64,11 @@ public class PinotTableIdealStateBuilder { public static IdealState buildEmptyIdealStateFor(String tableName, int numCopies, boolean enableBatchMessageMode) { final CustomModeISBuilder customModeIdealStateBuilder = new CustomModeISBuilder(tableName); final int replicas = numCopies; - customModeIdealStateBuilder - .setStateModel(PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL) - .setNumPartitions(0).setNumReplica(replicas).setMaxPartitionsPerNode(1); + customModeIdealStateBuilder.setStateModel( + PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL) + .setNumPartitions(0) + .setNumReplica(replicas) + .setMaxPartitionsPerNode(1); final IdealState idealState = customModeIdealStateBuilder.build(); idealState.setInstanceGroupTag(tableName); idealState.setBatchMessageMode(enableBatchMessageMode); @@ -88,7 +90,8 @@ public class PinotTableIdealStateBuilder { new CustomModeISBuilder(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); customModeIdealStateBuilder.setStateModel( PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL) - .setMaxPartitionsPerNode(Integer.MAX_VALUE).setNumReplica(Integer.MAX_VALUE) + .setMaxPartitionsPerNode(Integer.MAX_VALUE) + .setNumReplica(Integer.MAX_VALUE) .setNumPartitions(Integer.MAX_VALUE); final IdealState idealState = customModeIdealStateBuilder.build(); idealState.setBatchMessageMode(enableBatchMessageMode); @@ -118,8 +121,9 @@ public class PinotTableIdealStateBuilder { return idealState; } - public static void buildLowLevelRealtimeIdealStateFor(String realtimeTableName, TableConfig realtimeTableConfig, - IdealState idealState, boolean enableBatchMessageMode) { + public static void buildLowLevelRealtimeIdealStateFor(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager, + String realtimeTableName, TableConfig realtimeTableConfig, IdealState idealState, + boolean enableBatchMessageMode) { // Validate replicasPerPartition here. final String replicasPerPartitionStr = realtimeTableConfig.getValidationConfig().getReplicasPerPartition(); @@ -136,9 +140,8 @@ public class PinotTableIdealStateBuilder { if (idealState == null) { idealState = buildEmptyRealtimeIdealStateFor(realtimeTableName, nReplicas, enableBatchMessageMode); } - final PinotLLCRealtimeSegmentManager segmentManager = PinotLLCRealtimeSegmentManager.getInstance(); try { - segmentManager.setupNewTable(realtimeTableConfig, idealState); + pinotLLCRealtimeSegmentManager.setupNewTable(realtimeTableConfig, idealState); } catch (InvalidConfigException e) { throw new IllegalStateException("Caught exception when creating table " + realtimeTableName, e); } @@ -159,9 +162,11 @@ public class PinotTableIdealStateBuilder { public static IdealState buildEmptyRealtimeIdealStateFor(String realtimeTableName, int replicaCount, boolean enableBatchMessageMode) { final CustomModeISBuilder customModeIdealStateBuilder = new CustomModeISBuilder(realtimeTableName); - customModeIdealStateBuilder - .setStateModel(PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL) - .setNumPartitions(0).setNumReplica(replicaCount).setMaxPartitionsPerNode(1); + customModeIdealStateBuilder.setStateModel( + PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL) + .setNumPartitions(0) + .setNumReplica(replicaCount) + .setMaxPartitionsPerNode(1); final IdealState idealState = customModeIdealStateBuilder.build(); idealState.setInstanceGroupTag(realtimeTableName); idealState.setBatchMessageMode(enableBatchMessageMode); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 324b9ae..d21861c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -111,9 +111,6 @@ public class PinotLLCRealtimeSegmentManager { */ private static int MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000; // 5 MINUTES - // TODO: fix the misuse of singleton. - private static PinotLLCRealtimeSegmentManager INSTANCE = null; - private final HelixAdmin _helixAdmin; private final HelixManager _helixManager; private final ZkHelixPropertyStore<ZNRecord> _propertyStore; @@ -139,25 +136,13 @@ public class PinotLLCRealtimeSegmentManager { return _controllerConf.generateVipUrl(); } - public static synchronized void create(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf, + public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf, ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) { - create(helixResourceManager.getHelixAdmin(), helixResourceManager.getHelixClusterName(), + this(helixResourceManager.getHelixAdmin(), helixResourceManager.getHelixClusterName(), helixResourceManager.getHelixZkManager(), helixResourceManager.getPropertyStore(), helixResourceManager, controllerConf, controllerMetrics, controllerLeadershipManager); } - private static synchronized void create(HelixAdmin helixAdmin, String clusterName, HelixManager helixManager, - ZkHelixPropertyStore propertyStore, PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf, - ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) { - if (INSTANCE != null) { - throw new RuntimeException("Instance already created"); - } - INSTANCE = - new PinotLLCRealtimeSegmentManager(helixAdmin, clusterName, helixManager, propertyStore, helixResourceManager, - controllerConf, controllerMetrics, controllerLeadershipManager); - SegmentCompletionManager.create(helixManager, INSTANCE, controllerConf, controllerMetrics, controllerLeadershipManager); - } - public void stop() { _isStopping = true; LOGGER @@ -181,8 +166,6 @@ public class PinotLLCRealtimeSegmentManager { } } LOGGER.info("Wait completed: Number of completing segments = {}", _numCompletingSegments.get()); - INSTANCE = null; - SegmentCompletionManager.stop(); } protected PinotLLCRealtimeSegmentManager(HelixAdmin helixAdmin, String clusterName, HelixManager helixManager, @@ -206,13 +189,6 @@ public class PinotLLCRealtimeSegmentManager { _controllerLeadershipManager = controllerLeadershipManager; } - public static PinotLLCRealtimeSegmentManager getInstance() { - if (INSTANCE == null) { - throw new RuntimeException("Not yet created"); - } - return INSTANCE; - } - protected boolean isLeader() { return _controllerLeadershipManager.isLeader(); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java index 618c4a6..5ae4b25 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java @@ -64,9 +64,6 @@ public class SegmentCompletionManager { ABORTED, // state machine is aborted. we will start a fresh one when the next segmentConsumed comes in. } - // TODO: fix the misuse of singleton. - private static SegmentCompletionManager _instance = null; - private final HelixManager _helixManager; // A map that holds the FSM for each segment. private final Map<String, SegmentCompletionFSM> _fsmMap = new ConcurrentHashMap<>(); @@ -84,12 +81,15 @@ public class SegmentCompletionManager { // TODO keep some history of past committed segments so that we can avoid looking up PROPERTYSTORE if some server comes in late. - protected SegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager, - ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) { + public SegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager, + ControllerMetrics controllerMetrics, int segmentCommitTimeoutSeconds, + ControllerLeadershipManager controllerLeadershipManager) { _helixManager = helixManager; _segmentManager = segmentManager; _controllerMetrics = controllerMetrics; _controllerLeadershipManager = controllerLeadershipManager; + SegmentCompletionProtocol.setMaxSegmentCommitTimeMs( + TimeUnit.MILLISECONDS.convert(segmentCommitTimeoutSeconds, TimeUnit.SECONDS)); } public boolean isSplitCommitEnabled() { @@ -100,25 +100,6 @@ public class SegmentCompletionManager { return _segmentManager.getControllerVipUrl(); } - public static SegmentCompletionManager create(HelixManager helixManager, - PinotLLCRealtimeSegmentManager segmentManager, ControllerConf controllerConf, - ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) { - if (_instance != null) { - throw new RuntimeException("Cannot create multiple instances"); - } - _instance = new SegmentCompletionManager(helixManager, segmentManager, controllerMetrics, controllerLeadershipManager); - SegmentCompletionProtocol.setMaxSegmentCommitTimeMs( - TimeUnit.MILLISECONDS.convert(controllerConf.getSegmentCommitTimeoutSeconds(), TimeUnit.SECONDS)); - return _instance; - } - - public static SegmentCompletionManager getInstance() { - if (_instance == null) { - throw new RuntimeException("Not yet created"); - } - return _instance; - } - protected long getCurrentTimeMs() { return System.currentTimeMillis(); } @@ -140,11 +121,11 @@ public class SegmentCompletionManager { // Also good for synchronization, because it is possible that multiple threads take this path, and we don't want // multiple instances of the FSM to be created for the same commit sequence at the same time. final long endOffset = segmentMetadata.getEndOffset(); - fsm = SegmentCompletionFSM - .fsmInCommit(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas(), endOffset); + fsm = SegmentCompletionFSM.fsmInCommit(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas(), + endOffset); } else if (msgType.equals(SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING)) { - fsm = SegmentCompletionFSM - .fsmStoppedConsuming(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas()); + fsm = SegmentCompletionFSM.fsmStoppedConsuming(_segmentManager, this, segmentName, + segmentMetadata.getNumReplicas()); } else { // Segment is in the process of completing, and this is the first one to respond. Create fsm fsm = SegmentCompletionFSM.fsmInHolding(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas()); @@ -627,7 +608,8 @@ public class SegmentCompletionManager { LOGGER.error("Segment upload failed"); return abortAndReturnFailed(); } - SegmentCompletionProtocol.Response response = commitSegment(reqParams, isSplitCommit, committingSegmentDescriptor); + SegmentCompletionProtocol.Response response = + commitSegment(reqParams, isSplitCommit, committingSegmentDescriptor); if (!response.equals(SegmentCompletionProtocol.RESP_COMMIT_SUCCESS)) { return abortAndReturnFailed(); } else { @@ -644,10 +626,11 @@ public class SegmentCompletionManager { private SegmentCompletionProtocol.Response commit(String instanceId, long offset) { long allowedBuildTimeSec = (_maxTimeAllowedToCommitMs - _startTimeMs) / 1000; - LOGGER - .info("{}:COMMIT for instance={} offset={} buldTimeSec={}", _state, instanceId, offset, allowedBuildTimeSec); + LOGGER.info("{}:COMMIT for instance={} offset={} buldTimeSec={}", _state, instanceId, offset, + allowedBuildTimeSec); SegmentCompletionProtocol.Response.Params params = - new SegmentCompletionProtocol.Response.Params().withOffset(offset).withBuildTimeSeconds(allowedBuildTimeSec) + new SegmentCompletionProtocol.Response.Params().withOffset(offset) + .withBuildTimeSeconds(allowedBuildTimeSec) .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT) .withSplitCommit(_isSplitCommitEnabled); if (_isSplitCommitEnabled) { @@ -676,21 +659,21 @@ public class SegmentCompletionManager { private SegmentCompletionProtocol.Response hold(String instanceId, long offset) { LOGGER.info("{}:HOLD for instance={} offset={}", _state, instanceId, offset); - return new SegmentCompletionProtocol.Response(new SegmentCompletionProtocol.Response.Params() - .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD).withOffset(offset)); + return new SegmentCompletionProtocol.Response(new SegmentCompletionProtocol.Response.Params().withStatus( + SegmentCompletionProtocol.ControllerResponseStatus.HOLD).withOffset(offset)); } private SegmentCompletionProtocol.Response abortAndReturnHold(long now, String instanceId, long offset) { _state = State.ABORTED; - _segmentCompletionManager._controllerMetrics - .addMeteredTableValue(_segmentName.getTableName(), ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1); + _segmentCompletionManager._controllerMetrics.addMeteredTableValue(_segmentName.getTableName(), + ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1); return hold(instanceId, offset); } private SegmentCompletionProtocol.Response abortAndReturnFailed() { _state = State.ABORTED; - _segmentCompletionManager._controllerMetrics - .addMeteredTableValue(_segmentName.getTableName(), ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1); + _segmentCompletionManager._controllerMetrics.addMeteredTableValue(_segmentName.getTableName(), + ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1); return SegmentCompletionProtocol.RESP_FAILED; } @@ -962,9 +945,8 @@ public class SegmentCompletionManager { private SegmentCompletionProtocol.Response processStoppedConsuming(String instanceId, long offset, String reason, boolean createNew) { - LOGGER - .info("Instance {} stopped consuming segment {} at offset {}, state {}, createNew: {}, reason:{}", instanceId, - _segmentName, offset, _state, createNew, reason); + LOGGER.info("Instance {} stopped consuming segment {} at offset {}, state {}, createNew: {}, reason:{}", + instanceId, _segmentName, offset, _state, createNew, reason); _segmentManager.segmentStoppedConsuming(_segmentName, instanceId); return SegmentCompletionProtocol.RESP_PROCESSED; } @@ -1008,8 +990,7 @@ public class SegmentCompletionManager { } private SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProtocol.Request.Params reqParams, - boolean isSplitCommit, - CommittingSegmentDescriptor committingSegmentDescriptor) { + boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor) { boolean success; String instanceId = reqParams.getInstanceId(); long offset = reqParams.getOffset(); @@ -1118,10 +1099,6 @@ public class SegmentCompletionManager { } } - public static void stop() { - _instance = null; - } - @VisibleForTesting protected boolean isLeader() { return _controllerLeadershipManager.isLeader(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java index 9bf70bd..b716e90 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java @@ -103,6 +103,15 @@ public class PinotControllerModeTest extends ControllerTest { "Failed to start " + config.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms."); Assert.assertEquals(_controllerStarter.getControllerMode(), ControllerConf.ControllerMode.PINOT_ONLY); + // Start the second Pinot only controller + config.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++)); + ControllerStarter controllerStarter = new ControllerStarter(config); + controllerStarter.start(); + + Thread.sleep(100_000L); + + controllerStarter.stop(); + stopController(); _controllerStarter = null; helixControllerStarter.stop(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java index 0dfe6b4..d004326 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java @@ -58,13 +58,11 @@ public class SegmentCompletionTest { private final long s3Offset = 30L; @BeforeMethod - public void testCaseSetup() - throws Exception { + public void testCaseSetup() throws Exception { testCaseSetup(true, true); } - public void testCaseSetup(boolean isLeader, boolean isConnected) - throws Exception { + public void testCaseSetup(boolean isLeader, boolean isConnected) throws Exception { segmentManager = new MockPinotLLCRealtimeSegmentManager(isLeader, isConnected); ControllerLeadershipManager controllerLeadershipManager = segmentManager.getControllerLeadershipManager(); final int partitionId = 23; @@ -78,8 +76,7 @@ public class SegmentCompletionTest { metadata.setNumReplicas(3); segmentManager._segmentMetadata = metadata; - segmentCompletionMgr = - new MockSegmentCompletionManager(segmentManager, isLeader, isConnected); + segmentCompletionMgr = new MockSegmentCompletionManager(segmentManager, isLeader, isConnected); segmentManager._segmentCompletionMgr = segmentCompletionMgr; Field fsmMapField = SegmentCompletionManager.class.getDeclaredField("_fsmMap"); @@ -93,8 +90,7 @@ public class SegmentCompletionTest { // Simulate a new controller taking over with an empty completion manager object, // but segment metadata is fine in zk - private void replaceSegmentCompletionManager() - throws Exception { + private void replaceSegmentCompletionManager() throws Exception { long oldSecs = segmentCompletionMgr._secconds; segmentCompletionMgr = new MockSegmentCompletionManager(segmentManager, true, true); segmentCompletionMgr._secconds = oldSecs; @@ -104,8 +100,7 @@ public class SegmentCompletionTest { } @Test - public void testStoppedConsumeDuringCompletion() - throws Exception { + public void testStoppedConsumeDuringCompletion() throws Exception { SegmentCompletionProtocol.Response response; Request.Params params; final String reason = "IAmLazy"; @@ -154,8 +149,8 @@ public class SegmentCompletionTest { segmentCompletionMgr._secconds += 5; params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); - response = segmentCompletionMgr - .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); + response = segmentCompletionMgr.segmentCommitEnd(params, true, false, + CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); // Now the FSM should have disappeared from the map @@ -172,8 +167,7 @@ public class SegmentCompletionTest { } @Test - public void testStoppedConsumeBeforeHold() - throws Exception { + public void testStoppedConsumeBeforeHold() throws Exception { SegmentCompletionProtocol.Response response; Request.Params params; final String reason = "IAmLazy"; @@ -224,8 +218,8 @@ public class SegmentCompletionTest { segmentCompletionMgr._secconds += 5; params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); - response = segmentCompletionMgr - .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); + response = segmentCompletionMgr.segmentCommitEnd(params, true, false, + CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); // Now the FSM should have disappeared from the map @@ -243,13 +237,14 @@ public class SegmentCompletionTest { // s2 sends stoppedConsuming message, but then may have gotten restarted, so eventually we complete the segment. @Test - public void testHappyPathAfterStoppedConsuming() - throws Exception { + public void testHappyPathAfterStoppedConsuming() throws Exception { SegmentCompletionProtocol.Response response; Request.Params params; segmentCompletionMgr._secconds = 5; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s2) + .withOffset(s2Offset) + .withSegmentName(segmentNameStr) .withReason("some reason"); response = segmentCompletionMgr.segmentStoppedConsuming(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED); @@ -262,21 +257,18 @@ public class SegmentCompletionTest { } @Test - public void testHappyPath() - throws Exception { + public void testHappyPath() throws Exception { testHappyPath(5L); } // Tests happy path with split commit protocol @Test - public void testHappyPathSplitCommit() - throws Exception { + public void testHappyPathSplitCommit() throws Exception { testHappyPathSplitCommit(5L); } @Test - public void testExceptionInConsumedMessage() - throws Exception { + public void testExceptionInConsumedMessage() throws Exception { segmentManager._segmentMetadata = null; SegmentCompletionProtocol.Response response; @@ -289,8 +281,7 @@ public class SegmentCompletionTest { // When commit segment file fails, makes sure the fsm aborts and that a segment can successfully commit afterwards. @Test - public void testCommitSegmentFileFail() - throws Exception { + public void testCommitSegmentFileFail() throws Exception { SegmentCompletionProtocol.Response response; Request.Params params; // s1 sends offset of 20, gets HOLD at t = 5s; @@ -333,10 +324,12 @@ public class SegmentCompletionTest { Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE); // s2's file does not successfully commit segmentCompletionMgr._secconds += 5; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s2) + .withOffset(s2Offset) + .withSegmentName(segmentNameStr) .withSegmentLocation("doNotCommitMe"); - response = segmentCompletionMgr - .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); + response = segmentCompletionMgr.segmentCommitEnd(params, true, true, + CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.FAILED); // Now the FSM should have aborted @@ -367,17 +360,18 @@ public class SegmentCompletionTest { Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE); // s2's file successfully commits segmentCompletionMgr._secconds += 5; - params = new Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s3) + .withOffset(s2Offset) + .withSegmentName(segmentNameStr) .withSegmentLocation("location"); - response = segmentCompletionMgr - .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); + response = segmentCompletionMgr.segmentCommitEnd(params, true, true, + CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.COMMIT_SUCCESS); // And the FSM should be removed. Assert.assertFalse(fsmMap.containsKey(segmentNameStr)); } - public void testHappyPathSplitCommit(long startTime) - throws Exception { + public void testHappyPathSplitCommit(long startTime) throws Exception { SegmentCompletionProtocol.Response response; Request.Params params; // s1 sends offset of 20, gets HOLD at t = 5s; @@ -420,10 +414,12 @@ public class SegmentCompletionTest { Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE); segmentCompletionMgr._secconds += 5; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s2) + .withOffset(s2Offset) + .withSegmentName(segmentNameStr) .withSegmentLocation("location"); - response = segmentCompletionMgr - .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); + response = segmentCompletionMgr.segmentCommitEnd(params, true, true, + CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); // Now the FSM should have disappeared from the map @@ -441,8 +437,7 @@ public class SegmentCompletionTest { // Tests that we abort when the server instance comes back with a different offset than it is told to commit with @Test - public void testCommitDifferentOffsetSplitCommit() - throws Exception { + public void testCommitDifferentOffsetSplitCommit() throws Exception { SegmentCompletionProtocol.Response response; Request.Params params; // s1 sends offset of 20, gets HOLD at t = 5s; @@ -470,10 +465,12 @@ public class SegmentCompletionTest { // s3 comes back to try to commit with a different offset segmentCompletionMgr._secconds += 5; - params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s3) + .withOffset(s3Offset) + .withSegmentName(segmentNameStr) .withSegmentLocation("location"); - response = segmentCompletionMgr - .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); + response = segmentCompletionMgr.segmentCommitEnd(params, true, true, + CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.FAILED); // Now the FSM should have disappeared from the map @@ -489,8 +486,7 @@ public class SegmentCompletionTest { Assert.assertTrue(fsmMap.containsKey(segmentNameStr)); } - public void testHappyPath(long startTime) - throws Exception { + public void testHappyPath(long startTime) throws Exception { SegmentCompletionProtocol.Response response; Request.Params params; // s1 sends offset of 20, gets HOLD at t = 5s; @@ -532,8 +528,8 @@ public class SegmentCompletionTest { segmentCompletionMgr._secconds += 5; params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr); - response = segmentCompletionMgr - .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); + response = segmentCompletionMgr.segmentCommitEnd(params, true, false, + CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); // Now the FSM should have disappeared from the map @@ -550,50 +546,57 @@ public class SegmentCompletionTest { } @Test - public void testControllerNotConnected() - throws Exception { + public void testControllerNotConnected() throws Exception { testCaseSetup(true, false); // Leader but not connected SegmentCompletionProtocol.Response response; Request.Params params; // s1 sends offset of 20, gets HOLD at t = 5s; segmentCompletionMgr._secconds = 5L; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s1) + .withOffset(s1Offset) + .withSegmentName(segmentNameStr) .withReason("rowLimit"); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.NOT_LEADER); } @Test - public void testWinnerOnTimeLimit() - throws Exception { + public void testWinnerOnTimeLimit() throws Exception { SegmentCompletionProtocol.Response response; Request.Params params; segmentCompletionMgr._secconds = 10L; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s1) + .withOffset(s1Offset) + .withSegmentName(segmentNameStr) .withReason(SegmentCompletionProtocol.REASON_TIME_LIMIT); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD); } @Test - public void testWinnerOnRowLimit() - throws Exception { + public void testWinnerOnRowLimit() throws Exception { SegmentCompletionProtocol.Response response; Request.Params params; segmentCompletionMgr._secconds = 10L; - params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s1) + .withOffset(s1Offset) + .withSegmentName(segmentNameStr) .withReason(SegmentCompletionProtocol.REASON_ROW_LIMIT); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.COMMIT); // S2 comes with the same offset as S1 segmentCompletionMgr._secconds += 1; - params = new Request.Params().withInstanceId(s2).withOffset(s1Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s2) + .withOffset(s1Offset) + .withSegmentName(segmentNameStr) .withReason(SegmentCompletionProtocol.REASON_ROW_LIMIT); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.HOLD); segmentCompletionMgr._secconds += 1; // S3 comes with a different offset and without row limit. we ask it to hold even though it is higher. - params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s3) + .withOffset(s3Offset) + .withSegmentName(segmentNameStr) .withReason(SegmentCompletionProtocol.REASON_TIME_LIMIT); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.HOLD); @@ -605,16 +608,20 @@ public class SegmentCompletionTest { segmentCompletionMgr._secconds += 5; params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr); - response = segmentCompletionMgr - .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); + response = segmentCompletionMgr.segmentCommitEnd(params, true, false, + CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); // We ask S2 to keep the segment - params = new Request.Params().withInstanceId(s2).withOffset(s1Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s2) + .withOffset(s1Offset) + .withSegmentName(segmentNameStr) .withReason(SegmentCompletionProtocol.REASON_ROW_LIMIT); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.KEEP); // And we ask S3 to discard because it was ahead. - params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s3) + .withOffset(s3Offset) + .withSegmentName(segmentNameStr) .withReason(SegmentCompletionProtocol.REASON_TIME_LIMIT); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.DISCARD); @@ -622,19 +629,16 @@ public class SegmentCompletionTest { // Tests that when server is delayed(Stalls for a hour), when server comes back, we commit successfully. @Test - public void testDelayedServerSplitCommit() - throws Exception { + public void testDelayedServerSplitCommit() throws Exception { testDelayedServer(true); } @Test - public void testDelayedServer() - throws Exception { + public void testDelayedServer() throws Exception { testDelayedServer(false); } - public void testDelayedServer(boolean isSplitCommit) - throws Exception { + public void testDelayedServer(boolean isSplitCommit) throws Exception { SegmentCompletionProtocol.Response response; Request.Params params; // s1 sends offset of 20, gets HOLD at t = 5s; @@ -670,7 +674,9 @@ public class SegmentCompletionTest { response = segmentCompletionMgr.segmentCommitStart(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE); segmentCompletionMgr._secconds += 5; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s2) + .withOffset(s2Offset) + .withSegmentName(segmentNameStr) .withSegmentLocation("location"); response = segmentCompletionMgr.segmentCommitEnd(params, true, isSplitCommit, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); @@ -689,8 +695,7 @@ public class SegmentCompletionTest { // We test the case where all servers go silent after controller asks one of them commit @Test - public void testDeadServers() - throws Exception { + public void testDeadServers() throws Exception { SegmentCompletionProtocol.Response response; Request.Params params; // s1 sends offset of 20, gets HOLD at t = 5s; @@ -737,8 +742,7 @@ public class SegmentCompletionTest { // We test the case when the committer is asked to commit, but they never come back. @Test - public void testCommitterFailure() - throws Exception { + public void testCommitterFailure() throws Exception { SegmentCompletionProtocol.Response response; Request.Params params; // s1 sends offset of 20, gets HOLD at t = 5s; @@ -807,8 +811,7 @@ public class SegmentCompletionTest { } @Test - public void testHappyPathSlowCommit() - throws Exception { + public void testHappyPathSlowCommit() throws Exception { SegmentCompletionProtocol.Response response; Request.Params params; // s1 sends offset of 20, gets HOLD at t = 1509242466s; @@ -845,7 +848,9 @@ public class SegmentCompletionTest { // Fast forward to one second before commit time, and send a lease renewal request for 20s segmentCompletionMgr._secconds = startTime + commitTimeSec - 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s2) + .withOffset(s2Offset) + .withSegmentName(segmentNameStr) .withExtraTimeSec(20); response = segmentCompletionMgr.extendBuildTime(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.PROCESSED); @@ -853,7 +858,9 @@ public class SegmentCompletionTest { // Another lease extension in 19s. segmentCompletionMgr._secconds += 19; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s2) + .withOffset(s2Offset) + .withSegmentName(segmentNameStr) .withExtraTimeSec(20); response = segmentCompletionMgr.extendBuildTime(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.PROCESSED); @@ -867,16 +874,15 @@ public class SegmentCompletionTest { long commitTimeMs = (segmentCompletionMgr._secconds - startTime) * 1000; Assert.assertEquals(commitTimeMap.get(tableName).longValue(), commitTimeMs); segmentCompletionMgr._secconds += 55; - response = segmentCompletionMgr - .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); + response = segmentCompletionMgr.segmentCommitEnd(params, true, false, + CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); // now FSM should be out of the map. Assert.assertFalse((fsmMap.containsKey(segmentNameStr))); } @Test - public void testFailedSlowCommit() - throws Exception { + public void testFailedSlowCommit() throws Exception { SegmentCompletionProtocol.Response response; Request.Params params; final String tableName = new LLCSegmentName(segmentNameStr).getTableName(); @@ -912,7 +918,9 @@ public class SegmentCompletionTest { // Fast forward to one second before commit time, and send a lease renewal request for 20s segmentCompletionMgr._secconds = startTime + commitTimeSec - 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s2) + .withOffset(s2Offset) + .withSegmentName(segmentNameStr) .withExtraTimeSec(20); response = segmentCompletionMgr.extendBuildTime(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.PROCESSED); @@ -929,8 +937,7 @@ public class SegmentCompletionTest { } @Test - public void testLeaseTooLong() - throws Exception { + public void testLeaseTooLong() throws Exception { SegmentCompletionProtocol.Response response; Request.Params params; // s1 sends offset of 20, gets HOLD at t = 5s; @@ -965,7 +972,9 @@ public class SegmentCompletionTest { // Fast forward to one second before commit time, and send a lease renewal request for 20s segmentCompletionMgr._secconds = startTime + commitTimeSec - 1; - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s2) + .withOffset(s2Offset) + .withSegmentName(segmentNameStr) .withExtraTimeSec(20); response = segmentCompletionMgr.extendBuildTime(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.PROCESSED); @@ -973,9 +982,11 @@ public class SegmentCompletionTest { final int leaseTimeSec = 20; // Lease will not be granted if the time taken so far plus lease time exceeds the max allowabale. - while (segmentCompletionMgr._secconds + leaseTimeSec <= startTime + SegmentCompletionManager - .getMaxCommitTimeForAllSegmentsSeconds()) { - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr) + while (segmentCompletionMgr._secconds + leaseTimeSec + <= startTime + SegmentCompletionManager.getMaxCommitTimeForAllSegmentsSeconds()) { + params = new Request.Params().withInstanceId(s2) + .withOffset(s2Offset) + .withSegmentName(segmentNameStr) .withExtraTimeSec(leaseTimeSec); response = segmentCompletionMgr.extendBuildTime(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.PROCESSED); @@ -984,7 +995,9 @@ public class SegmentCompletionTest { } // Now the lease request should fail. - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s2) + .withOffset(s2Offset) + .withSegmentName(segmentNameStr) .withExtraTimeSec(leaseTimeSec); response = segmentCompletionMgr.extendBuildTime(params); Assert.assertEquals(response.getStatus(), ControllerResponseStatus.FAILED); @@ -992,8 +1005,7 @@ public class SegmentCompletionTest { } @Test - public void testControllerFailureDuringCommit() - throws Exception { + public void testControllerFailureDuringCommit() throws Exception { SegmentCompletionProtocol.Response response; Request.Params params; // s1 sends offset of 20, gets HOLD at t = 5s; @@ -1052,8 +1064,7 @@ public class SegmentCompletionTest { // Tests that when controller fails, after controller failure, commit doesn't continue. Then because all server instances // are in holding state, we will ask one to commit. @Test - public void testControllerFailureDuringSplitCommit() - throws Exception { + public void testControllerFailureDuringSplitCommit() throws Exception { SegmentCompletionProtocol.Response response; Request.Params params; // s1 sends offset of 20, gets HOLD at t = 5s; @@ -1104,15 +1115,16 @@ public class SegmentCompletionTest { Assert.assertTrue(response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.HOLD)); // So s2 goes back into HOLDING state. s1 and s3 are already holding, so now it will get COMMIT back. - params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr) + params = new Request.Params().withInstanceId(s2) + .withOffset(s2Offset) + .withSegmentName(segmentNameStr) .withSegmentLocation("location"); response = segmentCompletionMgr.segmentConsumed(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT); } @Test - public void testNotLeader() - throws Exception { + public void testNotLeader() throws Exception { testCaseSetup(false, true); SegmentCompletionProtocol.Response response; SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params(); @@ -1157,8 +1169,8 @@ public class SegmentCompletionTest { public boolean commitSegmentMetadata(String rawTableName, CommittingSegmentDescriptor committingSegmentDescriptor) { _segmentMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE); _segmentMetadata.setEndOffset(committingSegmentDescriptor.getNextOffset()); - _segmentMetadata.setDownloadUrl(ControllerConf - .constructDownloadUrl(rawTableName, committingSegmentDescriptor.getSegmentName(), + _segmentMetadata.setDownloadUrl( + ControllerConf.constructDownloadUrl(rawTableName, committingSegmentDescriptor.getSegmentName(), CONTROLLER_CONF.generateVipUrl())); _segmentMetadata.setEndTime(_segmentCompletionMgr.getCurrentTimeMs()); return true; @@ -1194,15 +1206,15 @@ public class SegmentCompletionTest { this(createMockHelixManager(isLeader, isConnected), segmentManager, isLeader, isConnected); } - protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager, boolean isLeader, - boolean isConnected) { + protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager, + boolean isLeader, boolean isConnected) { this(helixManager, segmentManager, isLeader, isConnected, new ControllerLeadershipManager(helixManager)); } protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager, boolean isLeader, boolean isConnected, ControllerLeadershipManager controllerLeadershipManager) { super(helixManager, segmentManager, new ControllerMetrics(new MetricsRegistry()), - controllerLeadershipManager); + SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds(), controllerLeadershipManager); _isLeader = isLeader; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
