Repository: beam Updated Branches: refs/heads/master 0af972095 -> ae45bbd63
Ensure Kafka sink serializers are set. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b413a966 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b413a966 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b413a966 Branch: refs/heads/master Commit: b413a9665f99599bfd929f850fa67d227ea190d5 Parents: 0af9720 Author: Raghu Angadi <[email protected]> Authored: Fri Oct 20 15:29:20 2017 -0700 Committer: [email protected] <[email protected]> Committed: Wed Nov 8 08:52:42 2017 -0800 ---------------------------------------------------------------------- .../src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b413a966/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index f6158ca..33fc289 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -928,10 +928,8 @@ public class KafkaIO { // Backlog support : // Kafka consumer does not have an API to fetch latest offset for topic. We need to seekToEnd() // then look at position(). Use another consumer to do this so that the primary consumer does - // not need to be interrupted. The latest offsets are fetched periodically on another thread. - // This is still a hack. There could be unintended side effects, e.g. if user enabled offset - // auto commit in consumer config, this could interfere with the primary consumer (we will - // handle this particular problem). We might have to make this optional. + // not need to be interrupted. The latest offsets are fetched periodically on a thread. This is + // still a bit of a hack, but so far there haven't been any issues reported by the users. private Consumer<byte[], byte[]> offsetConsumer; private final ScheduledExecutorService offsetFetcherThread = Executors.newSingleThreadScheduledExecutor(); @@ -1614,6 +1612,8 @@ public class KafkaIO { getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) != null, "withBootstrapServers() is required"); checkArgument(getTopic() != null, "withTopic() is required"); + checkArgument(getKeySerializer() != null, "withKeySerializer() is required"); + checkArgument(getValueSerializer() != null, "withValueSerializer() is required"); if (isEOS()) { EOSWrite.ensureEOSSupport();
