This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a9620b  [FLINK-38952] Update PyFlink connector classes to match with 
the current Java API and entities
2a9620b is described below

commit 2a9620b283248ac2e99d7c07c60dd7781018c735
Author: Abhi Gupta <[email protected]>
AuthorDate: Thu Jan 22 18:57:32 2026 +0530

    [FLINK-38952] Update PyFlink connector classes to match with the current 
Java API and entities
---
 .../pyflink/datastream/connectors/kinesis.py       | 179 +++++++++------------
 .../datastream/connectors/tests/test_kinesis.py    |  22 ++-
 flink-python/setup.py                              |   2 +-
 flink-python/tox.ini                               |   2 +
 4 files changed, 91 insertions(+), 114 deletions(-)

diff --git a/flink-python/pyflink/datastream/connectors/kinesis.py 
b/flink-python/pyflink/datastream/connectors/kinesis.py
index 355c954..a677f9a 100644
--- a/flink-python/pyflink/datastream/connectors/kinesis.py
+++ b/flink-python/pyflink/datastream/connectors/kinesis.py
@@ -17,18 +17,16 @@
 
################################################################################
 from typing import Dict, Union, List
 
-from pyflink.common import SerializationSchema, DeserializationSchema, \
-    AssignerWithPeriodicWatermarksWrapper
-from pyflink.datastream.functions import SourceFunction
-from pyflink.datastream.connectors import Sink
+from pyflink.common import SerializationSchema, DeserializationSchema
+from pyflink.datastream.connectors import Sink, Source
 from pyflink.java_gateway import get_gateway
 
 __all__ = [
     'KinesisShardAssigner',
     'KinesisDeserializationSchema',
-    'WatermarkTracker',
     'PartitionKeyGenerator',
-    'FlinkKinesisConsumer',
+    'KinesisStreamsSource',
+    'KinesisStreamsSourceBuilder',
     'KinesisStreamsSink',
     'KinesisStreamsSinkBuilder',
     'KinesisFirehoseSink',
@@ -47,27 +45,18 @@ class KinesisShardAssigner(object):
     def __init__(self, j_kinesis_shard_assigner):
         self._j_kinesis_shard_assigner = j_kinesis_shard_assigner
 
-    @staticmethod
-    def default_shard_assigner() -> 'KinesisShardAssigner':
-        """
-        A Default KinesisShardAssigner that maps Kinesis shard hash-key ranges 
to Flink subtasks.
-        """
-        return 
KinesisShardAssigner(get_gateway().jvm.org.apache.flink.streaming.connectors.
-                                    
kinesis.internals.KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER)
-
     @staticmethod
     def uniform_shard_assigner() -> 'KinesisShardAssigner':
         """
         A KinesisShardAssigner that maps Kinesis shard hash-key ranges to 
Flink subtasks.
-        It creates a more uniform distribution of shards across subtasks than 
org.apache.flink. \
-        
streaming.connectors.kinesis.internals.KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER
 when the
+        It creates a more uniform distribution of shards across subtasks when 
the
         Kinesis records in the stream have hash keys that are uniformly 
distributed over all
         possible hash keys, which is the case if records have 
randomly-generated partition keys.
         (This is the same assumption made if you use the Kinesis 
UpdateShardCount operation with
         UNIFORM_SCALING.)
         """
-        return 
KinesisShardAssigner(get_gateway().jvm.org.apache.flink.streaming.connectors.
-                                    kinesis.util.UniformShardAssigner())
+        return 
KinesisShardAssigner(get_gateway().jvm.org.apache.flink.connector.kinesis.source.
+                                    
enumerator.assigner.ShardAssignerFactory.uniformShardAssigner())
 
 
 class KinesisDeserializationSchema(object):
@@ -81,115 +70,103 @@ class KinesisDeserializationSchema(object):
         self._j_kinesis_deserialization_schema = 
j_kinesis_deserialization_schema
 
 
-class WatermarkTracker(object):
+class KinesisStreamsSource(Source):
     """
-    The watermark tracker is responsible for aggregating watermarks across 
distributed operators.
-    It can be used for sub tasks of a single Flink source as well as multiple 
heterogeneous sources
-    or other operators.The class essentially functions like a distributed hash 
table that enclosing
-    operators can use to adopt their processing / IO rates
+    A Kinesis Data Streams (KDS) Source that reads from a single Kinesis 
stream using the
+    FLIP-27 Source API.
+
+    The source is an exactly-once parallel streaming data source that 
subscribes to a single
+    AWS Kinesis Data stream. It handles resharding transparently and maintains 
order within
+    a specific Kinesis partitionId.
+
+    The source will discover shards and start reading from each eligible shard 
in parallel,
+    depending on the parallelism of the operator. It also transparently 
handles discovery of
+    new shards if resharding occurs while the job is running.
     """
 
-    def __init__(self, j_watermark_tracker):
-        self._j_watermark_tracker = j_watermark_tracker
+    def __init__(self, j_kinesis_streams_source):
+        super().__init__(j_kinesis_streams_source)
 
     @staticmethod
-    def job_manager_watermark_tracker(
-            aggregate_name: str, log_accumulator_interval_millis: int = -1) -> 
'WatermarkTracker':
-        j_watermark_tracker = 
get_gateway().jvm.org.apache.flink.streaming.connectors.kinesis.util \
-            .JobManagerWatermarkTracker(aggregate_name, 
log_accumulator_interval_millis)
-        return WatermarkTracker(j_watermark_tracker)
+    def builder() -> 'KinesisStreamsSourceBuilder':
+        """
+        Get a KinesisStreamsSourceBuilder to build a 
:class:`KinesisStreamsSource`.
 
+        :return: a Kinesis source builder.
+        """
+        return KinesisStreamsSourceBuilder()
 
-class FlinkKinesisConsumer(SourceFunction):
-    """
-    The Flink Kinesis Consumer is an exactly-once parallel streaming data 
source that subscribes to
-    multiple AWS Kinesis streams within the same AWS service region, and can 
handle resharding of
-    streams. Each subtask of the consumer is responsible for fetching data 
records from multiple
-    Kinesis shards. The number of shards fetched by each subtask will change 
as shards are closed
-    and created by Kinesis.
-
-    To leverage Flink's checkpointing mechanics for exactly-once streaming 
processing guarantees,
-    the Flink Kinesis consumer is implemented with the AWS Java SDK, instead 
of the officially
-    recommended AWS Kinesis Client Library, for low-level control on the 
management of stream state.
-    The Flink Kinesis Connector also supports setting the initial starting 
points of Kinesis
-    streams, namely TRIM_HORIZON and LATEST.
-
-    Kinesis and the Flink consumer support dynamic re-sharding and shard IDs, 
while sequential,
-    cannot be assumed to be consecutive. There is no perfect generic default 
assignment function.
-    Default shard to subtask assignment, which is based on hash code, may 
result in skew, with some
-    subtasks having many shards assigned and others none.
-
-    It is recommended to monitor the shard distribution and adjust assignment 
appropriately.
-    A custom assigner implementation can be set via 
setShardAssigner(KinesisShardAssigner) to
-    optimize the hash function or use static overrides to limit skew.
-
-    In order for the consumer to emit watermarks, a timestamp assigner needs 
to be set via
-    setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks) and the auto 
watermark emit
-    interval configured via ExecutionConfig.setAutoWatermarkInterval(long).
-
-    Watermarks can only advance when all shards of a subtask continuously 
deliver records.
-    To avoid an inactive or closed shard to block the watermark progress, the 
idle timeout should
-    be configured via configuration property 
ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS.
-    By default, shards won't be considered idle and watermark calculation will 
wait for newer
-    records to arrive from all shards.
-
-    Note that re-sharding of the Kinesis stream while an application (that 
relies on the Kinesis
-    records for watermarking) is running can lead to incorrect late events. 
This depends on how
-    shards are assigned to subtasks and applies regardless of whether 
watermarks are generated in
-    the source or a downstream operator.
+
+class KinesisStreamsSourceBuilder(object):
     """
+    Builder to construct KinesisStreamsSource.
 
-    def __init__(self,
-                 streams: Union[str, List[str]],
-                 deserializer: Union[DeserializationSchema, 
KinesisDeserializationSchema],
-                 config_props: Dict
-                 ):
-        gateway = get_gateway()
-        j_properties = gateway.jvm.java.util.Properties()
-        for key, value in config_props.items():
-            j_properties.setProperty(key, value)
+    The following example shows the minimum setup to create a 
KinesisStreamsSource that reads
+    String values from a Kinesis Data Streams stream.
 
-        JFlinkKinesisConsumer = 
gateway.jvm.org.apache.flink.streaming.connectors.kinesis. \
-            FlinkKinesisConsumer
-        JKinesisDeserializationSchemaWrapper = 
get_gateway().jvm.org.apache.flink.streaming. \
-            
connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper
+    Example:
+    ::
 
-        if isinstance(streams, str):
-            streams = [streams]
+        >>> from pyflink.common.serialization import SimpleStringSchema
+        >>> from pyflink.common import Configuration
+        >>> source_config = Configuration()
+        >>> source = KinesisStreamsSource.builder() \\
+        ...     
.set_stream_arn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream") \\
+        ...     .set_deserialization_schema(SimpleStringSchema()) \\
+        ...     .set_source_config(source_config) \\
+        ...     
.set_kinesis_shard_assigner(KinesisShardAssigner.uniform_shard_assigner()) \\
+        ...     .build()
 
-        if isinstance(deserializer, DeserializationSchema):
-            deserializer = JKinesisDeserializationSchemaWrapper(
-                deserializer._j_deserialization_schema)
+    If the following parameters are not set in this builder, the following 
defaults will be used:
 
-        self._j_kinesis_consumer = JFlinkKinesisConsumer(streams, 
deserializer, j_properties)
-        super(FlinkKinesisConsumer, self).__init__(self._j_kinesis_consumer)
+    - kinesisShardAssigner will be UniformShardAssigner
+    """
 
-    def set_shard_assigner(self, shard_assigner: KinesisShardAssigner) -> 
'FlinkKinesisConsumer':
+    def __init__(self):
+        self._j_builder = 
get_gateway().jvm.org.apache.flink.connector.kinesis.source. \
+            KinesisStreamsSource.builder()
+
+    def set_stream_arn(self, stream_arn: str) -> 'KinesisStreamsSourceBuilder':
         """
-        Provide a custom assigner to influence how shards are distributed over 
subtasks.
+        Sets the ARN of the Kinesis Data Streams stream to read from. There is 
no default for
+        this parameter, therefore, this must be provided at source creation 
time otherwise the
+        build will fail.
         """
-        
self._j_kinesis_consumer.setShardAssigner(shard_assigner._j_kinesis_shard_assigner)
+        self._j_builder.setStreamArn(stream_arn)
         return self
 
-    def set_periodic_watermark_assigner(
-        self,
-        periodic_watermark_assigner: AssignerWithPeriodicWatermarksWrapper) \
-            -> 'FlinkKinesisConsumer':
+    def set_deserialization_schema(
+            self, deserialization_schema: DeserializationSchema) -> 
'KinesisStreamsSourceBuilder':
         """
-        Set the assigner that will extract the timestamp from T and calculate 
the watermark.
+        Sets the DeserializationSchema for deserializing records read from 
Kinesis.
         """
-        self._j_kinesis_consumer.setPeriodicWatermarkAssigner(
-            periodic_watermark_assigner._j_assigner_with_periodic_watermarks)
+        
self._j_builder.setDeserializationSchema(deserialization_schema._j_deserialization_schema)
         return self
 
-    def set_watermark_tracker(self, watermark_tracker: WatermarkTracker) -> 
'FlinkKinesisConsumer':
+    def set_kinesis_shard_assigner(
+            self, shard_assigner: KinesisShardAssigner) -> 
'KinesisStreamsSourceBuilder':
         """
-        Set the global watermark tracker. When set, it will be used by the 
fetcher to align the
-        shard consumers by event time.
+        Sets the KinesisShardAssigner that controls the mapping of Kinesis 
shards to Flink
+        subtask indices. By default, UniformShardAssigner is used.
         """
-        
self._j_kinesis_consumer.setWatermarkTracker(watermark_tracker._j_watermark_tracker)
+        
self._j_builder.setKinesisShardAssigner(shard_assigner._j_kinesis_shard_assigner)
         return self
 
+    def set_source_config(self, source_config) -> 
'KinesisStreamsSourceBuilder':
+        """
+        Sets the configuration for the KinesisStreamsSource. Configuration 
keys can be taken
+        from AWSConfigOptions (AWS-specific configuration) and 
KinesisSourceConfigOptions
+        (Kinesis Source configuration).
+        """
+        self._j_builder.setSourceConfig(source_config._j_configuration)
+        return self
+
+    def build(self) -> 'KinesisStreamsSource':
+        """
+        Build the KinesisStreamsSource.
+        """
+        return KinesisStreamsSource(self._j_builder.build())
+
 
 # ---- KinesisSink ----
 
diff --git a/flink-python/pyflink/datastream/connectors/tests/test_kinesis.py 
b/flink-python/pyflink/datastream/connectors/tests/test_kinesis.py
index 3007711..e09b9cd 100644
--- a/flink-python/pyflink/datastream/connectors/tests/test_kinesis.py
+++ b/flink-python/pyflink/datastream/connectors/tests/test_kinesis.py
@@ -15,8 +15,8 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-from pyflink.common import SimpleStringSchema, Types
-from pyflink.datastream.connectors.kinesis import PartitionKeyGenerator, 
FlinkKinesisConsumer, \
+from pyflink.common import SimpleStringSchema, Types, WatermarkStrategy
+from pyflink.datastream.connectors.kinesis import PartitionKeyGenerator, 
KinesisStreamsSource, \
     KinesisStreamsSink, KinesisFirehoseSink
 from pyflink.testing.test_case_utils import PyFlinkUTTestCase
 from pyflink.util.java_utils import get_field_value
@@ -25,21 +25,19 @@ from pyflink.util.java_utils import get_field_value
 class FlinkKinesisTest(PyFlinkUTTestCase):
 
     def test_kinesis_source(self):
-        consumer_config = {
-            'aws.region': 'us-east-1',
-            'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
-            'aws.credentials.provider.basic.secretkey': 
'aws_secret_access_key',
-            'flink.stream.initpos': 'LATEST'
-        }
-
-        kinesis_source = FlinkKinesisConsumer("stream-1", 
SimpleStringSchema(), consumer_config)
+        source = KinesisStreamsSource.builder() \
+            
.set_stream_arn("arn:aws:kinesis:us-east-1:123456789012:stream/stream-1") \
+            .set_deserialization_schema(SimpleStringSchema()) \
+            .build()
 
-        ds = self.env.add_source(source_func=kinesis_source, 
source_name="kinesis source")
+        ds = self.env.from_source(source, WatermarkStrategy.no_watermarks(),
+                                  source_name="kinesis source", 
type_info=Types.STRING())
         ds.print()
         plan = eval(self.env.get_execution_plan())
         self.assertEqual('Source: kinesis source', plan['nodes'][0]['type'])
         self.assertEqual(
-            get_field_value(kinesis_source.get_java_function(), 'streams')[0], 
'stream-1')
+            get_field_value(source.get_java_function(), 'streamArn'),
+            'arn:aws:kinesis:us-east-1:123456789012:stream/stream-1')
 
     def test_kinesis_streams_sink(self):
         sink_properties = {
diff --git a/flink-python/setup.py b/flink-python/setup.py
index 5a46ef3..ce3eeae 100644
--- a/flink-python/setup.py
+++ b/flink-python/setup.py
@@ -32,7 +32,7 @@ POM_FILE = os.path.join(CURRENT_DIR, '../pom.xml')
 README_FILE = os.path.join(CURRENT_DIR, 'README.txt')
 
 # Generated files and directories
-VERSION_FILE = os.path.join(CURRENT_DIR, 
'pyflink/datastream/connectors/kinesis.py')
+VERSION_FILE = os.path.join(CURRENT_DIR, 
'pyflink/datastream/connectors/aws_connector_version.py')
 DEPENDENCY_FILE = os.path.join(CURRENT_DIR, 'dev/dev-requirements.txt')
 
 
diff --git a/flink-python/tox.ini b/flink-python/tox.ini
index 1f526fd..b47bb59 100644
--- a/flink-python/tox.ini
+++ b/flink-python/tox.ini
@@ -27,6 +27,8 @@ envlist = {py39, py310, py311}-cython
 whitelist_externals = /bin/bash
 deps = apache-flink
 passenv = *
+commands_pre =
+    python -c "import shutil, os; shutil.copy(os.path.join('{toxinidir}', 
'pyflink/datastream/connectors/kinesis.py'), 
os.path.join('{envsitepackagesdir}', 
'pyflink/datastream/connectors/kinesis.py'))"
 commands =
     python --version
     pip install pytest

Reply via email to