Jackie-Jiang commented on code in PR #12883:
URL: https://github.com/apache/pinot/pull/12883#discussion_r1571539714
##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java:
##########
@@ -359,6 +361,20 @@ private void registerServiceStatusHandler() {
new
ServiceStatus.MultipleCallbackServiceStatusCallback(serviceStatusCallbackListBuilder.build()));
}
+ private Set<String> getConsumingSegments(String tableName) {
+ Set<String> consumingSegments = new HashSet<>();
+ IdealState idealState =
_helixAdmin.getResourceIdealState(_helixClusterName, tableName);
+ if (idealState.isEnabled() &&
TableNameBuilder.isRealtimeTableResource(tableName)) {
Review Comment:
(nit) The second check (`TableNameBuilder.isRealtimeTableResource(tableName)`)
is not needed
##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java:
##########
@@ -37,62 +38,81 @@ public abstract class
IngestionBasedConsumptionStatusChecker {
// constructor parameters
protected final InstanceDataManager _instanceDataManager;
- protected final Set<String> _consumingSegments;
+ protected final Map<String, Set<String>> _consumingSegments;
+ protected final Function<String, Set<String>> _consumingSegmentsSupplier;
- // helper variable
- private final Set<String> _caughtUpSegments = new HashSet<>();
+ // helper variable, which is thread safe, as the method might be called from
multiple threads when the health check
+ // endpoint is called by many probes.
+ private final Set<String> _caughtUpSegments = ConcurrentHashMap.newKeySet();
+ /**
+ * Both consumingSegments and consumingSegmentsSupplier are provided as it
can be costly to get consumingSegments
+ * via the supplier, so only use it when any missing segment is detected.
+ */
public IngestionBasedConsumptionStatusChecker(InstanceDataManager
instanceDataManager,
- Set<String> consumingSegments) {
+ Map<String, Set<String>> consumingSegments, @Nullable Function<String,
Set<String>> consumingSegmentsSupplier) {
_instanceDataManager = instanceDataManager;
- _consumingSegments = consumingSegments;
+ _consumingSegments = new ConcurrentHashMap<>(consumingSegments);
+ _consumingSegmentsSupplier = consumingSegmentsSupplier;
}
public int getNumConsumingSegmentsNotReachedIngestionCriteria() {
- for (String segName : _consumingSegments) {
- if (_caughtUpSegments.contains(segName)) {
- continue;
- }
- TableDataManager tableDataManager = getTableDataManager(segName);
+ Set<String> tablesWithMissingSegment = new HashSet<>();
+ for (Map.Entry<String, Set<String>> tableSegments :
_consumingSegments.entrySet()) {
+ String tableNameWithType = tableSegments.getKey();
+ TableDataManager tableDataManager =
_instanceDataManager.getTableDataManager(tableNameWithType);
if (tableDataManager == null) {
- _logger.info("TableDataManager is not yet setup for segment {}. Will
check consumption status later", segName);
+ _logger.info("No tableDataManager for table: {}. Will check
consumption status later", tableNameWithType);
+ tablesWithMissingSegment.add(tableNameWithType);
continue;
}
- SegmentDataManager segmentDataManager = null;
- try {
- segmentDataManager = tableDataManager.acquireSegment(segName);
- if (segmentDataManager == null) {
- _logger.info("SegmentDataManager is not yet setup for segment {}.
Will check consumption status later",
- segName);
+ for (String segName : tableSegments.getValue()) {
+ if (_caughtUpSegments.contains(segName)) {
continue;
}
- if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
- // There's a possibility that a consuming segment has converted to a
committed segment. If that's the case,
- // segment data manager will not be of type
RealtimeSegmentDataManager.
- _logger.info("Segment {} is already committed and is considered
caught up.", segName);
- _caughtUpSegments.add(segName);
- continue;
- }
-
- RealtimeSegmentDataManager rtSegmentDataManager =
(RealtimeSegmentDataManager) segmentDataManager;
- if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
- _caughtUpSegments.add(segName);
+ SegmentDataManager segmentDataManager = null;
+ try {
+ segmentDataManager = tableDataManager.acquireSegment(segName);
+ if (segmentDataManager == null) {
+ _logger.info("No SegmentDataManager for segment: {}. Will check
consumption status later", segName);
+ tablesWithMissingSegment.add(tableNameWithType);
+ continue;
+ }
+ if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
+ // There's a possibility that a consuming segment has converted to
a committed segment. If that's the case,
+ // segment data manager will not be of type
RealtimeSegmentDataManager.
+ _logger.info("Segment: {} is already committed and is considered
caught up.", segName);
+ _caughtUpSegments.add(segName);
+ continue;
+ }
+ RealtimeSegmentDataManager rtSegmentDataManager =
(RealtimeSegmentDataManager) segmentDataManager;
+ if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
+ _caughtUpSegments.add(segName);
+ }
+ } finally {
+ if (segmentDataManager != null) {
+ tableDataManager.releaseSegment(segmentDataManager);
+ }
}
- } finally {
- if (segmentDataManager != null) {
- tableDataManager.releaseSegment(segmentDataManager);
+ }
+ }
+ if (!tablesWithMissingSegment.isEmpty() && _consumingSegmentsSupplier !=
null) {
+ for (String tableName : tablesWithMissingSegment) {
+ Set<String> consumingSegments =
_consumingSegmentsSupplier.apply(tableName);
+ if (consumingSegments == null || consumingSegments.isEmpty()) {
+ _consumingSegments.remove(tableName);
+ } else {
+ _consumingSegments.put(tableName, consumingSegments);
}
+ _logger.info("Found missing segments in table: {}. Updated its
consumingSegments: {}", tableName,
+ consumingSegments);
}
}
- return _consumingSegments.size() - _caughtUpSegments.size();
+ Set<String> currentConsumingSegments = new HashSet<>();
+ _consumingSegments.forEach((k, v) -> currentConsumingSegments.addAll(v));
+ _caughtUpSegments.retainAll(currentConsumingSegments);
+ return currentConsumingSegments.size() - _caughtUpSegments.size();
}
protected abstract boolean isSegmentCaughtUp(String segmentName,
RealtimeSegmentDataManager rtSegmentDataManager);
-
Review Comment:
Nice!
##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java:
##########
@@ -37,62 +38,81 @@ public abstract class
IngestionBasedConsumptionStatusChecker {
// constructor parameters
protected final InstanceDataManager _instanceDataManager;
- protected final Set<String> _consumingSegments;
+ protected final Map<String, Set<String>> _consumingSegments;
+ protected final Function<String, Set<String>> _consumingSegmentsSupplier;
- // helper variable
- private final Set<String> _caughtUpSegments = new HashSet<>();
+ // helper variable, which is thread safe, as the method might be called from
multiple threads when the health check
+ // endpoint is called by many probes.
+ private final Set<String> _caughtUpSegments = ConcurrentHashMap.newKeySet();
Review Comment:
Do we need to make it thread-safe? I feel we can simply make
`getNumConsumingSegmentsNotReachedIngestionCriteria()` synchronized. There
should be only one caller.
##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java:
##########
@@ -359,6 +361,20 @@ private void registerServiceStatusHandler() {
new
ServiceStatus.MultipleCallbackServiceStatusCallback(serviceStatusCallbackListBuilder.build()));
}
+ private Set<String> getConsumingSegments(String tableName) {
Review Comment:
(nit)
```suggestion
private Set<String> getConsumingSegments(String realtimeTableName) {
```
##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java:
##########
@@ -37,9 +40,15 @@ public class FreshnessBasedConsumptionStatusChecker extends
IngestionBasedConsum
private final long _minFreshnessMs;
private final long _idleTimeoutMs;
- public FreshnessBasedConsumptionStatusChecker(InstanceDataManager
instanceDataManager, Set<String> consumingSegments,
+ public FreshnessBasedConsumptionStatusChecker(InstanceDataManager
instanceDataManager,
+ Map<String, Set<String>> consumingSegments, long minFreshnessMs, long
idleTimeoutMs) {
+ this(instanceDataManager, consumingSegments, null, minFreshnessMs,
idleTimeoutMs);
+ }
+
+ public FreshnessBasedConsumptionStatusChecker(InstanceDataManager
instanceDataManager,
Review Comment:
Since we are modifying the API anyway, maybe we should enforce a non-null
supplier?
##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java:
##########
@@ -37,62 +38,81 @@ public abstract class
IngestionBasedConsumptionStatusChecker {
// constructor parameters
protected final InstanceDataManager _instanceDataManager;
- protected final Set<String> _consumingSegments;
+ protected final Map<String, Set<String>> _consumingSegments;
+ protected final Function<String, Set<String>> _consumingSegmentsSupplier;
- // helper variable
- private final Set<String> _caughtUpSegments = new HashSet<>();
+ // helper variable, which is thread safe, as the method might be called from
multiple threads when the health check
+ // endpoint is called by many probes.
+ private final Set<String> _caughtUpSegments = ConcurrentHashMap.newKeySet();
Review Comment:
We may also make `_caughtUpSegments` a map, and whenever a table has all
segments caught up, we can remove the table from both maps.
Currently you need to merge the segments of all tables into one set on
every check.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]