This is an automated email from the ASF dual-hosted git repository.
sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new bc27ad99ec Emit metrics if there's no consuming segment for a partition (#8877)
bc27ad99ec is described below
commit bc27ad99ecd68fd4ae8522055f8a6a84e27fa3c1
Author: Sajjad Moradi <[email protected]>
AuthorDate: Thu Jun 30 13:39:25 2022 -0700
Emit metrics if there's no consuming segment for a partition (#8877)
---
.../pinot/common/metrics/ControllerGauge.java | 11 +-
.../controller/helix/SegmentStatusChecker.java | 29 +-
.../realtime/MissingConsumingSegmentFinder.java | 229 +++++++++++++++
.../realtime/PinotLLCRealtimeSegmentManager.java | 3 -
.../controller/helix/SegmentStatusCheckerTest.java | 24 +-
.../MissingConsumingSegmentFinderTest.java | 306 +++++++++++++++++++++
6 files changed, 585 insertions(+), 17 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 2951cfc7fb..2cd8c79dd5 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -101,7 +101,16 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
DROPPED_MINION_INSTANCES("droppedMinionInstances", true),
// Number of online minion instances
- ONLINE_MINION_INSTANCES("onlineMinionInstances", true);
+ ONLINE_MINION_INSTANCES("onlineMinionInstances", true),
+
+ // Number of partitions with missing consuming segments in ideal state
+ MISSING_CONSUMING_SEGMENT_TOTAL_COUNT("missingConsumingSegmentTotalCount",
false),
+
+ // Number of new partitions with missing consuming segments in ideal state
+
MISSING_CONSUMING_SEGMENT_NEW_PARTITION_COUNT("missingConsumingSegmentNewPartitionCount",
false),
+
+ // Maximum duration of a missing consuming segment in ideal state (in
minutes)
+
MISSING_CONSUMING_SEGMENT_MAX_DURATION_MINUTES("missingSegmentsMaxDurationInMinutes",
false);
private final String _gaugeName;
private final String _unit;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index bf9f179925..6470fab0fa 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -27,9 +27,11 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
@@ -41,9 +43,12 @@ import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import
org.apache.pinot.controller.helix.core.realtime.MissingConsumingSegmentFinder;
import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -110,8 +115,9 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
@Override
protected void processTable(String tableNameWithType, Context context) {
try {
- updateTableConfigMetrics(tableNameWithType);
- updateSegmentMetrics(tableNameWithType, context);
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ updateTableConfigMetrics(tableNameWithType, tableConfig);
+ updateSegmentMetrics(tableNameWithType, tableConfig, context);
updateTableSizeMetrics(tableNameWithType);
} catch (Exception e) {
LOGGER.error("Caught exception while updating segment status for table
{}", tableNameWithType, e);
@@ -131,8 +137,7 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
* Updates metrics related to the table config.
* If table config not found, resets the metrics
*/
- private void updateTableConfigMetrics(String tableNameWithType) {
- TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ private void updateTableConfigMetrics(String tableNameWithType, TableConfig
tableConfig) {
if (tableConfig == null) {
LOGGER.warn("Found null table config for table: {}. Resetting table
config metrics.", tableNameWithType);
_controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.REPLICATION_FROM_CONFIG, 0);
@@ -156,8 +161,9 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
* Runs a segment status pass over the given table.
* TODO: revisit the logic and reduce the ZK access
*/
- private void updateSegmentMetrics(String tableNameWithType, Context context)
{
- if (TableNameBuilder.getTableTypeFromTableName(tableNameWithType) ==
TableType.OFFLINE) {
+ private void updateSegmentMetrics(String tableNameWithType, TableConfig
tableConfig, Context context) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType == TableType.OFFLINE) {
context._offlineTableCount++;
} else {
context._realTimeTableCount++;
@@ -197,8 +203,8 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
// Get the segments excluding the replaced segments which are specified in
the segment lineage entries and cannot
// be queried from the table.
Set<String> segmentsExcludeReplaced = new
HashSet<>(idealState.getPartitionSet());
- SegmentLineage segmentLineage =
-
SegmentLineageAccessHelper.getSegmentLineage(_pinotHelixResourceManager.getPropertyStore(),
tableNameWithType);
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
_pinotHelixResourceManager.getPropertyStore();
+ SegmentLineage segmentLineage =
SegmentLineageAccessHelper.getSegmentLineage(propertyStore, tableNameWithType);
SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(segmentsExcludeReplaced,
segmentLineage);
_controllerMetrics
.setValueOfTableGauge(tableNameWithType,
ControllerGauge.IDEALSTATE_ZNODE_SIZE, idealState.toString().length());
@@ -299,6 +305,13 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
LOGGER.warn("Table {} has {} replicas, below replication threshold :{}",
tableNameWithType, nReplicasExternal,
nReplicasIdealMax);
}
+
+ if (tableType == TableType.REALTIME && tableConfig != null) {
+ PartitionLevelStreamConfig streamConfig = new
PartitionLevelStreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ new MissingConsumingSegmentFinder(tableNameWithType, propertyStore,
_controllerMetrics, streamConfig)
+ .findAndEmitMetrics(idealState);
+ }
}
@Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
new file mode 100644
index 0000000000..e726b77e5f
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * For a given table, this class finds out if there is any partition group for which there's no consuming segment in
+ * ideal state. Every call to {@link #findAndEmitMetrics(IdealState)} sets the following three table gauges (they are
+ * set to zero when nothing is missing):
+ * - Total number of partitions with missing consuming segments, including the newly added partitions counted below
+ * - Number of newly added partitions for which there's no consuming segment (there's no completed segment either)
+ * - Maximum duration (in minutes) that a partition hasn't had a consuming segment
+ *
+ * When the largest offset of each stream partition is known, a partition whose latest completed segment already ends
+ * at that offset is not reported (the partition may have reached its end of life). When the stream metadata could not
+ * be fetched, every partition that has a completed segment but no consuming segment is reported.
+ */
+public class MissingConsumingSegmentFinder {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MissingConsumingSegmentFinder.class);
+
+  private final String _realtimeTableName;
+  private final SegmentMetadataFetcher _segmentMetadataFetcher;
+  // Empty when the stream metadata could not be fetched; the finder then relies on the ideal state only.
+  private final Map<Integer, StreamPartitionMsgOffset> _partitionGroupIdToLargestStreamOffsetMap;
+  private final StreamPartitionMsgOffsetFactory _streamPartitionMsgOffsetFactory;
+  // Null when built through the test-only constructor; only findAndEmitMetrics() reads it.
+  private final ControllerMetrics _controllerMetrics;
+
+  /**
+   * Creates a finder and eagerly queries the stream for the largest offset of every partition group.
+   *
+   * NOTE: this sets the offset criteria of the given {@code streamConfig} to the largest offset, i.e. the passed-in
+   * config object is modified.
+   *
+   * @param realtimeTableName name of the realtime table, used for segment ZK metadata lookups, gauges and logs
+   * @param propertyStore property store holding the segment ZK metadata
+   * @param controllerMetrics sink for the emitted gauges and for ZK fetch failure meters
+   * @param streamConfig stream config of the table, used to query the latest offset of each partition group
+   */
+  public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertyStore<ZNRecord> propertyStore,
+      ControllerMetrics controllerMetrics, PartitionLevelStreamConfig streamConfig) {
+    _realtimeTableName = realtimeTableName;
+    _controllerMetrics = controllerMetrics;
+    _segmentMetadataFetcher = new SegmentMetadataFetcher(propertyStore, controllerMetrics);
+    _streamPartitionMsgOffsetFactory =
+        StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
+
+    // create partition group id to largest stream offset map
+    _partitionGroupIdToLargestStreamOffsetMap = new HashMap<>();
+    streamConfig.setOffsetCriteria(OffsetCriteria.LARGEST_OFFSET_CRITERIA);
+    try {
+      PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig, Collections.emptyList())
+          .forEach(metadata -> _partitionGroupIdToLargestStreamOffsetMap
+              .put(metadata.getPartitionGroupId(), metadata.getStartOffset()));
+    } catch (Exception e) {
+      // Best effort: fall back to ideal-state-only detection, but keep the cause (passed as the trailing throwable
+      // argument) so that the failure can be diagnosed.
+      LOGGER.warn("Problem encountered in fetching stream metadata for topic: {} of table: {}. "
+              + "Continue finding missing consuming segment only with ideal state information.",
+          streamConfig.getTopicName(), streamConfig.getTableNameWithType(), e);
+    }
+  }
+
+  /**
+   * Test-only constructor that injects all collaborators. No metrics sink is set, so
+   * {@link #findAndEmitMetrics(IdealState)} must not be called on an instance built this way.
+   */
+  @VisibleForTesting
+  MissingConsumingSegmentFinder(String realtimeTableName, SegmentMetadataFetcher segmentMetadataFetcher,
+      Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToLargestStreamOffsetMap,
+      StreamPartitionMsgOffsetFactory streamPartitionMsgOffsetFactory) {
+    _realtimeTableName = realtimeTableName;
+    _segmentMetadataFetcher = segmentMetadataFetcher;
+    _partitionGroupIdToLargestStreamOffsetMap = partitionGroupIdToLargestStreamOffsetMap;
+    _streamPartitionMsgOffsetFactory = streamPartitionMsgOffsetFactory;
+    _controllerMetrics = null;
+  }
+
+  /**
+   * Computes the missing-consuming-segment statistics for the given ideal state (using the current time) and sets the
+   * three corresponding table gauges.
+   */
+  public void findAndEmitMetrics(IdealState idealState) {
+    MissingSegmentInfo info = findMissingSegments(idealState.getRecord().getMapFields(), Instant.now());
+    _controllerMetrics.setValueOfTableGauge(_realtimeTableName, ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT,
+        info._totalCount);
+    _controllerMetrics
+        .setValueOfTableGauge(_realtimeTableName, ControllerGauge.MISSING_CONSUMING_SEGMENT_NEW_PARTITION_COUNT,
+            info._newPartitionGroupCount);
+    _controllerMetrics
+        .setValueOfTableGauge(_realtimeTableName, ControllerGauge.MISSING_CONSUMING_SEGMENT_MAX_DURATION_MINUTES,
+            info._maxDurationInMinutes);
+  }
+
+  /**
+   * Finds the partition groups that have no consuming segment in the given ideal state.
+   *
+   * Only segments whose names parse as LLC segment names are considered. For every partition group the segment with
+   * the highest sequence number is tracked separately for the consuming and the completed (ONLINE) state.
+   *
+   * @param idealStateMap segment name to (instance to state) map of the ideal state
+   * @param now reference time used to compute how long a partition has been without a consuming segment
+   * @return the aggregated counts and the maximum duration in minutes
+   */
+  @VisibleForTesting
+  MissingSegmentInfo findMissingSegments(Map<String, Map<String, String>> idealStateMap, Instant now) {
+    // create the maps
+    Map<Integer, LLCSegmentName> partitionGroupIdToLatestConsumingSegmentMap = new HashMap<>();
+    Map<Integer, LLCSegmentName> partitionGroupIdToLatestCompletedSegmentMap = new HashMap<>();
+    idealStateMap.forEach((segmentName, instanceToStatusMap) -> {
+      LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+      if (llcSegmentName != null) { // Skip the uploaded realtime segments that don't conform to llc naming
+        // A segment with at least one CONSUMING replica counts as consuming, even if other replicas are ONLINE.
+        if (instanceToStatusMap.containsValue(SegmentStateModel.CONSUMING)) {
+          updateMap(partitionGroupIdToLatestConsumingSegmentMap, llcSegmentName);
+        } else if (instanceToStatusMap.containsValue(SegmentStateModel.ONLINE)) {
+          updateMap(partitionGroupIdToLatestCompletedSegmentMap, llcSegmentName);
+        }
+      }
+    });
+
+    MissingSegmentInfo missingSegmentInfo = new MissingSegmentInfo();
+    if (!_partitionGroupIdToLargestStreamOffsetMap.isEmpty()) {
+      // Stream metadata is available: iterate over the partition groups that exist on the stream.
+      _partitionGroupIdToLargestStreamOffsetMap.forEach((partitionGroupId, largestStreamOffset) -> {
+        if (!partitionGroupIdToLatestConsumingSegmentMap.containsKey(partitionGroupId)) {
+          LLCSegmentName latestCompletedSegment = partitionGroupIdToLatestCompletedSegmentMap.get(partitionGroupId);
+          if (latestCompletedSegment == null) {
+            // There's no consuming or completed segment for this partition group. Possibilities:
+            // 1) it's a new partition group that has not yet been detected
+            // 2) the first consuming segment has been deleted from ideal state manually
+            missingSegmentInfo._newPartitionGroupCount++;
+            missingSegmentInfo._totalCount++;
+          } else {
+            // Completed segment is available, but there's no consuming segment.
+            // Note that there is no problem in case the partition group has reached its end of life.
+            SegmentZKMetadata segmentZKMetadata = _segmentMetadataFetcher
+                .fetchSegmentZkMetadata(_realtimeTableName, latestCompletedSegment.getSegmentName());
+            StreamPartitionMsgOffset completedSegmentEndOffset =
+                _streamPartitionMsgOffsetFactory.create(segmentZKMetadata.getEndOffset());
+            if (completedSegmentEndOffset.compareTo(largestStreamOffset) < 0) {
+              // there are unconsumed messages available on the stream
+              missingSegmentInfo._totalCount++;
+              updateMaxDurationInfo(missingSegmentInfo, partitionGroupId, segmentZKMetadata.getCreationTime(), now);
+            }
+          }
+        }
+      });
+    } else {
+      // No stream metadata: end of life cannot be detected, so every completed partition without a consuming segment
+      // is reported. Partitions with no segment at all are invisible in this mode.
+      partitionGroupIdToLatestCompletedSegmentMap.forEach((partitionGroupId, latestCompletedSegment) -> {
+        if (!partitionGroupIdToLatestConsumingSegmentMap.containsKey(partitionGroupId)) {
+          missingSegmentInfo._totalCount++;
+          long segmentCompletionTimeMillis = _segmentMetadataFetcher
+              .fetchSegmentCompletionTime(_realtimeTableName, latestCompletedSegment.getSegmentName());
+          updateMaxDurationInfo(missingSegmentInfo, partitionGroupId, segmentCompletionTimeMillis, now);
+        }
+      });
+    }
+    return missingSegmentInfo;
+  }
+
+  /**
+   * Raises the maximum duration if the time elapsed since {@code segmentCompletionTimeMillis} is larger, and logs a
+   * warning identifying the table and partition group.
+   */
+  private void updateMaxDurationInfo(MissingSegmentInfo missingSegmentInfo, Integer partitionGroupId,
+      long segmentCompletionTimeMillis, Instant now) {
+    long duration = Duration.between(Instant.ofEpochMilli(segmentCompletionTimeMillis), now).toMinutes();
+    if (duration > missingSegmentInfo._maxDurationInMinutes) {
+      missingSegmentInfo._maxDurationInMinutes = duration;
+    }
+    LOGGER.warn("PartitionGroupId {} of table {} hasn't had a consuming segment for {} minutes!", partitionGroupId,
+        _realtimeTableName, duration);
+  }
+
+  /**
+   * Keeps, per partition group, the segment with the highest sequence number (a later-seen segment wins ties).
+   */
+  private void updateMap(Map<Integer, LLCSegmentName> partitionGroupIdToLatestSegmentMap,
+      LLCSegmentName llcSegmentName) {
+    partitionGroupIdToLatestSegmentMap.merge(llcSegmentName.getPartitionGroupId(), llcSegmentName,
+        (existingSegment, newSegment) -> existingSegment.getSequenceNumber() > newSegment.getSequenceNumber()
+            ? existingSegment : newSegment);
+  }
+
+  /** Mutable accumulator for the values emitted as gauges. */
+  @VisibleForTesting
+  static class MissingSegmentInfo {
+    // Partitions without a consuming segment (includes _newPartitionGroupCount)
+    long _totalCount;
+    // Partitions with neither a consuming nor a completed segment
+    long _newPartitionGroupCount;
+    // Longest time, in minutes, that any reported partition has been without a consuming segment
+    long _maxDurationInMinutes;
+  }
+
+  /**
+   * Reads segment ZK metadata from the property store. On any failure it increments the
+   * LLC_ZOOKEEPER_FETCH_FAILURES meter for the table and rethrows.
+   */
+  static class SegmentMetadataFetcher {
+    private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+    private final ControllerMetrics _controllerMetrics;
+
+    public SegmentMetadataFetcher(ZkHelixPropertyStore<ZNRecord> propertyStore, ControllerMetrics controllerMetrics) {
+      _propertyStore = propertyStore;
+      _controllerMetrics = controllerMetrics;
+    }
+
+    public SegmentZKMetadata fetchSegmentZkMetadata(String tableName, String segmentName) {
+      return fetchSegmentZkMetadata(tableName, segmentName, null);
+    }
+
+    /** Returns the modification time (ZK mtime) of the segment metadata node, used here as the completion time. */
+    public long fetchSegmentCompletionTime(String tableName, String segmentName) {
+      Stat stat = new Stat();
+      fetchSegmentZkMetadata(tableName, segmentName, stat);
+      return stat.getMtime();
+    }
+
+    private SegmentZKMetadata fetchSegmentZkMetadata(String tableName, String segmentName, Stat stat) {
+      try {
+        ZNRecord znRecord = _propertyStore
+            .get(ZKMetadataProvider.constructPropertyStorePathForSegment(tableName, segmentName), stat,
+                AccessOption.PERSISTENT);
+        Preconditions.checkState(znRecord != null, "Failed to find segment ZK metadata for segment: %s of table: %s",
+            segmentName, tableName);
+        return new SegmentZKMetadata(znRecord);
+      } catch (Exception e) {
+        _controllerMetrics.addMeteredTableValue(tableName, ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
+        throw e;
+      }
+    }
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 75a3b84f93..7c1e5872f0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -862,9 +862,6 @@ public class PinotLLCRealtimeSegmentManager {
* If so, it should create a new CONSUMING segment for the partition.
* (this operation is done only if @param recreateDeletedConsumingSegment is
set to true,
* which means it's manually triggered by admin not by automatic periodic
task)
- *
- * TODO: We need to find a place to detect and update a gauge for
nonConsumingPartitionsCount for a table, and
- * reset it to 0 at the end of validateLLC
*/
public void ensureAllPartitionsConsuming(TableConfig tableConfig,
PartitionLevelStreamConfig streamConfig,
boolean recreateDeletedConsumingSegment) {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
index 7d788fc072..00ad93bbe0 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -18,10 +18,12 @@
*/
package org.apache.pinot.controller.helix;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -52,10 +54,7 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.Assert;
import org.testng.annotations.Test;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -181,7 +180,7 @@ public class SegmentStatusCheckerTest {
allTableNames.add(tableName);
TableConfig tableConfig =
new
TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setTimeColumnName("timeColumn").setLLC(true)
- .setNumReplicas(3).build();
+ .setNumReplicas(3).setStreamConfigs(getStreamConfigMap()).build();
final LLCSegmentName seg1 = new LLCSegmentName(rawTableName, 1, 0,
System.currentTimeMillis());
final LLCSegmentName seg2 = new LLCSegmentName(rawTableName, 1, 1,
System.currentTimeMillis());
final LLCSegmentName seg3 = new LLCSegmentName(rawTableName, 2, 1,
System.currentTimeMillis());
@@ -217,6 +216,9 @@ public class SegmentStatusCheckerTest {
when(_helixResourceManager.getAllTables()).thenReturn(allTableNames);
when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState);
when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(externalView);
+ ZNRecord znRecord = new ZNRecord("0");
+ znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET,
"10000");
+ when(_helixPropertyStore.get(anyString(), any(),
anyInt())).thenReturn(znRecord);
}
{
_config = mock(ControllerConf.class);
@@ -251,6 +253,18 @@ public class SegmentStatusCheckerTest {
100);
Assert.assertEquals(
_controllerMetrics.getValueOfTableGauge(externalView.getId(),
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
+ Assert.assertEquals(_controllerMetrics
+ .getValueOfTableGauge(externalView.getId(),
ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2);
+ }
+
+  /**
+   * Builds the stream configs for the realtime test table. The consumer factory is set to
+   * FakeStreamConsumerFactory rather than a real Kafka consumer factory.
+   */
+  Map<String, String> getStreamConfigMap() {
+    return ImmutableMap.<String, String>builder()
+        .put("streamType", "kafka")
+        .put("stream.kafka.consumer.type", "simple")
+        .put("stream.kafka.topic.name", "test")
+        .put("stream.kafka.decoder.class.name", "org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder")
+        .put("stream.kafka.consumer.factory.class.name",
+            "org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory")
+        .build();
   }
@Test
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinderTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinderTest.java
new file mode 100644
index 0000000000..1b14592916
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinderTest.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.realtime;
+
+import com.google.common.collect.ImmutableMap;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.*;
+
+
+public class MissingConsumingSegmentFinderTest {
+
+ private StreamPartitionMsgOffsetFactory _offsetFactory = new
LongMsgOffsetFactory();
+
+ @Test
+ public void noMissingConsumingSegmentsScenario1() {
+ // scenario 1: no missing segments, but connecting to stream throws
exception
+ // only ideal state info is used
+
+ Map<String, Map<String, String>> idealStateMap = new HashMap<>();
+ // partition 0
+ idealStateMap.put("tableA__0__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__0__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__0__2__20220601T1500Z",
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+ // partition 1
+ idealStateMap.put("tableA__1__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__1__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__1__2__20220601T1500Z",
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+ // partition 2
+ idealStateMap.put("tableA__2__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__2__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__2__2__20220601T1500Z",
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+ // partition 3
+ idealStateMap.put("tableA__3__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__3__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__3__2__20220601T1500Z",
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+
+ Instant now = Instant.parse("2022-06-01T18:00:00.00Z");
+ MissingConsumingSegmentFinder finder = new
MissingConsumingSegmentFinder("tableA", null, new HashMap<>(), null);
+ MissingConsumingSegmentFinder.MissingSegmentInfo info =
finder.findMissingSegments(idealStateMap, now);
+ assertEquals(info._totalCount, 0);
+ assertEquals(info._newPartitionGroupCount, 0);
+ assertEquals(info._maxDurationInMinutes, 0);
+ }
+
+ @Test
+ public void noMissingConsumingSegmentsScenario2() {
+ // scenario 2: no missing segments and there's no exception in connecting
to stream
+
+ Map<String, Map<String, String>> idealStateMap = new HashMap<>();
+ // partition 0
+ idealStateMap.put("tableA__0__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__0__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__0__2__20220601T1500Z",
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+ // partition 1
+ idealStateMap.put("tableA__1__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__1__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__1__2__20220601T1500Z",
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+ // partition 2
+ idealStateMap.put("tableA__2__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__2__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__2__2__20220601T1500Z",
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+ // partition 3
+ idealStateMap.put("tableA__3__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__3__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__3__2__20220601T1500Z",
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+
+ Map<Integer, StreamPartitionMsgOffset>
partitionGroupIdToLargestStreamOffsetMap = ImmutableMap.of(
+ 0, new LongMsgOffset(1000),
+ 1, new LongMsgOffset(1001),
+ 2, new LongMsgOffset(1002),
+ 3, new LongMsgOffset(1003)
+ );
+
+ Instant now = Instant.parse("2022-06-01T18:00:00.00Z");
+ MissingConsumingSegmentFinder finder =
+ new MissingConsumingSegmentFinder("tableA", null,
partitionGroupIdToLargestStreamOffsetMap, null);
+ MissingConsumingSegmentFinder.MissingSegmentInfo info =
finder.findMissingSegments(idealStateMap, now);
+ assertEquals(info._totalCount, 0);
+ assertEquals(info._newPartitionGroupCount, 0);
+ assertEquals(info._maxDurationInMinutes, 0);
+ }
+
+ @Test
+ public void noMissingConsumingSegmentsScenario3() {
+ // scenario 3: no missing segments and there's no exception in connecting
to stream
+ // two partitions have reached end of life
+
+ Map<String, Map<String, String>> idealStateMap = new HashMap<>();
+ // partition 0
+ idealStateMap.put("tableA__0__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__0__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__0__2__20220601T1500Z",
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+ // partition 1 (has reached end of life)
+ idealStateMap.put("tableA__1__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__1__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ // partition 2
+ idealStateMap.put("tableA__2__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__2__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__2__2__20220601T1500Z",
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+ // partition 3 (has reached end of life)
+ idealStateMap.put("tableA__3__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__3__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+
+ Map<Integer, StreamPartitionMsgOffset>
partitionGroupIdToLargestStreamOffsetMap = ImmutableMap.of(
+ 0, new LongMsgOffset(1000),
+ 1, new LongMsgOffset(701),
+ 2, new LongMsgOffset(1002),
+ 3, new LongMsgOffset(703)
+ );
+
+ // setup segment metadata fetcher
+ SegmentZKMetadata m1 = mock(SegmentZKMetadata.class);
+ when(m1.getEndOffset()).thenReturn("701");
+ SegmentZKMetadata m3 = mock(SegmentZKMetadata.class);
+ when(m3.getEndOffset()).thenReturn("703");
+ MissingConsumingSegmentFinder.SegmentMetadataFetcher metadataFetcher =
+ mock(MissingConsumingSegmentFinder.SegmentMetadataFetcher.class);
+ when(metadataFetcher.fetchSegmentZkMetadata("tableA",
"tableA__1__1__20220601T1200Z")).thenReturn(m1);
+ when(metadataFetcher.fetchSegmentZkMetadata("tableA",
"tableA__3__1__20220601T1200Z")).thenReturn(m3);
+
+ Instant now = Instant.parse("2022-06-01T18:00:00.00Z");
+ MissingConsumingSegmentFinder finder =
+ new MissingConsumingSegmentFinder("tableA", metadataFetcher,
partitionGroupIdToLargestStreamOffsetMap,
+ _offsetFactory);
+ MissingConsumingSegmentFinder.MissingSegmentInfo info =
finder.findMissingSegments(idealStateMap, now);
+ assertEquals(info._totalCount, 0);
+ assertEquals(info._newPartitionGroupCount, 0);
+ assertEquals(info._maxDurationInMinutes, 0);
+ }
+
+ @Test
+ public void noMissingConsumingSegmentsScenario4() {
+ // scenario 4: no missing segments, but connecting to stream throws
exception
+ // two partitions have reached end of life
+ // since there's no way to detect if the partitions have reached end of
life, those partitions are reported as
+ // missing consuming segments
+
+ Map<String, Map<String, String>> idealStateMap = new HashMap<>();
+ // partition 0
+ idealStateMap.put("tableA__0__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__0__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__0__2__20220601T1500Z",
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+ // partition 1 (has reached end of life)
+ idealStateMap.put("tableA__1__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__1__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ // partition 2
+ idealStateMap.put("tableA__2__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__2__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__2__2__20220601T1500Z",
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+ // partition 3
+ idealStateMap.put("tableA__3__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__3__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__3__2__20220601T1500Z",
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+ // partition 4 (has reached end of life)
+ idealStateMap.put("tableA__4__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ // partition 5
+ idealStateMap.put("tableA__5__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__5__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__5__2__20220601T1500Z",
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+
+ // setup segment metadata fetcher
+ MissingConsumingSegmentFinder.SegmentMetadataFetcher metadataFetcher =
+ mock(MissingConsumingSegmentFinder.SegmentMetadataFetcher.class);
+ when(metadataFetcher.fetchSegmentCompletionTime("tableA",
"tableA__1__1__20220601T1200Z"))
+ .thenReturn(Instant.parse("2022-06-01T15:00:00.00Z").toEpochMilli());
+ when(metadataFetcher.fetchSegmentCompletionTime("tableA",
"tableA__4__0__20220601T0900Z"))
+ .thenReturn(Instant.parse("2022-06-01T12:00:00.00Z").toEpochMilli());
+
+ Instant now = Instant.parse("2022-06-01T18:00:00.00Z");
+ MissingConsumingSegmentFinder finder =
+ new MissingConsumingSegmentFinder("tableA", metadataFetcher, new
HashMap<>(), null);
+ MissingConsumingSegmentFinder.MissingSegmentInfo info =
finder.findMissingSegments(idealStateMap, now);
+ assertEquals(info._totalCount, 2);
+ assertEquals(info._newPartitionGroupCount, 0);
+ assertEquals(info._maxDurationInMinutes, 6 * 60); // (18:00:00 - 12:00:00)
in minutes
+ }
+
+ @Test
+ public void missingConsumingSegments() {
+
+ Map<String, Map<String, String>> idealStateMap = new HashMap<>();
+ // partition 0
+ idealStateMap.put("tableA__0__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__0__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__0__2__20220601T1500Z",
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+ // partition 1 (missing consuming segment)
+ idealStateMap.put("tableA__1__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__1__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ // partition 2
+ idealStateMap.put("tableA__2__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__2__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__2__2__20220601T1500Z",
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+ // partition 3
+ idealStateMap.put("tableA__3__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__3__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__3__2__20220601T1500Z",
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+ // partition 4 (missing consuming segment)
+ idealStateMap.put("tableA__4__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ // partition 5
+ idealStateMap.put("tableA__5__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__5__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__5__2__20220601T1500Z",
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+ // partition 6 is a new partition and there's no consuming segment in
ideal states for it
+
+ Map<Integer, StreamPartitionMsgOffset>
partitionGroupIdToLargestStreamOffsetMap = new HashMap<>();
+ partitionGroupIdToLargestStreamOffsetMap.put(0, new LongMsgOffset(1000));
+ partitionGroupIdToLargestStreamOffsetMap.put(1, new LongMsgOffset(1001));
+ partitionGroupIdToLargestStreamOffsetMap.put(2, new LongMsgOffset(1002));
+ partitionGroupIdToLargestStreamOffsetMap.put(3, new LongMsgOffset(1003));
+ partitionGroupIdToLargestStreamOffsetMap.put(4, new LongMsgOffset(1004));
+ partitionGroupIdToLargestStreamOffsetMap.put(5, new LongMsgOffset(1005));
+ partitionGroupIdToLargestStreamOffsetMap.put(6, new LongMsgOffset(16));
+
+ // setup segment metadata fetcher
+ SegmentZKMetadata m1 = mock(SegmentZKMetadata.class);
+ when(m1.getEndOffset()).thenReturn("701");
+
when(m1.getCreationTime()).thenReturn(Instant.parse("2022-06-01T15:00:00.00Z").toEpochMilli());
+ SegmentZKMetadata m4 = mock(SegmentZKMetadata.class);
+ when(m4.getEndOffset()).thenReturn("704");
+
when(m4.getCreationTime()).thenReturn(Instant.parse("2022-06-01T12:00:00.00Z").toEpochMilli());
+ MissingConsumingSegmentFinder.SegmentMetadataFetcher metadataFetcher =
+ mock(MissingConsumingSegmentFinder.SegmentMetadataFetcher.class);
+ when(metadataFetcher.fetchSegmentZkMetadata("tableA",
"tableA__1__1__20220601T1200Z")).thenReturn(m1);
+ when(metadataFetcher.fetchSegmentZkMetadata("tableA",
"tableA__4__0__20220601T0900Z")).thenReturn(m4);
+
+ Instant now = Instant.parse("2022-06-01T18:00:00.00Z");
+ MissingConsumingSegmentFinder finder =
+ new MissingConsumingSegmentFinder("tableA", metadataFetcher,
partitionGroupIdToLargestStreamOffsetMap,
+ _offsetFactory);
+ MissingConsumingSegmentFinder.MissingSegmentInfo info =
finder.findMissingSegments(idealStateMap, now);
+ assertEquals(info._totalCount, 3);
+ assertEquals(info._newPartitionGroupCount, 1);
+ assertEquals(info._maxDurationInMinutes, 6 * 60); // (18:00:00 - 12:00:00)
in minutes
+ }
+
+ @Test
+ public void missingConsumingSegmentsWithStreamConnectionIssue() {
+
+ Map<String, Map<String, String>> idealStateMap = new HashMap<>();
+ // partition 0
+ idealStateMap.put("tableA__0__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__0__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__0__2__20220601T1500Z",
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+ // partition 1 (missing consuming segment)
+ idealStateMap.put("tableA__1__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__1__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ // partition 2
+ idealStateMap.put("tableA__2__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__2__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__2__2__20220601T1500Z",
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+ // partition 3
+ idealStateMap.put("tableA__3__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__3__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__3__2__20220601T1500Z",
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+ // partition 4 (missing consuming segment)
+ idealStateMap.put("tableA__4__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ // partition 5
+ idealStateMap.put("tableA__5__0__20220601T0900Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__5__1__20220601T1200Z",
ImmutableMap.of("ServerX", "ONLINE", "ServerY", "ONLINE"));
+ idealStateMap.put("tableA__5__2__20220601T1500Z",
ImmutableMap.of("ServerX", "CONSUMING", "ServerY", "CONSUMING"));
+ // partition 6 is a new partition and there's no consuming segment in
ideal states for it
+
+ // setup segment metadata fetcher
+ MissingConsumingSegmentFinder.SegmentMetadataFetcher metadataFetcher =
+ mock(MissingConsumingSegmentFinder.SegmentMetadataFetcher.class);
+ when(metadataFetcher.fetchSegmentCompletionTime("tableA",
"tableA__1__1__20220601T1200Z"))
+ .thenReturn(Instant.parse("2022-06-01T15:00:00.00Z").toEpochMilli());
+ when(metadataFetcher.fetchSegmentCompletionTime("tableA",
"tableA__4__0__20220601T0900Z"))
+ .thenReturn(Instant.parse("2022-06-01T12:00:00.00Z").toEpochMilli());
+
+ Instant now = Instant.parse("2022-06-01T18:00:00.00Z");
+ MissingConsumingSegmentFinder finder =
+ new MissingConsumingSegmentFinder("tableA", metadataFetcher, new
HashMap<>(), _offsetFactory);
+ MissingConsumingSegmentFinder.MissingSegmentInfo info =
finder.findMissingSegments(idealStateMap, now);
+ assertEquals(info._totalCount, 2);
+ assertEquals(info._newPartitionGroupCount, 0);
+ assertEquals(info._maxDurationInMinutes, 6 * 60); // (18:00:00 - 12:00:00)
in minutes
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]