[ 
https://issues.apache.org/jira/browse/BEAM-14413?focusedWorklogId=767970&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-767970
 ]

ASF GitHub Bot logged work on BEAM-14413:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/May/22 15:37
            Start Date: 09/May/22 15:37
    Worklog Time Spent: 10m 
      Work Description: johnjcasey commented on code in PR #17565:
URL: https://github.com/apache/beam/pull/17565#discussion_r868152376


##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffsetTest.java:
##########
@@ -67,18 +73,50 @@ public Consumer<byte[], byte[]> apply(Map<String, Object> input) {
     Assert.assertEquals(2L, consumer.commit.get(partition).offset());
   }
 
+  @Test
+  public void testCommitOffsetError() {
+    Map<String, Object> configMap = new HashMap<>();
+    configMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
+
+    ReadSourceDescriptors<Object, Object> descriptors =
+        ReadSourceDescriptors.read()
+            .withBootstrapServers("bootstrap_server")
+            .withConsumerConfigUpdates(configMap)
+            .withConsumerFactoryFn(
+                new SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>() {
+                  @Override
+                  public Consumer<byte[], byte[]> apply(Map<String, Object> input) {
+                    Assert.assertEquals("group1", input.get(ConsumerConfig.GROUP_ID_CONFIG));
+                    return errorConsumer;
+                  }
+                });
+    CommitOffsetDoFn doFn = new CommitOffsetDoFn(descriptors);
+
+    doFn.processElement(
+        KV.of(KafkaSourceDescriptor.of(partition, null, null, null, null, null), 1L));

Review Comment:
   A Kafka offset is just an integer, so if an offset fails to commit, the next
offset that commits successfully will cause it to "catch up". We currently
commit only every 5 minutes anyway, so any Kafka pipeline already has to be
resilient to double processing.
   
   This test isn't hugely meaningful, except to ensure that we log properly
when a commit throws an exception.
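
   To illustrate the "catch up" reasoning, here is a minimal, self-contained
sketch. It does not use the Kafka client or Beam's `CommitOffsetDoFn`; the
class `OffsetCatchUpSketch`, the `FakeCommitter` type, and the helper
`commitBestEffort` are all hypothetical names made up for this example. It
only assumes that a failed commit is logged and skipped, and that a later
successful commit overwrites the stored offset.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only; none of these types come from Kafka or Beam. */
public class OffsetCatchUpSketch {

  /** Hypothetical stand-in for a committed-offset store of one partition. */
  static class FakeCommitter {
    long committed = -1L; // -1 means nothing has been committed yet
    boolean failNext = false;

    void commit(long offset) {
      if (failNext) {
        failNext = false;
        throw new RuntimeException("simulated commit failure");
      }
      // A commit simply overwrites the stored offset.
      committed = offset;
    }
  }

  /** Tries to commit; on failure records a log line instead of rethrowing. */
  static void commitBestEffort(FakeCommitter committer, long offset, List<String> log) {
    try {
      committer.commit(offset);
    } catch (RuntimeException e) {
      log.add("Failed to commit offset " + offset + ": " + e.getMessage());
    }
  }

  /** Commits 1, fails on 2, then commits 3; returns the stored offset. */
  static long run(List<String> log) {
    FakeCommitter committer = new FakeCommitter();
    commitBestEffort(committer, 1L, log);
    committer.failNext = true;
    commitBestEffort(committer, 2L, log); // fails; stored offset stays at 1
    commitBestEffort(committer, 3L, log); // succeeds; stored offset jumps to 3
    return committer.committed;
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();
    System.out.println("committed = " + run(log));
    System.out.println("log = " + log);
  }
}
```

   In this sketch, a reader restarting between the failed and the later commit
would resume from offset 1 and reprocess some records, which is the double
processing the pipeline has to tolerate anyway.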





Issue Time Tracking
-------------------

    Worklog Id:     (was: 767970)
    Time Spent: 1h 40m  (was: 1.5h)

> Add Kafka IO exception test cases
> ---------------------------------
>
>                 Key: BEAM-14413
>                 URL: https://issues.apache.org/jira/browse/BEAM-14413
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kafka
>            Reporter: John Casey
>            Assignee: John Casey
>            Priority: P2
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)
