This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit 21d07b7d468ad6d3b0b1a3120745e17ceb1ab1c2 Author: Arvid Heise <[email protected]> AuthorDate: Thu Sep 26 14:05:42 2024 +0200 [FLINK-36177] Deprecate KafkaShuffle and more This commit deprecates all classes that are slated for removal in the kafka-4.0 release compatible with Flink 2.0. I also deprecated internal classes to make later removal easier. Some public classes will cease to be public API but are still internally used. --- .../reader/deserializer/KafkaDeserializationSchemaWrapper.java | 2 ++ .../reader/deserializer/KafkaRecordDeserializationSchema.java | 2 ++ .../flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java | 1 + .../flink/streaming/connectors/kafka/FlinkKafkaErrorCode.java | 8 +++++++- .../flink/streaming/connectors/kafka/FlinkKafkaException.java | 8 +++++++- .../flink/streaming/connectors/kafka/KafkaContextAware.java | 3 +++ .../streaming/connectors/kafka/KafkaDeserializationSchema.java | 2 ++ .../streaming/connectors/kafka/KafkaSerializationSchema.java | 2 ++ .../flink/streaming/connectors/kafka/config/OffsetCommitMode.java | 1 + .../streaming/connectors/kafka/config/OffsetCommitModes.java | 1 + .../streaming/connectors/kafka/internals/AbstractFetcher.java | 1 + .../connectors/kafka/internals/AbstractPartitionDiscoverer.java | 1 + .../connectors/kafka/internals/ClosableBlockingQueue.java | 1 + .../streaming/connectors/kafka/internals/ExceptionProxy.java | 1 + .../connectors/kafka/internals/FlinkKafkaInternalProducer.java | 1 + .../flink/streaming/connectors/kafka/internals/Handover.java | 1 + .../streaming/connectors/kafka/internals/KafkaCommitCallback.java | 1 + .../streaming/connectors/kafka/internals/KafkaConsumerThread.java | 1 + .../kafka/internals/KafkaDeserializationSchemaWrapper.java | 1 + .../flink/streaming/connectors/kafka/internals/KafkaFetcher.java | 1 + .../connectors/kafka/internals/KafkaPartitionDiscoverer.java | 1 + .../kafka/internals/KafkaSerializationSchemaWrapper.java | 1 + 
.../streaming/connectors/kafka/internals/KafkaShuffleFetcher.java | 1 + .../streaming/connectors/kafka/internals/KafkaTopicPartition.java | 5 +++++ .../connectors/kafka/internals/KafkaTopicPartitionAssigner.java | 1 + .../connectors/kafka/internals/KafkaTopicPartitionLeader.java | 1 + .../connectors/kafka/internals/KafkaTopicPartitionState.java | 1 + .../internals/KafkaTopicPartitionStateWithWatermarkGenerator.java | 1 + .../connectors/kafka/internals/KafkaTopicsDescriptor.java | 1 + .../kafka/internals/KeyedSerializationSchemaWrapper.java | 1 + .../kafka/internals/SourceContextWatermarkOutputAdapter.java | 1 + .../connectors/kafka/internals/TransactionalIdsGenerator.java | 1 + .../kafka/internals/metrics/KafkaConsumerMetricConstants.java | 1 + .../connectors/kafka/internals/metrics/KafkaMetricWrapper.java | 1 + .../connectors/kafka/partitioner/FlinkFixedPartitioner.java | 4 ++++ .../connectors/kafka/partitioner/FlinkKafkaPartitioner.java | 4 ++++ .../streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java | 5 +++++ .../connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java | 1 + .../connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java | 1 + .../connectors/kafka/shuffle/StreamKafkaShuffleSink.java | 1 + .../connectors/kafka/table/DynamicKafkaDeserializationSchema.java | 2 ++ .../kafka/table/DynamicKafkaRecordSerializationSchema.java | 2 ++ .../util/serialization/JSONKeyValueDeserializationSchema.java | 1 + .../serialization/TypeInformationKeyValueSerializationSchema.java | 1 + 44 files changed, 78 insertions(+), 2 deletions(-) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaDeserializationSchemaWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaDeserializationSchemaWrapper.java index 94197e34..1cc7dde7 100644 --- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaDeserializationSchemaWrapper.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaDeserializationSchemaWrapper.java @@ -33,7 +33,9 @@ import java.io.IOException; * ConsumerRecord ConsumerRecords}. * * @param <T> the type of the deserialized records. + * @deprecated Will be removed with {@link KafkaDeserializationSchema}. */ +@Deprecated class KafkaDeserializationSchemaWrapper<T> implements KafkaRecordDeserializationSchema<T> { private static final long serialVersionUID = 1L; private final KafkaDeserializationSchema<T> kafkaDeserializationSchema; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java index 6ad6607c..91d1f343 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java @@ -71,7 +71,9 @@ public interface KafkaRecordDeserializationSchema<T> extends Serializable, Resul * @param <V> the return type of the deserialized record. * @return A {@link KafkaRecordDeserializationSchema} that uses the given {@link * KafkaDeserializationSchema} to deserialize the {@link ConsumerRecord ConsumerRecords}. + * @deprecated Will be removed with {@link KafkaDeserializationSchema}. 
*/ + @Deprecated static <V> KafkaRecordDeserializationSchema<V> of( KafkaDeserializationSchema<V> kafkaDeserializationSchema) { return new KafkaDeserializationSchemaWrapper<>(kafkaDeserializationSchema); diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java index f291b05b..7a85b434 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java @@ -90,6 +90,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * @param <T> The type of records produced by this data source */ @Internal +@Deprecated public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction { diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaErrorCode.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaErrorCode.java index c11f1b17..3c1ae27e 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaErrorCode.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaErrorCode.java @@ -19,8 +19,14 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.annotation.PublicEvolving; -/** Error codes used in {@link FlinkKafkaException}. */ +/** + * Error codes used in {@link FlinkKafkaException}. + * + * @deprecated Will be removed with {@link FlinkKafkaProducer} and {@link + * org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle}. 
+ */ @PublicEvolving +@Deprecated public enum FlinkKafkaErrorCode { PRODUCERS_POOL_EMPTY, EXTERNAL_ERROR diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaException.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaException.java index 77d02313..65b654c6 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaException.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaException.java @@ -20,8 +20,14 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.util.FlinkException; -/** Exception used by {@link FlinkKafkaProducer} and {@link FlinkKafkaConsumer}. */ +/** + * Exception used by {@link FlinkKafkaProducer} and {@link FlinkKafkaConsumer}. + * + * @deprecated Will be removed with {@link FlinkKafkaProducer} and {@link + * org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle}. + */ @PublicEvolving +@Deprecated public class FlinkKafkaException extends FlinkException { private static final long serialVersionUID = 920269130311214200L; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaContextAware.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaContextAware.java index ad977cd9..d4013959 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaContextAware.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaContextAware.java @@ -26,8 +26,11 @@ import org.apache.flink.annotation.PublicEvolving; * * <p>You only need to override the methods for the information that you need. However, {@link * #getTargetTopic(Object)} is required because it is used to determine the available partitions. 
+ * + * @deprecated Will be turned into internal API when {@link FlinkKafkaProducer} is removed. */ @PublicEvolving +@Deprecated public interface KafkaContextAware<T> { /** diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.java index b54b9835..8f15b921 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.java @@ -31,8 +31,10 @@ import java.io.Serializable; * (Java/Scala objects) that are processed by Flink. * * @param <T> The type created by the keyed deserialization schema. + * @deprecated Will be turned into internal API when {@link FlinkKafkaConsumer} is removed. */ @PublicEvolving +@Deprecated public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> { /** diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java index 89e2b920..7ed987fc 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java @@ -35,8 +35,10 @@ import java.io.Serializable; * which the Kafka Producer is running. * * @param <T> the type of values being serialized + * @deprecated Will be turned into internal API when {@link FlinkKafkaProducer} is removed. 
*/ @PublicEvolving +@Deprecated public interface KafkaSerializationSchema<T> extends Serializable { /** diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java index 32b9d4cd..6ad4f833 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java @@ -26,6 +26,7 @@ import org.apache.flink.annotation.Internal; * <p>The exact value of this is determined at runtime in the consumer subtasks. */ @Internal +@Deprecated public enum OffsetCommitMode { /** Completely disable offset committing. */ diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitModes.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitModes.java index 1394af72..32ac2f5f 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitModes.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitModes.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; /** Utilities for {@link OffsetCommitMode}. 
*/ @Internal +@Deprecated public class OffsetCommitModes { /** diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java index 41b5ad24..07436302 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java @@ -61,6 +61,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * @param <KPH> The type of topic/partition identifier used by Kafka in the specific version. */ @Internal +@Deprecated public abstract class AbstractFetcher<T, KPH> { private static final int NO_TIMESTAMPS_WATERMARKS = 0; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java index 18322bf6..c8dc1836 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java @@ -41,6 +41,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * allows the discoverer to be interrupted during a {@link #discoverPartitions()} call. */ @Internal +@Deprecated public abstract class AbstractPartitionDiscoverer { /** Describes whether we are discovering partitions for fixed topics or a topic pattern. 
*/ diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java index 22800611..3b1751d4 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java @@ -49,6 +49,7 @@ import static java.util.Objects.requireNonNull; * @param <E> The type of elements in the queue. */ @Internal +@Deprecated public class ClosableBlockingQueue<E> { /** The lock used to make queue accesses and open checks atomic. */ diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java index 204a01b4..a9f9c9ca 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java @@ -65,6 +65,7 @@ import java.util.concurrent.atomic.AtomicReference; * }</pre> */ @Internal +@Deprecated public class ExceptionProxy { /** The thread that should be interrupted when an exception occurs. 
*/ diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java index 12dad9fb..6e618cbe 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java @@ -58,6 +58,7 @@ import java.util.stream.Collectors; /** Internal flink kafka producer. */ @PublicEvolving +@Deprecated public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> { private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaInternalProducer.class); diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Handover.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Handover.java index 7fc50e62..64132b0b 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Handover.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Handover.java @@ -47,6 +47,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ @ThreadSafe @Internal +@Deprecated public final class Handover implements Closeable { private final Object lock = new Object(); diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java index d7666772..f1180b8b 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java +++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java @@ -25,6 +25,7 @@ import org.apache.flink.annotation.Internal; * commit request completes, which should normally be triggered from checkpoint complete event. */ @Internal +@Deprecated public interface KafkaCommitCallback { /** diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java index f7f40b80..5b6fb4d4 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java @@ -61,6 +61,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * an indirection to the KafkaConsumer calls that change signature. */ @Internal +@Deprecated public class KafkaConsumerThread<T> extends Thread { /** Logger for this consumer. */ diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java index d53e4ff4..b754b4d0 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java @@ -32,6 +32,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; * @param <T> The type created by the deserialization schema. 
*/ @Internal +@Deprecated public class KafkaDeserializationSchemaWrapper<T> implements KafkaDeserializationSchema<T> { private static final long serialVersionUID = 2651665280744549932L; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.java index 9c4d8387..428e6c7c 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.java @@ -51,6 +51,7 @@ import static org.apache.flink.util.Preconditions.checkState; * @param <T> The type of elements produced by the fetcher. */ @Internal +@Deprecated public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> { private static final Logger LOG = LoggerFactory.getLogger(KafkaFetcher.class); diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionDiscoverer.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionDiscoverer.java index ec788991..ef7162bd 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionDiscoverer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionDiscoverer.java @@ -34,6 +34,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * brokers via the Kafka high-level consumer API. 
*/ @Internal +@Deprecated public class KafkaPartitionDiscoverer extends AbstractPartitionDiscoverer { private final Properties kafkaProperties; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java index 73b1d42a..147fad9b 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java @@ -35,6 +35,7 @@ import javax.annotation.Nullable; * KafkaSerializationSchema}. */ @Internal +@Deprecated public class KafkaSerializationSchemaWrapper<T> implements KafkaSerializationSchema<T>, KafkaContextAware<T> { diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaShuffleFetcher.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaShuffleFetcher.java index fe7ee7f7..c61db83f 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaShuffleFetcher.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaShuffleFetcher.java @@ -50,6 +50,7 @@ import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuf /** Fetch data from Kafka for Kafka Shuffle. */ @Internal +@Deprecated public class KafkaShuffleFetcher<T> extends KafkaFetcher<T> { /** The handler to check and generate watermarks from fetched records. 
* */ private final WatermarkHandler watermarkHandler; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java index f262a222..0e91042f 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java @@ -32,8 +32,13 @@ import static java.util.Objects.requireNonNull; * * <p>Note: This class must not change in its structure, because it would change the serialization * format and make previous savepoints unreadable. + * + * @deprecated Will be turned into internal class when {@link + * org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} is removed. Replace with + * {@link org.apache.kafka.common.TopicPartition}. */ @PublicEvolving +@Deprecated public final class KafkaTopicPartition implements Serializable { /** diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java index be61e8ad..83c7483f 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; /** Utility for assigning Kafka partitions to consumer subtasks. 
*/ @Internal +@Deprecated public class KafkaTopicPartitionAssigner { /** diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java index a2ef1288..031400d6 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java @@ -27,6 +27,7 @@ import java.io.Serializable; * Serializable Topic Partition info with leader Node information. This class is used at runtime. */ @Internal +@Deprecated public class KafkaTopicPartitionLeader implements Serializable { private static final long serialVersionUID = 9145855900303748582L; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java index c09df342..ee669e7e 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java @@ -29,6 +29,7 @@ import org.apache.flink.annotation.Internal; * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions. 
*/ @Internal +@Deprecated public class KafkaTopicPartitionState<T, KPH> { // ------------------------------------------------------------------------ diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithWatermarkGenerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithWatermarkGenerator.java index 6c843409..f9c815fc 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithWatermarkGenerator.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithWatermarkGenerator.java @@ -34,6 +34,7 @@ import org.apache.flink.api.common.eventtime.WatermarkOutput; * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions. */ @Internal +@Deprecated public final class KafkaTopicPartitionStateWithWatermarkGenerator<T, KPH> extends KafkaTopicPartitionState<T, KPH> { diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java index 8261a2b3..4bb37b1c 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicsDescriptor.java @@ -33,6 +33,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; * list of topics, or a topic pattern. 
*/ @Internal +@Deprecated public class KafkaTopicsDescriptor implements Serializable { private static final long serialVersionUID = -3807227764764900975L; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KeyedSerializationSchemaWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KeyedSerializationSchemaWrapper.java index c95cd9c4..ae4e922d 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KeyedSerializationSchemaWrapper.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KeyedSerializationSchemaWrapper.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; * @param <T> The type to serialize */ @Internal +@Deprecated public class KeyedSerializationSchemaWrapper<T> implements KeyedSerializationSchema<T> { private static final long serialVersionUID = 1351665280744549933L; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java index 8e567431..68c4db12 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java @@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont * A {@link org.apache.flink.api.common.eventtime.WatermarkOutput} that forwards calls to a {@link * org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext}. 
*/ +@Deprecated public class SourceContextWatermarkOutputAdapter<T> implements WatermarkOutput { private final SourceContext<T> sourceContext; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TransactionalIdsGenerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TransactionalIdsGenerator.java index e21355e8..cd6270ac 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TransactionalIdsGenerator.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TransactionalIdsGenerator.java @@ -40,6 +40,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * subtask. */ @Internal +@Deprecated public class TransactionalIdsGenerator { private final String prefix; private final int subtaskIndex; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaConsumerMetricConstants.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaConsumerMetricConstants.java index 30d46971..73108902 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaConsumerMetricConstants.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaConsumerMetricConstants.java @@ -26,6 +26,7 @@ import org.apache.flink.annotation.Internal; * metrics. 
*/ @Internal +@Deprecated public class KafkaConsumerMetricConstants { public static final String KAFKA_CONSUMER_METRICS_GROUP = "KafkaConsumer"; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java index 1ab41ce9..2893bad3 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java @@ -23,6 +23,7 @@ import org.apache.flink.metrics.Gauge; /** Gauge for getting the current value of a Kafka metric. */ @Internal +@Deprecated public class KafkaMetricWrapper implements Gauge<Double> { private final org.apache.kafka.common.Metric kafkaMetric; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java index 16b57f62..e70baea3 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java @@ -54,8 +54,12 @@ import org.apache.flink.util.Preconditions; * <p>Not all Kafka partitions contain data. To avoid such an unbalanced partitioning, use a * round-robin kafka partitioner (note that this will cause a lot of network connections between all * the Flink instances and all the Kafka brokers). + * + * @deprecated Will be turned into internal class when {@link + * org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} is removed. 
*/ @PublicEvolving +@Deprecated public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> { private static final long serialVersionUID = -3785320239953858777L; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java index 2fb89e20..7318ed69 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java @@ -24,8 +24,12 @@ import java.io.Serializable; /** * A {@link FlinkKafkaPartitioner} wraps logic on how to partition records across partitions of * multiple Kafka topics. + * + * @deprecated Will be turned into internal class when {@link + * org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} is removed. */ @PublicEvolving +@Deprecated public abstract class FlinkKafkaPartitioner<T> implements Serializable { private static final long serialVersionUID = -9086719227828020494L; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java index ae9af29f..bb7c76a6 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java @@ -98,8 +98,13 @@ import java.util.Properties; * | * | ----------> KafkaShuffleConsumerReuse -> ... * </pre> + * + * @deprecated This experimental feature never graduated to a stable feature and will be removed in + * future releases. 
In case of interest to port it to the Source/Sink API, please reach out to + * the Flink community. */ @Experimental +@Deprecated public class FlinkKafkaShuffle { static final String PRODUCER_PARALLELISM = "producer parallelism"; static final String PARTITION_NUMBER = "partition number"; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java index 886343be..b96e9c0f 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java @@ -39,6 +39,7 @@ import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuf /** Flink Kafka Shuffle Consumer Function. */ @Internal +@Deprecated public class FlinkKafkaShuffleConsumer<T> extends FlinkKafkaConsumer<T> { private final TypeSerializer<T> typeSerializer; private final int producerParallelism; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java index e05e8f9a..46754f27 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java @@ -44,6 +44,7 @@ import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuf * handling elements and watermarks */ @Internal +@Deprecated public class FlinkKafkaShuffleProducer<IN, KEY> extends FlinkKafkaProducer<IN> { private final KafkaSerializer<IN> kafkaSerializer; private final 
KeySelector<IN, KEY> keySelector; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/StreamKafkaShuffleSink.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/StreamKafkaShuffleSink.java index 8bd77840..e24e1565 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/StreamKafkaShuffleSink.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/StreamKafkaShuffleSink.java @@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.watermark.Watermark; * this way to avoid public interface change. */ @Internal +@Deprecated class StreamKafkaShuffleSink<IN> extends StreamSink<IN> { public StreamKafkaShuffleSink(FlinkKafkaShuffleProducer flinkKafkaShuffleProducer) { diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java index 91798281..ef5eca95 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.kafka.table; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; @@ -38,6 +39,7 @@ import java.util.ArrayList; import java.util.List; /** A specific {@link KafkaSerializationSchema} for {@link KafkaDynamicSource}. 
*/ +@Internal class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> { private static final long serialVersionUID = 1L; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java index 71ca4147..f3a7acb3 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.connectors.kafka.table; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; @@ -40,6 +41,7 @@ import java.util.regex.Pattern; import static org.apache.flink.util.Preconditions.checkNotNull; /** SerializationSchema used by {@link KafkaDynamicSink} to configure a {@link KafkaSink}. 
*/ +@Internal class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationSchema<RowData> { private final Set<String> topics; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java index 970bad1c..cffdc8ea 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java @@ -42,6 +42,7 @@ import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass; * (String) and "partition" (int). */ @PublicEvolving +@Deprecated public class JSONKeyValueDeserializationSchema implements KafkaDeserializationSchema<ObjectNode> { private static final long serialVersionUID = 1509391548173891955L; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java index 1c3eaa62..05e0eaea 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java @@ -41,6 +41,7 @@ import java.io.IOException; * @param <V> The value type to be serialized. */ @PublicEvolving +@Deprecated public class TypeInformationKeyValueSerializationSchema<K, V> implements KafkaDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K, V>> {
