This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit df7d347eccd9ecd45f8be816df4d15870f098c10
Author: lvyanquan <lvyanquan....@alibaba-inc.com>
AuthorDate: Mon Mar 31 20:38:23 2025 +0800

    [FLINK-37590][build] Add Python CI and JDK 11 to ensure compatibility.
---
 .github/workflows/push_pr.yml                      |  11 +-
 .github/workflows/weekly.yml                       |   2 +-
 .../pyflink/datastream/connectors/kafka.py         | 313 +--------------------
 .../datastream/connectors/tests/test_kafka.py      |   5 +-
 4 files changed, 14 insertions(+), 317 deletions(-)

diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index f8bf5779..e4bd1e5e 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -29,8 +29,17 @@ jobs:
     strategy:
       matrix:
         flink: [ 2.0.0 ]
-        jdk: [ '17, 21' ]
+        jdk: [ '11, 17, 21' ]
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
       flink_version: ${{ matrix.flink }}
       jdk_version: ${{ matrix.jdk }}
+  python_test:
+    strategy:
+      matrix:
+        flink: [ 2.0.0 ]
+        jdk: [ '11, 17, 21' ]
+    uses: apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
+    with:
+      flink_version: ${{ matrix.flink }}
+      jdk_version: ${{ matrix.jdk }}
diff --git a/.github/workflows/weekly.yml b/.github/workflows/weekly.yml
index ff06ad37..4e95a724 100644
--- a/.github/workflows/weekly.yml
+++ b/.github/workflows/weekly.yml
@@ -49,5 +49,5 @@ jobs:
     with:
       flink_version: ${{ matrix.flink_branches.flink }}
       connector_branch: ${{ matrix.flink_branches.branch }}
-      jdk_version: ${{ matrix.flink_branches.jdk || '17, 21' }}
+      jdk_version: ${{ matrix.flink_branches.jdk || '11, 17, 21' }}
       run_dependency_convergence: false
diff --git a/flink-python/pyflink/datastream/connectors/kafka.py b/flink-python/pyflink/datastream/connectors/kafka.py
index 062c5b2e..4a3fbacc 100644
--- a/flink-python/pyflink/datastream/connectors/kafka.py
+++ b/flink-python/pyflink/datastream/connectors/kafka.py
@@ -15,30 +15,24 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-import warnings
 from abc import ABC, abstractmethod
 from enum import Enum
-from typing import Dict, Union, List, Set, Callable, Any, Optional
+from typing import Dict, Union, Set, Callable, Any, Optional
 
 from py4j.java_gateway import JavaObject, get_java_class
-
-from pyflink.common import DeserializationSchema, TypeInformation, typeinfo, SerializationSchema, \
+from pyflink.common import DeserializationSchema, SerializationSchema, \
     Types, Row
 from pyflink.datastream.connectors import Source, Sink
 from pyflink.datastream.connectors.base import DeliveryGuarantee, SupportsPreprocessing, \
     StreamTransformer
-from pyflink.datastream.functions import SinkFunction, SourceFunction
 from pyflink.java_gateway import get_gateway
 from pyflink.util.java_utils import to_jarray, get_field, get_field_value
 
 __all__ = [
-    'FlinkKafkaConsumer',
-    'FlinkKafkaProducer',
     'KafkaSource',
     'KafkaSourceBuilder',
     'KafkaSink',
     'KafkaSinkBuilder',
-    'Semantic',
     'KafkaTopicPartition',
     'KafkaOffsetsInitializer',
     'KafkaOffsetResetStrategy',
@@ -48,309 +42,6 @@ __all__ = [
 ]
 
 
-# ---- FlinkKafkaConsumer ----
-
-class FlinkKafkaConsumerBase(SourceFunction, ABC):
-    """
-    Base class of all Flink Kafka Consumer data sources. This implements the common behavior across
-    all kafka versions.
-
-    The Kafka version specific behavior is defined mainly in the specific subclasses.
-    """
-
-    def __init__(self, j_flink_kafka_consumer):
-        super(FlinkKafkaConsumerBase, self).__init__(source_func=j_flink_kafka_consumer)
-
-    def set_commit_offsets_on_checkpoints(self,
-                                          commit_on_checkpoints: bool) -> 'FlinkKafkaConsumerBase':
-        """
-        Specifies whether or not the consumer should commit offsets back to kafka on checkpoints.
-        This setting will only have effect if checkpointing is enabled for the job. If checkpointing
-        isn't enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit" (for 0.9+)
-        property settings will be used.
-        """
-        self._j_function = self._j_function \
-            .setCommitOffsetsOnCheckpoints(commit_on_checkpoints)
-        return self
-
-    def set_start_from_earliest(self) -> 'FlinkKafkaConsumerBase':
-        """
-        Specifies the consumer to start reading from the earliest offset for all partitions. This
-        lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
-
-        This method does not affect where partitions are read from when the consumer is restored
-        from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
-        savepoint, only the offsets in the restored state will be used.
-        """
-        self._j_function = self._j_function.setStartFromEarliest()
-        return self
-
-    def set_start_from_latest(self) -> 'FlinkKafkaConsumerBase':
-        """
-        Specifies the consumer to start reading from the latest offset for all partitions. This lets
-        the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
-
-        This method does not affect where partitions are read from when the consumer is restored
-        from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
-        savepoint, only the offsets in the restored state will be used.
-        """
-        self._j_function = self._j_function.setStartFromLatest()
-        return self
-
-    def set_start_from_timestamp(self, startup_offsets_timestamp: int) -> 'FlinkKafkaConsumerBase':
-        """
-        Specifies the consumer to start reading partitions from a specified timestamp. The specified
-        timestamp must be before the current timestamp. This lets the consumer ignore any committed
-        group offsets in Zookeeper / Kafka brokers.
-
-        The consumer will look up the earliest offset whose timestamp is greater than or equal to
-        the specific timestamp from Kafka. If there's no such offset, the consumer will use the
-        latest offset to read data from Kafka.
-
-        This method does not affect where partitions are read from when the consumer is restored
-        from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
-        savepoint, only the offsets in the restored state will be used.
-
-        :param startup_offsets_timestamp: timestamp for the startup offsets, as milliseconds from
-                                          epoch.
-        """
-        self._j_function = self._j_function.setStartFromTimestamp(
-            startup_offsets_timestamp)
-        return self
-
-    def set_start_from_group_offsets(self) -> 'FlinkKafkaConsumerBase':
-        """
-        Specifies the consumer to start reading from any committed group offsets found in Zookeeper/
-        Kafka brokers. The 'group.id' property must be set in the configuration properties. If no
-        offset can be found for a partition, the behaviour in 'auto.offset.reset' set in the
-        configuration properties will be used for the partition.
-
-        This method does not affect where partitions are read from when the consumer is restored
-        from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
-        savepoint, only the offsets in the restored state will be used.
-        """
-        self._j_function = self._j_function.setStartFromGroupOffsets()
-        return self
-
-    def disable_filter_restored_partitions_with_subscribed_topics(self) -> 'FlinkKafkaConsumerBase':
-        """
-        By default, when restoring from a checkpoint / savepoint, the consumer always ignores
-        restored partitions that are no longer associated with the current specified topics or topic
-        pattern to subscribe to.
-
-        This method does not affect where partitions are read from when the consumer is restored
-        from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
-        savepoint, only the offsets in the restored state will be used.
-        """
-        self._j_function = self._j_function \
-            .disableFilterRestoredPartitionsWithSubscribedTopics()
-        return self
-
-    def get_produced_type(self) -> TypeInformation:
-        return typeinfo._from_java_type(self._j_function.getProducedType())
-
-
-def _get_kafka_consumer(topics, properties, deserialization_schema, j_consumer_clz):
-    if not isinstance(topics, list):
-        topics = [topics]
-    gateway = get_gateway()
-    j_properties = gateway.jvm.java.util.Properties()
-    for key, value in properties.items():
-        j_properties.setProperty(key, value)
-
-    j_flink_kafka_consumer = j_consumer_clz(topics,
-                                            deserialization_schema._j_deserialization_schema,
-                                            j_properties)
-    return j_flink_kafka_consumer
-
-
-class FlinkKafkaConsumer(FlinkKafkaConsumerBase):
-    """
-    The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
-    Apache Kafka. The consumer can run in multiple parallel instances, each of which will
-    pull data from one or more Kafka partitions.
-
-    The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
-    during a failure, and that the computation processes elements 'exactly once'. (These guarantees
-    naturally assume that Kafka itself does not lose any data.)
-
-    Please note that Flink snapshots the offsets internally as part of its distributed checkpoints.
-    The offsets committed to Kafka / Zookeeper are only to bring the outside view of progress in
-    sync with Flink's view of the progress. That way, monitoring and other jobs can get a view of
-    how far the Flink Kafka consumer has consumed a topic.
-
-    Please refer to Kafka's documentation for the available configuration properties:
-    http://kafka.apache.org/documentation.html#newconsumerconfigs
-    """
-
-    def __init__(self, topics: Union[str, List[str]], deserialization_schema: DeserializationSchema,
-                 properties: Dict):
-        """
-        Creates a new Kafka streaming source consumer for Kafka 0.10.x.
-
-        This constructor allows passing multiple topics to the consumer.
-
-        :param topics: The Kafka topics to read from.
-        :param deserialization_schema: The de-/serializer used to convert between Kafka's byte
-                                       messages and Flink's objects.
-        :param properties: The properties that are used to configure both the fetcher and the offset
-                           handler.
-        """
-
-        warnings.warn("Deprecated in 1.16. Use KafkaSource instead.", DeprecationWarning)
-        JFlinkKafkaConsumer = get_gateway().jvm \
-            .org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
-        j_flink_kafka_consumer = _get_kafka_consumer(topics, properties, deserialization_schema,
-                                                     JFlinkKafkaConsumer)
-        super(FlinkKafkaConsumer, self).__init__(j_flink_kafka_consumer=j_flink_kafka_consumer)
-
-
-# ---- FlinkKafkaProducer ----
-
-
-class Semantic(Enum):
-    """
-    Semantics that can be chosen.
-
-    :data: `EXACTLY_ONCE`:
-
-    The Flink producer will write all messages in a Kafka transaction that will be committed to
-    the Kafka on a checkpoint. In this mode FlinkKafkaProducer sets up a pool of
-    FlinkKafkaProducer. Between each checkpoint there is created new Kafka transaction, which is
-    being committed on FlinkKafkaProducer.notifyCheckpointComplete(long). If checkpoint
-    complete notifications are running late, FlinkKafkaProducer can run out of
-    FlinkKafkaProducers in the pool. In that case any subsequent FlinkKafkaProducer.snapshotState()
-    requests will fail and the FlinkKafkaProducer will keep using the
-    FlinkKafkaProducer from previous checkpoint. To decrease chances of failing checkpoints
-    there are four options:
-
-        1. decrease number of max concurrent checkpoints
-        2. make checkpoints more reliable (so that they complete faster)
-        4. increase size of FlinkKafkaProducers pool
-
-    :data: `AT_LEAST_ONCE`:
-
-    The Flink producer will wait for all outstanding messages in the Kafka 
buffers to be
-    acknowledged by the Kafka producer on a checkpoint.
-
-    :data: `NONE`:
-
-    Means that nothing will be guaranteed. Messages can be lost and/or 
duplicated in case of
-    failure.
-
-    """
-
-    EXACTLY_ONCE = 0,
-    AT_LEAST_ONCE = 1,
-    NONE = 2
-
-    def _to_j_semantic(self):
-        JSemantic = get_gateway().jvm \
-            .org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
-        return getattr(JSemantic, self.name)
-
-
-class FlinkKafkaProducerBase(SinkFunction, ABC):
-    """
-    Flink Sink to produce data into a Kafka topic.
-
-    Please note that this producer provides at-least-once reliability guarantees when checkpoints
-    are enabled and set_flush_on_checkpoint(True) is set. Otherwise, the producer doesn't provide any
-    reliability guarantees.
-    """
-
-    def __init__(self, j_flink_kafka_producer):
-        super(FlinkKafkaProducerBase, self).__init__(sink_func=j_flink_kafka_producer)
-
-    def set_log_failures_only(self, log_failures_only: bool) -> 'FlinkKafkaProducerBase':
-        """
-        Defines whether the producer should fail on errors, or only log them. If this is set to
-        true, then exceptions will only be logged; if set to false, exceptions will eventually be
-        thrown and cause the streaming program to fail (and enter recovery).
-
-        :param log_failures_only: The flag to indicate logging-only on exceptions.
-        """
-        self._j_function.setLogFailuresOnly(log_failures_only)
-        return self
-
-    def set_flush_on_checkpoint(self, flush_on_checkpoint: bool) -> 'FlinkKafkaProducerBase':
-        """
-        If set to true, the Flink producer will wait for all outstanding messages in the Kafka
-        buffers to be acknowledged by the Kafka producer on a checkpoint.
-
-        This way, the producer can guarantee that messages in the Kafka buffers are part of the
-        checkpoint.
-
-        :param flush_on_checkpoint: Flag indicating the flush mode (true = flush on checkpoint)
-        """
-        self._j_function.setFlushOnCheckpoint(flush_on_checkpoint)
-        return self
-
-    def set_write_timestamp_to_kafka(self,
-                                     write_timestamp_to_kafka: bool) -> 'FlinkKafkaProducerBase':
-        """
-        If set to true, Flink will write the (event time) timestamp attached to each record into
-        Kafka. Timestamps must be positive for Kafka to accept them.
-
-        :param write_timestamp_to_kafka: Flag indicating if Flink's internal timestamps are written
-                                         to Kafka.
-        """
-        self._j_function.setWriteTimestampToKafka(write_timestamp_to_kafka)
-        return self
-
-
-class FlinkKafkaProducer(FlinkKafkaProducerBase):
-    """
-    Flink Sink to produce data into a Kafka topic. By
-    default producer will use AT_LEAST_ONCE semantic. Before using EXACTLY_ONCE please refer to
-    Flink's Kafka connector documentation.
-    """
-
-    def __init__(self, topic: str, serialization_schema: SerializationSchema,
-                 producer_config: Dict, kafka_producer_pool_size: int = 5,
-                 semantic=Semantic.AT_LEAST_ONCE):
-        """
-        Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to the topic.
-
-        Using this constructor, the default FlinkFixedPartitioner will be used as the partitioner.
-        This default partitioner maps each sink subtask to a single Kafka partition (i.e. all
-        records received by a sink subtask will end up in the same Kafka partition).
-
-        :param topic: ID of the Kafka topic.
-        :param serialization_schema: User defined key-less serialization schema.
-        :param producer_config: Properties with the producer configuration.
-        """
-        gateway = get_gateway()
-        j_properties = gateway.jvm.java.util.Properties()
-        for key, value in producer_config.items():
-            j_properties.setProperty(key, value)
-
-        JFlinkKafkaProducer = gateway.jvm \
-            .org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
-
-        j_flink_kafka_producer = JFlinkKafkaProducer(
-            topic, serialization_schema._j_serialization_schema, j_properties, None,
-            semantic._to_j_semantic(), kafka_producer_pool_size)
-        super(FlinkKafkaProducer, self).__init__(j_flink_kafka_producer=j_flink_kafka_producer)
-
-    def ignore_failures_after_transaction_timeout(self) -> 'FlinkKafkaProducer':
-        """
-        Disables the propagation of exceptions thrown when committing presumably timed out Kafka
-        transactions during recovery of the job. If a Kafka transaction is timed out, a commit will
-        never be successful. Hence, use this feature to avoid recovery loops of the Job. Exceptions
-        will still be logged to inform the user that data loss might have occurred.
-
-        Note that we use the System.currentTimeMillis() to track the age of a transaction. Moreover,
-        only exceptions thrown during the recovery are caught, i.e., the producer will attempt at
-        least one commit of the transaction before giving up.
-
-        :return: This FlinkKafkaProducer.
-        """
-        self._j_function.ignoreFailuresAfterTransactionTimeout()
-        return self
-
-
 # ---- KafkaSource ----
 
 
diff --git a/flink-python/pyflink/datastream/connectors/tests/test_kafka.py b/flink-python/pyflink/datastream/connectors/tests/test_kafka.py
index 16bb739b..c34a9ffc 100644
--- a/flink-python/pyflink/datastream/connectors/tests/test_kafka.py
+++ b/flink-python/pyflink/datastream/connectors/tests/test_kafka.py
@@ -19,7 +19,6 @@ import json
 from typing import Dict
 
 import pyflink.datastream.data_stream as data_stream
-from pyflink.common import typeinfo
 
 from pyflink.common.configuration import Configuration
 from pyflink.common.serialization import SimpleStringSchema, DeserializationSchema
@@ -28,8 +27,7 @@ from pyflink.common.types import Row
 from pyflink.common.watermark_strategy import WatermarkStrategy
 from pyflink.datastream.connectors.base import DeliveryGuarantee
 from pyflink.datastream.connectors.kafka import KafkaSource, KafkaTopicPartition, \
-    KafkaOffsetsInitializer, KafkaOffsetResetStrategy, KafkaRecordSerializationSchema, KafkaSink, \
-    FlinkKafkaProducer, FlinkKafkaConsumer
+    KafkaOffsetsInitializer, KafkaOffsetResetStrategy, KafkaRecordSerializationSchema, KafkaSink
 from pyflink.datastream.formats.avro import AvroRowDeserializationSchema, AvroRowSerializationSchema
 from pyflink.datastream.formats.csv import CsvRowDeserializationSchema, CsvRowSerializationSchema
 from pyflink.datastream.formats.json import JsonRowDeserializationSchema, JsonRowSerializationSchema
@@ -37,7 +35,6 @@ from pyflink.java_gateway import get_gateway
 from pyflink.testing.test_case_utils import (
     PyFlinkStreamingTestCase,
     PyFlinkTestCase,
-    invoke_java_object_method,
     to_java_data_structure,
 )
 from pyflink.util.java_utils import to_jarray, is_instance_of, get_field_value
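
For anyone still on the removed classes: the deprecation warning deleted above already pointed at KafkaSource, and KafkaSink is the corresponding replacement for FlinkKafkaProducer. Below is a minimal, untested sketch of the replacement API that remains exported by this module; the bootstrap server address, topic names, and group id are illustrative placeholders, not values taken from this change.

    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.common.watermark_strategy import WatermarkStrategy
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.base import DeliveryGuarantee
    from pyflink.datastream.connectors.kafka import (
        KafkaOffsetsInitializer,
        KafkaRecordSerializationSchema,
        KafkaSink,
        KafkaSource,
    )

    env = StreamExecutionEnvironment.get_execution_environment()

    # Source side: KafkaSource replaces the removed FlinkKafkaConsumer.
    source = KafkaSource.builder() \
        .set_bootstrap_servers("localhost:9092") \
        .set_topics("input-topic") \
        .set_group_id("example-group") \
        .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .build()
    ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")

    # Sink side: KafkaSink replaces the removed FlinkKafkaProducer; delivery
    # guarantees are configured via DeliveryGuarantee rather than Semantic.
    sink = KafkaSink.builder() \
        .set_bootstrap_servers("localhost:9092") \
        .set_record_serializer(
            KafkaRecordSerializationSchema.builder()
            .set_topic("output-topic")
            .set_value_serialization_schema(SimpleStringSchema())
            .build()) \
        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
        .build()
    ds.sink_to(sink)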
