damccorm opened a new issue, #17935:
URL: https://github.com/apache/beam/issues/17935

   I use KafkaIO as a source, and I would like the consumed offsets to be stored in Kafka (in the `__consumer_offsets` topic).
   
   I'm configuring the Kafka reader with:

   ```java
   .updateConsumerProperties(ImmutableMap.of(
           ConsumerConfig.GROUP_ID_CONFIG, "my-group",
           ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, java.lang.Boolean.TRUE,
           ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10" // doesn't work with the default value (5000 ms) either
   ))
   ```
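For reference, the `ConsumerConfig` constants above are plain Kafka property names. The sketch below builds the same three settings as an ordinary `java.util.Map` using the literal keys (`group.id`, `enable.auto.commit`, `auto.commit.interval.ms`). The class `ConsumerProps` and its method are hypothetical helpers for illustration; nothing here touches Beam or the Kafka client.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: the consumer properties from this report, spelled with raw Kafka keys. */
public class ConsumerProps {
    static Map<String, Object> autoCommitProps(String groupId, String intervalMs) {
        Map<String, Object> props = new HashMap<>();
        props.put("group.id", groupId);                   // ConsumerConfig.GROUP_ID_CONFIG
        props.put("enable.auto.commit", Boolean.TRUE);    // ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
        props.put("auto.commit.interval.ms", intervalMs); // ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG
        return Collections.unmodifiableMap(props);
    }

    public static void main(String[] args) {
        Map<String, Object> props = autoCommitProps("my-group", "10");
        System.out.println(props.get("group.id") + " " + props.get("enable.auto.commit")
                + " " + props.get("auto.commit.interval.ms"));
    }
}
```

Kafka's own default for `auto.commit.interval.ms` is 5000 ms, which is the default value mentioned above.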
   
   
   But the offsets are not stored in Kafka (nothing appears in `__consumer_offsets`, and the next job restarts at the latest offset).

   I can't find where in the code the offsets are supposed to be committed.
   
   I tried adding a manual commit to the `consumerPollLoop()` method, and it works; the offsets are committed:
   
   ```java
   private void consumerPollLoop() {
       // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue
       while (!closed.get()) {
           try {
               ConsumerRecords<byte[], byte[]> records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
               if (!records.isEmpty() && !closed.get()) {
                   availableRecordsQueue.put(records); // blocks until dequeued.
                   // Manual commit (added for this test): commits the offsets returned by the last poll()
                   consumer.commitSync();
               }
           } catch (InterruptedException e) {
               LOG.warn("{}: consumer thread is interrupted", this, e); // not expected
               break;
           } catch (WakeupException e) {
               break;
           }
       }
       LOG.info("{}: Returning from consumer pool loop", this);
   }
   ```
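The poll-loop commit works, but it records progress as soon as the reader has dequeued a batch, before the runner has finalized a checkpoint that includes those records. The hypothetical, single-threaded model below (one partition, offsets as plain `long`s; `PollLoopCommitModel` is not part of KafkaIO) makes that gap visible: right after a batch with offsets 0 to 2 is handed off, Kafka already stores offset 3 while no finalized checkpoint covers those records yet.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical single-threaded model of committing inside consumerPollLoop().
 * It tracks what commitSync() has stored in Kafka versus what a finalized
 * checkpoint actually covers.
 */
public class PollLoopCommitModel {
    long committedOffset = 0;    // next offset a new consumer in the group would read
    long checkpointedOffset = 0; // next offset covered by a finalized checkpoint
    long lastHandedOff = -1;     // last offset passed to the reader

    /** One loop iteration: hand the batch to the reader, then commit right away. */
    void pollLoopIteration(List<Long> batchOffsets) {
        // availableRecordsQueue.put(records) returns once the reader has dequeued the batch
        lastHandedOff = batchOffsets.get(batchOffsets.size() - 1);
        // consumer.commitSync() commits the position just past the last returned record
        committedOffset = lastHandedOff + 1;
    }

    /** The runner finalizes a checkpoint covering everything handed off so far. */
    void finalizeCheckpoint() {
        checkpointedOffset = lastHandedOff + 1;
    }

    public static void main(String[] args) {
        PollLoopCommitModel m = new PollLoopCommitModel();
        m.pollLoopIteration(Arrays.asList(0L, 1L, 2L));
        System.out.println("committed=" + m.committedOffset + " checkpointed=" + m.checkpointedOffset);
        m.finalizeCheckpoint();
        System.out.println("committed=" + m.committedOffset + " checkpointed=" + m.checkpointedOffset);
    }
}
```

Running `main` prints `committed=3 checkpointed=0`, then `committed=3 checkpointed=3`. If the job stopped inside that window, a new job resuming from the group's committed offset could skip records the old job never finished processing, which is one reason to commit from checkpoint finalization instead.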
   
   
   Is this a bug in KafkaIO or am I misconfiguring something?
   
   Disclaimer: I'm currently using KafkaIO in Dataflow, via the backport in the Dataflow SDK (https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java), but I'm confident the code is similar for this case.
   
   Edit: I found the method where KafkaIO should commit at the end of a batch, `KafkaCheckpointMark.finalizeCheckpoint()`. I'm currently testing the change below and should be able to open a pull request soon:
   
   ```java
   // KafkaCheckpointMark.java

   /**
    * Optional consumer that will be used to commit offsets into Kafka when finalizeCheckpoint() is called.
    */
   @Nullable
   private final Consumer<?, ?> consumer;

   public KafkaCheckpointMark(List<PartitionMark> partitions, @Nullable Consumer<?, ?> consumer) {
       this.partitions = partitions;
       this.consumer = consumer;
   }

   /**
    * Synchronously commits to Kafka the offsets that have been passed downstream.
    */
   @Override
   public void finalizeCheckpoint() throws IOException {
       if (consumer == null) {
           LOG.warn("finalizeCheckpoint(): no consumer provided, will not commit anything.");
           return;
       }
       if (partitions.isEmpty()) {
           LOG.info("finalizeCheckpoint(): nothing to commit to Kafka.");
           return;
       }

       // newHashMap() is Guava's Maps.newHashMap() (static import)
       final Map<TopicPartition, OffsetAndMetadata> offsets = newHashMap();
       String committedOffsets = "";
       for (PartitionMark partition : partitions) {
           TopicPartition topicPartition = partition.getTopicPartition();
           offsets.put(topicPartition, new OffsetAndMetadata(partition.offset));
           committedOffsets += topicPartition.topic() + "-" + topicPartition.partition() + ":" + partition.offset + ",";
       }

       // Drop the trailing comma (partitions is non-empty here)
       final String printableOffsets = committedOffsets.substring(0, committedOffsets.length() - 1);
       try {
           consumer.commitSync(offsets);
           LOG.info("finalizeCheckpoint(): committed Kafka offsets {}", printableOffsets);
       } catch (Exception e) {
           // Passing e last lets SLF4J log the stack trace as well
           LOG.error("finalizeCheckpoint(): {} when trying to commit Kafka offsets [{}]",
                   e.getClass().getSimpleName(),
                   printableOffsets,
                   e);
       }
   }
   ```
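Whether `partition.offset` should be committed unchanged depends on what it means in the backport. Kafka treats a committed offset as the next record to consume: the `KafkaConsumer.commitSync(Map)` Javadoc says to commit `lastProcessedMessageOffset + 1`. Assuming, hypothetically, that `PartitionMark.offset` holds the last consumed offset, the commit map needs `offset + 1`; if it already holds the next offset to read, it can be committed as-is. The sketch below models this with a hypothetical `Mark` class and `"topic-partition"` string keys in place of `TopicPartition`, and builds the log string with `StringJoiner` (Java 8+) rather than trimming a trailing comma.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

public class CommitOffsets {
    /** Hypothetical stand-in for PartitionMark, assumed to hold the LAST consumed offset. */
    static final class Mark {
        final String topic;
        final int partition;
        final long lastConsumedOffset;

        Mark(String topic, int partition, long lastConsumedOffset) {
            this.topic = topic;
            this.partition = partition;
            this.lastConsumedOffset = lastConsumedOffset;
        }
    }

    /** Maps "topic-partition" to the offset to commit, i.e. the next offset to read. */
    static Map<String, Long> toCommit(List<Mark> marks) {
        Map<String, Long> offsets = new LinkedHashMap<>(); // keeps input order for stable logs
        for (Mark m : marks) {
            offsets.put(m.topic + "-" + m.partition, m.lastConsumedOffset + 1);
        }
        return offsets;
    }

    /** Same "topic-partition:offset" format as the patch, comma separated. */
    static String printable(Map<String, Long> offsets) {
        StringJoiner joiner = new StringJoiner(",");
        for (Map.Entry<String, Long> e : offsets.entrySet()) {
            joiner.add(e.getKey() + ":" + e.getValue());
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = toCommit(Arrays.asList(
                new Mark("events", 0, 41L), new Mark("events", 1, 99L)));
        System.out.println(printable(offsets));
    }
}
```

With the hypothetical topic `events`, `main` prints `events-0:42,events-1:100`.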
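A second point worth checking: `KafkaConsumer` is not safe for multi-threaded access (its Javadoc states that unsynchronized access results in `ConcurrentModificationException`), and `finalizeCheckpoint()` is called by the runner, outside the thread that runs `consumerPollLoop()`. One possible approach, sketched here under stated assumptions, is for `finalizeCheckpoint()` to only publish the offsets and let the poll thread perform the commit between polls. `DeferredCommit` and `OffsetCommitter` are hypothetical names; `OffsetCommitter` stands in for `consumer.commitSync(Map)`, and offsets are keyed by plain strings rather than `TopicPartition`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical: hands finalized offsets to the poll thread instead of committing directly. */
public class DeferredCommit {
    /** Stand-in for consumer.commitSync(Map); only ever called from the poll thread. */
    interface OffsetCommitter {
        void commit(Map<String, Long> offsets);
    }

    // Latest offsets from a finalized checkpoint; a newer checkpoint replaces an older pending one.
    private final AtomicReference<Map<String, Long>> pendingCommit = new AtomicReference<>();

    /** Called from finalizeCheckpoint() on the runner's thread: publish only, no Kafka call. */
    void onCheckpointFinalized(Map<String, Long> offsets) {
        pendingCommit.set(Collections.unmodifiableMap(new HashMap<>(offsets)));
    }

    /** Called by the poll thread once per loop iteration. */
    void commitPendingIfAny(OffsetCommitter committer) {
        Map<String, Long> offsets = pendingCommit.getAndSet(null);
        if (offsets != null) {
            committer.commit(offsets);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DeferredCommit deferred = new DeferredCommit();
        Map<String, Long> committed = new HashMap<>();

        // Simulated runner thread finalizing a checkpoint.
        Thread runner = new Thread(() ->
                deferred.onCheckpointFinalized(Collections.singletonMap("events-0", 42L)));
        runner.start();
        runner.join();

        // The poll thread (here: main) performs the actual commit.
        deferred.commitPendingIfAny(committed::putAll);
        deferred.commitPendingIfAny(committed::putAll); // nothing pending: no-op
        System.out.println(committed);
    }
}
```

Keeping only the latest pending map is a deliberate simplification: assuming checkpoints are finalized in order, a later checkpoint's offsets supersede an earlier one's, so skipping the older commit loses nothing.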
   
   
   Imported from Jira [BEAM-990](https://issues.apache.org/jira/browse/BEAM-990). Original Jira may contain additional context.
   Reported by: [email protected].

