[ https://issues.apache.org/jira/browse/BEAM-5798?focusedWorklogId=181310&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-181310 ]

ASF GitHub Bot logged work on BEAM-5798:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Jan/19 21:53
            Start Date: 04/Jan/19 21:53
    Worklog Time Spent: 10m 
      Work Description: rangadi commented on pull request #7371: [BEAM-5798] Add support of multiple Kafka output topics
URL: https://github.com/apache/beam/pull/7371#discussion_r245431237
 
 

 ##########
 File path: sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
 ##########
 @@ -1100,18 +1100,125 @@ public void testRecordsSink() throws Exception {
     }
   }
 
+  @Test
+  public void testSinkToMultipleTopics() throws Exception {
+    // Set different output topic names
+    int numElements = 1000;
+
+    try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {
+
+      ProducerSendCompletionThread completionThread =
 +          new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
+
+      String defaultTopic = "test";
+
 +      p.apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
 +          .apply(ParDo.of(new KV2ProducerRecord(defaultTopic, false)))
 +          .setCoder(ProducerRecordCoder.of(VarIntCoder.of(), VarLongCoder.of()))
+          .apply(
+              KafkaIO.<Integer, Long>writeRecords()
+                  .withBootstrapServers("none")
+                  .withKeySerializer(IntegerSerializer.class)
+                  .withValueSerializer(LongSerializer.class)
+                  .withInputTimestamp()
 +                  .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));
+
+      p.run();
+
+      completionThread.shutdown();
+
+      // Verify that appropriate messages are written to different Kafka topics
 +      List<ProducerRecord<Integer, Long>> sent = producerWrapper.mockProducer.history();
+
+      for (int i = 0; i < numElements; i++) {
+        ProducerRecord<Integer, Long> record = sent.get(i);
+        if (i % 2 == 0) {
+          assertEquals("test_2", record.topic());
+        } else {
+          assertEquals("test_1", record.topic());
+        }
+        assertEquals(i, record.key().intValue());
+        assertEquals(i, record.value().longValue());
+        assertEquals(i, record.timestamp().intValue());
+      }
+    }
+  }
+
+  @Test
+  public void testSinkProducerRecordsWithCustomTS() throws Exception {
+    int numElements = 1000;
+
+    try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {
+
+      ProducerSendCompletionThread completionThread =
 +          new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
+
+      final String defaultTopic = "test";
+      final Long ts = System.currentTimeMillis();
+
 +      p.apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
 +          .apply(ParDo.of(new KV2ProducerRecord(defaultTopic, ts)))
 +          .setCoder(ProducerRecordCoder.of(VarIntCoder.of(), VarLongCoder.of()))
+          .apply(
+              KafkaIO.<Integer, Long>writeRecords()
+                  .withBootstrapServers("none")
+                  .withKeySerializer(IntegerSerializer.class)
+                  .withValueSerializer(LongSerializer.class)
 +                  .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));
+
+      p.run();
+
+      completionThread.shutdown();
+
+      // Verify that appropriate messages are written to different Kafka topics
 
 Review comment:
   Update comment to say it checks for timestamp.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 181310)
    Time Spent: 4h 20m  (was: 4h 10m)

> Add support for dynamic destinations when writing to Kafka
> ----------------------------------------------------------
>
>                 Key: BEAM-5798
>                 URL: https://issues.apache.org/jira/browse/BEAM-5798
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-kafka
>            Reporter: Luke Cwik
>            Assignee: Alexey Romanenko
>            Priority: Major
>              Labels: newbie, starter
>          Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> Add support for writing to Kafka based upon contents of the data. This is 
> similar to the dynamic destination approach for file IO and other sinks.
>  
> Source of request: 
> https://lists.apache.org/thread.html/a89d1d32ecdb50c42271e805cc01a651ee3623b4df97c39baf4f2053@%3Cuser.beam.apache.org%3E
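
A minimal, self-contained sketch of the per-element topic routing that the PR's testSinkToMultipleTopics verifies (even-valued elements land on topic "test_2", odd-valued ones on "test_1"). The class and method names here are illustrative only, not part of the PR; in the actual change this choice is made inside a DoFn that emits ProducerRecords consumed by KafkaIO.writeRecords():

```java
// Illustrative sketch (hypothetical names, not from the PR): the even/odd
// topic routing asserted in testSinkToMultipleTopics.
public class TopicRouter {
    // Mirrors the routing the KV2ProducerRecord DoFn is expected to apply:
    // even values -> "test_2", odd values -> "test_1".
    static String topicFor(long value) {
        return value % 2 == 0 ? "test_2" : "test_1";
    }

    public static void main(String[] args) {
        for (long v = 0; v < 4; v++) {
            System.out.println(v + " -> " + topicFor(v));
        }
    }
}
```

In the Beam pipeline itself, each ProducerRecord carries its own topic, so the sink writes to whichever topic the upstream DoFn selected per element.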



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
