Repository: flink Updated Branches: refs/heads/master ffaf10d22 -> 7206b0ed2
[FLINK-4027] Flush FlinkKafkaProducer on checkpoints This closes #2108 This closes #2058 because it's an invalid pull request. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7206b0ed Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7206b0ed Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7206b0ed Branch: refs/heads/master Commit: 7206b0ed2adb10c94e1ffd3dbe851250b44edcf4 Parents: ffaf10d Author: Robert Metzger <rmetz...@apache.org> Authored: Wed Jun 15 17:50:38 2016 +0200 Committer: Robert Metzger <rmetz...@apache.org> Committed: Mon Jul 4 11:55:09 2016 +0200 ---------------------------------------------------------------------- .../connectors/kafka/FlinkKafkaProducer08.java | 18 ++ .../kafka/KafkaTestEnvironmentImpl.java | 4 +- .../connectors/kafka/FlinkKafkaProducer09.java | 7 + .../kafka/KafkaTestEnvironmentImpl.java | 4 +- .../kafka/FlinkKafkaProducerBase.java | 120 ++++++++-- .../kafka/AtLeastOnceProducerTest.java | 218 +++++++++++++++++++ .../connectors/kafka/KafkaConsumerTestBase.java | 3 +- 7 files changed, 358 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7206b0ed/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java index 4975f9a..e509d2f 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java +++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java @@ -125,4 +125,22 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> { super(topicId, serializationSchema, producerConfig, customPartitioner); } + @Override + protected void flush() { + // The Kafka 0.8 producer doesn't support flushing, we wait here + // until all pending records are confirmed + //noinspection SynchronizeOnNonFinalField + synchronized (pendingRecordsLock) { + while (pendingRecords > 0) { + try { + pendingRecordsLock.wait(); + } catch (InterruptedException e) { + // this can be interrupted when the Task has been cancelled. + // by throwing an exception, we ensure that this checkpoint doesn't get confirmed + throw new RuntimeException("Flushing got interrupted while checkpointing", e); + } + } + } + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/7206b0ed/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 77d41ac..75ca9ed 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -97,7 +97,9 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { @Override public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties 
props, KafkaPartitioner<T> partitioner) { - return new FlinkKafkaProducer08<T>(topic, serSchema, props, partitioner); + FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<T>(topic, serSchema, props, partitioner); + prod.setFlushOnCheckpoint(true); + return prod; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/7206b0ed/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java index 6f7f687..eb3440a 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java @@ -127,4 +127,11 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> { public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) { super(topicId, serializationSchema, producerConfig, customPartitioner); } + + @Override + protected void flush() { + if (this.producer != null) { + producer.flush(); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/7206b0ed/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index b5df6e0..0dbe865 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -92,7 +92,9 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { @Override public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) { - return new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner); + FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner); + prod.setFlushOnCheckpoint(true); + return prod; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/7206b0ed/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java index 6005dff..a9d4917 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java @@ -20,7 +20,9 
@@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; @@ -28,6 +30,7 @@ import org.apache.flink.util.NetUtils; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; @@ -39,6 +42,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; import java.util.List; import java.util.Map; import java.util.Properties; @@ -50,11 +54,14 @@ import static java.util.Objects.requireNonNull; /** * Flink Sink to produce data into a Kafka topic. * - * Please note that this producer does not have any reliability guarantees. + * Please note that this producer provides at-least-once reliability guarantees when + * checkpoints are enabled and setFlushOnCheckpoint(true) is set. + * Otherwise, the producer doesn't provide any reliability guarantees. * * @param <IN> Type of the messages to write into Kafka. 
*/ -public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> { +@SuppressWarnings("SynchronizeOnNonFinalField") +public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> implements Checkpointed<Serializable> { private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducerBase.class); @@ -101,6 +108,11 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> { * Flag indicating whether to accept failures (and log them), or to fail on failures */ protected boolean logFailuresOnly; + + /** + * If true, the producer will wait until all outstanding records have been send to the broker. + */ + private boolean flushOnCheckpoint = false; // -------------------------------- Runtime fields ------------------------------------------ @@ -113,6 +125,16 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> { /** Errors encountered in the async producer are stored here */ protected transient volatile Exception asyncException; + /** + * Number of unacknowledged records. + */ + protected long pendingRecords = 0; + + /** + * Lock for accessing the pending records + */ + protected transient Object pendingRecordsLock; + /** * The main constructor for creating a FlinkKafkaProducer. @@ -150,7 +172,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> { // create a local KafkaProducer to get the list of partitions. // this will also ensure locally that all required ProducerConfig values are set. 
- try (KafkaProducer<Void, IN> getPartitionsProd = new KafkaProducer<>(this.producerConfig)) { + try (Producer<Void, IN> getPartitionsProd = getKafkaProducer(this.producerConfig)) { List<PartitionInfo> partitionsList = getPartitionsProd.partitionsFor(defaultTopicId); this.partitions = new int[partitionsList.size()]; @@ -178,6 +200,24 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> { this.logFailuresOnly = logFailuresOnly; } + /** + * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers + * to be acknowledged by the Kafka producer on a checkpoint. + * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint. + * + * @param flush Flag indicating the flushing mode (true = flush on checkpoint) + */ + public void setFlushOnCheckpoint(boolean flush) { + this.flushOnCheckpoint = flush; + } + + /** + * Used for testing only + */ + protected <K,V> KafkaProducer<K,V> getKafkaProducer(Properties props) { + return new KafkaProducer<>(props); + } + // ----------------------------------- Utilities -------------------------- /** @@ -185,10 +225,10 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> { */ @Override public void open(Configuration configuration) { - producer = new org.apache.kafka.clients.producer.KafkaProducer<>(this.producerConfig); + producer = getKafkaProducer(this.producerConfig); RuntimeContext ctx = getRuntimeContext(); - if(partitioner != null) { + if (partitioner != null) { partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions); } @@ -196,32 +236,40 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> { ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), defaultTopicId); // register Kafka metrics to Flink accumulators - if(!Boolean.valueOf(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) { + if 
(!Boolean.valueOf(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) { Map<MetricName, ? extends Metric> metrics = this.producer.metrics(); - if(metrics == null) { + if (metrics == null) { // MapR's Kafka implementation returns null here. LOG.info("Producer implementation does not support metrics"); } else { - for(Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) { + for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) { String name = producerId + "-producer-" + metric.getKey().name(); DefaultKafkaMetricAccumulator kafkaAccumulator = DefaultKafkaMetricAccumulator.createFor(metric.getValue()); // best effort: we only add the accumulator if available. - if(kafkaAccumulator != null) { + if (kafkaAccumulator != null) { getRuntimeContext().addAccumulator(name, kafkaAccumulator); } } } } + if (flushOnCheckpoint && !((StreamingRuntimeContext)this.getRuntimeContext()).isCheckpointingEnabled()) { + LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. 
Disabling flushing."); + flushOnCheckpoint = false; + } + if (flushOnCheckpoint) { + pendingRecordsLock = new Object(); + } + if (logFailuresOnly) { callback = new Callback() { - @Override public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) { LOG.error("Error while sending record to Kafka: " + e.getMessage(), e); } + acknowledgeMessage(); } }; } @@ -232,6 +280,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> { if (exception != null && asyncException == null) { asyncException = exception; } + acknowledgeMessage(); } }; } @@ -251,17 +300,21 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> { byte[] serializedKey = schema.serializeKey(next); byte[] serializedValue = schema.serializeValue(next); String targetTopic = schema.getTargetTopic(next); - if(targetTopic == null) { + if (targetTopic == null) { targetTopic = defaultTopicId; } ProducerRecord<byte[], byte[]> record; - if(partitioner == null) { + if (partitioner == null) { record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue); } else { record = new ProducerRecord<>(targetTopic, partitioner.partition(next, serializedKey, serializedValue, partitions.length), serializedKey, serializedValue); } - + if (flushOnCheckpoint) { + synchronized (pendingRecordsLock) { + pendingRecords++; + } + } producer.send(record, callback); } @@ -276,6 +329,47 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> { checkErroneous(); } + // ------------------- Logic for handling checkpoint flushing -------------------------- // + + private void acknowledgeMessage() { + if (!flushOnCheckpoint) { + // the logic is disabled + return; + } + synchronized (pendingRecordsLock) { + pendingRecords--; + if (pendingRecords == 0) { + pendingRecordsLock.notifyAll(); + } + } + } + + /** + * Flush pending records. 
+ */ + protected abstract void flush(); + + @Override + public Serializable snapshotState(long checkpointId, long checkpointTimestamp) { + if (flushOnCheckpoint) { + // flushing is activated: We need to wait until pendingRecords is 0 + flush(); + synchronized (pendingRecordsLock) { + if (pendingRecords != 0) { + throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecords); + } + // pending records count is 0. We can now confirm the checkpoint + } + } + // return empty state + return null; + } + + @Override + public void restoreState(Serializable state) { + // nothing to do here + } + // ----------------------------------- Utilities -------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7206b0ed/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java new file mode 100644 index 0000000..b02593c --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.junit.Assert; +import org.junit.Test; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Test ensuring that the producer is not dropping buffered records + */ +@SuppressWarnings("unchecked") +public class AtLeastOnceProducerTest { + + // we set a timeout because the test will not finish if the logic is broken + @Test(timeout=5000) + public void testAtLeastOnceProducer() 
throws Throwable { + runTest(true); + } + + // This test ensures that the actual test fails if the flushing is disabled + @Test(expected = AssertionError.class, timeout=5000) + public void ensureTestFails() throws Throwable { + runTest(false); + } + + private void runTest(boolean flushOnCheckpoint) throws Throwable { + Properties props = new Properties(); + final AtomicBoolean snapshottingFinished = new AtomicBoolean(false); + final TestingKafkaProducer<String> producer = new TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, + snapshottingFinished); + producer.setFlushOnCheckpoint(flushOnCheckpoint); + producer.setRuntimeContext(new MockRuntimeContext(0, 1)); + + producer.open(new Configuration()); + + for (int i = 0; i < 100; i++) { + producer.invoke("msg-" + i); + } + // start a thread confirming all pending records + final Tuple1<Throwable> runnableError = new Tuple1<>(null); + final Thread threadA = Thread.currentThread(); + + Runnable confirmer = new Runnable() { + @Override + public void run() { + try { + MockProducer mp = producer.getProducerInstance(); + List<Callback> pending = mp.getPending(); + + // we need to find out if the snapshot() method blocks forever + // this is not possible. If snapshot() is running, it will + // start removing elements from the pending list. 
+ synchronized (threadA) { + threadA.wait(500L); + } + // we now check that no records have been confirmed yet + Assert.assertEquals(100, pending.size()); + Assert.assertFalse("Snapshot method returned before all records were confirmed", + snapshottingFinished.get()); + + // now confirm all checkpoints + for (Callback c: pending) { + c.onCompletion(null, null); + } + pending.clear(); + } catch(Throwable t) { + runnableError.f0 = t; + } + } + }; + Thread threadB = new Thread(confirmer); + threadB.start(); + // this should block: + producer.snapshotState(0, 0); + synchronized (threadA) { + threadA.notifyAll(); // just in case, to let the test fail faster + } + Assert.assertEquals(0, producer.getProducerInstance().getPending().size()); + Deadline deadline = FiniteDuration.apply(5, "s").fromNow(); + while (deadline.hasTimeLeft() && threadB.isAlive()) { + threadB.join(500); + } + Assert.assertFalse("Thread A is expected to be finished at this point. If not, the test is prone to fail", threadB.isAlive()); + if (runnableError.f0 != null) { + throw runnableError.f0; + } + + producer.close(); + } + + + private static class TestingKafkaProducer<T> extends FlinkKafkaProducerBase<T> { + private MockProducer prod; + private AtomicBoolean snapshottingFinished; + + public TestingKafkaProducer(String defaultTopicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, AtomicBoolean snapshottingFinished) { + super(defaultTopicId, serializationSchema, producerConfig, null); + this.snapshottingFinished = snapshottingFinished; + } + + @Override + protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) { + this.prod = new MockProducer(); + return this.prod; + } + + @Override + public Serializable snapshotState(long checkpointId, long checkpointTimestamp) { + // call the actual snapshot state + Serializable ret = super.snapshotState(checkpointId, checkpointTimestamp); + // notify test that snapshotting has been done + 
snapshottingFinished.set(true); + return ret; + } + + @Override + protected void flush() { + this.prod.flush(); + } + + public MockProducer getProducerInstance() { + return this.prod; + } + } + + private static class MockProducer<K, V> extends KafkaProducer<K, V> { + List<Callback> pendingCallbacks = new ArrayList<>(); + + private static Properties getFakeProperties() { + Properties p = new Properties(); + p.setProperty("bootstrap.servers", "localhost:12345"); + p.setProperty("key.serializer", ByteArraySerializer.class.getName()); + p.setProperty("value.serializer", ByteArraySerializer.class.getName()); + return p; + } + public MockProducer() { + super(getFakeProperties()); + } + + @Override + public Future<RecordMetadata> send(ProducerRecord<K, V> record) { + throw new UnsupportedOperationException("Unexpected"); + } + + @Override + public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { + pendingCallbacks.add(callback); + return null; + } + + @Override + public List<PartitionInfo> partitionsFor(String topic) { + List<PartitionInfo> list = new ArrayList<>(); + list.add(new PartitionInfo(topic, 0, null, null, null)); + return list; + } + + @Override + public Map<MetricName, ? 
extends Metric> metrics() { + return null; + } + + + public List<Callback> getPending() { + return this.pendingCallbacks; + } + + public void flush() { + while (pendingCallbacks.size() > 0) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException("Unable to flush producer, task was interrupted"); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7206b0ed/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 220f061..d49b48b 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -268,7 +268,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { }); Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings); producerProperties.setProperty("retries", "3"); - stream.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null)); + FlinkKafkaProducerBase<Tuple2<Long, String>> prod = kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null); + stream.addSink(prod); // ----------- add consumer dataflow ----------