johnjcasey commented on code in PR #17565:
URL: https://github.com/apache/beam/pull/17565#discussion_r868152376


##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffsetTest.java:
##########
@@ -67,18 +73,50 @@ public Consumer<byte[], byte[]> apply(Map<String, Object> 
input) {
     Assert.assertEquals(2L, consumer.commit.get(partition).offset());
   }
 
+  @Test
+  public void testCommitOffsetError() {
+    Map<String, Object> configMap = new HashMap<>();
+    configMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
+
+    ReadSourceDescriptors<Object, Object> descriptors =
+        ReadSourceDescriptors.read()
+            .withBootstrapServers("bootstrap_server")
+            .withConsumerConfigUpdates(configMap)
+            .withConsumerFactoryFn(
+                new SerializableFunction<Map<String, Object>, Consumer<byte[], 
byte[]>>() {
+                  @Override
+                  public Consumer<byte[], byte[]> apply(Map<String, Object> 
input) {
+                    Assert.assertEquals("group1", 
input.get(ConsumerConfig.GROUP_ID_CONFIG));
+                    return errorConsumer;
+                  }
+                });
+    CommitOffsetDoFn doFn = new CommitOffsetDoFn(descriptors);
+
+    doFn.processElement(
+        KV.of(KafkaSourceDescriptor.of(partition, null, null, null, null, 
null), 1L));

Review Comment:
   Kafka offset is just an integer, so if an offset fails to commit, the next 
offset will cause it to "catch up". We are committing only every 5 minutes at 
the moment anyways, so any kafka pipeline would have to be resilient to double 
processing as is.
   
   This test isn't hugely meaningful, except to ensure that we are logging 
properly when we have the commit exception



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to