Repository: beam
Updated Branches:
  refs/heads/master 0af972095 -> ae45bbd63


Ensure Kafka sink serializers are set.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b413a966
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b413a966
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b413a966

Branch: refs/heads/master
Commit: b413a9665f99599bfd929f850fa67d227ea190d5
Parents: 0af9720
Author: Raghu Angadi <[email protected]>
Authored: Fri Oct 20 15:29:20 2017 -0700
Committer: [email protected] <[email protected]>
Committed: Wed Nov 8 08:52:42 2017 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java  | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b413a966/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index f6158ca..33fc289 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -928,10 +928,8 @@ public class KafkaIO {
     // Backlog support :
    // Kafka consumer does not have an API to fetch latest offset for topic. We need to seekToEnd()
    // then look at position(). Use another consumer to do this so that the primary consumer does
-    // not need to be interrupted. The latest offsets are fetched periodically on another thread.
-    // This is still a hack. There could be unintended side effects, e.g. if user enabled offset
-    // auto commit in consumer config, this could interfere with the primary consumer (we will
-    // handle this particular problem). We might have to make this optional.
+    // not need to be interrupted. The latest offsets are fetched periodically on a thread. This is
+    // still a bit of a hack, but so far there haven't been any issues reported by the users.
     private Consumer<byte[], byte[]> offsetConsumer;
     private final ScheduledExecutorService offsetFetcherThread =
         Executors.newSingleThreadScheduledExecutor();
@@ -1614,6 +1612,8 @@ public class KafkaIO {
        getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) != null,
         "withBootstrapServers() is required");
       checkArgument(getTopic() != null, "withTopic() is required");
+      checkArgument(getKeySerializer() != null, "withKeySerializer() is required");
+      checkArgument(getValueSerializer() != null, "withValueSerializer() is required");
 
       if (isEOS()) {
         EOSWrite.ensureEOSSupport();

Reply via email to