This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new 268f6b5c [FLINK-36176] Remove support for Kafka 0.1 (#115)
268f6b5c is described below
commit 268f6b5c209f119611242400e6d7d2fd1e65a0a5
Author: Arvid Heise <[email protected]>
AuthorDate: Fri Aug 30 12:08:02 2024 +0200
[FLINK-36176] Remove support for Kafka 0.1 (#115)
Wrap up FLINK-19152 by also removing all related producer code.
Move FlinkKafkaProducerBase.getPropertiesFromBrokerList to a new test
utility class as this method is used by some tests.
---
.../connectors/kafka/FlinkKafkaProducer011.java | 72 ---
.../connectors/kafka/FlinkKafkaProducerBase.java | 442 -------------------
.../kafka/FlinkKafkaProducerBaseTest.java | 482 ---------------------
.../FlinkKafkaProducerMigrationOperatorTest.java | 60 ---
.../connectors/kafka/KafkaConsumerTestBase.java | 11 +-
.../connectors/kafka/KafkaProducerTestBase.java | 4 +-
.../connectors/kafka/testutils/DataGenerators.java | 7 +-
.../connectors/kafka/testutils/KafkaUtils.java | 41 ++
...11-migration-kafka-producer-flink-1.10-snapshot | Bin 2032 -> 0 bytes
...11-migration-kafka-producer-flink-1.11-snapshot | Bin 2040 -> 0 bytes
....11-migration-kafka-producer-flink-1.8-snapshot | Bin 2032 -> 0 bytes
....11-migration-kafka-producer-flink-1.9-snapshot | Bin 2032 -> 0 bytes
12 files changed, 51 insertions(+), 1068 deletions(-)
diff --git
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
deleted file mode 100644
index c5b008c1..00000000
---
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
-
-/**
- * Compatibility class to make migration possible from the 0.11 connector to
the universal one.
- *
- * <p>Problem is that FlinkKafkaProducer (universal) and FlinkKafkaProducer011
have different names
- * and they both defined static classes NextTransactionalIdHint,
KafkaTransactionState and
- * KafkaTransactionContext inside the parent classes. This is causing
incompatibility problems since
- * for example FlinkKafkaProducer011.KafkaTransactionState and
- * FlinkKafkaProducer.KafkaTransactionState are treated as completely
incompatible classes, despite
- * being identical.
- *
- * <p>This issue is solved by using custom serialization logic: keeping a
fake/dummy
- * FlinkKafkaProducer011.*Serializer classes in the universal connector (this
class), as entry
- * points for the deserialization and converting them to
FlinkKafkaProducer.*Serializer counter
- * parts. After all serialized binary data are exactly the same in all of
those cases.
- *
- * <p>For more details check FLINK-11249 and the discussion in the pull
requests.
- */
-// CHECKSTYLE:OFF: JavadocType
-public class FlinkKafkaProducer011 {
- public static class NextTransactionalIdHintSerializer {
- public static final class NextTransactionalIdHintSerializerSnapshot
- extends
SimpleTypeSerializerSnapshot<FlinkKafkaProducer.NextTransactionalIdHint> {
- public NextTransactionalIdHintSerializerSnapshot() {
-
super(FlinkKafkaProducer.NextTransactionalIdHintSerializer::new);
- }
- }
- }
-
- public static class ContextStateSerializer {
- public static final class ContextStateSerializerSnapshot
- extends
SimpleTypeSerializerSnapshot<FlinkKafkaProducer.KafkaTransactionContext> {
- public ContextStateSerializerSnapshot() {
- super(FlinkKafkaProducer.ContextStateSerializer::new);
- }
- }
- }
-
- public static class TransactionStateSerializer {
- public static final class TransactionStateSerializerSnapshot
- extends
SimpleTypeSerializerSnapshot<FlinkKafkaProducer.KafkaTransactionState> {
- public TransactionStateSerializerSnapshot() {
- super(FlinkKafkaProducer.TransactionStateSerializer::new);
- }
- }
- }
-
- public static class NextTransactionalIdHint
- extends FlinkKafkaProducer.NextTransactionalIdHint {}
-}
-// CHECKSTYLE:ON: JavadocType
diff --git
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
deleted file mode 100644
index 234fd9af..00000000
---
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ /dev/null
@@ -1,442 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import
org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import
org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
-import
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
-import
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.flink.util.NetUtils;
-import org.apache.flink.util.SerializableObject;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Flink Sink to produce data into a Kafka topic.
- *
- * <p>Please note that this producer provides at-least-once reliability
guarantees when checkpoints
- * are enabled and setFlushOnCheckpoint(true) is set. Otherwise, the producer
doesn't provide any
- * reliability guarantees.
- *
- * @param <IN> Type of the messages to write into Kafka.
- */
-@Internal
-public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>
- implements CheckpointedFunction {
-
- private static final Logger LOG =
LoggerFactory.getLogger(FlinkKafkaProducerBase.class);
-
- private static final long serialVersionUID = 1L;
-
- /** Configuration key for disabling the metrics reporting. */
- public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
-
- /** User defined properties for the Producer. */
- protected final Properties producerConfig;
-
- /** The name of the default topic this producer is writing data to. */
- protected final String defaultTopicId;
-
- /**
- * (Serializable) SerializationSchema for turning objects used with Flink
into. byte[] for
- * Kafka.
- */
- protected final KeyedSerializationSchema<IN> schema;
-
- /** User-provided partitioner for assigning an object to a Kafka partition
for each topic. */
- protected final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
-
- /** Partitions of each topic. */
- protected final Map<String, int[]> topicPartitionsMap;
-
- /** Flag indicating whether to accept failures (and log them), or to fail
on failures. */
- protected boolean logFailuresOnly;
-
- /**
- * If true, the producer will wait until all outstanding records have been
send to the broker.
- */
- protected boolean flushOnCheckpoint = true;
-
- // -------------------------------- Runtime fields
------------------------------------------
-
- /** KafkaProducer instance. */
- protected transient KafkaProducer<byte[], byte[]> producer;
-
- /** The callback than handles error propagation or logging callbacks. */
- protected transient Callback callback;
-
- /** Errors encountered in the async producer are stored here. */
- protected transient volatile Exception asyncException;
-
- /** Lock for accessing the pending records. */
- protected final SerializableObject pendingRecordsLock = new
SerializableObject();
-
- /** Number of unacknowledged records. */
- protected long pendingRecords;
-
- /**
- * The main constructor for creating a FlinkKafkaProducer.
- *
- * @param defaultTopicId The default topic to write data to
- * @param serializationSchema A serializable serialization schema for
turning user objects into
- * a kafka-consumable byte[] supporting key/value messages
- * @param producerConfig Configuration properties for the KafkaProducer.
'bootstrap.servers.' is
- * the only required argument.
- * @param customPartitioner A serializable partitioner for assigning
messages to Kafka
- * partitions. Passing null will use Kafka's partitioner.
- */
- public FlinkKafkaProducerBase(
- String defaultTopicId,
- KeyedSerializationSchema<IN> serializationSchema,
- Properties producerConfig,
- FlinkKafkaPartitioner<IN> customPartitioner) {
- requireNonNull(defaultTopicId, "TopicID not set");
- requireNonNull(serializationSchema, "serializationSchema not set");
- requireNonNull(producerConfig, "producerConfig not set");
- ClosureCleaner.clean(
- customPartitioner,
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
- ClosureCleaner.ensureSerializable(serializationSchema);
-
- this.defaultTopicId = defaultTopicId;
- this.schema = serializationSchema;
- this.producerConfig = producerConfig;
- this.flinkKafkaPartitioner = customPartitioner;
-
- // set the producer configuration properties for kafka record key
value serializers.
- if
(!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
- this.producerConfig.put(
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
- ByteArraySerializer.class.getName());
- } else {
- LOG.warn(
- "Overwriting the '{}' is not recommended",
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
- }
-
- if
(!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
- this.producerConfig.put(
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
- ByteArraySerializer.class.getName());
- } else {
- LOG.warn(
- "Overwriting the '{}' is not recommended",
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
- }
-
- // eagerly ensure that bootstrap servers are set.
- if
(!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
- throw new IllegalArgumentException(
- ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
- + " must be supplied in the producer config
properties.");
- }
-
- this.topicPartitionsMap = new HashMap<>();
- }
-
- // ---------------------------------- Properties --------------------------
-
- /**
- * Defines whether the producer should fail on errors, or only log them.
If this is set to true,
- * then exceptions will be only logged, if set to false, exceptions will
be eventually thrown
- * and cause the streaming program to fail (and enter recovery).
- *
- * @param logFailuresOnly The flag to indicate logging-only on exceptions.
- */
- public void setLogFailuresOnly(boolean logFailuresOnly) {
- this.logFailuresOnly = logFailuresOnly;
- }
-
- /**
- * If set to true, the Flink producer will wait for all outstanding
messages in the Kafka
- * buffers to be acknowledged by the Kafka producer on a checkpoint. This
way, the producer can
- * guarantee that messages in the Kafka buffers are part of the checkpoint.
- *
- * @param flush Flag indicating the flushing mode (true = flush on
checkpoint)
- */
- public void setFlushOnCheckpoint(boolean flush) {
- this.flushOnCheckpoint = flush;
- }
-
- /** Used for testing only. */
- @VisibleForTesting
- protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) {
- return new KafkaProducer<>(props);
- }
-
- // ----------------------------------- Utilities --------------------------
-
- /** Initializes the connection to Kafka. */
- @Override
- public void open(Configuration configuration) throws Exception {
- if (schema instanceof KeyedSerializationSchemaWrapper) {
- ((KeyedSerializationSchemaWrapper<IN>) schema)
- .getSerializationSchema()
- .open(
-
RuntimeContextInitializationContextAdapters.serializationAdapter(
- getRuntimeContext(),
- metricGroup ->
metricGroup.addGroup("user")));
- }
- producer = getKafkaProducer(this.producerConfig);
-
- RuntimeContext ctx = getRuntimeContext();
-
- if (null != flinkKafkaPartitioner) {
- flinkKafkaPartitioner.open(
- ctx.getIndexOfThisSubtask(),
ctx.getNumberOfParallelSubtasks());
- }
-
- LOG.info(
- "Starting FlinkKafkaProducer ({}/{}) to produce into default
topic {}",
- ctx.getIndexOfThisSubtask() + 1,
- ctx.getNumberOfParallelSubtasks(),
- defaultTopicId);
-
- // register Kafka metrics to Flink accumulators
- if
(!Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS,
"false"))) {
- Map<MetricName, ? extends Metric> metrics =
this.producer.metrics();
-
- if (metrics == null) {
- // MapR's Kafka implementation returns null here.
- LOG.info("Producer implementation does not support metrics");
- } else {
- final MetricGroup kafkaMetricGroup =
-
getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
- for (Map.Entry<MetricName, ? extends Metric> metric :
metrics.entrySet()) {
- kafkaMetricGroup.gauge(
- metric.getKey().name(), new
KafkaMetricWrapper(metric.getValue()));
- }
- }
- }
-
- if (flushOnCheckpoint
- && !((StreamingRuntimeContext)
this.getRuntimeContext()).isCheckpointingEnabled()) {
- LOG.warn(
- "Flushing on checkpoint is enabled, but checkpointing is
not enabled. Disabling flushing.");
- flushOnCheckpoint = false;
- }
-
- if (logFailuresOnly) {
- callback =
- new Callback() {
- @Override
- public void onCompletion(RecordMetadata metadata,
Exception e) {
- if (e != null) {
- LOG.error(
- "Error while sending record to Kafka:
" + e.getMessage(),
- e);
- }
- acknowledgeMessage();
- }
- };
- } else {
- callback =
- new Callback() {
- @Override
- public void onCompletion(RecordMetadata metadata,
Exception exception) {
- if (exception != null && asyncException == null) {
- asyncException = exception;
- }
- acknowledgeMessage();
- }
- };
- }
- }
-
- /**
- * Called when new data arrives to the sink, and forwards it to Kafka.
- *
- * @param next The incoming data
- */
- @Override
- public void invoke(IN next, Context context) throws Exception {
- // propagate asynchronous errors
- checkErroneous();
-
- byte[] serializedKey = schema.serializeKey(next);
- byte[] serializedValue = schema.serializeValue(next);
- String targetTopic = schema.getTargetTopic(next);
- if (targetTopic == null) {
- targetTopic = defaultTopicId;
- }
-
- int[] partitions = this.topicPartitionsMap.get(targetTopic);
- if (null == partitions) {
- partitions = getPartitionsByTopic(targetTopic, producer);
- this.topicPartitionsMap.put(targetTopic, partitions);
- }
-
- ProducerRecord<byte[], byte[]> record;
- if (flinkKafkaPartitioner == null) {
- record = new ProducerRecord<>(targetTopic, serializedKey,
serializedValue);
- } else {
- record =
- new ProducerRecord<>(
- targetTopic,
- flinkKafkaPartitioner.partition(
- next, serializedKey, serializedValue,
targetTopic, partitions),
- serializedKey,
- serializedValue);
- }
- if (flushOnCheckpoint) {
- synchronized (pendingRecordsLock) {
- pendingRecords++;
- }
- }
- producer.send(record, callback);
- }
-
- @Override
- public void close() throws Exception {
- if (producer != null) {
- producer.close();
- }
-
- // make sure we propagate pending errors
- checkErroneous();
- }
-
- // ------------------- Logic for handling checkpoint flushing
-------------------------- //
-
- private void acknowledgeMessage() {
- if (flushOnCheckpoint) {
- synchronized (pendingRecordsLock) {
- pendingRecords--;
- if (pendingRecords == 0) {
- pendingRecordsLock.notifyAll();
- }
- }
- }
- }
-
- /** Flush pending records. */
- protected abstract void flush();
-
- @Override
- public void initializeState(FunctionInitializationContext context) throws
Exception {
- // nothing to do
- }
-
- @Override
- public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
- // check for asynchronous errors and fail the checkpoint if necessary
- checkErroneous();
-
- if (flushOnCheckpoint) {
- // flushing is activated: We need to wait until pendingRecords is 0
- flush();
- synchronized (pendingRecordsLock) {
- if (pendingRecords != 0) {
- throw new IllegalStateException(
- "Pending record count must be zero at this point:
" + pendingRecords);
- }
-
- // if the flushed requests has errors, we should propagate it
also and fail the
- // checkpoint
- checkErroneous();
- }
- }
- }
-
- // ----------------------------------- Utilities --------------------------
-
- protected void checkErroneous() throws Exception {
- Exception e = asyncException;
- if (e != null) {
- // prevent double throwing
- asyncException = null;
- throw new Exception("Failed to send data to Kafka: " +
e.getMessage(), e);
- }
- }
-
- public static Properties getPropertiesFromBrokerList(String brokerList) {
- String[] elements = brokerList.split(",");
-
- // validate the broker addresses
- for (String broker : elements) {
- NetUtils.getCorrectHostnamePort(broker);
- }
-
- Properties props = new Properties();
- props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
- return props;
- }
-
- protected static int[] getPartitionsByTopic(
- String topic, KafkaProducer<byte[], byte[]> producer) {
- // the fetched list is immutable, so we're creating a mutable copy in
order to sort it
- List<PartitionInfo> partitionsList = new
ArrayList<>(producer.partitionsFor(topic));
-
- // sort the partitions by partition id to make sure the fetched
partition list is the same
- // across subtasks
- Collections.sort(
- partitionsList,
- new Comparator<PartitionInfo>() {
- @Override
- public int compare(PartitionInfo o1, PartitionInfo o2) {
- return Integer.compare(o1.partition(), o2.partition());
- }
- });
-
- int[] partitions = new int[partitionsList.size()];
- for (int i = 0; i < partitions.length; i++) {
- partitions[i] = partitionsList.get(i).partition();
- }
-
- return partitions;
- }
-
- @VisibleForTesting
- protected long numPendingRecords() {
- synchronized (pendingRecordsLock) {
- return pendingRecords;
- }
- }
-}
diff --git
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
deleted file mode 100644
index 4274fcff..00000000
---
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
+++ /dev/null
@@ -1,482 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.testutils.CheckedThread;
-import org.apache.flink.core.testutils.MultiShotLatch;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import
org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
-import
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import
org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.fail;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/** Tests for the {@link FlinkKafkaProducerBase}. */
-public class FlinkKafkaProducerBaseTest {
-
- /** Tests that the constructor eagerly checks bootstrap servers are set in
config. */
- @Test(expected = IllegalArgumentException.class)
- public void testInstantiationFailsWhenBootstrapServersMissing() throws
Exception {
- // no bootstrap servers set in props
- Properties props = new Properties();
- // should throw IllegalArgumentException
- new DummyFlinkKafkaProducer<>(
- props, new KeyedSerializationSchemaWrapper<>(new
SimpleStringSchema()), null);
- }
-
- /**
- * Tests that constructor defaults to key value serializers in config to
byte array
- * deserializers if not set.
- */
- @Test
- public void testKeyValueDeserializersSetIfMissing() throws Exception {
- Properties props = new Properties();
- props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:12345");
- // should set missing key value deserializers
- new DummyFlinkKafkaProducer<>(
- props, new KeyedSerializationSchemaWrapper<>(new
SimpleStringSchema()), null);
-
- assertThat(props)
- .containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)
- .containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
-
assertThat(props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
- .isEqualTo(ByteArraySerializer.class.getName());
-
assertThat(props.getProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
- .isEqualTo(ByteArraySerializer.class.getName());
- }
-
- /** Tests that partitions list is determinate and correctly provided to
custom partitioner. */
- @SuppressWarnings("unchecked")
- @Test
- public void testPartitionerInvokedWithDeterminatePartitionList() throws
Exception {
- FlinkKafkaPartitioner<String> mockPartitioner =
mock(FlinkKafkaPartitioner.class);
-
- RuntimeContext mockRuntimeContext =
mock(StreamingRuntimeContext.class);
- when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
- when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
-
- // out-of-order list of 4 partitions
- List<PartitionInfo> mockPartitionsList = new ArrayList<>(4);
- mockPartitionsList.add(
- new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3,
null, null, null));
- mockPartitionsList.add(
- new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 1,
null, null, null));
- mockPartitionsList.add(
- new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0,
null, null, null));
- mockPartitionsList.add(
- new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2,
null, null, null));
-
- final DummyFlinkKafkaProducer<String> producer =
- new DummyFlinkKafkaProducer<>(
- FakeStandardProducerConfig.get(),
- new KeyedSerializationSchemaWrapper<>(new
SimpleStringSchema()),
- mockPartitioner);
- producer.setRuntimeContext(mockRuntimeContext);
-
- final KafkaProducer mockProducer = producer.getMockKafkaProducer();
-
when(mockProducer.partitionsFor(anyString())).thenReturn(mockPartitionsList);
- when(mockProducer.metrics()).thenReturn(null);
-
- producer.open(new Configuration());
- verify(mockPartitioner, times(1)).open(0, 1);
-
- producer.invoke("foobar", SinkContextUtil.forTimestamp(0));
- verify(mockPartitioner, times(1))
- .partition(
- "foobar",
- null,
- "foobar".getBytes(),
- DummyFlinkKafkaProducer.DUMMY_TOPIC,
- new int[] {0, 1, 2, 3});
- }
-
- /**
- * Test ensuring that if an invoke call happens right after an async
exception is caught, it
- * should be rethrown.
- */
- @Test
- public void testAsyncErrorRethrownOnInvoke() throws Throwable {
- final DummyFlinkKafkaProducer<String> producer =
- new DummyFlinkKafkaProducer<>(
- FakeStandardProducerConfig.get(),
- new KeyedSerializationSchemaWrapper<>(new
SimpleStringSchema()),
- null);
-
- OneInputStreamOperatorTestHarness<String, Object> testHarness =
- new OneInputStreamOperatorTestHarness<>(new
StreamSink<>(producer));
-
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<>("msg-1"));
-
- // let the message request return an async exception
- producer.getPendingCallbacks()
- .get(0)
- .onCompletion(null, new Exception("artificial async
exception"));
-
- try {
- testHarness.processElement(new StreamRecord<>("msg-2"));
- } catch (Exception e) {
- // the next invoke should rethrow the async exception
- assertThat(e.getCause().getMessage()).contains("artificial async
exception");
-
- // test succeeded
- return;
- }
-
- fail("unknown failure");
- }
-
- /**
- * Test ensuring that if a snapshot call happens right after an async
exception is caught, it
- * should be rethrown.
- */
- @Test
- public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
- final DummyFlinkKafkaProducer<String> producer =
- new DummyFlinkKafkaProducer<>(
- FakeStandardProducerConfig.get(),
- new KeyedSerializationSchemaWrapper<>(new
SimpleStringSchema()),
- null);
-
- OneInputStreamOperatorTestHarness<String, Object> testHarness =
- new OneInputStreamOperatorTestHarness<>(new
StreamSink<>(producer));
-
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<>("msg-1"));
-
- // let the message request return an async exception
- producer.getPendingCallbacks()
- .get(0)
- .onCompletion(null, new Exception("artificial async
exception"));
-
- try {
- testHarness.snapshot(123L, 123L);
- } catch (Exception e) {
- // the next invoke should rethrow the async exception
- assertThat(e.getCause().getMessage()).contains("artificial async
exception");
-
- // test succeeded
- return;
- }
-
- fail("unknown failure");
- }
-
- /**
- * Test ensuring that if an async exception is caught for one of the
flushed requests on
- * checkpoint, it should be rethrown; we set a timeout because the test
will not finish if the
- * logic is broken.
- *
- * <p>Note that this test does not test the snapshot method is blocked
correctly when there are
- * pending records. The test for that is covered in
testAtLeastOnceProducer.
- */
- @SuppressWarnings("unchecked")
- @Test(timeout = 5000)
- public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws
Throwable {
- final DummyFlinkKafkaProducer<String> producer =
- new DummyFlinkKafkaProducer<>(
- FakeStandardProducerConfig.get(),
- new KeyedSerializationSchemaWrapper<>(new
SimpleStringSchema()),
- null);
- producer.setFlushOnCheckpoint(true);
-
- final KafkaProducer<?, ?> mockProducer =
producer.getMockKafkaProducer();
-
- final OneInputStreamOperatorTestHarness<String, Object> testHarness =
- new OneInputStreamOperatorTestHarness<>(new
StreamSink<>(producer));
-
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<>("msg-1"));
- testHarness.processElement(new StreamRecord<>("msg-2"));
- testHarness.processElement(new StreamRecord<>("msg-3"));
-
- verify(mockProducer, times(3)).send(any(ProducerRecord.class),
any(Callback.class));
-
- // only let the first callback succeed for now
- producer.getPendingCallbacks().get(0).onCompletion(null, null);
-
- CheckedThread snapshotThread =
- new CheckedThread() {
- @Override
- public void go() throws Exception {
- // this should block at first, since there are still
two pending records
- // that needs to be flushed
- testHarness.snapshot(123L, 123L);
- }
- };
- snapshotThread.start();
-
- // let the 2nd message fail with an async exception
- producer.getPendingCallbacks()
- .get(1)
- .onCompletion(null, new Exception("artificial async failure
for 2nd message"));
- producer.getPendingCallbacks().get(2).onCompletion(null, null);
-
- try {
- snapshotThread.sync();
- } catch (Exception e) {
- // the snapshot should have failed with the async exception
- assertThat(e.getCause().getMessage())
- .contains("artificial async failure for 2nd message");
-
- // test succeeded
- return;
- }
-
- fail("unknown failure");
- }
-
- /**
- * Test ensuring that the producer is not dropping buffered records; we
set a timeout because
- * the test will not finish if the logic is broken.
- */
- @SuppressWarnings("unchecked")
- @Test(timeout = 10000)
- public void testAtLeastOnceProducer() throws Throwable {
- final DummyFlinkKafkaProducer<String> producer =
- new DummyFlinkKafkaProducer<>(
- FakeStandardProducerConfig.get(),
- new KeyedSerializationSchemaWrapper<>(new
SimpleStringSchema()),
- null);
- producer.setFlushOnCheckpoint(true);
-
- final KafkaProducer<?, ?> mockProducer =
producer.getMockKafkaProducer();
-
- final OneInputStreamOperatorTestHarness<String, Object> testHarness =
- new OneInputStreamOperatorTestHarness<>(new
StreamSink<>(producer));
-
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<>("msg-1"));
- testHarness.processElement(new StreamRecord<>("msg-2"));
- testHarness.processElement(new StreamRecord<>("msg-3"));
-
- verify(mockProducer, times(3)).send(any(ProducerRecord.class),
any(Callback.class));
- assertThat(producer.getPendingSize()).isEqualTo(3);
-
- // start a thread to perform checkpointing
- CheckedThread snapshotThread =
- new CheckedThread() {
- @Override
- public void go() throws Exception {
- // this should block until all records are flushed;
- // if the snapshot implementation returns before
pending records are
- // flushed, DummyFlinkKafkaProducer#snapshotState throws and the test fails
- testHarness.snapshot(123L, 123L);
- }
- };
- snapshotThread.start();
-
- // before proceeding, make sure that flushing has started and that the
snapshot is still
- // blocked;
- // this would block forever if the snapshot didn't perform a flush
- producer.waitUntilFlushStarted();
- assertThat(snapshotThread.isAlive())
- .as("Snapshot returned before all records were flushed")
- .isTrue();
-
- // now, complete the callbacks
- producer.getPendingCallbacks().get(0).onCompletion(null, null);
- assertThat(snapshotThread.isAlive())
- .as("Snapshot returned before all records were flushed")
- .isTrue();
- assertThat(producer.getPendingSize()).isEqualTo(2);
-
- producer.getPendingCallbacks().get(1).onCompletion(null, null);
- assertThat(snapshotThread.isAlive())
- .as("Snapshot returned before all records were flushed")
- .isTrue();
- assertThat(producer.getPendingSize()).isEqualTo(1);
-
- producer.getPendingCallbacks().get(2).onCompletion(null, null);
- assertThat(producer.getPendingSize()).isEqualTo(0);
-
- // this would fail with an exception if flushing wasn't completed
before the snapshot method
- // returned
- snapshotThread.sync();
-
- testHarness.close();
- }
-
- /**
- * This test is meant to assure that testAtLeastOnceProducer is valid by
testing that if
- * flushing is disabled, the snapshot method does indeed finish without
waiting for pending
- * records; we set a timeout because the test will not finish if the logic
is broken.
- */
- @SuppressWarnings("unchecked")
- @Test(timeout = 5000)
- public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws
Throwable {
- final DummyFlinkKafkaProducer<String> producer =
- new DummyFlinkKafkaProducer<>(
- FakeStandardProducerConfig.get(),
- new KeyedSerializationSchemaWrapper<>(new
SimpleStringSchema()),
- null);
- producer.setFlushOnCheckpoint(false);
-
- final KafkaProducer<?, ?> mockProducer =
producer.getMockKafkaProducer();
-
- final OneInputStreamOperatorTestHarness<String, Object> testHarness =
- new OneInputStreamOperatorTestHarness<>(new
StreamSink<>(producer));
-
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<>("msg"));
-
- // the record has been sent, but its callback has not been completed
- verify(mockProducer, times(1)).send(any(ProducerRecord.class),
any(Callback.class));
-
- // should return even if there are pending records
- testHarness.snapshot(123L, 123L);
-
- testHarness.close();
- }
-
- // ------------------------------------------------------------------------
-
- private static class DummyFlinkKafkaProducer<T> extends
FlinkKafkaProducerBase<T> {
- private static final long serialVersionUID = 1L;
-
- private static final String DUMMY_TOPIC = "dummy-topic";
-
- private transient KafkaProducer<?, ?> mockProducer;
- private transient List<Callback> pendingCallbacks;
- private transient MultiShotLatch flushLatch;
- private boolean isFlushed;
-
- @SuppressWarnings("unchecked")
- DummyFlinkKafkaProducer(
- Properties producerConfig,
- KeyedSerializationSchema<T> schema,
- FlinkKafkaPartitioner partitioner) {
-
- super(DUMMY_TOPIC, schema, producerConfig, partitioner);
-
- this.mockProducer = mock(KafkaProducer.class);
- when(mockProducer.send(any(ProducerRecord.class),
any(Callback.class)))
- .thenAnswer(
- new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock
invocationOnMock)
- throws Throwable {
-
pendingCallbacks.add(invocationOnMock.getArgument(1));
- return null;
- }
- });
-
- this.pendingCallbacks = new ArrayList<>();
- this.flushLatch = new MultiShotLatch();
- }
-
- long getPendingSize() {
- if (flushOnCheckpoint) {
- return numPendingRecords();
- } else {
- // when flushing is disabled, the implementation does not
- // maintain the current number of pending records to reduce
- // the extra locking overhead required to do so
- throw new UnsupportedOperationException(
- "getPendingSize not supported when flushing is
disabled");
- }
- }
-
- List<Callback> getPendingCallbacks() {
- return pendingCallbacks;
- }
-
- KafkaProducer<?, ?> getMockKafkaProducer() {
- return mockProducer;
- }
-
- @Override
- public void snapshotState(FunctionSnapshotContext ctx) throws
Exception {
- isFlushed = false;
-
- super.snapshotState(ctx);
-
- // if the snapshot implementation doesn't wait until all pending
records are flushed, we
- // should fail the test
- if (flushOnCheckpoint && !isFlushed) {
- throw new RuntimeException(
- "Flushing is enabled; snapshots should be blocked
until all pending records are flushed");
- }
- }
-
- public void waitUntilFlushStarted() throws Exception {
- flushLatch.await();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties
props) {
- return (KafkaProducer<K, V>) mockProducer;
- }
-
- @Override
- protected void flush() {
- flushLatch.trigger();
-
- // simply wait until the producer's pending records become zero.
- // This relies on the fact that the producer's Callback
implementation
- // and pending records tracking logic is implemented correctly,
otherwise
- // we will loop forever.
- while (numPendingRecords() > 0) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- throw new RuntimeException("Unable to flush producer, task
was interrupted");
- }
- }
-
- isFlushed = true;
- }
- }
-}
diff --git
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationOperatorTest.java
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationOperatorTest.java
deleted file mode 100644
index 5e87f04b..00000000
---
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationOperatorTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.FlinkVersion;
-
-import org.junit.Ignore;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-/**
- * Migration test from FlinkKafkaProducer011 operator. This test depends on
the resource generated
- * by {@link FlinkKafkaProducer011MigrationTest#writeSnapshot()}.
- *
- * <p>Warning: We need to rename the generated resource based on the file
naming pattern specified
- * by the {@link #getOperatorSnapshotPath(FlinkVersion)} method then copy the
resource to the path
- * also specified by the {@link #getOperatorSnapshotPath(FlinkVersion)} method.
- */
-public class FlinkKafkaProducerMigrationOperatorTest extends
FlinkKafkaProducerMigrationTest {
- @Parameterized.Parameters(name = "Migration Savepoint: {0}")
- public static Collection<FlinkVersion> parameters() {
- return Arrays.asList(
- FlinkVersion.v1_8, FlinkVersion.v1_9, FlinkVersion.v1_10,
FlinkVersion.v1_11);
- }
-
- public FlinkKafkaProducerMigrationOperatorTest(FlinkVersion
testMigrateVersion) {
- super(testMigrateVersion);
- }
-
- @Override
- public String getOperatorSnapshotPath(FlinkVersion version) {
- return "src/test/resources/kafka-0.11-migration-kafka-producer-flink-"
- + version
- + "-snapshot";
- }
-
- @Ignore
- @Override
- public void writeSnapshot() throws Exception {
- throw new UnsupportedOperationException();
- }
-}
diff --git
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 88f6ac60..026d49bd 100644
---
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -69,6 +69,7 @@ import
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializatio
import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
import
org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
+import org.apache.flink.streaming.connectors.kafka.testutils.KafkaUtils;
import
org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
import
org.apache.flink.streaming.connectors.kafka.testutils.Tuple2FlinkPartitioner;
@@ -869,7 +870,7 @@ public abstract class KafkaConsumerTestBase extends
KafkaTestBaseWithFlink {
}
});
Properties producerProperties =
-
FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+
KafkaUtils.getPropertiesFromBrokerList(brokerConnectionStrings);
producerProperties.setProperty("retries", "3");
producerProperties.putAll(secureProps);
kafkaServer.produceIntoKafka(stream, topic, sinkSchema,
producerProperties, null);
@@ -1550,7 +1551,7 @@ public abstract class KafkaConsumerTestBase extends
KafkaTestBaseWithFlink {
new TypeInformationKeyValueSerializationSchema<>(
Long.class, PojoValue.class, env.getConfig());
Properties producerProperties =
-
FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+
KafkaUtils.getPropertiesFromBrokerList(brokerConnectionStrings);
producerProperties.setProperty("retries", "3");
kafkaServer.produceIntoKafka(kvStream, topic, schema,
producerProperties, null);
env.execute("Write KV to Kafka");
@@ -1646,7 +1647,7 @@ public abstract class KafkaConsumerTestBase extends
KafkaTestBaseWithFlink {
byte[].class, PojoValue.class, env.getConfig());
Properties producerProperties =
-
FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+
KafkaUtils.getPropertiesFromBrokerList(brokerConnectionStrings);
producerProperties.setProperty("retries", "3");
producerProperties.putAll(secureProps);
kafkaServer.produceIntoKafka(kvStream, topic, schema,
producerProperties, null);
@@ -2288,7 +2289,7 @@ public abstract class KafkaConsumerTestBase extends
KafkaTestBaseWithFlink {
// the producer must not produce duplicates
Properties producerProperties =
-
FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+
KafkaUtils.getPropertiesFromBrokerList(brokerConnectionStrings);
producerProperties.setProperty("retries", "0");
producerProperties.putAll(secureProps);
@@ -2392,7 +2393,7 @@ public abstract class KafkaConsumerTestBase extends
KafkaTestBaseWithFlink {
// the producer must not produce duplicates
Properties producerProperties =
-
FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+
KafkaUtils.getPropertiesFromBrokerList(brokerConnectionStrings);
producerProperties.setProperty("retries", "0");
producerProperties.putAll(secureProps);
diff --git
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 62438106..cf3bf463 100644
---
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -36,6 +36,7 @@ import
org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationS
import
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import
org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.IntegerSource;
+import org.apache.flink.streaming.connectors.kafka.testutils.KafkaUtils;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.Preconditions;
@@ -148,8 +149,7 @@ public abstract class KafkaProducerTestBase extends
KafkaTestBaseWithFlink {
.setParallelism(1);
Properties props = new Properties();
- props.putAll(
-
FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings));
+
props.putAll(KafkaUtils.getPropertiesFromBrokerList(brokerConnectionStrings));
props.putAll(secureProps);
// sink partitions into
diff --git
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index d660bd2f..be3651e5 100644
---
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -26,7 +26,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
import
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
@@ -98,8 +97,7 @@ public class DataGenerators {
Properties props = new Properties();
props.putAll(
- FlinkKafkaProducerBase.getPropertiesFromBrokerList(
- testServer.getBrokerConnectionString()));
+
KafkaUtils.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()));
Properties secureProps = testServer.getSecureProperties();
if (secureProps != null) {
props.putAll(testServer.getSecureProperties());
@@ -156,8 +154,7 @@ public class DataGenerators {
OneInputStreamOperatorTestHarness<String, Object> testHarness =
null;
try {
Properties producerProperties =
- FlinkKafkaProducerBase.getPropertiesFromBrokerList(
- server.getBrokerConnectionString());
+
KafkaUtils.getPropertiesFromBrokerList(server.getBrokerConnectionString());
producerProperties.setProperty("retries", "3");
StreamSink<String> sink =
diff --git
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/KafkaUtils.java
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/KafkaUtils.java
new file mode 100644
index 00000000..eeb8ce04
--- /dev/null
+++
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/KafkaUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import java.util.Properties;
+
+/** Utility methods around Kafka. */
+public class KafkaUtils {
+ public static Properties getPropertiesFromBrokerList(String brokerList) {
+ String[] elements = brokerList.split(",");
+
+ // validate the broker addresses
+ for (String broker : elements) {
+ NetUtils.getCorrectHostnamePort(broker);
+ }
+
+ Properties props = new Properties();
+ props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+ return props;
+ }
+}
diff --git
a/flink-connector-kafka/src/test/resources/kafka-0.11-migration-kafka-producer-flink-1.10-snapshot
b/flink-connector-kafka/src/test/resources/kafka-0.11-migration-kafka-producer-flink-1.10-snapshot
deleted file mode 100644
index f3e6c74e..00000000
Binary files
a/flink-connector-kafka/src/test/resources/kafka-0.11-migration-kafka-producer-flink-1.10-snapshot
and /dev/null differ
diff --git
a/flink-connector-kafka/src/test/resources/kafka-0.11-migration-kafka-producer-flink-1.11-snapshot
b/flink-connector-kafka/src/test/resources/kafka-0.11-migration-kafka-producer-flink-1.11-snapshot
deleted file mode 100644
index f80a4295..00000000
Binary files
a/flink-connector-kafka/src/test/resources/kafka-0.11-migration-kafka-producer-flink-1.11-snapshot
and /dev/null differ
diff --git
a/flink-connector-kafka/src/test/resources/kafka-0.11-migration-kafka-producer-flink-1.8-snapshot
b/flink-connector-kafka/src/test/resources/kafka-0.11-migration-kafka-producer-flink-1.8-snapshot
deleted file mode 100644
index 0a22c693..00000000
Binary files
a/flink-connector-kafka/src/test/resources/kafka-0.11-migration-kafka-producer-flink-1.8-snapshot
and /dev/null differ
diff --git
a/flink-connector-kafka/src/test/resources/kafka-0.11-migration-kafka-producer-flink-1.9-snapshot
b/flink-connector-kafka/src/test/resources/kafka-0.11-migration-kafka-producer-flink-1.9-snapshot
deleted file mode 100644
index c4f5416a..00000000
Binary files
a/flink-connector-kafka/src/test/resources/kafka-0.11-migration-kafka-producer-flink-1.9-snapshot
and /dev/null differ