Jackie-Jiang commented on code in PR #16492:
URL: https://github.com/apache/pinot/pull/16492#discussion_r2267972326
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -962,6 +968,65 @@ private void createNewSegmentZKMetadata(TableConfig
tableConfig, StreamConfig st
persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
}
+ private String computeStartOffset(String nextOffset, StreamConfig
streamConfig, int partitionId) {
+ if (!streamConfig.isEnableOffsetAutoReset()) {
+ return nextOffset;
+ }
+ long timeThreshold = streamConfig.getOffsetAutoResetTimeSecThreshold();
+ int offsetThreshold = streamConfig.getOffsetAutoResetOffsetThreshold();
+ if (timeThreshold <= 0 && offsetThreshold <= 0) {
+ LOGGER.warn("Invalid offset auto reset configuration for table: {},
topic: {}. "
+ + "timeThreshold: {}, offsetThreshold: {}",
+ streamConfig.getTableNameWithType(), streamConfig.getTopicName(),
timeThreshold, offsetThreshold);
+ return nextOffset;
+ }
+ String clientId =
Review Comment:
You probably need to create a unique client id, similar to
`getPartitionIds()`. If so, you may extract a common helper method.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1711,8 +1776,8 @@ private void createNewConsumingSegment(TableConfig
tableConfig, StreamConfig str
LLCSegmentName newLLCSegmentName =
getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
CommittingSegmentDescriptor committingSegmentDescriptor =
new
CommittingSegmentDescriptor(latestSegmentZKMetadata.getSegmentName(),
startOffset.toString(), 0);
- createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName,
currentTimeMs, committingSegmentDescriptor,
- latestSegmentZKMetadata, instancePartitions, numPartitions,
numReplicas);
+ createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName,
currentTimeMs,
Review Comment:
(minor) This change seems wrong
##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java:
##########
@@ -143,6 +143,25 @@ private StreamConfigProperties() {
public static final String PAUSELESS_SEGMENT_DOWNLOAD_TIMEOUT_SECONDS =
"realtime.segment.pauseless.download.timeoutSeconds";
+ /**
+ * Config used to enable offset auto reset during segment commit.
+ */
+ public static final String ENABLE_OFFSET_AUTO_RESET =
"realtime.segment.offsetAutoReset.enable";
+
+ /**
+ * During segment commit, the new segment startOffset would skip to the
latest offset if thisValue is set as positive
+ * and (latestStreamOffset - latestIngestedOffset > thisValue)
+ */
+ public static final String OFFSET_AUTO_RESET_OFFSET_THRESHOLD_KEY =
+ "realtime.segment.offsetAutoReset.offsetThreshold";
+
+ /**
+ * During segment commit, the new segment startOffset would skip to the
latest offset if thisValue is set as positive
+ * and (latestStreamOffset's timestamp - latestIngestedOffset's timestamp >
thisValue)
+ */
+ public static final String OFFSET_AUTO_RESET_TIMESEC_THRESHOLD_KEY =
+ "realtime.segment.offsetAutoReset.timeSecThreshold";
Review Comment:
Or `timeThresholdMs` to give finer control
##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java:
##########
@@ -143,6 +143,25 @@ private StreamConfigProperties() {
public static final String PAUSELESS_SEGMENT_DOWNLOAD_TIMEOUT_SECONDS =
"realtime.segment.pauseless.download.timeoutSeconds";
+ /**
+ * Config used to enable offset auto reset during segment commit.
+ */
+ public static final String ENABLE_OFFSET_AUTO_RESET =
"realtime.segment.offsetAutoReset.enable";
+
+ /**
+ * During segment commit, the new segment startOffset would skip to the
latest offset if thisValue is set as positive
+ * and (latestStreamOffset - latestIngestedOffset > thisValue)
+ */
+ public static final String OFFSET_AUTO_RESET_OFFSET_THRESHOLD_KEY =
+ "realtime.segment.offsetAutoReset.offsetThreshold";
+
+ /**
+ * During segment commit, the new segment startOffset would skip to the
latest offset if thisValue is set as positive
+ * and (latestStreamOffset's timestamp - latestIngestedOffset's timestamp >
thisValue)
+ */
+ public static final String OFFSET_AUTO_RESET_TIMESEC_THRESHOLD_KEY =
+ "realtime.segment.offsetAutoReset.timeSecThreshold";
Review Comment:
```suggestion
"realtime.segment.offsetAutoReset.timeThresholdSeconds";
```
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java:
##########
@@ -187,6 +187,13 @@ public List<TopicMetadata> getTopics() {
}
}
+ @Override
+ public StreamPartitionMsgOffset getOffsetAtTimestamp(int partitionId, long
timestampMillis, long timeoutMillis) {
+ return new LongMsgOffset(
Review Comment:
Please fix the format. Same for other places
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -962,6 +968,65 @@ private void createNewSegmentZKMetadata(TableConfig
tableConfig, StreamConfig st
persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
}
+ private String computeStartOffset(String nextOffset, StreamConfig
streamConfig, int partitionId) {
+ if (!streamConfig.isEnableOffsetAutoReset()) {
+ return nextOffset;
+ }
+ long timeThreshold = streamConfig.getOffsetAutoResetTimeSecThreshold();
+ int offsetThreshold = streamConfig.getOffsetAutoResetOffsetThreshold();
+ if (timeThreshold <= 0 && offsetThreshold <= 0) {
+ LOGGER.warn("Invalid offset auto reset configuration for table: {},
topic: {}. "
+ + "timeThreshold: {}, offsetThreshold: {}",
+ streamConfig.getTableNameWithType(), streamConfig.getTopicName(),
timeThreshold, offsetThreshold);
+ return nextOffset;
+ }
+ String clientId =
+ PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" +
streamConfig.getTableNameWithType() + "-"
+ + streamConfig.getTopicName();
+ StreamConsumerFactory consumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
+ StreamPartitionMsgOffsetFactory offsetFactory =
consumerFactory.createStreamMsgOffsetFactory();
+ StreamPartitionMsgOffset nextOffsetWithType =
offsetFactory.create(nextOffset);
+ StreamPartitionMsgOffset offsetAtSLA;
+ StreamPartitionMsgOffset latestOffset;
+ try (StreamMetadataProvider metadataProvider =
consumerFactory.createPartitionMetadataProvider(clientId,
+ partitionId)) {
+ // Fetching timestamp from an offset is an expensive operation which
requires reading the data,
+ // while fetching offset from timestamp is lightweight and only needs to
read metadata.
+ // Hence, instead of checking if latestOffset's time - nextOffset's time
< SLA, we would check
+ // (CurrentTime - SLA)'s offset > nextOffset.
+ // TODO: it is relying on System.currentTimeMillis() which might be
affected by time drift. If we are able to
+ // get nextOffset's time, we should instead check (nextOffset's time +
SLA)'s offset < latestOffset
+ latestOffset = metadataProvider.fetchStreamPartitionOffset(
+ OffsetCriteria.LARGEST_OFFSET_CRITERIA, STREAM_FETCH_TIMEOUT_MS);
+ LOGGER.info("Latest offset of topic {} and partition {} is {}",
streamConfig.getTopicName(), partitionId,
+ latestOffset);
+ offsetAtSLA =
Review Comment:
Should we fetch this only when time threshold is positive?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -962,6 +968,65 @@ private void createNewSegmentZKMetadata(TableConfig
tableConfig, StreamConfig st
persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
}
+ private String computeStartOffset(String nextOffset, StreamConfig
streamConfig, int partitionId) {
+ if (!streamConfig.isEnableOffsetAutoReset()) {
+ return nextOffset;
+ }
+ long timeThreshold = streamConfig.getOffsetAutoResetTimeSecThreshold();
+ int offsetThreshold = streamConfig.getOffsetAutoResetOffsetThreshold();
+ if (timeThreshold <= 0 && offsetThreshold <= 0) {
+ LOGGER.warn("Invalid offset auto reset configuration for table: {},
topic: {}. "
+ + "timeThreshold: {}, offsetThreshold: {}",
+ streamConfig.getTableNameWithType(), streamConfig.getTopicName(),
timeThreshold, offsetThreshold);
+ return nextOffset;
+ }
+ String clientId =
+ PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" +
streamConfig.getTableNameWithType() + "-"
+ + streamConfig.getTopicName();
+ StreamConsumerFactory consumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
+ StreamPartitionMsgOffsetFactory offsetFactory =
consumerFactory.createStreamMsgOffsetFactory();
+ StreamPartitionMsgOffset nextOffsetWithType =
offsetFactory.create(nextOffset);
+ StreamPartitionMsgOffset offsetAtSLA;
+ StreamPartitionMsgOffset latestOffset;
+ try (StreamMetadataProvider metadataProvider =
consumerFactory.createPartitionMetadataProvider(clientId,
+ partitionId)) {
+ // Fetching timestamp from an offset is an expensive operation which
requires reading the data,
+ // while fetching offset from timestamp is lightweight and only needs to
read metadata.
+ // Hence, instead of checking if latestOffset's time - nextOffset's time
< SLA, we would check
+ // (CurrentTime - SLA)'s offset > nextOffset.
+ // TODO: it is relying on System.currentTimeMillis() which might be
affected by time drift. If we are able to
+ // get nextOffset's time, we should instead check (nextOffset's time +
SLA)'s offset < latestOffset
+ latestOffset = metadataProvider.fetchStreamPartitionOffset(
+ OffsetCriteria.LARGEST_OFFSET_CRITERIA, STREAM_FETCH_TIMEOUT_MS);
+ LOGGER.info("Latest offset of topic {} and partition {} is {}",
streamConfig.getTopicName(), partitionId,
+ latestOffset);
+ offsetAtSLA =
+ metadataProvider.getOffsetAtTimestamp(partitionId,
System.currentTimeMillis() - timeThreshold * 1000,
+ STREAM_FETCH_TIMEOUT_MS);
+ LOGGER.info("Offset at SLA of topic {} and partition {} is {}",
streamConfig.getTopicName(), partitionId,
+ offsetAtSLA);
+ } catch (Exception e) {
+ LOGGER.warn("Not able to fetch the offset metadata, skip auto resetting
offsets", e);
+ return nextOffset;
+ }
+ try {
+ if (timeThreshold > 0 && offsetAtSLA != null &&
offsetAtSLA.compareTo(nextOffsetWithType) < 0) {
+ LOGGER.info("Auto reset offset from {} to {} on partition {} because
time threshold reached", nextOffset,
Review Comment:
We probably want to log this as a warning, given that it can cause data loss.
Same for line 1020.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -931,7 +933,11 @@ private void createNewSegmentZKMetadata(TableConfig
tableConfig, StreamConfig st
int numReplicas) {
String realtimeTableName = tableConfig.getTableName();
String segmentName = newLLCSegmentName.getSegmentName();
- String startOffset = committingSegmentDescriptor.getNextOffset();
+
+ // Handle offset auto reset
+ String oldStartOffset = committingSegmentDescriptor.getNextOffset();
Review Comment:
(minor) Let's call it `nextOffset`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]