This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8e10320595 handle absent segments so that catchup checker doesn't get
stuck on them (#12883)
8e10320595 is described below
commit 8e103205955e8af4fe286ebd6e97b30605724be2
Author: Xiaobing <[email protected]>
AuthorDate: Mon Apr 22 16:41:46 2024 -0700
Handle absent segments so that the catch-up checker doesn't get stuck on them
(#12883)
* Skip missing segments while checking freshness during server startup
* Re-fetch the consuming segments if the current consuming segments were
committed by other servers
---
.../server/starter/helix/BaseServerStarter.java | 71 +++++++-----
.../FreshnessBasedConsumptionStatusChecker.java | 7 +-
.../IngestionBasedConsumptionStatusChecker.java | 128 ++++++++++++++-------
.../helix/OffsetBasedConsumptionStatusChecker.java | 7 +-
.../helix/ConsumptionStatusCheckerTestUtils.java | 38 ++++++
...FreshnessBasedConsumptionStatusCheckerTest.java | 103 ++++++++++++++---
.../OffsetBasedConsumptionStatusCheckerTest.java | 32 ++++--
7 files changed, 288 insertions(+), 98 deletions(-)
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 02c7b81ea5..78cd1a14e7 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -153,8 +154,8 @@ public abstract class BaseServerStarter implements
ServiceStartable {
_helixClusterName =
_serverConf.getProperty(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME);
ServiceStartableUtils.applyClusterConfig(_serverConf, _zkAddress,
_helixClusterName, ServiceRole.SERVER);
- PinotInsecureMode.setPinotInInsecureMode(
-
Boolean.valueOf(_serverConf.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE,
+ PinotInsecureMode.setPinotInInsecureMode(Boolean.parseBoolean(
+ _serverConf.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE,
CommonConstants.DEFAULT_PINOT_INSECURE_MODE)));
setupHelixSystemProperties();
@@ -275,8 +276,7 @@ public abstract class BaseServerStarter implements
ServiceStartable {
// collect all resources which have this instance in the ideal state
List<String> resourcesToMonitor = new ArrayList<>();
-
- Set<String> consumingSegments = new HashSet<>();
+ Map<String, Set<String>> consumingSegments = new HashMap<>();
boolean checkRealtime = realtimeConsumptionCatchupWaitMs > 0;
if (isFreshnessStatusCheckerEnabled && realtimeMinFreshnessMs <= 0) {
LOGGER.warn("Realtime min freshness {} must be > 0. Setting relatime min
freshness to default {}.",
@@ -289,23 +289,22 @@ public abstract class BaseServerStarter implements
ServiceStartable {
if (!TableNameBuilder.isTableResource(resourceName)) {
continue;
}
-
// Only monitor enabled resources
IdealState idealState =
_helixAdmin.getResourceIdealState(_helixClusterName, resourceName);
- if (idealState.isEnabled()) {
-
- for (String partitionName : idealState.getPartitionSet()) {
- if (idealState.getInstanceSet(partitionName).contains(_instanceId)) {
- resourcesToMonitor.add(resourceName);
- break;
- }
+ if (idealState == null || !idealState.isEnabled()) {
+ continue;
+ }
+ for (String partitionName : idealState.getPartitionSet()) {
+ if (idealState.getInstanceSet(partitionName).contains(_instanceId)) {
+ resourcesToMonitor.add(resourceName);
+ break;
}
- if (checkRealtime &&
TableNameBuilder.isRealtimeTableResource(resourceName)) {
- for (String partitionName : idealState.getPartitionSet()) {
- if (StateModel.SegmentStateModel.CONSUMING.equals(
-
idealState.getInstanceStateMap(partitionName).get(_instanceId))) {
- consumingSegments.add(partitionName);
- }
+ }
+ if (checkRealtime &&
TableNameBuilder.isRealtimeTableResource(resourceName)) {
+ for (String partitionName : idealState.getPartitionSet()) {
+ if (StateModel.SegmentStateModel.CONSUMING.equals(
+ idealState.getInstanceStateMap(partitionName).get(_instanceId)))
{
+ consumingSegments.computeIfAbsent(resourceName, k -> new
HashSet<>()).add(partitionName);
}
}
}
@@ -332,7 +331,7 @@ public abstract class BaseServerStarter implements
ServiceStartable {
realtimeMinFreshnessMs, idleTimeoutMs);
FreshnessBasedConsumptionStatusChecker freshnessStatusChecker =
new
FreshnessBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(),
consumingSegments,
- realtimeMinFreshnessMs, idleTimeoutMs);
+ this::getConsumingSegments, realtimeMinFreshnessMs,
idleTimeoutMs);
Supplier<Integer> getNumConsumingSegmentsNotReachedMinFreshness =
freshnessStatusChecker::getNumConsumingSegmentsNotReachedIngestionCriteria;
serviceStatusCallbackListBuilder.add(
@@ -341,7 +340,8 @@ public abstract class BaseServerStarter implements
ServiceStartable {
} else if (isOffsetBasedConsumptionStatusCheckerEnabled) {
LOGGER.info("Setting up offset based status checker");
OffsetBasedConsumptionStatusChecker consumptionStatusChecker =
- new
OffsetBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(),
consumingSegments);
+ new
OffsetBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(),
consumingSegments,
+ this::getConsumingSegments);
Supplier<Integer> getNumConsumingSegmentsNotReachedTheirLatestOffset =
consumptionStatusChecker::getNumConsumingSegmentsNotReachedIngestionCriteria;
serviceStatusCallbackListBuilder.add(
@@ -359,6 +359,22 @@ public abstract class BaseServerStarter implements
ServiceStartable {
new
ServiceStatus.MultipleCallbackServiceStatusCallback(serviceStatusCallbackListBuilder.build()));
}
+ @Nullable
+ private Set<String> getConsumingSegments(String realtimeTableName) {
+ IdealState idealState =
_helixAdmin.getResourceIdealState(_helixClusterName, realtimeTableName);
+ if (idealState == null || !idealState.isEnabled()) {
+ return null;
+ }
+ Set<String> consumingSegments = new HashSet<>();
+ for (String partitionName : idealState.getPartitionSet()) {
+ if (StateModel.SegmentStateModel.CONSUMING.equals(
+ idealState.getInstanceStateMap(partitionName).get(_instanceId))) {
+ consumingSegments.add(partitionName);
+ }
+ }
+ return consumingSegments;
+ }
+
private void updateInstanceConfigIfNeeded(ServerConf serverConf) {
InstanceConfig instanceConfig =
HelixHelper.getInstanceConfig(_helixManager, _instanceId);
@@ -518,12 +534,13 @@ public abstract class BaseServerStarter implements
ServiceStartable {
}
}
- boolean exitServerOnIncompleteStartup = _serverConf.getProperty(
- Server.CONFIG_OF_EXIT_ON_SERVICE_STATUS_CHECK_FAILURE,
- Server.DEFAULT_EXIT_ON_SERVICE_STATUS_CHECK_FAILURE);
+ boolean exitServerOnIncompleteStartup =
+
_serverConf.getProperty(Server.CONFIG_OF_EXIT_ON_SERVICE_STATUS_CHECK_FAILURE,
+ Server.DEFAULT_EXIT_ON_SERVICE_STATUS_CHECK_FAILURE);
if (exitServerOnIncompleteStartup) {
- String errorMessage = String.format("Service status %s has not turned
GOOD within %dms: %s. Exiting server.",
- serviceStatus, System.currentTimeMillis() - startTimeMs,
ServiceStatus.getStatusDescription());
+ String errorMessage =
+ String.format("Service status %s has not turned GOOD within %dms:
%s. Exiting server.", serviceStatus,
+ System.currentTimeMillis() - startTimeMs,
ServiceStatus.getStatusDescription());
throw new IllegalStateException(errorMessage);
}
LOGGER.warn("Service status has not turned GOOD within {}ms: {}",
System.currentTimeMillis() - startTimeMs,
@@ -581,8 +598,8 @@ public abstract class BaseServerStarter implements
ServiceStartable {
InstanceDataManager instanceDataManager =
_serverInstance.getInstanceDataManager();
instanceDataManager.setSupplierOfIsServerReadyToServeQueries(() ->
_isServerReadyToServeQueries);
// initialize the thread accountant for query killing
- Tracing.ThreadAccountantOps
-
.initializeThreadAccountant(_serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX),
_instanceId);
+ Tracing.ThreadAccountantOps.initializeThreadAccountant(
+ _serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX),
_instanceId);
initSegmentFetcher(_serverConf);
StateModelFactory<?> stateModelFactory =
new SegmentOnlineOfflineStateModelFactory(_instanceId,
instanceDataManager);
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
index 6f3610e596..77eac3832e 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
@@ -19,7 +19,9 @@
package org.apache.pinot.server.starter.helix;
+import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -37,9 +39,10 @@ public class FreshnessBasedConsumptionStatusChecker extends
IngestionBasedConsum
private final long _minFreshnessMs;
private final long _idleTimeoutMs;
- public FreshnessBasedConsumptionStatusChecker(InstanceDataManager
instanceDataManager, Set<String> consumingSegments,
+ public FreshnessBasedConsumptionStatusChecker(InstanceDataManager
instanceDataManager,
+ Map<String, Set<String>> consumingSegments, Function<String,
Set<String>> consumingSegmentsSupplier,
long minFreshnessMs, long idleTimeoutMs) {
- super(instanceDataManager, consumingSegments);
+ super(instanceDataManager, consumingSegments, consumingSegmentsSupplier);
_minFreshnessMs = minFreshnessMs;
_idleTimeoutMs = idleTimeoutMs;
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
index 83de35a63c..c6fe0d16d6 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
@@ -19,15 +19,16 @@
package org.apache.pinot.server.starter.helix;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
import java.util.Set;
-import org.apache.pinot.common.utils.LLCSegmentName;
+import java.util.function.Function;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,64 +36,103 @@ import org.slf4j.LoggerFactory;
public abstract class IngestionBasedConsumptionStatusChecker {
protected final Logger _logger = LoggerFactory.getLogger(getClass());
- // constructor parameters
- protected final InstanceDataManager _instanceDataManager;
- protected final Set<String> _consumingSegments;
-
- // helper variable
- private final Set<String> _caughtUpSegments = new HashSet<>();
+ private final InstanceDataManager _instanceDataManager;
+ private final Map<String, Set<String>> _consumingSegmentsByTable;
+ private final Map<String, Set<String>> _caughtUpSegmentsByTable = new
HashMap<>();
+ private final Function<String, Set<String>> _consumingSegmentsSupplier;
+ /**
+ * Both consumingSegmentsByTable and consumingSegmentsSupplier are provided
as it can be costly to get
+ * consumingSegmentsByTable via the supplier, so only use it when any
missing segment is detected.
+ */
public IngestionBasedConsumptionStatusChecker(InstanceDataManager
instanceDataManager,
- Set<String> consumingSegments) {
+ Map<String, Set<String>> consumingSegmentsByTable, Function<String,
Set<String>> consumingSegmentsSupplier) {
_instanceDataManager = instanceDataManager;
- _consumingSegments = consumingSegments;
+ _consumingSegmentsByTable = consumingSegmentsByTable;
+ _consumingSegmentsSupplier = consumingSegmentsSupplier;
}
- public int getNumConsumingSegmentsNotReachedIngestionCriteria() {
- for (String segName : _consumingSegments) {
- if (_caughtUpSegments.contains(segName)) {
- continue;
- }
- TableDataManager tableDataManager = getTableDataManager(segName);
+ // This might be called by multiple threads, thus synchronized to be correct.
+ public synchronized int getNumConsumingSegmentsNotReachedIngestionCriteria()
{
+ // If the checker found any consuming segments are missing or committed
for a table, it should reset the set of
+ // consuming segments for the table to continue to monitor the freshness,
otherwise the checker might get stuck
+ // on deleted segments or tables, or miss new consuming segments created
in the table and get ready prematurely.
+ Set<String> tablesToRefresh = new HashSet<>();
+ Iterator<Map.Entry<String, Set<String>>> itr =
_consumingSegmentsByTable.entrySet().iterator();
+ while (itr.hasNext()) {
+ Map.Entry<String, Set<String>> tableSegments = itr.next();
+ String tableNameWithType = tableSegments.getKey();
+ TableDataManager tableDataManager =
_instanceDataManager.getTableDataManager(tableNameWithType);
if (tableDataManager == null) {
- _logger.info("TableDataManager is not yet setup for segment {}. Will
check consumption status later", segName);
+ _logger.info("No tableDataManager for table: {}. Refresh table's
consuming segments", tableNameWithType);
+ tablesToRefresh.add(tableNameWithType);
continue;
}
- SegmentDataManager segmentDataManager = null;
- try {
- segmentDataManager = tableDataManager.acquireSegment(segName);
- if (segmentDataManager == null) {
- _logger.info("SegmentDataManager is not yet setup for segment {}.
Will check consumption status later",
- segName);
+ Set<String> consumingSegments = tableSegments.getValue();
+ Set<String> caughtUpSegments =
_caughtUpSegmentsByTable.computeIfAbsent(tableNameWithType, k -> new
HashSet<>());
+ for (String segName : consumingSegments) {
+ if (caughtUpSegments.contains(segName)) {
continue;
}
- if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
- // There's a possibility that a consuming segment has converted to a
committed segment. If that's the case,
- // segment data manager will not be of type
RealtimeSegmentDataManager.
- _logger.info("Segment {} is already committed and is considered
caught up.", segName);
- _caughtUpSegments.add(segName);
+ SegmentDataManager segmentDataManager =
tableDataManager.acquireSegment(segName);
+ if (segmentDataManager == null) {
+ _logger.info("No segmentDataManager for segment: {} from table: {}.
Refresh table's consuming segments",
+ segName, tableNameWithType);
+ tablesToRefresh.add(tableNameWithType);
continue;
}
-
- RealtimeSegmentDataManager rtSegmentDataManager =
(RealtimeSegmentDataManager) segmentDataManager;
- if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
- _caughtUpSegments.add(segName);
- }
- } finally {
- if (segmentDataManager != null) {
+ try {
+ if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
+ // It's possible that the consuming segment has been committed by
another server. In this case, we should
+ // get the new consuming segments for the table and continue to
monitor their consumption status, until the
+ // current server catches up the consuming segments.
+ _logger.info("Segment: {} from table: {} is already committed.
Refresh table's consuming segments.",
+ segName, tableNameWithType);
+ tablesToRefresh.add(tableNameWithType);
+ continue;
+ }
+ RealtimeSegmentDataManager rtSegmentDataManager =
(RealtimeSegmentDataManager) segmentDataManager;
+ if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
+ caughtUpSegments.add(segName);
+ }
+ } finally {
tableDataManager.releaseSegment(segmentDataManager);
}
}
+ int numLaggingSegments = consumingSegments.size() -
caughtUpSegments.size();
+ if (numLaggingSegments == 0) {
+ _logger.info("Consuming segments from table: {} have all caught up",
tableNameWithType);
+ itr.remove();
+ _caughtUpSegmentsByTable.remove(tableNameWithType);
+ }
+ }
+ if (!tablesToRefresh.isEmpty()) {
+ for (String tableNameWithType : tablesToRefresh) {
+ Set<String> updatedConsumingSegments =
_consumingSegmentsSupplier.apply(tableNameWithType);
+ if (updatedConsumingSegments == null ||
updatedConsumingSegments.isEmpty()) {
+ _consumingSegmentsByTable.remove(tableNameWithType);
+ _caughtUpSegmentsByTable.remove(tableNameWithType);
+ _logger.info("Found no consuming segments from table: {}, which is
probably removed", tableNameWithType);
+ } else {
+ _consumingSegmentsByTable.put(tableNameWithType,
updatedConsumingSegments);
+ _caughtUpSegmentsByTable.computeIfAbsent(tableNameWithType, k -> new
HashSet<>())
+ .retainAll(updatedConsumingSegments);
+ _logger.info(
+ "Updated consumingSegments: {} and caughtUpSegments: {} for
table: {}, as consuming segments were "
+ + "missing or committed", updatedConsumingSegments,
_caughtUpSegmentsByTable.get(tableNameWithType),
+ tableNameWithType);
+ }
+ }
}
- return _consumingSegments.size() - _caughtUpSegments.size();
+ int numLaggingSegments = 0;
+ for (Map.Entry<String, Set<String>> tableSegments :
_consumingSegmentsByTable.entrySet()) {
+ String tableNameWithType = tableSegments.getKey();
+ Set<String> consumingSegments = tableSegments.getValue();
+ Set<String> caughtUpSegments =
_caughtUpSegmentsByTable.computeIfAbsent(tableNameWithType, k -> new
HashSet<>());
+ numLaggingSegments += consumingSegments.size() - caughtUpSegments.size();
+ }
+ return numLaggingSegments;
}
protected abstract boolean isSegmentCaughtUp(String segmentName,
RealtimeSegmentDataManager rtSegmentDataManager);
-
- private TableDataManager getTableDataManager(String segmentName) {
- LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
- String tableName = llcSegmentName.getTableName();
- String tableNameWithType =
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
- return _instanceDataManager.getTableDataManager(tableNameWithType);
- }
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
index 6b597e3fa2..ad7d2905ba 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
@@ -19,7 +19,9 @@
package org.apache.pinot.server.starter.helix;
+import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -34,8 +36,9 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
*/
public class OffsetBasedConsumptionStatusChecker extends
IngestionBasedConsumptionStatusChecker {
- public OffsetBasedConsumptionStatusChecker(InstanceDataManager
instanceDataManager, Set<String> consumingSegments) {
- super(instanceDataManager, consumingSegments);
+ public OffsetBasedConsumptionStatusChecker(InstanceDataManager
instanceDataManager,
+ Map<String, Set<String>> consumingSegments, Function<String,
Set<String>> consumingSegmentsSupplier) {
+ super(instanceDataManager, consumingSegments, consumingSegmentsSupplier);
}
@Override
diff --git
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/ConsumptionStatusCheckerTestUtils.java
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/ConsumptionStatusCheckerTestUtils.java
new file mode 100644
index 0000000000..ccd8f6f855
--- /dev/null
+++
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/ConsumptionStatusCheckerTestUtils.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.starter.helix;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+
+class ConsumptionStatusCheckerTestUtils {
+ private ConsumptionStatusCheckerTestUtils() {
+ }
+
+ public static Function<String, Set<String>> getConsumingSegments(Map<String,
Set<String>> consumingSegments) {
+ // Create a new Set instance to keep updates separated from the
consumingSegments.
+ return (tableName) -> {
+ Set<String> updated = consumingSegments.get(tableName);
+ return updated == null ? null : new HashSet<>(updated);
+ };
+ }
+}
diff --git
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
index 6301b54d04..e619ba7d70 100644
---
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
+++
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
@@ -20,7 +20,11 @@
package org.apache.pinot.server.starter.helix;
import com.google.common.collect.ImmutableSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
@@ -42,8 +46,9 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
private final long _now;
public FakeFreshnessBasedConsumptionStatusChecker(InstanceDataManager
instanceDataManager,
- Set<String> consumingSegments, long minFreshnessMs, long
idleTimeoutMs, long now) {
- super(instanceDataManager, consumingSegments, minFreshnessMs,
idleTimeoutMs);
+ Map<String, Set<String>> consumingSegments, Function<String,
Set<String>> consumingSegmentsSupplier,
+ long minFreshnessMs, long idleTimeoutMs, long now) {
+ super(instanceDataManager, consumingSegments, consumingSegmentsSupplier,
minFreshnessMs, idleTimeoutMs);
_now = now;
}
@@ -58,10 +63,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
String segA0 = "tableA__0__0__123Z";
String segA1 = "tableA__1__0__123Z";
String segB0 = "tableB__0__0__123Z";
- Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+ Map<String, Set<String>> consumingSegments = new HashMap<>();
+ consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1));
+ consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
FreshnessBasedConsumptionStatusChecker statusChecker =
- new FreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments, 10000L, 0L);
+ new FreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments,
+
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments),
10000L, 0L);
// TableDataManager is not set up yet
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
@@ -119,6 +127,55 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
0);
}
+ @Test
+ public void testWithDroppedTableAndSegment()
+ throws InterruptedException {
+ String segA0 = "tableA__0__0__123Z";
+ String segA1 = "tableA__1__0__123Z";
+ String segB0 = "tableB__0__0__123Z";
+ Map<String, Set<String>> consumingSegments = new HashMap<>();
+ consumingSegments.computeIfAbsent("tableA_REALTIME", k -> new
HashSet<>()).add(segA0);
+ consumingSegments.computeIfAbsent("tableA_REALTIME", k -> new
HashSet<>()).add(segA1);
+ consumingSegments.computeIfAbsent("tableB_REALTIME", k -> new
HashSet<>()).add(segB0);
+ InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+ FreshnessBasedConsumptionStatusChecker statusChecker =
+ new FreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments,
+
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L,
0L);
+
+ // TableDataManager is not set up yet
+
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
+
+ // setup TableDataMangers
+ TableDataManager tableDataManagerA = mock(TableDataManager.class);
+
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
+
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(null);
+
+ // setup SegmentDataManagers
+ RealtimeSegmentDataManager segMngrA0 =
mock(RealtimeSegmentDataManager.class);
+ when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
+ when(tableDataManagerA.acquireSegment(segA1)).thenReturn(null);
+
+ when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new
LongMsgOffset(20));
+ when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
+ // ensure negative values are ignored
+ setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE);
+
+ // current offset latest stream offset current
time last ingestion time
+ // segA0 0 20 100
Long.MIN_VALUE
+ // segA1 (segment is absent)
+ // segB0 (table is absent)
+
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
+
+ // updatedConsumingSegments still provide 3 segments to checker but one
has caught up.
+ when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
+
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
2);
+ // Remove the missing segments and check again.
+ consumingSegments.get("tableA_REALTIME").remove(segA1);
+
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
1);
+ consumingSegments.remove("tableB_REALTIME");
+
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
0);
+ }
+
private void setupLatestIngestionTimestamp(RealtimeSegmentDataManager
segmentDataManager,
long latestIngestionTimestamp) {
MutableSegment mockSegment = mock(MutableSegment.class);
@@ -133,10 +190,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
String segA0 = "tableA__0__0__123Z";
String segA1 = "tableA__1__0__123Z";
String segB0 = "tableB__0__0__123Z";
- Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+ Map<String, Set<String>> consumingSegments = new HashMap<>();
+ consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1));
+ consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
FreshnessBasedConsumptionStatusChecker statusChecker =
- new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments, 10L, 0L, 100L);
+ new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments,
+
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L,
0L, 100L);
// TableDataManager is not set up yet
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
@@ -195,12 +255,14 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
String segA0 = "tableA__0__0__123Z";
String segA1 = "tableA__1__0__123Z";
String segB0 = "tableB__0__0__123Z";
- Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+ Map<String, Set<String>> consumingSegments = new HashMap<>();
+ consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1));
+ consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
long idleTimeoutMs = 10L;
FreshnessBasedConsumptionStatusChecker statusChecker =
- new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments, 10L, idleTimeoutMs,
- 100L);
+ new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments,
+
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L,
idleTimeoutMs, 100L);
// TableDataManager is not set up yet
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
@@ -270,10 +332,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
String segA0 = "tableA__0__0__123Z";
String segA1 = "tableA__1__0__123Z";
String segB0 = "tableB__0__0__123Z";
- Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+ Map<String, Set<String>> consumingSegments = new HashMap<>();
+ consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1));
+ consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
FreshnessBasedConsumptionStatusChecker statusChecker =
- new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments, 10L, 0L, 100L);
+ new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments,
+
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L,
0L, 100L);
// TableDataManager is not set up yet
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
@@ -319,10 +384,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
String segA0 = "tableA__0__0__123Z";
String segA1 = "tableA__1__0__123Z";
String segB0 = "tableB__0__0__123Z";
- Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+ Map<String, Set<String>> consumingSegments = new HashMap<>();
+ consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1));
+ consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
FreshnessBasedConsumptionStatusChecker statusChecker =
- new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments, 10L, 0L, 100L);
+ new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments,
+
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L,
0L, 100L);
// TableDataManager is not set up yet
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
@@ -369,6 +437,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
setupLatestIngestionTimestamp(segMngrA0, 90L);
// Unexpected case where latest ingested is somehow after current time
setupLatestIngestionTimestamp(segMngrA1, 101L);
+
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
1);
+ consumingSegments.get("tableB_REALTIME").remove(segB0);
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
0);
}
@@ -377,10 +447,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
String segA0 = "tableA__0__0__123Z";
String segA1 = "tableA__1__0__123Z";
String segB0 = "tableB__0__0__123Z";
- Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+ Map<String, Set<String>> consumingSegments = new HashMap<>();
+ consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1));
+ consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
FreshnessBasedConsumptionStatusChecker statusChecker =
- new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments, 10L, 0L, 100L);
+ new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments,
+
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L,
0L, 100L);
// TableDataManager is not set up yet
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
diff --git
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
index 88b05b8ff0..2248f731d2 100644
---
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
+++
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
@@ -20,6 +20,8 @@
package org.apache.pinot.server.starter.helix;
import com.google.common.collect.ImmutableSet;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
@@ -41,10 +43,13 @@ public class OffsetBasedConsumptionStatusCheckerTest {
String segA0 = "tableA__0__0__123Z";
String segA1 = "tableA__1__0__123Z";
String segB0 = "tableB__0__0__123Z";
- Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+ Map<String, Set<String>> consumingSegments = new HashMap<>();
+ consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1));
+ consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
OffsetBasedConsumptionStatusChecker statusChecker =
- new OffsetBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments);
+ new OffsetBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments,
+
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments));
// setup TableDataMangers
TableDataManager tableDataManagerA = mock(TableDataManager.class);
@@ -88,11 +93,14 @@ public class OffsetBasedConsumptionStatusCheckerTest {
String segA0 = "tableA__0__0__123Z";
String segA1 = "tableA__1__0__123Z";
String segB0 = "tableB__0__0__123Z";
- Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+ Map<String, Set<String>> consumingSegments = new HashMap<>();
+ consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1));
+ consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
OffsetBasedConsumptionStatusChecker statusChecker =
- new OffsetBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments);
+ new OffsetBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments,
+
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments));
// TableDataManager is not set up yet
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
@@ -149,10 +157,13 @@ public class OffsetBasedConsumptionStatusCheckerTest {
String segA0 = "tableA__0__0__123Z";
String segA1 = "tableA__1__0__123Z";
String segB0 = "tableB__0__0__123Z";
- Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+ Map<String, Set<String>> consumingSegments = new HashMap<>();
+ consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1));
+ consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
OffsetBasedConsumptionStatusChecker statusChecker =
- new OffsetBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments);
+ new OffsetBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments,
+
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments));
// setup TableDataMangers
TableDataManager tableDataManagerA = mock(TableDataManager.class);
@@ -190,6 +201,8 @@ public class OffsetBasedConsumptionStatusCheckerTest {
// segB0 committed at 1200 1500
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(200));
+
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
1);
+ consumingSegments.get("tableB_REALTIME").remove(segB0);
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
0);
}
@@ -199,10 +212,13 @@ public class OffsetBasedConsumptionStatusCheckerTest {
String segA0 = "tableA__0__0__123Z";
String segA1 = "tableA__1__0__123Z";
String segB0 = "tableB__0__0__123Z";
- Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+ Map<String, Set<String>> consumingSegments = new HashMap<>();
+ consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1));
+ consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
OffsetBasedConsumptionStatusChecker statusChecker =
- new OffsetBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments);
+ new OffsetBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments,
+
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments));
// setup TableDataMangers
TableDataManager tableDataManagerA = mock(TableDataManager.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]