This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit df7d347eccd9ecd45f8be816df4d15870f098c10
Author: lvyanquan <lvyanquan....@alibaba-inc.com>
AuthorDate: Mon Mar 31 20:38:23 2025 +0800

    [FLINK-37590][build] Add CI of python and add jdk11 to ensure compatibility.
---
 .github/workflows/push_pr.yml                  |  11 +-
 .github/workflows/weekly.yml                   |   2 +-
 .../pyflink/datastream/connectors/kafka.py     | 313 +--------------------
 .../datastream/connectors/tests/test_kafka.py  |   5 +-
 4 files changed, 14 insertions(+), 317 deletions(-)

diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index f8bf5779..e4bd1e5e 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -29,8 +29,17 @@ jobs:
     strategy:
       matrix:
         flink: [ 2.0.0 ]
-        jdk: [ '17, 21' ]
+        jdk: [ '11, 17, 21' ]
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
       flink_version: ${{ matrix.flink }}
       jdk_version: ${{ matrix.jdk }}
+  python_test:
+    strategy:
+      matrix:
+        flink: [ 2.0.0 ]
+        jdk: [ '11, 17, 21' ]
+    uses: apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
+    with:
+      flink_version: ${{ matrix.flink }}
+      jdk_version: ${{ matrix.jdk }}
diff --git a/.github/workflows/weekly.yml b/.github/workflows/weekly.yml
index ff06ad37..4e95a724 100644
--- a/.github/workflows/weekly.yml
+++ b/.github/workflows/weekly.yml
@@ -49,5 +49,5 @@ jobs:
     with:
       flink_version: ${{ matrix.flink_branches.flink }}
       connector_branch: ${{ matrix.flink_branches.branch }}
-      jdk_version: ${{ matrix.flink_branches.jdk || '17, 21' }}
+      jdk_version: ${{ matrix.flink_branches.jdk || '11, 17, 21' }}
       run_dependency_convergence: false
diff --git a/flink-python/pyflink/datastream/connectors/kafka.py b/flink-python/pyflink/datastream/connectors/kafka.py
index 062c5b2e..4a3fbacc 100644
--- a/flink-python/pyflink/datastream/connectors/kafka.py
+++ b/flink-python/pyflink/datastream/connectors/kafka.py
@@ -15,30 +15,24 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-import warnings
 from abc import ABC, abstractmethod
 from enum import Enum
-from typing import Dict, Union, List, Set, Callable, Any, Optional
+from typing import Dict, Union, Set, Callable, Any, Optional

 from py4j.java_gateway import JavaObject, get_java_class
-
-from pyflink.common import DeserializationSchema, TypeInformation, typeinfo, SerializationSchema, \
+from pyflink.common import DeserializationSchema, SerializationSchema, \
     Types, Row
 from pyflink.datastream.connectors import Source, Sink
 from pyflink.datastream.connectors.base import DeliveryGuarantee, SupportsPreprocessing, \
     StreamTransformer
