Repository: incubator-beam Updated Branches: refs/heads/master 0e62c29eb -> c2146b9f9
squash PR 271 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5b6ebd1e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5b6ebd1e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5b6ebd1e Branch: refs/heads/master Commit: 5b6ebd1e6a8bce34b95820e57c7817f8665ba2f1 Parents: 0e62c29 Author: Raghu Angadi <[email protected]> Authored: Fri Jun 3 17:46:38 2016 -0700 Committer: Raghu Angadi <[email protected]> Committed: Fri Jun 3 17:46:38 2016 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 441 ++++++++++++++++++- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 300 ++++++++++++- 2 files changed, 705 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5b6ebd1e/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 7fff641..9645d7c 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -23,7 +23,10 @@ import static com.google.common.base.Preconditions.checkState; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.Read.Unbounded; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; @@ 
-34,10 +37,12 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.ExposedByteArrayInputStream; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.PInput; import com.google.common.annotations.VisibleForTesting; @@ -56,10 +61,17 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.Serializer; import org.joda.time.Duration; import org.joda.time.Instant; import org.slf4j.Logger; @@ -86,8 +98,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; /** - * An unbounded source for <a href="http://kafka.apache.org/">Kafka</a> topics. Kafka version 0.9 - * and above are supported. + * An unbounded source and a sink for <a href="http://kafka.apache.org/">Kafka</a> topics. + * Kafka version 0.9 and above are supported. 
* * <h3>Reading from Kafka topics</h3> * @@ -146,25 +158,54 @@ import javax.annotation.Nullable; * beginning by setting appropriate appropriate properties in {@link ConsumerConfig}, through * {@link Read#updateConsumerProperties(Map)}. * + * <h3>Writing to Kafka</h3> + * + * KafkaIO sink supports writing key-value pairs to a Kafka topic. Users can also write + * just the values. To configure a Kafka sink, you must specify at the minimum Kafka + * <tt>bootstrapServers</tt> and the topic to write to. The following example illustrates various + * options for configuring the sink: + * + * <pre>{@code + * + * pipeline + * .apply(...) // returns PCollection<KV<Long, String>> + * .apply(KafkaIO.write() + * .withBootstrapServers("broker_1:9092,broker_2:9092") + * .withTopic("results") + * + * // set Coder for Key and Value + * .withKeyCoder(BigEndianLongCoder.of()) + * .withValueCoder(StringUtf8Coder.of()) + + * // you can further customize KafkaProducer used to write the records by adding more + * // settings for ProducerConfig. e.g, to enable compression : + * .updateProducerProperties(ImmutableMap.of("compression.type", "gzip")) + * ); + * }</pre> + * + * Often you might want to write just values without any keys to Kafka. Use {@code values()} to + * write records with default empty(null) key: + * + * <pre>{@code + * PCollection<String> strings = ...; + * strings.apply(KafkaIO.write() + * .withBootstrapServers("broker_1:9092,broker_2:9092") + * .withTopic("results") + * .withValueCoder(StringUtf8Coder.of()) // just need coder for value + * .values() // writes values to Kafka with default key + * ); + * }</pre> + * * <h3>Advanced Kafka Configuration</h3> - * KafakIO allows setting most of the properties in {@link ConsumerConfig}. E.g. 
if you would like - * to enable offset <em>auto commit</em> (for external monitoring or other purposes), you can set + * KafakIO allows setting most of the properties in {@link ConsumerConfig} for source or in + * {@link ProducerConfig} for sink. E.g. if you would like to enable offset + * <em>auto commit</em> (for external monitoring or other purposes), you can set * <tt>"group.id"</tt>, <tt>"enable.auto.commit"</tt>, etc. */ public class KafkaIO { - private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class); - - private static class NowTimestampFn<T> implements SerializableFunction<T, Instant> { - @Override - public Instant apply(T input) { - return Instant.now(); - } - } - - /** - * Creates and uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka + * Creates an uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka * configuration should set with {@link Read#withBootstrapServers(String)} and * {@link Read#withTopics(List)}. Other optional settings include key and value coders, * custom timestamp and watermark functions. @@ -182,6 +223,21 @@ public class KafkaIO { } /** + * Creates an uninitialized {@link Write} {@link PTransform}. Before use, Kafka configuration + * should be set with {@link Write#withBootstrapServers(String)} and {@link Write#withTopic} + * along with {@link Coder}s for (optional) key and values. + */ + public static Write<byte[], byte[]> write() { + return new Write<byte[], byte[]>( + null, + ByteArrayCoder.of(), + ByteArrayCoder.of(), + TypedWrite.DEFAULT_PRODUCER_PROPERTIES); + } + + ///////////////////////// Read Support \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\ + + /** * A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for more * information on usage and configuration. */ @@ -253,13 +309,9 @@ public class KafkaIO { * Update consumer configuration with new properties. 
*/ public Read<K, V> updateConsumerProperties(Map<String, Object> configUpdates) { - for (String key : configUpdates.keySet()) { - checkArgument(!IGNORED_CONSUMER_PROPERTIES.containsKey(key), - "No need to configure '%s'. %s", key, IGNORED_CONSUMER_PROPERTIES.get(key)); - } - Map<String, Object> config = new HashMap<>(consumerConfig); - config.putAll(configUpdates); + Map<String, Object> config = updateKafkaProperties(consumerConfig, + IGNORED_CONSUMER_PROPERTIES, configUpdates); return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder, consumerFactoryFn, config, maxNumRecords, maxReadTime); @@ -305,8 +357,8 @@ public class KafkaIO { * A set of properties that are not required or don't make sense for our consumer. */ private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES = ImmutableMap.of( - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyDecoderFn instead", - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueDecoderFn instead" + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyCoder instead", + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueCoder instead" // "group.id", "enable.auto.commit", "auto.commit.interval.ms" : // lets allow these, applications can have better resume point for restarts. ); @@ -508,6 +560,37 @@ public class KafkaIO { } } + //////////////////////////////////////////////////////////////////////////////////////////////// + + private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class); + + /** + * Returns a new config map which is merge of current config and updates. + * Verifies the updates do not includes ignored properties. + */ + private static Map<String, Object> updateKafkaProperties( + Map<String, Object> currentConfig, + Map<String, String> ignoredProperties, + Map<String, Object> updates) { + + for (String key : updates.keySet()) { + checkArgument(!ignoredProperties.containsKey(key), + "No need to configure '%s'. 
%s", key, ignoredProperties.get(key)); + } + + Map<String, Object> config = new HashMap<>(currentConfig); + config.putAll(updates); + + return config; + } + + private static class NowTimestampFn<T> implements SerializableFunction<T, Instant> { + @Override + public Instant apply(T input) { + return Instant.now(); + } + } + /** Static class, prevent instantiation. */ private KafkaIO() {} @@ -719,7 +802,6 @@ public class KafkaIO { private double avgRecordSize = 0; private static final int movingAvgWindow = 1000; // very roughly avg of last 1000 elements - PartitionState(TopicPartition partition, long offset) { this.topicPartition = partition; this.consumedOffset = offset; @@ -1073,4 +1155,315 @@ public class KafkaIO { Closeables.close(consumer, true); } } + + //////////////////////// Sink Support \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\ + + /** + * A {@link PTransform} to write to a Kafka topic. See {@link KafkaIO} for more + * information on usage and configuration. + */ + public static class Write<K, V> extends TypedWrite<K, V> { + + /** + * Returns a new {@link Write} transform with Kafka producer pointing to + * {@code bootstrapServers}. + */ + public Write<K, V> withBootstrapServers(String bootstrapServers) { + return updateProducerProperties( + ImmutableMap.<String, Object>of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)); + } + + /** + * Returns a new {@link Write} transform that write to given topic. + */ + public Write<K, V> withTopic(String topic) { + return new Write<K, V>(topic, keyCoder, valueCoder, producerConfig); + } + + /** + * Returns a new {@link Write} with {@link Coder} for serializing key (if any) to bytes. + * A key is optional while writing to Kafka. Note when a key is set, its hash is used to + * determine partition in Kafka (see {@link ProducerRecord} for more details). 
+ */ + public <KeyT> Write<KeyT, V> withKeyCoder(Coder<KeyT> keyCoder) { + return new Write<KeyT, V>(topic, keyCoder, valueCoder, producerConfig); + } + + /** + * Returns a new {@link Write} with {@link Coder} for serializing value to bytes. + */ + public <ValueT> Write<K, ValueT> withValueCoder(Coder<ValueT> valueCoder) { + return new Write<K, ValueT>(topic, keyCoder, valueCoder, producerConfig); + } + + public Write<K, V> updateProducerProperties(Map<String, Object> configUpdates) { + Map<String, Object> config = updateKafkaProperties(producerConfig, + TypedWrite.IGNORED_PRODUCER_PROPERTIES, configUpdates); + return new Write<K, V>(topic, keyCoder, valueCoder, config); + } + + private Write( + String topic, + Coder<K> keyCoder, + Coder<V> valueCoder, + Map<String, Object> producerConfig) { + super(topic, keyCoder, valueCoder, producerConfig, + Optional.<SerializableFunction<Map<String, Object>, Producer<K, V>>>absent()); + } + } + + /** + * A {@link PTransform} to write to a Kafka topic. See {@link KafkaIO} for more + * information on usage and configuration. + */ + public static class TypedWrite<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> { + + /** + * Returns a new {@link Write} with a custom function to create Kafka producer. Primarily used + * for tests. Default is {@link KafkaProducer} + */ + public TypedWrite<K, V> withProducerFactoryFn( + SerializableFunction<Map<String, Object>, Producer<K, V>> producerFactoryFn) { + return new TypedWrite<K, V>(topic, keyCoder, valueCoder, producerConfig, + Optional.of(producerFactoryFn)); + } + + /** + * Returns a new transform that writes just the values to Kafka. This is useful for writing + * collections of values rather than {@link KV}s. + */ + @SuppressWarnings("unchecked") + public PTransform<PCollection<V>, PDone> values() { + return new KafkaValueWrite<V>((TypedWrite<Void, V>) this); + // Any way to avoid casting here to TypedWrite<Void, V>?
We can't create + // new TypedWrite without casting producerFactoryFn. + } + + @Override + public PDone apply(PCollection<KV<K, V>> input) { + input.apply(ParDo.of(new KafkaWriter<K, V>( + topic, keyCoder, valueCoder, producerConfig, producerFactoryFnOpt))); + return PDone.in(input.getPipeline()); + } + + @Override + public void validate(PCollection<KV<K, V>> input) { + checkNotNull(producerConfig.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), + "Kafka bootstrap servers should be set"); + checkNotNull(topic, "Kafka topic should be set"); + } + + ////////////////////////////////////////////////////////////////////////////////////////// + + protected final String topic; + protected final Coder<K> keyCoder; + protected final Coder<V> valueCoder; + protected final Optional<SerializableFunction<Map<String, Object>, Producer<K, V>>> + producerFactoryFnOpt; + protected final Map<String, Object> producerConfig; + + protected TypedWrite( + String topic, + Coder<K> keyCoder, + Coder<V> valueCoder, + Map<String, Object> producerConfig, + Optional<SerializableFunction<Map<String, Object>, Producer<K, V>>> producerFactoryFnOpt) { + + this.topic = topic; + this.keyCoder = keyCoder; + this.valueCoder = valueCoder; + this.producerConfig = producerConfig; + this.producerFactoryFnOpt = producerFactoryFnOpt; + } + + // set config defaults + private static final Map<String, Object> DEFAULT_PRODUCER_PROPERTIES = + ImmutableMap.<String, Object>of( + ProducerConfig.RETRIES_CONFIG, 3, + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, CoderBasedKafkaSerializer.class, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoderBasedKafkaSerializer.class); + + /** + * A set of properties that are not required or don't make sense for our consumer. 
+ */ + private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES = ImmutableMap.of( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Set keyCoder instead", + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "Set valueCoder instead", + configForKeySerializer(), "Reserved for internal serializer", + configForValueSerializer(), "Reserved for internal serializer" + ); + } + + /** + * Same as Write<K, V> without a Key. Null is used for key as it is the convention in Kafka + * when there is no key specified. Majority of Kafka writers don't specify a key. + */ + private static class KafkaValueWrite<V> extends PTransform<PCollection<V>, PDone> { + + private final TypedWrite<Void, V> kvWriteTransform; + + private KafkaValueWrite(TypedWrite<Void, V> kvWriteTransform) { + this.kvWriteTransform = kvWriteTransform; + } + + @Override + public PDone apply(PCollection<V> input) { + return input + .apply("Kafka values with default key", + ParDo.of(new DoFn<V, KV<Void, V>>() { + @Override + public void processElement(ProcessContext ctx) throws Exception { + ctx.output(KV.<Void, V>of(null, ctx.element())); + } + })) + .setCoder(KvCoder.of(VoidCoder.of(), kvWriteTransform.valueCoder)) + .apply(kvWriteTransform); + } + } + + private static class KafkaWriter<K, V> extends DoFn<KV<K, V>, Void> { + + @Override + public void startBundle(Context c) throws Exception { + // Producer initialization is fairly costly. Move this to future initialization api to avoid + // creating a producer for each bundle.
+ if (producer == null) { + if (producerFactoryFnOpt.isPresent()) { + producer = producerFactoryFnOpt.get().apply(producerConfig); + } else { + producer = new KafkaProducer<K, V>(producerConfig); + } + } + } + + @Override + public void processElement(ProcessContext ctx) throws Exception { + checkForFailures(); + + KV<K, V> kv = ctx.element(); + producer.send( + new ProducerRecord<K, V>(topic, kv.getKey(), kv.getValue()), + new SendCallback()); + } + + @Override + public void finishBundle(Context c) throws Exception { + producer.flush(); + producer.close(); + producer = null; + checkForFailures(); + } + + /////////////////////////////////////////////////////////////////////////////////// + + private final String topic; + private final Map<String, Object> producerConfig; + private final Optional<SerializableFunction<Map<String, Object>, Producer<K, V>>> + producerFactoryFnOpt; + + private transient Producer<K, V> producer = null; + //private transient Callback sendCallback = new SendCallback(); + // first exception and number of failures since last invocation of checkForFailures(): + private transient Exception sendException = null; + private transient long numSendFailures = 0; + + KafkaWriter(String topic, + Coder<K> keyCoder, + Coder<V> valueCoder, + Map<String, Object> producerConfig, + Optional<SerializableFunction<Map<String, Object>, Producer<K, V>>> producerFactoryFnOpt) { + + this.topic = topic; + this.producerFactoryFnOpt = producerFactoryFnOpt; + + // Set custom kafka serializers. We can not serialize user objects then pass the bytes to + // producer. The key and value objects are used in kafka Partitioner interface. + // This does not matter for default partitioner in Kafka as it uses just the serialized + // key bytes to pick a partition. But are making sure user's custom partitioner would work + // as expected. 
+ + this.producerConfig = new HashMap<>(producerConfig); + this.producerConfig.put(configForKeySerializer(), keyCoder); + this.producerConfig.put(configForValueSerializer(), valueCoder); + } + + private synchronized void checkForFailures() throws IOException { + if (numSendFailures == 0) { + return; + } + + String msg = String.format( + "KafkaWriter : failed to send %d records (since last report)", numSendFailures); + + Exception e = sendException; + sendException = null; + numSendFailures = 0; + + LOG.warn(msg); + throw new IOException(msg, e); + } + + private class SendCallback implements Callback { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception == null) { + return; + } + + synchronized (KafkaWriter.this) { + if (sendException == null) { + sendException = exception; + } + numSendFailures++; + } + // don't log exception stacktrace here, exception will be propagated up. + LOG.warn("KafkaWriter send failed : '{}'", exception.getMessage()); + } + } + } + + /** + * Implements Kafka's {@link Serializer} with a {@link Coder}. The coder is stored as serialized + * value in producer configuration map. + */ + public static class CoderBasedKafkaSerializer<T> implements Serializer<T> { + + @SuppressWarnings("unchecked") + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + String configKey = isKey ? 
configForKeySerializer() : configForValueSerializer(); + coder = (Coder<T>) configs.get(configKey); + checkNotNull(coder, "could not instantiate coder for Kafka serialization"); + } + + @Override + public byte[] serialize(String topic, @Nullable T data) { + if (data == null) { + return null; // common for keys to be null + } + + try { + return CoderUtils.encodeToByteArray(coder, data); + } catch (CoderException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + } + + private Coder<T> coder = null; + private static final String CONFIG_FORMAT = "beam.coder.based.kafka.%s.serializer"; + } + + + private static String configForKeySerializer() { + return String.format(CoderBasedKafkaSerializer.CONFIG_FORMAT, "key"); + } + + private static String configForValueSerializer() { + return String.format(CoderBasedKafkaSerializer.CONFIG_FORMAT, "value"); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5b6ebd1e/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 957271e..7d4337d 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -18,8 +18,12 @@ package org.apache.beam.sdk.io.kafka; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.Pipeline.PipelineExecutionException; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.UnboundedSource; @@ -49,20 +53,31 @@ import 
org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.joda.time.Instant; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; @@ -79,6 +94,9 @@ public class KafkaIOTest { * - test KafkaRecordCoder */ + @Rule + public ExpectedException thrown = ExpectedException.none(); + // Update mock consumer with records distributed among the given topics, each with given number // of partitions. Records are assigned in round-robin order among the partitions. private static MockConsumer<byte[], byte[]> mkMockConsumer( @@ -113,8 +131,8 @@ public class KafkaIOTest { tp.topic(), tp.partition(), offsets[pIdx]++, - null, // key - ByteBuffer.wrap(new byte[8]).putLong(i).array())); // value is 8 byte record id. 
+ ByteBuffer.wrap(new byte[8]).putInt(i).array(), // key is 4 byte record id + ByteBuffer.wrap(new byte[8]).putLong(i).array())); // value is 8 byte record id } MockConsumer<byte[], byte[]> consumer = @@ -161,16 +179,17 @@ public class KafkaIOTest { * Creates a consumer with two topics, with 5 partitions each. * numElements are (round-robin) assigned all the 10 partitions. */ - private static KafkaIO.TypedRead<byte[], Long> mkKafkaReadTransform( + private static KafkaIO.TypedRead<Integer, Long> mkKafkaReadTransform( int numElements, - @Nullable SerializableFunction<KV<byte[], Long>, Instant> timestampFn) { + @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) { List<String> topics = ImmutableList.of("topic_a", "topic_b"); - KafkaIO.Read<byte[], Long> reader = KafkaIO.read() + KafkaIO.Read<Integer, Long> reader = KafkaIO.read() .withBootstrapServers("none") .withTopics(topics) .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements)) // 20 partitions + .withKeyCoder(BigEndianIntegerCoder.of()) .withValueCoder(BigEndianLongCoder.of()) .withMaxNumRecords(numElements); @@ -305,9 +324,9 @@ public class KafkaIOTest { int numElements = 1000; int numSplits = 10; - UnboundedSource<KafkaRecord<byte[], Long>, ?> initial = + UnboundedSource<KafkaRecord<Integer, Long>, ?> initial = mkKafkaReadTransform(numElements, null).makeSource(); - List<? extends UnboundedSource<KafkaRecord<byte[], Long>, ?>> splits = + List<? 
extends UnboundedSource<KafkaRecord<Integer, Long>, ?>> splits = initial.generateInitialSplits(numSplits, p.getOptions()); assertEquals("Expected exact splitting", numSplits, splits.size()); @@ -317,7 +336,7 @@ public class KafkaIOTest { for (int i = 0; i < splits.size(); ++i) { pcollections = pcollections.and( p.apply("split" + i, Read.from(splits.get(i)).withMaxNumRecords(elementsPerSplit)) - .apply("Remove Metadata " + i, ParDo.of(new RemoveKafkaMetadata<byte[], Long>())) + .apply("Remove Metadata " + i, ParDo.of(new RemoveKafkaMetadata<Integer, Long>())) .apply("collection " + i, Values.<Long>create())); } PCollection<Long> input = pcollections.apply(Flatten.<Long>pCollections()); @@ -330,9 +349,9 @@ public class KafkaIOTest { * A timestamp function that uses the given value as the timestamp. */ private static class ValueAsTimestampFn - implements SerializableFunction<KV<byte[], Long>, Instant> { + implements SerializableFunction<KV<Integer, Long>, Instant> { @Override - public Instant apply(KV<byte[], Long> input) { + public Instant apply(KV<Integer, Long> input) { return new Instant(input.getValue()); } } @@ -352,13 +371,13 @@ public class KafkaIOTest { int numElements = 85; // 85 to make sure some partitions have more records than other. 
// create a single split: - UnboundedSource<KafkaRecord<byte[], Long>, KafkaCheckpointMark> source = + UnboundedSource<KafkaRecord<Integer, Long>, KafkaCheckpointMark> source = mkKafkaReadTransform(numElements, new ValueAsTimestampFn()) .makeSource() .generateInitialSplits(1, PipelineOptionsFactory.fromArgs(new String[0]).create()) .get(0); - UnboundedReader<KafkaRecord<byte[], Long>> reader = source.createReader(null, null); + UnboundedReader<KafkaRecord<Integer, Long>> reader = source.createReader(null, null); final int numToSkip = 3; // advance numToSkip elements @@ -394,4 +413,261 @@ public class KafkaIOTest { } } } + + @Test + public void testSink() throws Exception { + // Simply read from kafka source and write to kafka sink. Then verify the records + // are correctly published to mock kafka producer. + + int numElements = 1000; + + synchronized (MOCK_PRODUCER_LOCK) { + + MOCK_PRODUCER.clear(); + + ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start(); + + Pipeline pipeline = TestPipeline.create(); + String topic = "test"; + + pipeline + .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()) + .withoutMetadata()) + .apply(KafkaIO.write() + .withBootstrapServers("none") + .withTopic(topic) + .withKeyCoder(BigEndianIntegerCoder.of()) + .withValueCoder(BigEndianLongCoder.of()) + .withProducerFactoryFn(new ProducerFactoryFn())); + + pipeline.run(); + + completionThread.shutdown(); + + verifyProducerRecords(topic, numElements, false); + } + } + + @Test + public void testValuesSink() throws Exception { + // similar to testSink(), but use values()' interface. 
+ + int numElements = 1000; + + synchronized (MOCK_PRODUCER_LOCK) { + + MOCK_PRODUCER.clear(); + + ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start(); + + Pipeline pipeline = TestPipeline.create(); + String topic = "test"; + + pipeline + .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()) + .withoutMetadata()) + .apply(Values.<Long>create()) // there are no keys + .apply(KafkaIO.write() + .withBootstrapServers("none") + .withTopic(topic) + .withKeyCoder(BigEndianIntegerCoder.of()) + .withValueCoder(BigEndianLongCoder.of()) + .withProducerFactoryFn(new ProducerFactoryFn()) + .values()); + + pipeline.run(); + + completionThread.shutdown(); + + verifyProducerRecords(topic, numElements, true); + } + } + + @Test + public void testSinkWithSendErrors() throws Throwable { + // similar to testSink(), except that up to 10 of the send calls to producer will fail + // asynchronously. + + // TODO: Ideally we want the pipeline to run to completion by retrying bundles that fail. + // We limit the number of errors injected to 10 below. This would reflect a real streaming + // pipeline. But I am not sure how to achieve that.
For now expect an exception: + + thrown.expect(InjectedErrorException.class); + thrown.expectMessage("Injected Error #1"); + + int numElements = 1000; + + synchronized (MOCK_PRODUCER_LOCK) { + + MOCK_PRODUCER.clear(); + + Pipeline pipeline = TestPipeline.create(); + String topic = "test"; + + ProducerSendCompletionThread completionThreadWithErrors = + new ProducerSendCompletionThread(10, 100).start(); + + pipeline + .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()) + .withoutMetadata()) + .apply(KafkaIO.write() + .withBootstrapServers("none") + .withTopic(topic) + .withKeyCoder(BigEndianIntegerCoder.of()) + .withValueCoder(BigEndianLongCoder.of()) + .withProducerFactoryFn(new ProducerFactoryFn())); + + try { + pipeline.run(); + } catch (PipelineExecutionException e) { + // throwing inner exception helps assert that first exception is thrown from the Sink + throw e.getCause().getCause(); + } finally { + completionThreadWithErrors.shutdown(); + } + } + } + + private static void verifyProducerRecords(String topic, int numElements, boolean keyIsAbsent) { + + // verify that appropriate messages are written to kafka + List<ProducerRecord<Integer, Long>> sent = MOCK_PRODUCER.history(); + + // sort by values + Collections.sort(sent, new Comparator<ProducerRecord<Integer, Long>>() { + @Override + public int compare(ProducerRecord<Integer, Long> o1, ProducerRecord<Integer, Long> o2) { + return Long.compare(o1.value(), o2.value()); + } + }); + + for (int i = 0; i < numElements; i++) { + ProducerRecord<Integer, Long> record = sent.get(i); + assertEquals(topic, record.topic()); + if (keyIsAbsent) { + assertNull(record.key()); + } else { + assertEquals(i, record.key().intValue()); + } + assertEquals(i, record.value().longValue()); + } + } + + /** + * Singleton MockProducer. Using a singleton here since we need access to the object to fetch + * the actual records published to the producer.
This prohibits running the tests using + * the producer in parallel, but there are only one or two tests. + */ + private static final MockProducer<Integer, Long> MOCK_PRODUCER = + new MockProducer<Integer, Long>( + false, // disable synchronous completion of send. see ProducerSendCompletionThread below. + new KafkaIO.CoderBasedKafkaSerializer<Integer>(), + new KafkaIO.CoderBasedKafkaSerializer<Long>()) { + + // override flush() so that it does not complete all the waiting sends, giving a chance to + // ProducerSendCompletionThread to inject errors. + + @Override + public void flush() { + while (completeNext()) { + // there are some uncompleted records. let the completion thread handle them. + try { + Thread.sleep(10); + } catch (InterruptedException e) { + } + } + } + }; + + // use a separate object to serialize tests using MOCK_PRODUCER so that we don't interfere + // with Kafka MockProducer locking itself. + private static final Object MOCK_PRODUCER_LOCK = new Object(); + + private static class ProducerFactoryFn + implements SerializableFunction<Map<String, Object>, Producer<Integer, Long>> { + + @Override + public Producer<Integer, Long> apply(Map<String, Object> config) { + return MOCK_PRODUCER; + } + } + + private static class InjectedErrorException extends RuntimeException { + public InjectedErrorException(String message) { + super(message); + } + } + + /** + * We start MockProducer with auto-completion disabled. That implies a record is not marked sent + * until #completeNext() is called on it. This class starts a thread to asynchronously 'complete' + * the sends. During completion, we can also make those requests fail. This error injection + * is used in one of the tests.
+ */ + private static class ProducerSendCompletionThread { + + private final int maxErrors; + private final int errorFrequency; + private final AtomicBoolean done = new AtomicBoolean(false); + private final ExecutorService injectorThread; + private int numCompletions = 0; + + ProducerSendCompletionThread() { + // complete everything successfully + this(0, 0); + } + + ProducerSendCompletionThread(final int maxErrors, final int errorFrequency) { + this.maxErrors = maxErrors; + this.errorFrequency = errorFrequency; + injectorThread = Executors.newSingleThreadExecutor(); + } + + ProducerSendCompletionThread start() { + injectorThread.submit(new Runnable() { + @Override + public void run() { + int errorsInjected = 0; + + while (!done.get()) { + boolean successful; + + if (errorsInjected < maxErrors && ((numCompletions + 1) % errorFrequency) == 0) { + successful = MOCK_PRODUCER.errorNext( + new InjectedErrorException("Injected Error #" + (errorsInjected + 1))); + + if (successful) { + errorsInjected++; + } + } else { + successful = MOCK_PRODUCER.completeNext(); + } + + if (successful) { + numCompletions++; + } else { + // wait a bit since there are no unsent records + try { + Thread.sleep(1); + } catch (InterruptedException e) { + } + } + } + } + }); + + return this; + } + + void shutdown() { + done.set(true); + injectorThread.shutdown(); + try { + assertTrue(injectorThread.awaitTermination(10, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + } }
