[
https://issues.apache.org/jira/browse/BEAM-14413?focusedWorklogId=767427&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-767427
]
ASF GitHub Bot logged work on BEAM-14413:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 06/May/22 21:46
Start Date: 06/May/22 21:46
Worklog Time Spent: 10m
Work Description: pabloem commented on code in PR #17565:
URL: https://github.com/apache/beam/pull/17565#discussion_r867234398
##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffsetTest.java:
##########
@@ -67,18 +73,50 @@ public Consumer<byte[], byte[]> apply(Map<String, Object>
input) {
Assert.assertEquals(2L, consumer.commit.get(partition).offset());
}
+ @Test
+ public void testCommitOffsetError() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
+
+ ReadSourceDescriptors<Object, Object> descriptors =
+ ReadSourceDescriptors.read()
+ .withBootstrapServers("bootstrap_server")
+ .withConsumerConfigUpdates(configMap)
+ .withConsumerFactoryFn(
+ new SerializableFunction<Map<String, Object>, Consumer<byte[],
byte[]>>() {
+ @Override
+ public Consumer<byte[], byte[]> apply(Map<String, Object>
input) {
+ Assert.assertEquals("group1",
input.get(ConsumerConfig.GROUP_ID_CONFIG));
+ return errorConsumer;
+ }
+ });
+ CommitOffsetDoFn doFn = new CommitOffsetDoFn(descriptors);
+
+ doFn.processElement(
+ KV.of(KafkaSourceDescriptor.of(partition, null, null, null, null,
null), 1L));
Review Comment:
what does this test do? first throw a failure and then succeed? should the
doFn throw the exception? How do we know that the exception is not being
swallowed and the offset commit is eventually making it in?
Issue Time Tracking
-------------------
Worklog Id: (was: 767427)
Time Spent: 1h 20m (was: 1h 10m)
> Add Kafka IO exception test cases
> ---------------------------------
>
> Key: BEAM-14413
> URL: https://issues.apache.org/jira/browse/BEAM-14413
> Project: Beam
> Issue Type: Improvement
> Components: io-java-kafka
> Reporter: John Casey
> Assignee: John Casey
> Priority: P2
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.7#820007)