This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6bfcacb [Cleanup] Merge RealtimeSegmentOnlineOfflineStateModel and
SegmentOnlineOfflineStateModel in CommonConstants (#5459)
6bfcacb is described below
commit 6bfcacb239f55c27491c2a580bf4a63cf2ec88fe
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri May 29 11:39:44 2020 -0700
[Cleanup] Merge RealtimeSegmentOnlineOfflineStateModel and
SegmentOnlineOfflineStateModel in CommonConstants (#5459)
We only have one SegmentOnlineOfflineStateModel, so there is no value
keeping both of them
---
.../HelixExternalViewBasedQueryQuotaManager.java | 4 +-
.../pinot/broker/routing/RoutingManager.java | 6 +--
.../instanceselector/BaseInstanceSelector.java | 6 +--
.../segmentselector/RealtimeSegmentSelector.java | 4 +-
.../instanceselector/InstanceSelectorTest.java | 8 ++--
.../segmentselector/SegmentSelectorTest.java | 4 +-
.../apache/pinot/common/utils/CommonConstants.java | 10 +---
.../helix/core/PinotHelixResourceManager.java | 20 ++++----
.../segment/OfflineSegmentAssignment.java | 6 +--
.../segment/RealtimeSegmentAssignment.java | 14 +++---
.../assignment/segment/SegmentAssignmentUtils.java | 13 +++---
.../realtime/PinotLLCRealtimeSegmentManager.java | 39 ++++++++--------
.../helix/core/rebalance/TableRebalancer.java | 6 +--
.../helix/core/retention/RetentionManager.java | 3 +-
...fflineNonReplicaGroupSegmentAssignmentTest.java | 14 +++---
.../OfflineReplicaGroupSegmentAssignmentTest.java | 26 +++++------
...altimeNonReplicaGroupSegmentAssignmentTest.java | 19 ++++----
.../RealtimeReplicaGroupSegmentAssignmentTest.java | 19 ++++----
.../segment/SegmentAssignmentUtilsTest.java | 8 ++--
.../PinotLLCRealtimeSegmentManagerTest.java | 54 +++++++++++-----------
.../core/rebalance/TableRebalancerClusterTest.java | 2 +-
.../helix/core/rebalance/TableRebalancerTest.java | 8 ++--
.../ControllerPeriodicTasksIntegrationTest.java | 8 ++--
.../server/starter/helix/HelixServerStarter.java | 5 +-
24 files changed, 148 insertions(+), 158 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
index 172b13a..3306e58 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
@@ -168,7 +168,7 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
if (stateMap != null) {
for (Map.Entry<String, String> state : stateMap.entrySet()) {
if (!_helixManager.getInstanceName().equals(state.getKey()) &&
state.getValue()
-
.equals(CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.ONLINE))
{
+
.equals(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE)) {
otherOnlineBrokerCount++;
}
}
@@ -304,7 +304,7 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
int otherOnlineBrokerCount = 0;
for (Map.Entry<String, String> state : stateMap.entrySet()) {
if (!_helixManager.getInstanceName().equals(state.getKey()) &&
state.getValue()
-
.equals(CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.ONLINE))
{
+
.equals(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE)) {
otherOnlineBrokerCount++;
}
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
index 62eece4..e8bf81f 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
@@ -51,7 +51,7 @@ import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.CommonConstants;
-import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
+import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.spi.config.table.QueryConfig;
@@ -195,8 +195,8 @@ public class RoutingManager implements ClusterChangeHandler
{
Set<String> onlineSegments = new
HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size()));
for (Map.Entry<String, Map<String, String>> entry :
segmentAssignment.entrySet()) {
Map<String, String> instanceStateMap = entry.getValue();
- if
(instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.ONLINE)
|| instanceStateMap
- .containsValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) {
+ if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) ||
instanceStateMap
+ .containsValue(SegmentStateModel.CONSUMING)) {
onlineSegments.add(entry.getKey());
}
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index 404bfd6..512f721 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -31,7 +31,7 @@ import org.apache.helix.model.ExternalView;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
-import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
+import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.common.utils.HashUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -153,9 +153,9 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
String instance = instanceStateEntry.getKey();
String state = instanceStateEntry.getValue();
// Do not track instances in ERROR state
- if (!state.equals(RealtimeSegmentOnlineOfflineStateModel.ERROR)) {
+ if (!state.equals(SegmentStateModel.ERROR)) {
_instanceToSegmentsMap.computeIfAbsent(instance, k -> new
ArrayList<>()).add(segment);
- if (state.equals(RealtimeSegmentOnlineOfflineStateModel.OFFLINE)) {
+ if (state.equals(SegmentStateModel.OFFLINE)) {
offlineInstances.add(instance);
} else {
onlineInstances.add(instance);
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java
index b89961a..a052da2 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java
@@ -28,7 +28,7 @@ import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.helix.model.ExternalView;
import org.apache.pinot.common.request.BrokerRequest;
-import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
+import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.common.utils.HLCSegmentName;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.SegmentName;
@@ -90,7 +90,7 @@ public class RealtimeSegmentSelector implements
SegmentSelector {
HLCSegmentName hlcSegmentName = new HLCSegmentName(segment);
groupIdToHLCSegmentsMap.computeIfAbsent(hlcSegmentName.getGroupId(), k
-> new ArrayList<>()).add(segment);
} else {
- if
(instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING))
{
+ if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
// Keep the first CONSUMING segment for each partition
LLCSegmentName llcSegmentName = new LLCSegmentName(segment);
partitionIdToFirstConsumingLLCSegmentMap.compute(llcSegmentName.getPartitionId(),
(k, consumingSegment) -> {
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
index 8748f77..3806a27 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
@@ -34,10 +34,10 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.testng.annotations.Test;
-import static
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING;
-import static
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.ERROR;
-import static
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.OFFLINE;
-import static
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.ONLINE;
+import static
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING;
+import static
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ERROR;
+import static
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE;
+import static
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java
index 9da10c0..b447bb6 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java
@@ -32,8 +32,8 @@ import org.testng.annotations.Test;
import static
org.apache.pinot.broker.routing.segmentselector.RealtimeSegmentSelector.FORCE_HLC;
import static
org.apache.pinot.broker.routing.segmentselector.RealtimeSegmentSelector.ROUTING_OPTIONS_KEY;
-import static
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING;
-import static
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.ONLINE;
+import static
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING;
+import static
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEqualsNoOrder;
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 18a4bf2..bff81f4 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -56,20 +56,14 @@ public class CommonConstants {
public static final String UNTAGGED_MINION_INSTANCE = "minion_untagged";
public static class StateModel {
- public static class SegmentOnlineOfflineStateModel {
- public static final String ONLINE = "ONLINE";
- public static final String OFFLINE = "OFFLINE";
- public static final String ERROR = "ERROR";
- }
-
- public static class RealtimeSegmentOnlineOfflineStateModel {
+ public static class SegmentStateModel {
public static final String ONLINE = "ONLINE";
public static final String OFFLINE = "OFFLINE";
public static final String ERROR = "ERROR";
public static final String CONSUMING = "CONSUMING";
}
- public static class BrokerOnlineOfflineStateModel {
+ public static class BrokerResourceStateModel {
public static final String ONLINE = "ONLINE";
public static final String OFFLINE = "OFFLINE";
public static final String ERROR = "ERROR";
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ec96836..6674aff 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -72,8 +72,8 @@ import
org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import org.apache.pinot.common.utils.CommonConstants.Helix;
-import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.BrokerOnlineOfflineStateModel;
-import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
+import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
+import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.common.utils.CommonConstants.Server;
import org.apache.pinot.common.utils.SchemaUtils;
import org.apache.pinot.common.utils.config.InstanceUtils;
@@ -95,15 +95,15 @@ import
org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
import org.apache.pinot.controller.helix.starter.HelixConfig;
import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
+import org.apache.pinot.spi.config.instance.Instance;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableCustomConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TenantConfig;
-import org.apache.pinot.spi.config.instance.Instance;
-import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -631,7 +631,7 @@ public class PinotHelixResourceManager {
instanceStateMap.clear();
}
for (String brokerInstance : brokerInstances) {
- idealState.setPartitionState(tableNameWithType, brokerInstance,
BrokerOnlineOfflineStateModel.ONLINE);
+ idealState.setPartitionState(tableNameWithType, brokerInstance,
BrokerResourceStateModel.ONLINE);
}
return idealState;
}, DEFAULT_RETRY_POLICY);
@@ -651,7 +651,7 @@ public class PinotHelixResourceManager {
Preconditions.checkNotNull(tableConfig);
String brokerTag =
TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig());
if (brokerTag.equals(brokerTenantTag)) {
- tableIdealState.setPartitionState(tableNameWithType, instanceName,
BrokerOnlineOfflineStateModel.ONLINE);
+ tableIdealState.setPartitionState(tableNameWithType, instanceName,
BrokerResourceStateModel.ONLINE);
}
}
_helixAdmin.setResourceIdealState(_helixClusterName,
Helix.BROKER_RESOURCE_INSTANCE, tableIdealState);
@@ -1141,8 +1141,8 @@ public class PinotHelixResourceManager {
List<String> brokers = HelixHelper.getInstancesWithTag(_helixZkManager,
brokerTag);
HelixHelper.updateIdealState(_helixZkManager,
Helix.BROKER_RESOURCE_INSTANCE, idealState -> {
assert idealState != null;
- idealState.getRecord().getMapFields().put(tableNameWithType,
- SegmentAssignmentUtils.getInstanceStateMap(brokers,
BrokerOnlineOfflineStateModel.ONLINE));
+ idealState.getRecord().getMapFields()
+ .put(tableNameWithType,
SegmentAssignmentUtils.getInstanceStateMap(brokers,
BrokerResourceStateModel.ONLINE));
return idealState;
});
}
@@ -1563,7 +1563,7 @@ public class PinotHelixResourceManager {
LOGGER.info("Assigning segment: {} to instances: {} for table: {}",
segmentName, assignedInstances,
offlineTableName);
currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(assignedInstances,
SegmentOnlineOfflineStateModel.ONLINE));
+ SegmentAssignmentUtils.getInstanceStateMap(assignedInstances,
SegmentStateModel.ONLINE));
}
return idealState;
});
@@ -2090,7 +2090,7 @@ public class PinotHelixResourceManager {
_helixAdmin.enableInstance(_helixClusterName, instanceName,
enableInstance);
long intervalWaitTimeMs = 500L;
long deadline = System.currentTimeMillis() + timeOutMs;
- String offlineState = SegmentOnlineOfflineStateModel.OFFLINE;
+ String offlineState = SegmentStateModel.OFFLINE;
while (System.currentTimeMillis() < deadline) {
PropertyKey liveInstanceKey = _keyBuilder.liveInstance(instanceName);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
index 166f2a7..3e02759 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
@@ -33,7 +33,7 @@ import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
-import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
+import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import
org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -179,8 +179,8 @@ public class OfflineSegmentAssignment implements
SegmentAssignment {
newAssignment = new TreeMap<>();
for (String segment : currentAssignment.keySet()) {
List<String> assignedInstances = assignSegment(segment, newAssignment,
instancePartitions);
- newAssignment.put(segment,
- SegmentAssignmentUtils.getInstanceStateMap(assignedInstances,
SegmentOnlineOfflineStateModel.ONLINE));
+ newAssignment
+ .put(segment,
SegmentAssignmentUtils.getInstanceStateMap(assignedInstances,
SegmentStateModel.ONLINE));
}
} else {
int numReplicaGroups = instancePartitions.getNumReplicaGroups();
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index 177b6ee..22a5f0a 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -29,7 +29,7 @@ import java.util.TreeMap;
import org.apache.commons.configuration.Configuration;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.assignment.InstancePartitions;
-import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
+import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.common.utils.LLCSegmentName;
import
org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -208,8 +208,8 @@ public class RealtimeSegmentAssignment implements
SegmentAssignment {
newAssignment = new TreeMap<>();
for (String segment : completedSegmentAssignment.keySet()) {
List<String> assignedInstances = assignCompletedSegment(segment,
newAssignment, completedInstancePartitions);
- newAssignment.put(segment, SegmentAssignmentUtils
- .getInstanceStateMap(assignedInstances,
RealtimeSegmentOnlineOfflineStateModel.ONLINE));
+ newAssignment
+ .put(segment,
SegmentAssignmentUtils.getInstanceStateMap(assignedInstances,
SegmentStateModel.ONLINE));
}
} else {
if (completedInstancePartitions.getNumReplicaGroups() == 1) {
@@ -252,8 +252,8 @@ public class RealtimeSegmentAssignment implements
SegmentAssignment {
newAssignment = new TreeMap<>();
for (String segmentName : completedSegmentAssignment.keySet()) {
List<String> instancesAssigned = assignConsumingSegment(segmentName,
consumingInstancePartitions);
- Map<String, String> instanceStateMap = SegmentAssignmentUtils
- .getInstanceStateMap(instancesAssigned,
RealtimeSegmentOnlineOfflineStateModel.ONLINE);
+ Map<String, String> instanceStateMap =
+ SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.ONLINE);
newAssignment.put(segmentName, instanceStateMap);
}
}
@@ -267,8 +267,8 @@ public class RealtimeSegmentAssignment implements
SegmentAssignment {
for (String segmentName : consumingSegmentAssignment.keySet()) {
List<String> instancesAssigned = assignConsumingSegment(segmentName,
consumingInstancePartitions);
- Map<String, String> instanceStateMap = SegmentAssignmentUtils
- .getInstanceStateMap(instancesAssigned,
RealtimeSegmentOnlineOfflineStateModel.CONSUMING);
+ Map<String, String> instanceStateMap =
+ SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.CONSUMING);
newAssignment.put(segmentName, instanceStateMap);
}
} else {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index 1b301ad..edf8553 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -30,8 +30,7 @@ import java.util.Set;
import java.util.TreeMap;
import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
import org.apache.pinot.common.assignment.InstancePartitions;
-import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
-import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
+import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.common.utils.Pairs;
@@ -142,7 +141,7 @@ public class SegmentAssignmentUtils {
Map<String, Map<String, String>> currentAssignment, List<String>
instances, int replication) {
// Use Helix AutoRebalanceStrategy to rebalance the table
LinkedHashMap<String, Integer> states = new LinkedHashMap<>();
- states.put(SegmentOnlineOfflineStateModel.ONLINE, replication);
+ states.put(SegmentStateModel.ONLINE, replication);
AutoRebalanceStrategy autoRebalanceStrategy =
new AutoRebalanceStrategy(null, new
ArrayList<>(currentAssignment.keySet()), states);
// Make a copy of the current assignment because this step might change
the passed in assignment
@@ -252,8 +251,8 @@ public class SegmentAssignmentUtils {
Map<String, String> instanceStateMap = new TreeMap<>();
int numReplicaGroups = instancePartitions.getNumReplicaGroups();
for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups;
replicaGroupId++) {
- instanceStateMap.put(instancePartitions.getInstances(partitionId,
replicaGroupId).get(instanceId),
- SegmentOnlineOfflineStateModel.ONLINE);
+ instanceStateMap
+ .put(instancePartitions.getInstances(partitionId,
replicaGroupId).get(instanceId), SegmentStateModel.ONLINE);
}
return instanceStateMap;
}
@@ -305,9 +304,9 @@ public class SegmentAssignmentUtils {
for (Map.Entry<String, Map<String, String>> entry :
segmentAssignment.entrySet()) {
String segmentName = entry.getKey();
Map<String, String> instanceStateMap = entry.getValue();
- if
(instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.ONLINE))
{
+ if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) {
_completedSegmentAssignment.put(segmentName, instanceStateMap);
- } else if
(instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING))
{
+ } else if
(instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
_consumingSegmentAssignment.put(segmentName, instanceStateMap);
} else {
_offlineSegmentAssignment.put(segmentName, instanceStateMap);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 2325e92..8cae0ec 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -49,7 +49,7 @@ import
org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
-import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
+import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.SegmentName;
@@ -412,9 +412,9 @@ public class PinotLLCRealtimeSegmentManager {
TableConfig tableConfig = getTableConfig(realtimeTableName);
InstancePartitions instancePartitions =
getConsumingInstancePartitions(tableConfig);
IdealState idealState = getIdealState(realtimeTableName);
-
Preconditions.checkState(idealState.getInstanceStateMap(committingSegmentName)
- .containsValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING),
- "Failed to find instance in CONSUMING state in IdealState for segment:
%s", committingSegmentName);
+ Preconditions
+
.checkState(idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING),
+ "Failed to find instance in CONSUMING state in IdealState for
segment: %s", committingSegmentName);
int numPartitions = getNumPartitionsFromIdealState(idealState);
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
@@ -608,11 +608,11 @@ public class PinotLLCRealtimeSegmentManager {
assert idealState != null;
Map<String, String> stateMap =
idealState.getInstanceStateMap(segmentName);
String state = stateMap.get(instanceName);
- if (RealtimeSegmentOnlineOfflineStateModel.CONSUMING.equals(state)) {
- stateMap.put(instanceName,
RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
+ if (SegmentStateModel.CONSUMING.equals(state)) {
+ stateMap.put(instanceName, SegmentStateModel.OFFLINE);
} else {
- LOGGER.info("Segment {} in state {} when trying to register
consumption stop from {}",
- segmentName, state, instanceName);
+ LOGGER.info("Segment {} in state {} when trying to register
consumption stop from {}", segmentName, state,
+ instanceName);
}
return idealState;
}, RetryPolicies.exponentialBackoffRetryPolicy(10, 500L, 1.2f), true);
@@ -733,16 +733,16 @@ public class PinotLLCRealtimeSegmentManager {
if (committingSegmentName != null) {
// Change committing segment state to ONLINE
Set<String> instances =
instanceStatesMap.get(committingSegmentName).keySet();
- instanceStatesMap.put(committingSegmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instances,
RealtimeSegmentOnlineOfflineStateModel.ONLINE));
+ instanceStatesMap
+ .put(committingSegmentName,
SegmentAssignmentUtils.getInstanceStateMap(instances,
SegmentStateModel.ONLINE));
LOGGER.info("Updating segment: {} to ONLINE state",
committingSegmentName);
}
// Assign instances to the new segment and add instances as state CONSUMING
List<String> instancesAssigned =
segmentAssignment.assignSegment(newSegmentName, instanceStatesMap,
instancePartitionsMap);
- instanceStatesMap.put(newSegmentName, SegmentAssignmentUtils
- .getInstanceStateMap(instancesAssigned,
RealtimeSegmentOnlineOfflineStateModel.CONSUMING));
+ instanceStatesMap.put(newSegmentName,
+ SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.CONSUMING));
LOGGER.info("Adding new CONSUMING segment: {} to instances: {}",
newSegmentName, instancesAssigned);
}
@@ -854,7 +854,7 @@ public class PinotLLCRealtimeSegmentManager {
Map<String, String> instanceStateMap =
instanceStatesMap.get(latestSegmentName);
if (instanceStateMap != null) {
// Latest segment of metadata is in idealstate.
- if
(instanceStateMap.values().contains(RealtimeSegmentOnlineOfflineStateModel.CONSUMING))
{
+ if (instanceStateMap.values().contains(SegmentStateModel.CONSUMING)) {
if (latestSegmentZKMetadata.getStatus() == Status.DONE) {
// step-1 of commmitSegmentMetadata is done (i.e. marking old
segment as DONE)
@@ -868,9 +868,8 @@ public class PinotLLCRealtimeSegmentManager {
LLCSegmentName newLLCSegmentName =
getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
String newSegmentName = newLLCSegmentName.getSegmentName();
- CommittingSegmentDescriptor committingSegmentDescriptor =
- new CommittingSegmentDescriptor(latestSegmentName,
- new
StreamPartitionMsgOffset(latestSegmentZKMetadata.getEndOffset()), 0);
+ CommittingSegmentDescriptor committingSegmentDescriptor = new
CommittingSegmentDescriptor(latestSegmentName,
+ new
StreamPartitionMsgOffset(latestSegmentZKMetadata.getEndOffset()), 0);
createNewSegmentZKMetadata(tableConfig, streamConfig,
newLLCSegmentName, currentTimeMs,
committingSegmentDescriptor, latestSegmentZKMetadata,
instancePartitions, numPartitions, numReplicas);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap,
latestSegmentName, newSegmentName,
@@ -883,7 +882,7 @@ public class PinotLLCRealtimeSegmentManager {
// 1. all replicas OFFLINE and metadata IN_PROGRESS/DONE - a segment
marked itself OFFLINE during consumption for some reason
// 2. all replicas ONLINE and metadata DONE - Resolved in
https://github.com/linkedin/pinot/pull/2890
// 3. we should never end up with some replicas ONLINE and some
OFFLINE.
- if (isAllInstancesInState(instanceStateMap,
RealtimeSegmentOnlineOfflineStateModel.OFFLINE)) {
+ if (isAllInstancesInState(instanceStateMap,
SegmentStateModel.OFFLINE)) {
LOGGER.info("Repairing segment: {} which is OFFLINE for all
instances in IdealState", latestSegmentName);
// Create a new segment to re-consume from the previous start
offset
@@ -932,7 +931,7 @@ public class PinotLLCRealtimeSegmentManager {
for (Map.Entry<String, Map<String, String>> segmentEntry :
instanceStatesMap.entrySet()) {
LLCSegmentName llcSegmentName = new
LLCSegmentName(segmentEntry.getKey());
if (llcSegmentName.getPartitionId() == partitionId &&
segmentEntry.getValue()
-
.containsValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) {
+ .containsValue(SegmentStateModel.CONSUMING)) {
previousConsumingSegment = llcSegmentName.getSegmentName();
break;
}
@@ -985,8 +984,8 @@ public class PinotLLCRealtimeSegmentManager {
new LLCSegmentName(rawTableName, partitionId,
STARTING_SEQUENCE_NUMBER, creationTimeMs);
String newSegmentName = newLLCSegmentName.getSegmentName();
long startOffset = getPartitionOffset(streamConfig,
streamConfig.getOffsetCriteria(), partitionId);
- CommittingSegmentDescriptor committingSegmentDescriptor = new
CommittingSegmentDescriptor(null,
- new StreamPartitionMsgOffset(startOffset), 0);
+ CommittingSegmentDescriptor committingSegmentDescriptor =
+ new CommittingSegmentDescriptor(null, new
StreamPartitionMsgOffset(startOffset), 0);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName,
creationTimeMs,
committingSegmentDescriptor, null, instancePartitions, numPartitions,
numReplicas);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index c6c5d57..5115fe1 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -35,7 +35,7 @@ import org.apache.helix.model.IdealState;
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
-import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
+import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import
org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
@@ -473,7 +473,7 @@ public class TableRebalancer {
for (Map.Entry<String, String> instanceStateEntry :
idealStateInstanceStateMap.entrySet()) {
// Ignore OFFLINE state in IdealState
String idealStateInstanceState = instanceStateEntry.getValue();
- if
(idealStateInstanceState.equals(RealtimeSegmentOnlineOfflineStateModel.OFFLINE))
{
+ if (idealStateInstanceState.equals(SegmentStateModel.OFFLINE)) {
continue;
}
@@ -486,7 +486,7 @@ public class TableRebalancer {
String instanceName = instanceStateEntry.getKey();
String externalViewInstanceState =
externalViewInstanceStateMap.get(instanceName);
if (!idealStateInstanceState.equals(externalViewInstanceState)) {
- if
(RealtimeSegmentOnlineOfflineStateModel.ERROR.equals(externalViewInstanceState))
{
+ if (SegmentStateModel.ERROR.equals(externalViewInstanceState)) {
if (bestEfforts) {
LOGGER
.warn("Found ERROR instance: {} for segment: {}, table: {},
counting it as good state (best-efforts)",
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index a616fc6..140e64e 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -174,8 +174,7 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
} else {
// Delete segment if all of its replicas are OFFLINE
Set<String> states = new HashSet<>(stateMap.values());
- return states.size() == 1 && states
-
.contains(CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.OFFLINE);
+ return states.size() == 1 &&
states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
}
}
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupSegmentAssignmentTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupSegmentAssignmentTest.java
index d037725..104770f 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupSegmentAssignmentTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupSegmentAssignmentTest.java
@@ -27,7 +27,7 @@ import java.util.TreeMap;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.pinot.common.assignment.InstancePartitions;
-import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
+import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import
org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -95,8 +95,8 @@ public class OfflineNonReplicaGroupSegmentAssignmentTest {
assertEquals(instancesAssigned.get(replicaId),
INSTANCES.get(expectedAssignedInstanceId));
expectedAssignedInstanceId = (expectedAssignedInstanceId + 1) %
NUM_INSTANCES;
}
- currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentOnlineOfflineStateModel.ONLINE));
+ currentAssignment
+ .put(segmentName,
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.ONLINE));
}
}
@@ -106,8 +106,8 @@ public class OfflineNonReplicaGroupSegmentAssignmentTest {
for (String segmentName : SEGMENTS) {
List<String> instancesAssigned =
_segmentAssignment.assignSegment(segmentName, currentAssignment,
_instancePartitionsMap);
- currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentOnlineOfflineStateModel.ONLINE));
+ currentAssignment
+ .put(segmentName,
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.ONLINE));
}
// There should be 100 segments assigned
@@ -134,8 +134,8 @@ public class OfflineNonReplicaGroupSegmentAssignmentTest {
for (String segmentName : SEGMENTS) {
List<String> instancesAssigned =
_segmentAssignment.assignSegment(segmentName, currentAssignment,
_instancePartitionsMap);
- currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentOnlineOfflineStateModel.ONLINE));
+ currentAssignment
+ .put(segmentName,
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.ONLINE));
}
// Bootstrap table should reassign all segments based on their
alphabetical order
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
index 31f3bb4..78a06bc 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
@@ -34,7 +34,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
-import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
+import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import
org.apache.pinot.common.utils.CommonConstants.Segment.AssignmentStrategy;
import
org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
@@ -201,8 +201,8 @@ public class OfflineReplicaGroupSegmentAssignmentTest {
assertEquals(instancesAssigned.get(replicaGroupId),
INSTANCES.get(expectedAssignedInstanceId));
}
- currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentOnlineOfflineStateModel.ONLINE));
+ currentAssignment
+ .put(segmentName,
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.ONLINE));
}
}
@@ -234,8 +234,8 @@ public class OfflineReplicaGroupSegmentAssignmentTest {
assertEquals(instancesAssigned.get(replicaGroupId),
INSTANCES.get(expectedAssignedInstanceId));
}
- currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentOnlineOfflineStateModel.ONLINE));
+ currentAssignment
+ .put(segmentName,
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.ONLINE));
}
}
@@ -245,8 +245,8 @@ public class OfflineReplicaGroupSegmentAssignmentTest {
for (String segmentName : SEGMENTS) {
List<String> instancesAssigned = _segmentAssignmentWithoutPartition
.assignSegment(segmentName, currentAssignment,
_instancePartitionsMapWithoutPartition);
- currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentOnlineOfflineStateModel.ONLINE));
+ currentAssignment
+ .put(segmentName,
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.ONLINE));
}
// There should be 90 segments assigned
@@ -274,8 +274,8 @@ public class OfflineReplicaGroupSegmentAssignmentTest {
for (String segmentName : SEGMENTS) {
List<String> instancesAssigned = _segmentAssignmentWithPartition
.assignSegment(segmentName, currentAssignment,
_instancePartitionsMapWithPartition);
- currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentOnlineOfflineStateModel.ONLINE));
+ currentAssignment
+ .put(segmentName,
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.ONLINE));
}
// There should be 90 segments assigned
@@ -303,8 +303,8 @@ public class OfflineReplicaGroupSegmentAssignmentTest {
for (String segmentName : SEGMENTS) {
List<String> instancesAssigned = _segmentAssignmentWithoutPartition
.assignSegment(segmentName, currentAssignment,
_instancePartitionsMapWithoutPartition);
- currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentOnlineOfflineStateModel.ONLINE));
+ currentAssignment
+ .put(segmentName,
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.ONLINE));
}
// Bootstrap table should reassign all segments based on their
alphabetical order
@@ -326,8 +326,8 @@ public class OfflineReplicaGroupSegmentAssignmentTest {
for (String segmentName : SEGMENTS) {
List<String> instancesAssigned = _segmentAssignmentWithPartition
.assignSegment(segmentName, currentAssignment,
_instancePartitionsMapWithPartition);
- currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentOnlineOfflineStateModel.ONLINE));
+ currentAssignment
+ .put(segmentName,
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.ONLINE));
}
// Bootstrap table should reassign all segments based on their
alphabetical order within the partition
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
index 51a13ab..b659514 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
@@ -24,7 +24,7 @@ import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.pinot.common.assignment.InstancePartitions;
-import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
+import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.common.utils.LLCSegmentName;
import
org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -145,7 +145,7 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest {
String offlineSegmentName = "offlineSegment";
Map<String, String> offlineSegmentInstanceStateMap = SegmentAssignmentUtils
.getInstanceStateMap(SegmentAssignmentTestUtils.getNameList("badInstance_",
NUM_REPLICAS),
- RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
+ SegmentStateModel.OFFLINE);
currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
// Rebalance without COMPLETED instance partitions should not change the
segment assignment
@@ -167,14 +167,14 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest
{
Map<String, String> instanceStateMap =
newAssignment.get(_segments.get(segmentId));
for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
assertTrue(entry.getKey().startsWith(COMPLETED_INSTANCE_NAME_PREFIX));
- assertEquals(entry.getValue(),
RealtimeSegmentOnlineOfflineStateModel.ONLINE);
+ assertEquals(entry.getValue(), SegmentStateModel.ONLINE);
}
} else {
// CONSUMING segments
Map<String, String> instanceStateMap =
newAssignment.get(_segments.get(segmentId));
for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
assertTrue(entry.getKey().startsWith(CONSUMING_INSTANCE_NAME_PREFIX));
- assertEquals(entry.getValue(),
RealtimeSegmentOnlineOfflineStateModel.CONSUMING);
+ assertEquals(entry.getValue(), SegmentStateModel.CONSUMING);
}
}
}
@@ -215,10 +215,10 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest
{
for (Map.Entry<String, Map<String, String>> entry :
newAssignment.entrySet()) {
String segmentName = entry.getKey();
Map<String, String> instanceStateMap = entry.getValue();
- if
(instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.ONLINE))
{
+ if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) {
for (int i = 0; i < NUM_REPLICAS; i++) {
String expectedInstance = COMPLETED_INSTANCES.get(index++ %
NUM_COMPLETED_INSTANCES);
- assertEquals(instanceStateMap.get(expectedInstance),
RealtimeSegmentOnlineOfflineStateModel.ONLINE);
+ assertEquals(instanceStateMap.get(expectedInstance),
SegmentStateModel.ONLINE);
}
} else {
// CONSUMING and OFFLINE segments should not be reassigned
@@ -234,12 +234,11 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest
{
String lastSegmentInPartition = _segments.get(segmentId -
NUM_PARTITIONS);
Map<String, String> instanceStateMap =
currentAssignment.get(lastSegmentInPartition);
currentAssignment.put(lastSegmentInPartition, SegmentAssignmentUtils
- .getInstanceStateMap(new ArrayList<>(instanceStateMap.keySet()),
- RealtimeSegmentOnlineOfflineStateModel.ONLINE));
+ .getInstanceStateMap(new ArrayList<>(instanceStateMap.keySet()),
SegmentStateModel.ONLINE));
}
// Add the new segment into the assignment as CONSUMING
- currentAssignment.put(_segments.get(segmentId), SegmentAssignmentUtils
- .getInstanceStateMap(instancesAssigned,
RealtimeSegmentOnlineOfflineStateModel.CONSUMING));
+ currentAssignment.put(_segments.get(segmentId),
+ SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.CONSUMING));
}
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
index 6eb5148..ee3c3c5 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
@@ -25,7 +25,7 @@ import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.pinot.common.assignment.InstancePartitions;
-import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
+import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import
org.apache.pinot.common.utils.CommonConstants.Segment.AssignmentStrategy;
import org.apache.pinot.common.utils.LLCSegmentName;
import
org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants;
@@ -170,7 +170,7 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
String offlineSegmentName = "offlineSegment";
Map<String, String> offlineSegmentInstanceStateMap = SegmentAssignmentUtils
.getInstanceStateMap(SegmentAssignmentTestUtils.getNameList("badInstance_",
NUM_REPLICAS),
- RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
+ SegmentStateModel.OFFLINE);
currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
// Rebalance without COMPLETED instance partitions should not change the
segment assignment
@@ -192,14 +192,14 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
Map<String, String> instanceStateMap =
newAssignment.get(_segments.get(segmentId));
for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
assertTrue(entry.getKey().startsWith(COMPLETED_INSTANCE_NAME_PREFIX));
- assertEquals(entry.getValue(),
RealtimeSegmentOnlineOfflineStateModel.ONLINE);
+ assertEquals(entry.getValue(), SegmentStateModel.ONLINE);
}
} else {
// CONSUMING segments
Map<String, String> instanceStateMap =
newAssignment.get(_segments.get(segmentId));
for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
assertTrue(entry.getKey().startsWith(CONSUMING_INSTANCE_NAME_PREFIX));
- assertEquals(entry.getValue(),
RealtimeSegmentOnlineOfflineStateModel.CONSUMING);
+ assertEquals(entry.getValue(), SegmentStateModel.CONSUMING);
}
}
}
@@ -239,12 +239,12 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
for (Map.Entry<String, Map<String, String>> entry :
newAssignment.entrySet()) {
String segmentName = entry.getKey();
Map<String, String> instanceStateMap = entry.getValue();
- if
(instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.ONLINE))
{
+ if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) {
int expectedInstanceId = index++ %
numCompletedInstancesPerReplicaGroup;
for (int i = 0; i < NUM_REPLICAS; i++) {
String expectedInstance =
COMPLETED_INSTANCES.get(expectedInstanceId + i *
numCompletedInstancesPerReplicaGroup);
- assertEquals(instanceStateMap.get(expectedInstance),
RealtimeSegmentOnlineOfflineStateModel.ONLINE);
+ assertEquals(instanceStateMap.get(expectedInstance),
SegmentStateModel.ONLINE);
}
} else {
// CONSUMING and OFFLINE segments should not be reassigned
@@ -260,12 +260,11 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
String lastSegmentInPartition = _segments.get(segmentId -
NUM_PARTITIONS);
Map<String, String> instanceStateMap =
currentAssignment.get(lastSegmentInPartition);
currentAssignment.put(lastSegmentInPartition, SegmentAssignmentUtils
- .getInstanceStateMap(new ArrayList<>(instanceStateMap.keySet()),
- RealtimeSegmentOnlineOfflineStateModel.ONLINE));
+ .getInstanceStateMap(new ArrayList<>(instanceStateMap.keySet()),
SegmentStateModel.ONLINE));
}
// Add the new segment into the assignment as CONSUMING
- currentAssignment.put(_segments.get(segmentId), SegmentAssignmentUtils
- .getInstanceStateMap(instancesAssigned,
RealtimeSegmentOnlineOfflineStateModel.CONSUMING));
+ currentAssignment.put(_segments.get(segmentId),
+ SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.CONSUMING));
}
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
index ef8364e..6e867c6 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
@@ -25,7 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.pinot.common.assignment.InstancePartitions;
-import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
+import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@@ -57,8 +57,8 @@ public class SegmentAssignmentUtilsTest {
instancesAssigned.add(instances.get(assignedInstanceId));
assignedInstanceId = (assignedInstanceId + 1) % numInstances;
}
- currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentOnlineOfflineStateModel.ONLINE));
+ currentAssignment
+ .put(segmentName,
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.ONLINE));
}
// There should be 100 segments assigned
@@ -240,7 +240,7 @@ public class SegmentAssignmentUtilsTest {
instancesAssigned.add(instances.get(assignedInstanceId));
}
currentAssignment.put(segments.get(segmentId),
- SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentOnlineOfflineStateModel.ONLINE));
+ SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.ONLINE));
}
// There should be 90 segments assigned
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index db98072..485ed6c 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -38,7 +38,7 @@ import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.CommonConstants.Helix;
-import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
+import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.ControllerConf;
@@ -157,7 +157,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
assertNotNull(instanceStateMap);
assertEquals(instanceStateMap.size(), numReplicas);
for (String state : instanceStateMap.values()) {
- assertEquals(state, RealtimeSegmentOnlineOfflineStateModel.CONSUMING);
+ assertEquals(state, SegmentStateModel.CONSUMING);
}
LLCRealtimeSegmentZKMetadata segmentZKMetadata =
@@ -188,7 +188,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
// Commit a segment for partition 0
String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0,
CURRENT_TIME_MS).getSegmentName();
CommittingSegmentDescriptor committingSegmentDescriptor =
- new CommittingSegmentDescriptor(committingSegment, new
StreamPartitionMsgOffset(PARTITION_OFFSET + NUM_DOCS), 0L);
+ new CommittingSegmentDescriptor(committingSegment, new
StreamPartitionMsgOffset(PARTITION_OFFSET + NUM_DOCS),
+ 0L);
committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME,
committingSegmentDescriptor);
@@ -196,13 +197,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
Map<String, String> committedSegmentInstanceStateMap =
instanceStatesMap.get(committingSegment);
assertNotNull(committedSegmentInstanceStateMap);
assertEquals(new HashSet<>(committedSegmentInstanceStateMap.values()),
- Collections.singleton(RealtimeSegmentOnlineOfflineStateModel.ONLINE));
+ Collections.singleton(SegmentStateModel.ONLINE));
String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1,
CURRENT_TIME_MS).getSegmentName();
Map<String, String> consumingSegmentInstanceStateMap =
instanceStatesMap.get(consumingSegment);
assertNotNull(consumingSegmentInstanceStateMap);
assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()),
-
Collections.singleton(RealtimeSegmentOnlineOfflineStateModel.CONSUMING));
+ Collections.singleton(SegmentStateModel.CONSUMING));
// Verify segment ZK metadata for committed segment and new consuming
segment
LLCRealtimeSegmentZKMetadata committedSegmentZKMetadata =
@@ -223,11 +224,10 @@ public class PinotLLCRealtimeSegmentManagerTest {
assertEquals(committedSegmentZKMetadata.getCreationTime(),
CURRENT_TIME_MS);
// Turn one instance of the consuming segment OFFLINE and commit the
segment
- consumingSegmentInstanceStateMap.entrySet().iterator().next()
- .setValue(RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
+
consumingSegmentInstanceStateMap.entrySet().iterator().next().setValue(SegmentStateModel.OFFLINE);
committingSegment = consumingSegment;
- committingSegmentDescriptor =
- new CommittingSegmentDescriptor(committingSegment, new
StreamPartitionMsgOffset(PARTITION_OFFSET + NUM_DOCS + NUM_DOCS), 0L);
+ committingSegmentDescriptor = new
CommittingSegmentDescriptor(committingSegment,
+ new StreamPartitionMsgOffset(PARTITION_OFFSET + NUM_DOCS + NUM_DOCS),
0L);
committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME,
committingSegmentDescriptor);
@@ -235,13 +235,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
committedSegmentInstanceStateMap =
instanceStatesMap.get(committingSegment);
assertNotNull(committedSegmentInstanceStateMap);
assertEquals(new HashSet<>(committedSegmentInstanceStateMap.values()),
- Collections.singleton(RealtimeSegmentOnlineOfflineStateModel.ONLINE));
+ Collections.singleton(SegmentStateModel.ONLINE));
consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 2,
CURRENT_TIME_MS).getSegmentName();
consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment);
assertNotNull(consumingSegmentInstanceStateMap);
assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()),
-
Collections.singleton(RealtimeSegmentOnlineOfflineStateModel.CONSUMING));
+ Collections.singleton(SegmentStateModel.CONSUMING));
// Illegal segment commit - commit the segment again
try {
@@ -357,7 +357,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
Map<String, String> instanceStateMap =
instanceStatesMap.get(segmentName);
assertEquals(instanceStateMap.size(), segmentManager._numReplicas);
for (String state : instanceStateMap.values()) {
- assertEquals(state, RealtimeSegmentOnlineOfflineStateModel.CONSUMING);
+ assertEquals(state, SegmentStateModel.CONSUMING);
}
// NOTE: Old segment ZK metadata might exist when previous round failed
due to not enough instances
assertTrue(segmentZKMetadataMap.containsKey(segmentName));
@@ -513,15 +513,15 @@ public class PinotLLCRealtimeSegmentManagerTest {
Map<String, String> consumingSegmentInstanceStateMap =
instanceStatesMap.remove(consumingSegment);
assertNotNull(consumingSegmentInstanceStateMap);
assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()),
-
Collections.singleton(RealtimeSegmentOnlineOfflineStateModel.CONSUMING));
+ Collections.singleton(SegmentStateModel.CONSUMING));
if (latestCommittedSegment != null) {
Map<String, String> latestCommittedSegmentInstanceStateMap =
instanceStatesMap.get(latestCommittedSegment);
assertNotNull(latestCommittedSegmentInstanceStateMap);
for (Map.Entry<String, String> entry :
latestCommittedSegmentInstanceStateMap.entrySet()) {
// Latest committed segment should have all instances in ONLINE state
- assertEquals(entry.getValue(),
RealtimeSegmentOnlineOfflineStateModel.ONLINE);
- entry.setValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING);
+ assertEquals(entry.getValue(), SegmentStateModel.ONLINE);
+ entry.setValue(SegmentStateModel.CONSUMING);
}
}
}
@@ -535,8 +535,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
assertNotNull(consumingSegmentInstanceStateMap);
for (Map.Entry<String, String> entry :
consumingSegmentInstanceStateMap.entrySet()) {
// Consuming segment should have all instances in CONSUMING state
- assertEquals(entry.getValue(),
RealtimeSegmentOnlineOfflineStateModel.CONSUMING);
- entry.setValue(RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
+ assertEquals(entry.getValue(), SegmentStateModel.CONSUMING);
+ entry.setValue(SegmentStateModel.OFFLINE);
}
}
@@ -581,8 +581,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
Map<String, String> instanceStateMap = entry.getValue();
// Skip segments with all instances OFFLINE
- if
(instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.ONLINE)
|| instanceStateMap
- .containsValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) {
+ if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) ||
instanceStateMap
+ .containsValue(SegmentStateModel.CONSUMING)) {
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
int partitionsId = llcSegmentName.getPartitionId();
Map<Integer, String> sequenceNumberToSegmentMap =
partitionIdToSegmentsMap.get(partitionsId);
@@ -601,8 +601,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
// Latest segment should have CONSUMING instance but no ONLINE instance
in ideal state
Map<String, String> instanceStateMap =
instanceStatesMap.get(latestSegment);
-
assertTrue(instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING));
-
assertFalse(instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.ONLINE));
+ assertTrue(instanceStateMap.containsValue(SegmentStateModel.CONSUMING));
+ assertFalse(instanceStateMap.containsValue(SegmentStateModel.ONLINE));
// Latest segment ZK metadata should be IN_PROGRESS
assertEquals(segmentManager._segmentZKMetadataMap.get(latestSegment).getStatus(),
Status.IN_PROGRESS);
@@ -612,8 +612,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
// Committed segment should have all instances in ONLINE state
instanceStateMap = instanceStatesMap.get(segmentName);
- assertEquals(new HashSet<>(instanceStateMap.values()),
-
Collections.singleton(RealtimeSegmentOnlineOfflineStateModel.ONLINE));
+ assertEquals(new HashSet<>(instanceStateMap.values()),
Collections.singleton(SegmentStateModel.ONLINE));
// Committed segment ZK metadata should be DONE
LLCRealtimeSegmentZKMetadata segmentZKMetadata =
segmentManager._segmentZKMetadataMap.get(segmentName);
@@ -654,7 +653,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
// Commit a segment for partition 0
String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0,
CURRENT_TIME_MS).getSegmentName();
CommittingSegmentDescriptor committingSegmentDescriptor =
- new CommittingSegmentDescriptor(committingSegment, new
StreamPartitionMsgOffset(PARTITION_OFFSET + NUM_DOCS), 0L);
+ new CommittingSegmentDescriptor(committingSegment, new
StreamPartitionMsgOffset(PARTITION_OFFSET + NUM_DOCS),
+ 0L);
committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
try {
@@ -684,7 +684,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
FakePinotLLCRealtimeSegmentManager segmentManager = new
FakePinotLLCRealtimeSegmentManager();
String segmentLocation = SCHEME + tableDir + "/" + segmentFileName;
CommittingSegmentDescriptor committingSegmentDescriptor =
- new CommittingSegmentDescriptor(segmentName, new
StreamPartitionMsgOffset(PARTITION_OFFSET), 0, segmentLocation);
+ new CommittingSegmentDescriptor(segmentName, new
StreamPartitionMsgOffset(PARTITION_OFFSET), 0,
+ segmentLocation);
segmentManager.commitSegmentFile(REALTIME_TABLE_NAME,
committingSegmentDescriptor);
assertFalse(segmentFile.exists());
}
@@ -709,7 +710,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
FakePinotLLCRealtimeSegmentManager segmentManager = new
FakePinotLLCRealtimeSegmentManager();
String segmentLocation = SCHEME + tableDir + "/" + segmentFileName;
CommittingSegmentDescriptor committingSegmentDescriptor =
- new CommittingSegmentDescriptor(segmentName, new
StreamPartitionMsgOffset(PARTITION_OFFSET), 0, segmentLocation);
+ new CommittingSegmentDescriptor(segmentName, new
StreamPartitionMsgOffset(PARTITION_OFFSET), 0,
+ segmentLocation);
segmentManager.commitSegmentFile(REALTIME_TABLE_NAME,
committingSegmentDescriptor);
assertFalse(segmentFile.exists());
assertFalse(extraSegmentFile.exists());
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterTest.java
index 4253c9e..d135ad1 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterTest.java
@@ -41,7 +41,7 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.ONLINE;
+import static
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
index 5dcec81..a4e41f9 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
@@ -24,10 +24,10 @@ import java.util.TreeMap;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.testng.annotations.Test;
-import static
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING;
-import static
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.ERROR;
-import static
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.OFFLINE;
-import static
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.ONLINE;
+import static
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING;
+import static
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ERROR;
+import static
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE;
+import static
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
index 1976fa9..80a6a34 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
@@ -31,7 +31,7 @@ import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.metrics.ValidationMetrics;
-import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
+import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.ControllerConf;
@@ -229,7 +229,7 @@ public class ControllerPeriodicTasksIntegrationTest extends
BaseClusterIntegrati
idealState -> {
assertNotNull(idealState);
Map<String, String> instanceStateMap =
idealState.getRecord().getMapFields().values().iterator().next();
-
instanceStateMap.entrySet().iterator().next().setValue(RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
+
instanceStateMap.entrySet().iterator().next().setValue(SegmentStateModel.OFFLINE);
return idealState;
}, RetryPolicies.fixedDelayRetryPolicy(2, 10));
@@ -324,9 +324,9 @@ public class ControllerPeriodicTasksIntegrationTest extends
BaseClusterIntegrati
for (Map<String, String> instanceStateMap :
idealState.getRecord().getMapFields().values()) {
for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
String state = entry.getValue();
- if (state.equals(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) {
+ if (state.equals(SegmentStateModel.CONSUMING)) {
consumingServers.add(entry.getKey());
- } else if
(state.equals(RealtimeSegmentOnlineOfflineStateModel.ONLINE)) {
+ } else if (state.equals(SegmentStateModel.ONLINE)) {
completedServers.add(entry.getKey());
}
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
index b583b81..8b5db0c 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
@@ -161,7 +161,7 @@ public class HelixServerStarter implements ServiceStartable
{
}
if (checkRealtime && !foundConsuming &&
TableNameBuilder.isRealtimeTableResource(resourceName)) {
for (String partitionName : idealState.getPartitionSet()) {
- if (StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING
+ if (StateModel.SegmentStateModel.CONSUMING
.equals(idealState.getInstanceStateMap(partitionName).get(_instanceId))) {
foundConsuming = true;
break;
@@ -570,8 +570,7 @@ public class HelixServerStarter implements ServiceStartable
{
for (String partition : externalView.getPartitionSet()) {
Map<String, String> instanceStateMap =
externalView.getStateMap(partition);
String state = instanceStateMap.get(_instanceId);
- if (StateModel.SegmentOnlineOfflineStateModel.ONLINE.equals(state)
- ||
StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING.equals(state)) {
+ if (StateModel.SegmentStateModel.ONLINE.equals(state) ||
StateModel.SegmentStateModel.CONSUMING.equals(state)) {
return false;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]