[ https://issues.apache.org/jira/browse/BEAM-5798?focusedWorklogId=181782&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-181782 ]
ASF GitHub Bot logged work on BEAM-5798:
----------------------------------------
Author: ASF GitHub Bot
Created on: 07/Jan/19 14:10
Start Date: 07/Jan/19 14:10
Worklog Time Spent: 10m
Work Description: aromanenko-dev commented on pull request #7371:
[BEAM-5798] Add support of multiple Kafka output topics
URL: https://github.com/apache/beam/pull/7371#discussion_r245664562
##########
File path: sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
##########
@@ -1100,18 +1100,125 @@ public void testRecordsSink() throws Exception {
}
}
+ @Test
+ public void testSinkToMultipleTopics() throws Exception {
+ // Set different output topic names
+ int numElements = 1000;
+
+ try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {
+
+ ProducerSendCompletionThread completionThread =
+ new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
+
+ String defaultTopic = "test";
+
+ p.apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
+ .apply(ParDo.of(new KV2ProducerRecord(defaultTopic, false)))
+ .setCoder(ProducerRecordCoder.of(VarIntCoder.of(), VarLongCoder.of()))
+ .apply(
+ KafkaIO.<Integer, Long>writeRecords()
+ .withBootstrapServers("none")
+ .withKeySerializer(IntegerSerializer.class)
+ .withValueSerializer(LongSerializer.class)
+ .withInputTimestamp()
+ .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));
+
+ p.run();
+
+ completionThread.shutdown();
+
+ // Verify that appropriate messages are written to different Kafka topics
+ List<ProducerRecord<Integer, Long>> sent = producerWrapper.mockProducer.history();
+
+ for (int i = 0; i < numElements; i++) {
+ ProducerRecord<Integer, Long> record = sent.get(i);
+ if (i % 2 == 0) {
+ assertEquals("test_2", record.topic());
+ } else {
+ assertEquals("test_1", record.topic());
+ }
+ assertEquals(i, record.key().intValue());
+ assertEquals(i, record.value().longValue());
+ assertEquals(i, record.timestamp().intValue());
+ }
+ }
+ }
+
+ @Test
+ public void testSinkProducerRecordsWithCustomTS() throws Exception {
+ int numElements = 1000;
+
+ try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {
+
+ ProducerSendCompletionThread completionThread =
+ new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
+
+ final String defaultTopic = "test";
+ final Long ts = System.currentTimeMillis();
+
+ p.apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
+ .apply(ParDo.of(new KV2ProducerRecord(defaultTopic, ts)))
+ .setCoder(ProducerRecordCoder.of(VarIntCoder.of(), VarLongCoder.of()))
+ .apply(
+ KafkaIO.<Integer, Long>writeRecords()
+ .withBootstrapServers("none")
+ .withKeySerializer(IntegerSerializer.class)
+ .withValueSerializer(LongSerializer.class)
+ .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));
+
+ p.run();
+
+ completionThread.shutdown();
+
+ // Verify that appropriate messages are written to different Kafka topics
Review comment:
Fixed
Issue Time Tracking
-------------------
Worklog Id: (was: 181782)
Time Spent: 4h 50m (was: 4h 40m)
> Add support for dynamic destinations when writing to Kafka
> ----------------------------------------------------------
>
> Key: BEAM-5798
> URL: https://issues.apache.org/jira/browse/BEAM-5798
> Project: Beam
> Issue Type: New Feature
> Components: io-java-kafka
> Reporter: Luke Cwik
> Assignee: Alexey Romanenko
> Priority: Major
> Labels: newbie, starter
> Time Spent: 4h 50m
> Remaining Estimate: 0h
>
> Add support for writing to Kafka based upon contents of the data. This is
> similar to the dynamic destination approach for file IO and other sinks.
>
> Source of request:
> https://lists.apache.org/thread.html/a89d1d32ecdb50c42271e805cc01a651ee3623b4df97c39baf4f2053@%3Cuser.beam.apache.org%3E
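
As a rough illustration of "writing based upon contents of the data", the sketch below picks a topic name from a record's key. The routing rule (even keys to "<default>_2", odd keys to "<default>_1") is only inferred from the assertions in testSinkToMultipleTopics above; the class and method names are hypothetical and are not part of KafkaIO or of the PR.

```java
// Minimal, stdlib-only sketch of content-based topic selection.
// TopicRouting and topicFor are hypothetical names used for illustration.
class TopicRouting {

    // Assumed rule, inferred from the test assertions quoted above:
    // even keys go to "<defaultTopic>_2", odd keys to "<defaultTopic>_1".
    static String topicFor(String defaultTopic, int key) {
        return defaultTopic + "_" + (key % 2 == 0 ? 2 : 1);
    }

    public static void main(String[] args) {
        // Show which topic each of the first few keys would be routed to.
        for (int key = 0; key < 4; key++) {
            System.out.println(key + " -> " + topicFor("test", key));
        }
    }
}
```

In a pipeline, a function like this would presumably be called where each ProducerRecord is built, so that the sink writes every record to the topic carried by the record itself rather than to a single configured topic.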