swaminathanmanish commented on code in PR #12697:
URL: https://github.com/apache/pinot/pull/12697#discussion_r1550612147
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java:
##########
@@ -47,53 +50,60 @@ public KafkaPartitionLevelConsumer(String clientId,
StreamConfig streamConfig, i
}
@Override
- public MessageBatch<StreamMessage<byte[]>>
fetchMessages(StreamPartitionMsgOffset startMsgOffset,
- StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) {
- final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
- final long endOffset = endMsgOffset == null ? Long.MAX_VALUE :
((LongMsgOffset) endMsgOffset).getOffset();
- return fetchMessages(startOffset, endOffset, timeoutMillis);
- }
-
- public synchronized MessageBatch<StreamMessage<byte[]>> fetchMessages(long
startOffset, long endOffset,
- int timeoutMillis) {
+ public synchronized KafkaMessageBatch fetchMessages(StreamPartitionMsgOffset
startMsgOffset, int timeoutMs) {
+ long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Polling partition: {}, startOffset: {}, endOffset: {}
timeout: {}ms", _topicPartition, startOffset,
- endOffset, timeoutMillis);
+ LOGGER.debug("Polling partition: {}, startOffset: {}, timeout: {}ms",
_topicPartition, startOffset, timeoutMs);
}
if (_lastFetchedOffset < 0 || _lastFetchedOffset != startOffset - 1) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Seeking to offset: {}", startOffset);
}
_consumer.seek(_topicPartition, startOffset);
}
- ConsumerRecords<String, Bytes> consumerRecords =
_consumer.poll(Duration.ofMillis(timeoutMillis));
- List<ConsumerRecord<String, Bytes>> messageAndOffsets =
consumerRecords.records(_topicPartition);
- List<StreamMessage<byte[]>> filtered = new
ArrayList<>(messageAndOffsets.size());
- long firstOffset = startOffset;
- long lastOffset = startOffset;
- StreamMessageMetadata rowMetadata = null;
- if (!messageAndOffsets.isEmpty()) {
- firstOffset = messageAndOffsets.get(0).offset();
- }
- for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) {
- long offset = messageAndOffset.offset();
- _lastFetchedOffset = offset;
- if (offset >= startOffset && (endOffset > offset || endOffset < 0)) {
- Bytes message = messageAndOffset.value();
- rowMetadata = (StreamMessageMetadata)
_kafkaMetadataExtractor.extract(messageAndOffset);
+ ConsumerRecords<String, Bytes> consumerRecords =
_consumer.poll(Duration.ofMillis(timeoutMs));
+ List<ConsumerRecord<String, Bytes>> records =
consumerRecords.records(_topicPartition);
+ List<BytesStreamMessage> filteredRecords = new ArrayList<>(records.size());
+ long firstOffset = -1;
+ long offsetOfNextBatch = startOffset;
+ StreamMessageMetadata lastMessageMetadata = null;
+ if (!records.isEmpty()) {
+ firstOffset = records.get(0).offset();
+ _lastFetchedOffset = records.get(records.size() - 1).offset();
+ offsetOfNextBatch = _lastFetchedOffset + 1;
Review Comment:
This makes sense and simplifies the logic. I hope we can verify it
comprehensively through integration tests.
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java:
##########
@@ -71,52 +61,22 @@ public int getUnfilteredMessageCount() {
}
@Override
- public StreamMessage getMessageAtIndex(int index) {
- return _messageList.get(index);
- }
-
- @Override
- public int getMessageOffsetAtIndex(int index) {
- return ByteBuffer.wrap(_messageList.get(index).getValue()).arrayOffset();
- }
-
- @Override
- public int getMessageLengthAtIndex(int index) {
- return _messageList.get(index).getValue().length;
- }
-
- @Override
- public long getNextStreamMessageOffsetAtIndex(int index) {
- throw new UnsupportedOperationException("This method is deprecated");
- }
-
- @Override
- public StreamPartitionMsgOffset getNextStreamPartitionMsgOffsetAtIndex(int
index) {
- return new LongMsgOffset(((KafkaStreamMessage)
_messageList.get(index)).getNextOffset());
+ public BytesStreamMessage getStreamMessage(int index) {
+ return _messages.get(index);
}
@Override
public StreamPartitionMsgOffset getOffsetOfNextBatch() {
- return new LongMsgOffset(_lastOffset + 1);
- }
-
- @Override
- public RowMetadata getMetadataAtIndex(int index) {
- return _messageList.get(index).getMetadata();
+ return new LongMsgOffset(_offsetOfNextBatch);
}
@Override
- public byte[] getMessageBytesAtIndex(int index) {
- return _messageList.get(index).getValue();
- }
-
- @Override
- public StreamMessage getStreamMessage(int index) {
- return _messageList.get(index);
+ public StreamPartitionMsgOffset getFirstMessageOffset() {
Review Comment:
Is this backward compatible, given that it can return null?
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -591,67 +600,52 @@ private boolean processStreamEvents(MessageBatch
messagesAndOffsets, long idlePi
_numRowsErrored++;
// when exception happens we prefer abandoning the whole batch and
not partially indexing some rows
reusedResult.getTransformedRows().clear();
- String errorMessage = String.format("Caught exception while
transforming the record at offset: %s , row: %s",
- messageOffset, decodedRow.getResult());
+ String errorMessage =
+ String.format("Caught exception while transforming the record at
offset: %s , row: %s", offset,
+ decodedRow.getResult());
_segmentLogger.error(errorMessage, e);
_realtimeTableDataManager.addSegmentError(_segmentNameStr, new
SegmentErrorInfo(now(), errorMessage, e));
}
if (reusedResult.getSkippedRowCount() > 0) {
- realtimeRowsDroppedMeter =
- _serverMetrics.addMeteredTableValue(_clientId,
ServerMeter.REALTIME_ROWS_FILTERED,
- reusedResult.getSkippedRowCount(), realtimeRowsDroppedMeter);
+ realtimeRowsDroppedMeter =
_serverMetrics.addMeteredTableValue(_clientId,
ServerMeter.REALTIME_ROWS_FILTERED,
+ reusedResult.getSkippedRowCount(), realtimeRowsDroppedMeter);
}
if (reusedResult.getIncompleteRowCount() > 0) {
realtimeIncompleteRowsConsumedMeter =
_serverMetrics.addMeteredTableValue(_clientId,
ServerMeter.INCOMPLETE_REALTIME_ROWS_CONSUMED,
reusedResult.getIncompleteRowCount(),
realtimeIncompleteRowsConsumedMeter);
}
List<GenericRow> transformedRows = reusedResult.getTransformedRows();
- if (transformedRows.size() > 0) {
- hasTransformedRows = true;
- }
for (GenericRow transformedRow : transformedRows) {
try {
- canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
+ canTakeMore = _realtimeSegment.index(transformedRow, metadata);
indexedMessageCount++;
- _lastRowMetadata = msgMetadata;
+ _lastRowMetadata = metadata;
_lastConsumedTimestampMs = System.currentTimeMillis();
realtimeRowsConsumedMeter =
_serverMetrics.addMeteredTableValue(_clientId,
ServerMeter.REALTIME_ROWS_CONSUMED, 1,
realtimeRowsConsumedMeter);
_serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L);
} catch (Exception e) {
_numRowsErrored++;
- String errorMessage = String.format("Caught exception while
indexing the record at offset: %s , row: %s",
- messageOffset, transformedRow);
+ String errorMessage =
+ String.format("Caught exception while indexing the record at
offset: %s , row: %s", offset,
+ transformedRow);
_segmentLogger.error(errorMessage, e);
_realtimeTableDataManager.addSegmentError(_segmentNameStr, new
SegmentErrorInfo(now(), errorMessage, e));
}
}
}
- _currentOffset = messageOffset;
+ _currentOffset = nextOffset;
_numRowsIndexed = _realtimeSegment.getNumDocsIndexed();
_numRowsConsumed++;
streamMessageCount++;
}
Review Comment:
Thanks for cleaning this up. I hope this can be verified.
if (indexedMessageCount > 0) {
// Record Ingestion delay for this partition with metadata for last
message we processed
updateIngestionDelay(_lastRowMetadata);
} else if (!hasTransformedRows && (msgMetadata != null)) {
// If all messages were filtered by transformation, we still attempt
to update ingestion delay using
// the metadata for the last message we processed if any.
updateIngestionDelay(msgMetadata);
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]