This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 6bfcacb [Cleanup] Merge RealtimeSegmentOnlineOfflineStateModel and SegmentOnlineOfflineStateModel in CommonConstants (#5459) 6bfcacb is described below commit 6bfcacb239f55c27491c2a580bf4a63cf2ec88fe Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Fri May 29 11:39:44 2020 -0700 [Cleanup] Merge RealtimeSegmentOnlineOfflineStateModel and SegmentOnlineOfflineStateModel in CommonConstants (#5459) We only have one SegmentOnlineOfflineStateModel, so there is no value keeping both of them --- .../HelixExternalViewBasedQueryQuotaManager.java | 4 +- .../pinot/broker/routing/RoutingManager.java | 6 +-- .../instanceselector/BaseInstanceSelector.java | 6 +-- .../segmentselector/RealtimeSegmentSelector.java | 4 +- .../instanceselector/InstanceSelectorTest.java | 8 ++-- .../segmentselector/SegmentSelectorTest.java | 4 +- .../apache/pinot/common/utils/CommonConstants.java | 10 +--- .../helix/core/PinotHelixResourceManager.java | 20 ++++---- .../segment/OfflineSegmentAssignment.java | 6 +-- .../segment/RealtimeSegmentAssignment.java | 14 +++--- .../assignment/segment/SegmentAssignmentUtils.java | 13 +++--- .../realtime/PinotLLCRealtimeSegmentManager.java | 39 ++++++++-------- .../helix/core/rebalance/TableRebalancer.java | 6 +-- .../helix/core/retention/RetentionManager.java | 3 +- ...fflineNonReplicaGroupSegmentAssignmentTest.java | 14 +++--- .../OfflineReplicaGroupSegmentAssignmentTest.java | 26 +++++------ ...altimeNonReplicaGroupSegmentAssignmentTest.java | 19 ++++---- .../RealtimeReplicaGroupSegmentAssignmentTest.java | 19 ++++---- .../segment/SegmentAssignmentUtilsTest.java | 8 ++-- .../PinotLLCRealtimeSegmentManagerTest.java | 54 +++++++++++----------- .../core/rebalance/TableRebalancerClusterTest.java | 2 +- .../helix/core/rebalance/TableRebalancerTest.java | 8 ++-- .../ControllerPeriodicTasksIntegrationTest.java | 8 ++-- .../server/starter/helix/HelixServerStarter.java | 5 +- 24 files changed, 148 insertions(+), 158 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java index 172b13a..3306e58 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java @@ -168,7 +168,7 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan if (stateMap != null) { for (Map.Entry<String, String> state : stateMap.entrySet()) { if (!_helixManager.getInstanceName().equals(state.getKey()) && state.getValue() - .equals(CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.ONLINE)) { + .equals(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE)) { otherOnlineBrokerCount++; } } @@ -304,7 +304,7 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan int otherOnlineBrokerCount = 0; for (Map.Entry<String, String> state : stateMap.entrySet()) { if (!_helixManager.getInstanceName().equals(state.getKey()) && state.getValue() - .equals(CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.ONLINE)) { + .equals(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE)) { otherOnlineBrokerCount++; } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java index 62eece4..e8bf81f 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java @@ -51,7 +51,7 @@ import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.utils.CommonConstants; -import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.common.utils.HashUtil; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.spi.config.table.QueryConfig; @@ -195,8 +195,8 @@ public class RoutingManager implements ClusterChangeHandler { Set<String> onlineSegments = new HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size())); for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) { Map<String, String> instanceStateMap = entry.getValue(); - if (instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.ONLINE) || instanceStateMap - .containsValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) { + if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) || instanceStateMap + .containsValue(SegmentStateModel.CONSUMING)) { onlineSegments.add(entry.getKey()); } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java index 404bfd6..512f721 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java @@ -31,7 +31,7 @@ import org.apache.helix.model.ExternalView; import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.request.BrokerRequest; -import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.common.utils.HashUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,9 +153,9 @@ abstract class BaseInstanceSelector implements InstanceSelector { String instance = instanceStateEntry.getKey(); String state = instanceStateEntry.getValue(); // Do not track instances in ERROR state - if (!state.equals(RealtimeSegmentOnlineOfflineStateModel.ERROR)) { + if (!state.equals(SegmentStateModel.ERROR)) { _instanceToSegmentsMap.computeIfAbsent(instance, k -> new ArrayList<>()).add(segment); - if (state.equals(RealtimeSegmentOnlineOfflineStateModel.OFFLINE)) { + if (state.equals(SegmentStateModel.OFFLINE)) { offlineInstances.add(instance); } else { onlineInstances.add(instance); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java index b89961a..a052da2 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java @@ -28,7 +28,7 @@ import java.util.TreeMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.helix.model.ExternalView; import org.apache.pinot.common.request.BrokerRequest; -import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.common.utils.HLCSegmentName; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.common.utils.SegmentName; @@ -90,7 +90,7 @@ public class RealtimeSegmentSelector implements SegmentSelector { HLCSegmentName hlcSegmentName = new HLCSegmentName(segment); groupIdToHLCSegmentsMap.computeIfAbsent(hlcSegmentName.getGroupId(), k -> new ArrayList<>()).add(segment); } else { - if (instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) { + if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) { // Keep the first CONSUMING segment for each partition LLCSegmentName llcSegmentName = new LLCSegmentName(segment); partitionIdToFirstConsumingLLCSegmentMap.compute(llcSegmentName.getPartitionId(), (k, consumingSegment) -> { diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java index 8748f77..3806a27 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java @@ -34,10 +34,10 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.testng.annotations.Test; -import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING; -import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.ERROR; -import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.OFFLINE; -import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.ONLINE; +import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING; +import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ERROR; +import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE; +import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java index 9da10c0..b447bb6 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java @@ -32,8 +32,8 @@ import org.testng.annotations.Test; import static org.apache.pinot.broker.routing.segmentselector.RealtimeSegmentSelector.FORCE_HLC; import static org.apache.pinot.broker.routing.segmentselector.RealtimeSegmentSelector.ROUTING_OPTIONS_KEY; -import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING; -import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.ONLINE; +import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING; +import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEqualsNoOrder; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java index 18a4bf2..bff81f4 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java @@ -56,20 +56,14 @@ public class CommonConstants { public static final String UNTAGGED_MINION_INSTANCE = "minion_untagged"; public static class StateModel { - public static class SegmentOnlineOfflineStateModel { - public static final String ONLINE = "ONLINE"; - public static final String OFFLINE = "OFFLINE"; - public static final String ERROR = "ERROR"; - } - - public static class RealtimeSegmentOnlineOfflineStateModel { + public static class SegmentStateModel { public static final String ONLINE = "ONLINE"; public static final String OFFLINE = "OFFLINE"; public static final String ERROR = "ERROR"; public static final String CONSUMING = "CONSUMING"; } - public static class BrokerOnlineOfflineStateModel { + public static class BrokerResourceStateModel { public static final String ONLINE = "ONLINE"; public static final String OFFLINE = "OFFLINE"; public static final String ERROR = "ERROR"; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index ec96836..6674aff 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -72,8 +72,8 @@ import org.apache.pinot.common.metadata.instance.InstanceZKMetadata; import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata; import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; import org.apache.pinot.common.utils.CommonConstants.Helix; -import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.BrokerOnlineOfflineStateModel; -import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.common.utils.CommonConstants.Server; import org.apache.pinot.common.utils.SchemaUtils; import org.apache.pinot.common.utils.config.InstanceUtils; @@ -95,15 +95,15 @@ import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer; import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils; import org.apache.pinot.controller.helix.starter.HelixConfig; import org.apache.pinot.core.segment.index.metadata.SegmentMetadata; +import org.apache.pinot.spi.config.instance.Instance; import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableCustomConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.TenantConfig; -import org.apache.pinot.spi.config.instance.Instance; -import org.apache.pinot.spi.config.tenant.Tenant; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; +import org.apache.pinot.spi.config.tenant.Tenant; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -631,7 +631,7 @@ public class PinotHelixResourceManager { instanceStateMap.clear(); } for (String brokerInstance : brokerInstances) { - idealState.setPartitionState(tableNameWithType, brokerInstance, BrokerOnlineOfflineStateModel.ONLINE); + idealState.setPartitionState(tableNameWithType, brokerInstance, BrokerResourceStateModel.ONLINE); } return idealState; }, DEFAULT_RETRY_POLICY); @@ -651,7 +651,7 @@ public class PinotHelixResourceManager { Preconditions.checkNotNull(tableConfig); String brokerTag = TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig()); if (brokerTag.equals(brokerTenantTag)) { - tableIdealState.setPartitionState(tableNameWithType, instanceName, BrokerOnlineOfflineStateModel.ONLINE); + tableIdealState.setPartitionState(tableNameWithType, instanceName, BrokerResourceStateModel.ONLINE); } } _helixAdmin.setResourceIdealState(_helixClusterName, Helix.BROKER_RESOURCE_INSTANCE, tableIdealState); @@ -1141,8 +1141,8 @@ public class PinotHelixResourceManager { List<String> brokers = HelixHelper.getInstancesWithTag(_helixZkManager, brokerTag); HelixHelper.updateIdealState(_helixZkManager, Helix.BROKER_RESOURCE_INSTANCE, idealState -> { assert idealState != null; - idealState.getRecord().getMapFields().put(tableNameWithType, - SegmentAssignmentUtils.getInstanceStateMap(brokers, BrokerOnlineOfflineStateModel.ONLINE)); + idealState.getRecord().getMapFields() + .put(tableNameWithType, SegmentAssignmentUtils.getInstanceStateMap(brokers, BrokerResourceStateModel.ONLINE)); return idealState; }); } @@ -1563,7 +1563,7 @@ public class PinotHelixResourceManager { LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances, offlineTableName); currentAssignment.put(segmentName, - SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentOnlineOfflineStateModel.ONLINE)); + SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE)); } return idealState; }); @@ -2090,7 +2090,7 @@ public class PinotHelixResourceManager { _helixAdmin.enableInstance(_helixClusterName, instanceName, enableInstance); long intervalWaitTimeMs = 500L; long deadline = System.currentTimeMillis() + timeOutMs; - String offlineState = SegmentOnlineOfflineStateModel.OFFLINE; + String offlineState = SegmentStateModel.OFFLINE; while (System.currentTimeMillis() < deadline) { PropertyKey liveInstanceKey = _keyBuilder.liveInstance(instanceName); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java index 166f2a7..3e02759 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java @@ -33,7 +33,7 @@ import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata; import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata; -import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants; import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig; import org.apache.pinot.spi.config.table.TableConfig; @@ -179,8 +179,8 @@ public class OfflineSegmentAssignment implements SegmentAssignment { newAssignment = new TreeMap<>(); for (String segment : currentAssignment.keySet()) { List<String> assignedInstances = assignSegment(segment, newAssignment, instancePartitions); - newAssignment.put(segment, - SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentOnlineOfflineStateModel.ONLINE)); + newAssignment + .put(segment, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE)); } } else { int numReplicaGroups = instancePartitions.getNumReplicaGroups(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java index 177b6ee..22a5f0a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java @@ -29,7 +29,7 @@ import java.util.TreeMap; import org.apache.commons.configuration.Configuration; import org.apache.helix.HelixManager; import org.apache.pinot.common.assignment.InstancePartitions; -import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants; import org.apache.pinot.spi.config.table.TableConfig; @@ -208,8 +208,8 @@ public class RealtimeSegmentAssignment implements SegmentAssignment { newAssignment = new TreeMap<>(); for (String segment : completedSegmentAssignment.keySet()) { List<String> assignedInstances = assignCompletedSegment(segment, newAssignment, completedInstancePartitions); - newAssignment.put(segment, SegmentAssignmentUtils - .getInstanceStateMap(assignedInstances, RealtimeSegmentOnlineOfflineStateModel.ONLINE)); + newAssignment + .put(segment, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE)); } } else { if (completedInstancePartitions.getNumReplicaGroups() == 1) { @@ -252,8 +252,8 @@ public class RealtimeSegmentAssignment implements SegmentAssignment { newAssignment = new TreeMap<>(); for (String segmentName : completedSegmentAssignment.keySet()) { List<String> instancesAssigned = assignConsumingSegment(segmentName, consumingInstancePartitions); - Map<String, String> instanceStateMap = SegmentAssignmentUtils - .getInstanceStateMap(instancesAssigned, RealtimeSegmentOnlineOfflineStateModel.ONLINE); + Map<String, String> instanceStateMap = + SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE); newAssignment.put(segmentName, instanceStateMap); } } @@ -267,8 +267,8 @@ public class RealtimeSegmentAssignment implements SegmentAssignment { for (String segmentName : consumingSegmentAssignment.keySet()) { List<String> instancesAssigned = assignConsumingSegment(segmentName, consumingInstancePartitions); - Map<String, String> instanceStateMap = SegmentAssignmentUtils - .getInstanceStateMap(instancesAssigned, RealtimeSegmentOnlineOfflineStateModel.CONSUMING); + Map<String, String> instanceStateMap = + SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING); newAssignment.put(segmentName, instanceStateMap); } } else { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java index 1b301ad..edf8553 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java @@ -30,8 +30,7 @@ import java.util.Set; import java.util.TreeMap; import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; import org.apache.pinot.common.assignment.InstancePartitions; -import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel; -import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.common.utils.Pairs; @@ -142,7 +141,7 @@ public class SegmentAssignmentUtils { Map<String, Map<String, String>> currentAssignment, List<String> instances, int replication) { // Use Helix AutoRebalanceStrategy to rebalance the table LinkedHashMap<String, Integer> states = new LinkedHashMap<>(); - states.put(SegmentOnlineOfflineStateModel.ONLINE, replication); + states.put(SegmentStateModel.ONLINE, replication); AutoRebalanceStrategy autoRebalanceStrategy = new AutoRebalanceStrategy(null, new ArrayList<>(currentAssignment.keySet()), states); // Make a copy of the current assignment because this step might change the passed in assignment @@ -252,8 +251,8 @@ public class SegmentAssignmentUtils { Map<String, String> instanceStateMap = new TreeMap<>(); int numReplicaGroups = instancePartitions.getNumReplicaGroups(); for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) { - instanceStateMap.put(instancePartitions.getInstances(partitionId, replicaGroupId).get(instanceId), - SegmentOnlineOfflineStateModel.ONLINE); + instanceStateMap + .put(instancePartitions.getInstances(partitionId, replicaGroupId).get(instanceId), SegmentStateModel.ONLINE); } return instanceStateMap; } @@ -305,9 +304,9 @@ public class SegmentAssignmentUtils { for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) { String segmentName = entry.getKey(); Map<String, String> instanceStateMap = entry.getValue(); - if (instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.ONLINE)) { + if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) { _completedSegmentAssignment.put(segmentName, instanceStateMap); - } else if (instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) { + } else if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) { _consumingSegmentAssignment.put(segmentName, instanceStateMap); } else { _offlineSegmentAssignment.put(segmentName, instanceStateMap); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 2325e92..8cae0ec 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -49,7 +49,7 @@ import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata; import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; -import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.common.utils.SegmentName; @@ -412,9 +412,9 @@ public class PinotLLCRealtimeSegmentManager { TableConfig tableConfig = getTableConfig(realtimeTableName); InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig); IdealState idealState = getIdealState(realtimeTableName); - Preconditions.checkState(idealState.getInstanceStateMap(committingSegmentName) - .containsValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING), - "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName); + Preconditions + .checkState(idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING), + "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName); int numPartitions = getNumPartitionsFromIdealState(idealState); int numReplicas = getNumReplicas(tableConfig, instancePartitions); @@ -608,11 +608,11 @@ public class PinotLLCRealtimeSegmentManager { assert idealState != null; Map<String, String> stateMap = idealState.getInstanceStateMap(segmentName); String state = stateMap.get(instanceName); - if (RealtimeSegmentOnlineOfflineStateModel.CONSUMING.equals(state)) { - stateMap.put(instanceName, RealtimeSegmentOnlineOfflineStateModel.OFFLINE); + if (SegmentStateModel.CONSUMING.equals(state)) { + stateMap.put(instanceName, SegmentStateModel.OFFLINE); } else { - LOGGER.info("Segment {} in state {} when trying to register consumption stop from {}", - segmentName, state, instanceName); + LOGGER.info("Segment {} in state {} when trying to register consumption stop from {}", segmentName, state, + instanceName); } return idealState; }, RetryPolicies.exponentialBackoffRetryPolicy(10, 500L, 1.2f), true); @@ -733,16 +733,16 @@ public class PinotLLCRealtimeSegmentManager { if (committingSegmentName != null) { // Change committing segment state to ONLINE Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet(); - instanceStatesMap.put(committingSegmentName, - SegmentAssignmentUtils.getInstanceStateMap(instances, RealtimeSegmentOnlineOfflineStateModel.ONLINE)); + instanceStatesMap + .put(committingSegmentName, SegmentAssignmentUtils.getInstanceStateMap(instances, SegmentStateModel.ONLINE)); LOGGER.info("Updating segment: {} to ONLINE state", committingSegmentName); } // Assign instances to the new segment and add instances as state CONSUMING List<String> instancesAssigned = segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap); - instanceStatesMap.put(newSegmentName, SegmentAssignmentUtils - .getInstanceStateMap(instancesAssigned, RealtimeSegmentOnlineOfflineStateModel.CONSUMING)); + instanceStatesMap.put(newSegmentName, + SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING)); LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned); } @@ -854,7 +854,7 @@ public class PinotLLCRealtimeSegmentManager { Map<String, String> instanceStateMap = instanceStatesMap.get(latestSegmentName); if (instanceStateMap != null) { // Latest segment of metadata is in idealstate. - if (instanceStateMap.values().contains(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) { + if (instanceStateMap.values().contains(SegmentStateModel.CONSUMING)) { if (latestSegmentZKMetadata.getStatus() == Status.DONE) { // step-1 of commmitSegmentMetadata is done (i.e. marking old segment as DONE) @@ -868,9 +868,8 @@ public class PinotLLCRealtimeSegmentManager { LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs); String newSegmentName = newLLCSegmentName.getSegmentName(); - CommittingSegmentDescriptor committingSegmentDescriptor = - new CommittingSegmentDescriptor(latestSegmentName, - new StreamPartitionMsgOffset(latestSegmentZKMetadata.getEndOffset()), 0); + CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(latestSegmentName, + new StreamPartitionMsgOffset(latestSegmentZKMetadata.getEndOffset()), 0); createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas); updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName, @@ -883,7 +882,7 @@ public class PinotLLCRealtimeSegmentManager { // 1. all replicas OFFLINE and metadata IN_PROGRESS/DONE - a segment marked itself OFFLINE during consumption for some reason // 2. all replicas ONLINE and metadata DONE - Resolved in https://github.com/linkedin/pinot/pull/2890 // 3. we should never end up with some replicas ONLINE and some OFFLINE. - if (isAllInstancesInState(instanceStateMap, RealtimeSegmentOnlineOfflineStateModel.OFFLINE)) { + if (isAllInstancesInState(instanceStateMap, SegmentStateModel.OFFLINE)) { LOGGER.info("Repairing segment: {} which is OFFLINE for all instances in IdealState", latestSegmentName); // Create a new segment to re-consume from the previous start offset @@ -932,7 +931,7 @@ public class PinotLLCRealtimeSegmentManager { for (Map.Entry<String, Map<String, String>> segmentEntry : instanceStatesMap.entrySet()) { LLCSegmentName llcSegmentName = new LLCSegmentName(segmentEntry.getKey()); if (llcSegmentName.getPartitionId() == partitionId && segmentEntry.getValue() - .containsValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) { + .containsValue(SegmentStateModel.CONSUMING)) { previousConsumingSegment = llcSegmentName.getSegmentName(); break; } @@ -985,8 +984,8 @@ public class PinotLLCRealtimeSegmentManager { new LLCSegmentName(rawTableName, partitionId, STARTING_SEQUENCE_NUMBER, creationTimeMs); String newSegmentName = newLLCSegmentName.getSegmentName(); long startOffset = getPartitionOffset(streamConfig, streamConfig.getOffsetCriteria(), partitionId); - CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null, - new StreamPartitionMsgOffset(startOffset), 0); + CommittingSegmentDescriptor committingSegmentDescriptor = + new CommittingSegmentDescriptor(null, new StreamPartitionMsgOffset(startOffset), 0); createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs, committingSegmentDescriptor, null, instancePartitions, numPartitions, numReplicas); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java index c6c5d57..5115fe1 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java @@ -35,7 +35,7 @@ import org.apache.helix.model.IdealState; import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.assignment.InstancePartitionsUtils; -import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory; @@ -473,7 +473,7 @@ public class TableRebalancer { for (Map.Entry<String, String> instanceStateEntry : idealStateInstanceStateMap.entrySet()) { // Ignore OFFLINE state in IdealState String idealStateInstanceState = instanceStateEntry.getValue(); - if (idealStateInstanceState.equals(RealtimeSegmentOnlineOfflineStateModel.OFFLINE)) { + if (idealStateInstanceState.equals(SegmentStateModel.OFFLINE)) { continue; } @@ -486,7 +486,7 @@ public class TableRebalancer { String instanceName = instanceStateEntry.getKey(); String externalViewInstanceState = externalViewInstanceStateMap.get(instanceName); if (!idealStateInstanceState.equals(externalViewInstanceState)) { - if (RealtimeSegmentOnlineOfflineStateModel.ERROR.equals(externalViewInstanceState)) { + if (SegmentStateModel.ERROR.equals(externalViewInstanceState)) { if (bestEfforts) { LOGGER .warn("Found ERROR instance: {} for segment: {}, table: {}, counting it as good state (best-efforts)", diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java index a616fc6..140e64e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java @@ -174,8 +174,7 @@ public class RetentionManager extends ControllerPeriodicTask<Void> { } else { // Delete segment if all of its replicas are OFFLINE Set<String> states = new HashSet<>(stateMap.values()); - return states.size() == 1 && states - .contains(CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.OFFLINE); + return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE); } } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupSegmentAssignmentTest.java index d037725..104770f 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupSegmentAssignmentTest.java @@ -27,7 +27,7 @@ import java.util.TreeMap; import org.apache.commons.configuration.BaseConfiguration; import org.apache.commons.configuration.Configuration; import org.apache.pinot.common.assignment.InstancePartitions; -import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -95,8 +95,8 @@ public class OfflineNonReplicaGroupSegmentAssignmentTest { assertEquals(instancesAssigned.get(replicaId), INSTANCES.get(expectedAssignedInstanceId)); expectedAssignedInstanceId = (expectedAssignedInstanceId + 1) % NUM_INSTANCES; } - currentAssignment.put(segmentName, - SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE)); + currentAssignment + .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE)); } } @@ -106,8 +106,8 @@ public class OfflineNonReplicaGroupSegmentAssignmentTest { for (String segmentName : SEGMENTS) { List<String> instancesAssigned = _segmentAssignment.assignSegment(segmentName, currentAssignment, _instancePartitionsMap); - currentAssignment.put(segmentName, - SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE)); + currentAssignment + .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE)); } // There should be 100 segments assigned @@ -134,8 +134,8 @@ public class OfflineNonReplicaGroupSegmentAssignmentTest { for (String segmentName : SEGMENTS) { List<String> instancesAssigned = _segmentAssignment.assignSegment(segmentName, currentAssignment, _instancePartitionsMap); - currentAssignment.put(segmentName, - SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE)); + currentAssignment + .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE)); } // Bootstrap table should reassign all segments based on their alphabetical order diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java index 31f3bb4..78a06bc 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java @@ -34,7 +34,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata; import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata; import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata; -import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.common.utils.CommonConstants.Segment.AssignmentStrategy; import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants; import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig; @@ -201,8 +201,8 @@ public class OfflineReplicaGroupSegmentAssignmentTest { assertEquals(instancesAssigned.get(replicaGroupId), INSTANCES.get(expectedAssignedInstanceId)); } - currentAssignment.put(segmentName, - SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE)); + currentAssignment + .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE)); } } @@ -234,8 +234,8 @@ public class OfflineReplicaGroupSegmentAssignmentTest { assertEquals(instancesAssigned.get(replicaGroupId), INSTANCES.get(expectedAssignedInstanceId)); } - currentAssignment.put(segmentName, - SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE)); + currentAssignment + .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE)); } } @@ -245,8 +245,8 @@ public class OfflineReplicaGroupSegmentAssignmentTest { for (String segmentName : SEGMENTS) { List<String> instancesAssigned = _segmentAssignmentWithoutPartition .assignSegment(segmentName, currentAssignment, _instancePartitionsMapWithoutPartition); - currentAssignment.put(segmentName, - SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE)); + currentAssignment + .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE)); } // There should be 90 segments assigned @@ -274,8 +274,8 @@ public class OfflineReplicaGroupSegmentAssignmentTest { for (String segmentName : SEGMENTS) { List<String> instancesAssigned = _segmentAssignmentWithPartition .assignSegment(segmentName, currentAssignment, _instancePartitionsMapWithPartition); - currentAssignment.put(segmentName, - SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE)); + currentAssignment + .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE)); } // There should be 90 segments assigned @@ -303,8 +303,8 @@ public class OfflineReplicaGroupSegmentAssignmentTest { for (String segmentName : SEGMENTS) { List<String> instancesAssigned = _segmentAssignmentWithoutPartition .assignSegment(segmentName, currentAssignment, _instancePartitionsMapWithoutPartition); - currentAssignment.put(segmentName, - SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE)); + currentAssignment + .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE)); } // Bootstrap table should reassign all segments based on their alphabetical order @@ -326,8 +326,8 @@ public class OfflineReplicaGroupSegmentAssignmentTest { for (String segmentName : SEGMENTS) { List<String> instancesAssigned = _segmentAssignmentWithPartition .assignSegment(segmentName, currentAssignment, _instancePartitionsMapWithPartition); - currentAssignment.put(segmentName, - SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE)); + currentAssignment + .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE)); } // Bootstrap table should reassign all segments based on their alphabetical order within the partition diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java index 51a13ab..b659514 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java @@ -24,7 +24,7 @@ import java.util.Map; import java.util.TreeMap; import org.apache.commons.configuration.BaseConfiguration; import org.apache.pinot.common.assignment.InstancePartitions; -import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants; import org.apache.pinot.spi.config.table.TableConfig; @@ -145,7 +145,7 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest { String offlineSegmentName = "offlineSegment"; Map<String, String> offlineSegmentInstanceStateMap = SegmentAssignmentUtils .getInstanceStateMap(SegmentAssignmentTestUtils.getNameList("badInstance_", NUM_REPLICAS), - RealtimeSegmentOnlineOfflineStateModel.OFFLINE); + SegmentStateModel.OFFLINE); currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap); // Rebalance without COMPLETED instance partitions should not change the segment assignment @@ -167,14 +167,14 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest { Map<String, String> instanceStateMap = newAssignment.get(_segments.get(segmentId)); for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) { assertTrue(entry.getKey().startsWith(COMPLETED_INSTANCE_NAME_PREFIX)); - assertEquals(entry.getValue(), RealtimeSegmentOnlineOfflineStateModel.ONLINE); + assertEquals(entry.getValue(), SegmentStateModel.ONLINE); } } else { // CONSUMING segments Map<String, String> instanceStateMap = newAssignment.get(_segments.get(segmentId)); for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) { assertTrue(entry.getKey().startsWith(CONSUMING_INSTANCE_NAME_PREFIX)); - assertEquals(entry.getValue(), RealtimeSegmentOnlineOfflineStateModel.CONSUMING); + assertEquals(entry.getValue(), SegmentStateModel.CONSUMING); } } } @@ -215,10 +215,10 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest { for (Map.Entry<String, Map<String, String>> entry : newAssignment.entrySet()) { String segmentName = entry.getKey(); Map<String, String> instanceStateMap = entry.getValue(); - if (instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.ONLINE)) { + if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) { for (int i = 0; i < NUM_REPLICAS; i++) { String expectedInstance = COMPLETED_INSTANCES.get(index++ % NUM_COMPLETED_INSTANCES); - assertEquals(instanceStateMap.get(expectedInstance), RealtimeSegmentOnlineOfflineStateModel.ONLINE); + assertEquals(instanceStateMap.get(expectedInstance), SegmentStateModel.ONLINE); } } else { // CONSUMING and OFFLINE segments should not be reassigned @@ -234,12 +234,11 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest { String lastSegmentInPartition = _segments.get(segmentId - NUM_PARTITIONS); Map<String, String> instanceStateMap = currentAssignment.get(lastSegmentInPartition); currentAssignment.put(lastSegmentInPartition, SegmentAssignmentUtils - .getInstanceStateMap(new ArrayList<>(instanceStateMap.keySet()), - RealtimeSegmentOnlineOfflineStateModel.ONLINE)); + .getInstanceStateMap(new ArrayList<>(instanceStateMap.keySet()), SegmentStateModel.ONLINE)); } // Add the new segment into the assignment as CONSUMING - currentAssignment.put(_segments.get(segmentId), SegmentAssignmentUtils - .getInstanceStateMap(instancesAssigned, RealtimeSegmentOnlineOfflineStateModel.CONSUMING)); + currentAssignment.put(_segments.get(segmentId), + SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING)); } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java index 6eb5148..ee3c3c5 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java @@ -25,7 +25,7 @@ import java.util.Map; import java.util.TreeMap; import org.apache.commons.configuration.BaseConfiguration; import org.apache.pinot.common.assignment.InstancePartitions; -import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.common.utils.CommonConstants.Segment.AssignmentStrategy; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants; @@ -170,7 +170,7 @@ public class RealtimeReplicaGroupSegmentAssignmentTest { String offlineSegmentName = "offlineSegment"; Map<String, String> offlineSegmentInstanceStateMap = SegmentAssignmentUtils .getInstanceStateMap(SegmentAssignmentTestUtils.getNameList("badInstance_", NUM_REPLICAS), - RealtimeSegmentOnlineOfflineStateModel.OFFLINE); + SegmentStateModel.OFFLINE); currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap); // Rebalance without COMPLETED instance partitions should not change the segment assignment @@ -192,14 +192,14 @@ public class RealtimeReplicaGroupSegmentAssignmentTest { Map<String, String> instanceStateMap = newAssignment.get(_segments.get(segmentId)); for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) { assertTrue(entry.getKey().startsWith(COMPLETED_INSTANCE_NAME_PREFIX)); - assertEquals(entry.getValue(), RealtimeSegmentOnlineOfflineStateModel.ONLINE); + assertEquals(entry.getValue(), SegmentStateModel.ONLINE); } } else { // CONSUMING segments Map<String, String> instanceStateMap = newAssignment.get(_segments.get(segmentId)); for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) { assertTrue(entry.getKey().startsWith(CONSUMING_INSTANCE_NAME_PREFIX)); - assertEquals(entry.getValue(), RealtimeSegmentOnlineOfflineStateModel.CONSUMING); + assertEquals(entry.getValue(), SegmentStateModel.CONSUMING); } } } @@ -239,12 +239,12 @@ public class RealtimeReplicaGroupSegmentAssignmentTest { for (Map.Entry<String, Map<String, String>> entry : newAssignment.entrySet()) { String segmentName = entry.getKey(); Map<String, String> instanceStateMap = entry.getValue(); - if (instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.ONLINE)) { + if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) { int expectedInstanceId = index++ % numCompletedInstancesPerReplicaGroup; for (int i = 0; i < NUM_REPLICAS; i++) { String expectedInstance = COMPLETED_INSTANCES.get(expectedInstanceId + i * numCompletedInstancesPerReplicaGroup); - assertEquals(instanceStateMap.get(expectedInstance), RealtimeSegmentOnlineOfflineStateModel.ONLINE); + assertEquals(instanceStateMap.get(expectedInstance), SegmentStateModel.ONLINE); } } else { // CONSUMING and OFFLINE segments should not be reassigned @@ -260,12 +260,11 @@ public class RealtimeReplicaGroupSegmentAssignmentTest { String lastSegmentInPartition = _segments.get(segmentId - NUM_PARTITIONS); Map<String, String> instanceStateMap = currentAssignment.get(lastSegmentInPartition); currentAssignment.put(lastSegmentInPartition, SegmentAssignmentUtils - .getInstanceStateMap(new ArrayList<>(instanceStateMap.keySet()), - RealtimeSegmentOnlineOfflineStateModel.ONLINE)); + .getInstanceStateMap(new ArrayList<>(instanceStateMap.keySet()), SegmentStateModel.ONLINE)); } // Add the new segment into the assignment as CONSUMING - currentAssignment.put(_segments.get(segmentId), SegmentAssignmentUtils - .getInstanceStateMap(instancesAssigned, RealtimeSegmentOnlineOfflineStateModel.CONSUMING)); + currentAssignment.put(_segments.get(segmentId), + SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING)); } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java index ef8364e..6e867c6 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java @@ -25,7 +25,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; import org.apache.pinot.common.assignment.InstancePartitions; -import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; @@ -57,8 +57,8 @@ public class SegmentAssignmentUtilsTest { instancesAssigned.add(instances.get(assignedInstanceId)); assignedInstanceId = (assignedInstanceId + 1) % numInstances; } - currentAssignment.put(segmentName, - SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE)); + currentAssignment + .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE)); } // There should be 100 segments assigned @@ -240,7 +240,7 @@ public class SegmentAssignmentUtilsTest { instancesAssigned.add(instances.get(assignedInstanceId)); } currentAssignment.put(segments.get(segmentId), - SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE)); + SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE)); } // There should be 90 segments assigned diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index db98072..485ed6c 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -38,7 +38,7 @@ import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.utils.CommonConstants.Helix; -import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.controller.ControllerConf; @@ -157,7 +157,7 @@ public class PinotLLCRealtimeSegmentManagerTest { assertNotNull(instanceStateMap); assertEquals(instanceStateMap.size(), numReplicas); for (String state : instanceStateMap.values()) { - assertEquals(state, RealtimeSegmentOnlineOfflineStateModel.CONSUMING); + assertEquals(state, SegmentStateModel.CONSUMING); } LLCRealtimeSegmentZKMetadata segmentZKMetadata = @@ -188,7 +188,8 @@ public class PinotLLCRealtimeSegmentManagerTest { // Commit a segment for partition 0 String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName(); CommittingSegmentDescriptor committingSegmentDescriptor = - new CommittingSegmentDescriptor(committingSegment, new StreamPartitionMsgOffset(PARTITION_OFFSET + NUM_DOCS), 0L); + new CommittingSegmentDescriptor(committingSegment, new StreamPartitionMsgOffset(PARTITION_OFFSET + NUM_DOCS), + 0L); committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata()); segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor); @@ -196,13 +197,13 @@ public class PinotLLCRealtimeSegmentManagerTest { Map<String, String> committedSegmentInstanceStateMap = instanceStatesMap.get(committingSegment); assertNotNull(committedSegmentInstanceStateMap); assertEquals(new HashSet<>(committedSegmentInstanceStateMap.values()), - Collections.singleton(RealtimeSegmentOnlineOfflineStateModel.ONLINE)); + Collections.singleton(SegmentStateModel.ONLINE)); String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName(); Map<String, String> consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment); assertNotNull(consumingSegmentInstanceStateMap); assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()), - Collections.singleton(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)); + Collections.singleton(SegmentStateModel.CONSUMING)); // Verify segment ZK metadata for committed segment and new consuming segment LLCRealtimeSegmentZKMetadata committedSegmentZKMetadata = @@ -223,11 +224,10 @@ public class PinotLLCRealtimeSegmentManagerTest { assertEquals(committedSegmentZKMetadata.getCreationTime(), CURRENT_TIME_MS); // Turn one instance of the consuming segment OFFLINE and commit the segment - consumingSegmentInstanceStateMap.entrySet().iterator().next() - .setValue(RealtimeSegmentOnlineOfflineStateModel.OFFLINE); + consumingSegmentInstanceStateMap.entrySet().iterator().next().setValue(SegmentStateModel.OFFLINE); committingSegment = consumingSegment; - committingSegmentDescriptor = - new CommittingSegmentDescriptor(committingSegment, new StreamPartitionMsgOffset(PARTITION_OFFSET + NUM_DOCS + NUM_DOCS), 0L); + committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment, + new StreamPartitionMsgOffset(PARTITION_OFFSET + NUM_DOCS + NUM_DOCS), 0L); committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata()); segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor); @@ -235,13 +235,13 @@ public class PinotLLCRealtimeSegmentManagerTest { committedSegmentInstanceStateMap = instanceStatesMap.get(committingSegment); assertNotNull(committedSegmentInstanceStateMap); assertEquals(new HashSet<>(committedSegmentInstanceStateMap.values()), - Collections.singleton(RealtimeSegmentOnlineOfflineStateModel.ONLINE)); + Collections.singleton(SegmentStateModel.ONLINE)); consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 2, CURRENT_TIME_MS).getSegmentName(); consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment); assertNotNull(consumingSegmentInstanceStateMap); assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()), - Collections.singleton(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)); + Collections.singleton(SegmentStateModel.CONSUMING)); // Illegal segment commit - commit the segment again try { @@ -357,7 +357,7 @@ public class PinotLLCRealtimeSegmentManagerTest { Map<String, String> instanceStateMap = instanceStatesMap.get(segmentName); assertEquals(instanceStateMap.size(), segmentManager._numReplicas); for (String state : instanceStateMap.values()) { - assertEquals(state, RealtimeSegmentOnlineOfflineStateModel.CONSUMING); + assertEquals(state, SegmentStateModel.CONSUMING); } // NOTE: Old segment ZK metadata might exist when previous round failed due to not enough instances assertTrue(segmentZKMetadataMap.containsKey(segmentName)); @@ -513,15 +513,15 @@ public class PinotLLCRealtimeSegmentManagerTest { Map<String, String> consumingSegmentInstanceStateMap = instanceStatesMap.remove(consumingSegment); assertNotNull(consumingSegmentInstanceStateMap); assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()), - Collections.singleton(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)); + Collections.singleton(SegmentStateModel.CONSUMING)); if (latestCommittedSegment != null) { Map<String, String> latestCommittedSegmentInstanceStateMap = instanceStatesMap.get(latestCommittedSegment); assertNotNull(latestCommittedSegmentInstanceStateMap); for (Map.Entry<String, String> entry : latestCommittedSegmentInstanceStateMap.entrySet()) { // Latest committed segment should have all instances in ONLINE state - assertEquals(entry.getValue(), RealtimeSegmentOnlineOfflineStateModel.ONLINE); - entry.setValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING); + assertEquals(entry.getValue(), SegmentStateModel.ONLINE); + entry.setValue(SegmentStateModel.CONSUMING); } } } @@ -535,8 +535,8 @@ public class PinotLLCRealtimeSegmentManagerTest { assertNotNull(consumingSegmentInstanceStateMap); for (Map.Entry<String, String> entry : consumingSegmentInstanceStateMap.entrySet()) { // Consuming segment should have all instances in CONSUMING state - assertEquals(entry.getValue(), RealtimeSegmentOnlineOfflineStateModel.CONSUMING); - entry.setValue(RealtimeSegmentOnlineOfflineStateModel.OFFLINE); + assertEquals(entry.getValue(), SegmentStateModel.CONSUMING); + entry.setValue(SegmentStateModel.OFFLINE); } } @@ -581,8 +581,8 @@ public class PinotLLCRealtimeSegmentManagerTest { Map<String, String> instanceStateMap = entry.getValue(); // Skip segments with all instances OFFLINE - if (instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.ONLINE) || instanceStateMap - .containsValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) { + if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) || instanceStateMap + .containsValue(SegmentStateModel.CONSUMING)) { LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName); int partitionsId = llcSegmentName.getPartitionId(); Map<Integer, String> sequenceNumberToSegmentMap = partitionIdToSegmentsMap.get(partitionsId); @@ -601,8 +601,8 @@ public class PinotLLCRealtimeSegmentManagerTest { // Latest segment should have CONSUMING instance but no ONLINE instance in ideal state Map<String, String> instanceStateMap = instanceStatesMap.get(latestSegment); - assertTrue(instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)); - assertFalse(instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.ONLINE)); + assertTrue(instanceStateMap.containsValue(SegmentStateModel.CONSUMING)); + assertFalse(instanceStateMap.containsValue(SegmentStateModel.ONLINE)); // Latest segment ZK metadata should be IN_PROGRESS assertEquals(segmentManager._segmentZKMetadataMap.get(latestSegment).getStatus(), Status.IN_PROGRESS); @@ -612,8 +612,7 @@ public class PinotLLCRealtimeSegmentManagerTest { // Committed segment should have all instances in ONLINE state instanceStateMap = instanceStatesMap.get(segmentName); - assertEquals(new HashSet<>(instanceStateMap.values()), - Collections.singleton(RealtimeSegmentOnlineOfflineStateModel.ONLINE)); + assertEquals(new HashSet<>(instanceStateMap.values()), Collections.singleton(SegmentStateModel.ONLINE)); // Committed segment ZK metadata should be DONE LLCRealtimeSegmentZKMetadata segmentZKMetadata = segmentManager._segmentZKMetadataMap.get(segmentName); @@ -654,7 +653,8 @@ public class PinotLLCRealtimeSegmentManagerTest { // Commit a segment for partition 0 String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName(); CommittingSegmentDescriptor committingSegmentDescriptor = - new CommittingSegmentDescriptor(committingSegment, new StreamPartitionMsgOffset(PARTITION_OFFSET + NUM_DOCS), 0L); + new CommittingSegmentDescriptor(committingSegment, new StreamPartitionMsgOffset(PARTITION_OFFSET + NUM_DOCS), + 0L); committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata()); try { @@ -684,7 +684,8 @@ public class PinotLLCRealtimeSegmentManagerTest { FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(); String segmentLocation = SCHEME + tableDir + "/" + segmentFileName; CommittingSegmentDescriptor committingSegmentDescriptor = - new CommittingSegmentDescriptor(segmentName, new StreamPartitionMsgOffset(PARTITION_OFFSET), 0, segmentLocation); + new CommittingSegmentDescriptor(segmentName, new StreamPartitionMsgOffset(PARTITION_OFFSET), 0, + segmentLocation); segmentManager.commitSegmentFile(REALTIME_TABLE_NAME, committingSegmentDescriptor); assertFalse(segmentFile.exists()); } @@ -709,7 +710,8 @@ public class PinotLLCRealtimeSegmentManagerTest { FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(); String segmentLocation = SCHEME + tableDir + "/" + segmentFileName; CommittingSegmentDescriptor committingSegmentDescriptor = - new CommittingSegmentDescriptor(segmentName, new StreamPartitionMsgOffset(PARTITION_OFFSET), 0, segmentLocation); + new CommittingSegmentDescriptor(segmentName, new StreamPartitionMsgOffset(PARTITION_OFFSET), 0, + segmentLocation); segmentManager.commitSegmentFile(REALTIME_TABLE_NAME, committingSegmentDescriptor); assertFalse(segmentFile.exists()); assertFalse(extraSegmentFile.exists()); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterTest.java index 4253c9e..d135ad1 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterTest.java @@ -41,7 +41,7 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.ONLINE; +import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java index 5dcec81..a4e41f9 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java @@ -24,10 +24,10 @@ import java.util.TreeMap; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils; import org.testng.annotations.Test; -import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING; -import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.ERROR; -import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.OFFLINE; -import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.ONLINE; +import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING; +import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ERROR; +import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE; +import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java index 1976fa9..80a6a34 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java @@ -31,7 +31,7 @@ import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.metrics.ValidationMetrics; -import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.controller.ControllerConf; @@ -229,7 +229,7 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati idealState -> { assertNotNull(idealState); Map<String, String> instanceStateMap = idealState.getRecord().getMapFields().values().iterator().next(); - instanceStateMap.entrySet().iterator().next().setValue(RealtimeSegmentOnlineOfflineStateModel.OFFLINE); + instanceStateMap.entrySet().iterator().next().setValue(SegmentStateModel.OFFLINE); return idealState; }, RetryPolicies.fixedDelayRetryPolicy(2, 10)); @@ -324,9 +324,9 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati for (Map<String, String> instanceStateMap : idealState.getRecord().getMapFields().values()) { for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) { String state = entry.getValue(); - if (state.equals(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) { + if (state.equals(SegmentStateModel.CONSUMING)) { consumingServers.add(entry.getKey()); - } else if (state.equals(RealtimeSegmentOnlineOfflineStateModel.ONLINE)) { + } else if (state.equals(SegmentStateModel.ONLINE)) { completedServers.add(entry.getKey()); } } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java index b583b81..8b5db0c 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java @@ -161,7 +161,7 @@ public class HelixServerStarter implements ServiceStartable { } if (checkRealtime && !foundConsuming && TableNameBuilder.isRealtimeTableResource(resourceName)) { for (String partitionName : idealState.getPartitionSet()) { - if (StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING + if (StateModel.SegmentStateModel.CONSUMING .equals(idealState.getInstanceStateMap(partitionName).get(_instanceId))) { foundConsuming = true; break; @@ -570,8 +570,7 @@ public class HelixServerStarter implements ServiceStartable { for (String partition : externalView.getPartitionSet()) { Map<String, String> instanceStateMap = externalView.getStateMap(partition); String state = instanceStateMap.get(_instanceId); - if (StateModel.SegmentOnlineOfflineStateModel.ONLINE.equals(state) - || StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING.equals(state)) { + if (StateModel.SegmentStateModel.ONLINE.equals(state) || StateModel.SegmentStateModel.CONSUMING.equals(state)) { return false; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org