This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4d2c3f54a6 Remove ingestion metrics when consuming segment relocates (#13668)
4d2c3f54a6 is described below
commit 4d2c3f54a64d4607c1d663994327c81896bd80f1
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Aug 13 16:33:53 2024 -0700
Remove ingestion metrics when consuming segment relocates (#13668)
---
.../pinot/common/messages/ForceCommitMessage.java | 2 +-
.../messages/IngestionMetricsRemoveMessage.java | 47 ++++
.../realtime/PinotLLCRealtimeSegmentManager.java | 59 ++++-
.../PinotLLCRealtimeSegmentManagerTest.java | 3 +-
.../manager/realtime/IngestionDelayTracker.java | 290 ++++++++++-----------
.../realtime/RealtimeSegmentDataManager.java | 25 +-
.../manager/realtime/RealtimeTableDataManager.java | 71 ++---
.../realtime/IngestionDelayTrackerTest.java | 201 +++++++-------
.../helix/SegmentMessageHandlerFactory.java | 26 ++
9 files changed, 428 insertions(+), 296 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/messages/ForceCommitMessage.java b/pinot-common/src/main/java/org/apache/pinot/common/messages/ForceCommitMessage.java
index 8d9d1b02bd..4535789f42 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/messages/ForceCommitMessage.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/messages/ForceCommitMessage.java
@@ -49,7 +49,7 @@ public class ForceCommitMessage extends Message {
super(message.getRecord());
String msgSubType = message.getMsgSubType();
Preconditions.checkArgument(msgSubType.equals(FORCE_COMMIT_MSG_SUB_TYPE),
- "Invalid message sub type: " + msgSubType + " for SegmentReloadMessage");
+ "Invalid message sub type: " + msgSubType + " for ForceCommitMessage");
}
public String getTableName() {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/messages/IngestionMetricsRemoveMessage.java b/pinot-common/src/main/java/org/apache/pinot/common/messages/IngestionMetricsRemoveMessage.java
new file mode 100644
index 0000000000..3c2100a46a
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/messages/IngestionMetricsRemoveMessage.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.messages;
+
+import com.google.common.base.Preconditions;
+import java.util.UUID;
+import org.apache.helix.model.Message;
+
+
+/**
+ * Ingestion metrics remove message is created on controller and get sent to servers to instruct them to remove
+ * ingestion metrics for the stream partition of the given segment when the new consuming segment is no longer served by
+ * the server.
+ */
+public class IngestionMetricsRemoveMessage extends Message {
+ public static final String INGESTION_METRICS_REMOVE_MSG_SUB_TYPE = "INGESTION_METRICS_REMOVE";
+
+ public IngestionMetricsRemoveMessage() {
+ super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
+ setMsgSubType(INGESTION_METRICS_REMOVE_MSG_SUB_TYPE);
+ // Give it infinite time to process the message, as long as session is alive
+ setExecutionTimeout(-1);
+ }
+
+ public IngestionMetricsRemoveMessage(Message message) {
+ super(message.getRecord());
+ String msgSubType = message.getMsgSubType();
+ Preconditions.checkArgument(msgSubType.equals(INGESTION_METRICS_REMOVE_MSG_SUB_TYPE),
+ "Invalid message sub type: " + msgSubType + " for IngestionMetricsRemoveMessage");
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index c471277590..bb331f2bc4 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -40,6 +41,7 @@ import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.AccessOption;
+import org.apache.helix.ClusterMessagingService;
import org.apache.helix.Criteria;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
@@ -50,6 +52,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.messages.ForceCommitMessage;
+import org.apache.pinot.common.messages.IngestionMetricsRemoveMessage;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -576,8 +579,9 @@ public class PinotLLCRealtimeSegmentManager {
// to reduce this contention. We may still contend with RetentionManager,
or other updates
// to idealstate from other controllers, but then we have the retry
mechanism to get around that.
synchronized
(_helixResourceManager.getIdealStateUpdaterLock(realtimeTableName)) {
- updateIdealStateOnSegmentCompletion(realtimeTableName,
committingSegmentName, newConsumingSegmentName,
- segmentAssignment, instancePartitionsMap);
+ idealState =
+ updateIdealStateOnSegmentCompletion(realtimeTableName,
committingSegmentName, newConsumingSegmentName,
+ segmentAssignment, instancePartitionsMap);
}
long endTimeNs = System.nanoTime();
@@ -598,6 +602,12 @@ public class PinotLLCRealtimeSegmentManager {
// Trigger the metadata event notifier
_metadataEventNotifierFactory.create().notifyOnSegmentFlush(tableConfig);
+
+ // Handle segment movement if necessary
+ if (newConsumingSegmentName != null) {
+ handleSegmentMovement(realtimeTableName,
idealState.getRecord().getMapFields(), committingSegmentName,
+ newConsumingSegmentName);
+ }
}
/**
@@ -937,10 +947,10 @@ public class PinotLLCRealtimeSegmentManager {
* Updates ideal state after completion of a realtime segment
*/
@VisibleForTesting
- void updateIdealStateOnSegmentCompletion(String realtimeTableName, String
committingSegmentName,
+ IdealState updateIdealStateOnSegmentCompletion(String realtimeTableName,
String committingSegmentName,
String newSegmentName, SegmentAssignment segmentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
- HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState
-> {
+ return HelixHelper.updateIdealState(_helixManager, realtimeTableName,
idealState -> {
assert idealState != null;
// When segment completion begins, the zk metadata is updated, followed
by ideal state.
// We allow only {@link
PinotLLCRealtimeSegmentManager::MAX_SEGMENT_COMPLETION_TIME_MILLIS} ms for a
segment to
@@ -1014,6 +1024,47 @@ public class PinotLLCRealtimeSegmentManager {
}
}
+ /**
+ * Handles segment movement between instances.
+ * If the new consuming segment is served by a different set of servers than the committed segment, notify the
+ * servers no longer serving the stream partition to remove the ingestion metrics. This can prevent servers from
+ * emitting high ingestion delay alerts on stream partitions no longer served.
+ */
+ private void handleSegmentMovement(String realtimeTableName, Map<String,
Map<String, String>> instanceStatesMap,
+ String committedSegment, String newConsumingSegment) {
+ Set<String> oldInstances =
instanceStatesMap.get(committedSegment).keySet();
+ Set<String> newInstances =
instanceStatesMap.get(newConsumingSegment).keySet();
+ if (newInstances.containsAll(oldInstances)) {
+ return;
+ }
+ Set<String> instancesNoLongerServe = new HashSet<>(oldInstances);
+ instancesNoLongerServe.removeAll(newInstances);
+ LOGGER.info("Segment movement detected for committed segment: {} (served
by: {}), "
+ + "consuming segment: {} (served by: {}) in table: {}, "
+ + "sending message to instances: {} to remove ingestion metrics",
committedSegment, oldInstances,
+ newConsumingSegment, newInstances, realtimeTableName,
instancesNoLongerServe);
+
+ ClusterMessagingService messagingService =
_helixManager.getMessagingService();
+ List<String> instancesSent = new
ArrayList<>(instancesNoLongerServe.size());
+ for (String instance : instancesNoLongerServe) {
+ Criteria recipientCriteria = new Criteria();
+ recipientCriteria.setInstanceName(instance);
+ recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ recipientCriteria.setResource(realtimeTableName);
+ recipientCriteria.setPartition(committedSegment);
+ recipientCriteria.setSessionSpecific(true);
+ IngestionMetricsRemoveMessage message = new
IngestionMetricsRemoveMessage();
+ if (messagingService.send(recipientCriteria, message, null, -1) > 0) {
+ instancesSent.add(instance);
+ } else {
+ LOGGER.warn("Failed to send ingestion metrics remove message for
table: {} segment: {} to instance: {}",
+ realtimeTableName, committedSegment, instance);
+ }
+ }
+ LOGGER.info("Sent ingestion metrics remove message for table: {} segment:
{} to instances: {}", realtimeTableName,
+ committedSegment, instancesSent);
+ }
+
/*
* A segment commit takes 3 modifications to zookeeper:
* - Change old segment metadata (mark it DONE, and then other things)
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 5af7fb92de..16d1983a21 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -1212,13 +1212,14 @@ public class PinotLLCRealtimeSegmentManagerTest {
}
@Override
- void updateIdealStateOnSegmentCompletion(String realtimeTableName, String
committingSegmentName,
+ IdealState updateIdealStateOnSegmentCompletion(String realtimeTableName,
String committingSegmentName,
String newSegmentName, SegmentAssignment segmentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap)
{
updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(),
committingSegmentName, null,
segmentAssignment, instancePartitionsMap);
updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(),
null, newSegmentName,
segmentAssignment, instancePartitionsMap);
+ return _idealState;
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
index 6953ddaf33..fd31d8f72b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -16,10 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.pinot.core.data.manager.realtime;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
@@ -32,9 +33,13 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import javax.annotation.Nullable;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.RowMetadata;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
@@ -81,43 +86,46 @@ import org.slf4j.LoggerFactory;
public class IngestionDelayTracker {
- // Class to wrap supported timestamps collected for an ingested event
- private static class IngestionTimestamps {
- private final long _firstStreamIngestionTimeMs;
- private final long _ingestionTimeMs;
- IngestionTimestamps(long ingestionTimesMs, long
firstStreamIngestionTimeMs) {
- _ingestionTimeMs = ingestionTimesMs;
- _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs;
- }
- }
+ private static class IngestionInfo {
+ final long _ingestionTimeMs;
+ final long _firstStreamIngestionTimeMs;
+ final StreamPartitionMsgOffset _currentOffset;
+ final StreamPartitionMsgOffset _latestOffset;
- private static class IngestionOffsets {
- private final StreamPartitionMsgOffset _latestOffset;
- private final StreamPartitionMsgOffset _offset;
- IngestionOffsets(StreamPartitionMsgOffset offset, StreamPartitionMsgOffset
latestOffset) {
- _offset = offset;
+ IngestionInfo(long ingestionTimeMs, long firstStreamIngestionTimeMs,
+ @Nullable StreamPartitionMsgOffset currentOffset, @Nullable
StreamPartitionMsgOffset latestOffset) {
+ _ingestionTimeMs = ingestionTimeMs;
+ _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs;
+ _currentOffset = currentOffset;
_latestOffset = latestOffset;
}
}
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IngestionDelayTracker.class);
+
// Sleep interval for scheduled executor service thread that triggers read
of ideal state
private static final int SCHEDULED_EXECUTOR_THREAD_TICK_INTERVAL_MS =
300000; // 5 minutes +/- precision in timeouts
// Once a partition is marked for verification, we wait 10 minutes to pull
its ideal state.
private static final int PARTITION_TIMEOUT_MS = 600000; // 10
minutes timeouts
// Delay scheduled executor service for this amount of time after starting
service
private static final int INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS = 100;
- private static final Logger _logger =
LoggerFactory.getLogger(IngestionDelayTracker.class.getSimpleName());
- // HashMap used to store ingestion time measures for all partitions active
for the current table.
- private final Map<Integer, IngestionTimestamps>
_partitionToIngestionTimestampsMap = new ConcurrentHashMap<>();
+ // Cache expire time for ignored segment if there is no update from the
segment.
+ private static final int IGNORED_SEGMENT_CACHE_TIME_MINUTES = 10;
+
+ // Per partition info for all partitions active for the current table.
+ private final Map<Integer, IngestionInfo> _ingestionInfoMap = new
ConcurrentHashMap<>();
- private final Map<Integer, IngestionOffsets> _partitionToOffsetMap = new
ConcurrentHashMap<>();
// We mark partitions that go from CONSUMING to ONLINE in
_partitionsMarkedForVerification: if they do not
// go back to CONSUMING in some period of time, we verify whether they are
still hosted in this server by reading
// ideal state. This is done with the goal of minimizing reading ideal state
for efficiency reasons.
+ // TODO: Consider removing this mechanism after releasing 1.2.0, and use
{@link #stopTrackingPartitionIngestionDelay}
+ // instead.
private final Map<Integer, Long> _partitionsMarkedForVerification = new
ConcurrentHashMap<>();
- final int _scheduledExecutorThreadTickIntervalMs;
+ private final Cache<String, Boolean> _segmentsToIgnore =
+
CacheBuilder.newBuilder().expireAfterAccess(IGNORED_SEGMENT_CACHE_TIME_MINUTES,
TimeUnit.MINUTES).build();
+
// TODO: Make thread pool a server/cluster level config
// ScheduledExecutorService to check partitions that are inactive against
ideal state.
private final ScheduledExecutorService _scheduledExecutor =
Executors.newScheduledThreadPool(2);
@@ -131,6 +139,7 @@ public class IngestionDelayTracker {
private Clock _clock;
+ @VisibleForTesting
public IngestionDelayTracker(ServerMetrics serverMetrics, String
tableNameWithType,
RealtimeTableDataManager realtimeTableDataManager, int
scheduledExecutorThreadTickIntervalMs,
Supplier<Boolean> isServerReadyToServeQueries)
@@ -144,9 +153,8 @@ public class IngestionDelayTracker {
// Handle negative timer values
if (scheduledExecutorThreadTickIntervalMs <= 0) {
throw new RuntimeException(String.format("Illegal timer timeout
argument, expected > 0, got=%d for table=%s",
- scheduledExecutorThreadTickIntervalMs, _tableNameWithType));
+ scheduledExecutorThreadTickIntervalMs, _tableNameWithType));
}
- _scheduledExecutorThreadTickIntervalMs =
scheduledExecutorThreadTickIntervalMs;
// ThreadFactory to set the thread's name
ThreadFactory threadFactory = new ThreadFactory() {
@@ -162,7 +170,7 @@ public class IngestionDelayTracker {
((ScheduledThreadPoolExecutor)
_scheduledExecutor).setThreadFactory(threadFactory);
_scheduledExecutor.scheduleWithFixedDelay(this::timeoutInactivePartitions,
- INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS,
_scheduledExecutorThreadTickIntervalMs, TimeUnit.MILLISECONDS);
+ INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS,
scheduledExecutorThreadTickIntervalMs, TimeUnit.MILLISECONDS);
}
public IngestionDelayTracker(ServerMetrics serverMetrics, String
tableNameWithType,
@@ -188,41 +196,26 @@ public class IngestionDelayTracker {
return agedIngestionDelayMs;
}
- private long getPartitionOffsetLag(IngestionOffsets offset) {
- if (offset == null) {
- return 0;
- }
- StreamPartitionMsgOffset currentOffset = offset._offset;
- StreamPartitionMsgOffset latestOffset = offset._latestOffset;
-
- if (currentOffset == null || latestOffset == null) {
- return 0;
- }
-
- // Compute aged delay for current partition
- // TODO: Support other types of offsets
- if (!(currentOffset instanceof LongMsgOffset && latestOffset instanceof
LongMsgOffset)) {
- return 0;
- }
-
- return ((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset)
currentOffset).getOffset();
- }
-
/*
* Helper function to be called when we should stop tracking a given
partition. Removes the partition from
* all our maps.
*
- * @param partitionGroupId partition ID which we should stop tracking.
+ * @param partitionId partition ID which we should stop tracking.
*/
- private void removePartitionId(int partitionGroupId) {
- _partitionToIngestionTimestampsMap.remove(partitionGroupId);
- _partitionToOffsetMap.remove(partitionGroupId);
+ private void removePartitionId(int partitionId) {
+ _ingestionInfoMap.compute(partitionId, (k, v) -> {
+ if (v != null) {
+ // Remove all metrics associated with this partition
+ _serverMetrics.removePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_DELAY_MS);
+ _serverMetrics.removePartitionGauge(_metricName, partitionId,
+ ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS);
+ _serverMetrics.removePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_OFFSET_LAG);
+ }
+ return null;
+ });
+
// If we are removing a partition we should stop reading its ideal state.
- _partitionsMarkedForVerification.remove(partitionGroupId);
- _serverMetrics.removePartitionGauge(_metricName, partitionGroupId,
ServerGauge.REALTIME_INGESTION_DELAY_MS);
- _serverMetrics.removePartitionGauge(_metricName, partitionGroupId,
- ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS);
- _serverMetrics.removePartitionGauge(_metricName, partitionGroupId,
ServerGauge.REALTIME_INGESTION_OFFSET_LAG);
+ _partitionsMarkedForVerification.remove(partitionId);
}
/*
@@ -241,7 +234,6 @@ public class IngestionDelayTracker {
return partitionsToVerify;
}
-
/**
* Function that enable use to set predictable clocks for testing purposes.
*
@@ -255,80 +247,74 @@ public class IngestionDelayTracker {
/**
* Called by RealTimeSegmentDataManagers to update the ingestion delay
metrics for a given partition.
*
- * @param ingestionTimeMs ingestion time being recorded.
- * @param firstStreamIngestionTimeMs time the event was ingested in the
first stage of the ingestion pipeline.
- * @param msgOffset message offset of the event being
ingested.
- * @param latestOffset latest message offset in the stream.
- * @param partitionGroupId partition ID for which the ingestion
metrics are being recorded.
+ * @param segmentName name of the consuming segment
+ * @param partitionId partition id of the consuming segment (directly passed
in to avoid parsing the segment name)
+ * @param ingestionTimeMs ingestion time of the last consumed message (from
{@link RowMetadata})
+ * @param firstStreamIngestionTimeMs ingestion time of the last consumed
message in the first stream (from
+ * {@link RowMetadata})
+ * @param currentOffset offset of the last consumed message (from {@link
RowMetadata})
+ * @param latestOffset offset of the latest message in the partition (from
{@link StreamMetadataProvider})
*/
- public void updateIngestionMetrics(long ingestionTimeMs, long
firstStreamIngestionTimeMs,
- StreamPartitionMsgOffset msgOffset, StreamPartitionMsgOffset
latestOffset,
- int partitionGroupId) {
+ public void updateIngestionMetrics(String segmentName, int partitionId, long
ingestionTimeMs,
+ long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset
currentOffset,
+ @Nullable StreamPartitionMsgOffset latestOffset) {
if (!_isServerReadyToServeQueries.get() ||
_realTimeTableDataManager.isShutDown()) {
// Do not update the ingestion delay metrics during server startup period
// or once the table data manager has been shutdown.
return;
}
- updateIngestionDelay(ingestionTimeMs, firstStreamIngestionTimeMs,
partitionGroupId);
- updateIngestionOffsets(msgOffset, latestOffset, partitionGroupId);
-
- // If we are consuming we do not need to track this partition for removal.
- _partitionsMarkedForVerification.remove(partitionGroupId);
- }
-
- public void updateIngestionDelay(long ingestionTimeMs, long
firstStreamIngestionTimeMs, int partitionGroupId) {
- if ((ingestionTimeMs < 0) && (firstStreamIngestionTimeMs < 0)) {
- // If stream does not return a valid ingestion timestamps don't publish
a metric
+ if (ingestionTimeMs < 0 && firstStreamIngestionTimeMs < 0 &&
(currentOffset == null || latestOffset == null)) {
+ // Do not publish metrics if stream does not return valid ingestion time
or offset.
return;
}
- IngestionTimestamps previousMeasure =
_partitionToIngestionTimestampsMap.put(partitionGroupId,
- new IngestionTimestamps(ingestionTimeMs, firstStreamIngestionTimeMs));
- if (previousMeasure == null) {
- // First time we start tracking a partition we should start tracking it
via metric
- // Only publish the metric if supported by the underlying stream. If not
supported the stream
- // returns Long.MIN_VALUE
- if (ingestionTimeMs >= 0) {
- _serverMetrics.setOrUpdatePartitionGauge(_metricName,
partitionGroupId, ServerGauge.REALTIME_INGESTION_DELAY_MS,
- () -> getPartitionIngestionDelayMs(partitionGroupId));
+
+ _ingestionInfoMap.compute(partitionId, (k, v) -> {
+ if (_segmentsToIgnore.getIfPresent(segmentName) != null) {
+ // Do not update the metrics for the segment that is marked to be
ignored.
+ return v;
}
- if (firstStreamIngestionTimeMs >= 0) {
- // Only publish this metric when creation time is supported by the
underlying stream
- // When this timestamp is not supported it always returns the value
Long.MIN_VALUE
- _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId,
- ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS,
- () -> getPartitionEndToEndIngestionDelayMs(partitionGroupId));
+ if (v == null) {
+ // Add metric when we start tracking a partition. Only publish the
metric if supported by the stream.
+ if (ingestionTimeMs > 0) {
+ _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_DELAY_MS,
+ () -> getPartitionIngestionDelayMs(partitionId));
+ }
+ if (firstStreamIngestionTimeMs > 0) {
+ _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
+ ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS,
+ () -> getPartitionEndToEndIngestionDelayMs(partitionId));
+ }
+ if (currentOffset != null && latestOffset != null) {
+ _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_OFFSET_LAG,
+ () -> getPartitionIngestionOffsetLag(partitionId));
+ }
}
- }
- }
+ return new IngestionInfo(ingestionTimeMs, firstStreamIngestionTimeMs,
currentOffset, latestOffset);
+ });
- public void updateIngestionOffsets(StreamPartitionMsgOffset currentOffset,
StreamPartitionMsgOffset latestOffset,
- int partitionGroupId) {
- if ((currentOffset == null)) {
- // If stream does not return a valid ingestion offset don't publish a
metric
- return;
- }
- IngestionOffsets previousMeasure =
- _partitionToOffsetMap.put(partitionGroupId, new
IngestionOffsets(currentOffset, latestOffset));
- if (previousMeasure == null) {
- // First time we start tracking a partition we should start tracking it
via metric
- // Only publish the metric if supported by the underlying stream. If not
supported the stream
- // returns Long.MIN_VALUE
- if (currentOffset != null) {
- _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId,
- ServerGauge.REALTIME_INGESTION_OFFSET_LAG, () ->
getPartitionIngestionOffsetLag(partitionGroupId));
- }
- }
+ // If we are consuming we do not need to track this partition for removal.
+ _partitionsMarkedForVerification.remove(partitionId);
}
/*
* Handle partition removal event. This must be invoked when we stop serving
a given partition for
* this table in the current server.
*
- * @param partitionGroupId partition id that we should stop tracking.
+ * @param partitionId partition id that we should stop tracking.
*/
- public void stopTrackingPartitionIngestionDelay(int partitionGroupId) {
- removePartitionId(partitionGroupId);
+ public void stopTrackingPartitionIngestionDelay(int partitionId) {
+ removePartitionId(partitionId);
+ }
+
+ /**
+ * Stops tracking the partition ingestion delay, and also ignores the
updates from the given segment. This is useful
+ * when we want to stop tracking the ingestion delay for a partition when
the segment might still be consuming, e.g.
+ * when the new consuming segment is created on a different server.
+ */
+ public void stopTrackingPartitionIngestionDelay(String segmentName) {
+ _segmentsToIgnore.put(segmentName, true);
+ removePartitionId(new LLCSegmentName(segmentName).getPartitionGroupId());
}
/*
@@ -345,7 +331,7 @@ public class IngestionDelayTracker {
// Check if we have any partition to verify, else don't make the call to
check ideal state as that
// involves network traffic and may be inefficient.
List<Integer> partitionsToVerify = getPartitionsToBeVerified();
- if (partitionsToVerify.size() == 0) {
+ if (partitionsToVerify.isEmpty()) {
// Don't make the call to getHostedPartitionsGroupIds() as it involves
checking ideal state.
return;
}
@@ -353,87 +339,81 @@ public class IngestionDelayTracker {
try {
partitionsHostedByThisServer =
_realTimeTableDataManager.getHostedPartitionsGroupIds();
} catch (Exception e) {
- _logger.error("Failed to get partitions hosted by this server, table={},
exception={}:{}", _tableNameWithType,
+ LOGGER.error("Failed to get partitions hosted by this server, table={},
exception={}:{}", _tableNameWithType,
e.getClass(), e.getMessage());
return;
}
- for (int partitionGroupId : partitionsToVerify) {
- if (!partitionsHostedByThisServer.contains(partitionGroupId)) {
+ for (int partitionId : partitionsToVerify) {
+ if (!partitionsHostedByThisServer.contains(partitionId)) {
// Partition is not hosted in this server anymore, stop tracking it
- removePartitionId(partitionGroupId);
+ removePartitionId(partitionId);
}
}
}
- /*
- * This function is invoked when a partition goes from CONSUMING to ONLINE,
so we can assert whether the
- * partition is still hosted by this server after some interval of time.
- *
- * @param partitionGroupId Partition id that we need confirmed via ideal
state as still hosted by this server.
+ /**
+ * This function is invoked when a segment goes from CONSUMING to ONLINE, so
we can assert whether the partition of
+ * the segment is still hosted by this server after some interval of time.
*/
- public void markPartitionForVerification(int partitionGroupId) {
- if (!_isServerReadyToServeQueries.get()) {
- // Do not update the tracker state during server startup period
+ public void markPartitionForVerification(String segmentName) {
+ if (!_isServerReadyToServeQueries.get() ||
_segmentsToIgnore.getIfPresent(segmentName) != null) {
+ // Do not update the tracker state during server startup period or if
the segment is marked to be ignored
return;
}
- _partitionsMarkedForVerification.put(partitionGroupId, _clock.millis());
+ _partitionsMarkedForVerification.put(new
LLCSegmentName(segmentName).getPartitionGroupId(), _clock.millis());
}
/*
* Method to get timestamp used for the ingestion delay for a given
partition.
*
- * @param partitionGroupId partition for which we are retrieving the delay
+ * @param partitionId partition for which we are retrieving the delay
*
* @return ingestion delay timestamp in milliseconds for the given partition
ID.
*/
- public long getPartitionIngestionTimeMs(int partitionGroupId) {
- // Not protected as this will only be invoked when metric is installed
which happens after server ready
- IngestionTimestamps currentMeasure =
_partitionToIngestionTimestampsMap.get(partitionGroupId);
- if (currentMeasure == null) { // Guard just in case we read the metric
without initializing it
- return Long.MIN_VALUE;
- }
- return currentMeasure._ingestionTimeMs;
+ public long getPartitionIngestionTimeMs(int partitionId) {
+ IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
+ return ingestionInfo != null ? ingestionInfo._ingestionTimeMs :
Long.MIN_VALUE;
}
/*
* Method to get ingestion delay for a given partition.
*
- * @param partitionGroupId partition for which we are retrieving the delay
+ * @param partitionId partition for which we are retrieving the delay
*
* @return ingestion delay in milliseconds for the given partition ID.
*/
- public long getPartitionIngestionDelayMs(int partitionGroupId) {
- // Not protected as this will only be invoked when metric is installed
which happens after server ready
- IngestionTimestamps currentMeasure =
_partitionToIngestionTimestampsMap.get(partitionGroupId);
- if (currentMeasure == null) { // Guard just in case we read the metric
without initializing it
- return 0;
- }
- return getIngestionDelayMs(currentMeasure._ingestionTimeMs);
- }
-
- public long getPartitionIngestionOffsetLag(int partitionGroupId) {
- // Not protected as this will only be invoked when metric is installed
which happens after server ready
- IngestionOffsets currentMeasure =
_partitionToOffsetMap.get(partitionGroupId);
- if (currentMeasure == null) { // Guard just in case we read the metric
without initializing it
- return 0;
- }
- return getPartitionOffsetLag(currentMeasure);
+ public long getPartitionIngestionDelayMs(int partitionId) {
+ IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
+ return ingestionInfo != null ?
getIngestionDelayMs(ingestionInfo._ingestionTimeMs) : 0;
}
/*
* Method to get end to end ingestion delay for a given partition.
*
- * @param partitionGroupId partition for which we are retrieving the delay
+ * @param partitionId partition for which we are retrieving the delay
*
* @return End to end ingestion delay in milliseconds for the given
partition ID.
*/
- public long getPartitionEndToEndIngestionDelayMs(int partitionGroupId) {
- // Not protected as this will only be invoked when metric is installed
which happens after server ready
- IngestionTimestamps currentMeasure =
_partitionToIngestionTimestampsMap.get(partitionGroupId);
- if (currentMeasure == null) { // Guard just in case we read the metric
without initializing it
+ public long getPartitionEndToEndIngestionDelayMs(int partitionId) {
+ IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
+ return ingestionInfo != null ?
getIngestionDelayMs(ingestionInfo._firstStreamIngestionTimeMs) : 0;
+ }
+
+ public long getPartitionIngestionOffsetLag(int partitionId) {
+ IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
+ if (ingestionInfo == null) {
return 0;
}
- return getIngestionDelayMs(currentMeasure._firstStreamIngestionTimeMs);
+ StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset;
+ StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset;
+ if (currentOffset == null || latestOffset == null) {
+ return 0;
+ }
+ // TODO: Support other types of offsets
+ if (!(currentOffset instanceof LongMsgOffset && latestOffset instanceof
LongMsgOffset)) {
+ return 0;
+ }
+ return ((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset)
currentOffset).getOffset();
}
/*
@@ -448,8 +428,8 @@ public class IngestionDelayTracker {
return;
}
// Remove partitions so their related metrics get uninstalled.
- for (Map.Entry<Integer, IngestionTimestamps> entry :
_partitionToIngestionTimestampsMap.entrySet()) {
- removePartitionId(entry.getKey());
+ for (Integer partitionId : _ingestionInfoMap.keySet()) {
+ removePartitionId(partitionId);
}
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 5acd5d57ee..648bed6080 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -964,6 +964,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
return _currentOffset;
}
+ @Nullable
public StreamPartitionMsgOffset getLatestStreamOffsetAtStartupTime() {
return _latestStreamOffsetAtStartupTime;
}
@@ -1683,22 +1684,27 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
return _idleTimer.getTimeSinceEventLastConsumedMs();
}
+ @Nullable
public StreamPartitionMsgOffset fetchLatestStreamOffset(long maxWaitTimeMs,
boolean useDebugLog) {
return fetchStreamOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA,
maxWaitTimeMs, useDebugLog);
}
+ @Nullable
public StreamPartitionMsgOffset fetchLatestStreamOffset(long maxWaitTimeMs) {
return fetchLatestStreamOffset(maxWaitTimeMs, false);
}
+ @Nullable
public StreamPartitionMsgOffset fetchEarliestStreamOffset(long
maxWaitTimeMs, boolean useDebugLog) {
return fetchStreamOffset(OffsetCriteria.SMALLEST_OFFSET_CRITERIA,
maxWaitTimeMs, useDebugLog);
}
+ @Nullable
public StreamPartitionMsgOffset fetchEarliestStreamOffset(long
maxWaitTimeMs) {
return fetchEarliestStreamOffset(maxWaitTimeMs, false);
}
+ @Nullable
private StreamPartitionMsgOffset fetchStreamOffset(OffsetCriteria
offsetCriteria, long maxWaitTimeMs,
boolean useDebugLog) {
if (_partitionMetadataProvider == null) {
@@ -1797,9 +1803,9 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
* Assumes there is a valid instance of {@link PartitionGroupConsumer}
*/
private void recreateStreamConsumer(String reason) {
- _segmentLogger.info("Recreating stream consumer for topic partition {},
reason: {}", _clientId, reason);
- _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
- closePartitionGroupConsumer();
+ _segmentLogger.info("Recreating stream consumer for topic partition {},
reason: {}", _clientId, reason);
+ _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
+ closePartitionGroupConsumer();
try {
_partitionGroupConsumer =
_streamConsumerFactory.createPartitionGroupConsumer(_clientId,
_partitionGroupConsumptionStatus);
@@ -1825,20 +1831,23 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
if (metadata != null) {
try {
StreamPartitionMsgOffset latestOffset = fetchLatestStreamOffset(5000,
true);
-
_realtimeTableDataManager.updateIngestionMetrics(metadata.getRecordIngestionTimeMs(),
- metadata.getFirstStreamRecordIngestionTimeMs(),
metadata.getOffset(), latestOffset, _partitionGroupId);
+ _realtimeTableDataManager.updateIngestionMetrics(_segmentNameStr,
_partitionGroupId,
+ metadata.getRecordIngestionTimeMs(),
metadata.getFirstStreamRecordIngestionTimeMs(), metadata.getOffset(),
+ latestOffset);
} catch (Exception e) {
_segmentLogger.warn("Failed to fetch latest offset for updating
ingestion delay", e);
}
}
}
- /*
+ /**
* Sets ingestion delay to zero in situations where we are caught up processing events.
+ * TODO: Revisit if we should preserve the offset info.
*/
private void setIngestionDelayToZero() {
long currentTimeMs = System.currentTimeMillis();
- _realtimeTableDataManager.updateIngestionMetrics(currentTimeMs,
currentTimeMs, null, null, _partitionGroupId);
+ _realtimeTableDataManager.updateIngestionMetrics(_segmentNameStr,
_partitionGroupId, currentTimeMs, currentTimeMs,
+ null, null);
}
// This should be done during commit? We may not always commit when we build a segment....
@@ -1878,14 +1887,12 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
/**
* Creates a {@link StreamMessageDecoder} using properties in {@link StreamConfig}.
*
- * @param streamConfig The stream config from the table config
* @param fieldsToRead The fields to read from the source stream
* @return The initialized StreamMessageDecoder
*/
private StreamMessageDecoder createMessageDecoder(Set<String> fieldsToRead) {
String decoderClass = _streamConfig.getDecoderClass();
try {
- Map<String, String> decoderProperties =
_streamConfig.getDecoderProperties();
StreamMessageDecoder decoder =
PluginManager.get().createInstance(decoderClass);
decoder.init(fieldsToRead, _streamConfig, _tableConfig, _schema);
return decoder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 75e8a4c235..8393da3884 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.FileUtils;
@@ -74,6 +75,8 @@ import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.stream.RowMetadata;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
@@ -262,57 +265,63 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
}
}
- /*
- * Method used by RealtimeSegmentManagers to update their partition delays
+ /**
+ * Updates the ingestion metrics for the given partition.
*
- * @param ingestionTimeMs Ingestion delay being reported.
- * @param firstStreamIngestionTimeMs Ingestion time of the first message in
the stream.
- * @param partitionGroupId Partition ID for which delay is being updated.
- * @param offset last offset received for the partition.
- * @param latestOffset latest upstream offset for the partition.
+ * @param segmentName name of the consuming segment
+ * @param partitionId partition id of the consuming segment (directly passed in to avoid parsing the segment name)
+ * @param ingestionTimeMs ingestion time of the last consumed message (from {@link RowMetadata})
+ * @param firstStreamIngestionTimeMs ingestion time of the last consumed message in the first stream (from
+ * {@link RowMetadata})
+ * @param currentOffset offset of the last consumed message (from {@link RowMetadata})
+ * @param latestOffset offset of the latest message in the partition (from {@link StreamMetadataProvider})
*/
- public void updateIngestionMetrics(long ingestionTimeMs, long
firstStreamIngestionTimeMs,
- StreamPartitionMsgOffset offset, StreamPartitionMsgOffset latestOffset,
int partitionGroupId) {
- _ingestionDelayTracker.updateIngestionMetrics(ingestionTimeMs,
firstStreamIngestionTimeMs, offset, latestOffset,
- partitionGroupId);
+ public void updateIngestionMetrics(String segmentName, int partitionId, long
ingestionTimeMs,
+ long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset
currentOffset,
+ @Nullable StreamPartitionMsgOffset latestOffset) {
+ _ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId,
ingestionTimeMs, firstStreamIngestionTimeMs,
+ currentOffset, latestOffset);
}
- /*
- * Method used during query execution (ServerQueryExecutorV1Impl) to get the
current timestamp for the ingestion
- * delay for a partition
- *
- * @param segmentNameStr name of segment for which we want the ingestion
delay timestamp.
- * @return timestamp of the ingestion delay for the partition.
+ /**
+ * Returns the ingestion time of the last consumed message for the partition of the given segment. Returns
+ * {@code Long.MIN_VALUE} when it is not available.
*/
- public long getPartitionIngestionTimeMs(String segmentNameStr) {
- LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
- int partitionGroupId = segmentName.getPartitionGroupId();
- return
_ingestionDelayTracker.getPartitionIngestionTimeMs(partitionGroupId);
+ public long getPartitionIngestionTimeMs(String segmentName) {
+ return _ingestionDelayTracker.getPartitionIngestionTimeMs(new
LLCSegmentName(segmentName).getPartitionGroupId());
}
- /*
+ /**
+ * Removes the ingestion metrics for the partition of the given segment, and also ignores the updates from the given
+ * segment. This is useful when we want to stop tracking the ingestion delay for a partition when the segment might
+ * still be consuming, e.g. when the new consuming segment is created on a different server.
+ */
+ public void removeIngestionMetrics(String segmentName) {
+ _ingestionDelayTracker.stopTrackingPartitionIngestionDelay(segmentName);
+ }
+
+ /**
* Method to handle CONSUMING -> DROPPED segment state transitions:
* We stop tracking partitions whose segments are dropped.
*
- * @param segmentNameStr name of segment which is transitioning state.
+ * @param segmentName name of segment which is transitioning state.
*/
@Override
- public void onConsumingToDropped(String segmentNameStr) {
- LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
-
_ingestionDelayTracker.stopTrackingPartitionIngestionDelay(segmentName.getPartitionGroupId());
+ public void onConsumingToDropped(String segmentName) {
+ // NOTE: No need to mark segment ignored here because it should have already been dropped.
+ _ingestionDelayTracker.stopTrackingPartitionIngestionDelay(new
LLCSegmentName(segmentName).getPartitionGroupId());
}
- /*
+ /**
* Method to handle CONSUMING -> ONLINE segment state transitions:
* We mark partitions for verification against ideal state when we do not see a consuming segment for some time
* for that partition. The idea is to remove the related metrics when the partition moves from the current server.
*
- * @param segmentNameStr name of segment which is transitioning state.
+ * @param segmentName name of segment which is transitioning state.
*/
@Override
- public void onConsumingToOnline(String segmentNameStr) {
- LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
-
_ingestionDelayTracker.markPartitionForVerification(segmentName.getPartitionGroupId());
+ public void onConsumingToOnline(String segmentName) {
+ _ingestionDelayTracker.markPartitionForVerification(segmentName);
}
@Override
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
index 8072731583..9cb527b121 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.pinot.core.data.manager.realtime;
import java.time.Clock;
@@ -24,8 +23,10 @@ import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -33,7 +34,8 @@ import static org.mockito.Mockito.mock;
public class IngestionDelayTrackerTest {
- private static final String REALTIME_TABLE_NAME = "testTable_REALTIME";
+ private static final String RAW_TABLE_NAME = "testTable";
+ private static final String REALTIME_TABLE_NAME =
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
private static final int TIMER_THREAD_TICK_INTERVAL_MS = 100;
private final ServerMetrics _serverMetrics = mock(ServerMetrics.class);
@@ -66,8 +68,7 @@ public class IngestionDelayTrackerTest {
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0),
Long.MIN_VALUE);
// Test bad timer args to the constructor
try {
- new IngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME,
_realtimeTableDataManager,
- 0, () -> true);
+ new IngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME,
_realtimeTableDataManager, 0, () -> true);
Assert.fail("Must have asserted due to invalid arguments"); //
Constructor must assert
} catch (Exception e) {
if ((e instanceof NullPointerException) || !(e instanceof
RuntimeException)) {
@@ -80,7 +81,9 @@ public class IngestionDelayTrackerTest {
public void testRecordIngestionDelayWithNoAging() {
final long maxTestDelay = 100;
final int partition0 = 0;
+ final String segment0 = new LLCSegmentName(RAW_TABLE_NAME, partition0, 0,
123).getSegmentName();
final int partition1 = 1;
+ final String segment1 = new LLCSegmentName(RAW_TABLE_NAME, partition1, 0,
234).getSegmentName();
IngestionDelayTracker ingestionDelayTracker = createTracker();
// Use fixed clock so samples dont age
@@ -90,43 +93,54 @@ public class IngestionDelayTrackerTest {
ingestionDelayTracker.setClock(clock);
// Test we follow a single partition up and down
- for (long i = 0; i <= maxTestDelay; i++) {
- long firstStreamIngestionTimeMs = i + 1;
- ingestionDelayTracker.updateIngestionDelay(i,
firstStreamIngestionTimeMs, partition0);
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
clock.millis() - i);
+ for (long ingestionTimeMs = 0; ingestionTimeMs <= maxTestDelay;
ingestionTimeMs++) {
+ long firstStreamIngestionTimeMs = ingestionTimeMs + 1;
+ ingestionDelayTracker.updateIngestionMetrics(segment0, partition0,
ingestionTimeMs, firstStreamIngestionTimeMs,
+ null, null);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
+ clock.millis() - ingestionTimeMs);
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
clock.millis() - firstStreamIngestionTimeMs);
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0),
i);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0),
ingestionTimeMs);
}
// Test tracking down a measure for a given partition
- for (long i = maxTestDelay; i >= 0; i--) {
- ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition0);
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
clock.millis() - i);
+ for (long ingestionTimeMs = maxTestDelay; ingestionTimeMs >= 0;
ingestionTimeMs--) {
+ long firstStreamIngestionTimeMs = ingestionTimeMs + 1;
+ ingestionDelayTracker.updateIngestionMetrics(segment0, partition0,
ingestionTimeMs, firstStreamIngestionTimeMs,
+ null, null);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
+ clock.millis() - ingestionTimeMs);
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
- clock.millis() - (i + 1));
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0),
i);
+ clock.millis() - (ingestionTimeMs + 1));
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0),
ingestionTimeMs);
}
// Make the current partition maximum
- ingestionDelayTracker.updateIngestionDelay(maxTestDelay, maxTestDelay,
partition0);
+ ingestionDelayTracker.updateIngestionMetrics(segment0, partition0,
maxTestDelay, maxTestDelay, null, null);
// Bring up partition1 delay up and verify values
- for (long i = 0; i <= 2 * maxTestDelay; i++) {
- ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition1);
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1),
clock.millis() - i);
+ for (long ingestionTimeMs = 0; ingestionTimeMs <= 2 * maxTestDelay;
ingestionTimeMs++) {
+ long firstStreamIngestionTimeMs = ingestionTimeMs + 1;
+ ingestionDelayTracker.updateIngestionMetrics(segment1, partition1,
ingestionTimeMs, firstStreamIngestionTimeMs,
+ null, null);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1),
+ clock.millis() - ingestionTimeMs);
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1),
- clock.millis() - (i + 1));
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition1),
i);
+ clock.millis() - firstStreamIngestionTimeMs);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition1),
ingestionTimeMs);
}
// Bring down values of partition1 and verify values
- for (long i = 2 * maxTestDelay; i >= 0; i--) {
- ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition1);
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1),
clock.millis() - i);
+ for (long ingestionTimeMs = 2 * maxTestDelay; ingestionTimeMs >= 0;
ingestionTimeMs--) {
+ long firstStreamIngestionTimeMs = ingestionTimeMs + 1;
+ ingestionDelayTracker.updateIngestionMetrics(segment1, partition1,
ingestionTimeMs, firstStreamIngestionTimeMs,
+ null, null);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1),
+ clock.millis() - ingestionTimeMs);
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1),
- clock.millis() - (i + 1));
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition1),
i);
+ clock.millis() - firstStreamIngestionTimeMs);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition1),
ingestionTimeMs);
}
ingestionDelayTracker.shutdown();
@@ -136,11 +150,13 @@ public class IngestionDelayTrackerTest {
@Test
public void testRecordIngestionDelayWithAging() {
final int partition0 = 0;
+ final String segment0 = new LLCSegmentName(RAW_TABLE_NAME, partition0, 0,
123).getSegmentName();
final long partition0Delay0 = 1000;
final long partition0Delay1 = 10; // record lower delay to make sure max
gets reduced
final long partition0Offset0Ms = 300;
final long partition0Offset1Ms = 1000;
final int partition1 = 1;
+ final String segment1 = new LLCSegmentName(RAW_TABLE_NAME, partition1, 0,
234).getSegmentName();
final long partition1Delay0 = 11;
final long partition1Offset0Ms = 150;
@@ -151,14 +167,12 @@ public class IngestionDelayTrackerTest {
ZoneId zoneId = ZoneId.systemDefault();
Clock clock = Clock.fixed(now, zoneId);
ingestionDelayTracker.setClock(clock);
- long ingestionTimeMs = (clock.millis() - partition0Delay0);
- ingestionDelayTracker.updateIngestionDelay(ingestionTimeMs,
- (clock.millis() - partition0Delay0), partition0);
- ingestionDelayTracker.updateIngestionDelay((clock.millis() -
partition0Delay0), (clock.millis() - partition0Delay0),
- partition0);
+ long ingestionTimeMs = clock.millis() - partition0Delay0;
+ ingestionDelayTracker.updateIngestionMetrics(segment0, partition0,
ingestionTimeMs, ingestionTimeMs, null, null);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
partition0Delay0);
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
partition0Delay0);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0),
ingestionTimeMs);
+
// Advance clock and test aging
Clock offsetClock = Clock.offset(clock,
Duration.ofMillis(partition0Offset0Ms));
ingestionDelayTracker.setClock(offsetClock);
@@ -168,9 +182,8 @@ public class IngestionDelayTrackerTest {
(partition0Delay0 + partition0Offset0Ms));
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0),
ingestionTimeMs);
- ingestionTimeMs = (offsetClock.millis() - partition0Delay1);
- ingestionDelayTracker.updateIngestionDelay(ingestionTimeMs,
- (offsetClock.millis() - partition0Delay1), partition0);
+ ingestionTimeMs = offsetClock.millis() - partition0Delay1;
+ ingestionDelayTracker.updateIngestionMetrics(segment0, partition0,
ingestionTimeMs, ingestionTimeMs, null, null);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
partition0Delay1);
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
partition0Delay1);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0),
ingestionTimeMs);
@@ -181,9 +194,8 @@ public class IngestionDelayTrackerTest {
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
(partition0Delay1 + partition0Offset1Ms));
- ingestionTimeMs = (offsetClock.millis() - partition1Delay0);
- ingestionDelayTracker.updateIngestionDelay(ingestionTimeMs,
- (offsetClock.millis() - partition1Delay0), partition1);
+ ingestionTimeMs = offsetClock.millis() - partition1Delay0;
+ ingestionDelayTracker.updateIngestionMetrics(segment1, partition1,
ingestionTimeMs, ingestionTimeMs, null, null);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1),
partition1Delay0);
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1),
partition1Delay0);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition1),
ingestionTimeMs);
@@ -210,26 +222,54 @@ public class IngestionDelayTrackerTest {
ingestionDelayTracker.setClock(clock);
// Record a number of partitions with delay equal to partition id
- for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay;
partitionGroupId++) {
- long ingestionTimeMs = (clock.millis() - partitionGroupId);
- ingestionDelayTracker.updateIngestionDelay(ingestionTimeMs,
ingestionTimeMs, partitionGroupId);
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionGroupId),
partitionGroupId);
-
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionGroupId),
- partitionGroupId);
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partitionGroupId),
ingestionTimeMs);
+ for (int partitionId = 0; partitionId <= maxTestDelay; partitionId++) {
+ String segmentName = new LLCSegmentName(RAW_TABLE_NAME, partitionId, 0,
123).getSegmentName();
+ long ingestionTimeMs = clock.millis() - partitionId;
+ ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId,
ingestionTimeMs, ingestionTimeMs, null,
+ null);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionId),
partitionId);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionId),
partitionId);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partitionId),
ingestionTimeMs);
}
- for (int partitionGroupId = maxPartition; partitionGroupId >= 0;
partitionGroupId--) {
-
ingestionDelayTracker.stopTrackingPartitionIngestionDelay(partitionGroupId);
+ for (int partitionId = maxPartition; partitionId >= 0; partitionId--) {
+ ingestionDelayTracker.stopTrackingPartitionIngestionDelay(partitionId);
}
- for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay;
partitionGroupId++) {
+ for (int partitionId = 0; partitionId <= maxTestDelay; partitionId++) {
// Untracked partitions must return 0
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionGroupId),
0);
-
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionGroupId),
0);
- Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(
- partitionGroupId), Long.MIN_VALUE);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionId),
0);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionId),
0);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partitionId),
Long.MIN_VALUE);
}
}
+ @Test
+ public void testStopTrackingIngestionDelayWithSegment() {
+ IngestionDelayTracker ingestionDelayTracker = createTracker();
+ // Use fixed clock so samples don't age
+ Instant now = Instant.now();
+ ZoneId zoneId = ZoneId.systemDefault();
+ Clock clock = Clock.fixed(now, zoneId);
+ ingestionDelayTracker.setClock(clock);
+
+ String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0,
123).getSegmentName();
+ long ingestionTimeMs = clock.millis() - 10;
+ ingestionDelayTracker.updateIngestionMetrics(segmentName, 0,
ingestionTimeMs, ingestionTimeMs, null, null);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0),
10);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(0),
10);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0),
ingestionTimeMs);
+
+ ingestionDelayTracker.stopTrackingPartitionIngestionDelay(segmentName);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0),
0);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(0),
0);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0),
Long.MIN_VALUE);
+
+ // Should not update metrics for removed segment
+ ingestionDelayTracker.updateIngestionMetrics(segmentName, 0,
ingestionTimeMs, ingestionTimeMs, null, null);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0),
0);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(0),
0);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0),
Long.MIN_VALUE);
+ }
+
@Test
public void testShutdown() {
final long maxTestDelay = 100;
@@ -242,12 +282,13 @@ public class IngestionDelayTrackerTest {
ingestionDelayTracker.setClock(clock);
// Test Shutdown with partitions active
- for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay;
partitionGroupId++) {
- ingestionDelayTracker.updateIngestionDelay((clock.millis() -
partitionGroupId),
- (clock.millis() - partitionGroupId), partitionGroupId);
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionGroupId),
partitionGroupId);
-
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionGroupId),
- partitionGroupId);
+ for (int partitionId = 0; partitionId <= maxTestDelay; partitionId++) {
+ String segmentName = new LLCSegmentName(RAW_TABLE_NAME, partitionId, 0,
123).getSegmentName();
+ long ingestionTimeMs = clock.millis() - partitionId;
+ ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId,
ingestionTimeMs, ingestionTimeMs, null,
+ null);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionId),
partitionId);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionId),
partitionId);
}
ingestionDelayTracker.shutdown();
@@ -257,65 +298,35 @@ public class IngestionDelayTrackerTest {
}
@Test
- public void testRecordIngestionDelayOffsetWithNoAging() {
+ public void testRecordIngestionDelayOffset() {
final int partition0 = 0;
+ final String segment0 = new LLCSegmentName(RAW_TABLE_NAME, partition0, 0,
123).getSegmentName();
final int partition1 = 1;
+ final String segment1 = new LLCSegmentName(RAW_TABLE_NAME, partition1, 0,
234).getSegmentName();
IngestionDelayTracker ingestionDelayTracker = createTracker();
- // Use fixed clock so samples don't age
- Instant now = Instant.now();
- ZoneId zoneId = ZoneId.systemDefault();
- Clock clock = Clock.fixed(now, zoneId);
- ingestionDelayTracker.setClock(clock);
// Test tracking offset lag for a single partition
StreamPartitionMsgOffset msgOffset0 = new LongMsgOffset(100);
StreamPartitionMsgOffset latestOffset0 = new LongMsgOffset(200);
- ingestionDelayTracker.updateIngestionOffsets(msgOffset0, latestOffset0,
partition0);
+ ingestionDelayTracker.updateIngestionMetrics(segment0, partition0,
Long.MIN_VALUE, Long.MIN_VALUE, msgOffset0,
+ latestOffset0);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0),
100);
// Test tracking offset lag for another partition
StreamPartitionMsgOffset msgOffset1 = new LongMsgOffset(50);
StreamPartitionMsgOffset latestOffset1 = new LongMsgOffset(150);
- ingestionDelayTracker.updateIngestionOffsets(msgOffset1, latestOffset1,
partition1);
+ ingestionDelayTracker.updateIngestionMetrics(segment1, partition1,
Long.MIN_VALUE, Long.MIN_VALUE, msgOffset1,
+ latestOffset1);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition1),
100);
// Update offset lag for partition0
msgOffset0 = new LongMsgOffset(150);
latestOffset0 = new LongMsgOffset(200);
- ingestionDelayTracker.updateIngestionOffsets(msgOffset0, latestOffset0,
partition0);
+ ingestionDelayTracker.updateIngestionMetrics(segment0, partition0,
Long.MIN_VALUE, Long.MIN_VALUE, msgOffset0,
+ latestOffset0);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0),
50);
ingestionDelayTracker.shutdown();
}
-
- @Test
- public void testRecordIngestionDelayOffsetWithAging() {
- final int partition0 = 0;
- final long partition0OffsetLag0 = 100;
- final long partition0OffsetLag1 = 50;
-
- IngestionDelayTracker ingestionDelayTracker = createTracker();
-
- // With samples for a single partition, test that sample is aged as
expected
- Instant now = Instant.now();
- ZoneId zoneId = ZoneId.systemDefault();
- Clock clock = Clock.fixed(now, zoneId);
- ingestionDelayTracker.setClock(clock);
-
- StreamPartitionMsgOffset msgOffset0 = new LongMsgOffset(100);
- StreamPartitionMsgOffset latestOffset0 = new LongMsgOffset(200);
- ingestionDelayTracker.updateIngestionOffsets(msgOffset0, latestOffset0,
partition0);
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0),
partition0OffsetLag0);
-
- // Update offset lag and test aging
- msgOffset0 = new LongMsgOffset(150);
- latestOffset0 = new LongMsgOffset(200);
- ingestionDelayTracker.updateIngestionOffsets(msgOffset0, latestOffset0,
partition0);
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0),
partition0OffsetLag1);
-
-
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0),
partition0OffsetLag1);
- ingestionDelayTracker.shutdown();
- }
}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
index 11729316c4..0b7111a75e 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
@@ -30,6 +30,7 @@ import
org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.Message;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.messages.ForceCommitMessage;
+import org.apache.pinot.common.messages.IngestionMetricsRemoveMessage;
import org.apache.pinot.common.messages.SegmentRefreshMessage;
import org.apache.pinot.common.messages.SegmentReloadMessage;
import org.apache.pinot.common.messages.TableDeletionMessage;
@@ -39,7 +40,9 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerQueryPhase;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.core.util.SegmentRefreshSemaphore;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +75,8 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
return new TableDeletionMessageHandler(new TableDeletionMessage(message), _metrics, context);
case ForceCommitMessage.FORCE_COMMIT_MSG_SUB_TYPE:
return new ForceCommitMessageHandler(new ForceCommitMessage(message), _metrics, context);
+ case IngestionMetricsRemoveMessage.INGESTION_METRICS_REMOVE_MSG_SUB_TYPE:
+ return new IngestionMetricsRemoveMessageHandler(new IngestionMetricsRemoveMessage(message), _metrics, context);
default:
LOGGER.warn("Unsupported user defined message sub type: {} for segment: {}", msgSubType,
message.getPartitionName());
@@ -229,6 +234,27 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
}
}
+ private class IngestionMetricsRemoveMessageHandler extends DefaultMessageHandler {
+
+ IngestionMetricsRemoveMessageHandler(IngestionMetricsRemoveMessage message, ServerMetrics metrics,
+ NotificationContext context) {
+ super(message, metrics, context);
+ }
+
+ @Override
+ public HelixTaskResult handleMessage() {
+ _logger.info("Handling ingestion metrics remove message for table: {}, segment: {}", _tableNameWithType,
+ _segmentName);
+ TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(_tableNameWithType);
+ if (tableDataManager instanceof RealtimeTableDataManager) {
+ ((RealtimeTableDataManager) tableDataManager).removeIngestionMetrics(_segmentName);
+ }
+ HelixTaskResult helixTaskResult = new HelixTaskResult();
+ helixTaskResult.setSuccess(true);
+ return helixTaskResult;
+ }
+ }
+
private static class DefaultMessageHandler extends MessageHandler {
final String _segmentName;
final String _tableNameWithType;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]