-from pyflink.datastream.functions import SinkFunction, SourceFunction
 from pyflink.java_gateway import get_gateway
 from pyflink.util.java_utils import to_jarray, get_field, get_field_value

 __all__ = [
-    'FlinkKafkaConsumer',
-    'FlinkKafkaProducer',
     'KafkaSource',
     'KafkaSourceBuilder',
     'KafkaSink',
     'KafkaSinkBuilder',
-    'Semantic',
     'KafkaTopicPartition',
     'KafkaOffsetsInitializer',
     'KafkaOffsetResetStrategy',
@@ -48,309 +42,6 @@ __all__ = [
 ]


-# ---- FlinkKafkaConsumer ----
-class FlinkKafkaConsumerBase(SourceFunction, ABC):
-    """
-    Base class of all Flink Kafka Consumer data sources. This implements the common behavior across
-    all kafka versions.
-
-    The Kafka version specific behavior is defined mainly in the specific subclasses.
- """ - - def __init__(self, j_flink_kafka_consumer): - super(FlinkKafkaConsumerBase, self).__init__(source_func=j_flink_kafka_consumer) - - def set_commit_offsets_on_checkpoints(self, - commit_on_checkpoints: bool) -> 'FlinkKafkaConsumerBase': - """ - Specifies whether or not the consumer should commit offsets back to kafka on checkpoints. - This setting will only have effect if checkpointing is enabled for the job. If checkpointing - isn't enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit" (for 0.9+) - property settings will be used. - """ - self._j_function = self._j_function \ - .setCommitOffsetsOnCheckpoints(commit_on_checkpoints) - return self - - def set_start_from_earliest(self) -> 'FlinkKafkaConsumerBase': - """ - Specifies the consumer to start reading from the earliest offset for all partitions. This - lets the consumer ignore any committed group offsets in Zookeeper/ Kafka brokers. - - This method does not affect where partitions are read from when the consumer is restored - from a checkpoint or savepoint. When the consumer is restored from a checkpoint or - savepoint, only the offsets in the restored state will be used. - """ - self._j_function = self._j_function.setStartFromEarliest() - return self - - def set_start_from_latest(self) -> 'FlinkKafkaConsumerBase': - """ - Specifies the consuer to start reading from the latest offset for all partitions. This lets - the consumer ignore any committed group offsets in Zookeeper / Kafka brokers. - - This method does not affect where partitions are read from when the consumer is restored - from a checkpoint or savepoint. When the consumer is restored from a checkpoint or - savepoint, only the offsets in the restored state will be used. - """ - self._j_function = self._j_function.setStartFromLatest() - return self - - def set_start_from_timestamp(self, startup_offsets_timestamp: int) -> 'FlinkKafkaConsumerBase': - """ - Specifies the consumer to start reading partitions from a specified timestamp. The specified - timestamp must be before the current timestamp. This lets the consumer ignore any committed - group offsets in Zookeeper / Kafka brokers. - - The consumer will look up the earliest offset whose timestamp is greater than or equal to - the specific timestamp from Kafka. If there's no such offset, the consumer will use the - latest offset to read data from Kafka. - - This method does not affect where partitions are read from when the consumer is restored - from a checkpoint or savepoint. When the consumer is restored from a checkpoint or - savepoint, only the offsets in the restored state will be used. - - :param startup_offsets_timestamp: timestamp for the startup offsets, as milliseconds for - epoch. - """ - self._j_function = self._j_function.setStartFromTimestamp( - startup_offsets_timestamp) - return self - - def set_start_from_group_offsets(self) -> 'FlinkKafkaConsumerBase': - """ - Specifies the consumer to start reading from any committed group offsets found in Zookeeper/ - Kafka brokers. The 'group.id' property must be set in the configuration properties. If no - offset can be found for a partition, the behaviour in 'auto.offset.reset' set in the - configuration properties will be used for the partition. - - This method does not affect where partitions are read from when the consumer is restored - from a checkpoint or savepoint. When the consumer is restored from a checkpoint or - savepoint, only the offsets in the restored state will be used. 
- """ - self._j_function = self._j_function.setStartFromGroupOffsets() - return self - - def disable_filter_restored_partitions_with_subscribed_topics(self) -> 'FlinkKafkaConsumerBase': - """ - By default, when restoring from a checkpoint / savepoint, the consumer always ignores - restored partitions that are no longer associated with the current specified topics or topic - pattern to subscribe to. - - This method does not affect where partitions are read from when the consumer is restored - from a checkpoint or savepoint. When the consumer is restored from a checkpoint or - savepoint, only the offsets in the restored state will be used. - """ - self._j_function = self._j_function \ - .disableFilterRestoredPartitionsWithSubscribedTopics() - return self - - def get_produced_type(self) -> TypeInformation: - return typeinfo._from_java_type(self._j_function.getProducedType()) - - -def _get_kafka_consumer(topics, properties, deserialization_schema, j_consumer_clz): - if not isinstance(topics, list): - topics = [topics] - gateway = get_gateway() - j_properties = gateway.jvm.java.util.Properties() - for key, value in properties.items(): - j_properties.setProperty(key, value) - - j_flink_kafka_consumer = j_consumer_clz(topics, - deserialization_schema._j_deserialization_schema, - j_properties) - return j_flink_kafka_consumer - - -class FlinkKafkaConsumer(FlinkKafkaConsumerBase): - """ - The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from - Apache Kafka. The consumer can run in multiple parallel instances, each of which will - pull data from one or more Kafka partitions. - - The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost - during a failure, and that the computation processes elements 'exactly once. (These guarantees - naturally assume that Kafka itself does not lose any data.) - - Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. - The offsets committed to Kafka / Zookeeper are only to bring the outside view of progress in - sync with Flink's view of the progress. That way, monitoring and other jobs can get a view of - how far the Flink Kafka consumer has consumed a topic. - - Please refer to Kafka's documentation for the available configuration properties: - http://kafka.apache.org/documentation.html#newconsumerconfigs - """ - - def __init__(self, topics: Union[str, List[str]], deserialization_schema: DeserializationSchema, - properties: Dict): - """ - Creates a new Kafka streaming source consumer for Kafka 0.10.x. - - This constructor allows passing multiple topics to the consumer. - - :param topics: The Kafka topics to read from. - :param deserialization_schema: The de-/serializer used to convert between Kafka's byte - messages and Flink's objects. - :param properties: The properties that are used to configure both the fetcher and the offset - handler. - """ - - warnings.warn("Deprecated in 1.16. Use KafkaSource instead.", DeprecationWarning) - JFlinkKafkaConsumer = get_gateway().jvm \ - .org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer - j_flink_kafka_consumer = _get_kafka_consumer(topics, properties, deserialization_schema, - JFlinkKafkaConsumer) - super(FlinkKafkaConsumer, self).__init__(j_flink_kafka_consumer=j_flink_kafka_consumer) - - -# ---- FlinkKafkaProducer ---- - - -class Semantic(Enum): - """ - Semantics that can be chosen. 
-
-    :data: `EXACTLY_ONCE`:
-
-    The Flink producer will write all messages in a Kafka transaction that will be committed to
-    the Kafka on a checkpoint. In this mode FlinkKafkaProducer sets up a pool of
-    FlinkKafkaProducer. Between each checkpoint there is created new Kafka transaction, which is
-    being committed on FlinkKafkaProducer.notifyCheckpointComplete(long). If checkpoint
-    complete notifications are running late, FlinkKafkaProducer can run out of
-    FlinkKafkaProducers in the pool. In that case any subsequent FlinkKafkaProducer.snapshot-
-    State() requests will fail and the FlinkKafkaProducer will keep using the
-    FlinkKafkaProducer from previous checkpoint. To decrease chances of failing checkpoints
-    there are four options:
-
-    1. decrease number of max concurrent checkpoints
-    2. make checkpoints more reliable (so that they complete faster)
-    3. increase delay between checkpoints
-    4. increase size of FlinkKafkaProducers pool
-
-    :data: `AT_LEAST_ONCE`:
-
-    The Flink producer will wait for all outstanding messages in the Kafka buffers to be
-    acknowledged by the Kafka producer on a checkpoint.
-
-    :data: `NONE`:
-
-    Means that nothing will be guaranteed. Messages can be lost and/or duplicated in case of
-    failure.
-
-    """
-
-    EXACTLY_ONCE = 0,
-    AT_LEAST_ONCE = 1,
-    NONE = 2
-
-    def _to_j_semantic(self):
-        JSemantic = get_gateway().jvm \
-            .org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
-        return getattr(JSemantic, self.name)
-
-
-class FlinkKafkaProducerBase(SinkFunction, ABC):
-    """
-    Flink Sink to produce data into a Kafka topic.
-
-    Please note that this producer provides at-least-once reliability guarantees when checkpoints
-    are enabled and set_flush_on_checkpoint(True) is set. Otherwise, the producer doesn't provide any
-    reliability guarantees.
-    """
-
-    def __init__(self, j_flink_kafka_producer):
-        super(FlinkKafkaProducerBase, self).__init__(sink_func=j_flink_kafka_producer)
-
-    def set_log_failures_only(self, log_failures_only: bool) -> 'FlinkKafkaProducerBase':
-        """
-        Defines whether the producer should fail on errors, or only log them. If this is set to
-        true, then exceptions will be only logged, if set to false, exceptions will be eventually
-        thrown and cause the streaming program to fail (and enter recovery).
-
-        :param log_failures_only: The flag to indicate logging-only on exceptions.
-        """
-        self._j_function.setLogFailuresOnly(log_failures_only)
-        return self
-
-    def set_flush_on_checkpoint(self, flush_on_checkpoint: bool) -> 'FlinkKafkaProducerBase':
-        """
-        If set to true, the Flink producer will wait for all outstanding messages in the Kafka
-        buffers to be acknowledged by the Kafka producer on a checkpoint.
-
-        This way, the producer can guarantee that messages in the Kafka buffers are part of the
-        checkpoint.
-
-        :param flush_on_checkpoint: Flag indicating the flush mode (true = flush on checkpoint)
-        """
-        self._j_function.setFlushOnCheckpoint(flush_on_checkpoint)
-        return self
-
-    def set_write_timestamp_to_kafka(self,
-                                     write_timestamp_to_kafka: bool) -> 'FlinkKafkaProducerBase':
-        """
-        If set to true, Flink will write the (event time) timestamp attached to each record into
-        Kafka. Timestamps must be positive for Kafka to accept them.
-
-        :param write_timestamp_to_kafka: Flag indicating if Flink's internal timestamps are written
-                                         to Kafka.
- """ - self._j_function.setWriteTimestampToKafka(write_timestamp_to_kafka) - return self - - -class FlinkKafkaProducer(FlinkKafkaProducerBase): - """ - Flink Sink to produce data into a Kafka topic. By - default producer will use AT_LEAST_ONCE semantic. Before using EXACTLY_ONCE please refer to - Flink's Kafka connector documentation. - """ - - def __init__(self, topic: str, serialization_schema: SerializationSchema, - producer_config: Dict, kafka_producer_pool_size: int = 5, - semantic=Semantic.AT_LEAST_ONCE): - """ - Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to the topic. - - Using this constructor, the default FlinkFixedPartitioner will be used as the partitioner. - This default partitioner maps each sink subtask to a single Kafka partition (i.e. all - records received by a sink subtask will end up in the same Kafka partition). - - :param topic: ID of the Kafka topic. - :param serialization_schema: User defined key-less serialization schema. - :param producer_config: Properties with the producer configuration. - """ - gateway = get_gateway() - j_properties = gateway.jvm.java.util.Properties() - for key, value in producer_config.items(): - j_properties.setProperty(key, value) - - JFlinkKafkaProducer = gateway.jvm \ - .org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer - - j_flink_kafka_producer = JFlinkKafkaProducer( - topic, serialization_schema._j_serialization_schema, j_properties, None, - semantic._to_j_semantic(), kafka_producer_pool_size) - super(FlinkKafkaProducer, self).__init__(j_flink_kafka_producer=j_flink_kafka_producer) - - def ignore_failures_after_transaction_timeout(self) -> 'FlinkKafkaProducer': - """ - Disables the propagation of exceptions thrown when committing presumably timed out Kafka - transactions during recovery of the job. If a Kafka transaction is timed out, a commit will - never be successful. Hence, use this feature to avoid recovery loops of the Job. Exceptions - will still be logged to inform the user that data loss might have occurred. - - Note that we use the System.currentTimeMillis() to track the age of a transaction. Moreover, - only exceptions thrown during the recovery are caught, i.e., the producer will attempt at - least one commit of the transaction before giving up. - - :return: This FlinkKafkaProducer. 
- """ - self._j_function.ignoreFailuresAfterTransactionTimeout() - return self - - # ---- KafkaSource ---- diff --git a/flink-python/pyflink/datastream/connectors/tests/test_kafka.py b/flink-python/pyflink/datastream/connectors/tests/test_kafka.py index 16bb739b..c34a9ffc 100644 --- a/flink-python/pyflink/datastream/connectors/tests/test_kafka.py +++ b/flink-python/pyflink/datastream/connectors/tests/test_kafka.py @@ -19,7 +19,6 @@ import json from typing import Dict import pyflink.datastream.data_stream as data_stream -from pyflink.common import typeinfo from pyflink.common.configuration import Configuration from pyflink.common.serialization import SimpleStringSchema, DeserializationSchema @@ -28,8 +27,7 @@ from pyflink.common.types import Row from pyflink.common.watermark_strategy import WatermarkStrategy from pyflink.datastream.connectors.base import DeliveryGuarantee from pyflink.datastream.connectors.kafka import KafkaSource, KafkaTopicPartition, \ - KafkaOffsetsInitializer, KafkaOffsetResetStrategy, KafkaRecordSerializationSchema, KafkaSink, \ - FlinkKafkaProducer, FlinkKafkaConsumer + KafkaOffsetsInitializer, KafkaOffsetResetStrategy, KafkaRecordSerializationSchema, KafkaSink from pyflink.datastream.formats.avro import AvroRowDeserializationSchema, AvroRowSerializationSchema from pyflink.datastream.formats.csv import CsvRowDeserializationSchema, CsvRowSerializationSchema from pyflink.datastream.formats.json import JsonRowDeserializationSchema, JsonRowSerializationSchema @@ -37,7 +35,6 @@ from pyflink.java_gateway import get_gateway from pyflink.testing.test_case_utils import ( PyFlinkStreamingTestCase, PyFlinkTestCase, - invoke_java_object_method, to_java_data_structure, ) from pyflink.util.java_utils import to_jarray, is_instance_of, get_field_value