Repository: beam Updated Branches: refs/heads/master 9fc8198cd -> 4e8c5f3de
KafkaIO API clean up Removes withKeyCoder() and withValueCoder() methods. Their meaning changed when KafkaIO added support for Deserializers. The coders can be explicitly specified using withKeyDeserializerAndCoder(), likewise for value. This makes it explicit to the user that Deserializer is still required and JavaDoc explains why/when both are required. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d3ff8da7 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d3ff8da7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d3ff8da7 Branch: refs/heads/master Commit: d3ff8da7311572e93701bd94aeb9ed93abb0002e Parents: 9fc8198 Author: Raghu Angadi <[email protected]> Authored: Mon May 8 17:14:37 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Tue May 9 12:14:01 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 287 ++++++------------- .../CoderBasedKafkaDeserializer.java | 70 ----- .../CoderBasedKafkaSerializer.java | 72 ----- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 112 +------- 4 files changed, 97 insertions(+), 444 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d3ff8da7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index e21945f..cb31ea2 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -67,8 +67,6 @@ import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark; -import org.apache.beam.sdk.io.kafka.serialization.CoderBasedKafkaDeserializer; -import org.apache.beam.sdk.io.kafka.serialization.CoderBasedKafkaSerializer; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Gauge; import org.apache.beam.sdk.metrics.SinkMetrics; @@ -158,49 +156,14 @@ import org.slf4j.LoggerFactory; * }</pre> * * <p>Kafka provides deserializers for common types in - * {@link org.apache.kafka.common.serialization}. - * - * <p>To read Avro data, {@code fromAvro} can be used. This does not require manually specifying - * a {@link Coder} or {@link Deserializer}. - * - * <p>It's also possible to deserialize data using a Beam {@link Coder} via - * {@link #readWithCoders(Coder, Coder)}, though this is discouraged because the particular - * binary format is not guaranteed by coders. However, this can be useful - * when exchanging data with a Beam pipeline that uses the same coder: - * - * <pre>{@code - * - * pipeline - * .apply(KafkaIO.<MyKey, MyValue>readWithCoders(MyKeyCoder.of(), MyValueCoder.of()) - * .withBootstrapServers("broker_1:9092,broker_2:9092") - * .withTopic("my_topic") - * ) - * ... - * }</pre> - * - * <p>In most cases, you don't need to specify {@link Coder} for key and value in the resulting + * {@link org.apache.kafka.common.serialization}. In addition to deserializers, Beam runners need + * {@link Coder} to materialize key and value objects if necessary. 
+ * In most cases, you don't need to specify {@link Coder} for key and value in the resulting * collection because the coders are inferred from deserializer types. However, in cases when - * coder inference fails, they can be specified manually using {@link Read#withKeyCoder} and - * {@link Read#withValueCoder}. Note that the payloads of Kafka messages is interpreted using - * key and value <i>deserializers</i>; coders are a Beam implementation detail to help runners - * materialize the data for intermediate storage if necessary. - * - * <pre>{@code - * - * pipeline - * .apply(KafkaIO.<Long, Foo>read() - * .withBootstrapServers("broker_1:9092,broker_2:9092") - * .withTopic("my_topic") - * - * // infer coder from deserializer - * .withKeyDeserializer(LongDeserializer.class) - * - * // explicitly specify coder - * .withValueDeserializer(FooDeserializer.class) - * .withValueCoder(FooCoder.of()) - * ) - * ... - * }</pre> + * coder inference fails, they can be specified explicitly along with deserializers using + * {@link Read#withKeyDeserializerAndCoder(Class, Coder)} and + * {@link Read#withValueDeserializerAndCoder(Class, Coder)}. Note that Kafka messages are + * interpreted using key and value <i>deserializers</i>. * * <h3>Partition Assignment and Checkpointing</h3> * The Kafka partitions are evenly distributed among splits (workers). @@ -257,11 +220,6 @@ import org.slf4j.LoggerFactory; * ); * }</pre> * - * <p>Same notes on coders vs. serializers apply as above for {@link Read}. - * - * <p>To write Avro data, {@code toAvro} can be used. This does not require specifying serializers - * or coders. - * * <h3>Advanced Kafka Configuration</h3> * KafkaIO allows setting most of the properties in {@link ConsumerConfig} for source or in * {@link ProducerConfig} for sink. E.g. if you would like to enable offset @@ -280,46 +238,6 @@ import org.slf4j.LoggerFactory; public class KafkaIO { /** - * Attempt to infer a {@link Coder} by extracting the type of the deserialized-class from the - * deserializer argument using the {@link Coder} registry. - */ - @VisibleForTesting - static <T> NullableCoder<T> inferCoder( - CoderRegistry coderRegistry, Class<? extends Deserializer<T>> deserializer) { - checkNotNull(deserializer); - - for (Type type : deserializer.getGenericInterfaces()) { - if (!(type instanceof ParameterizedType)) { - continue; - } - - // This does not recurse: we will not infer from a class that extends - // a class that extends Deserializer<T>. - ParameterizedType parameterizedType = (ParameterizedType) type; - - if (parameterizedType.getRawType() == Deserializer.class) { - Type parameter = parameterizedType.getActualTypeArguments()[0]; - - @SuppressWarnings("unchecked") - Class<T> clazz = (Class<T>) parameter; - - try { - return NullableCoder.of(coderRegistry.getCoder(clazz)); - } catch (CannotProvideCoderException e) { - throw new RuntimeException( - String.format("Unable to automatically infer a Coder for " - + "the Kafka Deserializer %s: no coder registered for type %s", - deserializer, clazz)); - } - } - } - - throw new RuntimeException( - String.format("Could not extract the Kafaka Deserializer type from %s", - deserializer)); - } - - /** * Creates an uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka * configuration should be set with {@link Read#withBootstrapServers(String)} and * {@link Read#withTopics(List)}. 
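For reference, this is how the new read() API expresses the removed example above -- a sketch only, reusing the placeholder types from the original snippet (Foo, FooDeserializer, FooCoder):

  pipeline
    .apply(KafkaIO.<Long, Foo>read()
       .withBootstrapServers("broker_1:9092,broker_2:9092")
       .withTopic("my_topic")

       // coder inferred from the deserializer type
       .withKeyDeserializer(LongDeserializer.class)

       // deserializer and coder supplied together, for types where inference fails
       .withValueDeserializerAndCoder(FooDeserializer.class, FooCoder.of())
    )
    ...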
Other optional settings include key and value @@ -354,44 +272,6 @@ public class KafkaIO { } /** - * Creates an uninitialized {@link Read} {@link PTransform}, using Kafka {@link Deserializer}s - * based on {@link Coder} instances. - */ - @SuppressWarnings("unchecked") - public static <K, V> Read<K, V> readWithCoders(Coder<K> keyCoder, Coder<V> valueCoder) { - // Kafka constructs deserializers directly. Pass coder through consumer - // configuration. - ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder<>(); - Map<String, Object> config = builder - .putAll(Read.DEFAULT_CONSUMER_PROPERTIES) - .put(CoderBasedKafkaDeserializer.configForKeyDeserializer(), keyCoder) - .put(CoderBasedKafkaDeserializer.configForValueDeserializer(), valueCoder) - .build(); - - return new AutoValue_KafkaIO_Read.Builder<K, V>() - .setTopics(new ArrayList<String>()) - .setTopicPartitions(new ArrayList<TopicPartition>()) - .setKeyCoder(keyCoder) - .setValueCoder(valueCoder) - .setKeyDeserializer((Class) CoderBasedKafkaDeserializer.class) - .setValueDeserializer((Class) CoderBasedKafkaDeserializer.class) - .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN) - .setConsumerConfig(config) - .setMaxNumRecords(Long.MAX_VALUE) - .build(); - } - - /** - * Creates an uninitialized {@link Read} {@link PTransform}, using Kafka {@link Deserializer}s - * based on {@link AvroCoder}. This reads data in the Avro binary format directly without using - * an Avro object container. - */ - @SuppressWarnings("unchecked") - public static <K, V> Read<K, V> fromAvro(Class<K> keyClass, Class<V> valueClass) { - return readWithCoders(AvroCoder.of(keyClass), AvroCoder.of(valueClass)); - } - - /** * Creates an uninitialized {@link Write} {@link PTransform}. Before use, Kafka configuration * should be set with {@link Write#withBootstrapServers(String)} and {@link Write#withTopic} * along with {@link Serializer}s for (optional) keys and values. @@ -401,40 +281,6 @@ public class KafkaIO { public static <K, V> Write<K, V> write() { return new AutoValue_KafkaIO_Write.Builder<K, V>() .setProducerConfig(Write.DEFAULT_PRODUCER_PROPERTIES) .build(); } - /** - * Creates an uninitialized {@link Write} {@link PTransform}, using Kafka {@link Serializer}s - * based on {@link Coder} instances. - */ - @SuppressWarnings("unchecked") - public static <K, V> Write<K, V> writeWithCoders(Coder<K> keyCoder, Coder<V> valueCoder) { - // Kafka constructs serializers directly. Pass coder through consumer - // configuration. - ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder<>(); - Map<String, Object> config = builder - .putAll(Write.DEFAULT_PRODUCER_PROPERTIES) - .put(CoderBasedKafkaSerializer.configForKeySerializer(), keyCoder) - .put(CoderBasedKafkaSerializer.configForValueSerializer(), valueCoder) - .build(); - - CoderBasedKafkaSerializer<K> keySerializer = new CoderBasedKafkaSerializer<K>(); - CoderBasedKafkaSerializer<V> valueSerializer = new CoderBasedKafkaSerializer<V>(); - - return new AutoValue_KafkaIO_Write.Builder<K, V>() - .setProducerConfig(config) - .setKeySerializer((Class) CoderBasedKafkaSerializer.class) - .setValueSerializer((Class) CoderBasedKafkaSerializer.class) - .build(); - } - - /** - * Creates an uninitialized {@link Write} {@link PTransform}, using Kafka {@link Serializer}s - * based on {@link AvroCoder}. The coder writes Avro data directly without using an Avro object - * container. 
- */ - @SuppressWarnings("unchecked") - public static <K, V> Write<K, V> toAvro(Class<K> keyClass, Class<V> valueClass) { - return writeWithCoders(AvroCoder.of(keyClass), AvroCoder.of(valueClass)); - } ///////////////////////// Read Support \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\ @@ -524,31 +370,48 @@ public class KafkaIO { } /** - * Returns a new {@link Read} with a Kafka {@link Deserializer} for key bytes. + * Returns a new {@link Read} with a Kafka {@link Deserializer} to interpret key bytes read + * from Kafka. In addition, Beam needs a Coder to serialize and deserialize key objects + * at runtime. KafkaIO tries to infer coders for many of the common types (bytes, strings, + * integers, etc.) based on the Deserializer class. It might not be able to do so for some types. + * Please use {@link #withKeyDeserializerAndCoder(Class, Coder)} to provide the key coder + * explicitly. + * + * @see KafkaIO.Read#withKeyDeserializerAndCoder */ public Read<K, V> withKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer) { return toBuilder().setKeyDeserializer(keyDeserializer).build(); } /** - * Returns a new {@link Read} with a {@link Coder} for the key. + * Returns a new {@link Read} with a Kafka {@link Deserializer} to deserialize key bytes read + * from Kafka and a {@link Coder} for serializing and deserializing the key objects at runtime. */ - public Read<K, V> withKeyCoder(Coder<K> keyCoder) { - return toBuilder().setKeyCoder(keyCoder).build(); + public Read<K, V> withKeyDeserializerAndCoder(Class<? extends Deserializer<K>> keyDeserializer, + Coder<K> keyCoder) { + + return toBuilder() + .setKeyDeserializer(keyDeserializer) + .setKeyCoder(keyCoder) + .build(); } - /** - * Returns a new {@link Read} with a Kafka {@link Deserializer} for value bytes. - */ public Read<K, V> withValueDeserializer(Class<? extends Deserializer<V>> valueDeserializer) { return toBuilder().setValueDeserializer(valueDeserializer).build(); } /** - * Returns a new {@link Read} with a {@link Coder} for values. + * Returns a new {@link Read} with a Kafka {@link Deserializer} to deserialize value bytes read + * from Kafka and a {@link Coder} for serializing and deserializing the value objects at runtime. */ - public Read<K, V> withValueCoder(Coder<V> valueCoder) { - return toBuilder().setValueCoder(valueCoder).build(); + public Read<K, V> withValueDeserializerAndCoder( + Class<? extends Deserializer<V>> valueDeserializer, + Coder<V> valueCoder) { + + return toBuilder() + .setValueDeserializer(valueDeserializer) + .setValueCoder(valueCoder) + .build(); } /** @@ -649,20 +512,23 @@ public class KafkaIO { Coder<K> keyCoder = checkNotNull( getKeyCoder() != null ? getKeyCoder() : inferCoder(registry, getKeyDeserializer()), - "Key coder must be inferable from input or set using readWithCoders"); + "Key coder could not be inferred from key deserializer. Please provide " + + "key coder explicitly using withKeyDeserializerAndCoder()"); Coder<V> valueCoder = checkNotNull( - getValueCoder() != null - ? getValueCoder() + getValueCoder() != null ? getValueCoder() : inferCoder(registry, getValueDeserializer()), - "Value coder must be inferable from input or set using readWithCoders"); + "Value coder could not be inferred from value deserializer. Please provide " + + "value coder explicitly using withValueDeserializerAndCoder()"); // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set. 
Unbounded<KafkaRecord<K, V>> unbounded = - org.apache.beam.sdk.io.Read.from(this - .withKeyCoder(keyCoder) - .withValueCoder(valueCoder) + org.apache.beam.sdk.io.Read.from( + toBuilder() + .setKeyCoder(keyCoder) + .setValueCoder(valueCoder) + .build() .makeSource()); PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded; @@ -683,6 +549,7 @@ public class KafkaIO { */ @VisibleForTesting UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> makeSource() { + return new UnboundedKafkaSource<K, V>(this, -1); } @@ -703,11 +570,9 @@ public class KafkaIO { */ private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES = ImmutableMap.of( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyDeserializer instead", - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueDeserializer instead", + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueDeserializer instead" // "group.id", "enable.auto.commit", "auto.commit.interval.ms" : // lets allow these, applications can have better resume point for restarts. - CoderBasedKafkaDeserializer.configForKeyDeserializer(), "Use readWithCoders instead", - CoderBasedKafkaDeserializer.configForValueDeserializer(), "Use readWithCoders instead" ); // set config defaults @@ -756,7 +621,9 @@ public class KafkaIO { for (Map.Entry<String, Object> conf : getConsumerConfig().entrySet()) { String key = conf.getKey(); if (!ignoredConsumerPropertiesKeys.contains(key)) { - builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(conf.getValue()))); + Object value = DisplayData.inferType(conf.getValue()) != null + ? conf.getValue() : String.valueOf(conf.getValue()); + builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(value))); } } } @@ -1543,10 +1410,7 @@ public class KafkaIO { */ private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES = ImmutableMap.of( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Use withKeySerializer instead", - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "Use withValueSerializer instead", - - CoderBasedKafkaSerializer.configForKeySerializer(), "Use writeWithCoders instead", - CoderBasedKafkaSerializer.configForValueSerializer(), "Use writeWithCoders instead" + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "Use withValueSerializer instead" ); @Override @@ -1557,7 +1421,9 @@ public class KafkaIO { for (Map.Entry<String, Object> conf : getProducerConfig().entrySet()) { String key = conf.getKey(); if (!ignoredProducerPropertiesKeys.contains(key)) { - builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(conf.getValue()))); + Object value = DisplayData.inferType(conf.getValue()) != null + ? conf.getValue() : String.valueOf(conf.getValue()); + builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(value))); } } } @@ -1658,14 +1524,6 @@ public class KafkaIO { KafkaWriter(Write<K, V> spec) { this.spec = spec; - // Set custom kafka serializers. We do not want to serialize user objects then pass the bytes - // to producer since key and value objects are used in Kafka Partitioner interface. - // This does not matter for default partitioner in Kafka as it uses just the serialized - // key bytes to pick a partition. But we don't want to limit use of custom partitions. - // We pass key and values objects the user writes directly Kafka and user supplied - // coders to serialize them are invoked inside CoderBasedKafkaSerializer. - // Use case : write all the events for a single session to same Kafka partition. 
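With the coder-based serializer path removed, the KafkaWriter constructor below passes the user-supplied Kafka Serializer classes straight into the producer configuration. A minimal usage sketch of the corresponding sink API (the topic name and KV types are illustrative, not from this commit):

  pipeline
    .apply(...)  // produces PCollection<KV<Integer, Long>>
    .apply(KafkaIO.<Integer, Long>write()
       .withBootstrapServers("broker_1:9092,broker_2:9092")
       .withTopic("results")
       .withKeySerializer(IntegerSerializer.class)
       .withValueSerializer(LongSerializer.class)
    );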
- this.producerConfig = new HashMap<>(spec.getProducerConfig()); this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, @@ -1708,4 +1566,43 @@ public class KafkaIO { } } } + + /** + * Attempt to infer a {@link Coder} by extracting the type of the deserialized-class from the + * deserializer argument using the {@link Coder} registry. + */ + @VisibleForTesting + static <T> NullableCoder<T> inferCoder( + CoderRegistry coderRegistry, Class<? extends Deserializer<T>> deserializer) { + checkNotNull(deserializer); + + for (Type type : deserializer.getGenericInterfaces()) { + if (!(type instanceof ParameterizedType)) { + continue; + } + + // This does not recurse: we will not infer from a class that extends + // a class that extends Deserializer<T>. + ParameterizedType parameterizedType = (ParameterizedType) type; + + if (parameterizedType.getRawType() == Deserializer.class) { + Type parameter = parameterizedType.getActualTypeArguments()[0]; + + @SuppressWarnings("unchecked") + Class<T> clazz = (Class<T>) parameter; + + try { + return NullableCoder.of(coderRegistry.getCoder(clazz)); + } catch (CannotProvideCoderException e) { + throw new RuntimeException( + String.format("Unable to automatically infer a Coder for " + + "the Kafka Deserializer %s: no coder registered for type %s", + deserializer, clazz)); + } + } + } + + throw new RuntimeException(String.format( + "Could not extract the Kafka Deserializer type from %s", deserializer)); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/d3ff8da7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaDeserializer.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaDeserializer.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaDeserializer.java deleted file mode 100644 index a165586..0000000 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaDeserializer.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.io.kafka.serialization; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.Map; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.kafka.common.serialization.Deserializer; - -/** - * Implements a Kafka {@link Deserializer} with a {@link Coder}. - * - * <p>As Kafka instantiates serializers directly, the coder must be stored as serialized value in - * the producer configuration map. 
- */ -public class CoderBasedKafkaDeserializer<T> implements Deserializer<T> { - @SuppressWarnings("unchecked") - @Override - public void configure(Map<String, ?> configs, boolean isKey) { - String configKey = isKey ? configForKeyDeserializer() : configForValueDeserializer(); - coder = (Coder<T>) configs.get(configKey); - checkNotNull(coder, "could not instantiate coder for Kafka deserialization"); - } - - @Override - public T deserialize(String topic, byte[] data) { - if (data == null) { - return null; - } - - try { - return CoderUtils.decodeFromByteArray(coder, data); - } catch (CoderException e) { - throw new RuntimeException(e); - } - } - - @Override - public void close() {} - - public static String configForKeyDeserializer() { - return String.format(CoderBasedKafkaDeserializer.CONFIG_FORMAT, "key"); - } - - public static String configForValueDeserializer() { - return String.format(CoderBasedKafkaDeserializer.CONFIG_FORMAT, "value"); - } - - private Coder<T> coder = null; - private static final String CONFIG_FORMAT = "beam.coder.based.kafka.%s.deserializer"; -} http://git-wip-us.apache.org/repos/asf/beam/blob/d3ff8da7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaSerializer.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaSerializer.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaSerializer.java deleted file mode 100644 index 84b617e..0000000 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaSerializer.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.io.kafka.serialization; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.Map; -import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.kafka.common.serialization.Serializer; - -/** - * Implements Kafka's {@link Serializer} with a {@link Coder}. - * - * <p>As Kafka instantiates serializers directly, the coder - * must be stored as serialized value in the producer configuration map. - */ -public class CoderBasedKafkaSerializer<T> implements Serializer<T> { - @SuppressWarnings("unchecked") - @Override - public void configure(Map<String, ?> configs, boolean isKey) { - String configKey = isKey ? 
configForKeySerializer() : configForValueSerializer(); - coder = (Coder<T>) configs.get(configKey); - checkNotNull(coder, "could not instantiate coder for Kafka serialization"); - } - - @Override - public byte[] serialize(String topic, @Nullable T data) { - if (data == null) { - return null; // common for keys to be null - } - - try { - return CoderUtils.encodeToByteArray(coder, data); - } catch (CoderException e) { - throw new RuntimeException(e); - } - } - - @Override - public void close() { - } - - public static String configForKeySerializer() { - return String.format(CoderBasedKafkaSerializer.CONFIG_FORMAT, "key"); - } - - public static String configForValueSerializer() { - return String.format(CoderBasedKafkaSerializer.CONFIG_FORMAT, "value"); - } - - private Coder<T> coder = null; - private static final String CONFIG_FORMAT = "beam.coder.based.kafka.%s.serializer"; -} http://git-wip-us.apache.org/repos/asf/beam/blob/d3ff8da7/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index ccbd3d6..12b7c78 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -43,10 +43,11 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.UnboundedSource; @@ -264,35 +265,6 @@ public class KafkaIOTest { } } - /** - * Creates a consumer with two topics, with 10 partitions each. - * numElements are (round-robin) assigned all the 20 partitions. - * Coders are specified explicitly. 
- */ - private static KafkaIO.Read<Integer, Long> mkKafkaReadTransformWithCoders( - int numElements, - @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) { - - List<String> topics = ImmutableList.of("topic_a", "topic_b"); - - KafkaIO.Read<Integer, Long> reader = KafkaIO - .<Integer, Long>readWithCoders(VarIntCoder.of(), VarLongCoder.of()) - .withBootstrapServers("myServer1:9092,myServer2:9092") - .withTopics(topics) - .withConsumerFactoryFn(new ConsumerFactoryFn( - topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions - .withKeyDeserializer(IntegerDeserializer.class) - .withValueDeserializer(LongDeserializer.class) - .withMaxNumRecords(numElements); - - if (timestampFn != null) { - return reader.withTimestampFn(timestampFn); - } else { - return reader; - } - } - - private static class AssertMultipleOf implements SerializableFunction<Iterable<Long>, Void> { private final int num; @@ -343,19 +315,6 @@ public class KafkaIOTest { } @Test - public void testUnboundedSourceWithCoders() { - int numElements = 1000; - - PCollection<Long> input = p - .apply(mkKafkaReadTransformWithCoders(numElements, new ValueAsTimestampFn()) - .withoutMetadata()) - .apply(Values.<Long>create()); - - addCountingAsserts(input, numElements); - p.run(); - } - - @Test public void testUnboundedSourceWithSingleTopic() { // same as testUnboundedSource, but with single topic @@ -454,9 +413,9 @@ public class KafkaIOTest { // is used in the test. UnboundedSource<KafkaRecord<Integer, Long>, ?> initial = mkKafkaReadTransform(numElements, null) - .withKeyCoder(VarIntCoder.of()) - .withValueCoder(VarLongCoder.of()) - .makeSource(); + .withKeyDeserializerAndCoder(IntegerDeserializer.class, BigEndianIntegerCoder.of()) + .withValueDeserializerAndCoder(LongDeserializer.class, BigEndianLongCoder.of()) + .makeSource(); List<? extends UnboundedSource<KafkaRecord<Integer, Long>, ?>> splits = initial.split(numSplits, p.getOptions()); @@ -717,39 +676,6 @@ public class KafkaIOTest { } @Test - public void testSinkWithCoders() throws Exception { - // Simply read from kafka source and write to kafka sink. Then verify the records - // are correctly published to mock kafka producer. - - int numElements = 1000; - - synchronized (MOCK_PRODUCER_LOCK) { - - MOCK_PRODUCER.clear(); - - ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start(); - - String topic = "test"; - - p - .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()) - .withoutMetadata()) - .apply(KafkaIO.<Integer, Long>writeWithCoders(VarIntCoder.of(), VarLongCoder.of()) - .withBootstrapServers("none") - .withTopic(topic) - .withKeySerializer(IntegerSerializer.class) - .withValueSerializer(LongSerializer.class) - .withProducerFactoryFn(new ProducerFactoryFn())); - - p.run(); - - completionThread.shutdown(); - - verifyProducerRecords(topic, numElements, false); - } - } - - @Test public void testValuesSink() throws Exception { // similar to testSink(), but use values()' interface. 
@@ -840,19 +766,6 @@ public class KafkaIOTest { } @Test - public void testSourceDisplayDataWithCoders() { - KafkaIO.Read<Integer, Long> read = mkKafkaReadTransformWithCoders(10, null); - - DisplayData displayData = DisplayData.from(read); - - assertThat(displayData, hasDisplayItem("topics", "topic_a,topic_b")); - assertThat(displayData, hasDisplayItem("enable.auto.commit", false)); - assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServer1:9092,myServer2:9092")); - assertThat(displayData, hasDisplayItem("auto.offset.reset", "latest")); - assertThat(displayData, hasDisplayItem("receive.buffer.bytes", 524288)); - } - - @Test public void testSourceWithExplicitPartitionsDisplayData() { KafkaIO.Read<byte[], Long> read = KafkaIO.<byte[], Long>read() .withBootstrapServers("myServer1:9092,myServer2:9092") @@ -886,21 +799,6 @@ public class KafkaIOTest { assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServerA:9092,myServerB:9092")); assertThat(displayData, hasDisplayItem("retries", 3)); } - @Test - public void testSinkDisplayDataWithCoders() { - KafkaIO.Write<Integer, Long> write = KafkaIO - .<Integer, Long>writeWithCoders(VarIntCoder.of(), VarLongCoder.of()) - .withBootstrapServers("myServerA:9092,myServerB:9092") - .withTopic("myTopic") - .withValueSerializer(LongSerializer.class) - .withProducerFactoryFn(new ProducerFactoryFn()); - - DisplayData displayData = DisplayData.from(write); - - assertThat(displayData, hasDisplayItem("topic", "myTopic")); - assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServerA:9092,myServerB:9092")); - assertThat(displayData, hasDisplayItem("retries", 3)); - } // interface for testing coder inference private interface DummyInterface<T> {
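The truncated hunk above begins the coder-inference tests. For orientation, a rough sketch of what the relocated inferCoder() helper resolves at expand() time -- callable like this only from the same package, as the tests do, and assuming the registry's default mapping of Long to VarLongCoder:

  CoderRegistry registry = pipeline.getCoderRegistry();

  // LongDeserializer implements Deserializer<Long>, so the helper reads the type
  // parameter off the Deserializer interface and looks Long up in the registry,
  // yielding NullableCoder.of(VarLongCoder.of()).
  NullableCoder<Long> inferred = KafkaIO.inferCoder(registry, LongDeserializer.class);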
