chamikaramj commented on code in PR #22450:
URL: https://github.com/apache/beam/pull/22450#discussion_r931168600


##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java:
##########
@@ -239,6 +252,94 @@ public void testKafkaIOReadsAndWritesCorrectlyInBatch() throws IOException {
     }
   }
 
+  // Because of existing limitations in streaming testing, this is verified via a combination of
+  // DoFns.  CrashOnExtra will throw an exception if we see any extra records beyond those we
+  // expect, and LogFn acts as a sink we can inspect using ExpectedLogs to verify that we got all
+  // those we expect.
+  @Test
+  public void testKafkaIOSDFResumesCorrectly() throws IOException {
+    roundtripElements("first-pass", 4, writePipeline, sdfReadPipeline);
+    roundtripElements("second-pass", 3, writePipeline2, sdfReadPipeline2);
+  }
+
+  private void roundtripElements(
+      String recordPrefix, Integer recordCount, TestPipeline wPipeline, TestPipeline rPipeline)
+      throws IOException {
+    AdminClient client =
+        AdminClient.create(
+            ImmutableMap.of("bootstrap.servers", options.getKafkaBootstrapServerAddresses()));
+    client.listTopics();
+    Map<Integer, String> records = new HashMap<>();
+    for (int i = 0; i < recordCount; i++) {
+      records.put(i, recordPrefix + "-" + i);
+    }
+
+    wPipeline
+        .apply("Generate Write Elements", Create.of(records))
+        .apply(
+            "Write to Kafka",
+            KafkaIO.<Integer, String>write()
+                .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                .withTopic(options.getKafkaTopic() + "-resuming")
+                .withKeySerializer(IntegerSerializer.class)
+                .withValueSerializer(StringSerializer.class));
+
+    wPipeline.run().waitUntilFinish(Duration.standardSeconds(10));
+
+    rPipeline
+        .apply(
+            "Read from Kafka",
+            KafkaIO.<Integer, String>read()
+                .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                .withConsumerConfigUpdates(
+                    ImmutableMap.of(
+                        "group.id",
+                        "resuming-group",
+                        "auto.offset.reset",
+                        "earliest",
+                        "enable.auto.commit",
+                        "true"))
+                .withTopic(options.getKafkaTopic() + "-resuming")
+                .withKeyDeserializer(IntegerDeserializer.class)
+                .withValueDeserializer(StringDeserializer.class)
+                .withoutMetadata())
+        .apply("Get Values", Values.create())
+        .apply(ParDo.of(new CrashOnExtra(records.values())))
+        .apply(ParDo.of(new LogFn()));
+
+    rPipeline.run().waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+
+    for (String value : records.values()) {
+      kafkaIOITExpectedLogs.verifyError(value);
+    }
+  }
+
+  public static class CrashOnExtra extends DoFn<String, String> {
+    final Set<String> expected;
+
+    public CrashOnExtra(Collection<String> records) {
+      expected = new HashSet<>(records);
+    }
+
+    @ProcessElement
+    public void processElement(@Element String element, OutputReceiver<String> outputReceiver) {
+      if (!expected.contains(element)) {
+        throw new RuntimeException("Received unexpected element: " + element);
+      } else {
+        expected.remove(element);

Review Comment:
   Ah ok. Makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to