This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f9074f9a91 Enhance SegmentStatusChecker to honor CONSUMING segment
(#13562)
f9074f9a91 is described below
commit f9074f9a914b61621ddba497dbe259da40bdb07d
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Jul 10 11:56:26 2024 -0700
Enhance SegmentStatusChecker to honor CONSUMING segment (#13562)
---
.../controller/helix/SegmentStatusChecker.java | 285 ++++---
.../controller/helix/SegmentStatusCheckerTest.java | 929 +++++++--------------
2 files changed, 472 insertions(+), 742 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index 6488ecb022..ceb33402e8 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -18,7 +18,7 @@
*/
package org.apache.pinot.controller.helix;
-import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -51,6 +52,8 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.stream.StreamConfig;
+import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
@@ -63,22 +66,18 @@ import org.slf4j.LoggerFactory;
*/
public class SegmentStatusChecker extends
ControllerPeriodicTask<SegmentStatusChecker.Context> {
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentStatusChecker.class);
- private static final int MAX_OFFLINE_SEGMENTS_TO_LOG = 5;
- public static final String ONLINE = "ONLINE";
- public static final String ERROR = "ERROR";
- public static final String CONSUMING = "CONSUMING";
-
- // log messages about disabled tables atmost once a day
- private static final long DISABLED_TABLE_LOG_INTERVAL_MS =
TimeUnit.DAYS.toMillis(1);
private static final ZNRecordSerializer RECORD_SERIALIZER = new
ZNRecordSerializer();
-
private static final int TABLE_CHECKER_TIMEOUT_MS = 30_000;
+ // log messages about disabled tables at most once a day
+ private static final long DISABLED_TABLE_LOG_INTERVAL_MS =
TimeUnit.DAYS.toMillis(1);
+ private static final int MAX_SEGMENTS_TO_LOG = 10;
+
private final int _waitForPushTimeSeconds;
- private long _lastDisabledTableLogTimestamp = 0;
+ private final TableSizeReader _tableSizeReader;
+ private final Set<String> _tierBackendGauges = new HashSet<>();
- private TableSizeReader _tableSizeReader;
- private Set<String> _tierBackendGauges = new HashSet<>();
+ private long _lastDisabledTableLogTimestamp = 0;
/**
* Constructs the segment status checker.
@@ -190,7 +189,7 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask<SegmentStatusCh
}
}
tierBackendSet.forEach(tierBackend ->
context._tierBackendTableCountMap.put(tierBackend,
- context._tierBackendTableCountMap.getOrDefault(tierBackend, 0) +
1));
+ context._tierBackendTableCountMap.getOrDefault(tierBackend, 0) + 1));
context._tierBackendConfiguredTableCount += tierBackendSet.isEmpty() ? 0
: 1;
}
int replication = tableConfig.getReplication();
@@ -226,145 +225,173 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask<SegmentStatusCh
return;
}
- //check if table consumption is paused
- boolean isTablePaused =
-
Boolean.parseBoolean(idealState.getRecord().getSimpleField(PinotLLCRealtimeSegmentManager.IS_TABLE_PAUSED));
-
- if (isTablePaused) {
+ if
(Boolean.parseBoolean(idealState.getRecord().getSimpleField(PinotLLCRealtimeSegmentManager.IS_TABLE_PAUSED)))
{
context._pausedTables.add(tableNameWithType);
}
- if (idealState.getPartitionSet().isEmpty()) {
- int nReplicasFromIdealState = 1;
+ _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.IDEALSTATE_ZNODE_SIZE,
+ idealState.toString().length());
+ _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.IDEALSTATE_ZNODE_BYTE_SIZE,
+ idealState.serialize(RECORD_SERIALIZER).length);
+
+ Set<String> segmentsIncludingReplaced = idealState.getPartitionSet();
+ _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED,
+ segmentsIncludingReplaced.size());
+ // Get the segments excluding the replaced segments which are specified in
the segment lineage entries and cannot
+ // be queried from the table.
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
_pinotHelixResourceManager.getPropertyStore();
+ Set<String> segments;
+ if (segmentsIncludingReplaced.isEmpty()) {
+ segments = Set.of();
+ } else {
+ segments = new HashSet<>(segmentsIncludingReplaced);
+ SegmentLineage segmentLineage =
SegmentLineageAccessHelper.getSegmentLineage(propertyStore, tableNameWithType);
+ SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(segments,
segmentLineage);
+ }
+ int numSegments = segments.size();
+ _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.SEGMENT_COUNT, numSegments);
+ if (numSegments == 0) {
+ int numReplicasFromIS;
try {
- nReplicasFromIdealState = Integer.valueOf(idealState.getReplicas());
+ numReplicasFromIS =
Math.max(Integer.parseInt(idealState.getReplicas()), 1);
} catch (NumberFormatException e) {
- // Ignore
+ numReplicasFromIS = 1;
}
- _controllerMetrics
- .setValueOfTableGauge(tableNameWithType,
ControllerGauge.NUMBER_OF_REPLICAS, nReplicasFromIdealState);
+ _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.NUMBER_OF_REPLICAS, numReplicasFromIS);
_controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.PERCENT_OF_REPLICAS, 100);
+ _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.SEGMENTS_IN_ERROR_STATE, 0);
_controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100);
+ _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS, 0);
+ _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.TABLE_COMPRESSED_SIZE, 0);
return;
}
- // Get the segments excluding the replaced segments which are specified in
the segment lineage entries and cannot
- // be queried from the table.
- Set<String> segmentsExcludeReplaced = new
HashSet<>(idealState.getPartitionSet());
- ZkHelixPropertyStore<ZNRecord> propertyStore =
_pinotHelixResourceManager.getPropertyStore();
- SegmentLineage segmentLineage =
SegmentLineageAccessHelper.getSegmentLineage(propertyStore, tableNameWithType);
-
SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(segmentsExcludeReplaced,
segmentLineage);
- _controllerMetrics
- .setValueOfTableGauge(tableNameWithType,
ControllerGauge.IDEALSTATE_ZNODE_SIZE, idealState.toString().length());
- _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.IDEALSTATE_ZNODE_BYTE_SIZE,
- idealState.serialize(RECORD_SERIALIZER).length);
- _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.SEGMENT_COUNT,
- (long) segmentsExcludeReplaced.size());
- _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED,
- (long) (idealState.getPartitionSet().size()));
ExternalView externalView =
_pinotHelixResourceManager.getTableExternalView(tableNameWithType);
- int nReplicasIdealMax = 0; // Keeps track of maximum number of replicas in
ideal state
- int nReplicasExternal = -1; // Keeps track of minimum number of replicas
in external view
- int nErrors = 0; // Keeps track of number of segments in error state
- int nOffline = 0; // Keeps track of number segments with no online replicas
- int nNumOfReplicasLessThanIdeal = 0; // Keeps track of number of segments
running with less than expected replicas
- int nSegments = 0; // Counts number of segments
- long tableCompressedSize = 0; // Tracks the total compressed segment size
in deep store per table
- for (String partitionName : segmentsExcludeReplaced) {
- int nReplicas = 0;
- int nIdeal = 0;
- nSegments++;
- // Skip segments not online in ideal state
- for (Map.Entry<String, String> serverAndState :
idealState.getInstanceStateMap(partitionName).entrySet()) {
- if (serverAndState == null) {
- break;
- }
- if (serverAndState.getValue().equals(ONLINE)) {
- nIdeal++;
- break;
+ // Maximum number of replicas in ideal state
+ int maxISReplicas = Integer.MIN_VALUE;
+ // Minimum number of replicas in external view
+ int minEVReplicas = Integer.MAX_VALUE;
+ // Total compressed segment size in deep store
+ long tableCompressedSize = 0;
+ // Segments without ZK metadata
+ List<String> segmentsWithoutZKMetadata = new ArrayList<>();
+ // Pairs of segment-instance in ERROR state
+ List<Pair<String, String>> errorSegments = new ArrayList<>();
+ // Offline segments
+ List<String> offlineSegments = new ArrayList<>();
+ // Segments with fewer replicas online (ONLINE/CONSUMING) in external view
than in ideal state
+ List<String> partialOnlineSegments = new ArrayList<>();
+ for (String segment : segments) {
+ int numISReplicas = 0;
+ for (Map.Entry<String, String> entry :
idealState.getInstanceStateMap(segment).entrySet()) {
+ String state = entry.getValue();
+ if (state.equals(SegmentStateModel.ONLINE) ||
state.equals(SegmentStateModel.CONSUMING)) {
+ numISReplicas++;
}
}
- if (nIdeal == 0) {
- // No online segments in ideal state
+ // Skip segments not ONLINE/CONSUMING in ideal state
+ if (numISReplicas == 0) {
continue;
}
- SegmentZKMetadata segmentZKMetadata =
- _pinotHelixResourceManager.getSegmentZKMetadata(tableNameWithType,
partitionName);
- if (segmentZKMetadata != null
- && segmentZKMetadata.getPushTime() > System.currentTimeMillis() -
_waitForPushTimeSeconds * 1000) {
- // Push is not finished yet, skip the segment
+ maxISReplicas = Math.max(maxISReplicas, numISReplicas);
+
+ SegmentZKMetadata segmentZKMetadata =
_pinotHelixResourceManager.getSegmentZKMetadata(tableNameWithType, segment);
+ // Skip the segment when it doesn't have ZK metadata. Most likely the
segment is just deleted.
+ if (segmentZKMetadata == null) {
+ segmentsWithoutZKMetadata.add(segment);
continue;
}
- if (segmentZKMetadata != null) {
- long sizeInBytes = segmentZKMetadata.getSizeInBytes();
- if (sizeInBytes > 0) {
- tableCompressedSize += sizeInBytes;
- }
+ long sizeInBytes = segmentZKMetadata.getSizeInBytes();
+ if (sizeInBytes > 0) {
+ tableCompressedSize += sizeInBytes;
}
- nReplicasIdealMax =
(idealState.getInstanceStateMap(partitionName).size() > nReplicasIdealMax) ?
idealState
- .getInstanceStateMap(partitionName).size() : nReplicasIdealMax;
- if ((externalView == null) || (externalView.getStateMap(partitionName)
== null)) {
- // No replicas for this segment
- nOffline++;
- if (nOffline < MAX_OFFLINE_SEGMENTS_TO_LOG) {
- LOGGER.warn("Segment {} of table {} has no replicas", partitionName,
tableNameWithType);
- }
- nReplicasExternal = 0;
+
+ // NOTE: We want to skip segments that are just created/pushed to avoid
false alerts because it is expected for
+ // servers to take some time to load them. For consuming
(IN_PROGRESS) segments, we use creation time from
+ // the ZK metadata; for pushed segments, we use push time from the
ZK metadata. Both of them are the time
+ // when segment is newly created. For committed segments from
real-time table, push time doesn't exist, and
+ // creationTimeMs will be Long.MIN_VALUE, which is fine because we
want to include them in the check.
+ long creationTimeMs = segmentZKMetadata.getStatus() ==
Status.IN_PROGRESS ? segmentZKMetadata.getCreationTime()
+ : segmentZKMetadata.getPushTime();
+ if (creationTimeMs > System.currentTimeMillis() -
_waitForPushTimeSeconds * 1000L) {
continue;
}
- for (Map.Entry<String, String> serverAndState :
externalView.getStateMap(partitionName).entrySet()) {
- // Count number of online replicas. Ignore if state is CONSUMING.
- // It is possible for a segment to be ONLINE in idealstate, and
CONSUMING in EV for a short period of time.
- // So, ignore this combination. If a segment exists in this
combination for a long time, we will get
- // low level-partition-not-consuming alert anyway.
- if (serverAndState.getValue().equals(ONLINE) ||
serverAndState.getValue().equals(CONSUMING)) {
- nReplicas++;
- }
- if (serverAndState.getValue().equals(ERROR)) {
- nErrors++;
+
+ int numEVReplicas = 0;
+ if (externalView != null) {
+ Map<String, String> stateMap = externalView.getStateMap(segment);
+ if (stateMap != null) {
+ for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+ String state = entry.getValue();
+ if (state.equals(SegmentStateModel.ONLINE) ||
state.equals(SegmentStateModel.CONSUMING)) {
+ numEVReplicas++;
+ }
+ if (state.equals(SegmentStateModel.ERROR)) {
+ errorSegments.add(Pair.of(segment, entry.getKey()));
+ }
+ }
}
}
- if (nReplicas == 0) {
- if (nOffline < MAX_OFFLINE_SEGMENTS_TO_LOG) {
- LOGGER.warn("Segment {} of table {} has no online replicas",
partitionName, tableNameWithType);
- }
- nOffline++;
- } else if (nReplicas < nReplicasIdealMax) {
- LOGGER.debug("Segment {} of table {} is running with {} replicas which
is less than the expected values {}",
- partitionName, tableNameWithType, nReplicas, nReplicasIdealMax);
- nNumOfReplicasLessThanIdeal++;
+ if (numEVReplicas == 0) {
+ offlineSegments.add(segment);
+ } else if (numEVReplicas < numISReplicas) {
+ partialOnlineSegments.add(segment);
+ } else {
+ // Do not allow numEVReplicas to be larger than numISReplicas
+ numEVReplicas = numISReplicas;
+ }
+ minEVReplicas = Math.min(minEVReplicas, numEVReplicas);
+ }
+
+ if (maxISReplicas == Integer.MIN_VALUE) {
+ try {
+ maxISReplicas = Math.max(Integer.parseInt(idealState.getReplicas()),
1);
+ } catch (NumberFormatException e) {
+ maxISReplicas = 1;
}
- nReplicasExternal =
- ((nReplicasExternal > nReplicas) || (nReplicasExternal == -1)) ?
nReplicas : nReplicasExternal;
}
- if (nReplicasExternal == -1) {
- nReplicasExternal = (nReplicasIdealMax == 0) ? 1 : 0;
+ // Do not allow minEVReplicas to be larger than maxISReplicas
+ minEVReplicas = Math.min(minEVReplicas, maxISReplicas);
+
+ if (minEVReplicas < maxISReplicas) {
+ LOGGER.warn("Table {} has at least one segment running with only {}
replicas, below replication threshold :{}",
+ tableNameWithType, minEVReplicas, maxISReplicas);
+ }
+ int numSegmentsWithoutZKMetadata = segmentsWithoutZKMetadata.size();
+ if (numSegmentsWithoutZKMetadata > 0) {
+ LOGGER.warn("Table {} has {} segments without ZK metadata: {}",
tableNameWithType, numSegmentsWithoutZKMetadata,
+ logSegments(segmentsWithoutZKMetadata));
+ }
+ int numErrorSegments = errorSegments.size();
+ if (numErrorSegments > 0) {
+ LOGGER.warn("Table {} has {} segments in ERROR state: {}",
tableNameWithType, numErrorSegments,
+ logSegments(errorSegments));
+ }
+ int numOfflineSegments = offlineSegments.size();
+ if (numOfflineSegments > 0) {
+ LOGGER.warn("Table {} has {} segments without ONLINE/CONSUMING replica:
{}", tableNameWithType,
+ numOfflineSegments, logSegments(offlineSegments));
+ }
+ int numPartialOnlineSegments = partialOnlineSegments.size();
+ if (numPartialOnlineSegments > 0) {
+ LOGGER.warn("Table {} has {} segments with fewer replicas than the
replication factor: {}", tableNameWithType,
+ numPartialOnlineSegments, logSegments(partialOnlineSegments));
}
+
// Synchronization provided by Controller Gauge to make sure that only one
thread updates the gauge
- _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.NUMBER_OF_REPLICAS, nReplicasExternal);
+ _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.NUMBER_OF_REPLICAS, minEVReplicas);
_controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.PERCENT_OF_REPLICAS,
- (nReplicasIdealMax > 0) ? (nReplicasExternal * 100 /
nReplicasIdealMax) : 100);
- _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.SEGMENTS_IN_ERROR_STATE, nErrors);
- _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS,
- nNumOfReplicasLessThanIdeal);
+ minEVReplicas * 100L / maxISReplicas);
+ _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.SEGMENTS_IN_ERROR_STATE,
+ numErrorSegments);
_controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
- (nSegments > 0) ? (nSegments - nOffline) * 100 / nSegments : 100);
+ numOfflineSegments > 0 ? (numSegments - numOfflineSegments) * 100L /
numSegments : 100);
+ _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS,
+ numPartialOnlineSegments);
_controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.TABLE_COMPRESSED_SIZE,
tableCompressedSize);
- if (nOffline > 0) {
- LOGGER.warn("Table {} has {} segments with no online replicas",
tableNameWithType, nOffline);
- }
- if (nNumOfReplicasLessThanIdeal > 0) {
- LOGGER.warn("Table {} has {} segments with number of replicas less than
the replication factor",
- tableNameWithType, nNumOfReplicasLessThanIdeal);
- }
- if (nReplicasExternal < nReplicasIdealMax) {
- LOGGER.warn("Table {} has at least one segment running with only {}
replicas, below replication threshold :{}",
- tableNameWithType, nReplicasExternal, nReplicasIdealMax);
- }
-
if (tableType == TableType.REALTIME && tableConfig != null) {
StreamConfig streamConfig =
new StreamConfig(tableConfig.getTableName(),
IngestionConfigUtils.getStreamConfigMap(tableConfig));
@@ -373,6 +400,13 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask<SegmentStatusCh
}
}
+ private static String logSegments(List<?> segments) {
+ if (segments.size() <= MAX_SEGMENTS_TO_LOG) {
+ return segments.toString();
+ }
+ return segments.subList(0, MAX_SEGMENTS_TO_LOG) + "...";
+ }
+
@Override
protected void nonLeaderCleanup(List<String> tableNamesWithType) {
tableNamesWithType.forEach(this::removeMetricsForTable);
@@ -403,20 +437,15 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask<SegmentStatusCh
public void cleanUpTask() {
}
- @VisibleForTesting
- void setTableSizeReader(TableSizeReader tableSizeReader) {
- _tableSizeReader = tableSizeReader;
- }
-
public static final class Context {
private boolean _logDisabledTables;
private int _realTimeTableCount;
private int _offlineTableCount;
private int _upsertTableCount;
private int _tierBackendConfiguredTableCount;
- private Map<String, Integer> _tierBackendTableCountMap = new HashMap<>();
- private Set<String> _processedTables = new HashSet<>();
- private Set<String> _disabledTables = new HashSet<>();
- private Set<String> _pausedTables = new HashSet<>();
+ private final Map<String, Integer> _tierBackendTableCountMap = new
HashMap<>();
+ private final Set<String> _processedTables = new HashSet<>();
+ private final Set<String> _disabledTables = new HashSet<>();
+ private final Set<String> _pausedTables = new HashSet<>();
}
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
index 429391e8e3..81a0f345c1 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -20,7 +20,6 @@ package org.apache.pinot.controller.helix;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import org.apache.helix.AccessOption;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
@@ -42,8 +41,8 @@ import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
-import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.annotations.Test;
@@ -58,25 +57,23 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+@SuppressWarnings("unchecked")
public class SegmentStatusCheckerTest {
+ private static final String RAW_TABLE_NAME = "myTable";
+ private static final String OFFLINE_TABLE_NAME =
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
+ private static final String REALTIME_TABLE_NAME =
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
- private SegmentStatusChecker _segmentStatusChecker;
- private PinotHelixResourceManager _helixResourceManager;
- private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
- private LeadControllerManager _leadControllerManager;
- private PinotMetricsRegistry _metricsRegistry;
- private ControllerMetrics _controllerMetrics;
- private ControllerConf _config;
- private TableSizeReader _tableSizeReader;
+ // Intentionally do not reset the metrics, to test that all metrics are refreshed.
+ private final ControllerMetrics _controllerMetrics =
+ new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
@Test
- public void offlineBasicTest()
- throws Exception {
- String offlineTableName = "myTable_OFFLINE";
+ public void offlineBasicTest() {
+ // Intentionally set the replication number to 2 to test the metrics.
TableConfig tableConfig =
- new
TableConfigBuilder(TableType.OFFLINE).setTableName(offlineTableName).setNumReplicas(2).build();
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(2).build();
- IdealState idealState = new IdealState(offlineTableName);
+ IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
idealState.setPartitionState("myTable_0", "pinot1", "ONLINE");
idealState.setPartitionState("myTable_0", "pinot2", "ONLINE");
idealState.setPartitionState("myTable_0", "pinot3", "ONLINE");
@@ -90,10 +87,10 @@ public class SegmentStatusCheckerTest {
idealState.setPartitionState("myTable_4", "pinot1", "ONLINE");
idealState.setPartitionState("myTable_4", "pinot2", "ONLINE");
idealState.setPartitionState("myTable_4", "pinot3", "ONLINE");
- idealState.setReplicas("2");
+ idealState.setReplicas("3");
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
- ExternalView externalView = new ExternalView(offlineTableName);
+ ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
externalView.setState("myTable_0", "pinot1", "ONLINE");
externalView.setState("myTable_0", "pinot2", "ONLINE");
externalView.setState("myTable_1", "pinot1", "ERROR");
@@ -104,170 +101,161 @@ public class SegmentStatusCheckerTest {
externalView.setState("myTable_3", "pinot3", "ONLINE");
externalView.setState("myTable_4", "pinot1", "ONLINE");
- {
- _helixResourceManager = mock(PinotHelixResourceManager.class);
-
when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName));
-
when(_helixResourceManager.getTableConfig(offlineTableName)).thenReturn(tableConfig);
-
when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState);
-
when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(externalView);
- }
- {
- _helixPropertyStore = mock(ZkHelixPropertyStore.class);
-
when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore);
- // Based on the lineage entries: {myTable_1 -> myTable_3, COMPLETED},
{myTable_3 -> myTable_4, IN_PROGRESS},
- // myTable_1 and myTable_4 will be skipped for the metrics.
- SegmentLineage segmentLineage = new SegmentLineage(offlineTableName);
-
segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(),
- new LineageEntry(List.of("myTable_1"), List.of("myTable_3"),
LineageEntryState.COMPLETED, 11111L));
-
segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(),
- new LineageEntry(List.of("myTable_3"), List.of("myTable_4"),
LineageEntryState.IN_PROGRESS, 11111L));
- when(_helixPropertyStore.get(eq("/SEGMENT_LINEAGE/" + offlineTableName),
any(),
-
eq(AccessOption.PERSISTENT))).thenReturn(segmentLineage.toZNRecord());
- }
- {
- _config = mock(ControllerConf.class);
- when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
- when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
- }
- {
- _leadControllerManager = mock(LeadControllerManager.class);
-
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
- }
- {
- _tableSizeReader = mock(TableSizeReader.class);
- when(_tableSizeReader.getTableSizeDetails(anyString(),
anyInt())).thenReturn(null);
- }
- PinotMetricUtils.cleanUp();
- _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
- _controllerMetrics = new ControllerMetrics(_metricsRegistry);
- _segmentStatusChecker =
- new SegmentStatusChecker(_helixResourceManager,
_leadControllerManager, _config, _controllerMetrics,
- _tableSizeReader);
- _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
- _segmentStatusChecker.start();
- _segmentStatusChecker.run();
-
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
offlineTableName,
- ControllerGauge.REPLICATION_FROM_CONFIG), 2);
+ PinotHelixResourceManager resourceManager =
mock(PinotHelixResourceManager.class);
+
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
+
when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
+
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
+
when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView);
+ SegmentZKMetadata segmentZKMetadata = mockPushedSegmentZKMetadata(1234,
11111L);
+ when(resourceManager.getSegmentZKMetadata(eq(OFFLINE_TABLE_NAME),
anyString())).thenReturn(segmentZKMetadata);
+
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
+ when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
+ // Based on the lineage entries: {myTable_1 -> myTable_3, COMPLETED},
{myTable_3 -> myTable_4, IN_PROGRESS},
+ // myTable_1 and myTable_4 will be skipped for the metrics.
+ SegmentLineage segmentLineage = new SegmentLineage(OFFLINE_TABLE_NAME);
+
segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(),
+ new LineageEntry(List.of("myTable_1"), List.of("myTable_3"),
LineageEntryState.COMPLETED, 11111L));
+
segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(),
+ new LineageEntry(List.of("myTable_3"), List.of("myTable_4"),
LineageEntryState.IN_PROGRESS, 11111L));
+ when(
+ propertyStore.get(eq("/SEGMENT_LINEAGE/" + OFFLINE_TABLE_NAME), any(),
eq(AccessOption.PERSISTENT))).thenReturn(
+ segmentLineage.toZNRecord());
+
+ runSegmentStatusChecker(resourceManager, 0);
+ verifyControllerMetrics(OFFLINE_TABLE_NAME, 2, 5, 3, 2, 66, 1, 100, 2,
2468);
+ }
+
+ private SegmentZKMetadata mockPushedSegmentZKMetadata(long sizeInBytes, long
pushTimeMs) {
+ SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class);
+ when(segmentZKMetadata.getStatus()).thenReturn(Status.UPLOADED);
+ when(segmentZKMetadata.getSizeInBytes()).thenReturn(sizeInBytes);
+ when(segmentZKMetadata.getPushTime()).thenReturn(pushTimeMs);
+ return segmentZKMetadata;
+ }
+
+ private void runSegmentStatusChecker(PinotHelixResourceManager
resourceManager, int waitForPushTimeInSeconds) {
+ LeadControllerManager leadControllerManager =
mock(LeadControllerManager.class);
+ when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+ ControllerConf controllerConf = mock(ControllerConf.class);
+
when(controllerConf.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(waitForPushTimeInSeconds);
+ TableSizeReader tableSizeReader = mock(TableSizeReader.class);
+ SegmentStatusChecker segmentStatusChecker =
+ new SegmentStatusChecker(resourceManager, leadControllerManager,
controllerConf, _controllerMetrics,
+ tableSizeReader);
+ segmentStatusChecker.start();
+ segmentStatusChecker.run();
+ }
+
+ private void verifyControllerMetrics(String tableNameWithType, int
expectedReplicationFromConfig,
+ int expectedNumSegmentsIncludingReplaced, int expectedNumSegment, int
expectedNumReplicas,
+ int expectedPercentOfReplicas, int expectedSegmentsInErrorState, int
expectedPercentSegmentsAvailable,
+ int expectedSegmentsWithLessReplicas, int expectedTableCompressedSize) {
+ assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
tableNameWithType,
+ ControllerGauge.REPLICATION_FROM_CONFIG),
expectedReplicationFromConfig);
+ assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
tableNameWithType,
+ ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED),
expectedNumSegmentsIncludingReplaced);
+ assertEquals(
+ MetricValueUtils.getTableGaugeValue(_controllerMetrics,
tableNameWithType, ControllerGauge.SEGMENT_COUNT),
+ expectedNumSegment);
+ assertEquals(
+ MetricValueUtils.getTableGaugeValue(_controllerMetrics,
tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS),
+ expectedNumReplicas);
assertEquals(
- MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(), ControllerGauge.SEGMENT_COUNT),
- 3);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(),
- ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED), 5);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(),
- ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(),
- ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 2);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(),
- ControllerGauge.NUMBER_OF_REPLICAS), 2);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(),
- ControllerGauge.PERCENT_OF_REPLICAS), 66);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(),
- ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(),
- ControllerGauge.TABLE_COMPRESSED_SIZE), 0);
+ MetricValueUtils.getTableGaugeValue(_controllerMetrics,
tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS),
+ expectedPercentOfReplicas);
+ assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
tableNameWithType,
+ ControllerGauge.SEGMENTS_IN_ERROR_STATE),
expectedSegmentsInErrorState);
+ assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
tableNameWithType,
+ ControllerGauge.PERCENT_SEGMENTS_AVAILABLE),
expectedPercentSegmentsAvailable);
+ assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
tableNameWithType,
+ ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS),
expectedSegmentsWithLessReplicas);
+ assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
tableNameWithType,
+ ControllerGauge.TABLE_COMPRESSED_SIZE), expectedTableCompressedSize);
}
@Test
- public void realtimeBasicTest()
- throws Exception {
- String rawTableName = "myTable";
- String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+ public void realtimeBasicTest() {
TableConfig tableConfig =
- new
TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName).setTimeColumnName("timeColumn")
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName("timeColumn")
.setNumReplicas(3).setStreamConfigs(getStreamConfigMap()).build();
- LLCSegmentName seg1 = new LLCSegmentName(rawTableName, 1, 0,
System.currentTimeMillis());
- LLCSegmentName seg2 = new LLCSegmentName(rawTableName, 1, 1,
System.currentTimeMillis());
- LLCSegmentName seg3 = new LLCSegmentName(rawTableName, 2, 1,
System.currentTimeMillis());
- IdealState idealState = new IdealState(realtimeTableName);
- idealState.setPartitionState(seg1.getSegmentName(), "pinot1", "ONLINE");
- idealState.setPartitionState(seg1.getSegmentName(), "pinot2", "ONLINE");
- idealState.setPartitionState(seg1.getSegmentName(), "pinot3", "ONLINE");
- idealState.setPartitionState(seg2.getSegmentName(), "pinot1", "ONLINE");
- idealState.setPartitionState(seg2.getSegmentName(), "pinot2", "ONLINE");
- idealState.setPartitionState(seg2.getSegmentName(), "pinot3", "ONLINE");
- idealState.setPartitionState(seg3.getSegmentName(), "pinot1", "CONSUMING");
- idealState.setPartitionState(seg3.getSegmentName(), "pinot2", "CONSUMING");
- idealState.setPartitionState(seg3.getSegmentName(), "pinot3", "OFFLINE");
+ String seg1 = new LLCSegmentName(RAW_TABLE_NAME, 1, 0,
System.currentTimeMillis()).getSegmentName();
+ String seg2 = new LLCSegmentName(RAW_TABLE_NAME, 1, 1,
System.currentTimeMillis()).getSegmentName();
+ String seg3 = new LLCSegmentName(RAW_TABLE_NAME, 2, 1,
System.currentTimeMillis()).getSegmentName();
+ IdealState idealState = new IdealState(REALTIME_TABLE_NAME);
+ idealState.setPartitionState(seg1, "pinot1", "ONLINE");
+ idealState.setPartitionState(seg1, "pinot2", "ONLINE");
+ idealState.setPartitionState(seg1, "pinot3", "ONLINE");
+ idealState.setPartitionState(seg2, "pinot1", "ONLINE");
+ idealState.setPartitionState(seg2, "pinot2", "ONLINE");
+ idealState.setPartitionState(seg2, "pinot3", "ONLINE");
+ idealState.setPartitionState(seg3, "pinot1", "CONSUMING");
+ idealState.setPartitionState(seg3, "pinot2", "CONSUMING");
+ idealState.setPartitionState(seg3, "pinot3", "OFFLINE");
idealState.setReplicas("3");
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
- ExternalView externalView = new ExternalView(realtimeTableName);
- externalView.setState(seg1.getSegmentName(), "pinot1", "ONLINE");
- externalView.setState(seg1.getSegmentName(), "pinot2", "ONLINE");
- externalView.setState(seg1.getSegmentName(), "pinot3", "ONLINE");
- externalView.setState(seg2.getSegmentName(), "pinot1", "CONSUMING");
- externalView.setState(seg2.getSegmentName(), "pinot2", "ONLINE");
- externalView.setState(seg2.getSegmentName(), "pinot3", "CONSUMING");
- externalView.setState(seg3.getSegmentName(), "pinot1", "CONSUMING");
- externalView.setState(seg3.getSegmentName(), "pinot2", "CONSUMING");
- externalView.setState(seg3.getSegmentName(), "pinot3", "OFFLINE");
-
- {
- _helixResourceManager = mock(PinotHelixResourceManager.class);
- _helixPropertyStore = mock(ZkHelixPropertyStore.class);
-
when(_helixResourceManager.getTableConfig(realtimeTableName)).thenReturn(tableConfig);
-
when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore);
-
when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName));
-
when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState);
-
when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(externalView);
- ZNRecord znRecord = new ZNRecord("0");
- znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET,
"10000");
- when(_helixPropertyStore.get(anyString(), any(),
anyInt())).thenReturn(znRecord);
- }
- {
- _config = mock(ControllerConf.class);
- when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
- when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
- }
- {
- _leadControllerManager = mock(LeadControllerManager.class);
-
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
- }
- {
- _tableSizeReader = mock(TableSizeReader.class);
- when(_tableSizeReader.getTableSizeDetails(anyString(),
anyInt())).thenReturn(null);
- }
- PinotMetricUtils.cleanUp();
- _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
- _controllerMetrics = new ControllerMetrics(_metricsRegistry);
- _segmentStatusChecker =
- new SegmentStatusChecker(_helixResourceManager,
_leadControllerManager, _config, _controllerMetrics,
- _tableSizeReader);
- _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
- _segmentStatusChecker.start();
- _segmentStatusChecker.run();
-
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
realtimeTableName,
- ControllerGauge.REPLICATION_FROM_CONFIG), 3);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(),
- ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(),
- ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 0);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(),
- ControllerGauge.NUMBER_OF_REPLICAS), 3);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(),
- ControllerGauge.PERCENT_OF_REPLICAS), 100);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(),
- ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(),
+ ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME);
+ externalView.setState(seg1, "pinot1", "ONLINE");
+ externalView.setState(seg1, "pinot2", "ONLINE");
+ externalView.setState(seg1, "pinot3", "ONLINE");
+ externalView.setState(seg2, "pinot1", "CONSUMING");
+ externalView.setState(seg2, "pinot2", "ONLINE");
+ externalView.setState(seg2, "pinot3", "CONSUMING");
+ externalView.setState(seg3, "pinot1", "CONSUMING");
+ externalView.setState(seg3, "pinot2", "CONSUMING");
+ externalView.setState(seg3, "pinot3", "OFFLINE");
+
+ PinotHelixResourceManager resourceManager =
mock(PinotHelixResourceManager.class);
+
when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
+
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
+
when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState);
+
when(resourceManager.getTableExternalView(REALTIME_TABLE_NAME)).thenReturn(externalView);
+ SegmentZKMetadata committedSegmentZKMetadata =
mockCommittedSegmentZKMetadata();
+ when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME,
seg1)).thenReturn(committedSegmentZKMetadata);
+ when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME,
seg2)).thenReturn(committedSegmentZKMetadata);
+ SegmentZKMetadata consumingSegmentZKMetadata =
mockConsumingSegmentZKMetadata(11111L);
+ when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME,
seg3)).thenReturn(consumingSegmentZKMetadata);
+
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
+ when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
+ ZNRecord znRecord = new ZNRecord("0");
+ znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET,
"10000");
+ when(propertyStore.get(anyString(), any(), anyInt())).thenReturn(znRecord);
+
+ runSegmentStatusChecker(resourceManager, 0);
+ verifyControllerMetrics(REALTIME_TABLE_NAME, 3, 3, 3, 2, 66, 0, 100, 0, 0);
+ assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
REALTIME_TABLE_NAME,
ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2);
}
- Map<String, String> getStreamConfigMap() {
+ private Map<String, String> getStreamConfigMap() {
return Map.of("streamType", "kafka", "stream.kafka.consumer.type",
"simple", "stream.kafka.topic.name", "test",
"stream.kafka.decoder.class.name",
"org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder",
"stream.kafka.consumer.factory.class.name",
"org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory");
}
- @Test
- public void missingEVPartitionTest()
- throws Exception {
- String offlineTableName = "myTable_OFFLINE";
+ private SegmentZKMetadata mockCommittedSegmentZKMetadata() {
+ SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class);
+ when(segmentZKMetadata.getStatus()).thenReturn(Status.DONE);
+ when(segmentZKMetadata.getSizeInBytes()).thenReturn(-1L);
+ when(segmentZKMetadata.getPushTime()).thenReturn(Long.MIN_VALUE);
+ return segmentZKMetadata;
+ }
+
+ private SegmentZKMetadata mockConsumingSegmentZKMetadata(long
creationTimeMs) {
+ SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class);
+ when(segmentZKMetadata.getStatus()).thenReturn(Status.IN_PROGRESS);
+ when(segmentZKMetadata.getSizeInBytes()).thenReturn(-1L);
+ when(segmentZKMetadata.getCreationTime()).thenReturn(creationTimeMs);
+ return segmentZKMetadata;
+ }
- IdealState idealState = new IdealState(offlineTableName);
+ @Test
+ public void missingEVPartitionTest() {
+ IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
idealState.setPartitionState("myTable_0", "pinot1", "ONLINE");
idealState.setPartitionState("myTable_0", "pinot2", "ONLINE");
idealState.setPartitionState("myTable_0", "pinot3", "ONLINE");
@@ -276,191 +264,89 @@ public class SegmentStatusCheckerTest {
idealState.setPartitionState("myTable_1", "pinot3", "ONLINE");
idealState.setPartitionState("myTable_2", "pinot3", "OFFLINE");
idealState.setPartitionState("myTable_3", "pinot3", "ONLINE");
- idealState.setReplicas("2");
+ idealState.setReplicas("3");
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
- ExternalView externalView = new ExternalView(offlineTableName);
+ ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
externalView.setState("myTable_0", "pinot1", "ONLINE");
externalView.setState("myTable_0", "pinot2", "ONLINE");
externalView.setState("myTable_1", "pinot1", "ERROR");
externalView.setState("myTable_1", "pinot2", "ONLINE");
- ZNRecord znrecord = new ZNRecord("myTable_0");
- znrecord.setSimpleField(CommonConstants.Segment.INDEX_VERSION, "v1");
- znrecord.setLongField(CommonConstants.Segment.START_TIME, 1000);
- znrecord.setLongField(CommonConstants.Segment.END_TIME, 2000);
- znrecord.setSimpleField(CommonConstants.Segment.TIME_UNIT,
TimeUnit.HOURS.toString());
- znrecord.setLongField(CommonConstants.Segment.TOTAL_DOCS, 10000);
- znrecord.setLongField(CommonConstants.Segment.CRC, 1234);
- znrecord.setLongField(CommonConstants.Segment.CREATION_TIME, 3000);
- znrecord.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL,
"http://localhost:8000/myTable_0");
- znrecord.setLongField(CommonConstants.Segment.PUSH_TIME,
System.currentTimeMillis());
- znrecord.setLongField(CommonConstants.Segment.REFRESH_TIME,
System.currentTimeMillis());
- znrecord.setLongField(CommonConstants.Segment.SIZE_IN_BYTES, 1111);
-
- ZkHelixPropertyStore<ZNRecord> propertyStore;
- {
- propertyStore = (ZkHelixPropertyStore<ZNRecord>)
mock(ZkHelixPropertyStore.class);
- when(propertyStore.get("/SEGMENTS/myTable_OFFLINE/myTable_3", null,
AccessOption.PERSISTENT)).thenReturn(
- znrecord);
- }
+ PinotHelixResourceManager resourceManager =
mock(PinotHelixResourceManager.class);
+
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
+
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
+
when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView);
+ SegmentZKMetadata segmentZKMetadata = mockPushedSegmentZKMetadata(1234,
11111L);
+ when(resourceManager.getSegmentZKMetadata(eq(OFFLINE_TABLE_NAME),
anyString())).thenReturn(segmentZKMetadata);
- {
- _helixResourceManager = mock(PinotHelixResourceManager.class);
- _helixPropertyStore = mock(ZkHelixPropertyStore.class);
-
when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore);
-
when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName));
-
when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState);
-
when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(externalView);
- when(_helixResourceManager.getSegmentZKMetadata(offlineTableName,
"myTable_3")).thenReturn(
- new SegmentZKMetadata(znrecord));
- }
- {
- _config = mock(ControllerConf.class);
- when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
- when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(0);
- }
- {
- _leadControllerManager = mock(LeadControllerManager.class);
-
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
- }
- {
- _tableSizeReader = mock(TableSizeReader.class);
- when(_tableSizeReader.getTableSizeDetails(anyString(),
anyInt())).thenReturn(null);
- }
- PinotMetricUtils.cleanUp();
- _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
- _controllerMetrics = new ControllerMetrics(_metricsRegistry);
- _segmentStatusChecker =
- new SegmentStatusChecker(_helixResourceManager,
_leadControllerManager, _config, _controllerMetrics,
- _tableSizeReader);
- _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
- _segmentStatusChecker.start();
- _segmentStatusChecker.run();
-
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(),
- ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(),
- ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 2);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(),
- ControllerGauge.NUMBER_OF_REPLICAS), 0);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(),
- ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 75);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(),
- ControllerGauge.TABLE_COMPRESSED_SIZE), 1111);
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
+ when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
+
+ runSegmentStatusChecker(resourceManager, 0);
+ verifyControllerMetrics(OFFLINE_TABLE_NAME, 0, 4, 4, 0, 0, 1, 75, 2, 3702);
}
@Test
- public void missingEVTest()
- throws Exception {
- String realtimeTableName = "myTable_REALTIME";
-
- IdealState idealState = new IdealState(realtimeTableName);
+ public void missingEVTest() {
+ IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
idealState.setPartitionState("myTable_0", "pinot1", "ONLINE");
idealState.setPartitionState("myTable_0", "pinot2", "ONLINE");
idealState.setPartitionState("myTable_0", "pinot3", "ONLINE");
idealState.setPartitionState("myTable_1", "pinot1", "ONLINE");
idealState.setPartitionState("myTable_1", "pinot2", "ONLINE");
idealState.setPartitionState("myTable_1", "pinot3", "ONLINE");
- idealState.setPartitionState("myTable_2", "pinot3", "OFFLINE");
- idealState.setReplicas("2");
+ idealState.setReplicas("3");
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
- {
- _helixResourceManager = mock(PinotHelixResourceManager.class);
- _helixPropertyStore = mock(ZkHelixPropertyStore.class);
-
when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore);
-
when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName));
-
when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState);
-
when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(null);
- }
- {
- _config = mock(ControllerConf.class);
- when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
- when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
- }
- {
- _leadControllerManager = mock(LeadControllerManager.class);
-
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
- }
- {
- _tableSizeReader = mock(TableSizeReader.class);
- when(_tableSizeReader.getTableSizeDetails(anyString(),
anyInt())).thenReturn(null);
- }
- PinotMetricUtils.cleanUp();
- _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
- _controllerMetrics = new ControllerMetrics(_metricsRegistry);
- _segmentStatusChecker =
- new SegmentStatusChecker(_helixResourceManager,
_leadControllerManager, _config, _controllerMetrics,
- _tableSizeReader);
- _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
- _segmentStatusChecker.start();
- _segmentStatusChecker.run();
-
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
realtimeTableName,
- ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
realtimeTableName,
- ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 0);
- assertEquals(
- MetricValueUtils.getTableGaugeValue(_controllerMetrics,
realtimeTableName, ControllerGauge.NUMBER_OF_REPLICAS),
- 0);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
realtimeTableName,
- ControllerGauge.TABLE_COMPRESSED_SIZE), 0);
+ PinotHelixResourceManager resourceManager =
mock(PinotHelixResourceManager.class);
+
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
+
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
+ SegmentZKMetadata segmentZKMetadata = mockPushedSegmentZKMetadata(1234,
11111L);
+ when(resourceManager.getSegmentZKMetadata(eq(OFFLINE_TABLE_NAME),
anyString())).thenReturn(segmentZKMetadata);
+
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
+ when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
+
+ runSegmentStatusChecker(resourceManager, 0);
+ verifyControllerMetrics(OFFLINE_TABLE_NAME, 0, 2, 2, 0, 0, 0, 0, 0, 2468);
}
@Test
- public void missingIdealTest()
- throws Exception {
- String realtimeTableName = "myTable_REALTIME";
-
- {
- _helixResourceManager = mock(PinotHelixResourceManager.class);
-
when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName));
-
when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(null);
-
when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(null);
- }
- {
- _config = mock(ControllerConf.class);
- when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
- when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
- }
- {
- _leadControllerManager = mock(LeadControllerManager.class);
-
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
- }
- {
- _tableSizeReader = mock(TableSizeReader.class);
- when(_tableSizeReader.getTableSizeDetails(anyString(),
anyInt())).thenReturn(null);
- }
- PinotMetricUtils.cleanUp();
- _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
- _controllerMetrics = new ControllerMetrics(_metricsRegistry);
- _segmentStatusChecker =
- new SegmentStatusChecker(_helixResourceManager,
_leadControllerManager, _config, _controllerMetrics,
- _tableSizeReader);
- _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
- _segmentStatusChecker.start();
- _segmentStatusChecker.run();
-
- assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics,
realtimeTableName,
- ControllerGauge.SEGMENTS_IN_ERROR_STATE));
- assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics,
realtimeTableName,
- ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS));
+ public void missingIdealTest() {
+ PinotHelixResourceManager resourceManager =
mock(PinotHelixResourceManager.class);
+
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
+
+ runSegmentStatusChecker(resourceManager, 0);
+ verifyControllerMetricsNotExist();
+ }
+
+ private void verifyControllerMetricsNotExist() {
+ assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
OFFLINE_TABLE_NAME,
+ ControllerGauge.REPLICATION_FROM_CONFIG), 0);
+ assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics,
OFFLINE_TABLE_NAME,
+ ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED));
+ assertFalse(
+ MetricValueUtils.tableGaugeExists(_controllerMetrics,
OFFLINE_TABLE_NAME, ControllerGauge.SEGMENT_COUNT));
assertFalse(
- MetricValueUtils.tableGaugeExists(_controllerMetrics,
realtimeTableName, ControllerGauge.NUMBER_OF_REPLICAS));
+ MetricValueUtils.tableGaugeExists(_controllerMetrics,
OFFLINE_TABLE_NAME, ControllerGauge.NUMBER_OF_REPLICAS));
assertFalse(
- MetricValueUtils.tableGaugeExists(_controllerMetrics,
realtimeTableName, ControllerGauge.PERCENT_OF_REPLICAS));
- assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics,
realtimeTableName,
+ MetricValueUtils.tableGaugeExists(_controllerMetrics,
OFFLINE_TABLE_NAME, ControllerGauge.PERCENT_OF_REPLICAS));
+ assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics,
OFFLINE_TABLE_NAME,
+ ControllerGauge.SEGMENTS_IN_ERROR_STATE));
+ assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics,
OFFLINE_TABLE_NAME,
+ ControllerGauge.PERCENT_SEGMENTS_AVAILABLE));
+ assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics,
OFFLINE_TABLE_NAME,
+ ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS));
+ assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics,
OFFLINE_TABLE_NAME,
ControllerGauge.TABLE_COMPRESSED_SIZE));
}
@Test
- public void missingEVPartitionPushTest()
- throws Exception {
- String offlineTableName = "myTable_OFFLINE";
-
- IdealState idealState = new IdealState(offlineTableName);
+ public void missingEVPartitionPushTest() {
+ IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
idealState.setPartitionState("myTable_0", "pinot1", "ONLINE");
+ idealState.setPartitionState("myTable_0", "pinot2", "ONLINE");
idealState.setPartitionState("myTable_1", "pinot1", "ONLINE");
idealState.setPartitionState("myTable_1", "pinot2", "ONLINE");
idealState.setPartitionState("myTable_2", "pinot1", "ONLINE");
@@ -468,246 +354,144 @@ public class SegmentStatusCheckerTest {
idealState.setReplicas("2");
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
- ExternalView externalView = new ExternalView(offlineTableName);
+ ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
+ externalView.setState("myTable_0", "pinot1", "ONLINE");
+ externalView.setState("myTable_0", "pinot2", "ONLINE");
externalView.setState("myTable_1", "pinot1", "ONLINE");
externalView.setState("myTable_1", "pinot2", "ONLINE");
// myTable_2 is push in-progress and only one replica has been downloaded by servers. It will be skipped for
// the segment status check.
externalView.setState("myTable_2", "pinot1", "ONLINE");
- ZNRecord znrecord = new ZNRecord("myTable_0");
- znrecord.setSimpleField(CommonConstants.Segment.INDEX_VERSION, "v1");
- znrecord.setLongField(CommonConstants.Segment.START_TIME, 1000);
- znrecord.setLongField(CommonConstants.Segment.END_TIME, 2000);
- znrecord.setSimpleField(CommonConstants.Segment.TIME_UNIT,
TimeUnit.HOURS.toString());
- znrecord.setLongField(CommonConstants.Segment.TOTAL_DOCS, 10000);
- znrecord.setLongField(CommonConstants.Segment.CRC, 1234);
- znrecord.setLongField(CommonConstants.Segment.CREATION_TIME, 3000);
- znrecord.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL,
"http://localhost:8000/myTable_0");
- znrecord.setLongField(CommonConstants.Segment.PUSH_TIME,
System.currentTimeMillis());
- znrecord.setLongField(CommonConstants.Segment.REFRESH_TIME,
System.currentTimeMillis());
- znrecord.setLongField(CommonConstants.Segment.SIZE_IN_BYTES, 1111);
-
- ZNRecord znrecord2 = new ZNRecord("myTable_2");
- znrecord2.setSimpleField(CommonConstants.Segment.INDEX_VERSION, "v1");
- znrecord2.setLongField(CommonConstants.Segment.START_TIME, 1000);
- znrecord2.setLongField(CommonConstants.Segment.END_TIME, 2000);
- znrecord2.setSimpleField(CommonConstants.Segment.TIME_UNIT,
TimeUnit.HOURS.toString());
- znrecord2.setLongField(CommonConstants.Segment.TOTAL_DOCS, 10000);
- znrecord2.setLongField(CommonConstants.Segment.CRC, 1235);
- znrecord2.setLongField(CommonConstants.Segment.CREATION_TIME, 3000);
- znrecord2.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL,
"http://localhost:8000/myTable_2");
- znrecord2.setLongField(CommonConstants.Segment.PUSH_TIME,
System.currentTimeMillis());
- znrecord2.setLongField(CommonConstants.Segment.REFRESH_TIME,
System.currentTimeMillis());
- znrecord.setLongField(CommonConstants.Segment.SIZE_IN_BYTES, 1111);
-
- {
- _helixResourceManager = mock(PinotHelixResourceManager.class);
- _helixPropertyStore = mock(ZkHelixPropertyStore.class);
-
when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore);
-
when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName));
-
when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState);
-
when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(externalView);
- when(_helixResourceManager.getSegmentZKMetadata(offlineTableName,
"myTable_0")).thenReturn(
- new SegmentZKMetadata(znrecord));
- when(_helixResourceManager.getSegmentZKMetadata(offlineTableName,
"myTable_2")).thenReturn(
- new SegmentZKMetadata(znrecord2));
- }
- {
- _config = mock(ControllerConf.class);
- when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
- when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
- }
- {
- _leadControllerManager = mock(LeadControllerManager.class);
-
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
- }
- {
- _tableSizeReader = mock(TableSizeReader.class);
- when(_tableSizeReader.getTableSizeDetails(anyString(),
anyInt())).thenReturn(null);
- }
- PinotMetricUtils.cleanUp();
- _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
- _controllerMetrics = new ControllerMetrics(_metricsRegistry);
- _segmentStatusChecker =
- new SegmentStatusChecker(_helixResourceManager,
_leadControllerManager, _config, _controllerMetrics,
- _tableSizeReader);
- _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
- _segmentStatusChecker.start();
- _segmentStatusChecker.run();
-
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(),
- ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(),
- ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 0);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(),
- ControllerGauge.NUMBER_OF_REPLICAS), 2);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(),
- ControllerGauge.PERCENT_OF_REPLICAS), 100);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(),
- ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(),
- ControllerGauge.TABLE_COMPRESSED_SIZE), 0);
+ PinotHelixResourceManager resourceManager =
mock(PinotHelixResourceManager.class);
+
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
+
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
+
when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView);
+ SegmentZKMetadata segmentZKMetadata01 = mockPushedSegmentZKMetadata(1234,
11111L);
+ when(resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME,
"myTable_0")).thenReturn(segmentZKMetadata01);
+ when(resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME,
"myTable_1")).thenReturn(segmentZKMetadata01);
+ SegmentZKMetadata segmentZKMetadata2 = mockPushedSegmentZKMetadata(1234,
System.currentTimeMillis());
+ when(resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME,
"myTable_2")).thenReturn(segmentZKMetadata2);
+
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
+ when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
+
+ runSegmentStatusChecker(resourceManager, 600);
+ verifyControllerMetrics(OFFLINE_TABLE_NAME, 0, 3, 3, 2, 100, 0, 100, 0,
3702);
}
@Test
- public void noReplicas()
- throws Exception {
- String realtimeTableName = "myTable_REALTIME";
+ public void missingEVUploadedConsumingTest() {
+ IdealState idealState = new IdealState(REALTIME_TABLE_NAME);
+ idealState.setPartitionState("myTable_0", "pinot1", "ONLINE");
+ idealState.setPartitionState("myTable_1", "pinot2", "CONSUMING");
+ idealState.setReplicas("1");
+ idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+
+ PinotHelixResourceManager resourceManager =
mock(PinotHelixResourceManager.class);
+
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
+
when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState);
+ SegmentZKMetadata updatedSegmentZKMetadata =
mockPushedSegmentZKMetadata(1234, System.currentTimeMillis());
+ when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME,
"myTable_0")).thenReturn(updatedSegmentZKMetadata);
+ SegmentZKMetadata consumingSegmentZKMetadata =
mockConsumingSegmentZKMetadata(System.currentTimeMillis());
+ when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME,
"myTable_1")).thenReturn(consumingSegmentZKMetadata);
+
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
+ when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
- IdealState idealState = new IdealState(realtimeTableName);
+ runSegmentStatusChecker(resourceManager, 600);
+ verifyControllerMetrics(REALTIME_TABLE_NAME, 0, 2, 2, 1, 100, 0, 100, 0,
1234);
+ }
+
+ @Test
+ public void noReplicaTest() {
+ IdealState idealState = new IdealState(REALTIME_TABLE_NAME);
idealState.setPartitionState("myTable_0", "pinot1", "OFFLINE");
idealState.setPartitionState("myTable_0", "pinot2", "OFFLINE");
idealState.setPartitionState("myTable_0", "pinot3", "OFFLINE");
idealState.setReplicas("0");
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
- {
- _helixResourceManager = mock(PinotHelixResourceManager.class);
- _helixPropertyStore = mock(ZkHelixPropertyStore.class);
-
when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore);
-
when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName));
-
when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState);
-
when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(null);
- }
- {
- _config = mock(ControllerConf.class);
- when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
- when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
- }
- {
- _leadControllerManager = mock(LeadControllerManager.class);
-
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
- }
- {
- _tableSizeReader = mock(TableSizeReader.class);
- when(_tableSizeReader.getTableSizeDetails(anyString(),
anyInt())).thenReturn(null);
- }
- PinotMetricUtils.cleanUp();
- _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
- _controllerMetrics = new ControllerMetrics(_metricsRegistry);
- _segmentStatusChecker =
- new SegmentStatusChecker(_helixResourceManager,
_leadControllerManager, _config, _controllerMetrics,
- _tableSizeReader);
- _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
- _segmentStatusChecker.start();
- _segmentStatusChecker.run();
-
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
realtimeTableName,
- ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
realtimeTableName,
- ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 0);
- assertEquals(
- MetricValueUtils.getTableGaugeValue(_controllerMetrics,
realtimeTableName, ControllerGauge.NUMBER_OF_REPLICAS),
- 1);
- assertEquals(
- MetricValueUtils.getTableGaugeValue(_controllerMetrics,
realtimeTableName, ControllerGauge.PERCENT_OF_REPLICAS),
- 100);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
realtimeTableName,
- ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
+ PinotHelixResourceManager resourceManager =
mock(PinotHelixResourceManager.class);
+
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
+
when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState);
+
when(resourceManager.getTableExternalView(REALTIME_TABLE_NAME)).thenReturn(null);
+ SegmentZKMetadata segmentZKMetadata =
mockConsumingSegmentZKMetadata(11111L);
+ when(resourceManager.getSegmentZKMetadata(eq(REALTIME_TABLE_NAME),
anyString())).thenReturn(segmentZKMetadata);
+
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
+ when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
+
+ runSegmentStatusChecker(resourceManager, 0);
+ verifyControllerMetrics(REALTIME_TABLE_NAME, 0, 1, 1, 1, 100, 0, 100, 0,
0);
}
@Test
- public void disabledTableTest() {
- String offlineTableName = "myTable_OFFLINE";
-
- IdealState idealState = new IdealState(offlineTableName);
- // disable table in idealstate
- idealState.enable(false);
- idealState.setPartitionState("myTable_OFFLINE", "pinot1", "OFFLINE");
- idealState.setPartitionState("myTable_OFFLINE", "pinot2", "OFFLINE");
- idealState.setPartitionState("myTable_OFFLINE", "pinot3", "OFFLINE");
+ public void noSegmentZKMetadataTest() {
+ IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
+ idealState.setPartitionState("myTable_0", "pinot1", "ONLINE");
idealState.setReplicas("1");
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
- {
- _helixResourceManager = mock(PinotHelixResourceManager.class);
-
when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName));
-
when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState);
-
when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(null);
- }
- {
- _config = mock(ControllerConf.class);
- when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
- when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
- }
- {
- _leadControllerManager = mock(LeadControllerManager.class);
-
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
- }
- PinotMetricUtils.cleanUp();
- _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
- _controllerMetrics = new ControllerMetrics(_metricsRegistry);
- _segmentStatusChecker =
- new SegmentStatusChecker(_helixResourceManager,
_leadControllerManager, _config, _controllerMetrics,
- null);
-
- // verify state before test
- assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics,
ControllerGauge.DISABLED_TABLE_COUNT), 0);
-
- // update metrics
- _segmentStatusChecker.start();
- _segmentStatusChecker.run();
- assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics,
ControllerGauge.DISABLED_TABLE_COUNT), 1);
+ PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
+ when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
+ when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
+
+ ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class);
+ when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
+
+ runSegmentStatusChecker(resourceManager, 0);
+ verifyControllerMetrics(OFFLINE_TABLE_NAME, 0, 1, 1, 1, 100, 0, 100, 0, 0);
}
@Test
- public void disabledEmptyTableTest() {
- String offlineTableName = "myTable_OFFLINE";
-
- IdealState idealState = new IdealState(offlineTableName);
+ public void disabledTableTest() {
+ IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
// disable table in idealstate
idealState.enable(false);
- idealState.setReplicas("1");
+ idealState.setPartitionState("myTable_OFFLINE", "pinot1", "ONLINE");
+ idealState.setPartitionState("myTable_OFFLINE", "pinot2", "ONLINE");
+ idealState.setPartitionState("myTable_OFFLINE", "pinot3", "ONLINE");
+ idealState.setReplicas("3");
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
- {
- _helixResourceManager = mock(PinotHelixResourceManager.class);
-
when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName));
-
when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState);
-
when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(null);
- }
- {
- _config = mock(ControllerConf.class);
- when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
- when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
- }
- {
- _leadControllerManager = mock(LeadControllerManager.class);
-
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
- }
- PinotMetricUtils.cleanUp();
- _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
- _controllerMetrics = new ControllerMetrics(_metricsRegistry);
- _segmentStatusChecker =
- new SegmentStatusChecker(_helixResourceManager,
_leadControllerManager, _config, _controllerMetrics,
- null);
-
- // verify state before test
- assertFalse(MetricValueUtils.globalGaugeExists(_controllerMetrics,
ControllerGauge.DISABLED_TABLE_COUNT));
-
- // update metrics
- _segmentStatusChecker.start();
- _segmentStatusChecker.run();
+ PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
+ when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
+ when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
+
+ runSegmentStatusChecker(resourceManager, 0);
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT), 1);
+ verifyControllerMetricsNotExist();
}
@Test
- public void noSegments()
- throws Exception {
- noSegmentsInternal(0);
- noSegmentsInternal(5);
- noSegmentsInternal(-1);
+ public void noSegmentTest() {
+ noSegmentTest(0);
+ noSegmentTest(5);
+ noSegmentTest(-1);
+ }
+
+ public void noSegmentTest(int numReplicas) {
+ String numReplicasStr = numReplicas >= 0 ? Integer.toString(numReplicas) : "abc";
+ IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
+ idealState.setReplicas(numReplicasStr);
+ idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+
+ PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
+ when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
+ when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
+
+ runSegmentStatusChecker(resourceManager, 0);
+ int expectedNumReplicas = Math.max(numReplicas, 1);
+ verifyControllerMetrics(OFFLINE_TABLE_NAME, 0, 0, 0, expectedNumReplicas, 100, 0, 100, 0, 0);
}
@Test
- public void lessThanOnePercentSegmentsUnavailableTest()
- throws Exception {
- String offlineTableName = "myTable_OFFLINE";
+ public void lessThanOnePercentSegmentsUnavailableTest() {
TableConfig tableConfig =
- new TableConfigBuilder(TableType.OFFLINE).setTableName(offlineTableName).setNumReplicas(1).build();
+ new TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).setNumReplicas(1).build();
- IdealState idealState = new IdealState(offlineTableName);
+ IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
int numSegments = 200;
for (int i = 0; i < numSegments; i++) {
idealState.setPartitionState("myTable_" + i, "pinot1", "ONLINE");
@@ -715,107 +499,24 @@ public class SegmentStatusCheckerTest {
idealState.setReplicas("1");
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
- ExternalView externalView = new ExternalView(offlineTableName);
+ ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
externalView.setState("myTable_0", "pinot1", "OFFLINE");
for (int i = 1; i < numSegments; i++) {
externalView.setState("myTable_" + i, "pinot1", "ONLINE");
}
- {
- _helixResourceManager = mock(PinotHelixResourceManager.class);
-
when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName));
-
when(_helixResourceManager.getTableConfig(offlineTableName)).thenReturn(tableConfig);
-
when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState);
-
when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(externalView);
- }
- {
- _helixPropertyStore = mock(ZkHelixPropertyStore.class);
-
when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore);
- SegmentLineage segmentLineage = new SegmentLineage(offlineTableName);
- when(_helixPropertyStore.get(eq("/SEGMENT_LINEAGE/" + offlineTableName),
any(),
-
eq(AccessOption.PERSISTENT))).thenReturn(segmentLineage.toZNRecord());
- }
- {
- _config = mock(ControllerConf.class);
- when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
- when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
- }
- {
- _leadControllerManager = mock(LeadControllerManager.class);
-
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
- }
- {
- _tableSizeReader = mock(TableSizeReader.class);
- when(_tableSizeReader.getTableSizeDetails(anyString(),
anyInt())).thenReturn(null);
- }
- PinotMetricUtils.cleanUp();
- _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
- _controllerMetrics = new ControllerMetrics(_metricsRegistry);
- _segmentStatusChecker =
- new SegmentStatusChecker(_helixResourceManager,
_leadControllerManager, _config, _controllerMetrics,
- _tableSizeReader);
- _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
- _segmentStatusChecker.start();
- _segmentStatusChecker.run();
-
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
externalView.getId(),
- ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 99);
- }
-
- public void noSegmentsInternal(final int nReplicas)
- throws Exception {
- String realtimeTableName = "myTable_REALTIME";
+ PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
+ when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
+ when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
+ when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
+ when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView);
+ SegmentZKMetadata segmentZKMetadata = mockPushedSegmentZKMetadata(1234, 11111L);
+ when(resourceManager.getSegmentZKMetadata(eq(OFFLINE_TABLE_NAME), anyString())).thenReturn(segmentZKMetadata);
- String nReplicasStr = Integer.toString(nReplicas);
- int nReplicasExpectedValue = nReplicas;
- if (nReplicas < 0) {
- nReplicasStr = "abc";
- nReplicasExpectedValue = 1;
- }
- IdealState idealState = new IdealState(realtimeTableName);
- idealState.setReplicas(nReplicasStr);
- idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+ ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class);
+ when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
- {
- _helixResourceManager = mock(PinotHelixResourceManager.class);
-
when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName));
-
when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState);
-
when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(null);
- }
- {
- _config = mock(ControllerConf.class);
- when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
- when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
- }
- {
- _leadControllerManager = mock(LeadControllerManager.class);
-
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
- }
- {
- _tableSizeReader = mock(TableSizeReader.class);
- when(_tableSizeReader.getTableSizeDetails(anyString(),
anyInt())).thenReturn(null);
- }
- PinotMetricUtils.cleanUp();
- _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
- _controllerMetrics = new ControllerMetrics(_metricsRegistry);
- _segmentStatusChecker =
- new SegmentStatusChecker(_helixResourceManager,
_leadControllerManager, _config, _controllerMetrics,
- _tableSizeReader);
- _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
- _segmentStatusChecker.start();
- _segmentStatusChecker.run();
-
- assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics,
realtimeTableName,
- ControllerGauge.SEGMENTS_IN_ERROR_STATE));
- assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics,
realtimeTableName,
- ControllerGauge.SEGMENTS_IN_ERROR_STATE));
- assertEquals(
- MetricValueUtils.getTableGaugeValue(_controllerMetrics,
realtimeTableName, ControllerGauge.NUMBER_OF_REPLICAS),
- nReplicasExpectedValue);
- assertEquals(
- MetricValueUtils.getTableGaugeValue(_controllerMetrics,
realtimeTableName, ControllerGauge.PERCENT_OF_REPLICAS),
- 100);
- assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
realtimeTableName,
- ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
+ runSegmentStatusChecker(resourceManager, 0);
+ verifyControllerMetrics(OFFLINE_TABLE_NAME, 1, numSegments, numSegments, 0, 0, 0, 99, 0, 246800);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]