scwhittle commented on code in PR #34275:
URL: https://github.com/apache/beam/pull/34275#discussion_r1998562203

##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -583,13 +586,15 @@ private boolean topicPartitionExists(
   // see https://github.com/apache/beam/issues/25962
   private ConsumerRecords<byte[], byte[]> poll(
-      Consumer<byte[], byte[]> consumer, TopicPartition topicPartition) {
+      Consumer<byte[], byte[]> consumer, TopicPartition topicPartition, KafkaMetrics kafkaMetrics) {
     final Stopwatch sw = Stopwatch.createStarted();
     long previousPosition = -1;
-    java.time.Duration elapsed = java.time.Duration.ZERO;
     java.time.Duration timeout = java.time.Duration.ofSeconds(this.consumerPollingTimeout);
     while (true) {
+      java.time.Duration elapsed = sw.elapsed();

Review Comment:
   Oops, I misread this: elapsed is used to calculate the poll timeout. The previous structure set elapsed only once per loop iteration, but updated it after polling, which may take time. So I think you should go back to declaring elapsed outside the loop as before, and move the assignment to after the poll.
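
For concreteness, here is a minimal sketch of the structure the comment asks for: `elapsed` declared once outside the loop, the remaining poll budget computed from it, and the refresh moved to after the poll returns. It is illustrative only and loosely follows the shape of the existing `poll()` loop; `PollLoopSketch` and `pollTimeoutSeconds` are hypothetical stand-ins (the real method uses `this.consumerPollingTimeout`), and the per-poll KafkaMetrics update is left as a comment rather than a concrete call.

```java
import java.time.Duration;

import com.google.common.base.Stopwatch;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

final class PollLoopSketch {

  // Illustrative stand-in for ReadFromKafkaDoFn.poll(); pollTimeoutSeconds plays the
  // role of this.consumerPollingTimeout in the real class.
  static ConsumerRecords<byte[], byte[]> poll(
      Consumer<byte[], byte[]> consumer, TopicPartition topicPartition, long pollTimeoutSeconds) {
    final Stopwatch sw = Stopwatch.createStarted();
    long previousPosition = -1;
    // Declared once, outside the loop, as in the pre-PR code.
    Duration elapsed = Duration.ZERO;
    final Duration timeout = Duration.ofSeconds(pollTimeoutSeconds);
    while (true) {
      // elapsed feeds the remaining poll budget, so at this point it must not yet
      // include the time the poll below is about to spend.
      final ConsumerRecords<byte[], byte[]> rawRecords = consumer.poll(timeout.minus(elapsed));
      // (In the PR, a per-poll latency update on the passed-in KafkaMetrics would go here.)
      if (!rawRecords.isEmpty()) {
        return rawRecords;
      }
      if (previousPosition == (previousPosition = consumer.position(topicPartition))) {
        // No progress on the consumer position: nothing more to read right now.
        return rawRecords;
      }
      // Refresh elapsed only after the (possibly slow) poll has returned.
      elapsed = sw.elapsed();
      if (elapsed.compareTo(timeout) >= 0) {
        // Overall polling budget exhausted; return whatever we have (empty here).
        return rawRecords;
      }
    }
  }
}
```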
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -454,122 +454,125 @@ public ProcessContinuation processElement(
       final Stopwatch sw = Stopwatch.createStarted();
       while (true) {
-        rawRecords = poll(consumer, kafkaSourceDescriptor.getTopicPartition());
-        // When there are no records available for the current TopicPartition, self-checkpoint
-        // and move to process the next element.
-        if (rawRecords.isEmpty()) {
-          if (!topicPartitionExists(
-              kafkaSourceDescriptor.getTopicPartition(),
-              consumer.partitionsFor(kafkaSourceDescriptor.getTopic()))) {
-            return ProcessContinuation.stop();
-          }
-          if (timestampPolicy != null) {
-            updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
-          }
-          return ProcessContinuation.resume();
-        }
-        for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
-          // If the Kafka consumer returns a record with an offset that is already processed
-          // the record can be safely skipped. This is needed because there is a possibility
-          // that the seek() above fails to move the offset to the desired position. In which
-          // case poll() would return records that are already cnsumed.
-          if (rawRecord.offset() < startOffset) {
-            // If the start offset is not reached even after skipping the records for 10 seconds
-            // then the processing is stopped with a backoff to give the Kakfa server some time
-            // catch up.
-            if (sw.elapsed().getSeconds() > 10L) {
-              LOG.error(
-                  "The expected offset ({}) was not reached even after"
-                      + " skipping consumed records for 10 seconds. The offset we could"
-                      + " reach was {}. The processing of this bundle will be attempted"
-                      + " at a later time.",
-                  expectedOffset,
-                  rawRecord.offset());
-              return ProcessContinuation.resume()
-                  .withResumeDelay(org.joda.time.Duration.standardSeconds(10L));
-            }
-            skippedRecords++;
-            continue;
-          }
-          if (skippedRecords > 0L) {
-            LOG.warn(
-                "{} records were skipped due to seek returning an"
-                    + " earlier position than requested position of {}",
-                skippedRecords,
-                expectedOffset);
-            skippedRecords = 0L;
-          }
-          if (!tracker.tryClaim(rawRecord.offset())) {
-            return ProcessContinuation.stop();
-          }
-          try {
-            KafkaRecord<K, V> kafkaRecord =
-                new KafkaRecord<>(
-                    rawRecord.topic(),
-                    rawRecord.partition(),
-                    rawRecord.offset(),
-                    ConsumerSpEL.getRecordTimestamp(rawRecord),
-                    ConsumerSpEL.getRecordTimestampType(rawRecord),
-                    ConsumerSpEL.hasHeaders() ? rawRecord.headers() : null,
-                    ConsumerSpEL.deserializeKey(keyDeserializerInstance, rawRecord),
-                    ConsumerSpEL.deserializeValue(valueDeserializerInstance, rawRecord));
-            int recordSize =
-                (rawRecord.key() == null ? 0 : rawRecord.key().length)
-                    + (rawRecord.value() == null ? 0 : rawRecord.value().length);
-            avgRecordSizeCache
-                .getUnchecked(kafkaSourceDescriptor)
-                .update(recordSize, rawRecord.offset() - expectedOffset);
-            rawSizes.update(recordSize);
-            expectedOffset = rawRecord.offset() + 1;
-            Instant outputTimestamp;
-            // The outputTimestamp and watermark will be computed by timestampPolicy, where the
-            // WatermarkEstimator should be a manual one.
-            if (timestampPolicy != null) {
-              TimestampPolicyContext context =
-                  updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
-              outputTimestamp = timestampPolicy.getTimestampForRecord(context, kafkaRecord);
-            } else {
-              Preconditions.checkStateNotNull(this.extractOutputTimestampFn);
-              outputTimestamp = extractOutputTimestampFn.apply(kafkaRecord);
+      KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();

Review Comment:
   Can we instead make the metrics object's lifetime longer? Avoiding an allocation and flush per poll seems like it would be better for performance.

       KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();
       try {
         while (true) {
         }
       } finally {
       }
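
For concreteness, a sketch of the longer lifetime being suggested: the KafkaMetrics instance is created once per processElement call, passed into every poll, and flushed a single time in a finally block. This is a sketch, not the PR's code: it reuses surrounding ReadFromKafkaDoFn names (the record-processing body is elided) and assumes KafkaMetrics#flushBufferedMetrics() as the flush entry point.

```java
// Sketch: one KafkaMetrics per processElement invocation, shared by every poll,
// flushed exactly once when the element finishes (normally or exceptionally).
KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();
try {
  while (true) {
    // poll(...) records its per-call timings onto the shared kafkaMetrics instance.
    rawRecords = poll(consumer, kafkaSourceDescriptor.getTopicPartition(), kafkaMetrics);
    // ... existing empty-check / tryClaim / emit logic stays inside the loop ...
  }
} finally {
  // A single flush per element instead of one per poll iteration.
  kafkaMetrics.flushBufferedMetrics(); // assumed flush method on KafkaMetrics
}
```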