This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 268f6b5c [FLINK-36176] Remove support for Kafka 0.1 (#115)
268f6b5c is described below

commit 268f6b5c209f119611242400e6d7d2fd1e65a0a5
Author: Arvid Heise <[email protected]>
AuthorDate: Fri Aug 30 12:08:02 2024 +0200

    [FLINK-36176] Remove support for Kafka 0.1 (#115)
    
    Wrap up FLINK-19152 by also removing all related producer code.
    
    Move FlinkKafkaProducerBase.getPropertiesFromBrokerList to a new test 
utility class as this method is used by some tests.
---
 .../connectors/kafka/FlinkKafkaProducer011.java    |  72 ---
 .../connectors/kafka/FlinkKafkaProducerBase.java   | 442 -------------------
 .../kafka/FlinkKafkaProducerBaseTest.java          | 482 ---------------------
 .../FlinkKafkaProducerMigrationOperatorTest.java   |  60 ---
 .../connectors/kafka/KafkaConsumerTestBase.java    |  11 +-
 .../connectors/kafka/KafkaProducerTestBase.java    |   4 +-
 .../connectors/kafka/testutils/DataGenerators.java |   7 +-
 .../connectors/kafka/testutils/KafkaUtils.java     |  41 ++
 ...11-migration-kafka-producer-flink-1.10-snapshot | Bin 2032 -> 0 bytes
 ...11-migration-kafka-producer-flink-1.11-snapshot | Bin 2040 -> 0 bytes
 ....11-migration-kafka-producer-flink-1.8-snapshot | Bin 2032 -> 0 bytes
 ....11-migration-kafka-producer-flink-1.9-snapshot | Bin 2032 -> 0 bytes
 12 files changed, 51 insertions(+), 1068 deletions(-)

diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
deleted file mode 100644
index c5b008c1..00000000
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
-
-/**
- * Compatibility class to make migration possible from the 0.11 connector to 
the universal one.
- *
- * <p>Problem is that FlinkKafkaProducer (universal) and FlinkKafkaProducer011 
have different names
- * and they both defined static classes NextTransactionalIdHint, 
KafkaTransactionState and
- * KafkaTransactionContext inside the parent classes. This is causing 
incompatibility problems since
- * for example FlinkKafkaProducer011.KafkaTransactionState and
- * FlinkKafkaProducer.KafkaTransactionState are treated as completely 
incompatible classes, despite
- * being identical.
- *
- * <p>This issue is solved by using custom serialization logic: keeping a 
fake/dummy
- * FlinkKafkaProducer011.*Serializer classes in the universal connector (this 
class), as entry
- * points for the deserialization and converting them to 
FlinkKafkaProducer.*Serializer counter
- * parts. After all serialized binary data are exactly the same in all of 
those cases.
- *
- * <p>For more details check FLINK-11249 and the discussion in the pull 
requests.
- */
-// CHECKSTYLE:OFF: JavadocType
-public class FlinkKafkaProducer011 {
-    public static class NextTransactionalIdHintSerializer {
-        public static final class NextTransactionalIdHintSerializerSnapshot
-                extends 
SimpleTypeSerializerSnapshot<FlinkKafkaProducer.NextTransactionalIdHint> {
-            public NextTransactionalIdHintSerializerSnapshot() {
-                
super(FlinkKafkaProducer.NextTransactionalIdHintSerializer::new);
-            }
-        }
-    }
-
-    public static class ContextStateSerializer {
-        public static final class ContextStateSerializerSnapshot
-                extends 
SimpleTypeSerializerSnapshot<FlinkKafkaProducer.KafkaTransactionContext> {
-            public ContextStateSerializerSnapshot() {
-                super(FlinkKafkaProducer.ContextStateSerializer::new);
-            }
-        }
-    }
-
-    public static class TransactionStateSerializer {
-        public static final class TransactionStateSerializerSnapshot
-                extends 
SimpleTypeSerializerSnapshot<FlinkKafkaProducer.KafkaTransactionState> {
-            public TransactionStateSerializerSnapshot() {
-                super(FlinkKafkaProducer.TransactionStateSerializer::new);
-            }
-        }
-    }
-
-    public static class NextTransactionalIdHint
-            extends FlinkKafkaProducer.NextTransactionalIdHint {}
-}
-// CHECKSTYLE:ON: JavadocType
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
deleted file mode 100644
index 234fd9af..00000000
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ /dev/null
@@ -1,442 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import 
org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
-import 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.flink.util.NetUtils;
-import org.apache.flink.util.SerializableObject;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Flink Sink to produce data into a Kafka topic.
- *
- * <p>Please note that this producer provides at-least-once reliability 
guarantees when checkpoints
- * are enabled and setFlushOnCheckpoint(true) is set. Otherwise, the producer 
doesn't provide any
- * reliability guarantees.
- *
- * @param <IN> Type of the messages to write into Kafka.
- */
-@Internal
-public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>
-        implements CheckpointedFunction {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKafkaProducerBase.class);
-
-    private static final long serialVersionUID = 1L;
-
-    /** Configuration key for disabling the metrics reporting. */
-    public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
-
-    /** User defined properties for the Producer. */
-    protected final Properties producerConfig;
-
-    /** The name of the default topic this producer is writing data to. */
-    protected final String defaultTopicId;
-
-    /**
-     * (Serializable) SerializationSchema for turning objects used with Flink 
into. byte[] for
-     * Kafka.
-     */
-    protected final KeyedSerializationSchema<IN> schema;
-
-    /** User-provided partitioner for assigning an object to a Kafka partition 
for each topic. */
-    protected final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
-
-    /** Partitions of each topic. */
-    protected final Map<String, int[]> topicPartitionsMap;
-
-    /** Flag indicating whether to accept failures (and log them), or to fail 
on failures. */
-    protected boolean logFailuresOnly;
-
-    /**
-     * If true, the producer will wait until all outstanding records have been 
send to the broker.
-     */
-    protected boolean flushOnCheckpoint = true;
-
-    // -------------------------------- Runtime fields 
------------------------------------------
-
-    /** KafkaProducer instance. */
-    protected transient KafkaProducer<byte[], byte[]> producer;
-
-    /** The callback than handles error propagation or logging callbacks. */
-    protected transient Callback callback;
-
-    /** Errors encountered in the async producer are stored here. */
-    protected transient volatile Exception asyncException;
-
-    /** Lock for accessing the pending records. */
-    protected final SerializableObject pendingRecordsLock = new 
SerializableObject();
-
-    /** Number of unacknowledged records. */
-    protected long pendingRecords;
-
-    /**
-     * The main constructor for creating a FlinkKafkaProducer.
-     *
-     * @param defaultTopicId The default topic to write data to
-     * @param serializationSchema A serializable serialization schema for 
turning user objects into
-     *     a kafka-consumable byte[] supporting key/value messages
-     * @param producerConfig Configuration properties for the KafkaProducer. 
'bootstrap.servers.' is
-     *     the only required argument.
-     * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka
-     *     partitions. Passing null will use Kafka's partitioner.
-     */
-    public FlinkKafkaProducerBase(
-            String defaultTopicId,
-            KeyedSerializationSchema<IN> serializationSchema,
-            Properties producerConfig,
-            FlinkKafkaPartitioner<IN> customPartitioner) {
-        requireNonNull(defaultTopicId, "TopicID not set");
-        requireNonNull(serializationSchema, "serializationSchema not set");
-        requireNonNull(producerConfig, "producerConfig not set");
-        ClosureCleaner.clean(
-                customPartitioner, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
-        ClosureCleaner.ensureSerializable(serializationSchema);
-
-        this.defaultTopicId = defaultTopicId;
-        this.schema = serializationSchema;
-        this.producerConfig = producerConfig;
-        this.flinkKafkaPartitioner = customPartitioner;
-
-        // set the producer configuration properties for kafka record key 
value serializers.
-        if 
(!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
-            this.producerConfig.put(
-                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                    ByteArraySerializer.class.getName());
-        } else {
-            LOG.warn(
-                    "Overwriting the '{}' is not recommended",
-                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
-        }
-
-        if 
(!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
-            this.producerConfig.put(
-                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                    ByteArraySerializer.class.getName());
-        } else {
-            LOG.warn(
-                    "Overwriting the '{}' is not recommended",
-                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
-        }
-
-        // eagerly ensure that bootstrap servers are set.
-        if 
(!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
-            throw new IllegalArgumentException(
-                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
-                            + " must be supplied in the producer config 
properties.");
-        }
-
-        this.topicPartitionsMap = new HashMap<>();
-    }
-
-    // ---------------------------------- Properties --------------------------
-
-    /**
-     * Defines whether the producer should fail on errors, or only log them. 
If this is set to true,
-     * then exceptions will be only logged, if set to false, exceptions will 
be eventually thrown
-     * and cause the streaming program to fail (and enter recovery).
-     *
-     * @param logFailuresOnly The flag to indicate logging-only on exceptions.
-     */
-    public void setLogFailuresOnly(boolean logFailuresOnly) {
-        this.logFailuresOnly = logFailuresOnly;
-    }
-
-    /**
-     * If set to true, the Flink producer will wait for all outstanding 
messages in the Kafka
-     * buffers to be acknowledged by the Kafka producer on a checkpoint. This 
way, the producer can
-     * guarantee that messages in the Kafka buffers are part of the checkpoint.
-     *
-     * @param flush Flag indicating the flushing mode (true = flush on 
checkpoint)
-     */
-    public void setFlushOnCheckpoint(boolean flush) {
-        this.flushOnCheckpoint = flush;
-    }
-
-    /** Used for testing only. */
-    @VisibleForTesting
-    protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) {
-        return new KafkaProducer<>(props);
-    }
-
-    // ----------------------------------- Utilities --------------------------
-
-    /** Initializes the connection to Kafka. */
-    @Override
-    public void open(Configuration configuration) throws Exception {
-        if (schema instanceof KeyedSerializationSchemaWrapper) {
-            ((KeyedSerializationSchemaWrapper<IN>) schema)
-                    .getSerializationSchema()
-                    .open(
-                            
RuntimeContextInitializationContextAdapters.serializationAdapter(
-                                    getRuntimeContext(),
-                                    metricGroup -> 
metricGroup.addGroup("user")));
-        }
-        producer = getKafkaProducer(this.producerConfig);
-
-        RuntimeContext ctx = getRuntimeContext();
-
-        if (null != flinkKafkaPartitioner) {
-            flinkKafkaPartitioner.open(
-                    ctx.getIndexOfThisSubtask(), 
ctx.getNumberOfParallelSubtasks());
-        }
-
-        LOG.info(
-                "Starting FlinkKafkaProducer ({}/{}) to produce into default 
topic {}",
-                ctx.getIndexOfThisSubtask() + 1,
-                ctx.getNumberOfParallelSubtasks(),
-                defaultTopicId);
-
-        // register Kafka metrics to Flink accumulators
-        if 
(!Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, 
"false"))) {
-            Map<MetricName, ? extends Metric> metrics = 
this.producer.metrics();
-
-            if (metrics == null) {
-                // MapR's Kafka implementation returns null here.
-                LOG.info("Producer implementation does not support metrics");
-            } else {
-                final MetricGroup kafkaMetricGroup =
-                        
getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
-                for (Map.Entry<MetricName, ? extends Metric> metric : 
metrics.entrySet()) {
-                    kafkaMetricGroup.gauge(
-                            metric.getKey().name(), new 
KafkaMetricWrapper(metric.getValue()));
-                }
-            }
-        }
-
-        if (flushOnCheckpoint
-                && !((StreamingRuntimeContext) 
this.getRuntimeContext()).isCheckpointingEnabled()) {
-            LOG.warn(
-                    "Flushing on checkpoint is enabled, but checkpointing is 
not enabled. Disabling flushing.");
-            flushOnCheckpoint = false;
-        }
-
-        if (logFailuresOnly) {
-            callback =
-                    new Callback() {
-                        @Override
-                        public void onCompletion(RecordMetadata metadata, 
Exception e) {
-                            if (e != null) {
-                                LOG.error(
-                                        "Error while sending record to Kafka: 
" + e.getMessage(),
-                                        e);
-                            }
-                            acknowledgeMessage();
-                        }
-                    };
-        } else {
-            callback =
-                    new Callback() {
-                        @Override
-                        public void onCompletion(RecordMetadata metadata, 
Exception exception) {
-                            if (exception != null && asyncException == null) {
-                                asyncException = exception;
-                            }
-                            acknowledgeMessage();
-                        }
-                    };
-        }
-    }
-
-    /**
-     * Called when new data arrives to the sink, and forwards it to Kafka.
-     *
-     * @param next The incoming data
-     */
-    @Override
-    public void invoke(IN next, Context context) throws Exception {
-        // propagate asynchronous errors
-        checkErroneous();
-
-        byte[] serializedKey = schema.serializeKey(next);
-        byte[] serializedValue = schema.serializeValue(next);
-        String targetTopic = schema.getTargetTopic(next);
-        if (targetTopic == null) {
-            targetTopic = defaultTopicId;
-        }
-
-        int[] partitions = this.topicPartitionsMap.get(targetTopic);
-        if (null == partitions) {
-            partitions = getPartitionsByTopic(targetTopic, producer);
-            this.topicPartitionsMap.put(targetTopic, partitions);
-        }
-
-        ProducerRecord<byte[], byte[]> record;
-        if (flinkKafkaPartitioner == null) {
-            record = new ProducerRecord<>(targetTopic, serializedKey, 
serializedValue);
-        } else {
-            record =
-                    new ProducerRecord<>(
-                            targetTopic,
-                            flinkKafkaPartitioner.partition(
-                                    next, serializedKey, serializedValue, 
targetTopic, partitions),
-                            serializedKey,
-                            serializedValue);
-        }
-        if (flushOnCheckpoint) {
-            synchronized (pendingRecordsLock) {
-                pendingRecords++;
-            }
-        }
-        producer.send(record, callback);
-    }
-
-    @Override
-    public void close() throws Exception {
-        if (producer != null) {
-            producer.close();
-        }
-
-        // make sure we propagate pending errors
-        checkErroneous();
-    }
-
-    // ------------------- Logic for handling checkpoint flushing 
-------------------------- //
-
-    private void acknowledgeMessage() {
-        if (flushOnCheckpoint) {
-            synchronized (pendingRecordsLock) {
-                pendingRecords--;
-                if (pendingRecords == 0) {
-                    pendingRecordsLock.notifyAll();
-                }
-            }
-        }
-    }
-
-    /** Flush pending records. */
-    protected abstract void flush();
-
-    @Override
-    public void initializeState(FunctionInitializationContext context) throws 
Exception {
-        // nothing to do
-    }
-
-    @Override
-    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
-        // check for asynchronous errors and fail the checkpoint if necessary
-        checkErroneous();
-
-        if (flushOnCheckpoint) {
-            // flushing is activated: We need to wait until pendingRecords is 0
-            flush();
-            synchronized (pendingRecordsLock) {
-                if (pendingRecords != 0) {
-                    throw new IllegalStateException(
-                            "Pending record count must be zero at this point: 
" + pendingRecords);
-                }
-
-                // if the flushed requests has errors, we should propagate it 
also and fail the
-                // checkpoint
-                checkErroneous();
-            }
-        }
-    }
-
-    // ----------------------------------- Utilities --------------------------
-
-    protected void checkErroneous() throws Exception {
-        Exception e = asyncException;
-        if (e != null) {
-            // prevent double throwing
-            asyncException = null;
-            throw new Exception("Failed to send data to Kafka: " + 
e.getMessage(), e);
-        }
-    }
-
-    public static Properties getPropertiesFromBrokerList(String brokerList) {
-        String[] elements = brokerList.split(",");
-
-        // validate the broker addresses
-        for (String broker : elements) {
-            NetUtils.getCorrectHostnamePort(broker);
-        }
-
-        Properties props = new Properties();
-        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
-        return props;
-    }
-
-    protected static int[] getPartitionsByTopic(
-            String topic, KafkaProducer<byte[], byte[]> producer) {
-        // the fetched list is immutable, so we're creating a mutable copy in 
order to sort it
-        List<PartitionInfo> partitionsList = new 
ArrayList<>(producer.partitionsFor(topic));
-
-        // sort the partitions by partition id to make sure the fetched 
partition list is the same
-        // across subtasks
-        Collections.sort(
-                partitionsList,
-                new Comparator<PartitionInfo>() {
-                    @Override
-                    public int compare(PartitionInfo o1, PartitionInfo o2) {
-                        return Integer.compare(o1.partition(), o2.partition());
-                    }
-                });
-
-        int[] partitions = new int[partitionsList.size()];
-        for (int i = 0; i < partitions.length; i++) {
-            partitions[i] = partitionsList.get(i).partition();
-        }
-
-        return partitions;
-    }
-
-    @VisibleForTesting
-    protected long numPendingRecords() {
-        synchronized (pendingRecordsLock) {
-            return pendingRecords;
-        }
-    }
-}
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
deleted file mode 100644
index 4274fcff..00000000
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
+++ /dev/null
@@ -1,482 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.testutils.CheckedThread;
-import org.apache.flink.core.testutils.MultiShotLatch;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import 
org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.fail;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/** Tests for the {@link FlinkKafkaProducerBase}. */
-public class FlinkKafkaProducerBaseTest {
-
-    /** Tests that the constructor eagerly checks bootstrap servers are set in 
config. */
-    @Test(expected = IllegalArgumentException.class)
-    public void testInstantiationFailsWhenBootstrapServersMissing() throws 
Exception {
-        // no bootstrap servers set in props
-        Properties props = new Properties();
-        // should throw IllegalArgumentException
-        new DummyFlinkKafkaProducer<>(
-                props, new KeyedSerializationSchemaWrapper<>(new 
SimpleStringSchema()), null);
-    }
-
-    /**
-     * Tests that constructor defaults to key value serializers in config to 
byte array
-     * deserializers if not set.
-     */
-    @Test
-    public void testKeyValueDeserializersSetIfMissing() throws Exception {
-        Properties props = new Properties();
-        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:12345");
-        // should set missing key value deserializers
-        new DummyFlinkKafkaProducer<>(
-                props, new KeyedSerializationSchemaWrapper<>(new 
SimpleStringSchema()), null);
-
-        assertThat(props)
-                .containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)
-                .containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
-        
assertThat(props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
-                .isEqualTo(ByteArraySerializer.class.getName());
-        
assertThat(props.getProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
-                .isEqualTo(ByteArraySerializer.class.getName());
-    }
-
-    /** Tests that partitions list is determinate and correctly provided to 
custom partitioner. */
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testPartitionerInvokedWithDeterminatePartitionList() throws 
Exception {
-        FlinkKafkaPartitioner<String> mockPartitioner = 
mock(FlinkKafkaPartitioner.class);
-
-        RuntimeContext mockRuntimeContext = 
mock(StreamingRuntimeContext.class);
-        when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
-        when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
-
-        // out-of-order list of 4 partitions
-        List<PartitionInfo> mockPartitionsList = new ArrayList<>(4);
-        mockPartitionsList.add(
-                new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3, 
null, null, null));
-        mockPartitionsList.add(
-                new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 1, 
null, null, null));
-        mockPartitionsList.add(
-                new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, 
null, null, null));
-        mockPartitionsList.add(
-                new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, 
null, null, null));
-
-        final DummyFlinkKafkaProducer<String> producer =
-                new DummyFlinkKafkaProducer<>(
-                        FakeStandardProducerConfig.get(),
-                        new KeyedSerializationSchemaWrapper<>(new 
SimpleStringSchema()),
-                        mockPartitioner);
-        producer.setRuntimeContext(mockRuntimeContext);
-
-        final KafkaProducer mockProducer = producer.getMockKafkaProducer();
-        
when(mockProducer.partitionsFor(anyString())).thenReturn(mockPartitionsList);
-        when(mockProducer.metrics()).thenReturn(null);
-
-        producer.open(new Configuration());
-        verify(mockPartitioner, times(1)).open(0, 1);
-
-        producer.invoke("foobar", SinkContextUtil.forTimestamp(0));
-        verify(mockPartitioner, times(1))
-                .partition(
-                        "foobar",
-                        null,
-                        "foobar".getBytes(),
-                        DummyFlinkKafkaProducer.DUMMY_TOPIC,
-                        new int[] {0, 1, 2, 3});
-    }
-
-    /**
-     * Test ensuring that if an invoke call happens right after an async 
exception is caught, it
-     * should be rethrown.
-     */
-    @Test
-    public void testAsyncErrorRethrownOnInvoke() throws Throwable {
-        final DummyFlinkKafkaProducer<String> producer =
-                new DummyFlinkKafkaProducer<>(
-                        FakeStandardProducerConfig.get(),
-                        new KeyedSerializationSchemaWrapper<>(new 
SimpleStringSchema()),
-                        null);
-
-        OneInputStreamOperatorTestHarness<String, Object> testHarness =
-                new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(producer));
-
-        testHarness.open();
-
-        testHarness.processElement(new StreamRecord<>("msg-1"));
-
-        // let the message request return an async exception
-        producer.getPendingCallbacks()
-                .get(0)
-                .onCompletion(null, new Exception("artificial async 
exception"));
-
-        try {
-            testHarness.processElement(new StreamRecord<>("msg-2"));
-        } catch (Exception e) {
-            // the next invoke should rethrow the async exception
-            assertThat(e.getCause().getMessage()).contains("artificial async 
exception");
-
-            // test succeeded
-            return;
-        }
-
-        fail("unknown failure");
-    }
-
-    /**
-     * Test ensuring that if a snapshot call happens right after an async 
exception is caught, it
-     * should be rethrown.
-     */
-    @Test
-    public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
-        final DummyFlinkKafkaProducer<String> producer =
-                new DummyFlinkKafkaProducer<>(
-                        FakeStandardProducerConfig.get(),
-                        new KeyedSerializationSchemaWrapper<>(new 
SimpleStringSchema()),
-                        null);
-
-        OneInputStreamOperatorTestHarness<String, Object> testHarness =
-                new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(producer));
-
-        testHarness.open();
-
-        testHarness.processElement(new StreamRecord<>("msg-1"));
-
-        // let the message request return an async exception
-        producer.getPendingCallbacks()
-                .get(0)
-                .onCompletion(null, new Exception("artificial async 
exception"));
-
-        try {
-            testHarness.snapshot(123L, 123L);
-        } catch (Exception e) {
-            // the next invoke should rethrow the async exception
-            assertThat(e.getCause().getMessage()).contains("artificial async 
exception");
-
-            // test succeeded
-            return;
-        }
-
-        fail("unknown failure");
-    }
-
-    /**
-     * Test ensuring that if an async exception is caught for one of the 
flushed requests on
-     * checkpoint, it should be rethrown; we set a timeout because the test 
will not finish if the
-     * logic is broken.
-     *
-     * <p>Note that this test does not test the snapshot method is blocked 
correctly when there are
-     * pending records. The test for that is covered in 
testAtLeastOnceProducer.
-     */
-    @SuppressWarnings("unchecked")
-    @Test(timeout = 5000)
-    public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws 
Throwable {
-        final DummyFlinkKafkaProducer<String> producer =
-                new DummyFlinkKafkaProducer<>(
-                        FakeStandardProducerConfig.get(),
-                        new KeyedSerializationSchemaWrapper<>(new 
SimpleStringSchema()),
-                        null);
-        producer.setFlushOnCheckpoint(true);
-
-        final KafkaProducer<?, ?> mockProducer = 
producer.getMockKafkaProducer();
-
-        final OneInputStreamOperatorTestHarness<String, Object> testHarness =
-                new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(producer));
-
-        testHarness.open();
-
-        testHarness.processElement(new StreamRecord<>("msg-1"));
-        testHarness.processElement(new StreamRecord<>("msg-2"));
-        testHarness.processElement(new StreamRecord<>("msg-3"));
-
-        verify(mockProducer, times(3)).send(any(ProducerRecord.class), 
any(Callback.class));
-
-        // only let the first callback succeed for now
-        producer.getPendingCallbacks().get(0).onCompletion(null, null);
-
-        CheckedThread snapshotThread =
-                new CheckedThread() {
-                    @Override
-                    public void go() throws Exception {
-                        // this should block at first, since there are still 
two pending records
-                        // that needs to be flushed
-                        testHarness.snapshot(123L, 123L);
-                    }
-                };
-        snapshotThread.start();
-
-        // let the 2nd message fail with an async exception
-        producer.getPendingCallbacks()
-                .get(1)
-                .onCompletion(null, new Exception("artificial async failure 
for 2nd message"));
-        producer.getPendingCallbacks().get(2).onCompletion(null, null);
-
-        try {
-            snapshotThread.sync();
-        } catch (Exception e) {
-            // the snapshot should have failed with the async exception
-            assertThat(e.getCause().getMessage())
-                    .contains("artificial async failure for 2nd message");
-
-            // test succeeded
-            return;
-        }
-
-        fail("unknown failure");
-    }
-
-    /**
-     * Test ensuring that the producer is not dropping buffered records; we 
set a timeout because
-     * the test will not finish if the logic is broken.
-     */
-    @SuppressWarnings("unchecked")
-    @Test(timeout = 10000)
-    public void testAtLeastOnceProducer() throws Throwable {
-        final DummyFlinkKafkaProducer<String> producer =
-                new DummyFlinkKafkaProducer<>(
-                        FakeStandardProducerConfig.get(),
-                        new KeyedSerializationSchemaWrapper<>(new 
SimpleStringSchema()),
-                        null);
-        producer.setFlushOnCheckpoint(true);
-
-        final KafkaProducer<?, ?> mockProducer = 
producer.getMockKafkaProducer();
-
-        final OneInputStreamOperatorTestHarness<String, Object> testHarness =
-                new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(producer));
-
-        testHarness.open();
-
-        testHarness.processElement(new StreamRecord<>("msg-1"));
-        testHarness.processElement(new StreamRecord<>("msg-2"));
-        testHarness.processElement(new StreamRecord<>("msg-3"));
-
-        verify(mockProducer, times(3)).send(any(ProducerRecord.class), 
any(Callback.class));
-        assertThat(producer.getPendingSize()).isEqualTo(3);
-
-        // start a thread to perform checkpointing
-        CheckedThread snapshotThread =
-                new CheckedThread() {
-                    @Override
-                    public void go() throws Exception {
-                        // this should block until all records are flushed;
-                        // if the snapshot implementation returns before 
pending records are
-                        // flushed,
-                        testHarness.snapshot(123L, 123L);
-                    }
-                };
-        snapshotThread.start();
-
-        // before proceeding, make sure that flushing has started and that the 
snapshot is still
-        // blocked;
-        // this would block forever if the snapshot didn't perform a flush
-        producer.waitUntilFlushStarted();
-        assertThat(snapshotThread.isAlive())
-                .as("Snapshot returned before all records were flushed")
-                .isTrue();
-
-        // now, complete the callbacks
-        producer.getPendingCallbacks().get(0).onCompletion(null, null);
-        assertThat(snapshotThread.isAlive())
-                .as("Snapshot returned before all records were flushed")
-                .isTrue();
-        assertThat(producer.getPendingSize()).isEqualTo(2);
-
-        producer.getPendingCallbacks().get(1).onCompletion(null, null);
-        assertThat(snapshotThread.isAlive())
-                .as("Snapshot returned before all records were flushed")
-                .isTrue();
-        assertThat(producer.getPendingSize()).isEqualTo(1);
-
-        producer.getPendingCallbacks().get(2).onCompletion(null, null);
-        assertThat(producer.getPendingSize()).isEqualTo(0);
-
-        // this would fail with an exception if flushing wasn't completed 
before the snapshot method
-        // returned
-        snapshotThread.sync();
-
-        testHarness.close();
-    }
-
-    /**
-     * This test is meant to assure that testAtLeastOnceProducer is valid by 
testing that if
-     * flushing is disabled, the snapshot method does indeed finishes without 
waiting for pending
-     * records; we set a timeout because the test will not finish if the logic 
is broken.
-     */
-    @SuppressWarnings("unchecked")
-    @Test(timeout = 5000)
-    public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws 
Throwable {
-        final DummyFlinkKafkaProducer<String> producer =
-                new DummyFlinkKafkaProducer<>(
-                        FakeStandardProducerConfig.get(),
-                        new KeyedSerializationSchemaWrapper<>(new 
SimpleStringSchema()),
-                        null);
-        producer.setFlushOnCheckpoint(false);
-
-        final KafkaProducer<?, ?> mockProducer = 
producer.getMockKafkaProducer();
-
-        final OneInputStreamOperatorTestHarness<String, Object> testHarness =
-                new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(producer));
-
-        testHarness.open();
-
-        testHarness.processElement(new StreamRecord<>("msg"));
-
-        // make sure that all callbacks have not been completed
-        verify(mockProducer, times(1)).send(any(ProducerRecord.class), 
any(Callback.class));
-
-        // should return even if there are pending records
-        testHarness.snapshot(123L, 123L);
-
-        testHarness.close();
-    }
-
-    // ------------------------------------------------------------------------
-
-    private static class DummyFlinkKafkaProducer<T> extends 
FlinkKafkaProducerBase<T> {
-        private static final long serialVersionUID = 1L;
-
-        private static final String DUMMY_TOPIC = "dummy-topic";
-
-        private transient KafkaProducer<?, ?> mockProducer;
-        private transient List<Callback> pendingCallbacks;
-        private transient MultiShotLatch flushLatch;
-        private boolean isFlushed;
-
-        @SuppressWarnings("unchecked")
-        DummyFlinkKafkaProducer(
-                Properties producerConfig,
-                KeyedSerializationSchema<T> schema,
-                FlinkKafkaPartitioner partitioner) {
-
-            super(DUMMY_TOPIC, schema, producerConfig, partitioner);
-
-            this.mockProducer = mock(KafkaProducer.class);
-            when(mockProducer.send(any(ProducerRecord.class), 
any(Callback.class)))
-                    .thenAnswer(
-                            new Answer<Object>() {
-                                @Override
-                                public Object answer(InvocationOnMock 
invocationOnMock)
-                                        throws Throwable {
-                                    
pendingCallbacks.add(invocationOnMock.getArgument(1));
-                                    return null;
-                                }
-                            });
-
-            this.pendingCallbacks = new ArrayList<>();
-            this.flushLatch = new MultiShotLatch();
-        }
-
-        long getPendingSize() {
-            if (flushOnCheckpoint) {
-                return numPendingRecords();
-            } else {
-                // when flushing is disabled, the implementation does not
-                // maintain the current number of pending records to reduce
-                // the extra locking overhead required to do so
-                throw new UnsupportedOperationException(
-                        "getPendingSize not supported when flushing is 
disabled");
-            }
-        }
-
-        List<Callback> getPendingCallbacks() {
-            return pendingCallbacks;
-        }
-
-        KafkaProducer<?, ?> getMockKafkaProducer() {
-            return mockProducer;
-        }
-
-        @Override
-        public void snapshotState(FunctionSnapshotContext ctx) throws 
Exception {
-            isFlushed = false;
-
-            super.snapshotState(ctx);
-
-            // if the snapshot implementation doesn't wait until all pending 
records are flushed, we
-            // should fail the test
-            if (flushOnCheckpoint && !isFlushed) {
-                throw new RuntimeException(
-                        "Flushing is enabled; snapshots should be blocked 
until all pending records are flushed");
-            }
-        }
-
-        public void waitUntilFlushStarted() throws Exception {
-            flushLatch.await();
-        }
-
-        @SuppressWarnings("unchecked")
-        @Override
-        protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties 
props) {
-            return (KafkaProducer<K, V>) mockProducer;
-        }
-
-        @Override
-        protected void flush() {
-            flushLatch.trigger();
-
-            // simply wait until the producer's pending records become zero.
-            // This relies on the fact that the producer's Callback 
implementation
-            // and pending records tracking logic is implemented correctly, 
otherwise
-            // we will loop forever.
-            while (numPendingRecords() > 0) {
-                try {
-                    Thread.sleep(10);
-                } catch (InterruptedException e) {
-                    throw new RuntimeException("Unable to flush producer, task 
was interrupted");
-                }
-            }
-
-            isFlushed = true;
-        }
-    }
-}
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationOperatorTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationOperatorTest.java
deleted file mode 100644
index 5e87f04b..00000000
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationOperatorTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.FlinkVersion;
-
-import org.junit.Ignore;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-/**
- * Migration test from FlinkKafkaProducer011 operator. This test depends on 
the resource generated
- * by {@link FlinkKafkaProducer011MigrationTest#writeSnapshot()}.
- *
- * <p>Warning: We need to rename the generated resource based on the file 
naming pattern specified
- * by the {@link #getOperatorSnapshotPath(FlinkVersion)} method then copy the 
resource to the path
- * also specified by the {@link #getOperatorSnapshotPath(FlinkVersion)} method.
- */
-public class FlinkKafkaProducerMigrationOperatorTest extends 
FlinkKafkaProducerMigrationTest {
-    @Parameterized.Parameters(name = "Migration Savepoint: {0}")
-    public static Collection<FlinkVersion> parameters() {
-        return Arrays.asList(
-                FlinkVersion.v1_8, FlinkVersion.v1_9, FlinkVersion.v1_10, 
FlinkVersion.v1_11);
-    }
-
-    public FlinkKafkaProducerMigrationOperatorTest(FlinkVersion 
testMigrateVersion) {
-        super(testMigrateVersion);
-    }
-
-    @Override
-    public String getOperatorSnapshotPath(FlinkVersion version) {
-        return "src/test/resources/kafka-0.11-migration-kafka-producer-flink-"
-                + version
-                + "-snapshot";
-    }
-
-    @Ignore
-    @Override
-    public void writeSnapshot() throws Exception {
-        throw new UnsupportedOperationException();
-    }
-}
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 88f6ac60..026d49bd 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -69,6 +69,7 @@ import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializatio
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
+import org.apache.flink.streaming.connectors.kafka.testutils.KafkaUtils;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.Tuple2FlinkPartitioner;
@@ -869,7 +870,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBaseWithFlink {
                             }
                         });
         Properties producerProperties =
-                
FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+                
KafkaUtils.getPropertiesFromBrokerList(brokerConnectionStrings);
         producerProperties.setProperty("retries", "3");
         producerProperties.putAll(secureProps);
         kafkaServer.produceIntoKafka(stream, topic, sinkSchema, 
producerProperties, null);
@@ -1550,7 +1551,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBaseWithFlink {
                 new TypeInformationKeyValueSerializationSchema<>(
                         Long.class, PojoValue.class, env.getConfig());
         Properties producerProperties =
-                
FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+                
KafkaUtils.getPropertiesFromBrokerList(brokerConnectionStrings);
         producerProperties.setProperty("retries", "3");
         kafkaServer.produceIntoKafka(kvStream, topic, schema, 
producerProperties, null);
         env.execute("Write KV to Kafka");
@@ -1646,7 +1647,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBaseWithFlink {
                         byte[].class, PojoValue.class, env.getConfig());
 
         Properties producerProperties =
-                
FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+                
KafkaUtils.getPropertiesFromBrokerList(brokerConnectionStrings);
         producerProperties.setProperty("retries", "3");
         producerProperties.putAll(secureProps);
         kafkaServer.produceIntoKafka(kvStream, topic, schema, 
producerProperties, null);
@@ -2288,7 +2289,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBaseWithFlink {
 
             // the producer must not produce duplicates
             Properties producerProperties =
-                    
FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+                    
KafkaUtils.getPropertiesFromBrokerList(brokerConnectionStrings);
             producerProperties.setProperty("retries", "0");
             producerProperties.putAll(secureProps);
 
@@ -2392,7 +2393,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBaseWithFlink {
 
         // the producer must not produce duplicates
         Properties producerProperties =
-                
FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+                
KafkaUtils.getPropertiesFromBrokerList(brokerConnectionStrings);
         producerProperties.setProperty("retries", "0");
         producerProperties.putAll(secureProps);
 
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 62438106..cf3bf463 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationS
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.IntegerSource;
+import org.apache.flink.streaming.connectors.kafka.testutils.KafkaUtils;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.Preconditions;
@@ -148,8 +149,7 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBaseWithFlink {
                             .setParallelism(1);
 
             Properties props = new Properties();
-            props.putAll(
-                    
FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings));
+            
props.putAll(KafkaUtils.getPropertiesFromBrokerList(brokerConnectionStrings));
             props.putAll(secureProps);
 
             // sink partitions into
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index d660bd2f..be3651e5 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -26,7 +26,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
 import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
@@ -98,8 +97,7 @@ public class DataGenerators {
 
         Properties props = new Properties();
         props.putAll(
-                FlinkKafkaProducerBase.getPropertiesFromBrokerList(
-                        testServer.getBrokerConnectionString()));
+                
KafkaUtils.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()));
         Properties secureProps = testServer.getSecureProperties();
         if (secureProps != null) {
             props.putAll(testServer.getSecureProperties());
@@ -156,8 +154,7 @@ public class DataGenerators {
             OneInputStreamOperatorTestHarness<String, Object> testHarness = 
null;
             try {
                 Properties producerProperties =
-                        FlinkKafkaProducerBase.getPropertiesFromBrokerList(
-                                server.getBrokerConnectionString());
+                        
KafkaUtils.getPropertiesFromBrokerList(server.getBrokerConnectionString());
                 producerProperties.setProperty("retries", "3");
 
                 StreamSink<String> sink =
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/KafkaUtils.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/KafkaUtils.java
new file mode 100644
index 00000000..eeb8ce04
--- /dev/null
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/KafkaUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import java.util.Properties;
+
+/** Some utilities methods around Kafka. */
+public class KafkaUtils {
+    public static Properties getPropertiesFromBrokerList(String brokerList) {
+        String[] elements = brokerList.split(",");
+
+        // validate the broker addresses
+        for (String broker : elements) {
+            NetUtils.getCorrectHostnamePort(broker);
+        }
+
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+        return props;
+    }
+}
diff --git 
a/flink-connector-kafka/src/test/resources/kafka-0.11-migration-kafka-producer-flink-1.10-snapshot
 
b/flink-connector-kafka/src/test/resources/kafka-0.11-migration-kafka-producer-flink-1.10-snapshot
deleted file mode 100644
index f3e6c74e..00000000
Binary files 
a/flink-connector-kafka/src/test/resources/kafka-0.11-migration-kafka-producer-flink-1.10-snapshot
 and /dev/null differ
diff --git 
a/flink-connector-kafka/src/test/resources/kafka-0.11-migration-kafka-producer-flink-1.11-snapshot
 
b/flink-connector-kafka/src/test/resources/kafka-0.11-migration-kafka-producer-flink-1.11-snapshot
deleted file mode 100644
index f80a4295..00000000
Binary files 
a/flink-connector-kafka/src/test/resources/kafka-0.11-migration-kafka-producer-flink-1.11-snapshot
 and /dev/null differ
diff --git 
a/flink-connector-kafka/src/test/resources/kafka-0.11-migration-kafka-producer-flink-1.8-snapshot
 
b/flink-connector-kafka/src/test/resources/kafka-0.11-migration-kafka-producer-flink-1.8-snapshot
deleted file mode 100644
index 0a22c693..00000000
Binary files 
a/flink-connector-kafka/src/test/resources/kafka-0.11-migration-kafka-producer-flink-1.8-snapshot
 and /dev/null differ
diff --git 
a/flink-connector-kafka/src/test/resources/kafka-0.11-migration-kafka-producer-flink-1.9-snapshot
 
b/flink-connector-kafka/src/test/resources/kafka-0.11-migration-kafka-producer-flink-1.9-snapshot
deleted file mode 100644
index c4f5416a..00000000
Binary files 
a/flink-connector-kafka/src/test/resources/kafka-0.11-migration-kafka-producer-flink-1.9-snapshot
 and /dev/null differ

Reply via email to