This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit 21d07b7d468ad6d3b0b1a3120745e17ceb1ab1c2
Author: Arvid Heise <[email protected]>
AuthorDate: Thu Sep 26 14:05:42 2024 +0200

    [FLINK-36177] Deprecate KafkaShuffle and more
    
    This commit deprecates all classes that are slated for removal in the 
kafka-4.0 release compatible with Flink 2.0.
    
    I also deprecated internal classes to make later removal easier. Some 
public classes will cease to be public API but are still internally used.
---
 .../reader/deserializer/KafkaDeserializationSchemaWrapper.java    | 2 ++
 .../reader/deserializer/KafkaRecordDeserializationSchema.java     | 2 ++
 .../flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java  | 1 +
 .../flink/streaming/connectors/kafka/FlinkKafkaErrorCode.java     | 8 +++++++-
 .../flink/streaming/connectors/kafka/FlinkKafkaException.java     | 8 +++++++-
 .../flink/streaming/connectors/kafka/KafkaContextAware.java       | 3 +++
 .../streaming/connectors/kafka/KafkaDeserializationSchema.java    | 2 ++
 .../streaming/connectors/kafka/KafkaSerializationSchema.java      | 2 ++
 .../flink/streaming/connectors/kafka/config/OffsetCommitMode.java | 1 +
 .../streaming/connectors/kafka/config/OffsetCommitModes.java      | 1 +
 .../streaming/connectors/kafka/internals/AbstractFetcher.java     | 1 +
 .../connectors/kafka/internals/AbstractPartitionDiscoverer.java   | 1 +
 .../connectors/kafka/internals/ClosableBlockingQueue.java         | 1 +
 .../streaming/connectors/kafka/internals/ExceptionProxy.java      | 1 +
 .../connectors/kafka/internals/FlinkKafkaInternalProducer.java    | 1 +
 .../flink/streaming/connectors/kafka/internals/Handover.java      | 1 +
 .../streaming/connectors/kafka/internals/KafkaCommitCallback.java | 1 +
 .../streaming/connectors/kafka/internals/KafkaConsumerThread.java | 1 +
 .../kafka/internals/KafkaDeserializationSchemaWrapper.java        | 1 +
 .../flink/streaming/connectors/kafka/internals/KafkaFetcher.java  | 1 +
 .../connectors/kafka/internals/KafkaPartitionDiscoverer.java      | 1 +
 .../kafka/internals/KafkaSerializationSchemaWrapper.java          | 1 +
 .../streaming/connectors/kafka/internals/KafkaShuffleFetcher.java | 1 +
 .../streaming/connectors/kafka/internals/KafkaTopicPartition.java | 5 +++++
 .../connectors/kafka/internals/KafkaTopicPartitionAssigner.java   | 1 +
 .../connectors/kafka/internals/KafkaTopicPartitionLeader.java     | 1 +
 .../connectors/kafka/internals/KafkaTopicPartitionState.java      | 1 +
 .../internals/KafkaTopicPartitionStateWithWatermarkGenerator.java | 1 +
 .../connectors/kafka/internals/KafkaTopicsDescriptor.java         | 1 +
 .../kafka/internals/KeyedSerializationSchemaWrapper.java          | 1 +
 .../kafka/internals/SourceContextWatermarkOutputAdapter.java      | 1 +
 .../connectors/kafka/internals/TransactionalIdsGenerator.java     | 1 +
 .../kafka/internals/metrics/KafkaConsumerMetricConstants.java     | 1 +
 .../connectors/kafka/internals/metrics/KafkaMetricWrapper.java    | 1 +
 .../connectors/kafka/partitioner/FlinkFixedPartitioner.java       | 4 ++++
 .../connectors/kafka/partitioner/FlinkKafkaPartitioner.java       | 4 ++++
 .../streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java     | 5 +++++
 .../connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java       | 1 +
 .../connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java       | 1 +
 .../connectors/kafka/shuffle/StreamKafkaShuffleSink.java          | 1 +
 .../connectors/kafka/table/DynamicKafkaDeserializationSchema.java | 2 ++
 .../kafka/table/DynamicKafkaRecordSerializationSchema.java        | 2 ++
 .../util/serialization/JSONKeyValueDeserializationSchema.java     | 1 +
 .../serialization/TypeInformationKeyValueSerializationSchema.java | 1 +
 44 files changed, 78 insertions(+), 2 deletions(-)

diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaDeserializationSchemaWrapper.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaDeserializationSchemaWrapper.java
index 94197e34..1cc7dde7 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaDeserializationSchemaWrapper.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaDeserializationSchemaWrapper.java
@@ -33,7 +33,9 @@ import java.io.IOException;
  * ConsumerRecord ConsumerRecords}.
  *
  * @param <T> the type of the deserialized records.
+ * @deprecated Will be removed with {@link KafkaDeserializationSchema}.
  */
+@Deprecated
 class KafkaDeserializationSchemaWrapper<T> implements 
KafkaRecordDeserializationSchema<T> {
     private static final long serialVersionUID = 1L;
     private final KafkaDeserializationSchema<T> kafkaDeserializationSchema;
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java
index 6ad6607c..91d1f343 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java
@@ -71,7 +71,9 @@ public interface KafkaRecordDeserializationSchema<T> extends 
Serializable, Resul
      * @param <V> the return type of the deserialized record.
      * @return A {@link KafkaRecordDeserializationSchema} that uses the given 
{@link
      *     KafkaDeserializationSchema} to deserialize the {@link 
ConsumerRecord ConsumerRecords}.
+     * @deprecated Will be removed with {@link KafkaDeserializationSchema}.
      */
+    @Deprecated
     static <V> KafkaRecordDeserializationSchema<V> of(
             KafkaDeserializationSchema<V> kafkaDeserializationSchema) {
         return new 
KafkaDeserializationSchemaWrapper<>(kafkaDeserializationSchema);
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index f291b05b..7a85b434 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -90,6 +90,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * @param <T> The type of records produced by this data source
  */
 @Internal
+@Deprecated
 public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFunction<T>
         implements CheckpointListener, ResultTypeQueryable<T>, 
CheckpointedFunction {
 
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaErrorCode.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaErrorCode.java
index c11f1b17..3c1ae27e 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaErrorCode.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaErrorCode.java
@@ -19,8 +19,14 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.annotation.PublicEvolving;
 
-/** Error codes used in {@link FlinkKafkaException}. */
+/**
+ * Error codes used in {@link FlinkKafkaException}.
+ *
+ * @deprecated Will be removed with {@link FlinkKafkaProducer} and {@link
+ *     org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle}.
+ */
 @PublicEvolving
+@Deprecated
 public enum FlinkKafkaErrorCode {
     PRODUCERS_POOL_EMPTY,
     EXTERNAL_ERROR
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaException.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaException.java
index 77d02313..65b654c6 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaException.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaException.java
@@ -20,8 +20,14 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.util.FlinkException;
 
-/** Exception used by {@link FlinkKafkaProducer} and {@link 
FlinkKafkaConsumer}. */
+/**
+ * Exception used by {@link FlinkKafkaProducer} and {@link FlinkKafkaConsumer}.
+ *
+ * @deprecated Will be removed with {@link FlinkKafkaProducer} and {@link
+ *     org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle}.
+ */
 @PublicEvolving
+@Deprecated
 public class FlinkKafkaException extends FlinkException {
 
     private static final long serialVersionUID = 920269130311214200L;
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaContextAware.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaContextAware.java
index ad977cd9..d4013959 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaContextAware.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaContextAware.java
@@ -26,8 +26,11 @@ import org.apache.flink.annotation.PublicEvolving;
  *
  * <p>You only need to override the methods for the information that you need. 
However, {@link
  * #getTargetTopic(Object)} is required because it is used to determine the 
available partitions.
+ *
+ * @deprecated Will be turned into internal API when {@link 
FlinkKafkaProducer} is removed.
  */
 @PublicEvolving
+@Deprecated
 public interface KafkaContextAware<T> {
 
     /**
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.java
index b54b9835..8f15b921 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.java
@@ -31,8 +31,10 @@ import java.io.Serializable;
  * (Java/Scala objects) that are processed by Flink.
  *
  * @param <T> The type created by the keyed deserialization schema.
+ * @deprecated Will be turned into internal API when {@link 
FlinkKafkaConsumer} is removed.
  */
 @PublicEvolving
+@Deprecated
 public interface KafkaDeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
 
     /**
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java
index 89e2b920..7ed987fc 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java
@@ -35,8 +35,10 @@ import java.io.Serializable;
  * which the Kafka Producer is running.
  *
  * @param <T> the type of values being serialized
+ * @deprecated Will be turned into internal API when {@link 
FlinkKafkaProducer} is removed.
  */
 @PublicEvolving
+@Deprecated
 public interface KafkaSerializationSchema<T> extends Serializable {
 
     /**
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java
index 32b9d4cd..6ad4f833 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java
@@ -26,6 +26,7 @@ import org.apache.flink.annotation.Internal;
  * <p>The exact value of this is determined at runtime in the consumer 
subtasks.
  */
 @Internal
+@Deprecated
 public enum OffsetCommitMode {
 
     /** Completely disable offset committing. */
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitModes.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitModes.java
index 1394af72..32ac2f5f 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitModes.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitModes.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal;
 
 /** Utilities for {@link OffsetCommitMode}. */
 @Internal
+@Deprecated
 public class OffsetCommitModes {
 
     /**
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 41b5ad24..07436302 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -61,6 +61,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * @param <KPH> The type of topic/partition identifier used by Kafka in the 
specific version.
  */
 @Internal
+@Deprecated
 public abstract class AbstractFetcher<T, KPH> {
 
     private static final int NO_TIMESTAMPS_WATERMARKS = 0;
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
index 18322bf6..c8dc1836 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
@@ -41,6 +41,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * allows the discoverer to be interrupted during a {@link 
#discoverPartitions()} call.
  */
 @Internal
+@Deprecated
 public abstract class AbstractPartitionDiscoverer {
 
     /** Describes whether we are discovering partitions for fixed topics or a 
topic pattern. */
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
index 22800611..3b1751d4 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
@@ -49,6 +49,7 @@ import static java.util.Objects.requireNonNull;
  * @param <E> The type of elements in the queue.
  */
 @Internal
+@Deprecated
 public class ClosableBlockingQueue<E> {
 
     /** The lock used to make queue accesses and open checks atomic. */
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
index 204a01b4..a9f9c9ca 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
@@ -65,6 +65,7 @@ import java.util.concurrent.atomic.AtomicReference;
  * }</pre>
  */
 @Internal
+@Deprecated
 public class ExceptionProxy {
 
     /** The thread that should be interrupted when an exception occurs. */
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java
index 12dad9fb..6e618cbe 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java
@@ -58,6 +58,7 @@ import java.util.stream.Collectors;
 
 /** Internal flink kafka producer. */
 @PublicEvolving
+@Deprecated
 public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
     private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKafkaInternalProducer.class);
 
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Handover.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Handover.java
index 7fc50e62..64132b0b 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Handover.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Handover.java
@@ -47,6 +47,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 @ThreadSafe
 @Internal
+@Deprecated
 public final class Handover implements Closeable {
 
     private final Object lock = new Object();
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java
index d7666772..f1180b8b 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java
@@ -25,6 +25,7 @@ import org.apache.flink.annotation.Internal;
  * commit request completes, which should normally be triggered from 
checkpoint complete event.
  */
 @Internal
+@Deprecated
 public interface KafkaCommitCallback {
 
     /**
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java
index f7f40b80..5b6fb4d4 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java
@@ -61,6 +61,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * an indirection to the KafkaConsumer calls that change signature.
  */
 @Internal
+@Deprecated
 public class KafkaConsumerThread<T> extends Thread {
 
     /** Logger for this consumer. */
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java
index d53e4ff4..b754b4d0 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java
@@ -32,6 +32,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
  * @param <T> The type created by the deserialization schema.
  */
 @Internal
+@Deprecated
 public class KafkaDeserializationSchemaWrapper<T> implements 
KafkaDeserializationSchema<T> {
 
     private static final long serialVersionUID = 2651665280744549932L;
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.java
index 9c4d8387..428e6c7c 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.java
@@ -51,6 +51,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * @param <T> The type of elements produced by the fetcher.
  */
 @Internal
+@Deprecated
 public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaFetcher.class);
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionDiscoverer.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionDiscoverer.java
index ec788991..ef7162bd 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionDiscoverer.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionDiscoverer.java
@@ -34,6 +34,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * brokers via the Kafka high-level consumer API.
  */
 @Internal
+@Deprecated
 public class KafkaPartitionDiscoverer extends AbstractPartitionDiscoverer {
 
     private final Properties kafkaProperties;
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java
index 73b1d42a..147fad9b 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java
@@ -35,6 +35,7 @@ import javax.annotation.Nullable;
  * KafkaSerializationSchema}.
  */
 @Internal
+@Deprecated
 public class KafkaSerializationSchemaWrapper<T>
         implements KafkaSerializationSchema<T>, KafkaContextAware<T> {
 
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaShuffleFetcher.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaShuffleFetcher.java
index fe7ee7f7..c61db83f 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaShuffleFetcher.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaShuffleFetcher.java
@@ -50,6 +50,7 @@ import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuf
 
 /** Fetch data from Kafka for Kafka Shuffle. */
 @Internal
+@Deprecated
 public class KafkaShuffleFetcher<T> extends KafkaFetcher<T> {
     /** The handler to check and generate watermarks from fetched records. * */
     private final WatermarkHandler watermarkHandler;
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
index f262a222..0e91042f 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
@@ -32,8 +32,13 @@ import static java.util.Objects.requireNonNull;
  *
  * <p>Note: This class must not change in its structure, because it would 
change the serialization
  * format and make previous savepoints unreadable.
+ *
+ * @deprecated Will be turned into internal class when {@link
+ *     org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} is 
removed. Replace with
+ *     {@link org.apache.kafka.common.TopicPartition}.
  */
 @PublicEvolving
+@Deprecated
 public final class KafkaTopicPartition implements Serializable {
 
     /**
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java
index be61e8ad..83c7483f 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal;
 
 /** Utility for assigning Kafka partitions to consumer subtasks. */
 @Internal
+@Deprecated
 public class KafkaTopicPartitionAssigner {
 
     /**
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
index a2ef1288..031400d6 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
@@ -27,6 +27,7 @@ import java.io.Serializable;
  * Serializable Topic Partition info with leader Node information. This class 
is used at runtime.
  */
 @Internal
+@Deprecated
 public class KafkaTopicPartitionLeader implements Serializable {
 
     private static final long serialVersionUID = 9145855900303748582L;
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
index c09df342..ee669e7e 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
@@ -29,6 +29,7 @@ import org.apache.flink.annotation.Internal;
  * @param <KPH> The type of the Kafka partition descriptor, which varies 
across Kafka versions.
  */
 @Internal
+@Deprecated
 public class KafkaTopicPartitionState<T, KPH> {
 
     // ------------------------------------------------------------------------
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithWatermarkGenerator.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithWatermarkGenerator.java
index 6c843409..f9c815fc 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithWatermarkGenerator.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithWatermarkGenerator.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.eventtime.WatermarkOutput;
  * @param <KPH> The type of the Kafka partition descriptor, which varies 
across Kafka versions.
  */
 @Internal
+@Deprecated
 public final class KafkaTopicPartitionStateWithWatermarkGenerator<T, KPH>
         extends KafkaTopicPartitionState<T, KPH> {
 
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java
index 8261a2b3..4bb37b1c 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java
@@ -33,6 +33,7 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  * list of topics, or a topic pattern.
  */
 @Internal
+@Deprecated
 public class KafkaTopicsDescriptor implements Serializable {
 
     private static final long serialVersionUID = -3807227764764900975L;
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KeyedSerializationSchemaWrapper.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KeyedSerializationSchemaWrapper.java
index c95cd9c4..ae4e922d 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KeyedSerializationSchemaWrapper.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KeyedSerializationSchemaWrapper.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
  * @param <T> The type to serialize
  */
 @Internal
+@Deprecated
 public class KeyedSerializationSchemaWrapper<T> implements 
KeyedSerializationSchema<T> {
 
     private static final long serialVersionUID = 1351665280744549933L;
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java
index 8e567431..68c4db12 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
  * A {@link org.apache.flink.api.common.eventtime.WatermarkOutput} that 
forwards calls to a {@link
  * 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext}.
  */
+@Deprecated
 public class SourceContextWatermarkOutputAdapter<T> implements WatermarkOutput 
{
     private final SourceContext<T> sourceContext;
 
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TransactionalIdsGenerator.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TransactionalIdsGenerator.java
index e21355e8..cd6270ac 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TransactionalIdsGenerator.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TransactionalIdsGenerator.java
@@ -40,6 +40,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * subtask.
  */
 @Internal
+@Deprecated
 public class TransactionalIdsGenerator {
     private final String prefix;
     private final int subtaskIndex;
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaConsumerMetricConstants.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaConsumerMetricConstants.java
index 30d46971..73108902 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaConsumerMetricConstants.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaConsumerMetricConstants.java
@@ -26,6 +26,7 @@ import org.apache.flink.annotation.Internal;
  * metrics.
  */
 @Internal
+@Deprecated
 public class KafkaConsumerMetricConstants {
 
     public static final String KAFKA_CONSUMER_METRICS_GROUP = "KafkaConsumer";
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
index 1ab41ce9..2893bad3 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
@@ -23,6 +23,7 @@ import org.apache.flink.metrics.Gauge;
 
 /** Gauge for getting the current value of a Kafka metric. */
 @Internal
+@Deprecated
 public class KafkaMetricWrapper implements Gauge<Double> {
     private final org.apache.kafka.common.Metric kafkaMetric;
 
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
index 16b57f62..e70baea3 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
@@ -54,8 +54,12 @@ import org.apache.flink.util.Preconditions;
  * <p>Not all Kafka partitions contain data. To avoid such an unbalanced 
partitioning, use a
  * round-robin kafka partitioner (note that this will cause a lot of network 
connections between all
  * the Flink instances and all the Kafka brokers).
+ *
+ * @deprecated Will be turned into an internal class when {@link
+ *     org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} is 
removed.
  */
 @PublicEvolving
+@Deprecated
 public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
 
     private static final long serialVersionUID = -3785320239953858777L;
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
index 2fb89e20..7318ed69 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
@@ -24,8 +24,12 @@ import java.io.Serializable;
 /**
  * A {@link FlinkKafkaPartitioner} wraps logic on how to partition records 
across partitions of
  * multiple Kafka topics.
+ *
+ * @deprecated Will be turned into an internal class when {@link
+ *     org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} is 
removed.
  */
 @PublicEvolving
+@Deprecated
 public abstract class FlinkKafkaPartitioner<T> implements Serializable {
 
     private static final long serialVersionUID = -9086719227828020494L;
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
index ae9af29f..bb7c76a6 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
@@ -98,8 +98,13 @@ import java.util.Properties;
  *                                                |
  *                                                | ----------> 
KafkaShuffleConsumerReuse -> ...
  * </pre>
+ *
+ * @deprecated This experimental feature never graduated to a stable feature 
and will be removed in
+ *     future releases. If you are interested in porting it to the Source/Sink API, 
please reach out to
+ *     the Flink community.
  */
 @Experimental
+@Deprecated
 public class FlinkKafkaShuffle {
     static final String PRODUCER_PARALLELISM = "producer parallelism";
     static final String PARTITION_NUMBER = "partition number";
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java
index 886343be..b96e9c0f 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java
@@ -39,6 +39,7 @@ import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuf
 
 /** Flink Kafka Shuffle Consumer Function. */
 @Internal
+@Deprecated
 public class FlinkKafkaShuffleConsumer<T> extends FlinkKafkaConsumer<T> {
     private final TypeSerializer<T> typeSerializer;
     private final int producerParallelism;
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java
index e05e8f9a..46754f27 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java
@@ -44,6 +44,7 @@ import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuf
  * handling elements and watermarks
  */
 @Internal
+@Deprecated
 public class FlinkKafkaShuffleProducer<IN, KEY> extends FlinkKafkaProducer<IN> 
{
     private final KafkaSerializer<IN> kafkaSerializer;
     private final KeySelector<IN, KEY> keySelector;
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/StreamKafkaShuffleSink.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/StreamKafkaShuffleSink.java
index 8bd77840..e24e1565 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/StreamKafkaShuffleSink.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/StreamKafkaShuffleSink.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
  * this way to avoid public interface change.
  */
 @Internal
+@Deprecated
 class StreamKafkaShuffleSink<IN> extends StreamSink<IN> {
 
     public StreamKafkaShuffleSink(FlinkKafkaShuffleProducer 
flinkKafkaShuffleProducer) {
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
index 91798281..ef5eca95 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka.table;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
@@ -38,6 +39,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 /** A specific {@link KafkaDeserializationSchema} for {@link 
KafkaDynamicSource}. */
+@Internal
 class DynamicKafkaDeserializationSchema implements 
KafkaDeserializationSchema<RowData> {
 
     private static final long serialVersionUID = 1L;
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
index 71ca4147..f3a7acb3 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka.table;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
@@ -40,6 +41,7 @@ import java.util.regex.Pattern;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** SerializationSchema used by {@link KafkaDynamicSink} to configure a {@link 
KafkaSink}. */
+@Internal
 class DynamicKafkaRecordSerializationSchema implements 
KafkaRecordSerializationSchema<RowData> {
 
     private final Set<String> topics;
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
index 970bad1c..cffdc8ea 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
@@ -42,6 +42,7 @@ import static 
org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
  * (String) and "partition" (int).
  */
 @PublicEvolving
+@Deprecated
 public class JSONKeyValueDeserializationSchema implements 
KafkaDeserializationSchema<ObjectNode> {
 
     private static final long serialVersionUID = 1509391548173891955L;
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
index 1c3eaa62..05e0eaea 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
@@ -41,6 +41,7 @@ import java.io.IOException;
  * @param <V> The value type to be serialized.
  */
 @PublicEvolving
+@Deprecated
 public class TypeInformationKeyValueSerializationSchema<K, V>
         implements KafkaDeserializationSchema<Tuple2<K, V>>,
                 KeyedSerializationSchema<Tuple2<K, V>> {


Reply via email to