Repository: beam Updated Branches: refs/heads/master a8d76603b -> af8ead44e
[BEAM-1573] Use Kafka serializers instead of coders in KafkaIO Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d841e5db Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d841e5db Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d841e5db Branch: refs/heads/master Commit: d841e5dbb3b23d9eb81f7a380daf401bad3367be Parents: a8d7660 Author: peay <[email protected]> Authored: Sun Mar 26 10:51:59 2017 -0400 Committer: Eugene Kirpichov <[email protected]> Committed: Wed Apr 26 14:35:43 2017 -0700 ---------------------------------------------------------------------- .../runners/spark/SparkRunnerDebuggerTest.java | 10 +- .../ResumeFromCheckpointStreamingTest.java | 22 +- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 394 +++++++++++++------ .../CoderBasedKafkaDeserializer.java | 71 ++++ .../CoderBasedKafkaSerializer.java | 73 ++++ .../serialization/InstantDeserializer.java | 45 +++ .../kafka/serialization/InstantSerializer.java | 45 +++ .../io/kafka/serialization/package-info.java | 22 ++ .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 119 ++++-- 9 files changed, 640 insertions(+), 161 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java index 905b30e..ff43fa6 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java @@ -44,6 +44,8 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.hamcrest.Matchers; import org.joda.time.Duration; import org.junit.Rule; @@ -118,14 +120,14 @@ public class SparkRunnerDebuggerTest { KafkaIO.Read<String, String> read = KafkaIO.<String, String>read() .withBootstrapServers("mykafka:9092") .withTopics(Collections.singletonList("my_input_topic")) - .withKeyCoder(StringUtf8Coder.of()) - .withValueCoder(StringUtf8Coder.of()); + .withKeyDeserializer(StringDeserializer.class) + .withValueDeserializer(StringDeserializer.class); KafkaIO.Write<String, String> write = KafkaIO.<String, String>write() .withBootstrapServers("myotherkafka:9092") .withTopic("my_output_topic") - .withKeyCoder(StringUtf8Coder.of()) - .withValueCoder(StringUtf8Coder.of()); + .withKeySerializer(StringSerializer.class) + .withValueSerializer(StringSerializer.class); KvCoder<String, String> stringKvCoder = KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()); http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java index 1aa76a3..7d7fd08 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java @@ -39,15 +39,15 @@ import org.apache.beam.runners.spark.SparkPipelineResult; import org.apache.beam.runners.spark.TestSparkPipelineOptions; import org.apache.beam.runners.spark.UsesCheckpointRecovery; import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; -import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.io.MicrobatchSource; import org.apache.beam.runners.spark.metrics.MetricsAccumulator; import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster; import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.kafka.KafkaIO; +import org.apache.beam.sdk.io.kafka.serialization.InstantDeserializer; +import org.apache.beam.sdk.io.kafka.serialization.InstantSerializer; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.MetricNameFilter; import org.apache.beam.sdk.metrics.MetricResult; @@ -75,6 +75,7 @@ import org.apache.beam.sdk.values.PDone; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.joda.time.Duration; import org.joda.time.Instant; @@ -119,18 +120,7 @@ public class ResumeFromCheckpointStreamingTest { producerProps.put("request.required.acks", 1); producerProps.put("bootstrap.servers", EMBEDDED_KAFKA_CLUSTER.getBrokerList()); Serializer<String> stringSerializer = new StringSerializer(); - Serializer<Instant> instantSerializer = new Serializer<Instant>() { - @Override - public void configure(Map<String, ?> configs, boolean isKey) { } - - @Override - public byte[] serialize(String topic, Instant data) { - return CoderHelpers.toByteArray(data, InstantCoder.of()); - } - - @Override - public void close() { } - }; + Serializer<Instant> instantSerializer = new InstantSerializer(); try (@SuppressWarnings("unchecked") KafkaProducer<String, Instant> kafkaProducer = new KafkaProducer(producerProps, stringSerializer, instantSerializer)) { @@ -232,8 +222,8 @@ public class ResumeFromCheckpointStreamingTest { KafkaIO.Read<String, Instant> read = KafkaIO.<String, Instant>read() .withBootstrapServers(EMBEDDED_KAFKA_CLUSTER.getBrokerList()) .withTopics(Collections.singletonList(TOPIC)) - .withKeyCoder(StringUtf8Coder.of()) - .withValueCoder(InstantCoder.of()) + .withKeyDeserializer(StringDeserializer.class) + .withValueDeserializer(InstantDeserializer.class) .updateConsumerProperties(ImmutableMap.<String, Object>of("auto.offset.reset", "earliest")) .withTimestampFn(new SerializableFunction<KV<String, Instant>, Instant>() { @Override http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 68efb9a..a0977b7 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -34,6 +34,8 @@ import com.google.common.io.Closeables; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -54,9 +56,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.io.Read.Unbounded; @@ -64,6 +66,8 @@ import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark; +import org.apache.beam.sdk.io.kafka.serialization.CoderBasedKafkaDeserializer; +import org.apache.beam.sdk.io.kafka.serialization.CoderBasedKafkaSerializer; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.DoFn; @@ -73,8 +77,6 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.ExposedByteArrayInputStream; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -94,6 +96,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.joda.time.Duration; import org.joda.time.Instant; @@ -116,9 +119,8 @@ import org.slf4j.LoggerFactory; * <p>Although most applications consume a single topic, the source can be configured to consume * multiple topics or even a specific set of {@link TopicPartition}s. * - * <p>To configure a Kafka source, you must specify at the minimum Kafka <tt>bootstrapServers</tt> - * and one or more topics to consume. The following example illustrates various options for - * configuring the source : + * <p>To configure a Kafka source, you must specify at the minimum Kafka <tt>bootstrapServers</tt>, + * one or more topics to consume, and key and value deserializers. 
For example:
 *
 * <pre>{@code
 *
@@ -126,9 +128,9 @@ import org.slf4j.LoggerFactory;
 *      .apply(KafkaIO.<Long, String>read()
 *         .withBootstrapServers("broker_1:9092,broker_2:9092")
 *         .withTopic("my_topic") // use withTopics(List<String>) to read from multiple topics.
- *         // set a Coder for Key and Value
- *         .withKeyCoder(BigEndianLongCoder.of())
- *         .withValueCoder(StringUtf8Coder.of())
+ *         .withKeyDeserializer(LongDeserializer.class)
+ *         .withValueDeserializer(StringDeserializer.class)
+ *
 *         // above four are required configuration. returns PCollection<KafkaRecord<Long, String>>
 *
 *         // rest of the settings are optional:
@@ -150,6 +152,51 @@ import org.slf4j.LoggerFactory;
 *    ...
 * }</pre>
 *
+ * <p>Kafka provides deserializers for common types in
+ * {@link org.apache.kafka.common.serialization}.
+ *
+ * <p>To read Avro data, {@code fromAvro} can be used. This does not require manually specifying
+ * a {@link Coder} or {@link Deserializer}.
+ *
+ * <p>It's also possible to deserialize data using a Beam {@link Coder} via
+ * {@link #readWithCoders(Coder, Coder)}, though this is discouraged because the particular
+ * binary format is not guaranteed by coders. However, this can be useful
+ * when exchanging data with a Beam pipeline that uses the same coder:
+ *
+ * <pre>{@code
+ *
+ *  pipeline
+ *    .apply(KafkaIO.<MyKey, MyValue>readWithCoders(MyKeyCoder.of(), MyValueCoder.of())
+ *      .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *      .withTopic("my_topic")
+ *    )
+ *  ...
+ * }</pre>
+ *
+ * <p>In most cases, you don't need to specify {@link Coder} for key and value in the resulting
+ * collection because the coders are inferred from deserializer types. However, in cases when
+ * coder inference fails, they can be specified manually using {@link Read#withKeyCoder} and
+ * {@link Read#withValueCoder}. Note that the payloads of Kafka messages are interpreted using
+ * key and value <i>deserializers</i>; coders are a Beam implementation detail to help runners
+ * materialize the data for intermediate storage if necessary.
+ *
+ * <pre>{@code
+ *
+ *  pipeline
+ *    .apply(KafkaIO.<Long, Foo>read()
+ *      .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *      .withTopic("my_topic")
+ *
+ *      // infer coder from deserializer
+ *      .withKeyDeserializer(LongDeserializer.class)
+ *
+ *      // explicitly specify coder
+ *      .withValueDeserializer(FooDeserializer.class)
+ *      .withValueCoder(FooCoder.of())
+ *    )
+ *  ...
+ * }</pre>
+ *
 * <h3>Partition Assignment and Checkpointing</h3>
 * The Kafka partitions are evenly distributed among splits (workers).
 * Checkpointing is fully supported and each split can resume from previous checkpoint. See
 *
@@ -165,8 +212,7 @@ import org.slf4j.LoggerFactory;
 *
 * <p>KafkaIO sink supports writing key-value pairs to a Kafka topic. Users can also write
 * just the values. To configure a Kafka sink, you must specify at the minimum Kafka
- * <tt>bootstrapServers</tt> and the topic to write to. The following example illustrates various
- * options for configuring the sink:
+ * <tt>bootstrapServers</tt>, the topic to write to, and key and value serializers.
For example:
 *
 * <pre>{@code
 *
@@ -175,9 +221,8 @@ import org.slf4j.LoggerFactory;
 *      .withBootstrapServers("broker_1:9092,broker_2:9092")
 *      .withTopic("results")
 *
- *      // set Coder for Key and Value
- *      .withKeyCoder(BigEndianLongCoder.of())
- *      .withValueCoder(StringUtf8Coder.of())
+ *      .withKeySerializer(LongSerializer.class)
+ *      .withValueSerializer(StringSerializer.class)
 *
 *      // you can further customize KafkaProducer used to write the records by adding more
 *      // settings for ProducerConfig. e.g., to enable compression:
@@ -193,11 +238,16 @@ import org.slf4j.LoggerFactory;
 *  strings.apply(KafkaIO.<Void, String>write()
 *      .withBootstrapServers("broker_1:9092,broker_2:9092")
 *      .withTopic("results")
- *      .withValueCoder(StringUtf8Coder.of()) // just need coder for value
+ *      .withValueSerializer(StringSerializer.class) // just need serializer for value
 *      .values()
 *    );
 * }</pre>
 *
+ * <p>The same notes on coders vs. serializers as above for {@link Read} apply here.
+ *
+ * <p>To write Avro data, {@code toAvro} can be used. This does not require specifying serializers
+ * or coders.
+ *
 * <h3>Advanced Kafka Configuration</h3>
 * KafkaIO allows setting most of the properties in {@link ConsumerConfig} for source or in
 * {@link ProducerConfig} for sink. E.g. if you would like to enable offset
@@ -214,18 +264,53 @@ import org.slf4j.LoggerFactory;
 */
@Experimental
public class KafkaIO {
+
+  /**
+   * Attempt to infer a {@link Coder} by extracting the type of the deserialized class from the
+   * deserializer argument using the {@link Coder} registry.
+   */
+  @VisibleForTesting
+  static <T> Coder<T> inferCoder(
+      CoderRegistry coderRegistry, Class<? extends Deserializer<T>> deserializer) {
+    checkNotNull(deserializer);
+
+    for (Type type : deserializer.getGenericInterfaces()) {
+      if (!(type instanceof ParameterizedType)) {
+        continue;
+      }
+
+      // This does not recurse: we will not infer from a class that extends
+      // a class that extends Deserializer<T>.
+      ParameterizedType parameterizedType = (ParameterizedType) type;
+
+      if (parameterizedType.getRawType() == Deserializer.class) {
+        Type parameter = parameterizedType.getActualTypeArguments()[0];
+
+        try {
+          @SuppressWarnings("unchecked")
+          Class<T> clazz = (Class<T>) parameter;
+          return coderRegistry.getDefaultCoder(clazz);
+        } catch (CannotProvideCoderException e) {
+          LOG.warn("Could not infer coder from deserializer type", e);
+        }
+      }
+    }
+
+    throw new RuntimeException("Could not extract deserializer type from " + deserializer);
+  }
+
  /**
   * Creates an uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka
   * configuration should be set with {@link Read#withBootstrapServers(String)} and
-  * {@link Read#withTopics(List)}. Other optional settings include key and value coders,
-  * custom timestamp and watermark functions.
+  * {@link Read#withTopics(List)}. Other optional settings include key and value
+  * {@link Deserializer}s, custom timestamp and watermark functions.
*/
  public static Read<byte[], byte[]> readBytes() {
    return new AutoValue_KafkaIO_Read.Builder<byte[], byte[]>()
        .setTopics(new ArrayList<String>())
        .setTopicPartitions(new ArrayList<TopicPartition>())
-       .setKeyCoder(ByteArrayCoder.of())
-       .setValueCoder(ByteArrayCoder.of())
+       .setKeyDeserializer(ByteArrayDeserializer.class)
+       .setValueDeserializer(ByteArrayDeserializer.class)
        .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)
        .setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES)
        .setMaxNumRecords(Long.MAX_VALUE)
@@ -235,8 +320,8 @@ public class KafkaIO {
  /**
   * Creates an uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka
   * configuration should be set with {@link Read#withBootstrapServers(String)} and
-  * {@link Read#withTopics(List)}. Other optional settings include key and value coders,
-  * custom timestamp and watermark functions.
+  * {@link Read#withTopics(List)}. Other optional settings include key and value
+  * {@link Deserializer}s, custom timestamp and watermark functions.
   */
  public static <K, V> Read<K, V> read() {
    return new AutoValue_KafkaIO_Read.Builder<K, V>()
@@ -249,15 +334,87 @@
  }

  /**
+  * Creates an uninitialized {@link Read} {@link PTransform}, using Kafka {@link Deserializer}s
+  * based on {@link Coder} instances.
+  */
+  @SuppressWarnings("unchecked")
+  public static <K, V> Read<K, V> readWithCoders(Coder<K> keyCoder, Coder<V> valueCoder) {
+    // Kafka constructs deserializers directly. Pass coder through consumer
+    // configuration.
+    ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder<>();
+    Map<String, Object> config = builder
+        .putAll(Read.DEFAULT_CONSUMER_PROPERTIES)
+        .put(CoderBasedKafkaDeserializer.configForKeyDeserializer(), keyCoder)
+        .put(CoderBasedKafkaDeserializer.configForValueDeserializer(), valueCoder)
+        .build();
+
+    return new AutoValue_KafkaIO_Read.Builder<K, V>()
+        .setTopics(new ArrayList<String>())
+        .setTopicPartitions(new ArrayList<TopicPartition>())
+        .setKeyCoder(keyCoder)
+        .setValueCoder(valueCoder)
+        .setKeyDeserializer((Class) CoderBasedKafkaDeserializer.class)
+        .setValueDeserializer((Class) CoderBasedKafkaDeserializer.class)
+        .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)
+        .setConsumerConfig(config)
+        .setMaxNumRecords(Long.MAX_VALUE)
+        .build();
+  }
+
+  /**
+   * Creates an uninitialized {@link Read} {@link PTransform}, using Kafka {@link Deserializer}s
+   * based on {@link AvroCoder}. This reads data in the Avro binary format directly without using
+   * an Avro object container.
+   */
+  @SuppressWarnings("unchecked")
+  public static <K, V> Read<K, V> fromAvro(Class<K> keyClass, Class<V> valueClass) {
+    return readWithCoders(AvroCoder.of(keyClass), AvroCoder.of(valueClass));
+  }
+
  /**
   * Creates an uninitialized {@link Write} {@link PTransform}. Before use, Kafka configuration
   * should be set with {@link Write#withBootstrapServers(String)} and {@link Write#withTopic}
-  * along with {@link Coder}s for (optional) key and values.
+  * along with {@link Serializer}s for (optional) key and values.
   */
  public static <K, V> Write<K, V> write() {
    return new AutoValue_KafkaIO_Write.Builder<K, V>()
        .setProducerConfig(Write.DEFAULT_PRODUCER_PROPERTIES)
        .build();
  }

+  /**
+   * Creates an uninitialized {@link Write} {@link PTransform}, using Kafka {@link Serializer}s
+   * based on {@link Coder} instances.
+   */
+  @SuppressWarnings("unchecked")
+  public static <K, V> Write<K, V> writeWithCoders(Coder<K> keyCoder, Coder<V> valueCoder) {
+    // Kafka constructs serializers directly. Pass coder through producer
+    // configuration.
+    ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder<>();
+    Map<String, Object> config = builder
+        .putAll(Write.DEFAULT_PRODUCER_PROPERTIES)
+        .put(CoderBasedKafkaSerializer.configForKeySerializer(), keyCoder)
+        .put(CoderBasedKafkaSerializer.configForValueSerializer(), valueCoder)
+        .build();
+
+    CoderBasedKafkaSerializer<K> keySerializer = new CoderBasedKafkaSerializer<K>();
+    CoderBasedKafkaSerializer<V> valueSerializer = new CoderBasedKafkaSerializer<V>();
+
+    return new AutoValue_KafkaIO_Write.Builder<K, V>()
+        .setProducerConfig(config)
+        .setKeySerializer((Class) CoderBasedKafkaSerializer.class)
+        .setValueSerializer((Class) CoderBasedKafkaSerializer.class)
+        .build();
+  }
+
+  /**
+   * Creates an uninitialized {@link Write} {@link PTransform}, using Kafka {@link Serializer}s
+   * based on {@link AvroCoder}. The coder writes Avro data directly without using an Avro object
+   * container.
+   */
+  @SuppressWarnings("unchecked")
+  public static <K, V> Write<K, V> toAvro(Class<K> keyClass, Class<V> valueClass) {
+    return writeWithCoders(AvroCoder.of(keyClass), AvroCoder.of(valueClass));
+  }

  ///////////////////////// Read Support \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\

@@ -273,6 +430,8 @@ public class KafkaIO {
    abstract List<TopicPartition> getTopicPartitions();
    @Nullable abstract Coder<K> getKeyCoder();
    @Nullable abstract Coder<V> getValueCoder();
+   @Nullable abstract Class<? extends Deserializer<K>> getKeyDeserializer();
+   @Nullable abstract Class<? extends Deserializer<V>> getValueDeserializer();
    abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> getConsumerFactoryFn();
    @Nullable abstract SerializableFunction<KafkaRecord<K, V>, Instant> getTimestampFn();
@@ -290,6 +449,9 @@
      abstract Builder<K, V> setTopicPartitions(List<TopicPartition> topicPartitions);
      abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder);
      abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);
+     abstract Builder<K, V> setKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer);
+     abstract Builder<K, V> setValueDeserializer(
+         Class<? extends Deserializer<V>> valueDeserializer);
      abstract Builder<K, V> setConsumerFactoryFn(
          SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn);
      abstract Builder<K, V> setTimestampFn(SerializableFunction<KafkaRecord<K, V>, Instant> fn);
@@ -342,14 +504,28 @@
    }

    /**
-    * Returns a new {@link Read} with {@link Coder} for key bytes.
+    * Returns a new {@link Read} with a Kafka {@link Deserializer} for key bytes.
+    */
+   public Read<K, V> withKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer) {
+     return toBuilder().setKeyDeserializer(keyDeserializer).build();
+   }
+
+   /**
+    * Returns a new {@link Read} with a {@link Coder} for the key.
     */
    public Read<K, V> withKeyCoder(Coder<K> keyCoder) {
      return toBuilder().setKeyCoder(keyCoder).build();
    }

    /**
-    * Returns a new {@link Read} with {@link Coder} for value bytes.
+    * Returns a new {@link Read} with a Kafka {@link Deserializer} for value bytes.
+    */
+   public Read<K, V> withValueDeserializer(Class<? extends Deserializer<V>> valueDeserializer) {
+     return toBuilder().setValueDeserializer(valueDeserializer).build();
+   }
+
+   /**
+    * Returns a new {@link Read} with a {@link Coder} for values.
*/ public Read<K, V> withValueCoder(Coder<V> valueCoder) { return toBuilder().setValueCoder(valueCoder).build(); @@ -436,20 +612,51 @@ public class KafkaIO { } @Override - public void validate(PBegin input) { + public void validate(PBegin input) { checkNotNull(getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), "Kafka bootstrap servers should be set"); checkArgument(getTopics().size() > 0 || getTopicPartitions().size() > 0, "Kafka topics or topic_partitions are required"); - checkNotNull(getKeyCoder(), "Key coder must be set"); - checkNotNull(getValueCoder(), "Value coder must be set"); + checkNotNull(getKeyDeserializer(), "Key deserializer must be set"); + checkNotNull(getValueDeserializer(), "Value deserializer must be set"); + + if (input != null) { + CoderRegistry registry = input.getPipeline().getCoderRegistry(); + + checkNotNull(getKeyCoder() != null + ? getKeyCoder() + : inferCoder(registry, getKeyDeserializer()), + "Key coder must be set"); + + checkNotNull(getValueCoder() != null + ? getValueCoder() + : inferCoder(registry, getValueDeserializer()), + "Value coder must be set"); + } else { + checkNotNull(getKeyCoder(), "Key coder must be set"); + checkNotNull(getValueCoder(), "Value coder must be set"); + } } @Override public PCollection<KafkaRecord<K, V>> expand(PBegin input) { - // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set. + // Infer key/value coders if not specified explicitly + CoderRegistry registry = input.getPipeline().getCoderRegistry(); + + Coder<K> keyCoder = getKeyCoder() != null + ? getKeyCoder() + : inferCoder(registry, getKeyDeserializer()); + + Coder<V> valueCoder = getValueCoder() != null + ? getValueCoder() + : inferCoder(registry, getValueDeserializer()); + + // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set. Unbounded<KafkaRecord<K, V>> unbounded = - org.apache.beam.sdk.io.Read.from(makeSource()); + org.apache.beam.sdk.io.Read.from(this + .withKeyCoder(keyCoder) + .withValueCoder(valueCoder) + .makeSource()); PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded; @@ -488,8 +695,8 @@ public class KafkaIO { * A set of properties that are not required or don't make sense for our consumer. */ private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES = ImmutableMap.of( - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyCoder instead", - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueCoder instead" + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyDeserializer instead", + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueDeserializer instead" // "group.id", "enable.auto.commit", "auto.commit.interval.ms" : // lets allow these, applications can have better resume point for restarts. 
); @@ -739,6 +946,9 @@ public class KafkaIO { private Instant curTimestamp; private Iterator<PartitionState> curBatch = Collections.emptyIterator(); + private Deserializer<K> keyDeserializerInstance = null; + private Deserializer<V> valueDeserializerInstance = null; + private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000); private static final Duration NEW_RECORDS_POLL_TIMEOUT = Duration.millis(10); @@ -912,6 +1122,16 @@ public class KafkaIO { consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig()); consumerSpEL.evaluateAssign(consumer, spec.getTopicPartitions()); + try { + keyDeserializerInstance = source.spec.getKeyDeserializer().newInstance(); + valueDeserializerInstance = source.spec.getValueDeserializer().newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new IOException("Could not instantiate deserializers", e); + } + + keyDeserializerInstance.configure(spec.getConsumerConfig(), true); + valueDeserializerInstance.configure(spec.getConsumerConfig(), false); + for (PartitionState p : partitionStates) { if (p.nextOffset != UNINITIALIZED_OFFSET) { consumer.seek(p.topicPartition, p.nextOffset); @@ -1003,15 +1223,15 @@ public class KafkaIO { curRecord = null; // user coders below might throw. - // apply user coders. might want to allow skipping records that fail to decode. - // TODO: wrap exceptions from coders to make explicit to users + // apply user deserializers. + // TODO: write records that can't be deserialized to a "dead-letter" additional output. KafkaRecord<K, V> record = new KafkaRecord<K, V>( rawRecord.topic(), rawRecord.partition(), rawRecord.offset(), consumerSpEL.getRecordTimestamp(rawRecord), - decode(rawRecord.key(), source.spec.getKeyCoder()), - decode(rawRecord.value(), source.spec.getValueCoder())); + keyDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.key()), + valueDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.value())); curTimestamp = (source.spec.getTimestampFn() == null) ? Instant.now() : source.spec.getTimestampFn().apply(record); @@ -1032,16 +1252,6 @@ public class KafkaIO { } } - private static byte[] nullBytes = new byte[0]; - private static <T> T decode(byte[] bytes, Coder<T> coder) throws IOException { - // If 'bytes' is null, use byte[0]. It is common for key in Kakfa record to be null. - // This makes it impossible for user to distinguish between zero length byte and null. - // Alternately, we could have a ByteArrayCoder that handles nulls, and use that for default - // coder. - byte[] toDecode = bytes == null ? nullBytes : bytes; - return coder.decode(new ExposedByteArrayInputStream(toDecode), Coder.Context.OUTER); - } - // update latest offset for each partition. // called from offsetFetcher thread private void updateLatestOffsets() { @@ -1153,6 +1363,9 @@ public class KafkaIO { } } + Closeables.close(keyDeserializerInstance, true); + Closeables.close(valueDeserializerInstance, true); + Closeables.close(offsetConsumer, true); Closeables.close(consumer, true); } @@ -1167,22 +1380,23 @@ public class KafkaIO { @AutoValue public abstract static class Write<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> { @Nullable abstract String getTopic(); - @Nullable abstract Coder<K> getKeyCoder(); - @Nullable abstract Coder<V> getValueCoder(); abstract Map<String, Object> getProducerConfig(); @Nullable abstract SerializableFunction<Map<String, Object>, Producer<K, V>> getProducerFactoryFn(); + @Nullable abstract Class<? 
extends Serializer<K>> getKeySerializer();
+   @Nullable abstract Class<? extends Serializer<V>> getValueSerializer();
+
    abstract Builder<K, V> toBuilder();

    @AutoValue.Builder
    abstract static class Builder<K, V> {
      abstract Builder<K, V> setTopic(String topic);
-     abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder);
-     abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);
      abstract Builder<K, V> setProducerConfig(Map<String, Object> producerConfig);
      abstract Builder<K, V> setProducerFactoryFn(
          SerializableFunction<Map<String, Object>, Producer<K, V>> fn);
+     abstract Builder<K, V> setKeySerializer(Class<? extends Serializer<K>> serializer);
+     abstract Builder<K, V> setValueSerializer(Class<? extends Serializer<V>> serializer);
      abstract Write<K, V> build();
    }

@@ -1204,19 +1418,19 @@
    }

    /**
-    * Returns a new {@link Write} with {@link Coder} for serializing key (if any) to bytes.
+    * Returns a new {@link Write} with a {@link Serializer} for serializing the key (if any) to bytes.
     * A key is optional while writing to Kafka. Note when a key is set, its hash is used to
     * determine partition in Kafka (see {@link ProducerRecord} for more details).
     */
-   public Write<K, V> withKeyCoder(Coder<K> keyCoder) {
-     return toBuilder().setKeyCoder(keyCoder).build();
+   public Write<K, V> withKeySerializer(Class<? extends Serializer<K>> keySerializer) {
+     return toBuilder().setKeySerializer(keySerializer).build();
    }

    /**
-    * Returns a new {@link Write} with {@link Coder} for serializing value to bytes.
+    * Returns a new {@link Write} with a {@link Serializer} for serializing the value to bytes.
     */
-   public Write<K, V> withValueCoder(Coder<V> valueCoder) {
-     return toBuilder().setValueCoder(valueCoder).build();
+   public Write<K, V> withValueSerializer(Class<? extends Serializer<V>> valueSerializer) {
+     return toBuilder().setValueSerializer(valueSerializer).build();
    }

    public Write<K, V> updateProducerProperties(Map<String, Object> configUpdates) {
@@ -1239,7 +1453,7 @@
     * collections of values rather than {@link KV}s.
     */
    public PTransform<PCollection<V>, PDone> values() {
-     return new KafkaValueWrite<>(withKeyCoder(new NullOnlyCoder<K>()).toBuilder().build());
+     return new KafkaValueWrite<>(toBuilder().build());
    }

    @Override
@@ -1253,26 +1467,19 @@
      checkNotNull(getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
          "Kafka bootstrap servers should be set");
      checkNotNull(getTopic(), "Kafka topic should be set");
-     checkNotNull(getKeyCoder(), "Key coder should be set");
-     checkNotNull(getValueCoder(), "Value coder should be set");
    }

    // set config defaults
    private static final Map<String, Object> DEFAULT_PRODUCER_PROPERTIES =
        ImmutableMap.<String, Object>of(
-           ProducerConfig.RETRIES_CONFIG, 3,
-           // See comment about custom serializers in KafkaWriter constructor.
-           ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, CoderBasedKafkaSerializer.class,
-           ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoderBasedKafkaSerializer.class);
+           ProducerConfig.RETRIES_CONFIG, 3);

    /**
     * A set of properties that are not required or don't make sense for our producer.
*/ private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES = ImmutableMap.of( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Set keyCoder instead", - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "Set valueCoder instead", - configForKeySerializer(), "Reserved for internal serializer", - configForValueSerializer(), "Reserved for internal serializer" + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Use withKeySerializer instead", + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "Use withValueSerializer instead" ); @Override @@ -1310,7 +1517,7 @@ public class KafkaIO { return KV.of(null, element); } })) - .setCoder(KvCoder.of(new NullOnlyCoder<K>(), kvWriteTransform.getValueCoder())) + .setCoder(KvCoder.of(new NullOnlyCoder<K>(), input.getCoder())) .apply(kvWriteTransform); } @@ -1389,8 +1596,11 @@ public class KafkaIO { // Use case : write all the events for a single session to same Kafka partition. this.producerConfig = new HashMap<>(spec.getProducerConfig()); - this.producerConfig.put(configForKeySerializer(), spec.getKeyCoder()); - this.producerConfig.put(configForValueSerializer(), spec.getValueCoder()); + + this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + spec.getKeySerializer()); + this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + spec.getValueSerializer()); } private synchronized void checkForFailures() throws IOException { @@ -1427,48 +1637,4 @@ public class KafkaIO { } } } - - /** - * Implements Kafka's {@link Serializer} with a {@link Coder}. The coder is stored as serialized - * value in producer configuration map. - */ - public static class CoderBasedKafkaSerializer<T> implements Serializer<T> { - - @SuppressWarnings("unchecked") - @Override - public void configure(Map<String, ?> configs, boolean isKey) { - String configKey = isKey ? configForKeySerializer() : configForValueSerializer(); - coder = (Coder<T>) configs.get(configKey); - checkNotNull(coder, "could not instantiate coder for Kafka serialization"); - } - - @Override - public byte[] serialize(String topic, @Nullable T data) { - if (data == null) { - return null; // common for keys to be null - } - - try { - return CoderUtils.encodeToByteArray(coder, data); - } catch (CoderException e) { - throw new RuntimeException(e); - } - } - - @Override - public void close() { - } - - private Coder<T> coder = null; - private static final String CONFIG_FORMAT = "beam.coder.based.kafka.%s.serializer"; - } - - - private static String configForKeySerializer() { - return String.format(CoderBasedKafkaSerializer.CONFIG_FORMAT, "key"); - } - - private static String configForValueSerializer() { - return String.format(CoderBasedKafkaSerializer.CONFIG_FORMAT, "value"); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaDeserializer.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaDeserializer.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaDeserializer.java new file mode 100644 index 0000000..ca552fb --- /dev/null +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaDeserializer.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.kafka.serialization;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Map;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.kafka.common.serialization.Deserializer;
+
+/**
+ * Implements a Kafka {@link Deserializer} with a {@link Coder}.
+ *
+ * <p>As Kafka instantiates deserializers directly, the coder must be stored as a serialized value
+ * in the consumer configuration map.
+ */
+public class CoderBasedKafkaDeserializer<T> implements Deserializer<T> {
+  @SuppressWarnings("unchecked")
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {
+    String configKey = isKey ? configForKeyDeserializer() : configForValueDeserializer();
+    coder = (Coder<T>) configs.get(configKey);
+    checkNotNull(coder, "could not instantiate coder for Kafka deserialization");
+  }
+
+  @Override
+  public T deserialize(String topic, byte[] data) {
+    if (data == null) {
+      return null;
+    }
+
+    try {
+      return CoderUtils.decodeFromByteArray(coder, data);
+    } catch (CoderException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void close() {}
+
+  public static String configForKeyDeserializer() {
+    return String.format(CoderBasedKafkaDeserializer.CONFIG_FORMAT, "key");
+  }
+
+  public static String configForValueDeserializer() {
+    return String.format(CoderBasedKafkaDeserializer.CONFIG_FORMAT, "value");
+  }
+
+  private Coder<T> coder = null;
+  private static final String CONFIG_FORMAT = "beam.coder.based.kafka.%s.deserializer";
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaSerializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaSerializer.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaSerializer.java
new file mode 100644
index 0000000..1044d6f
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaSerializer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.kafka.serialization; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Map; +import javax.annotation.Nullable; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.kafka.common.serialization.Serializer; + +/** + * Implements Kafka's {@link Serializer} with a {@link Coder}. + * + * <p>As Kafka instantiates serializers directly, the coder + * must be stored as serialized value in the producer configuration map. + */ +public class CoderBasedKafkaSerializer<T> implements Serializer<T> { + @SuppressWarnings("unchecked") + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + String configKey = isKey ? configForKeySerializer() : configForValueSerializer(); + coder = (Coder<T>) configs.get(configKey); + checkNotNull(coder, "could not instantiate coder for Kafka serialization"); + } + + @Override + public byte[] serialize(String topic, @Nullable T data) { + if (data == null) { + return null; // common for keys to be null + } + + try { + return CoderUtils.encodeToByteArray(coder, data); + } catch (CoderException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + } + + public static String configForKeySerializer() { + return String.format(CoderBasedKafkaSerializer.CONFIG_FORMAT, "key"); + } + + public static String configForValueSerializer() { + return String.format(CoderBasedKafkaSerializer.CONFIG_FORMAT, "value"); + } + + private Coder<T> coder = null; + private static final String CONFIG_FORMAT = "beam.coder.based.kafka.%s.serializer"; +} http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/InstantDeserializer.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/InstantDeserializer.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/InstantDeserializer.java new file mode 100644 index 0000000..fe4749f --- /dev/null +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/InstantDeserializer.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.kafka.serialization; + +import java.util.Map; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.joda.time.Instant; + +/** + * Kafka {@link Deserializer} for {@link Instant}. + * + * <p>This decodes the number of milliseconds since epoch using {@link LongDeserializer}. + */ +public class InstantDeserializer implements Deserializer<Instant> { + private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer(); + + @Override + public void configure(Map<String, ?> configs, boolean isKey) {} + + @Override + public Instant deserialize(String topic, byte[] bytes) { + return new Instant(LONG_DESERIALIZER.deserialize(topic, bytes)); + } + + @Override + public void close() {} +} http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/InstantSerializer.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/InstantSerializer.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/InstantSerializer.java new file mode 100644 index 0000000..8fa4429 --- /dev/null +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/InstantSerializer.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.kafka.serialization; + +import java.util.Map; + +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.joda.time.Instant; + +/** + * Kafka {@link Serializer} for {@link Instant}. + * + * <p>This encodes the number of milliseconds since epoch using {@link LongSerializer}. 
+ */
+public class InstantSerializer implements Serializer<Instant> {
+  private static final LongSerializer LONG_SERIALIZER = new LongSerializer();
+
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {}
+
+  @Override
+  public byte[] serialize(String topic, Instant instant) {
+    return LONG_SERIALIZER.serialize(topic, instant.getMillis());
+  }
+
+  @Override
+  public void close() {}
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/package-info.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/package-info.java
new file mode 100644
index 0000000..747d64c
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Kafka serializers and deserializers.
+ */ +package org.apache.beam.sdk.io.kafka.serialization; http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 2b11162..e6ed2f7 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -41,12 +41,15 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline.PipelineExecutionException; -import org.apache.beam.sdk.coders.BigEndianIntegerCoder; -import org.apache.beam.sdk.coders.BigEndianLongCoder; -import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.InstantCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; +import org.apache.beam.sdk.io.kafka.serialization.InstantDeserializer; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -74,7 +77,14 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Utils; import org.hamcrest.collection.IsIterableContainingInAnyOrder; import org.joda.time.Instant; @@ -231,8 +241,8 @@ public class KafkaIOTest { .withTopics(topics) .withConsumerFactoryFn(new ConsumerFactoryFn( topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions - .withKeyCoder(BigEndianIntegerCoder.of()) - .withValueCoder(BigEndianLongCoder.of()) + .withKeyDeserializer(IntegerDeserializer.class) + .withValueDeserializer(LongDeserializer.class) .withMaxNumRecords(numElements); if (timestampFn != null) { @@ -303,9 +313,9 @@ public class KafkaIOTest { .withTopic("my_topic") .withConsumerFactoryFn(new ConsumerFactoryFn( ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST)) - .withKeyCoder(BigEndianIntegerCoder.of()) - .withValueCoder(BigEndianLongCoder.of()) - .withMaxNumRecords(numElements); + .withMaxNumRecords(numElements) + .withKeyDeserializer(IntegerDeserializer.class) + .withValueDeserializer(LongDeserializer.class); PCollection<Long> input = p .apply(reader.withoutMetadata()) @@ -326,8 +336,8 @@ public class KafkaIOTest { .withTopicPartitions(ImmutableList.of(new 
TopicPartition("test", 5))) .withConsumerFactoryFn(new ConsumerFactoryFn( topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 10 partitions - .withKeyCoder(ByteArrayCoder.of()) - .withValueCoder(BigEndianLongCoder.of()) + .withKeyDeserializer(ByteArrayDeserializer.class) + .withValueDeserializer(LongDeserializer.class) .withMaxNumRecords(numElements / 10); PCollection<Long> input = p @@ -386,8 +396,14 @@ public class KafkaIOTest { int numElements = 1000; int numSplits = 10; + // Coders must be specified explicitly here due to the way the transform + // is used in the test. UnboundedSource<KafkaRecord<Integer, Long>, ?> initial = - mkKafkaReadTransform(numElements, null).makeSource(); + mkKafkaReadTransform(numElements, null) + .withKeyCoder(VarIntCoder.of()) + .withValueCoder(VarLongCoder.of()) + .makeSource(); + List<? extends UnboundedSource<KafkaRecord<Integer, Long>, ?>> splits = initial.split(numSplits, p.getOptions()); assertEquals("Expected exact splitting", numSplits, splits.size()); @@ -510,8 +526,8 @@ public class KafkaIOTest { .withTopics(topics) .withConsumerFactoryFn(new ConsumerFactoryFn( topics, 10, numElements, OffsetResetStrategy.LATEST)) - .withKeyCoder(BigEndianIntegerCoder.of()) - .withValueCoder(BigEndianLongCoder.of()) + .withKeyDeserializer(IntegerDeserializer.class) + .withValueDeserializer(LongDeserializer.class) .withMaxNumRecords(numElements) .withTimestampFn(new ValueAsTimestampFn()) .makeSource() @@ -554,8 +570,8 @@ public class KafkaIOTest { .apply(KafkaIO.<Integer, Long>write() .withBootstrapServers("none") .withTopic(topic) - .withKeyCoder(BigEndianIntegerCoder.of()) - .withValueCoder(BigEndianLongCoder.of()) + .withKeySerializer(IntegerSerializer.class) + .withValueSerializer(LongSerializer.class) .withProducerFactoryFn(new ProducerFactoryFn())); p.run(); @@ -587,7 +603,7 @@ public class KafkaIOTest { .apply(KafkaIO.<Integer, Long>write() .withBootstrapServers("none") .withTopic(topic) - .withValueCoder(BigEndianLongCoder.of()) + .withValueSerializer(LongSerializer.class) .withProducerFactoryFn(new ProducerFactoryFn()) .values()); @@ -628,8 +644,8 @@ public class KafkaIOTest { .apply(KafkaIO.<Integer, Long>write() .withBootstrapServers("none") .withTopic(topic) - .withKeyCoder(BigEndianIntegerCoder.of()) - .withValueCoder(BigEndianLongCoder.of()) + .withKeySerializer(IntegerSerializer.class) + .withValueSerializer(LongSerializer.class) .withProducerFactoryFn(new ProducerFactoryFn())); try { @@ -664,8 +680,8 @@ public class KafkaIOTest { new TopicPartition("test", 6))) .withConsumerFactoryFn(new ConsumerFactoryFn( Lists.newArrayList("test"), 10, 10, OffsetResetStrategy.EARLIEST)) // 10 partitions - .withKeyCoder(ByteArrayCoder.of()) - .withValueCoder(BigEndianLongCoder.of()); + .withKeyDeserializer(ByteArrayDeserializer.class) + .withValueDeserializer(LongDeserializer.class); DisplayData displayData = DisplayData.from(read); @@ -681,7 +697,7 @@ public class KafkaIOTest { KafkaIO.Write<Integer, Long> write = KafkaIO.<Integer, Long>write() .withBootstrapServers("myServerA:9092,myServerB:9092") .withTopic("myTopic") - .withValueCoder(BigEndianLongCoder.of()) + .withValueSerializer(LongSerializer.class) .withProducerFactoryFn(new ProducerFactoryFn()); DisplayData displayData = DisplayData.from(write); @@ -691,6 +707,51 @@ public class KafkaIOTest { assertThat(displayData, hasDisplayItem("retries", 3)); } + // interface for testing coder inference + private interface DummyInterface<T> { + } + + // interface for testing coder inference + private interface 
DummyNonparametricInterface { + } + + // class for testing coder inference + private static class DeserializerWithInterfaces + implements DummyInterface<String>, DummyNonparametricInterface, + Deserializer<Long> { + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + } + + @Override + public Long deserialize(String topic, byte[] bytes) { + return 0L; + } + + @Override + public void close() { + + } + } + + @Test + public void testInferKeyCoder() { + CoderRegistry registry = CoderRegistry.createDefault(); + + assertTrue(KafkaIO.inferCoder(registry, LongDeserializer.class) + instanceof VarLongCoder); + + assertTrue(KafkaIO.inferCoder(registry, StringDeserializer.class) + instanceof StringUtf8Coder); + + assertTrue(KafkaIO.inferCoder(registry, InstantDeserializer.class) + instanceof InstantCoder); + + assertTrue(KafkaIO.inferCoder(registry, DeserializerWithInterfaces.class) + instanceof VarLongCoder); + } + private static void verifyProducerRecords(String topic, int numElements, boolean keyIsAbsent) { // verify that appropriate messages are written to kafka @@ -724,8 +785,8 @@ public class KafkaIOTest { private static final MockProducer<Integer, Long> MOCK_PRODUCER = new MockProducer<Integer, Long>( false, // disable synchronous completion of send. see ProducerSendCompletionThread below. - new KafkaIO.CoderBasedKafkaSerializer<Integer>(), - new KafkaIO.CoderBasedKafkaSerializer<Long>()) { + new IntegerSerializer(), + new LongSerializer()) { // override flush() so that it does not complete all the waiting sends, giving a chance to // ProducerCompletionThread to inject errors. @@ -754,10 +815,14 @@ public class KafkaIOTest { public Producer<Integer, Long> apply(Map<String, Object> config) { // Make sure the config is correctly set up for serializers. - Utils.newInstance( - ((Class<?>) config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) - .asSubclass(Serializer.class) - ).configure(config, true); + + // There may not be a key serializer if we're interested only in values. + if (config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) != null) { + Utils.newInstance( + ((Class<?>) config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) + .asSubclass(Serializer.class) + ).configure(config, true); + } Utils.newInstance( ((Class<?>) config.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
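
----------------------------------------------------------------------

For reference, reading with the new deserializer-based API amounts to the following sketch of a minimal pipeline. The broker address, topic name, and surrounding pipeline setup are illustrative only and are not taken from this commit; coders for the resulting collection are inferred from the deserializer types, as the inferCoder change above describes.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaReadExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Deserializer classes replace the removed withKeyCoder/withValueCoder calls;
    // LongDeserializer infers VarLongCoder and StringDeserializer infers StringUtf8Coder.
    PCollection<KV<Long, String>> records = p
        .apply(KafkaIO.<Long, String>read()
            .withBootstrapServers("broker_1:9092")           // illustrative address
            .withTopic("my_topic")                           // illustrative topic
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata());

    // Drop the keys and keep only the message values.
    PCollection<String> values = records.apply(Values.<String>create());

    p.run();
  }
}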

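----------------------------------------------------------------------

The new Instant serializers can likewise be used with a plain Kafka producer outside Beam, as ResumeFromCheckpointStreamingTest above now does. A minimal sketch, assuming a hypothetical broker at localhost:9092 and a hypothetical "events" topic:

import java.util.Properties;
import org.apache.beam.sdk.io.kafka.serialization.InstantSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Instant;

public class InstantProducerExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // illustrative broker address

    // Serializer instances are passed directly to the producer, so the
    // "key.serializer"/"value.serializer" config entries are not needed.
    try (KafkaProducer<String, Instant> producer =
        new KafkaProducer<>(props, new StringSerializer(), new InstantSerializer())) {
      // The value is encoded as milliseconds since the epoch via LongSerializer.
      producer.send(new ProducerRecord<>("events", "k1", Instant.now()));
    }
  }
}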