[ https://issues.apache.org/jira/browse/BEAM-990 ]
Danny McCormick deleted comment on BEAM-990:
--------------------------------------
was (Author: JIRAUSER282469):
This issue has been migrated to https://github.com/apache/beam/issues/17935
> KafkaIO does not commit offsets to Kafka
> ----------------------------------------
>
> Key: BEAM-990
> URL: https://issues.apache.org/jira/browse/BEAM-990
> Project: Beam
> Issue Type: Bug
> Components: io-java-kafka
> Reporter: Alban Perillat-Merceroz
> Priority: P3
> Labels: KafkaIO
>
> I use KafkaIO as a source, and I would like consumed offsets to be stored in
> Kafka (in the {{__consumer_offsets}} topic).
> I'm configuring the Kafka reader with
> {code:java}
> .updateConsumerProperties(ImmutableMap.of(
>     ConsumerConfig.GROUP_ID_CONFIG, "my-group",
>     ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, java.lang.Boolean.TRUE,
>     ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10" // doesn't work with default value either (5000ms)
> ))
> {code}
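The {{ConsumerConfig}} constants in the snippet above are plain string keys. The following is a minimal sketch with no Kafka dependency, and {{commitSettings}} is a hypothetical helper. It spells out the same three settings under the literal key names the constants resolve to:

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ConsumerSettingsSketch {
  // Hypothetical helper: the reporter's three settings, keyed by the literal
  // strings that kafka-clients' ConsumerConfig constants stand for.
  static Map<String, Object> commitSettings(String groupId, long autoCommitIntervalMs) {
    Map<String, Object> props = new LinkedHashMap<>();
    props.put("group.id", groupId);                // ConsumerConfig.GROUP_ID_CONFIG
    props.put("enable.auto.commit", Boolean.TRUE); // ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
    // ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG (default is 5000 ms)
    props.put("auto.commit.interval.ms", String.valueOf(autoCommitIntervalMs));
    return props;
  }

  public static void main(String[] args) {
    // Prints {group.id=my-group, enable.auto.commit=true, auto.commit.interval.ms=10}
    System.out.println(commitSettings("my-group", 10));
  }
}
```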
> But the offsets are not stored in Kafka (nothing appears in {{__consumer_offsets}},
> and the next job restarts at the latest offset).
> I can't find where in the code the offsets are supposed to be committed.
> I tried adding a manual commit in the {{consumerPollLoop()}} method, and with
> it the offsets are committed:
> {code:java}
> private void consumerPollLoop() {
>   // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue
>   while (!closed.get()) {
>     try {
>       ConsumerRecords<byte[], byte[]> records =
>           consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
>       if (!records.isEmpty() && !closed.get()) {
>         availableRecordsQueue.put(records); // blocks until dequeued.
>         // Manual commit: with no arguments, commitSync() commits the
>         // offsets returned by the last poll()
>         consumer.commitSync();
>       }
>     } catch (InterruptedException e) {
>       LOG.warn("{}: consumer thread is interrupted", this, e);
>       // not expected
>       break;
>     } catch (WakeupException e) {
>       break;
>     }
>   }
>   LOG.info("{}: Returning from consumer pool loop", this);
> }
> {code}
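The patched loop commits as soon as a batch is handed to {{availableRecordsQueue}}. Below is a minimal single-threaded sketch of that ordering with no Kafka dependency. The {{FakePartition}} class is a hypothetical stand-in for the consumer, and its {{commitSync()}} mimics the no-argument form by committing the current position. An {{ArrayBlockingQueue}} with spare capacity replaces the real queue so that {{put()}} never blocks:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PollLoopSketch {
  // Hypothetical single-partition stand-in for a Kafka consumer:
  // poll() returns the next batch of record offsets.
  static final class FakePartition {
    private final long endOffset;
    private final int batchSize;
    private long position = 0;   // next offset poll() will return
    private long committed = -1; // -1 means nothing committed yet

    FakePartition(long endOffset, int batchSize) {
      this.endOffset = endOffset;
      this.batchSize = batchSize;
    }

    List<Long> poll() {
      List<Long> batch = new ArrayList<>();
      while (position < endOffset && batch.size() < batchSize) {
        batch.add(position++);
      }
      return batch;
    }

    // Like commitSync() with no arguments: commit the current position,
    // i.e. the offset just past the last record returned by poll().
    void commitSync() {
      committed = position;
    }

    long lastCommitted() {
      return committed;
    }
  }

  // Enqueue each non-empty batch, then commit, as in the patched loop.
  // Unlike the real loop, this stops once the partition is exhausted.
  static long run(FakePartition partition, BlockingQueue<List<Long>> queue) {
    try {
      List<Long> batch;
      while (!(batch = partition.poll()).isEmpty()) {
        queue.put(batch);
        partition.commitSync();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the interrupt flag
    }
    return partition.lastCommitted();
  }

  public static void main(String[] args) {
    BlockingQueue<List<Long>> queue = new ArrayBlockingQueue<>(10);
    long committed = run(new FakePartition(7, 3), queue);
    System.out.println("committed=" + committed + ", queued batches=" + queue.size());
  }
}
```

Running it prints {{committed=7, queued batches=3}}. Offset 7 is committed even though nothing has taken a batch off the queue. In this sketch, a commit placed right after the hand-off gets ahead of whatever happens to the records downstream.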
> Is this a bug in KafkaIO or am I misconfiguring something?
> Disclaimer: I'm currently using KafkaIO in Dataflow, via the backport in the
> Dataflow SDK
> (https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java),
> but I'm confident the code is similar for this case.
> Edit: I found the method where KafkaIO is supposed to commit at the end of a
> batch. I'm currently testing a change there and should be able to open a pull
> request soon:
> {code:java}
> // KafkaCheckpointMark.java
> /**
>  * Optional consumer that will be used to commit offsets into Kafka when
>  * finalizeCheckpoint() is called.
>  */
> @Nullable
> private final Consumer<?, ?> consumer;
>
> public KafkaCheckpointMark(List<PartitionMark> partitions, @Nullable Consumer<?, ?> consumer) {
>   this.partitions = partitions;
>   this.consumer = consumer;
> }
>
> /**
>  * Commit synchronously into Kafka offsets that have been passed downstream.
>  */
> @Override
> public void finalizeCheckpoint() throws IOException {
>   if (consumer == null) {
>     LOG.warn("finalizeCheckpoint(): no consumer provided, will not commit anything.");
>     return;
>   }
>   if (partitions.isEmpty()) {
>     LOG.info("finalizeCheckpoint(): nothing to commit to Kafka.");
>     return;
>   }
>   final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
>   final StringJoiner committedOffsets = new StringJoiner(",");
>   for (PartitionMark partition : partitions) {
>     TopicPartition topicPartition = partition.getTopicPartition();
>     // Kafka treats a committed offset as the position of the next record to consume
>     offsets.put(topicPartition, new OffsetAndMetadata(partition.offset));
>     committedOffsets.add(
>         topicPartition.topic() + "-" + topicPartition.partition() + ":" + partition.offset);
>   }
>   final String printableOffsets = committedOffsets.toString();
>   try {
>     consumer.commitSync(offsets);
>     LOG.info("finalizeCheckpoint(): committed Kafka offsets {}", printableOffsets);
>   } catch (Exception e) {
>     LOG.error("finalizeCheckpoint(): {} when trying to commit Kafka offsets [{}]",
>         e.getClass().getSimpleName(), printableOffsets);
>   }
> }
> {code}
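The core of {{finalizeCheckpoint()}} above turns the checkpointed partition offsets into a commit map plus a log summary. The following is a minimal sketch of that step with no Kafka dependency. It assumes a hypothetical {{Mark}} class in place of {{PartitionMark}}, string keys of the form {{topic-partition}} in place of {{TopicPartition}}, and a {{java.util.function.Consumer}} callback in place of {{commitSync(offsets)}}. The topic name {{events}} is made up:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.function.Consumer;

class FinalizeSketch {
  // Hypothetical stand-in for KafkaIO's PartitionMark.
  static final class Mark {
    final String topic;
    final int partition;
    final long offset;

    Mark(String topic, int partition, long offset) {
      this.topic = topic;
      this.partition = partition;
      this.offset = offset;
    }
  }

  // Mirrors the loop in finalizeCheckpoint(): build the offsets map, hand it
  // to the commit callback once, and return the printable summary.
  static String finalizeCheckpoint(List<Mark> marks, Consumer<Map<String, Long>> commit) {
    if (commit == null || marks.isEmpty()) {
      return ""; // no consumer, or nothing to commit
    }
    Map<String, Long> offsets = new LinkedHashMap<>();
    StringJoiner printable = new StringJoiner(",");
    for (Mark m : marks) {
      String key = m.topic + "-" + m.partition; // stands in for TopicPartition
      offsets.put(key, m.offset);
      printable.add(key + ":" + m.offset);
    }
    commit.accept(offsets); // stands in for consumer.commitSync(offsets)
    return printable.toString();
  }

  public static void main(String[] args) {
    Map<String, Long> committed = new LinkedHashMap<>();
    String summary = finalizeCheckpoint(
        Arrays.asList(new Mark("events", 0, 42L), new Mark("events", 1, 17L)),
        committed::putAll);
    System.out.println(summary);   // events-0:42,events-1:17
    System.out.println(committed); // {events-0=42, events-1=17}
  }
}
```

Per Beam's {{UnboundedSource.CheckpointMark}} documentation, {{finalizeCheckpoint()}} is called once the checkpoint and the records read before it have been committed by the runner. That is why this hook is a more natural place for the commit than the poll loop.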
--
This message was sent by Atlassian Jira
(v8.20.7#820007)