Repository: beam Updated Branches: refs/heads/master 420a71860 -> 0e6b3794c
Converts KafkaIO to AutoValue Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e0c704f1 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e0c704f1 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e0c704f1 Branch: refs/heads/master Commit: e0c704f1c3ccfe760dc9bbb156fe4a56843f2b3a Parents: 420a718 Author: Eugene Kirpichov <[email protected]> Authored: Thu Sep 29 15:30:10 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Wed Feb 1 14:26:46 2017 -0800 ---------------------------------------------------------------------- .../streaming/KafkaStreamingTest.java | 4 +- .../ResumeFromCheckpointStreamingTest.java | 2 +- sdks/java/io/kafka/pom.xml | 6 + .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 619 ++++++++----------- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 17 +- 5 files changed, 287 insertions(+), 361 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e0c704f1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java index 0853e9f..404cb5d 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java @@ -116,7 +116,7 @@ public class KafkaStreamingTest { "auto.offset.reset", "earliest" ); - KafkaIO.Read<String, String> read = KafkaIO.read() + KafkaIO.Read<String, String> read = KafkaIO.<String, String>read() .withBootstrapServers(EMBEDDED_KAFKA_CLUSTER.getBrokerList()) 
.withTopics(Arrays.asList(topic1, topic2)) .withKeyCoder(StringUtf8Coder.of()) @@ -168,7 +168,7 @@ public class KafkaStreamingTest { "auto.offset.reset", "latest" ); - KafkaIO.Read<String, String> read = KafkaIO.read() + KafkaIO.Read<String, String> read = KafkaIO.<String, String>read() .withBootstrapServers(EMBEDDED_KAFKA_CLUSTER.getBrokerList()) .withTopics(Collections.singletonList(topic)) .withKeyCoder(StringUtf8Coder.of()) http://git-wip-us.apache.org/repos/asf/beam/blob/e0c704f1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java index 8280672..721d617 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java @@ -160,7 +160,7 @@ public class ResumeFromCheckpointStreamingTest { "auto.offset.reset", "earliest" ); - KafkaIO.Read<String, String> read = KafkaIO.read() + KafkaIO.Read<String, String> read = KafkaIO.<String, String>read() .withBootstrapServers(EMBEDDED_KAFKA_CLUSTER.getBrokerList()) .withTopics(Collections.singletonList(TOPIC)) .withKeyCoder(StringUtf8Coder.of()) http://git-wip-us.apache.org/repos/asf/beam/blob/e0c704f1/sdks/java/io/kafka/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml index 2dd775e..02150b2 100644 --- a/sdks/java/io/kafka/pom.xml +++ b/sdks/java/io/kafka/pom.xml @@ -101,6 +101,12 @@ <artifactId>jsr305</artifactId> </dependency> + <dependency> + 
<groupId>com.google.auto.value</groupId> + <artifactId>auto-value</artifactId> + <scope>provided</scope> + </dependency> + <!-- test dependencies--> <dependency> <groupId>org.apache.beam</groupId> http://git-wip-us.apache.org/repos/asf/beam/blob/e0c704f1/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 36ab1fd..80a0eb7 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -21,10 +21,10 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Joiner; -import com.google.common.base.Optional; import com.google.common.collect.ComparisonChain; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -32,6 +32,8 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.io.Closeables; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -49,13 +51,12 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; - +import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.ByteArrayCoder; import 
org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.Read.Unbounded; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; @@ -116,17 +117,16 @@ import org.slf4j.LoggerFactory; * <pre>{@code * * pipeline - * .apply(KafkaIO.read() + * .apply(KafkaIO.<Long, String>read() * .withBootstrapServers("broker_1:9092,broker_2:9092") * .withTopics(ImmutableList.of("topic_a", "topic_b")) - * // above two are required configuration. returns PCollection<KafkaRecord<byte[], byte[]> + * // set a Coder for Key and Value + * .withKeyCoder(BigEndianLongCoder.of()) + * .withValueCoder(StringUtf8Coder.of()) + * // above four are required configuration. returns PCollection<KafkaRecord<Long, String>> * * // rest of the settings are optional : * - * // set a Coder for Key and Value (note the change to return type) - * .withKeyCoder(BigEndianLongCoder.of()) // PCollection<KafkaRecord<Long, byte[]> - * .withValueCoder(StringUtf8Coder.of()) // PCollection<KafkaRecord<Long, String> - * * // you can further customize KafkaConsumer used to read the records by adding more * // settings for ConsumerConfig. e.g : * .updateConsumerProperties(ImmutableMap.of("receive.buffer.bytes", 1024 * 1024)) @@ -166,14 +166,14 @@ import org.slf4j.LoggerFactory; * <pre>{@code * * PCollection<KV<Long, String>> kvColl = ...; - * kvColl.apply(KafkaIO.write() + * kvColl.apply(KafkaIO.<Long, String>write() * .withBootstrapServers("broker_1:9092,broker_2:9092") * .withTopic("results") * * // set Coder for Key and Value * .withKeyCoder(BigEndianLongCoder.of()) * .withValueCoder(StringUtf8Coder.of()) - + * * // you can further customize KafkaProducer used to write the records by adding more * // settings for ProducerConfig. 
e.g, to enable compression : * .updateProducerProperties(ImmutableMap.of("compression.type", "gzip")) @@ -185,11 +185,11 @@ import org.slf4j.LoggerFactory; * * <pre>{@code * PCollection<String> strings = ...; - * strings.apply(KafkaIO.write() + * strings.apply(KafkaIO.<Void, String>write() * .withBootstrapServers("broker_1:9092,broker_2:9092") * .withTopic("results") * .withValueCoder(StringUtf8Coder.of()) // just need coder for value - * .values() // writes values to Kafka with default key + * .values() * ); * }</pre> * @@ -200,6 +200,23 @@ import org.slf4j.LoggerFactory; * <tt>"group.id"</tt>, <tt>"enable.auto.commit"</tt>, etc. */ public class KafkaIO { + /** + * Creates a {@link Read} {@link PTransform} for raw {@code byte[]} keys and values (both + * coders default to {@link ByteArrayCoder}). Before use, basic Kafka configuration should be + * set with {@link Read#withBootstrapServers(String)} and {@link Read#withTopics(List)}. + * Other optional settings include custom timestamp and watermark functions. + */ + public static Read<byte[], byte[]> readBytes() { + return new AutoValue_KafkaIO_Read.Builder<byte[], byte[]>() + .setTopics(new ArrayList<String>()) + .setTopicPartitions(new ArrayList<TopicPartition>()) + .setKeyCoder(ByteArrayCoder.of()) + .setValueCoder(ByteArrayCoder.of()) + .setConsumerFactoryFn(Read.KAFKA_9_CONSUMER_FACTORY_FN) + .setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES) + .setMaxNumRecords(Long.MAX_VALUE) + .build(); + } /** * Creates an uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka @@ -207,16 +224,14 @@ public class KafkaIO { * {@link Read#withTopics(List)}. Other optional settings include key and value coders, * custom timestamp and watermark functions.
*/ - public static Read<byte[], byte[]> read() { - return new Read<byte[], byte[]>( - new ArrayList<String>(), - new ArrayList<TopicPartition>(), - ByteArrayCoder.of(), - ByteArrayCoder.of(), - Read.KAFKA_9_CONSUMER_FACTORY_FN, - Read.DEFAULT_CONSUMER_PROPERTIES, - Long.MAX_VALUE, - null); + public static <K, V> Read<K, V> read() { + return new AutoValue_KafkaIO_Read.Builder<K, V>() + .setTopics(new ArrayList<String>()) + .setTopicPartitions(new ArrayList<TopicPartition>()) + .setConsumerFactoryFn(Read.KAFKA_9_CONSUMER_FACTORY_FN) + .setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES) + .setMaxNumRecords(Long.MAX_VALUE) + .build(); } /** @@ -224,21 +239,53 @@ public class KafkaIO { * should be set with {@link Write#withBootstrapServers(String)} and {@link Write#withTopic} * along with {@link Coder}s for (optional) key and values. */ - public static Write<byte[], byte[]> write() { - return new Write<byte[], byte[]>( - null, - ByteArrayCoder.of(), - ByteArrayCoder.of(), - TypedWrite.DEFAULT_PRODUCER_PROPERTIES); + public static <K, V> Write<K, V> write() { + return new AutoValue_KafkaIO_Write.Builder<K, V>() + .setProducerConfig(Write.DEFAULT_PRODUCER_PROPERTIES) + .setValueOnly(false) + .build(); } ///////////////////////// Read Support \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\ /** - * A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for more - * information on usage and configuration. + * A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for more information on + * usage and configuration. 
*/ - public static class Read<K, V> extends TypedRead<K, V> { + @AutoValue + public abstract static class Read<K, V> + extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> { + abstract Map<String, Object> getConsumerConfig(); + abstract List<String> getTopics(); + abstract List<TopicPartition> getTopicPartitions(); + @Nullable abstract Coder<K> getKeyCoder(); + @Nullable abstract Coder<V> getValueCoder(); + abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> + getConsumerFactoryFn(); + @Nullable abstract SerializableFunction<KafkaRecord<K, V>, Instant> getTimestampFn(); + @Nullable abstract SerializableFunction<KafkaRecord<K, V>, Instant> getWatermarkFn(); + + abstract long getMaxNumRecords(); + @Nullable abstract Duration getMaxReadTime(); + + abstract Builder<K, V> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<K, V> { + abstract Builder<K, V> setConsumerConfig(Map<String, Object> config); + abstract Builder<K, V> setTopics(List<String> topics); + abstract Builder<K, V> setTopicPartitions(List<TopicPartition> topicPartitions); + abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder); + abstract Builder<K, V> setValueCoder(Coder<V> valueCoder); + abstract Builder<K, V> setConsumerFactoryFn( + SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn); + abstract Builder<K, V> setTimestampFn(SerializableFunction<KafkaRecord<K, V>, Instant> fn); + abstract Builder<K, V> setWatermarkFn(SerializableFunction<KafkaRecord<K, V>, Instant> fn); + abstract Builder<K, V> setMaxNumRecords(long maxNumRecords); + abstract Builder<K, V> setMaxReadTime(Duration maxReadTime); + + abstract Read<K, V> build(); + } /** * Returns a new {@link Read} with Kafka consumer pointing to {@code bootstrapServers}. @@ -256,10 +303,9 @@ public class KafkaIO { * of how the partitions are distributed among the splits. 
*/ public Read<K, V> withTopics(List<String> topics) { - checkState(topicPartitions.isEmpty(), "Only topics or topicPartitions can be set, not both"); - - return new Read<K, V>(ImmutableList.copyOf(topics), topicPartitions, keyCoder, valueCoder, - consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime); + checkState( + getTopicPartitions().isEmpty(), "Only topics or topicPartitions can be set, not both"); + return toBuilder().setTopics(ImmutableList.copyOf(topics)).build(); } /** @@ -269,26 +315,22 @@ public class KafkaIO { * of how the partitions are distributed among the splits. */ public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) { - checkState(topics.isEmpty(), "Only topics or topicPartitions can be set, not both"); - - return new Read<K, V>(topics, ImmutableList.copyOf(topicPartitions), keyCoder, valueCoder, - consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime); + checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be set, not both"); + return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build(); } /** * Returns a new {@link Read} with {@link Coder} for key bytes. */ - public <KeyT> Read<KeyT, V> withKeyCoder(Coder<KeyT> keyCoder) { - return new Read<KeyT, V>(topics, topicPartitions, keyCoder, valueCoder, - consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime); + public Read<K, V> withKeyCoder(Coder<K> keyCoder) { + return toBuilder().setKeyCoder(keyCoder).build(); } /** * Returns a new {@link Read} with {@link Coder} for value bytes. 
*/ - public <ValueT> Read<K, ValueT> withValueCoder(Coder<ValueT> valueCoder) { - return new Read<K, ValueT>(topics, topicPartitions, keyCoder, valueCoder, - consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime); + public Read<K, V> withValueCoder(Coder<V> valueCoder) { + return toBuilder().setValueCoder(valueCoder).build(); } /** @@ -298,20 +340,16 @@ public class KafkaIO { */ public Read<K, V> withConsumerFactoryFn( SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) { - return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder, - consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime); + return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build(); } /** * Update consumer configuration with new properties. */ public Read<K, V> updateConsumerProperties(Map<String, Object> configUpdates) { - - Map<String, Object> config = updateKafkaProperties(consumerConfig, + Map<String, Object> config = updateKafkaProperties(getConsumerConfig(), IGNORED_CONSUMER_PROPERTIES, configUpdates); - - return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder, - consumerFactoryFn, config, maxNumRecords, maxReadTime); + return toBuilder().setConsumerConfig(config).build(); } /** @@ -319,8 +357,7 @@ public class KafkaIO { * Mainly used for tests and demo applications. */ public Read<K, V> withMaxNumRecords(long maxNumRecords) { - return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder, - consumerFactoryFn, consumerConfig, maxNumRecords, null); + return toBuilder().setMaxNumRecords(maxNumRecords).setMaxReadTime(null).build(); } /** @@ -330,100 +367,32 @@ public class KafkaIO { * applications. 
*/ public Read<K, V> withMaxReadTime(Duration maxReadTime) { - return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder, - consumerFactoryFn, consumerConfig, Long.MAX_VALUE, maxReadTime); - } - - /////////////////////////////////////////////////////////////////////////////////////// - - private Read( - List<String> topics, - List<TopicPartition> topicPartitions, - Coder<K> keyCoder, - Coder<V> valueCoder, - SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn, - Map<String, Object> consumerConfig, - long maxNumRecords, - @Nullable Duration maxReadTime) { - - super(topics, topicPartitions, keyCoder, valueCoder, null, null, - consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime); + return toBuilder().setMaxNumRecords(Long.MAX_VALUE).setMaxReadTime(maxReadTime).build(); } /** - * A set of properties that are not required or don't make sense for our consumer. - */ - private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES = ImmutableMap.of( - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyCoder instead", - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueCoder instead" - // "group.id", "enable.auto.commit", "auto.commit.interval.ms" : - // lets allow these, applications can have better resume point for restarts. - ); - - // set config defaults - private static final Map<String, Object> DEFAULT_CONSUMER_PROPERTIES = - ImmutableMap.<String, Object>of( - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), - - // Use large receive buffer. Once KAFKA-3135 is fixed, this _may_ not be required. - // with default value of of 32K, It takes multiple seconds between successful polls. - // All the consumer work is done inside poll(), with smaller send buffer size, it - // takes many polls before a 1MB chunk from the server is fully read. 
In my testing - // about half of the time select() inside kafka consumer waited for 20-30ms, though - // the server had lots of data in tcp send buffers on its side. Compared to default, - // this setting increased throughput increased by many fold (3-4x). - ConsumerConfig.RECEIVE_BUFFER_CONFIG, 512 * 1024, - - // default to latest offset when we are not resuming. - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest", - // disable auto commit of offsets. we don't require group_id. could be enabled by user. - ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); - - // default Kafka 0.9 Consumer supplier. - private static final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> - KAFKA_9_CONSUMER_FACTORY_FN = - new SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>() { - public Consumer<byte[], byte[]> apply(Map<String, Object> config) { - return new KafkaConsumer<>(config); - } - }; - } - - /** - * A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for more - * information on usage and configuration. - */ - public static class TypedRead<K, V> - extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> { - - /** * A function to assign a timestamp to a record. Default is processing timestamp. */ - public TypedRead<K, V> withTimestampFn2( + public Read<K, V> withTimestampFn2( SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) { checkNotNull(timestampFn); - return new TypedRead<K, V>(topics, topicPartitions, keyCoder, valueCoder, - timestampFn, watermarkFn, consumerFactoryFn, consumerConfig, - maxNumRecords, maxReadTime); + return toBuilder().setTimestampFn(timestampFn).build(); } /** * A function to calculate watermark after a record. 
Default is last record timestamp * @see #withTimestampFn(SerializableFunction) */ - public TypedRead<K, V> withWatermarkFn2( + public Read<K, V> withWatermarkFn2( SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn) { checkNotNull(watermarkFn); - return new TypedRead<K, V>(topics, topicPartitions, keyCoder, valueCoder, - timestampFn, watermarkFn, consumerFactoryFn, consumerConfig, - maxNumRecords, maxReadTime); + return toBuilder().setWatermarkFn(watermarkFn).build(); } /** * A function to assign a timestamp to a record. Default is processing timestamp. */ - public TypedRead<K, V> withTimestampFn(SerializableFunction<KV<K, V>, Instant> timestampFn) { + public Read<K, V> withTimestampFn(SerializableFunction<KV<K, V>, Instant> timestampFn) { checkNotNull(timestampFn); return withTimestampFn2(unwrapKafkaAndThen(timestampFn)); } @@ -432,7 +401,7 @@ public class KafkaIO { * A function to calculate watermark after a record. Default is last record timestamp * @see #withTimestampFn(SerializableFunction) */ - public TypedRead<K, V> withWatermarkFn(SerializableFunction<KV<K, V>, Instant> watermarkFn) { + public Read<K, V> withWatermarkFn(SerializableFunction<KV<K, V>, Instant> watermarkFn) { checkNotNull(watermarkFn); return withWatermarkFn2(unwrapKafkaAndThen(watermarkFn)); } @@ -445,6 +414,15 @@ public class KafkaIO { } @Override + public void validate(PBegin input) { + checkNotNull(getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), + "Kafka bootstrap servers should be set"); + checkArgument(getTopics().size() > 0 || getTopicPartitions().size() > 0, + "Kafka topics or topic_partitions are required"); + checkNotNull(getKeyCoder(), "Key coder must be set"); + checkNotNull(getValueCoder(), "Value coder must be set"); + } + public PCollection<KafkaRecord<K, V>> expand(PBegin input) { // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set. 
Unbounded<KafkaRecord<K, V>> unbounded = @@ -452,100 +430,92 @@ public class KafkaIO { PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded; - if (maxNumRecords < Long.MAX_VALUE) { - transform = unbounded.withMaxNumRecords(maxNumRecords); - } else if (maxReadTime != null) { - transform = unbounded.withMaxReadTime(maxReadTime); + if (getMaxNumRecords() < Long.MAX_VALUE) { + transform = unbounded.withMaxNumRecords(getMaxNumRecords()); + } else if (getMaxReadTime() != null) { + transform = unbounded.withMaxReadTime(getMaxReadTime()); } return input.getPipeline().apply(transform); } - //////////////////////////////////////////////////////////////////////////////////////// - - protected final List<String> topics; - protected final List<TopicPartition> topicPartitions; // mutually exclusive with topics - protected final Coder<K> keyCoder; - protected final Coder<V> valueCoder; - @Nullable protected final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn; - @Nullable protected final SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn; - protected final - SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn; - protected final Map<String, Object> consumerConfig; - protected final long maxNumRecords; // bounded read, mainly for testing - protected final Duration maxReadTime; // bounded read, mainly for testing - - private TypedRead(List<String> topics, - List<TopicPartition> topicPartitions, - Coder<K> keyCoder, - Coder<V> valueCoder, - @Nullable SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn, - @Nullable SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn, - SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn, - Map<String, Object> consumerConfig, - long maxNumRecords, - @Nullable Duration maxReadTime) { - super("KafkaIO.Read"); - - this.topics = topics; - this.topicPartitions = topicPartitions; - this.keyCoder = keyCoder; - this.valueCoder = 
valueCoder; - this.timestampFn = timestampFn; - this.watermarkFn = watermarkFn; - this.consumerFactoryFn = consumerFactoryFn; - this.consumerConfig = consumerConfig; - this.maxNumRecords = maxNumRecords; - this.maxReadTime = maxReadTime; - } - /** * Creates an {@link UnboundedSource UnboundedSource<KafkaRecord<K, V>, ?>} with the - * configuration in {@link TypedRead}. Primary use case is unit tests, should not be used in an + * configuration in {@link Read}. Primary use case is unit tests, should not be used in an * application. */ @VisibleForTesting UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> makeSource() { - return new UnboundedKafkaSource<K, V>( - -1, - topics, - topicPartitions, - keyCoder, - valueCoder, - timestampFn, - Optional.fromNullable(watermarkFn), - consumerFactoryFn, - consumerConfig); + return new UnboundedKafkaSource<K, V>(this, -1); } // utility method to convert KafkRecord<K, V> to user KV<K, V> before applying user functions private static <KeyT, ValueT, OutT> SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT> - unwrapKafkaAndThen(final SerializableFunction<KV<KeyT, ValueT>, OutT> fn) { - return new SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT>() { - public OutT apply(KafkaRecord<KeyT, ValueT> record) { - return fn.apply(record.getKV()); + unwrapKafkaAndThen(final SerializableFunction<KV<KeyT, ValueT>, OutT> fn) { + return new SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT>() { + public OutT apply(KafkaRecord<KeyT, ValueT> record) { + return fn.apply(record.getKV()); + } + }; + } + /////////////////////////////////////////////////////////////////////////////////////// + + /** + * A set of properties that are not required or don't make sense for our consumer. 
+ */ + private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES = ImmutableMap.of( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyCoder instead", + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueCoder instead" + // "group.id", "enable.auto.commit", "auto.commit.interval.ms" : + // let's allow these, applications can have better resume point for restarts. + ); + + // set config defaults + private static final Map<String, Object> DEFAULT_CONSUMER_PROPERTIES = + ImmutableMap.<String, Object>of( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + + // Use large receive buffer. Once KAFKA-3135 is fixed, this _may_ not be required. + // With the default value of 32K, it takes multiple seconds between successful polls. + // All the consumer work is done inside poll(); with a smaller receive buffer, it + // takes many polls before a 1MB chunk from the server is fully read. In my testing + // about half of the time select() inside kafka consumer waited for 20-30ms, though + // the server had lots of data in tcp send buffers on its side. Compared to default, + // this setting increased throughput by many fold (3-4x). + ConsumerConfig.RECEIVE_BUFFER_CONFIG, 512 * 1024, + + // default to latest offset when we are not resuming. + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest", + // disable auto commit of offsets. we don't require group_id. could be enabled by user. + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + + // default Kafka 0.9 Consumer supplier.
+ private static final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> + KAFKA_9_CONSUMER_FACTORY_FN = + new SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>() { + public Consumer<byte[], byte[]> apply(Map<String, Object> config) { + return new KafkaConsumer<>(config); } }; - } } /** - * A {@link PTransform} to read from Kafka topics. Similar to {@link KafkaIO.TypedRead}, but + * A {@link PTransform} to read from Kafka topics. Similar to {@link KafkaIO.Read}, but * removes Kafka metatdata and returns a {@link PCollection} of {@link KV}. * See {@link KafkaIO} for more information on usage and configuration of reader. */ public static class TypedWithoutMetadata<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> { + private final Read<K, V> read; - private final TypedRead<K, V> typedRead; - - TypedWithoutMetadata(TypedRead<K, V> read) { + TypedWithoutMetadata(Read<K, V> read) { super("KafkaIO.Read"); - this.typedRead = read; + this.read = read; } @Override public PCollection<KV<K, V>> expand(PBegin begin) { - return typedRead + return read .expand(begin) .apply("Remove Kafka Metadata", ParDo.of(new DoFn<KafkaRecord<K, V>, KV<K, V>>() { @@ -581,52 +551,17 @@ public class KafkaIO { return config; } - private static class NowTimestampFn<T> implements SerializableFunction<T, Instant> { - @Override - public Instant apply(T input) { - return Instant.now(); - } - } - /** Static class, prevent instantiation. 
*/ private KafkaIO() {} private static class UnboundedKafkaSource<K, V> extends UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> { - + private Read<K, V> spec; private final int id; // split id, mainly for debugging - private final List<String> topics; - private final List<TopicPartition> assignedPartitions; - private final Coder<K> keyCoder; - private final Coder<V> valueCoder; - private final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn; - // would it be a good idea to pass currentTimestamp to watermarkFn? - private final Optional<SerializableFunction<KafkaRecord<K, V>, Instant>> watermarkFn; - private - SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn; - private final Map<String, Object> consumerConfig; - - public UnboundedKafkaSource( - int id, - List<String> topics, - List<TopicPartition> assignedPartitions, - Coder<K> keyCoder, - Coder<V> valueCoder, - @Nullable SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn, - Optional<SerializableFunction<KafkaRecord<K, V>, Instant>> watermarkFn, - SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn, - Map<String, Object> consumerConfig) { + public UnboundedKafkaSource(Read<K, V> spec, int id) { + this.spec = spec; this.id = id; - this.assignedPartitions = assignedPartitions; - this.topics = topics; - this.keyCoder = keyCoder; - this.valueCoder = valueCoder; - this.timestampFn = - (timestampFn == null ? 
new NowTimestampFn<KafkaRecord<K, V>>() : timestampFn); - this.watermarkFn = watermarkFn; - this.consumerFactoryFn = consumerFactoryFn; - this.consumerConfig = consumerConfig; } /** @@ -642,15 +577,16 @@ public class KafkaIO { public List<UnboundedKafkaSource<K, V>> generateInitialSplits( int desiredNumSplits, PipelineOptions options) throws Exception { - List<TopicPartition> partitions = new ArrayList<>(assignedPartitions); + List<TopicPartition> partitions = new ArrayList<>(spec.getTopicPartitions()); // (a) fetch partitions for each topic // (b) sort by <topic, partition> // (c) round-robin assign the partitions to splits if (partitions.isEmpty()) { - try (Consumer<?, ?> consumer = consumerFactoryFn.apply(consumerConfig)) { - for (String topic : topics) { + try (Consumer<?, ?> consumer = + spec.getConsumerFactoryFn().apply(spec.getConsumerConfig())) { + for (String topic : spec.getTopics()) { for (PartitionInfo p : consumer.partitionsFor(topic)) { partitions.add(new TopicPartition(p.topic(), p.partition())); } @@ -690,16 +626,13 @@ public class KafkaIO { LOG.info("Partitions assigned to split {} (total {}): {}", i, assignedToSplit.size(), Joiner.on(",").join(assignedToSplit)); - result.add(new UnboundedKafkaSource<K, V>( - i, - this.topics, - assignedToSplit, - this.keyCoder, - this.valueCoder, - this.timestampFn, - this.watermarkFn, - this.consumerFactoryFn, - this.consumerConfig)); + result.add( + new UnboundedKafkaSource<>( + spec.toBuilder() + .setTopics(Collections.<String>emptyList()) + .setTopicPartitions(assignedToSplit) + .build(), + i)); } return result; @@ -708,7 +641,7 @@ public class KafkaIO { @Override public UnboundedKafkaReader<K, V> createReader(PipelineOptions options, KafkaCheckpointMark checkpointMark) { - if (assignedPartitions.isEmpty()) { + if (spec.getTopicPartitions().isEmpty()) { LOG.warn("Looks like generateSplits() is not called. 
Generate single split."); try { return new UnboundedKafkaReader<K, V>( @@ -734,15 +667,12 @@ public class KafkaIO { @Override public void validate() { - checkNotNull(consumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), - "Kafka bootstrap servers should be set"); - checkArgument(topics.size() > 0 || assignedPartitions.size() > 0, - "Kafka topics or topic_partitions are required"); + spec.validate(null); } @Override public Coder<KafkaRecord<K, V>> getDefaultOutputCoder() { - return KafkaRecordCoder.of(keyCoder, valueCoder); + return KafkaRecordCoder.of(spec.getKeyCoder(), spec.getValueCoder()); } } @@ -839,7 +769,8 @@ public class KafkaIO { this.source = source; this.name = "Reader-" + source.id; - partitionStates = ImmutableList.copyOf(Lists.transform(source.assignedPartitions, + List<TopicPartition> partitions = source.spec.getTopicPartitions(); + partitionStates = ImmutableList.copyOf(Lists.transform(partitions, new Function<TopicPartition, PartitionState>() { public PartitionState apply(TopicPartition tp) { return new PartitionState(tp, UNINITIALIZED_OFFSET); @@ -850,13 +781,13 @@ public class KafkaIO { // a) verify that assigned and check-pointed partitions match exactly // b) set consumed offsets - checkState(checkpointMark.getPartitions().size() == source.assignedPartitions.size(), + checkState(checkpointMark.getPartitions().size() == partitions.size(), "checkPointMark and assignedPartitions should match"); // we could consider allowing a mismatch, though it is not expected in current Dataflow - for (int i = 0; i < source.assignedPartitions.size(); i++) { + for (int i = 0; i < partitions.size(); i++) { PartitionMark ckptMark = checkpointMark.getPartitions().get(i); - TopicPartition assigned = source.assignedPartitions.get(i); + TopicPartition assigned = partitions.get(i); TopicPartition partition = new TopicPartition(ckptMark.getTopic(), ckptMark.getPartition()); checkState(partition.equals(assigned), @@ -920,8 +851,9 @@ public class KafkaIO { 
@Override public boolean start() throws IOException { - consumer = source.consumerFactoryFn.apply(source.consumerConfig); - consumer.assign(source.assignedPartitions); + Read<K, V> spec = source.spec; + consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig()); + consumer.assign(spec.getTopicPartitions()); for (PartitionState p : partitionStates) { if (p.nextOffset != UNINITIALIZED_OFFSET) { @@ -948,16 +880,16 @@ public class KafkaIO { // offsetConsumer setup : - Object groupId = source.consumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG); + Object groupId = spec.getConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG); // override group_id and disable auto_commit so that it does not interfere with main consumer String offsetGroupId = String.format("%s_offset_consumer_%d_%s", name, (new Random()).nextInt(Integer.MAX_VALUE), (groupId == null ? "none" : groupId)); - Map<String, Object> offsetConsumerConfig = new HashMap<>(source.consumerConfig); + Map<String, Object> offsetConsumerConfig = new HashMap<>(spec.getConsumerConfig()); offsetConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId); offsetConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); - offsetConsumer = source.consumerFactoryFn.apply(offsetConsumerConfig); - offsetConsumer.assign(source.assignedPartitions); + offsetConsumer = spec.getConsumerFactoryFn().apply(offsetConsumerConfig); + offsetConsumer.assign(spec.getTopicPartitions()); offsetFetcherThread.scheduleAtFixedRate( new Runnable() { @@ -1018,10 +950,11 @@ public class KafkaIO { rawRecord.topic(), rawRecord.partition(), rawRecord.offset(), - decode(rawRecord.key(), source.keyCoder), - decode(rawRecord.value(), source.valueCoder)); + decode(rawRecord.key(), source.spec.getKeyCoder()), + decode(rawRecord.value(), source.spec.getValueCoder())); - curTimestamp = source.timestampFn.apply(record); + curTimestamp = (source.spec.getTimestampFn() == null) + ? 
Instant.now() : source.spec.getTimestampFn().apply(record); curRecord = record; int recordSize = (rawRecord.key() == null ? 0 : rawRecord.key().length) @@ -1081,8 +1014,8 @@ public class KafkaIO { return initialWatermark; } - return source.watermarkFn.isPresent() - ? source.watermarkFn.get().apply(curRecord) : curTimestamp; + return source.spec.getWatermarkFn() != null + ? source.spec.getWatermarkFn().apply(curRecord) : curTimestamp; } @Override @@ -1170,7 +1103,29 @@ public class KafkaIO { * A {@link PTransform} to write to a Kafka topic. See {@link KafkaIO} for more * information on usage and configuration. */ - public static class Write<K, V> extends TypedWrite<K, V> { + @AutoValue + public abstract static class Write<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> { + @Nullable abstract String getTopic(); + @Nullable abstract Coder<K> getKeyCoder(); + @Nullable abstract Coder<V> getValueCoder(); + abstract boolean getValueOnly(); + abstract Map<String, Object> getProducerConfig(); + @Nullable + abstract SerializableFunction<Map<String, Object>, Producer<K, V>> getProducerFactoryFn(); + + abstract Builder<K, V> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<K, V> { + abstract Builder<K, V> setTopic(String topic); + abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder); + abstract Builder<K, V> setValueCoder(Coder<V> valueCoder); + abstract Builder<K, V> setValueOnly(boolean valueOnly); + abstract Builder<K, V> setProducerConfig(Map<String, Object> producerConfig); + abstract Builder<K, V> setProducerFactoryFn( + SerializableFunction<Map<String, Object>, Producer<K, V>> fn); + abstract Write<K, V> build(); + } /** * Returns a new {@link Write} transform with Kafka producer pointing to @@ -1186,7 +1141,7 @@ public class KafkaIO { * Returns a new {@link Write} transform that writes to given topic. 
*/ public Write<K, V> withTopic(String topic) { - return new Write<K, V>(topic, keyCoder, valueCoder, producerConfig); + return toBuilder().setTopic(topic).build(); } /** @@ -1194,95 +1149,55 @@ public class KafkaIO { * A key is optional while writing to Kafka. Note when a key is set, its hash is used to * determine partition in Kafka (see {@link ProducerRecord} for more details). */ - public <KeyT> Write<KeyT, V> withKeyCoder(Coder<KeyT> keyCoder) { - return new Write<KeyT, V>(topic, keyCoder, valueCoder, producerConfig); + public Write<K, V> withKeyCoder(Coder<K> keyCoder) { + return toBuilder().setKeyCoder(keyCoder).build(); } /** * Returns a new {@link Write} with {@link Coder} for serializing value to bytes. */ - public <ValueT> Write<K, ValueT> withValueCoder(Coder<ValueT> valueCoder) { - return new Write<K, ValueT>(topic, keyCoder, valueCoder, producerConfig); + public Write<K, V> withValueCoder(Coder<V> valueCoder) { + return toBuilder().setValueCoder(valueCoder).build(); } public Write<K, V> updateProducerProperties(Map<String, Object> configUpdates) { - Map<String, Object> config = updateKafkaProperties(producerConfig, - TypedWrite.IGNORED_PRODUCER_PROPERTIES, configUpdates); - return new Write<K, V>(topic, keyCoder, valueCoder, config); + Map<String, Object> config = updateKafkaProperties(getProducerConfig(), + IGNORED_PRODUCER_PROPERTIES, configUpdates); + return toBuilder().setProducerConfig(config).build(); } - private Write( - String topic, - Coder<K> keyCoder, - Coder<V> valueCoder, - Map<String, Object> producerConfig) { - super(topic, keyCoder, valueCoder, producerConfig, - Optional.<SerializableFunction<Map<String, Object>, Producer<K, V>>>absent()); - } - } - - /** - * A {@link PTransform} to write to a Kafka topic. See {@link KafkaIO} for more - * information on usage and configuration. 
- */ - public static class TypedWrite<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> { - /** * Returns a new {@link Write} with a custom function to create Kafka producer. Primarily used * for tests. Default is {@link KafkaProducer} */ - public TypedWrite<K, V> withProducerFactoryFn( + public Write<K, V> withProducerFactoryFn( SerializableFunction<Map<String, Object>, Producer<K, V>> producerFactoryFn) { - return new TypedWrite<K, V>(topic, keyCoder, valueCoder, producerConfig, - Optional.of(producerFactoryFn)); + return toBuilder().setProducerFactoryFn(producerFactoryFn).build(); } /** * Returns a new transform that writes just the values to Kafka. This is useful for writing * collections of values rather thank {@link KV}s. */ - @SuppressWarnings("unchecked") public PTransform<PCollection<V>, PDone> values() { - return new KafkaValueWrite<V>((TypedWrite<Void, V>) this); - // Any way to avoid casting here to TypedWrite<Void, V>? We can't create - // new TypedWrite without casting producerFactoryFn. 
+ return new KafkaValueWrite<>(toBuilder().setValueOnly(true).build()); } @Override public PDone expand(PCollection<KV<K, V>> input) { - input.apply(ParDo.of(new KafkaWriter<K, V>( - topic, keyCoder, valueCoder, producerConfig, producerFactoryFnOpt))); + input.apply(ParDo.of(new KafkaWriter<>(this))); return PDone.in(input.getPipeline()); } @Override public void validate(PCollection<KV<K, V>> input) { - checkNotNull(producerConfig.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), + checkNotNull(getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), "Kafka bootstrap servers should be set"); - checkNotNull(topic, "Kafka topic should be set"); - } - - ////////////////////////////////////////////////////////////////////////////////////////// - - protected final String topic; - protected final Coder<K> keyCoder; - protected final Coder<V> valueCoder; - protected final Optional<SerializableFunction<Map<String, Object>, Producer<K, V>>> - producerFactoryFnOpt; - protected final Map<String, Object> producerConfig; - - protected TypedWrite( - String topic, - Coder<K> keyCoder, - Coder<V> valueCoder, - Map<String, Object> producerConfig, - Optional<SerializableFunction<Map<String, Object>, Producer<K, V>>> producerFactoryFnOpt) { - - this.topic = topic; - this.keyCoder = keyCoder; - this.valueCoder = valueCoder; - this.producerConfig = producerConfig; - this.producerFactoryFnOpt = producerFactoryFnOpt; + checkNotNull(getTopic(), "Kafka topic should be set"); + if (!getValueOnly()) { + checkNotNull(getKeyCoder(), "Key coder should be set"); + } + checkNotNull(getValueCoder(), "Value coder should be set"); } // set config defaults @@ -1307,11 +1222,10 @@ public class KafkaIO { * Same as {@code Write<K, V>} without a Key. Null is used for key as it is the convention is * Kafka when there is no key specified. Majority of Kafka writers don't specify a key. 
*/ - private static class KafkaValueWrite<V> extends PTransform<PCollection<V>, PDone> { + private static class KafkaValueWrite<K, V> extends PTransform<PCollection<V>, PDone> { + private final Write<K, V> kvWriteTransform; - private final TypedWrite<Void, V> kvWriteTransform; - - private KafkaValueWrite(TypedWrite<Void, V> kvWriteTransform) { + private KafkaValueWrite(Write<K, V> kvWriteTransform) { this.kvWriteTransform = kvWriteTransform; } @@ -1319,23 +1233,36 @@ public class KafkaIO { public PDone expand(PCollection<V> input) { return input .apply("Kafka values with default key", - MapElements.via(new SimpleFunction<V, KV<Void, V>>() { + MapElements.via(new SimpleFunction<V, KV<K, V>>() { @Override - public KV<Void, V> apply(V element) { - return KV.<Void, V>of(null, element); + public KV<K, V> apply(V element) { + return KV.of(null, element); } })) - .setCoder(KvCoder.of(VoidCoder.of(), kvWriteTransform.valueCoder)) + .setCoder(KvCoder.of(new NullOnlyCoder<K>(), kvWriteTransform.getValueCoder())) .apply(kvWriteTransform); } } + private static class NullOnlyCoder<T> extends AtomicCoder<T> { + @Override + public void encode(T value, OutputStream outStream, Context context) { + checkArgument(value == null, "Can only encode nulls"); + // Encode as the empty string. 
+ } + + @Override + public T decode(InputStream inStream, Context context) { + return null; + } + } + private static class KafkaWriter<K, V> extends DoFn<KV<K, V>, Void> { @Setup public void setup() { - if (producerFactoryFnOpt.isPresent()) { - producer = producerFactoryFnOpt.get().apply(producerConfig); + if (spec.getProducerFactoryFn() != null) { + producer = spec.getProducerFactoryFn().apply(producerConfig); } else { producer = new KafkaProducer<K, V>(producerConfig); } @@ -1347,7 +1274,7 @@ public class KafkaIO { KV<K, V> kv = ctx.element(); producer.send( - new ProducerRecord<K, V>(topic, kv.getKey(), kv.getValue()), + new ProducerRecord<K, V>(spec.getTopic(), kv.getKey(), kv.getValue()), new SendCallback()); } @@ -1364,10 +1291,8 @@ public class KafkaIO { /////////////////////////////////////////////////////////////////////////////////// - private final String topic; + private final Write<K, V> spec; private final Map<String, Object> producerConfig; - private final Optional<SerializableFunction<Map<String, Object>, Producer<K, V>>> - producerFactoryFnOpt; private transient Producer<K, V> producer = null; //private transient Callback sendCallback = new SendCallback(); @@ -1375,14 +1300,8 @@ public class KafkaIO { private transient Exception sendException = null; private transient long numSendFailures = 0; - KafkaWriter(String topic, - Coder<K> keyCoder, - Coder<V> valueCoder, - Map<String, Object> producerConfig, - Optional<SerializableFunction<Map<String, Object>, Producer<K, V>>> producerFactoryFnOpt) { - - this.topic = topic; - this.producerFactoryFnOpt = producerFactoryFnOpt; + KafkaWriter(Write<K, V> spec) { + this.spec = spec; // Set custom kafka serializers. We can not serialize user objects then pass the bytes to // producer. The key and value objects are used in kafka Partitioner interface. @@ -1390,9 +1309,9 @@ public class KafkaIO { // key bytes to pick a partition. But are making sure user's custom partitioner would work // as expected. 
- this.producerConfig = new HashMap<>(producerConfig); - this.producerConfig.put(configForKeySerializer(), keyCoder); - this.producerConfig.put(configForValueSerializer(), valueCoder); + this.producerConfig = new HashMap<>(spec.getProducerConfig()); + this.producerConfig.put(configForKeySerializer(), spec.getKeyCoder()); + this.producerConfig.put(configForValueSerializer(), spec.getValueCoder()); } private synchronized void checkForFailures() throws IOException { http://git-wip-us.apache.org/repos/asf/beam/blob/e0c704f1/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index e18d628..5424b61 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -41,6 +41,7 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.BigEndianLongCoder; +import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; @@ -210,13 +211,13 @@ public class KafkaIOTest { * Creates a consumer with two topics, with 5 partitions each. * numElements are (round-robin) assigned all the 10 partitions. 
*/ - private static KafkaIO.TypedRead<Integer, Long> mkKafkaReadTransform( + private static KafkaIO.Read<Integer, Long> mkKafkaReadTransform( int numElements, @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) { List<String> topics = ImmutableList.of("topic_a", "topic_b"); - KafkaIO.Read<Integer, Long> reader = KafkaIO.read() + KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read() .withBootstrapServers("none") .withTopics(topics) .withConsumerFactoryFn(new ConsumerFactoryFn( @@ -289,11 +290,12 @@ public class KafkaIOTest { List<String> topics = ImmutableList.of("test"); - KafkaIO.TypedRead<byte[], Long> reader = KafkaIO.read() + KafkaIO.Read<byte[], Long> reader = KafkaIO.<byte[], Long>read() .withBootstrapServers("none") .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5))) .withConsumerFactoryFn(new ConsumerFactoryFn( topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 10 partitions + .withKeyCoder(ByteArrayCoder.of()) .withValueCoder(BigEndianLongCoder.of()) .withMaxNumRecords(numElements / 10); @@ -474,7 +476,7 @@ public class KafkaIOTest { int numElements = 100; // all the 20 partitions will have elements List<String> topics = ImmutableList.of("topic_a", "topic_b"); - source = KafkaIO.read() + source = KafkaIO.<Integer, Long>read() .withBootstrapServers("none") .withTopics(topics) .withConsumerFactoryFn(new ConsumerFactoryFn( @@ -520,7 +522,7 @@ public class KafkaIOTest { p .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()) .withoutMetadata()) - .apply(KafkaIO.write() + .apply(KafkaIO.<Integer, Long>write() .withBootstrapServers("none") .withTopic(topic) .withKeyCoder(BigEndianIntegerCoder.of()) @@ -553,10 +555,9 @@ public class KafkaIOTest { .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()) .withoutMetadata()) .apply(Values.<Long>create()) // there are no keys - .apply(KafkaIO.write() + .apply(KafkaIO.<Integer, Long>write() .withBootstrapServers("none") .withTopic(topic) 
- .withKeyCoder(BigEndianIntegerCoder.of()) .withValueCoder(BigEndianLongCoder.of()) .withProducerFactoryFn(new ProducerFactoryFn()) .values()); @@ -595,7 +596,7 @@ public class KafkaIOTest { p .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()) .withoutMetadata()) - .apply(KafkaIO.write() + .apply(KafkaIO.<Integer, Long>write() .withBootstrapServers("none") .withTopic(topic) .withKeyCoder(BigEndianIntegerCoder.of())
