dianfu commented on code in PR #19869:
URL: https://github.com/apache/flink/pull/19869#discussion_r888480774


##########
flink-python/pyflink/datastream/connectors/kinesis.py:
##########
@@ -0,0 +1,368 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Dict, Union, List
+
+from pyflink.common import SerializationSchema, DeserializationSchema
+from pyflink.datastream.functions import SourceFunction
+from pyflink.datastream.connectors import Sink
+from pyflink.java_gateway import get_gateway
+
+
+# ---- KinesisSource ----
+
+
+class KinesisShardAssigner(object):
+    """
+    Utility to map Kinesis shards to Flink subtask indices. Users can 
implement this interface to
+    optimize distribution of shards over subtasks. See 
assign(StreamShardHandle, int) for details.
+    """
+
+    def __init__(self, _j_kinesis_shard_assigner):
+        self._j_kinesis_shard_assigner = _j_kinesis_shard_assigner
+
+    @staticmethod
+    def uniform_shard_assigner() -> 'KinesisShardAssigner':
+        return get_gateway().jvm.org.apache.flink.streaming.connectors \
+            .kinesis.util.UniformShardAssigner()
+
+
+class KinesisDeserializationSchema(object):
+    """
+    This is a deserialization schema specific for the Flink Kinesis Consumer. 
Different from the
+    basic DeserializationSchema, this schema offers additional 
Kinesis-specific information about
+    the record that may be useful to the user application.
+    """
+
+    def __init__(self, _j_kinesis_deserialization_schema):
+        if _j_kinesis_deserialization_schema is None:
+            self._j_kinesis_deserialization_schema = 
get_gateway().jvm.org.apache.flink \
+                
.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema
+        else:
+            self._j_kinesis_deserialization_schema = 
_j_kinesis_deserialization_schema
+
+
+class AssignerWithPeriodicWatermarks(object):
+    """
+    The AssignerWithPeriodicWatermarks assigns event time timestamps to 
elements, and generates
+    low watermarks that signal event time progress within the stream. These 
timestamps and
+    watermarks are used by functions and operators that operate on event time, 
for example event
+    time windows.
+    """
+
+    def __init__(self, _j_assigner_with_periodic_watermarks):
+        self._j_assigner_with_periodic_watermarks = 
_j_assigner_with_periodic_watermarks
+
+
+class WatermarkTracker(object):
+    """
+    The watermark tracker is responsible for aggregating watermarks across 
distributed operators.
+    It can be used for sub tasks of a single Flink source as well as multiple 
heterogeneous sources
+    or other operators.The class essentially functions like a distributed hash 
table that enclosing
+    operators can use to adopt their processing / IO rates
+    """
+
+
+def job_manager_watermark_tracker(aggregate_name: str,
+                                  log_accumulator_interval_millis: int = -1) \
+        -> 'WatermarkTracker':
+    return 
get_gateway().jvm.org.apache.flink.streaming.connectors.kinesis.util \
+        .JobManagerWatermarkTracker(aggregate_name, 
log_accumulator_interval_millis)
+
+
+class KinesisConsumer(SourceFunction):
+    """
+    The Flink Kinesis Consumer is an exactly-once parallel streaming data 
source that subscribes to
+    multiple AWS Kinesis streams within the same AWS service region, and can 
handle resharding of
+    streams. Each subtask of the consumer is responsible for fetching data 
records from multiple
+    Kinesis shards. The number of shards fetched by each subtask will change 
as shards are closed
+    and created by Kinesis.
+
+    To leverage Flink's checkpointing mechanics for exactly-once streaming 
processing guarantees,
+    the Flink Kinesis consumer is implemented with the AWS Java SDK, instead 
of the officially
+    recommended AWS Kinesis Client Library, for low-level control on the 
management of stream state.
+    The Flink Kinesis Connector also supports setting the initial starting 
points of Kinesis
+    streams, namely TRIM_HORIZON and LATEST.
+
+    Kinesis and the Flink consumer support dynamic re-sharding and shard IDs, 
while sequential,
+    cannot be assumed to be consecutive. There is no perfect generic default 
assignment function.
+    Default shard to subtask assignment, which is based on hash code, may 
result in skew, with some
+    subtasks having many shards assigned and others none.
+
+    It is recommended to monitor the shard distribution and adjust assignment 
appropriately.
+    A custom assigner implementation can be set via 
setShardAssigner(KinesisShardAssigner) to
+    optimize the hash function or use static overrides to limit skew.
+
+    In order for the consumer to emit watermarks, a timestamp assigner needs 
to be set via
+    setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks) and the auto 
watermark emit
+    interval configured via ExecutionConfig.setAutoWatermarkInterval(long).
+
+    Watermarks can only advance when all shards of a subtask continuously 
deliver records.
+    To avoid an inactive or closed shard to block the watermark progress, the 
idle timeout should
+    be configured via configuration property 
ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS.
+    By default, shards won't be considered idle and watermark calculation will 
wait for newer
+    records to arrive from all shards.
+
+    Note that re-sharding of the Kinesis stream while an application (that 
relies on the Kinesis
+    records for watermarking) is running can lead to incorrect late events. 
This depends on how
+    shards are assigned to subtasks and applies regardless of whether 
watermarks are generated in
+    the source or a downstream operator.
+    """
+
+    def __init__(self,
+                 streams: Union[str, List[str]],
+                 deserializer: Union[DeserializationSchema, 
KinesisDeserializationSchema],
+                 config_props: Dict
+                 ):
+        gateway = get_gateway()
+        j_properties = gateway.jvm.java.util.Properties()
+        for key, value in config_props.items():
+            j_properties.setProperty(key, value)
+
+        JKinesisConsumer = 
gateway.jvm.org.apache.flink.streaming.connectors.kinesis. \
+            FlinkKinesisConsumer
+
+        self._j_kinesis_consumer = JKinesisConsumer(
+            streams,
+            deserializer._j_deserialization_schema
+            if isinstance(deserializer, DeserializationSchema)
+            else deserializer._j_kinesis_deserialization_schema,

Review Comment:
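   Should we rename the field `_j_kinesis_deserialization_schema` of 
`KinesisDeserializationSchema` to `_j_deserialization_schema`, matching the 
field name that `DeserializationSchema` already uses? Then the conditional 
expression here could be replaced by a single 
`deserializer._j_deserialization_schema` access.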



##########
flink-python/pyflink/datastream/connectors/kinesis.py:
##########
@@ -0,0 +1,368 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Dict, Union, List
+
+from pyflink.common import SerializationSchema, DeserializationSchema
+from pyflink.datastream.functions import SourceFunction
+from pyflink.datastream.connectors import Sink
+from pyflink.java_gateway import get_gateway
+
+
+# ---- KinesisSource ----
+
+
+class KinesisShardAssigner(object):
+    """
+    Utility to map Kinesis shards to Flink subtask indices. Users can 
implement this interface to
+    optimize distribution of shards over subtasks. See 
assign(StreamShardHandle, int) for details.
+    """
+
+    def __init__(self, _j_kinesis_shard_assigner):
+        self._j_kinesis_shard_assigner = _j_kinesis_shard_assigner
+
+    @staticmethod
+    def uniform_shard_assigner() -> 'KinesisShardAssigner':
+        return get_gateway().jvm.org.apache.flink.streaming.connectors \
+            .kinesis.util.UniformShardAssigner()
+
+
+class KinesisDeserializationSchema(object):
+    """
+    This is a deserialization schema specific for the Flink Kinesis Consumer. 
Different from the
+    basic DeserializationSchema, this schema offers additional 
Kinesis-specific information about
+    the record that may be useful to the user application.
+    """
+
+    def __init__(self, _j_kinesis_deserialization_schema):
+        if _j_kinesis_deserialization_schema is None:
+            self._j_kinesis_deserialization_schema = 
get_gateway().jvm.org.apache.flink \
+                
.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema
+        else:
+            self._j_kinesis_deserialization_schema = 
_j_kinesis_deserialization_schema
+
+
+class AssignerWithPeriodicWatermarks(object):
+    """
+    The AssignerWithPeriodicWatermarks assigns event time timestamps to 
elements, and generates
+    low watermarks that signal event time progress within the stream. These 
timestamps and
+    watermarks are used by functions and operators that operate on event time, 
for example event
+    time windows.
+    """
+
+    def __init__(self, _j_assigner_with_periodic_watermarks):
+        self._j_assigner_with_periodic_watermarks = 
_j_assigner_with_periodic_watermarks
+
+
+class WatermarkTracker(object):
+    """
+    The watermark tracker is responsible for aggregating watermarks across 
distributed operators.
+    It can be used for sub tasks of a single Flink source as well as multiple 
heterogeneous sources
+    or other operators.The class essentially functions like a distributed hash 
table that enclosing
+    operators can use to adopt their processing / IO rates
+    """
+
+
+def job_manager_watermark_tracker(aggregate_name: str,
+                                  log_accumulator_interval_millis: int = -1) \
+        -> 'WatermarkTracker':
+    return 
get_gateway().jvm.org.apache.flink.streaming.connectors.kinesis.util \
+        .JobManagerWatermarkTracker(aggregate_name, 
log_accumulator_interval_millis)
+
+
+class KinesisConsumer(SourceFunction):
+    """
+    The Flink Kinesis Consumer is an exactly-once parallel streaming data 
source that subscribes to
+    multiple AWS Kinesis streams within the same AWS service region, and can 
handle resharding of
+    streams. Each subtask of the consumer is responsible for fetching data 
records from multiple
+    Kinesis shards. The number of shards fetched by each subtask will change 
as shards are closed
+    and created by Kinesis.
+
+    To leverage Flink's checkpointing mechanics for exactly-once streaming 
processing guarantees,
+    the Flink Kinesis consumer is implemented with the AWS Java SDK, instead 
of the officially
+    recommended AWS Kinesis Client Library, for low-level control on the 
management of stream state.
+    The Flink Kinesis Connector also supports setting the initial starting 
points of Kinesis
+    streams, namely TRIM_HORIZON and LATEST.
+
+    Kinesis and the Flink consumer support dynamic re-sharding and shard IDs, 
while sequential,
+    cannot be assumed to be consecutive. There is no perfect generic default 
assignment function.
+    Default shard to subtask assignment, which is based on hash code, may 
result in skew, with some
+    subtasks having many shards assigned and others none.
+
+    It is recommended to monitor the shard distribution and adjust assignment 
appropriately.
+    A custom assigner implementation can be set via 
setShardAssigner(KinesisShardAssigner) to
+    optimize the hash function or use static overrides to limit skew.
+
+    In order for the consumer to emit watermarks, a timestamp assigner needs 
to be set via
+    setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks) and the auto 
watermark emit
+    interval configured via ExecutionConfig.setAutoWatermarkInterval(long).
+
+    Watermarks can only advance when all shards of a subtask continuously 
deliver records.
+    To avoid an inactive or closed shard to block the watermark progress, the 
idle timeout should
+    be configured via configuration property 
ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS.
+    By default, shards won't be considered idle and watermark calculation will 
wait for newer
+    records to arrive from all shards.
+
+    Note that re-sharding of the Kinesis stream while an application (that 
relies on the Kinesis
+    records for watermarking) is running can lead to incorrect late events. 
This depends on how
+    shards are assigned to subtasks and applies regardless of whether 
watermarks are generated in
+    the source or a downstream operator.
+    """
+
+    def __init__(self,
+                 streams: Union[str, List[str]],
+                 deserializer: Union[DeserializationSchema, 
KinesisDeserializationSchema],
+                 config_props: Dict
+                 ):
+        gateway = get_gateway()
+        j_properties = gateway.jvm.java.util.Properties()
+        for key, value in config_props.items():
+            j_properties.setProperty(key, value)
+
+        JKinesisConsumer = 
gateway.jvm.org.apache.flink.streaming.connectors.kinesis. \
+            FlinkKinesisConsumer
+
+        self._j_kinesis_consumer = JKinesisConsumer(

Review Comment:
   I suspect this will fail for the following case: `KinesisConsumer(str, 
DeserializationSchema, Dict)`.
   
   To address it, we could convert `streams` to a list when it is a string, 
convert a `DeserializationSchema` deserializer into a 
`KinesisDeserializationSchema`, and then always call the following constructor:
   ```
   public FlinkKinesisConsumer(
               List<String> streams,
               KinesisDeserializationSchema<T> deserializer,
               Properties configProps) {
   ```



##########
flink-python/pyflink/datastream/connectors/kinesis.py:
##########
@@ -0,0 +1,368 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Dict, Union, List
+
+from pyflink.common import SerializationSchema, DeserializationSchema
+from pyflink.datastream.functions import SourceFunction
+from pyflink.datastream.connectors import Sink
+from pyflink.java_gateway import get_gateway
+
+
+# ---- KinesisSource ----
+
+
+class KinesisShardAssigner(object):
+    """
+    Utility to map Kinesis shards to Flink subtask indices. Users can 
implement this interface to
+    optimize distribution of shards over subtasks. See 
assign(StreamShardHandle, int) for details.
+    """
+
+    def __init__(self, _j_kinesis_shard_assigner):
+        self._j_kinesis_shard_assigner = _j_kinesis_shard_assigner
+
+    @staticmethod
+    def uniform_shard_assigner() -> 'KinesisShardAssigner':
+        return get_gateway().jvm.org.apache.flink.streaming.connectors \
+            .kinesis.util.UniformShardAssigner()
+
+
+class KinesisDeserializationSchema(object):
+    """
+    This is a deserialization schema specific for the Flink Kinesis Consumer. 
Different from the
+    basic DeserializationSchema, this schema offers additional 
Kinesis-specific information about
+    the record that may be useful to the user application.
+    """
+
+    def __init__(self, _j_kinesis_deserialization_schema):
+        if _j_kinesis_deserialization_schema is None:
+            self._j_kinesis_deserialization_schema = 
get_gateway().jvm.org.apache.flink \
+                
.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema
+        else:
+            self._j_kinesis_deserialization_schema = 
_j_kinesis_deserialization_schema
+
+
+class AssignerWithPeriodicWatermarks(object):
+    """
+    The AssignerWithPeriodicWatermarks assigns event time timestamps to 
elements, and generates
+    low watermarks that signal event time progress within the stream. These 
timestamps and
+    watermarks are used by functions and operators that operate on event time, 
for example event
+    time windows.
+    """
+
+    def __init__(self, _j_assigner_with_periodic_watermarks):
+        self._j_assigner_with_periodic_watermarks = 
_j_assigner_with_periodic_watermarks
+
+
+class WatermarkTracker(object):
+    """
+    The watermark tracker is responsible for aggregating watermarks across 
distributed operators.
+    It can be used for sub tasks of a single Flink source as well as multiple 
heterogeneous sources
+    or other operators.The class essentially functions like a distributed hash 
table that enclosing
+    operators can use to adopt their processing / IO rates
+    """
+
+
+def job_manager_watermark_tracker(aggregate_name: str,
+                                  log_accumulator_interval_millis: int = -1) \
+        -> 'WatermarkTracker':
+    return 
get_gateway().jvm.org.apache.flink.streaming.connectors.kinesis.util \
+        .JobManagerWatermarkTracker(aggregate_name, 
log_accumulator_interval_millis)
+
+
+class KinesisConsumer(SourceFunction):
+    """
+    The Flink Kinesis Consumer is an exactly-once parallel streaming data 
source that subscribes to
+    multiple AWS Kinesis streams within the same AWS service region, and can 
handle resharding of
+    streams. Each subtask of the consumer is responsible for fetching data 
records from multiple
+    Kinesis shards. The number of shards fetched by each subtask will change 
as shards are closed
+    and created by Kinesis.
+
+    To leverage Flink's checkpointing mechanics for exactly-once streaming 
processing guarantees,
+    the Flink Kinesis consumer is implemented with the AWS Java SDK, instead 
of the officially
+    recommended AWS Kinesis Client Library, for low-level control on the 
management of stream state.
+    The Flink Kinesis Connector also supports setting the initial starting 
points of Kinesis
+    streams, namely TRIM_HORIZON and LATEST.
+
+    Kinesis and the Flink consumer support dynamic re-sharding and shard IDs, 
while sequential,
+    cannot be assumed to be consecutive. There is no perfect generic default 
assignment function.
+    Default shard to subtask assignment, which is based on hash code, may 
result in skew, with some
+    subtasks having many shards assigned and others none.
+
+    It is recommended to monitor the shard distribution and adjust assignment 
appropriately.
+    A custom assigner implementation can be set via 
setShardAssigner(KinesisShardAssigner) to
+    optimize the hash function or use static overrides to limit skew.
+
+    In order for the consumer to emit watermarks, a timestamp assigner needs 
to be set via
+    setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks) and the auto 
watermark emit
+    interval configured via ExecutionConfig.setAutoWatermarkInterval(long).
+
+    Watermarks can only advance when all shards of a subtask continuously 
deliver records.
+    To avoid an inactive or closed shard to block the watermark progress, the 
idle timeout should
+    be configured via configuration property 
ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS.
+    By default, shards won't be considered idle and watermark calculation will 
wait for newer
+    records to arrive from all shards.
+
+    Note that re-sharding of the Kinesis stream while an application (that 
relies on the Kinesis
+    records for watermarking) is running can lead to incorrect late events. 
This depends on how
+    shards are assigned to subtasks and applies regardless of whether 
watermarks are generated in
+    the source or a downstream operator.
+    """
+
+    def __init__(self,
+                 streams: Union[str, List[str]],
+                 deserializer: Union[DeserializationSchema, 
KinesisDeserializationSchema],
+                 config_props: Dict
+                 ):
+        gateway = get_gateway()
+        j_properties = gateway.jvm.java.util.Properties()
+        for key, value in config_props.items():
+            j_properties.setProperty(key, value)
+
+        JKinesisConsumer = 
gateway.jvm.org.apache.flink.streaming.connectors.kinesis. \
+            FlinkKinesisConsumer
+
+        self._j_kinesis_consumer = JKinesisConsumer(
+            streams,
+            deserializer._j_deserialization_schema
+            if isinstance(deserializer, DeserializationSchema)
+            else deserializer._j_kinesis_deserialization_schema,
+            j_properties)
+
+        super(KinesisConsumer, self).__init__(self._j_kinesis_consumer)
+
+    def set_shard_assigner(self, shard_assigner: KinesisShardAssigner) -> 
'KinesisConsumer':
+        """
+        Provide a custom assigner to influence how shards are distributed over 
subtasks.
+        """
+        
self._j_kinesis_consumer.setShardAssigner(shard_assigner._j_kinesis_shard_assigner)
+        return self
+
+    def set_periodic_watermark_assigner(self,
+                                        periodic_watermark_assigner:
+                                        AssignerWithPeriodicWatermarks) -> 
'KinesisConsumer':
+        """
+        Set the assigner that will extract the timestamp from T and calculate 
the watermark.
+        """
+        self._j_kinesis_consumer.setPeriodicWatermarkAssigner(
+            periodic_watermark_assigner._j_assigner_with_periodic_watermarks)
+        return self
+
+    def set_watermark_tracker(self, watermark_tracker: WatermarkTracker) -> 
'KinesisConsumer':
+        """
+        Set the global watermark tracker. When set, it will be used by the 
fetcher to align the
+        shard consumers by event time.
+        """
+        self._j_kinesis_consumer.setWatermarkTracker(watermark_tracker)
+        return self
+
+
+# ---- KinesisSink ----
+
+class PartitionKeyGenerator(object):
+    def __init__(self, j_partition_key_generator):
+        self.j_partition_key_generator = j_partition_key_generator
+
+    @staticmethod
+    def fixed_kinesis_partition_key_generator() -> 'PartitionKeyGenerator':
+        return 
PartitionKeyGenerator(get_gateway().jvm.org.apache.flink.connector.kinesis.table.
+                                     FixedKinesisPartitionKeyGenerator())
+
+    @staticmethod
+    def random_kinesis_partition_key_generator() -> 'PartitionKeyGenerator':
+        return 
PartitionKeyGenerator(get_gateway().jvm.org.apache.flink.connector.kinesis.table.
+                                     RandomKinesisPartitionKeyGenerator())
+
+    @staticmethod
+    def row_data_fields_kinesis_partition_key_generator() -> 
'PartitionKeyGenerator':
+        return 
PartitionKeyGenerator(get_gateway().jvm.org.apache.flink.connector.kinesis.table.
+                                     
RowDataFieldsKinesisPartitionKeyGenerator())
+
+
+class KinesisSink(Sink):
+    """
+    A Kinesis Data Streams (KDS) Sink that performs async requests against a 
destination stream
+    using the buffering protocol specified in AsyncSinkBase.
+
+    The sink internally uses a 
software.amazon.awssdk.services.kinesis.KinesisAsyncClient to
+    communicate with the AWS endpoint.
+
+    The behaviour of the buffering may be specified by providing configuration 
during the sink
+    build time.
+
+    - maxBatchSize: the maximum size of a batch of entries that may be sent to 
KDS
+    - maxInFlightRequests: the maximum number of in flight requests that may 
exist, if any more in
+        flight requests need to be initiated once the maximum has been 
reached, then it will be
+        blocked until some have completed
+    - maxBufferedRequests: the maximum number of elements held in the buffer, 
requests to add
+        elements will be blocked while the number of elements in the buffer is 
at the maximum
+    - maxBatchSizeInBytes: the maximum size of a batch of entries that may be 
sent to KDS
+        measured in bytes
+    - maxTimeInBufferMS: the maximum amount of time an entry is allowed to 
live in the buffer,
+        if any element reaches this age, the entire buffer will be flushed 
immediately
+    - maxRecordSizeInBytes: the maximum size of a record the sink will accept 
into the buffer,
+        a record of size larger than this will be rejected when passed to the 
sink
+    - failOnError: when an exception is encountered while persisting to 
Kinesis Data Streams,
+        the job will fail immediately if failOnError is set
+    """
+
+    def __init__(self, j_kinesis_sink):
+        super(KinesisSink, self).__init__(sink=j_kinesis_sink)
+
+    @staticmethod
+    def builder() -> 'KinesisSinkBuilder':
+        return KinesisSinkBuilder()
+
+
+class KinesisSinkBuilder(object):
+    """
+    Builder to construct KinesisStreamsSink.
+
+    The following example shows the minimum setup to create a 
KinesisStreamsSink that writes String
+    values to a Kinesis Data Streams stream named your_stream_here.
+
+    Example:
+    ::
+
+        >>> sink = KinesisSink.builder() \\
+        ...     .set_kinesis_client_properties(SINK_PROPERTIES) \\
+        ...     .set_stream_name(STREAM_NAME) \\
+        ...     .set_serialization_schema(SimpleStringSchema()) \\
+        ...     .build()
+
+    If the following parameters are not set in this builder, the following 
defaults will be used:
+
+    - maxBatchSize will be 500
+    - maxInFlightRequests will be 50
+    - maxBufferedRequests will be 10000
+    - maxBatchSizeInBytes will be 5 MB i.e. 5 * 1024 * 1024
+    - maxTimeInBufferMS will be 5000ms
+    - maxRecordSizeInBytes will be 1 MB i.e. 1 * 1024 * 1024
+    - failOnError will be false
+    """
+
+    def __init__(self):
+        super().__init__()
+        JKinesisSink = 
get_gateway().jvm.org.apache.flink.connector.kinesis.sink.KinesisStreamsSink
+        self._j_kinesis_sink_builder = JKinesisSink.builder()
+
+    def set_stream_name(self, stream_name: Union[str, List[str]]) -> 
'KinesisSinkBuilder':
+        """
+        Sets the name of the KDS stream that the sink will connect to. There 
is no default for this
+        parameter, therefore, this must be provided at sink creation time 
otherwise the build will
+        fail.
+        """
+        self._j_kinesis_sink_builder.setStreamName(stream_name)
+        return self
+
+    def set_serialization_schema(self, serialization_schema: 
SerializationSchema) \
+            -> 'KinesisSinkBuilder':
+        """
+        Sets the SerializationSchema of the KinesisSinkBuilder.
+        """
+        self._j_kinesis_sink_builder.setSerializationSchema(
+            serialization_schema._j_serialization_schema)
+        return self
+
+    def set_partition_key_generator(self, partition_key_generator: 
PartitionKeyGenerator) \
+            -> 'KinesisSinkBuilder':
+        """
+        Sets the PartitionKeyGenerator of the KinesisSinkBuilder.
+        """
+        self._j_kinesis_sink_builder.setPartitionKeyGenerator(
+            partition_key_generator.j_partition_key_generator)
+        return self
+
+    def set_fail_on_error(self, fail_on_error: bool) -> 'KinesisSinkBuilder':
+        """
+        Sets the failOnError of the KinesisSinkBuilder. If failOnError is on, 
then a runtime
+        exception will be raised. Otherwise, those records will be requested 
in the buffer for
+        retry.
+        """
+        self._j_kinesis_sink_builder.setFailOnError(fail_on_error)
+        return self
+
+    def set_kinesis_client_properties(self, kinesis_client_properties: Dict) \
+            -> 'KinesisSinkBuilder':
+        """
+        Sets the kinesisClientProperties of the KinesisSinkBuilder.
+        """
+        j_properties = get_gateway().jvm.java.util.Properties()
+        for key, value in kinesis_client_properties.items():
+            j_properties.setProperty(key, value)
+        self._j_kinesis_sink_builder.setKinesisClientProperties(j_properties)
+        return self
+
+    def set_max_batch_size(self, max_batch_size: int) -> 'KinesisSinkBuilder':
+        """
+        Maximum number of elements that may be passed in a list to be written 
downstream.
+        """
+        self._j_kinesis_sink_builder.setMaxBatchSize(max_batch_size)
+        return self
+
+    def set_max_in_flight_requests(self, max_in_flight_requests):

Review Comment:
   ```suggestion
       def set_max_in_flight_requests(self, max_in_flight_requests: int):
   ```
   
   Please also add type hints to the parameters of the methods below.



##########
flink-python/pyflink/datastream/connectors/kinesis.py:
##########
@@ -0,0 +1,368 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Dict, Union, List
+
+from pyflink.common import SerializationSchema, DeserializationSchema
+from pyflink.datastream.functions import SourceFunction
+from pyflink.datastream.connectors import Sink
+from pyflink.java_gateway import get_gateway
+
+
+# ---- KinesisSource ----
+
+
+class KinesisShardAssigner(object):
+    """
+    Utility to map Kinesis shards to Flink subtask indices. Users can 
implement this interface to
+    optimize distribution of shards over subtasks. See 
assign(StreamShardHandle, int) for details.
+    """
+
+    def __init__(self, _j_kinesis_shard_assigner):
+        self._j_kinesis_shard_assigner = _j_kinesis_shard_assigner
+
+    @staticmethod
+    def uniform_shard_assigner() -> 'KinesisShardAssigner':
+        return get_gateway().jvm.org.apache.flink.streaming.connectors \
+            .kinesis.util.UniformShardAssigner()
+
+
+class KinesisDeserializationSchema(object):
+    """
+    This is a deserialization schema specific for the Flink Kinesis Consumer. 
Different from the
+    basic DeserializationSchema, this schema offers additional 
Kinesis-specific information about
+    the record that may be useful to the user application.
+    """
+
+    def __init__(self, _j_kinesis_deserialization_schema):
+        if _j_kinesis_deserialization_schema is None:
+            self._j_kinesis_deserialization_schema = 
get_gateway().jvm.org.apache.flink \
+                
.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema
+        else:
+            self._j_kinesis_deserialization_schema = 
_j_kinesis_deserialization_schema
+
+
+class AssignerWithPeriodicWatermarks(object):
+    """
+    The AssignerWithPeriodicWatermarks assigns event time timestamps to 
elements, and generates
+    low watermarks that signal event time progress within the stream. These 
timestamps and
+    watermarks are used by functions and operators that operate on event time, 
for example event
+    time windows.
+    """
+
+    def __init__(self, _j_assigner_with_periodic_watermarks):
+        self._j_assigner_with_periodic_watermarks = 
_j_assigner_with_periodic_watermarks
+
+
+class WatermarkTracker(object):
+    """
+    The watermark tracker is responsible for aggregating watermarks across 
distributed operators.
+    It can be used for sub tasks of a single Flink source as well as multiple 
heterogeneous sources
+    or other operators.The class essentially functions like a distributed hash 
table that enclosing
+    operators can use to adopt their processing / IO rates
+    """
+
+
+def job_manager_watermark_tracker(aggregate_name: str,
+                                  log_accumulator_interval_millis: int = -1) \
+        -> 'WatermarkTracker':
+    return 
get_gateway().jvm.org.apache.flink.streaming.connectors.kinesis.util \
+        .JobManagerWatermarkTracker(aggregate_name, 
log_accumulator_interval_millis)
+
+
+class KinesisConsumer(SourceFunction):
+    """
+    The Flink Kinesis Consumer is an exactly-once parallel streaming data 
source that subscribes to
+    multiple AWS Kinesis streams within the same AWS service region, and can 
handle resharding of
+    streams. Each subtask of the consumer is responsible for fetching data 
records from multiple
+    Kinesis shards. The number of shards fetched by each subtask will change 
as shards are closed
+    and created by Kinesis.
+
+    To leverage Flink's checkpointing mechanics for exactly-once streaming 
processing guarantees,
+    the Flink Kinesis consumer is implemented with the AWS Java SDK, instead 
of the officially
+    recommended AWS Kinesis Client Library, for low-level control on the 
management of stream state.
+    The Flink Kinesis Connector also supports setting the initial starting 
points of Kinesis
+    streams, namely TRIM_HORIZON and LATEST.
+
+    Kinesis and the Flink consumer support dynamic re-sharding and shard IDs, 
while sequential,
+    cannot be assumed to be consecutive. There is no perfect generic default 
assignment function.
+    Default shard to subtask assignment, which is based on hash code, may 
result in skew, with some
+    subtasks having many shards assigned and others none.
+
+    It is recommended to monitor the shard distribution and adjust assignment 
appropriately.
+    A custom assigner implementation can be set via 
setShardAssigner(KinesisShardAssigner) to
+    optimize the hash function or use static overrides to limit skew.
+
+    In order for the consumer to emit watermarks, a timestamp assigner needs 
to be set via
+    setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks) and the auto 
watermark emit
+    interval configured via ExecutionConfig.setAutoWatermarkInterval(long).
+
+    Watermarks can only advance when all shards of a subtask continuously 
deliver records.
+    To avoid an inactive or closed shard to block the watermark progress, the 
idle timeout should
+    be configured via configuration property 
ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS.
+    By default, shards won't be considered idle and watermark calculation will 
wait for newer
+    records to arrive from all shards.
+
+    Note that re-sharding of the Kinesis stream while an application (that 
relies on the Kinesis
+    records for watermarking) is running can lead to incorrect late events. 
This depends on how
+    shards are assigned to subtasks and applies regardless of whether 
watermarks are generated in
+    the source or a downstream operator.
+    """
+
+    def __init__(self,
+                 streams: Union[str, List[str]],
+                 deserializer: Union[DeserializationSchema, 
KinesisDeserializationSchema],
+                 config_props: Dict
+                 ):
+        gateway = get_gateway()
+        j_properties = gateway.jvm.java.util.Properties()
+        for key, value in config_props.items():
+            j_properties.setProperty(key, value)
+
+        JKinesisConsumer = 
gateway.jvm.org.apache.flink.streaming.connectors.kinesis. \
+            FlinkKinesisConsumer
+
+        self._j_kinesis_consumer = JKinesisConsumer(
+            streams,
+            deserializer._j_deserialization_schema
+            if isinstance(deserializer, DeserializationSchema)
+            else deserializer._j_kinesis_deserialization_schema,
+            j_properties)
+
+        super(KinesisConsumer, self).__init__(self._j_kinesis_consumer)
+
+    def set_shard_assigner(self, shard_assigner: KinesisShardAssigner) -> 
'KinesisConsumer':
+        """
+        Provide a custom assigner to influence how shards are distributed over 
subtasks.
+        """
+        
self._j_kinesis_consumer.setShardAssigner(shard_assigner._j_kinesis_shard_assigner)
+        return self
+
+    def set_periodic_watermark_assigner(self,
+                                        periodic_watermark_assigner:
+                                        AssignerWithPeriodicWatermarks) -> 
'KinesisConsumer':
+        """
+        Set the assigner that will extract the timestamp from T and calculate 
the watermark.
+        """
+        self._j_kinesis_consumer.setPeriodicWatermarkAssigner(
+            periodic_watermark_assigner._j_assigner_with_periodic_watermarks)
+        return self
+
+    def set_watermark_tracker(self, watermark_tracker: WatermarkTracker) -> 
'KinesisConsumer':
+        """
+        Set the global watermark tracker. When set, it will be used by the 
fetcher to align the
+        shard consumers by event time.
+        """
+        self._j_kinesis_consumer.setWatermarkTracker(watermark_tracker)
+        return self
+
+
+# ---- KinesisSink ----
+
+class PartitionKeyGenerator(object):
+    def __init__(self, j_partition_key_generator):
+        self.j_partition_key_generator = j_partition_key_generator
+
+    @staticmethod
+    def fixed_kinesis_partition_key_generator() -> 'PartitionKeyGenerator':

Review Comment:
   ```suggestion
       def fixed() -> 'PartitionKeyGenerator':
   ```



##########
flink-python/pyflink/datastream/connectors/kinesis.py:
##########
@@ -0,0 +1,368 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Dict, Union, List
+
+from pyflink.common import SerializationSchema, DeserializationSchema
+from pyflink.datastream.functions import SourceFunction
+from pyflink.datastream.connectors import Sink
+from pyflink.java_gateway import get_gateway
+
+
+# ---- KinesisSource ----
+
+
+class KinesisShardAssigner(object):
+    """
+    Utility to map Kinesis shards to Flink subtask indices. Users can 
implement this interface to
+    optimize distribution of shards over subtasks. See 
assign(StreamShardHandle, int) for details.
+    """
+
+    def __init__(self, _j_kinesis_shard_assigner):
+        self._j_kinesis_shard_assigner = _j_kinesis_shard_assigner
+
+    @staticmethod
+    def uniform_shard_assigner() -> 'KinesisShardAssigner':
+        return get_gateway().jvm.org.apache.flink.streaming.connectors \
+            .kinesis.util.UniformShardAssigner()
+
+
+class KinesisDeserializationSchema(object):
+    """
+    This is a deserialization schema specific for the Flink Kinesis Consumer. 
Different from the
+    basic DeserializationSchema, this schema offers additional 
Kinesis-specific information about
+    the record that may be useful to the user application.
+    """
+
+    def __init__(self, _j_kinesis_deserialization_schema):
+        if _j_kinesis_deserialization_schema is None:
+            self._j_kinesis_deserialization_schema = 
get_gateway().jvm.org.apache.flink \
+                
.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema
+        else:
+            self._j_kinesis_deserialization_schema = 
_j_kinesis_deserialization_schema
+
+
+class AssignerWithPeriodicWatermarks(object):
+    """
+    The AssignerWithPeriodicWatermarks assigns event time timestamps to 
elements, and generates
+    low watermarks that signal event time progress within the stream. These 
timestamps and
+    watermarks are used by functions and operators that operate on event time, 
for example event
+    time windows.
+    """
+
+    def __init__(self, _j_assigner_with_periodic_watermarks):
+        self._j_assigner_with_periodic_watermarks = 
_j_assigner_with_periodic_watermarks
+
+
+class WatermarkTracker(object):
+    """
+    The watermark tracker is responsible for aggregating watermarks across 
distributed operators.
+    It can be used for sub tasks of a single Flink source as well as multiple 
heterogeneous sources
+    or other operators.The class essentially functions like a distributed hash 
table that enclosing
+    operators can use to adopt their processing / IO rates
+    """

Review Comment:
   Add a static method `job_manager_watermark_tracker` which returns a `JobManagerWatermarkTracker`? Otherwise, how would users use it?



##########
flink-python/pyflink/datastream/connectors/kinesis.py:
##########
@@ -0,0 +1,368 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Dict, Union, List
+
+from pyflink.common import SerializationSchema, DeserializationSchema
+from pyflink.datastream.functions import SourceFunction
+from pyflink.datastream.connectors import Sink
+from pyflink.java_gateway import get_gateway
+
+
+# ---- KinesisSource ----
+
+
+class KinesisShardAssigner(object):
+    """
+    Utility to map Kinesis shards to Flink subtask indices. Users can 
implement this interface to
+    optimize distribution of shards over subtasks. See 
assign(StreamShardHandle, int) for details.
+    """
+
+    def __init__(self, _j_kinesis_shard_assigner):
+        self._j_kinesis_shard_assigner = _j_kinesis_shard_assigner
+
+    @staticmethod
+    def uniform_shard_assigner() -> 'KinesisShardAssigner':
+        return get_gateway().jvm.org.apache.flink.streaming.connectors \
+            .kinesis.util.UniformShardAssigner()
+
+
+class KinesisDeserializationSchema(object):
+    """
+    This is a deserialization schema specific for the Flink Kinesis Consumer. 
Different from the
+    basic DeserializationSchema, this schema offers additional 
Kinesis-specific information about
+    the record that may be useful to the user application.
+    """
+
+    def __init__(self, _j_kinesis_deserialization_schema):
+        if _j_kinesis_deserialization_schema is None:
+            self._j_kinesis_deserialization_schema = 
get_gateway().jvm.org.apache.flink \
+                
.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema
+        else:
+            self._j_kinesis_deserialization_schema = 
_j_kinesis_deserialization_schema
+
+
+class AssignerWithPeriodicWatermarks(object):
+    """
+    The AssignerWithPeriodicWatermarks assigns event time timestamps to 
elements, and generates
+    low watermarks that signal event time progress within the stream. These 
timestamps and
+    watermarks are used by functions and operators that operate on event time, 
for example event
+    time windows.
+    """
+
+    def __init__(self, _j_assigner_with_periodic_watermarks):
+        self._j_assigner_with_periodic_watermarks = 
_j_assigner_with_periodic_watermarks
+
+
+class WatermarkTracker(object):
+    """
+    The watermark tracker is responsible for aggregating watermarks across 
distributed operators.
+    It can be used for sub tasks of a single Flink source as well as multiple 
heterogeneous sources
+    or other operators.The class essentially functions like a distributed hash 
table that enclosing
+    operators can use to adopt their processing / IO rates
+    """
+
+
+def job_manager_watermark_tracker(aggregate_name: str,
+                                  log_accumulator_interval_millis: int = -1) \
+        -> 'WatermarkTracker':
+    return 
get_gateway().jvm.org.apache.flink.streaming.connectors.kinesis.util \
+        .JobManagerWatermarkTracker(aggregate_name, 
log_accumulator_interval_millis)
+
+
+class KinesisConsumer(SourceFunction):
+    """
+    The Flink Kinesis Consumer is an exactly-once parallel streaming data 
source that subscribes to
+    multiple AWS Kinesis streams within the same AWS service region, and can 
handle resharding of
+    streams. Each subtask of the consumer is responsible for fetching data 
records from multiple
+    Kinesis shards. The number of shards fetched by each subtask will change 
as shards are closed
+    and created by Kinesis.
+
+    To leverage Flink's checkpointing mechanics for exactly-once streaming 
processing guarantees,
+    the Flink Kinesis consumer is implemented with the AWS Java SDK, instead 
of the officially
+    recommended AWS Kinesis Client Library, for low-level control on the 
management of stream state.
+    The Flink Kinesis Connector also supports setting the initial starting 
points of Kinesis
+    streams, namely TRIM_HORIZON and LATEST.
+
+    Kinesis and the Flink consumer support dynamic re-sharding and shard IDs, 
while sequential,
+    cannot be assumed to be consecutive. There is no perfect generic default 
assignment function.
+    Default shard to subtask assignment, which is based on hash code, may 
result in skew, with some
+    subtasks having many shards assigned and others none.
+
+    It is recommended to monitor the shard distribution and adjust assignment 
appropriately.
+    A custom assigner implementation can be set via 
setShardAssigner(KinesisShardAssigner) to
+    optimize the hash function or use static overrides to limit skew.
+
+    In order for the consumer to emit watermarks, a timestamp assigner needs 
to be set via
+    setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks) and the auto 
watermark emit
+    interval configured via ExecutionConfig.setAutoWatermarkInterval(long).
+
+    Watermarks can only advance when all shards of a subtask continuously 
deliver records.
+    To avoid an inactive or closed shard to block the watermark progress, the 
idle timeout should
+    be configured via configuration property 
ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS.
+    By default, shards won't be considered idle and watermark calculation will 
wait for newer
+    records to arrive from all shards.
+
+    Note that re-sharding of the Kinesis stream while an application (that 
relies on the Kinesis
+    records for watermarking) is running can lead to incorrect late events. 
This depends on how
+    shards are assigned to subtasks and applies regardless of whether 
watermarks are generated in
+    the source or a downstream operator.
+    """
+
+    def __init__(self,
+                 streams: Union[str, List[str]],
+                 deserializer: Union[DeserializationSchema, 
KinesisDeserializationSchema],
+                 config_props: Dict
+                 ):
+        gateway = get_gateway()
+        j_properties = gateway.jvm.java.util.Properties()
+        for key, value in config_props.items():
+            j_properties.setProperty(key, value)
+
+        JKinesisConsumer = 
gateway.jvm.org.apache.flink.streaming.connectors.kinesis. \
+            FlinkKinesisConsumer
+
+        self._j_kinesis_consumer = JKinesisConsumer(
+            streams,
+            deserializer._j_deserialization_schema
+            if isinstance(deserializer, DeserializationSchema)
+            else deserializer._j_kinesis_deserialization_schema,
+            j_properties)
+
+        super(KinesisConsumer, self).__init__(self._j_kinesis_consumer)
+
+    def set_shard_assigner(self, shard_assigner: KinesisShardAssigner) -> 
'KinesisConsumer':
+        """
+        Provide a custom assigner to influence how shards are distributed over 
subtasks.
+        """
+        
self._j_kinesis_consumer.setShardAssigner(shard_assigner._j_kinesis_shard_assigner)
+        return self
+
+    def set_periodic_watermark_assigner(self,
+                                        periodic_watermark_assigner:
+                                        AssignerWithPeriodicWatermarks) -> 
'KinesisConsumer':
+        """
+        Set the assigner that will extract the timestamp from T and calculate 
the watermark.
+        """
+        self._j_kinesis_consumer.setPeriodicWatermarkAssigner(
+            periodic_watermark_assigner._j_assigner_with_periodic_watermarks)
+        return self
+
+    def set_watermark_tracker(self, watermark_tracker: WatermarkTracker) -> 
'KinesisConsumer':
+        """
+        Set the global watermark tracker. When set, it will be used by the 
fetcher to align the
+        shard consumers by event time.
+        """
+        self._j_kinesis_consumer.setWatermarkTracker(watermark_tracker)
+        return self
+
+
+# ---- KinesisSink ----
+
+class PartitionKeyGenerator(object):
+    def __init__(self, j_partition_key_generator):
+        self.j_partition_key_generator = j_partition_key_generator
+
+    @staticmethod
+    def fixed_kinesis_partition_key_generator() -> 'PartitionKeyGenerator':
+        return 
PartitionKeyGenerator(get_gateway().jvm.org.apache.flink.connector.kinesis.table.
+                                     FixedKinesisPartitionKeyGenerator())
+
+    @staticmethod
+    def random_kinesis_partition_key_generator() -> 'PartitionKeyGenerator':

Review Comment:
   ```suggestion
       def random() -> 'PartitionKeyGenerator':
   ```



##########
flink-python/pyflink/datastream/connectors/kinesis.py:
##########
@@ -0,0 +1,368 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Dict, Union, List
+
+from pyflink.common import SerializationSchema, DeserializationSchema
+from pyflink.datastream.functions import SourceFunction
+from pyflink.datastream.connectors import Sink
+from pyflink.java_gateway import get_gateway
+
+
+# ---- KinesisSource ----
+
+
+class KinesisShardAssigner(object):
+    """
+    Utility to map Kinesis shards to Flink subtask indices. Users can 
implement this interface to
+    optimize distribution of shards over subtasks. See 
assign(StreamShardHandle, int) for details.

Review Comment:
   ```
       See assign(StreamShardHandle, int) for details.
   ```
   This documentation doesn't really make sense for Python users. However, you could document that users can provide a Java KinesisShardAssigner and use it in Python if they want to provide a custom shard assigner.



##########
flink-python/pyflink/datastream/connectors/kinesis.py:
##########
@@ -0,0 +1,368 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Dict, Union, List
+
+from pyflink.common import SerializationSchema, DeserializationSchema
+from pyflink.datastream.functions import SourceFunction
+from pyflink.datastream.connectors import Sink
+from pyflink.java_gateway import get_gateway
+
+
+# ---- KinesisSource ----
+
+
+class KinesisShardAssigner(object):
+    """
+    Utility to map Kinesis shards to Flink subtask indices. Users can 
implement this interface to
+    optimize distribution of shards over subtasks. See 
assign(StreamShardHandle, int) for details.
+    """
+
+    def __init__(self, _j_kinesis_shard_assigner):
+        self._j_kinesis_shard_assigner = _j_kinesis_shard_assigner
+
+    @staticmethod
+    def uniform_shard_assigner() -> 'KinesisShardAssigner':
+        return get_gateway().jvm.org.apache.flink.streaming.connectors \
+            .kinesis.util.UniformShardAssigner()
+
+
+class KinesisDeserializationSchema(object):
+    """
+    This is a deserialization schema specific for the Flink Kinesis Consumer. 
Different from the
+    basic DeserializationSchema, this schema offers additional 
Kinesis-specific information about
+    the record that may be useful to the user application.
+    """
+
+    def __init__(self, _j_kinesis_deserialization_schema):
+        if _j_kinesis_deserialization_schema is None:
+            self._j_kinesis_deserialization_schema = 
get_gateway().jvm.org.apache.flink \
+                
.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema
+        else:
+            self._j_kinesis_deserialization_schema = 
_j_kinesis_deserialization_schema
+
+
+class AssignerWithPeriodicWatermarks(object):
+    """
+    The AssignerWithPeriodicWatermarks assigns event time timestamps to 
elements, and generates
+    low watermarks that signal event time progress within the stream. These 
timestamps and
+    watermarks are used by functions and operators that operate on event time, 
for example event
+    time windows.
+    """
+
+    def __init__(self, _j_assigner_with_periodic_watermarks):
+        self._j_assigner_with_periodic_watermarks = 
_j_assigner_with_periodic_watermarks
+
+
+class WatermarkTracker(object):
+    """
+    The watermark tracker is responsible for aggregating watermarks across 
distributed operators.
+    It can be used for sub tasks of a single Flink source as well as multiple 
heterogeneous sources
+    or other operators.The class essentially functions like a distributed hash 
table that enclosing
+    operators can use to adopt their processing / IO rates
+    """
+
+
+def job_manager_watermark_tracker(aggregate_name: str,
+                                  log_accumulator_interval_millis: int = -1) \
+        -> 'WatermarkTracker':
+    return 
get_gateway().jvm.org.apache.flink.streaming.connectors.kinesis.util \
+        .JobManagerWatermarkTracker(aggregate_name, 
log_accumulator_interval_millis)
+
+
+class KinesisConsumer(SourceFunction):

Review Comment:
   ```suggestion
   class FlinkKinesisConsumer(SourceFunction):
   ```
   Keep the naming convention consistent with the Java API (`FlinkKinesisConsumer`).



##########
flink-python/pyflink/datastream/tests/test_connectors.py:
##########
@@ -517,3 +519,65 @@ def test_seq_source(self):
         to_field = seq_source_clz.getDeclaredField("to")
         to_field.setAccessible(True)
         self.assertEqual(10, to_field.get(seq_source.get_java_function()))
+
+
+class FlinkKinesisTest(ConnectorTestBase):
+    @classmethod
+    def _get_jars_relative_path(cls):
+        return '/flink-connectors/flink-sql-connector-kinesis'
+
+    def test_kinesis_source(self):
+        consumer_config = dict()
+        consumer_config['aws.region'] = 'us-east-1'
+        consumer_config['aws.credentials.provider.basic.accesskeyid'] = 
'aws_access_key_id'
+        consumer_config['aws.credentials.provider.basic.secretkey'] = 
'aws_secret_access_key'
+        consumer_config['flink.stream.initpos'] = 'LATEST'
+
+        JKinesisDeserializationSchema = 
get_gateway().jvm.org.apache.flink.streaming.connectors.\
+            kinesis.serialization.KinesisDeserializationSchemaWrapper
+        schema = 
JKinesisDeserializationSchema(SimpleStringSchema()._j_deserialization_schema)
+
+        kinesis_source = KinesisConsumer("stream-1", 
KinesisDeserializationSchema(schema),
+                                         consumer_config)
+
+        watermark_tracker = job_manager_watermark_tracker("myKinesisSource")
+        kinesis_source.set_watermark_tracker(watermark_tracker)
+
+        ds = self.env.add_source(source_func=kinesis_source, 
source_name="kinesis source")
+        ds.print()

Review Comment:
   remove this line



##########
flink-python/pyflink/datastream/connectors/kinesis.py:
##########
@@ -0,0 +1,368 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Dict, Union, List
+
+from pyflink.common import SerializationSchema, DeserializationSchema
+from pyflink.datastream.functions import SourceFunction
+from pyflink.datastream.connectors import Sink
+from pyflink.java_gateway import get_gateway
+
+
+# ---- KinesisSource ----
+
+
+class KinesisShardAssigner(object):
+    """
+    Utility to map Kinesis shards to Flink subtask indices. Users can 
implement this interface to
+    optimize distribution of shards over subtasks. See 
assign(StreamShardHandle, int) for details.
+    """
+
+    def __init__(self, _j_kinesis_shard_assigner):
+        self._j_kinesis_shard_assigner = _j_kinesis_shard_assigner
+
+    @staticmethod
+    def uniform_shard_assigner() -> 'KinesisShardAssigner':
+        return get_gateway().jvm.org.apache.flink.streaming.connectors \
+            .kinesis.util.UniformShardAssigner()
+
+
+class KinesisDeserializationSchema(object):
+    """
+    This is a deserialization schema specific for the Flink Kinesis Consumer. 
Different from the
+    basic DeserializationSchema, this schema offers additional 
Kinesis-specific information about
+    the record that may be useful to the user application.
+    """
+
+    def __init__(self, _j_kinesis_deserialization_schema):
+        if _j_kinesis_deserialization_schema is None:
+            self._j_kinesis_deserialization_schema = 
get_gateway().jvm.org.apache.flink \
+                
.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema
+        else:
+            self._j_kinesis_deserialization_schema = 
_j_kinesis_deserialization_schema
+
+
+class AssignerWithPeriodicWatermarks(object):

Review Comment:
   Rename it to AssignerWithPeriodicWatermarksWrapper and move it to 
watermark_strategy.py?



##########
flink-python/pyflink/datastream/connectors/kinesis.py:
##########
@@ -0,0 +1,368 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Dict, Union, List
+
+from pyflink.common import SerializationSchema, DeserializationSchema
+from pyflink.datastream.functions import SourceFunction
+from pyflink.datastream.connectors import Sink
+from pyflink.java_gateway import get_gateway
+
+
+# ---- KinesisSource ----
+
+
+class KinesisShardAssigner(object):
+    """
+    Utility to map Kinesis shards to Flink subtask indices. Users can 
implement this interface to
+    optimize distribution of shards over subtasks. See 
assign(StreamShardHandle, int) for details.
+    """
+
+    def __init__(self, _j_kinesis_shard_assigner):
+        self._j_kinesis_shard_assigner = _j_kinesis_shard_assigner
+
+    @staticmethod
+    def uniform_shard_assigner() -> 'KinesisShardAssigner':

Review Comment:
   Add documentation for it?



##########
flink-python/pyflink/datastream/connectors/kinesis.py:
##########
@@ -0,0 +1,368 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Dict, Union, List
+
+from pyflink.common import SerializationSchema, DeserializationSchema
+from pyflink.datastream.functions import SourceFunction
+from pyflink.datastream.connectors import Sink
+from pyflink.java_gateway import get_gateway
+
+
+# ---- KinesisSource ----
+
+
+class KinesisShardAssigner(object):
+    """
+    Utility to map Kinesis shards to Flink subtask indices. Users can 
implement this interface to
+    optimize distribution of shards over subtasks. See 
assign(StreamShardHandle, int) for details.
+    """
+
+    def __init__(self, _j_kinesis_shard_assigner):
+        self._j_kinesis_shard_assigner = _j_kinesis_shard_assigner
+
+    @staticmethod
+    def uniform_shard_assigner() -> 'KinesisShardAssigner':
+        return get_gateway().jvm.org.apache.flink.streaming.connectors \
+            .kinesis.util.UniformShardAssigner()
+

Review Comment:
   Also add default_shard_assigner()? Otherwise, I'm afraid users may think that this is the only available shard assigner.



##########
flink-python/pyflink/datastream/connectors/kinesis.py:
##########
@@ -0,0 +1,368 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Dict, Union, List
+
+from pyflink.common import SerializationSchema, DeserializationSchema
+from pyflink.datastream.functions import SourceFunction
+from pyflink.datastream.connectors import Sink
+from pyflink.java_gateway import get_gateway
+
+
+# ---- KinesisSource ----
+
+
+class KinesisShardAssigner(object):
+    """
+    Utility to map Kinesis shards to Flink subtask indices. Users can 
implement this interface to
+    optimize distribution of shards over subtasks. See 
assign(StreamShardHandle, int) for details.
+    """
+
+    def __init__(self, _j_kinesis_shard_assigner):
+        self._j_kinesis_shard_assigner = _j_kinesis_shard_assigner
+
+    @staticmethod
+    def uniform_shard_assigner() -> 'KinesisShardAssigner':
+        return get_gateway().jvm.org.apache.flink.streaming.connectors \
+            .kinesis.util.UniformShardAssigner()
+
+
+class KinesisDeserializationSchema(object):
+    """
+    This is a deserialization schema specific for the Flink Kinesis Consumer. 
Different from the
+    basic DeserializationSchema, this schema offers additional 
Kinesis-specific information about
+    the record that may be useful to the user application.
+    """
+
+    def __init__(self, _j_kinesis_deserialization_schema):
+        if _j_kinesis_deserialization_schema is None:
+            self._j_kinesis_deserialization_schema = 
get_gateway().jvm.org.apache.flink \
+                
.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema
+        else:
+            self._j_kinesis_deserialization_schema = 
_j_kinesis_deserialization_schema
+
+
+class AssignerWithPeriodicWatermarks(object):
+    """
+    The AssignerWithPeriodicWatermarks assigns event time timestamps to 
elements, and generates
+    low watermarks that signal event time progress within the stream. These 
timestamps and
+    watermarks are used by functions and operators that operate on event time, 
for example event
+    time windows.
+    """
+
+    def __init__(self, _j_assigner_with_periodic_watermarks):
+        self._j_assigner_with_periodic_watermarks = 
_j_assigner_with_periodic_watermarks
+
+
+class WatermarkTracker(object):
+    """
+    The watermark tracker is responsible for aggregating watermarks across 
distributed operators.
+    It can be used for sub tasks of a single Flink source as well as multiple 
heterogeneous sources
+    or other operators.The class essentially functions like a distributed hash 
table that enclosing
+    operators can use to adopt their processing / IO rates
+    """
+
+
+def job_manager_watermark_tracker(aggregate_name: str,
+                                  log_accumulator_interval_millis: int = -1) \
+        -> 'WatermarkTracker':
+    return 
get_gateway().jvm.org.apache.flink.streaming.connectors.kinesis.util \
+        .JobManagerWatermarkTracker(aggregate_name, 
log_accumulator_interval_millis)
+
+
+class KinesisConsumer(SourceFunction):
+    """
+    The Flink Kinesis Consumer is an exactly-once parallel streaming data 
source that subscribes to
+    multiple AWS Kinesis streams within the same AWS service region, and can 
handle resharding of
+    streams. Each subtask of the consumer is responsible for fetching data 
records from multiple
+    Kinesis shards. The number of shards fetched by each subtask will change 
as shards are closed
+    and created by Kinesis.
+
+    To leverage Flink's checkpointing mechanics for exactly-once streaming 
processing guarantees,
+    the Flink Kinesis consumer is implemented with the AWS Java SDK, instead 
of the officially
+    recommended AWS Kinesis Client Library, for low-level control on the 
management of stream state.
+    The Flink Kinesis Connector also supports setting the initial starting 
points of Kinesis
+    streams, namely TRIM_HORIZON and LATEST.
+
+    Kinesis and the Flink consumer support dynamic re-sharding and shard IDs, 
while sequential,
+    cannot be assumed to be consecutive. There is no perfect generic default 
assignment function.
+    Default shard to subtask assignment, which is based on hash code, may 
result in skew, with some
+    subtasks having many shards assigned and others none.
+
+    It is recommended to monitor the shard distribution and adjust assignment 
appropriately.
+    A custom assigner implementation can be set via 
setShardAssigner(KinesisShardAssigner) to
+    optimize the hash function or use static overrides to limit skew.
+
+    In order for the consumer to emit watermarks, a timestamp assigner needs 
to be set via
+    setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks) and the auto 
watermark emit
+    interval configured via ExecutionConfig.setAutoWatermarkInterval(long).
+
+    Watermarks can only advance when all shards of a subtask continuously 
deliver records.
+    To avoid an inactive or closed shard to block the watermark progress, the 
idle timeout should
+    be configured via configuration property 
ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS.
+    By default, shards won't be considered idle and watermark calculation will 
wait for newer
+    records to arrive from all shards.
+
+    Note that re-sharding of the Kinesis stream while an application (that 
relies on the Kinesis
+    records for watermarking) is running can lead to incorrect late events. 
This depends on how
+    shards are assigned to subtasks and applies regardless of whether 
watermarks are generated in
+    the source or a downstream operator.
+    """
+
+    def __init__(self,
+                 streams: Union[str, List[str]],
+                 deserializer: Union[DeserializationSchema, 
KinesisDeserializationSchema],
+                 config_props: Dict
+                 ):
+        gateway = get_gateway()
+        j_properties = gateway.jvm.java.util.Properties()
+        for key, value in config_props.items():
+            j_properties.setProperty(key, value)
+
+        JKinesisConsumer = 
gateway.jvm.org.apache.flink.streaming.connectors.kinesis. \
+            FlinkKinesisConsumer
+
+        self._j_kinesis_consumer = JKinesisConsumer(
+            streams,
+            deserializer._j_deserialization_schema
+            if isinstance(deserializer, DeserializationSchema)
+            else deserializer._j_kinesis_deserialization_schema,
+            j_properties)
+
+        super(KinesisConsumer, self).__init__(self._j_kinesis_consumer)
+
+    def set_shard_assigner(self, shard_assigner: KinesisShardAssigner) -> 
'KinesisConsumer':
+        """
+        Provide a custom assigner to influence how shards are distributed over 
subtasks.
+        """
+        
self._j_kinesis_consumer.setShardAssigner(shard_assigner._j_kinesis_shard_assigner)
+        return self
+
+    def set_periodic_watermark_assigner(self,
+                                        periodic_watermark_assigner:
+                                        AssignerWithPeriodicWatermarks) -> 
'KinesisConsumer':
+        """
+        Set the assigner that will extract the timestamp from T and calculate 
the watermark.
+        """
+        self._j_kinesis_consumer.setPeriodicWatermarkAssigner(
+            periodic_watermark_assigner._j_assigner_with_periodic_watermarks)
+        return self
+
+    def set_watermark_tracker(self, watermark_tracker: WatermarkTracker) -> 
'KinesisConsumer':
+        """
+        Set the global watermark tracker. When set, it will be used by the 
fetcher to align the
+        shard consumers by event time.
+        """
+        self._j_kinesis_consumer.setWatermarkTracker(watermark_tracker)
+        return self
+
+
+# ---- KinesisSink ----
+
+class PartitionKeyGenerator(object):
+    def __init__(self, j_partition_key_generator):
+        self.j_partition_key_generator = j_partition_key_generator
+
+    @staticmethod
+    def fixed_kinesis_partition_key_generator() -> 'PartitionKeyGenerator':
+        return 
PartitionKeyGenerator(get_gateway().jvm.org.apache.flink.connector.kinesis.table.
+                                     FixedKinesisPartitionKeyGenerator())
+
+    @staticmethod
+    def random_kinesis_partition_key_generator() -> 'PartitionKeyGenerator':
+        return 
PartitionKeyGenerator(get_gateway().jvm.org.apache.flink.connector.kinesis.table.
+                                     RandomKinesisPartitionKeyGenerator())
+
+    @staticmethod
+    def row_data_fields_kinesis_partition_key_generator() -> 
'PartitionKeyGenerator':
+        return 
PartitionKeyGenerator(get_gateway().jvm.org.apache.flink.connector.kinesis.table.
+                                     
RowDataFieldsKinesisPartitionKeyGenerator())
+
+
+class KinesisSink(Sink):

Review Comment:
   ```suggestion
   class KinesisStreamsSink(Sink):
   ```



##########
flink-python/pyflink/datastream/connectors/kinesis.py:
##########
@@ -0,0 +1,368 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Dict, Union, List
+
+from pyflink.common import SerializationSchema, DeserializationSchema
+from pyflink.datastream.functions import SourceFunction
+from pyflink.datastream.connectors import Sink
+from pyflink.java_gateway import get_gateway
+
+
+# ---- KinesisSource ----
+
+
+class KinesisShardAssigner(object):
+    """
+    Utility to map Kinesis shards to Flink subtask indices. Users can 
implement this interface to
+    optimize distribution of shards over subtasks. See 
assign(StreamShardHandle, int) for details.
+    """
+
+    def __init__(self, _j_kinesis_shard_assigner):
+        self._j_kinesis_shard_assigner = _j_kinesis_shard_assigner
+
+    @staticmethod
+    def uniform_shard_assigner() -> 'KinesisShardAssigner':
+        return get_gateway().jvm.org.apache.flink.streaming.connectors \
+            .kinesis.util.UniformShardAssigner()
+
+
+class KinesisDeserializationSchema(object):
+    """
+    This is a deserialization schema specific for the Flink Kinesis Consumer. 
Different from the
+    basic DeserializationSchema, this schema offers additional 
Kinesis-specific information about
+    the record that may be useful to the user application.
+    """
+
+    def __init__(self, _j_kinesis_deserialization_schema):
+        if _j_kinesis_deserialization_schema is None:
+            self._j_kinesis_deserialization_schema = 
get_gateway().jvm.org.apache.flink \
+                
.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema
+        else:
+            self._j_kinesis_deserialization_schema = 
_j_kinesis_deserialization_schema
+
+
+class AssignerWithPeriodicWatermarks(object):
+    """
+    The AssignerWithPeriodicWatermarks assigns event time timestamps to 
elements, and generates
+    low watermarks that signal event time progress within the stream. These 
timestamps and
+    watermarks are used by functions and operators that operate on event time, 
for example event
+    time windows.
+    """
+
+    def __init__(self, _j_assigner_with_periodic_watermarks):
+        self._j_assigner_with_periodic_watermarks = 
_j_assigner_with_periodic_watermarks
+
+
+class WatermarkTracker(object):
+    """
+    The watermark tracker is responsible for aggregating watermarks across 
distributed operators.
+    It can be used for sub tasks of a single Flink source as well as multiple 
heterogeneous sources
+    or other operators.The class essentially functions like a distributed hash 
table that enclosing
+    operators can use to adopt their processing / IO rates
+    """
+
+
+def job_manager_watermark_tracker(aggregate_name: str,
+                                  log_accumulator_interval_millis: int = -1) \
+        -> 'WatermarkTracker':
+    return 
get_gateway().jvm.org.apache.flink.streaming.connectors.kinesis.util \
+        .JobManagerWatermarkTracker(aggregate_name, 
log_accumulator_interval_millis)
+
+
+class KinesisConsumer(SourceFunction):
+    """
+    The Flink Kinesis Consumer is an exactly-once parallel streaming data 
source that subscribes to
+    multiple AWS Kinesis streams within the same AWS service region, and can 
handle resharding of
+    streams. Each subtask of the consumer is responsible for fetching data 
records from multiple
+    Kinesis shards. The number of shards fetched by each subtask will change 
as shards are closed
+    and created by Kinesis.
+
+    To leverage Flink's checkpointing mechanics for exactly-once streaming 
processing guarantees,
+    the Flink Kinesis consumer is implemented with the AWS Java SDK, instead 
of the officially
+    recommended AWS Kinesis Client Library, for low-level control on the 
management of stream state.
+    The Flink Kinesis Connector also supports setting the initial starting 
points of Kinesis
+    streams, namely TRIM_HORIZON and LATEST.
+
+    Kinesis and the Flink consumer support dynamic re-sharding and shard IDs, 
while sequential,
+    cannot be assumed to be consecutive. There is no perfect generic default 
assignment function.
+    Default shard to subtask assignment, which is based on hash code, may 
result in skew, with some
+    subtasks having many shards assigned and others none.
+
+    It is recommended to monitor the shard distribution and adjust assignment 
appropriately.
+    A custom assigner implementation can be set via 
setShardAssigner(KinesisShardAssigner) to
+    optimize the hash function or use static overrides to limit skew.
+
+    In order for the consumer to emit watermarks, a timestamp assigner needs 
to be set via
+    setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks) and the auto 
watermark emit
+    interval configured via ExecutionConfig.setAutoWatermarkInterval(long).
+
+    Watermarks can only advance when all shards of a subtask continuously 
deliver records.
+    To avoid an inactive or closed shard to block the watermark progress, the 
idle timeout should
+    be configured via configuration property 
ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS.
+    By default, shards won't be considered idle and watermark calculation will 
wait for newer
+    records to arrive from all shards.
+
+    Note that re-sharding of the Kinesis stream while an application (that 
relies on the Kinesis
+    records for watermarking) is running can lead to incorrect late events. 
This depends on how
+    shards are assigned to subtasks and applies regardless of whether 
watermarks are generated in
+    the source or a downstream operator.
+    """
+
+    def __init__(self,
+                 streams: Union[str, List[str]],
+                 deserializer: Union[DeserializationSchema, 
KinesisDeserializationSchema],
+                 config_props: Dict
+                 ):
+        gateway = get_gateway()
+        j_properties = gateway.jvm.java.util.Properties()
+        for key, value in config_props.items():
+            j_properties.setProperty(key, value)
+
+        JKinesisConsumer = 
gateway.jvm.org.apache.flink.streaming.connectors.kinesis. \
+            FlinkKinesisConsumer
+
+        self._j_kinesis_consumer = JKinesisConsumer(
+            streams,
+            deserializer._j_deserialization_schema
+            if isinstance(deserializer, DeserializationSchema)
+            else deserializer._j_kinesis_deserialization_schema,
+            j_properties)
+
+        super(KinesisConsumer, self).__init__(self._j_kinesis_consumer)
+
+    def set_shard_assigner(self, shard_assigner: KinesisShardAssigner) -> 
'KinesisConsumer':
+        """
+        Provide a custom assigner to influence how shards are distributed over 
subtasks.
+        """
+        
self._j_kinesis_consumer.setShardAssigner(shard_assigner._j_kinesis_shard_assigner)
+        return self
+
+    def set_periodic_watermark_assigner(self,
+                                        periodic_watermark_assigner:
+                                        AssignerWithPeriodicWatermarks) -> 
'KinesisConsumer':
+        """
+        Set the assigner that will extract the timestamp from T and calculate 
the watermark.
+        """
+        self._j_kinesis_consumer.setPeriodicWatermarkAssigner(
+            periodic_watermark_assigner._j_assigner_with_periodic_watermarks)
+        return self
+
+    def set_watermark_tracker(self, watermark_tracker: WatermarkTracker) -> 
'KinesisConsumer':
+        """
+        Set the global watermark tracker. When set, it will be used by the 
fetcher to align the
+        shard consumers by event time.
+        """
+        self._j_kinesis_consumer.setWatermarkTracker(watermark_tracker)
+        return self
+
+
+# ---- KinesisSink ----
+
+class PartitionKeyGenerator(object):
+    def __init__(self, j_partition_key_generator):
+        self.j_partition_key_generator = j_partition_key_generator
+
+    @staticmethod
+    def fixed_kinesis_partition_key_generator() -> 'PartitionKeyGenerator':
+        return 
PartitionKeyGenerator(get_gateway().jvm.org.apache.flink.connector.kinesis.table.
+                                     FixedKinesisPartitionKeyGenerator())
+
+    @staticmethod
+    def random_kinesis_partition_key_generator() -> 'PartitionKeyGenerator':
+        return 
PartitionKeyGenerator(get_gateway().jvm.org.apache.flink.connector.kinesis.table.
+                                     RandomKinesisPartitionKeyGenerator())
+
+    @staticmethod
+    def row_data_fields_kinesis_partition_key_generator() -> 
'PartitionKeyGenerator':

Review Comment:
   It's used internally by the Table API, so it could be removed.



##########
docs/content/docs/connectors/datastream/kinesis.md:
##########
@@ -112,6 +112,23 @@ val kinesis = env.addSource(new 
FlinkKinesisConsumer[String](
     "kinesis_stream_name", new SimpleStringSchema, consumerConfig))
 ```
 {{< /tab >}}
+{{< tab "python" >}}
+```python
+consumer_config= dict() 
+consumer_config['aws.region'] = 'us-east-1'
+consumer_config['aws.credentials.provider.basic.accesskeyid'] = 
'aws_access_key_id'
+consumer_config['aws.credentials.provider.basic.secretkey'] = 
'aws_secret_access_key'
+consumer_config['flink.stream.initpos'] = 'LATEST'
+
+env = StreamExecutionEnvironment.get_execution_environment()
+
+JKinesisDeserializationSchema = 
get_gateway().jvm.org.apache.flink.streaming.connectors.
+    kinesis.serialization.KinesisDeserializationSchemaWrapper
+schema = 
JKinesisDeserializationSchema(SimpleStringSchema()._j_deserialization_schema)
+
+kinesis = KinesisConsumer("stream-1", KinesisDeserializationSchema(schema), 
consumer_config)
+```
+{{< /tab >}}

Review Comment:
   Many examples in this file only have Java & Scala versions; why not update them to include Python as well?



##########
docs/content/docs/connectors/datastream/kinesis.md:
##########
@@ -112,6 +112,23 @@ val kinesis = env.addSource(new 
FlinkKinesisConsumer[String](
     "kinesis_stream_name", new SimpleStringSchema, consumerConfig))
 ```
 {{< /tab >}}
+{{< tab "python" >}}
+```python
+consumer_config= dict() 
+consumer_config['aws.region'] = 'us-east-1'
+consumer_config['aws.credentials.provider.basic.accesskeyid'] = 
'aws_access_key_id'
+consumer_config['aws.credentials.provider.basic.secretkey'] = 
'aws_secret_access_key'
+consumer_config['flink.stream.initpos'] = 'LATEST'
+
+env = StreamExecutionEnvironment.get_execution_environment()
+
+JKinesisDeserializationSchema = 
get_gateway().jvm.org.apache.flink.streaming.connectors.
+    kinesis.serialization.KinesisDeserializationSchemaWrapper
+schema = 
JKinesisDeserializationSchema(SimpleStringSchema()._j_deserialization_schema)
+
+kinesis = KinesisConsumer("stream-1", KinesisDeserializationSchema(schema), 
consumer_config)

Review Comment:
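   Why not `KinesisConsumer("stream-1", SimpleStringSchema(), consumer_config)`? The constructor already accepts a plain `DeserializationSchema`, so wrapping it through the Java gateway seems unnecessary.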



##########
docs/content/docs/connectors/datastream/kinesis.md:
##########
@@ -112,6 +112,23 @@ val kinesis = env.addSource(new 
FlinkKinesisConsumer[String](
     "kinesis_stream_name", new SimpleStringSchema, consumerConfig))
 ```
 {{< /tab >}}
+{{< tab "python" >}}

Review Comment:
   ```suggestion
   {{< tab "Python" >}}
   ```



##########
docs/content/docs/connectors/datastream/kinesis.md:
##########
@@ -112,6 +112,23 @@ val kinesis = env.addSource(new 
FlinkKinesisConsumer[String](
     "kinesis_stream_name", new SimpleStringSchema, consumerConfig))
 ```
 {{< /tab >}}
+{{< tab "python" >}}
+```python
+consumer_config= dict() 
+consumer_config['aws.region'] = 'us-east-1'
+consumer_config['aws.credentials.provider.basic.accesskeyid'] = 
'aws_access_key_id'
+consumer_config['aws.credentials.provider.basic.secretkey'] = 
'aws_secret_access_key'
+consumer_config['flink.stream.initpos'] = 'LATEST'

Review Comment:
   What about changing it to the following, which seems more Pythonic:
   ```
   consumer_config = {
       'aws.region': 'us-east-1',
       'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
       'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key',
       'flink.stream.initpos': 'LATEST'
   }
   ```
   



##########
flink-python/pyflink/datastream/connectors/kinesis.py:
##########
@@ -0,0 +1,368 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Dict, Union, List
+
+from pyflink.common import SerializationSchema, DeserializationSchema
+from pyflink.datastream.functions import SourceFunction
+from pyflink.datastream.connectors import Sink
+from pyflink.java_gateway import get_gateway
+
+
+# ---- KinesisSource ----
+
+
+class KinesisShardAssigner(object):
+    """
+    Utility to map Kinesis shards to Flink subtask indices. Users can 
implement this interface to
+    optimize distribution of shards over subtasks. See 
assign(StreamShardHandle, int) for details.
+    """
+
+    def __init__(self, _j_kinesis_shard_assigner):
+        self._j_kinesis_shard_assigner = _j_kinesis_shard_assigner
+
+    @staticmethod
+    def uniform_shard_assigner() -> 'KinesisShardAssigner':
+        return get_gateway().jvm.org.apache.flink.streaming.connectors \
+            .kinesis.util.UniformShardAssigner()
+
+
+class KinesisDeserializationSchema(object):
+    """
+    This is a deserialization schema specific for the Flink Kinesis Consumer. 
Different from the
+    basic DeserializationSchema, this schema offers additional 
Kinesis-specific information about
+    the record that may be useful to the user application.
+    """
+
+    def __init__(self, _j_kinesis_deserialization_schema):
+        if _j_kinesis_deserialization_schema is None:
+            self._j_kinesis_deserialization_schema = 
get_gateway().jvm.org.apache.flink \
+                
.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema
+        else:
+            self._j_kinesis_deserialization_schema = 
_j_kinesis_deserialization_schema
+
+
+class AssignerWithPeriodicWatermarks(object):
+    """
+    The AssignerWithPeriodicWatermarks assigns event time timestamps to 
elements, and generates
+    low watermarks that signal event time progress within the stream. These 
timestamps and
+    watermarks are used by functions and operators that operate on event time, 
for example event
+    time windows.
+    """
+
+    def __init__(self, _j_assigner_with_periodic_watermarks):
+        self._j_assigner_with_periodic_watermarks = 
_j_assigner_with_periodic_watermarks
+
+
+class WatermarkTracker(object):
+    """
+    The watermark tracker is responsible for aggregating watermarks across 
distributed operators.
+    It can be used for sub tasks of a single Flink source as well as multiple 
heterogeneous sources
or other operators. The class essentially functions like a distributed hash 
table that enclosing
+    operators can use to adapt their processing / IO rates.
+    """
+
+
+def job_manager_watermark_tracker(aggregate_name: str,
+                                  log_accumulator_interval_millis: int = -1) \
+        -> 'WatermarkTracker':
+    return 
get_gateway().jvm.org.apache.flink.streaming.connectors.kinesis.util \
+        .JobManagerWatermarkTracker(aggregate_name, 
log_accumulator_interval_millis)
+
+
+class KinesisConsumer(SourceFunction):
+    """
+    The Flink Kinesis Consumer is an exactly-once parallel streaming data 
source that subscribes to
+    multiple AWS Kinesis streams within the same AWS service region, and can 
handle resharding of
+    streams. Each subtask of the consumer is responsible for fetching data 
records from multiple
+    Kinesis shards. The number of shards fetched by each subtask will change 
as shards are closed
+    and created by Kinesis.
+
+    To leverage Flink's checkpointing mechanics for exactly-once streaming 
processing guarantees,
+    the Flink Kinesis consumer is implemented with the AWS Java SDK, instead 
of the officially
+    recommended AWS Kinesis Client Library, for low-level control on the 
management of stream state.
+    The Flink Kinesis Connector also supports setting the initial starting 
points of Kinesis
+    streams, namely TRIM_HORIZON and LATEST.
+
+    Kinesis and the Flink consumer support dynamic re-sharding and shard IDs, 
while sequential,
+    cannot be assumed to be consecutive. There is no perfect generic default 
assignment function.
+    Default shard to subtask assignment, which is based on hash code, may 
result in skew, with some
+    subtasks having many shards assigned and others none.
+
+    It is recommended to monitor the shard distribution and adjust assignment 
appropriately.
+    A custom assigner implementation can be set via 
setShardAssigner(KinesisShardAssigner) to
+    optimize the hash function or use static overrides to limit skew.
+
+    In order for the consumer to emit watermarks, a timestamp assigner needs 
to be set via
+    setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks) and the auto 
watermark emit
+    interval configured via ExecutionConfig.setAutoWatermarkInterval(long).
+
+    Watermarks can only advance when all shards of a subtask continuously 
deliver records.
+    To prevent an inactive or closed shard from blocking watermark progress, the 
idle timeout should
+    be configured via configuration property 
ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS.
+    By default, shards won't be considered idle and watermark calculation will 
wait for newer
+    records to arrive from all shards.
+
+    Note that re-sharding of the Kinesis stream while an application (that 
relies on the Kinesis
+    records for watermarking) is running can lead to incorrect late events. 
This depends on how
+    shards are assigned to subtasks and applies regardless of whether 
watermarks are generated in
+    the source or a downstream operator.
+    """
+
+    def __init__(self,
+                 streams: Union[str, List[str]],
+                 deserializer: Union[DeserializationSchema, 
KinesisDeserializationSchema],
+                 config_props: Dict
+                 ):
+        gateway = get_gateway()
+        j_properties = gateway.jvm.java.util.Properties()
+        for key, value in config_props.items():
+            j_properties.setProperty(key, value)
+
+        JKinesisConsumer = 
gateway.jvm.org.apache.flink.streaming.connectors.kinesis. \
+            FlinkKinesisConsumer
+
+        self._j_kinesis_consumer = JKinesisConsumer(
+            streams,
+            deserializer._j_deserialization_schema
+            if isinstance(deserializer, DeserializationSchema)
+            else deserializer._j_kinesis_deserialization_schema,
+            j_properties)
+
+        super(KinesisConsumer, self).__init__(self._j_kinesis_consumer)
+
+    def set_shard_assigner(self, shard_assigner: KinesisShardAssigner) -> 
'KinesisConsumer':
+        """
+        Provide a custom assigner to influence how shards are distributed over 
subtasks.
+        """
+        
self._j_kinesis_consumer.setShardAssigner(shard_assigner._j_kinesis_shard_assigner)
+        return self
+
+    def set_periodic_watermark_assigner(self,
+                                        periodic_watermark_assigner:
+                                        AssignerWithPeriodicWatermarks) -> 
'KinesisConsumer':
+        """
+        Set the assigner that will extract the timestamp from T and calculate 
the watermark.
+        """
+        self._j_kinesis_consumer.setPeriodicWatermarkAssigner(
+            periodic_watermark_assigner._j_assigner_with_periodic_watermarks)
+        return self
+
+    def set_watermark_tracker(self, watermark_tracker: WatermarkTracker) -> 
'KinesisConsumer':
+        """
+        Set the global watermark tracker. When set, it will be used by the 
fetcher to align the
+        shard consumers by event time.
+        """
+        self._j_kinesis_consumer.setWatermarkTracker(watermark_tracker)
+        return self
+
+
+# ---- KinesisSink ----
+
+class PartitionKeyGenerator(object):
+    def __init__(self, j_partition_key_generator):
+        self.j_partition_key_generator = j_partition_key_generator
+
+    @staticmethod
+    def fixed_kinesis_partition_key_generator() -> 'PartitionKeyGenerator':
+        return 
PartitionKeyGenerator(get_gateway().jvm.org.apache.flink.connector.kinesis.table.
+                                     FixedKinesisPartitionKeyGenerator())
+
+    @staticmethod
+    def random_kinesis_partition_key_generator() -> 'PartitionKeyGenerator':
+        return 
PartitionKeyGenerator(get_gateway().jvm.org.apache.flink.connector.kinesis.table.
+                                     RandomKinesisPartitionKeyGenerator())
+
+    @staticmethod
+    def row_data_fields_kinesis_partition_key_generator() -> 
'PartitionKeyGenerator':
+        return 
PartitionKeyGenerator(get_gateway().jvm.org.apache.flink.connector.kinesis.table.
+                                     
RowDataFieldsKinesisPartitionKeyGenerator())
+
+
+class KinesisSink(Sink):
+    """
+    A Kinesis Data Streams (KDS) Sink that performs async requests against a 
destination stream

Review Comment:
   Also add KinesisFirehoseSink?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to