swaminathanmanish commented on code in PR #12697: URL: https://github.com/apache/pinot/pull/12697#discussion_r1550612147
########## pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java: ########## @@ -47,53 +50,60 @@ public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, i } @Override - public MessageBatch<StreamMessage<byte[]>> fetchMessages(StreamPartitionMsgOffset startMsgOffset, - StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) { - final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset(); - final long endOffset = endMsgOffset == null ? Long.MAX_VALUE : ((LongMsgOffset) endMsgOffset).getOffset(); - return fetchMessages(startOffset, endOffset, timeoutMillis); - } - - public synchronized MessageBatch<StreamMessage<byte[]>> fetchMessages(long startOffset, long endOffset, - int timeoutMillis) { + public synchronized KafkaMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) { + long startOffset = ((LongMsgOffset) startMsgOffset).getOffset(); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Polling partition: {}, startOffset: {}, endOffset: {} timeout: {}ms", _topicPartition, startOffset, - endOffset, timeoutMillis); + LOGGER.debug("Polling partition: {}, startOffset: {}, timeout: {}ms", _topicPartition, startOffset, timeoutMs); } if (_lastFetchedOffset < 0 || _lastFetchedOffset != startOffset - 1) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Seeking to offset: {}", startOffset); } _consumer.seek(_topicPartition, startOffset); } - ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis)); - List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition); - List<StreamMessage<byte[]>> filtered = new ArrayList<>(messageAndOffsets.size()); - long firstOffset = startOffset; - long lastOffset = startOffset; - StreamMessageMetadata rowMetadata = null; - if (!messageAndOffsets.isEmpty()) { - firstOffset = messageAndOffsets.get(0).offset(); - } - for 
(ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) { - long offset = messageAndOffset.offset(); - _lastFetchedOffset = offset; - if (offset >= startOffset && (endOffset > offset || endOffset < 0)) { - Bytes message = messageAndOffset.value(); - rowMetadata = (StreamMessageMetadata) _kafkaMetadataExtractor.extract(messageAndOffset); + ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMs)); + List<ConsumerRecord<String, Bytes>> records = consumerRecords.records(_topicPartition); + List<BytesStreamMessage> filteredRecords = new ArrayList<>(records.size()); + long firstOffset = -1; + long offsetOfNextBatch = startOffset; + StreamMessageMetadata lastMessageMetadata = null; + if (!records.isEmpty()) { + firstOffset = records.get(0).offset(); + _lastFetchedOffset = records.get(records.size() - 1).offset(); + offsetOfNextBatch = _lastFetchedOffset + 1; Review Comment: This makes sense and simplifies the logic. Hope we can comprehensively verify through integ tests. 
########## pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java: ########## @@ -71,52 +61,22 @@ public int getUnfilteredMessageCount() { } @Override - public StreamMessage getMessageAtIndex(int index) { - return _messageList.get(index); - } - - @Override - public int getMessageOffsetAtIndex(int index) { - return ByteBuffer.wrap(_messageList.get(index).getValue()).arrayOffset(); - } - - @Override - public int getMessageLengthAtIndex(int index) { - return _messageList.get(index).getValue().length; - } - - @Override - public long getNextStreamMessageOffsetAtIndex(int index) { - throw new UnsupportedOperationException("This method is deprecated"); - } - - @Override - public StreamPartitionMsgOffset getNextStreamPartitionMsgOffsetAtIndex(int index) { - return new LongMsgOffset(((KafkaStreamMessage) _messageList.get(index)).getNextOffset()); + public BytesStreamMessage getStreamMessage(int index) { + return _messages.get(index); } @Override public StreamPartitionMsgOffset getOffsetOfNextBatch() { - return new LongMsgOffset(_lastOffset + 1); - } - - @Override - public RowMetadata getMetadataAtIndex(int index) { - return _messageList.get(index).getMetadata(); + return new LongMsgOffset(_offsetOfNextBatch); } @Override - public byte[] getMessageBytesAtIndex(int index) { - return _messageList.get(index).getValue(); - } - - @Override - public StreamMessage getStreamMessage(int index) { - return _messageList.get(index); + public StreamPartitionMsgOffset getFirstMessageOffset() { Review Comment: Is this backwards compatible, since it can return null? ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java: ########## @@ -591,67 +600,52 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi _numRowsErrored++; // when exception happens we prefer abandoning the whole batch and not partially indexing some rows 
reusedResult.getTransformedRows().clear(); - String errorMessage = String.format("Caught exception while transforming the record at offset: %s , row: %s", - messageOffset, decodedRow.getResult()); + String errorMessage = + String.format("Caught exception while transforming the record at offset: %s , row: %s", offset, + decodedRow.getResult()); _segmentLogger.error(errorMessage, e); _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e)); } if (reusedResult.getSkippedRowCount() > 0) { - realtimeRowsDroppedMeter = - _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_ROWS_FILTERED, - reusedResult.getSkippedRowCount(), realtimeRowsDroppedMeter); + realtimeRowsDroppedMeter = _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_ROWS_FILTERED, + reusedResult.getSkippedRowCount(), realtimeRowsDroppedMeter); } if (reusedResult.getIncompleteRowCount() > 0) { realtimeIncompleteRowsConsumedMeter = _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.INCOMPLETE_REALTIME_ROWS_CONSUMED, reusedResult.getIncompleteRowCount(), realtimeIncompleteRowsConsumedMeter); } List<GenericRow> transformedRows = reusedResult.getTransformedRows(); - if (transformedRows.size() > 0) { - hasTransformedRows = true; - } for (GenericRow transformedRow : transformedRows) { try { - canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata); + canTakeMore = _realtimeSegment.index(transformedRow, metadata); indexedMessageCount++; - _lastRowMetadata = msgMetadata; + _lastRowMetadata = metadata; _lastConsumedTimestampMs = System.currentTimeMillis(); realtimeRowsConsumedMeter = _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_ROWS_CONSUMED, 1, realtimeRowsConsumedMeter); _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L); } catch (Exception e) { _numRowsErrored++; - String errorMessage = String.format("Caught exception while indexing the record at offset: %s , row: %s", - 
messageOffset, transformedRow); + String errorMessage = + String.format("Caught exception while indexing the record at offset: %s , row: %s", offset, + transformedRow); _segmentLogger.error(errorMessage, e); _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e)); } } } - _currentOffset = messageOffset; + _currentOffset = nextOffset; _numRowsIndexed = _realtimeSegment.getNumDocsIndexed(); _numRowsConsumed++; streamMessageCount++; } Review Comment: Thanks for cleaning this up. Hope this can be verified. if (indexedMessageCount > 0) { // Record Ingestion delay for this partition with metadata for last message we processed updateIngestionDelay(_lastRowMetadata); } else if (!hasTransformedRows && (msgMetadata != null)) { // If all messages were filtered by transformation, we still attempt to update ingestion delay using // the metadata for the last message we processed if any. updateIngestionDelay(msgMetadata); } -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org