This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new b7f2e26  [FLINK-33560][Connectors/AWS] Externalize AWS Python 
connectors from Flink to AWS project
b7f2e26 is described below

commit b7f2e268e377ee18e9133ef1367bdecf04d62d71
Author: Danny Cranmer <[email protected]>
AuthorDate: Mon Dec 18 13:33:42 2023 +0000

    [FLINK-33560][Connectors/AWS] Externalize AWS Python connectors from Flink 
to AWS project
---
 .github/workflows/nightly.yml                      |  10 +-
 .github/workflows/push_pr.yml                      |   8 +
 .gitignore                                         |  17 +-
 .../nightly.yml => flink-python/MANIFEST.in        |  18 +-
 flink-python/README.txt                            |  14 +
 flink-python/dev/integration_test.sh               |  50 ++
 flink-python/pom.xml                               | 173 +++++++
 .../pyflink/datastream/connectors/kinesis.py       | 548 +++++++++++++++++++++
 .../datastream/connectors/tests/test_kinesis.py    | 108 ++++
 flink-python/pyflink/pyflink_gateway_server.py     | 288 +++++++++++
 flink-python/setup.py                              | 153 ++++++
 flink-python/tox.ini                               |  51 ++
 pom.xml                                            |   3 +-
 13 files changed, 1421 insertions(+), 20 deletions(-)

diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml
index d50e7a9..a30d95a 100644
--- a/.github/workflows/nightly.yml
+++ b/.github/workflows/nightly.yml
@@ -31,4 +31,12 @@ jobs:
       flink_version: ${{ matrix.flink }}
       flink_url: https://s3.amazonaws.com/flink-nightly/flink-${{ matrix.flink 
}}-bin-scala_2.12.tgz
       cache_flink_binary: false
-    secrets: inherit
\ No newline at end of file
+    secrets: inherit
+
+  python_test:
+    strategy:
+      matrix:
+        flink: [1.17-SNAPSHOT, 1.18-SNAPSHOT, 1.19-SNAPSHOT]
+    uses: 
apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
+    with:
+      flink_version: ${{ matrix.flink }}
diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index 82da4df..cfbe8c0 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -32,3 +32,11 @@ jobs:
       flink_url: https://archive.apache.org/dist/flink/flink-${{ matrix.flink 
}}/flink-${{ matrix.flink }}-bin-scala_2.12.tgz
       cache_flink_binary: true
     secrets: inherit
+
+  python_test:
+    strategy:
+      matrix:
+        flink: [ 1.17.1, 1.18.0 ]
+    uses: 
apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
+    with:
+      flink_version: ${{ matrix.flink }}
diff --git a/.gitignore b/.gitignore
index 973e8d5..2deddc5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -36,4 +36,19 @@ tools/flink
 tools/flink-*
 tools/releasing/release
 tools/japicmp-output
-*/.idea/
\ No newline at end of file
+*/.idea/
+
+flink-python/pyflink/datastream/connectors/aws_connector_version.py
+flink-python/apache_flink_connector_aws.egg-info/
+flink-python/.tox/
+flink-python/build
+flink-python/dist
+flink-python/dev/download
+flink-python/dev/.conda/
+flink-python/dev/log/
+flink-python/dev/.stage.txt
+flink-python/dev/install_command.sh
+flink-python/dev/lint-python.sh
+flink-python/dev/build-wheels.sh
+flink-python/dev/glibc_version_fix.h
+flink-python/dev/dev-requirements.txt
\ No newline at end of file
diff --git a/.github/workflows/nightly.yml b/flink-python/MANIFEST.in
similarity index 66%
copy from .github/workflows/nightly.yml
copy to flink-python/MANIFEST.in
index d50e7a9..720decf 100644
--- a/.github/workflows/nightly.yml
+++ b/flink-python/MANIFEST.in
@@ -16,19 +16,5 @@
 # limitations under the License.
 
################################################################################
 
-name: "flink-connector-aws: nightly build"
-on:
-  schedule:
-    - cron: "0 0 * * *"
-jobs:
-  compile_and_test:
-    if: github.repository_owner == 'apache'
-    strategy:
-      matrix:
-        flink: [1.17-SNAPSHOT, 1.18-SNAPSHOT, 1.19-SNAPSHOT]
-    uses: ./.github/workflows/common.yml
-    with:
-      flink_version: ${{ matrix.flink }}
-      flink_url: https://s3.amazonaws.com/flink-nightly/flink-${{ matrix.flink 
}}-bin-scala_2.12.tgz
-      cache_flink_binary: false
-    secrets: inherit
\ No newline at end of file
+graft pyflink
+global-exclude *.py[cod] __pycache__ .DS_Store
diff --git a/flink-python/README.txt b/flink-python/README.txt
new file mode 100644
index 0000000..e792225
--- /dev/null
+++ b/flink-python/README.txt
@@ -0,0 +1,14 @@
+These are the official AWS Python connectors for Apache Flink 
+
+For the latest information about Flink connector, please visit our website at:
+
+https://flink.apache.org
+
+and our GitHub Account for the AWS connectors
+
+https://github.com/apache/flink-connector-aws
+
+If you have any questions, ask on our Mailing lists:
+
[email protected]
[email protected]
\ No newline at end of file
diff --git a/flink-python/dev/integration_test.sh 
b/flink-python/dev/integration_test.sh
new file mode 100755
index 0000000..751cf6c
--- /dev/null
+++ b/flink-python/dev/integration_test.sh
@@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+function test_module() {
+    module="$FLINK_PYTHON_DIR/pyflink/$1"
+    echo "test module $module"
+    pytest --durations=20 ${module} $2
+    if [[ $? -ne 0 ]]; then
+        echo "test module $module failed"
+        exit 1
+    fi
+}
+
+function test_all_modules() {
+    # test datastream module
+    test_module "datastream"
+}
+
+# CURRENT_DIR is "<root-project-folder>/flink-python/dev/"
+CURRENT_DIR="$(cd "$( dirname "$0" )" && pwd)"
+
+# FLINK_PYTHON_DIR is "<root-project-folder>/flink-python/"
+FLINK_PYTHON_DIR=$(dirname "$CURRENT_DIR")
+
+# set the FLINK_TEST_LIB_DIR to 
"<root-project-folder>/flink-connector-aws/flink-python-connector-aws/target/dep..."
+export FLINK_TEST_LIBS="${FLINK_PYTHON_DIR}/target/test-dependencies/*"
+
+# Temporarily update the installed 'pyflink_gateway_server.py' files with the 
new one
+# Needed only until Flink 1.19 release
+echo "Checking ${FLINK_PYTHON_DIR} for 'pyflink_gateway_server.py'"
+find "${FLINK_PYTHON_DIR}/.tox" -name pyflink_gateway_server.py -exec cp 
"${FLINK_PYTHON_DIR}/pyflink/pyflink_gateway_server.py" {} \;
+
+# python test
+test_all_modules
diff --git a/flink-python/pom.xml b/flink-python/pom.xml
new file mode 100644
index 0000000..7af0852
--- /dev/null
+++ b/flink-python/pom.xml
@@ -0,0 +1,173 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-aws</artifactId>
+        <version>4.3-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-python-connector-aws</artifactId>
+    <name>Flink : Connectors : AWS : Python</name>
+
+    <packaging>pom</packaging>
+
+    <properties>
+        <python.infra.download.skip>false</python.infra.download.skip>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-sql-connector-kinesis</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-sql-connector-aws-kinesis-firehose</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>clean</id>
+                        <phase>clean</phase>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                        <configuration>
+                            <target>
+                                <delete dir="${project.basedir}/.tox"/>
+                                <delete 
dir="${project.basedir}/apache_flink_connector_aws.egg-info"/>
+                                <delete dir="${project.basedir}/dev/.conda"/>
+                                <delete dir="${project.basedir}/dev/download"/>
+                                <delete dir="${project.basedir}/dev/log"/>
+                                <delete dir="${project.basedir}/build"/>
+                                <delete dir="${project.basedir}/dist"/>
+                                <delete dir="${project.basedir}/pyflink/lib"/>
+                                <delete dir="${project.basedir}/target"/>
+                                <delete 
file="${project.basedir}/dev/.stage.txt"/>
+                                <delete 
file="${project.basedir}/dev/install_command.sh"/>
+                                <delete 
file="${project.basedir}/dev/lint-python.sh"/>
+                                <delete 
file="${project.basedir}/dev/build-wheels.sh"/>
+                                <delete 
file="${project.basedir}/dev/glibc_version_fix.h"/>
+                                <delete 
file="${project.basedir}/dev/dev-requirements.txt"/>
+                                <delete 
file="${project.basedir}/pyflink/datastream/connectors/aws_connector_version.py"/>
+                            </target>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy</goal>
+                        </goals>
+                        <configuration>
+                            <artifactItems>
+                                <artifactItem>
+                                    <groupId>org.apache.flink</groupId>
+                                    
<artifactId>flink-sql-connector-kinesis</artifactId>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.apache.flink</groupId>
+                                    
<artifactId>flink-sql-connector-aws-kinesis-firehose</artifactId>
+                                </artifactItem>
+                            </artifactItems>
+                            
<outputDirectory>${project.build.directory}/test-dependencies</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>wagon-maven-plugin</artifactId>
+                <version>2.0.2</version>
+                <executions>
+                    <execution>
+                        <id>download-install</id>
+                        <phase>validate</phase>
+                        <goals>
+                            <goal>download-single</goal>
+                        </goals>
+                        <configuration>
+                            
<url>https://raw.githubusercontent.com/apache/flink-connector-shared-utils/ci_utils/python/install_command.sh</url>
+                            <toDir>${project.basedir}/dev</toDir>
+                            <skip>${python.infra.download.skip}</skip>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>download-lint</id>
+                        <phase>validate</phase>
+                        <goals>
+                            <goal>download-single</goal>
+                        </goals>
+                        <configuration>
+                            
<url>https://raw.githubusercontent.com/apache/flink-connector-shared-utils/ci_utils/python/lint-python.sh</url>
+                            <toDir>${project.basedir}/dev</toDir>
+                            <skip>${python.infra.download.skip}</skip>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>download-build-wheels</id>
+                        <phase>validate</phase>
+                        <goals>
+                            <goal>download-single</goal>
+                        </goals>
+                        <configuration>
+                            
<url>https://raw.githubusercontent.com/apache/flink-connector-shared-utils/ci_utils/python/build-wheels.sh</url>
+                            <toDir>${project.basedir}/dev</toDir>
+                            <skip>${python.infra.download.skip}</skip>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>download-build-version-header</id>
+                        <phase>validate</phase>
+                        <goals>
+                            <goal>download-single</goal>
+                        </goals>
+                        <configuration>
+                            
<url>https://raw.githubusercontent.com/apache/flink-connector-shared-utils/ci_utils/python/glibc_version_fix.h</url>
+                            <toDir>${project.basedir}/dev</toDir>
+                            <skip>${python.infra.download.skip}</skip>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/flink-python/pyflink/datastream/connectors/kinesis.py 
b/flink-python/pyflink/datastream/connectors/kinesis.py
new file mode 100644
index 0000000..355c954
--- /dev/null
+++ b/flink-python/pyflink/datastream/connectors/kinesis.py
@@ -0,0 +1,548 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Dict, Union, List
+
+from pyflink.common import SerializationSchema, DeserializationSchema, \
+    AssignerWithPeriodicWatermarksWrapper
+from pyflink.datastream.functions import SourceFunction
+from pyflink.datastream.connectors import Sink
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+    'KinesisShardAssigner',
+    'KinesisDeserializationSchema',
+    'WatermarkTracker',
+    'PartitionKeyGenerator',
+    'FlinkKinesisConsumer',
+    'KinesisStreamsSink',
+    'KinesisStreamsSinkBuilder',
+    'KinesisFirehoseSink',
+    'KinesisFirehoseSinkBuilder'
+]
+
+
+# ---- KinesisSource ----
+
+
+class KinesisShardAssigner(object):
+    """
+    Utility to map Kinesis shards to Flink subtask indices. Users can provide 
a Java
+    KinesisShardAssigner in Python if they want to provide custom shared 
assigner.
+    """
+    def __init__(self, j_kinesis_shard_assigner):
+        self._j_kinesis_shard_assigner = j_kinesis_shard_assigner
+
+    @staticmethod
+    def default_shard_assigner() -> 'KinesisShardAssigner':
+        """
+        A Default KinesisShardAssigner that maps Kinesis shard hash-key ranges 
to Flink subtasks.
+        """
+        return 
KinesisShardAssigner(get_gateway().jvm.org.apache.flink.streaming.connectors.
+                                    
kinesis.internals.KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER)
+
+    @staticmethod
+    def uniform_shard_assigner() -> 'KinesisShardAssigner':
+        """
+        A KinesisShardAssigner that maps Kinesis shard hash-key ranges to 
Flink subtasks.
+        It creates a more uniform distribution of shards across subtasks than 
org.apache.flink. \
+        
streaming.connectors.kinesis.internals.KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER
 when the
+        Kinesis records in the stream have hash keys that are uniformly 
distributed over all
+        possible hash keys, which is the case if records have 
randomly-generated partition keys.
+        (This is the same assumption made if you use the Kinesis 
UpdateShardCount operation with
+        UNIFORM_SCALING.)
+        """
+        return 
KinesisShardAssigner(get_gateway().jvm.org.apache.flink.streaming.connectors.
+                                    kinesis.util.UniformShardAssigner())
+
+
+class KinesisDeserializationSchema(object):
+    """
+    This is a deserialization schema specific for the Flink Kinesis Consumer. 
Different from the
+    basic DeserializationSchema, this schema offers additional 
Kinesis-specific information about
+    the record that may be useful to the user application.
+    """
+
+    def __init__(self, j_kinesis_deserialization_schema):
+        self._j_kinesis_deserialization_schema = 
j_kinesis_deserialization_schema
+
+
+class WatermarkTracker(object):
+    """
+    The watermark tracker is responsible for aggregating watermarks across 
distributed operators.
+    It can be used for sub tasks of a single Flink source as well as multiple 
heterogeneous sources
+    or other operators.The class essentially functions like a distributed hash 
table that enclosing
+    operators can use to adopt their processing / IO rates
+    """
+
+    def __init__(self, j_watermark_tracker):
+        self._j_watermark_tracker = j_watermark_tracker
+
+    @staticmethod
+    def job_manager_watermark_tracker(
+            aggregate_name: str, log_accumulator_interval_millis: int = -1) -> 
'WatermarkTracker':
+        j_watermark_tracker = 
get_gateway().jvm.org.apache.flink.streaming.connectors.kinesis.util \
+            .JobManagerWatermarkTracker(aggregate_name, 
log_accumulator_interval_millis)
+        return WatermarkTracker(j_watermark_tracker)
+
+
+class FlinkKinesisConsumer(SourceFunction):
+    """
+    The Flink Kinesis Consumer is an exactly-once parallel streaming data 
source that subscribes to
+    multiple AWS Kinesis streams within the same AWS service region, and can 
handle resharding of
+    streams. Each subtask of the consumer is responsible for fetching data 
records from multiple
+    Kinesis shards. The number of shards fetched by each subtask will change 
as shards are closed
+    and created by Kinesis.
+
+    To leverage Flink's checkpointing mechanics for exactly-once streaming 
processing guarantees,
+    the Flink Kinesis consumer is implemented with the AWS Java SDK, instead 
of the officially
+    recommended AWS Kinesis Client Library, for low-level control on the 
management of stream state.
+    The Flink Kinesis Connector also supports setting the initial starting 
points of Kinesis
+    streams, namely TRIM_HORIZON and LATEST.
+
+    Kinesis and the Flink consumer support dynamic re-sharding and shard IDs, 
while sequential,
+    cannot be assumed to be consecutive. There is no perfect generic default 
assignment function.
+    Default shard to subtask assignment, which is based on hash code, may 
result in skew, with some
+    subtasks having many shards assigned and others none.
+
+    It is recommended to monitor the shard distribution and adjust assignment 
appropriately.
+    A custom assigner implementation can be set via 
setShardAssigner(KinesisShardAssigner) to
+    optimize the hash function or use static overrides to limit skew.
+
+    In order for the consumer to emit watermarks, a timestamp assigner needs 
to be set via
+    setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks) and the auto 
watermark emit
+    interval configured via ExecutionConfig.setAutoWatermarkInterval(long).
+
+    Watermarks can only advance when all shards of a subtask continuously 
deliver records.
+    To avoid an inactive or closed shard to block the watermark progress, the 
idle timeout should
+    be configured via configuration property 
ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS.
+    By default, shards won't be considered idle and watermark calculation will 
wait for newer
+    records to arrive from all shards.
+
+    Note that re-sharding of the Kinesis stream while an application (that 
relies on the Kinesis
+    records for watermarking) is running can lead to incorrect late events. 
This depends on how
+    shards are assigned to subtasks and applies regardless of whether 
watermarks are generated in
+    the source or a downstream operator.
+    """
+
+    def __init__(self,
+                 streams: Union[str, List[str]],
+                 deserializer: Union[DeserializationSchema, 
KinesisDeserializationSchema],
+                 config_props: Dict
+                 ):
+        gateway = get_gateway()
+        j_properties = gateway.jvm.java.util.Properties()
+        for key, value in config_props.items():
+            j_properties.setProperty(key, value)
+
+        JFlinkKinesisConsumer = 
gateway.jvm.org.apache.flink.streaming.connectors.kinesis. \
+            FlinkKinesisConsumer
+        JKinesisDeserializationSchemaWrapper = 
get_gateway().jvm.org.apache.flink.streaming. \
+            
connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper
+
+        if isinstance(streams, str):
+            streams = [streams]
+
+        if isinstance(deserializer, DeserializationSchema):
+            deserializer = JKinesisDeserializationSchemaWrapper(
+                deserializer._j_deserialization_schema)
+
+        self._j_kinesis_consumer = JFlinkKinesisConsumer(streams, 
deserializer, j_properties)
+        super(FlinkKinesisConsumer, self).__init__(self._j_kinesis_consumer)
+
+    def set_shard_assigner(self, shard_assigner: KinesisShardAssigner) -> 
'FlinkKinesisConsumer':
+        """
+        Provide a custom assigner to influence how shards are distributed over 
subtasks.
+        """
+        
self._j_kinesis_consumer.setShardAssigner(shard_assigner._j_kinesis_shard_assigner)
+        return self
+
+    def set_periodic_watermark_assigner(
+        self,
+        periodic_watermark_assigner: AssignerWithPeriodicWatermarksWrapper) \
+            -> 'FlinkKinesisConsumer':
+        """
+        Set the assigner that will extract the timestamp from T and calculate 
the watermark.
+        """
+        self._j_kinesis_consumer.setPeriodicWatermarkAssigner(
+            periodic_watermark_assigner._j_assigner_with_periodic_watermarks)
+        return self
+
+    def set_watermark_tracker(self, watermark_tracker: WatermarkTracker) -> 
'FlinkKinesisConsumer':
+        """
+        Set the global watermark tracker. When set, it will be used by the 
fetcher to align the
+        shard consumers by event time.
+        """
+        
self._j_kinesis_consumer.setWatermarkTracker(watermark_tracker._j_watermark_tracker)
+        return self
+
+
+# ---- KinesisSink ----
+
+class PartitionKeyGenerator(object):
+    """
+    This is a generator convert from an input element to the partition key, a 
string.
+    """
+    def __init__(self, j_partition_key_generator):
+        self._j_partition_key_generator = j_partition_key_generator
+
+    @staticmethod
+    def fixed() -> 'PartitionKeyGenerator':
+        """
+        A partitioner ensuring that each internal Flink partition ends up in 
the same Kinesis
+        partition. This is achieved by using the index of the producer task as 
a PartitionKey.
+        """
+        return 
PartitionKeyGenerator(get_gateway().jvm.org.apache.flink.connector.kinesis.table.
+                                     FixedKinesisPartitionKeyGenerator())
+
+    @staticmethod
+    def random() -> 'PartitionKeyGenerator':
+        """
+        A PartitionKeyGenerator that maps an arbitrary input element to a 
random partition ID.
+        """
+        return 
PartitionKeyGenerator(get_gateway().jvm.org.apache.flink.connector.kinesis.table.
+                                     RandomKinesisPartitionKeyGenerator())
+
+
+class KinesisStreamsSink(Sink):
+    """
+    A Kinesis Data Streams (KDS) Sink that performs async requests against a 
destination stream
+    using the buffering protocol.
+
+    The sink internally uses a 
software.amazon.awssdk.services.kinesis.KinesisAsyncClient to
+    communicate with the AWS endpoint.
+
+    The behaviour of the buffering may be specified by providing configuration 
during the sink
+    build time.
+
+    - maxBatchSize: the maximum size of a batch of entries that may be sent to 
KDS
+    - maxInFlightRequests: the maximum number of in flight requests that may 
exist, if any more in
+        flight requests need to be initiated once the maximum has been 
reached, then it will be
+        blocked until some have completed
+    - maxBufferedRequests: the maximum number of elements held in the buffer, 
requests to add
+        elements will be blocked while the number of elements in the buffer is 
at the maximum
+    - maxBatchSizeInBytes: the maximum size of a batch of entries that may be 
sent to KDS
+        measured in bytes
+    - maxTimeInBufferMS: the maximum amount of time an entry is allowed to 
live in the buffer,
+        if any element reaches this age, the entire buffer will be flushed 
immediately
+    - maxRecordSizeInBytes: the maximum size of a record the sink will accept 
into the buffer,
+        a record of size larger than this will be rejected when passed to the 
sink
+    - failOnError: when an exception is encountered while persisting to 
Kinesis Data Streams,
+        the job will fail immediately if failOnError is set
+    """
+
+    def __init__(self, j_kinesis_streams_sink):
+        super(KinesisStreamsSink, self).__init__(sink=j_kinesis_streams_sink)
+
+    @staticmethod
+    def builder() -> 'KinesisStreamsSinkBuilder':
+        return KinesisStreamsSinkBuilder()
+
+
+class KinesisStreamsSinkBuilder(object):
+    """
+    Builder to construct KinesisStreamsSink.
+
+    The following example shows the minimum setup to create a 
KinesisStreamsSink that writes String
+    values to a Kinesis Data Streams stream named your_stream_here.
+
+    Example:
+    ::
+
+        >>> from pyflink.common.serialization import SimpleStringSchema
+        >>> sink_properties = {"aws.region": "eu-west-1"}
+        >>> sink = KinesisStreamsSink.builder() \\
+        ...     .set_kinesis_client_properties(sink_properties) \\
+        ...     .set_stream_name("your_stream_name") \\
+        ...     .set_serialization_schema(SimpleStringSchema()) \\
+        ...     .set_partition_key_generator(PartitionKeyGenerator.random()) \\
+        ...     .build()
+
+    If the following parameters are not set in this builder, the following 
defaults will be used:
+
+        - maxBatchSize will be 500
+        - maxInFlightRequests will be 50
+        - maxBufferedRequests will be 10000
+        - maxBatchSizeInBytes will be 5 MB i.e. 5 * 1024 * 1024
+        - maxTimeInBufferMS will be 5000ms
+        - maxRecordSizeInBytes will be 1 MB i.e. 1 * 1024 * 1024
+        - failOnError will be false
+    """
+
+    def __init__(self):
+        JKinesisStreamsSink = 
get_gateway().jvm.org.apache.flink.connector.kinesis.sink.\
+            KinesisStreamsSink
+        self._j_kinesis_sink_builder = JKinesisStreamsSink.builder()
+
+    def set_stream_name(self, stream_name: Union[str, List[str]]) -> 
'KinesisStreamsSinkBuilder':
+        """
+        Sets the name of the KDS stream that the sink will connect to. There 
is no default for this
+        parameter, therefore, this must be provided at sink creation time 
otherwise the build will
+        fail.
+        """
+        self._j_kinesis_sink_builder.setStreamName(stream_name)
+        return self
+
+    def set_serialization_schema(self, serialization_schema: 
SerializationSchema) \
+            -> 'KinesisStreamsSinkBuilder':
+        """
+        Sets the SerializationSchema of the KinesisSinkBuilder.
+        """
+        self._j_kinesis_sink_builder.setSerializationSchema(
+            serialization_schema._j_serialization_schema)
+        return self
+
+    def set_partition_key_generator(self, partition_key_generator: 
PartitionKeyGenerator) \
+            -> 'KinesisStreamsSinkBuilder':
+        """
+        Sets the PartitionKeyGenerator of the KinesisSinkBuilder.
+        """
+        self._j_kinesis_sink_builder.setPartitionKeyGenerator(
+            partition_key_generator._j_partition_key_generator)
+        return self
+
+    def set_fail_on_error(self, fail_on_error: bool) -> 
'KinesisStreamsSinkBuilder':
+        """
+        Sets the failOnError of the KinesisSinkBuilder. If failOnError is on, 
then a runtime
+        exception will be raised. Otherwise, those records will be requested 
in the buffer for
+        retry.
+        """
+        self._j_kinesis_sink_builder.setFailOnError(fail_on_error)
+        return self
+
+    def set_kinesis_client_properties(self, kinesis_client_properties: Dict) \
+            -> 'KinesisStreamsSinkBuilder':
+        """
+        Sets the kinesisClientProperties of the KinesisSinkBuilder.
+        """
+        j_properties = get_gateway().jvm.java.util.Properties()
+        for key, value in kinesis_client_properties.items():
+            j_properties.setProperty(key, value)
+        self._j_kinesis_sink_builder.setKinesisClientProperties(j_properties)
+        return self
+
+    def set_max_batch_size(self, max_batch_size: int) -> 
'KinesisStreamsSinkBuilder':
+        """
+        Maximum number of elements that may be passed in a list to be written 
downstream.
+        """
+        self._j_kinesis_sink_builder.setMaxBatchSize(max_batch_size)
+        return self
+
+    def set_max_in_flight_requests(self, max_in_flight_requests: int) \
+            -> 'KinesisStreamsSinkBuilder':
+        """
+        Maximum number of uncompleted calls to submitRequestEntries that the 
SinkWriter will allow
+        at any given point. Once this point has reached, writes and callbacks 
to add elements to
+        the buffer may block until one or more requests to 
submitRequestEntries completes.
+        """
+        
self._j_kinesis_sink_builder.setMaxInFlightRequests(max_in_flight_requests)
+        return self
+
+    def set_max_buffered_requests(self, max_buffered_requests: int) -> 
'KinesisStreamsSinkBuilder':
+        """
+        The maximum buffer length. Callbacks to add elements to the buffer and 
calls to write will
+        block if this length has been reached and will only unblock if 
elements from the buffer have
+        been removed for flushing.
+        """
+        
self._j_kinesis_sink_builder.setMaxBufferedRequests(max_buffered_requests)
+        return self
+
+    def set_max_batch_size_in_bytes(self, max_batch_size_in_bytes: int) \
+            -> 'KinesisStreamsSinkBuilder':
+        """
+        The flush will be attempted if the most recent call to write 
introduces an element to the
+        buffer such that the total size of the buffer is greater than or equal 
to this threshold
+        value. If this happens, the maximum number of elements from the head 
of the buffer will be
+        selected, that is smaller than maxBatchSizeInBytes in size will be 
flushed.
+        """
+        
self._j_kinesis_sink_builder.setMaxBatchSizeInBytes(max_batch_size_in_bytes)
+        return self
+
+    def set_max_time_in_buffer_ms(self, max_time_in_buffer_ms: int) -> 
'KinesisStreamsSinkBuilder':
+        """
+        The maximum amount of time an element may remain in the buffer. In 
most cases elements are
+        flushed as a result of the batch size (in bytes or number) being 
reached or during a
+        snapshot. However, there are scenarios where an element may remain in 
the buffer forever or
+        a long period of time. To mitigate this, a timer is constantly active 
in the buffer such
+        that: while the buffer is not empty, it will flush every 
maxTimeInBufferMS milliseconds.
+        """
+        
self._j_kinesis_sink_builder.setMaxTimeInBufferMS(max_time_in_buffer_ms)
+        return self
+
+    def set_max_record_size_in_bytes(self, max_record_size_in_bytes: int) \
+            -> 'KinesisStreamsSinkBuilder':
+        """
+        The maximum size of each records in bytes. If a record larger than 
this is passed to the
+        sink, it will throw an IllegalArgumentException.
+        """
+        
self._j_kinesis_sink_builder.setMaxRecordSizeInBytes(max_record_size_in_bytes)
+        return self
+
+    def build(self) -> 'KinesisStreamsSink':
+        """
+        Build thd KinesisStreamsSink.
+        """
+        return KinesisStreamsSink(self._j_kinesis_sink_builder.build())
+
+
+class KinesisFirehoseSink(Sink):
+    """
+    A Kinesis Data Firehose (KDF) Sink that performs async requests against a 
destination delivery
+    stream using the buffering protocol.
+    """
+    def __init__(self, j_kinesis_firehose_sink):
+        super(KinesisFirehoseSink, self).__init__(sink=j_kinesis_firehose_sink)
+
+    @staticmethod
+    def builder() -> 'KinesisFirehoseSinkBuilder':
+        return KinesisFirehoseSinkBuilder()
+
+
+class KinesisFirehoseSinkBuilder(object):
+    """
+    Builder to construct KinesisFirehoseSink.
+
+    The following example shows the minimum setup to create a 
KinesisFirehoseSink that writes
+    String values to a Kinesis Data Firehose delivery stream named 
delivery-stream-name.
+
+    Example:
+    ::
+
+        >>> from pyflink.common.serialization import SimpleStringSchema
+        >>> sink_properties = {"aws.region": "eu-west-1"}
+        >>> sink = KinesisFirehoseSink.builder() \\
+        ...     .set_firehose_client_properties(sink_properties) \\
+        ...     .set_delivery_stream_name("delivery-stream-name") \\
+        ...     .set_serialization_schema(SimpleStringSchema()) \\
+        ...     .set_max_batch_size(20) \\
+        ...     .build()
+
+    If the following parameters are not set in this builder, the following 
defaults will be used:
+    - maxBatchSize will be 500
+    - maxInFlightRequests will be 50
+    - maxBufferedRequests will be 10000
+    - maxBatchSizeInBytes will be 4 MB i.e. 4 * 1024 * 1024
+    - maxTimeInBufferMS will be 5000ms
+    - maxRecordSizeInBytes will be 1000 KB i.e. 1000 * 1024
+    - failOnError will be false
+    """
+    def __init__(self):
+        JKinesisFirehoseSink = 
get_gateway().jvm.org.apache.flink.connector.firehose.sink. \
+            KinesisFirehoseSink
+        self._j_kinesis_sink_builder = JKinesisFirehoseSink.builder()
+
+    def set_delivery_stream_name(self, delivery_stream_name: str) -> 
'KinesisFirehoseSinkBuilder':
+        """
+        Sets the name of the KDF delivery stream that the sink will connect 
to. There is no default
+        for this parameter, therefore, this must be provided at sink creation 
time otherwise the
+        build will fail.
+        """
+        
self._j_kinesis_sink_builder.setDeliveryStreamName(delivery_stream_name)
+        return self
+
+    def set_serialization_schema(self, serialization_schema: 
SerializationSchema) \
+            -> 'KinesisFirehoseSinkBuilder':
+        """
+        Allows the user to specify a serialization schema to serialize each 
record to persist to
+        Firehose.
+        """
+        self._j_kinesis_sink_builder.setSerializationSchema(
+            serialization_schema._j_serialization_schema)
+        return self
+
+    def set_fail_on_error(self, fail_on_error: bool) -> 
'KinesisFirehoseSinkBuilder':
+        """
+        If writing to Kinesis Data Firehose results in a partial or full 
failure being returned,
+        the job will fail
+        """
+        self._j_kinesis_sink_builder.setFailOnError(fail_on_error)
+        return self
+
+    def set_firehose_client_properties(self, firehose_client_properties: Dict) 
\
+            -> 'KinesisFirehoseSinkBuilder':
+        """
+        A set of properties used by the sink to create the firehose client. 
This may be used to set
+        the aws region, credentials etc. See the docs for usage and syntax.
+        """
+        j_properties = get_gateway().jvm.java.util.Properties()
+        for key, value in firehose_client_properties.items():
+            j_properties.setProperty(key, value)
+        self._j_kinesis_sink_builder.setFirehoseClientProperties(j_properties)
+        return self
+
+    def set_max_batch_size(self, max_batch_size: int) -> 
'KinesisFirehoseSinkBuilder':
+        """
+        Maximum number of elements that may be passed in a list to be written 
downstream.
+        """
+        self._j_kinesis_sink_builder.setMaxBatchSize(max_batch_size)
+        return self
+
+    def set_max_in_flight_requests(self, max_in_flight_requests: int) \
+            -> 'KinesisFirehoseSinkBuilder':
+        """
+        Maximum number of uncompleted calls to submitRequestEntries that the 
SinkWriter will allow
+        at any given point. Once this point has reached, writes and callbacks 
to add elements to
+        the buffer may block until one or more requests to 
submitRequestEntries completes.
+        """
+        
self._j_kinesis_sink_builder.setMaxInFlightRequests(max_in_flight_requests)
+        return self
+
+    def set_max_buffered_requests(self, max_buffered_requests: int) -> 
'KinesisFirehoseSinkBuilder':
+        """
+        The maximum buffer length. Callbacks to add elements to the buffer and 
calls to write will
+        block if this length has been reached and will only unblock if 
elements from the buffer have
+        been removed for flushing.
+        """
+        
self._j_kinesis_sink_builder.setMaxBufferedRequests(max_buffered_requests)
+        return self
+
+    def set_max_batch_size_in_bytes(self, max_batch_size_in_bytes: int) \
+            -> 'KinesisFirehoseSinkBuilder':
+        """
+        The flush will be attempted if the most recent call to write 
introduces an element to the
+        buffer such that the total size of the buffer is greater than or equal 
to this threshold
+        value. If this happens, the maximum number of elements from the head 
of the buffer will be
+        selected, that is smaller than maxBatchSizeInBytes in size will be 
flushed.
+        """
+        
self._j_kinesis_sink_builder.setMaxBatchSizeInBytes(max_batch_size_in_bytes)
+        return self
+
+    def set_max_time_in_buffer_ms(self, max_time_in_buffer_ms: int) -> 
'KinesisFirehoseSinkBuilder':
+        """
+        The maximum amount of time an element may remain in the buffer. In 
most cases elements are
+        flushed as a result of the batch size (in bytes or number) being 
reached or during a
+        snapshot. However, there are scenarios where an element may remain in 
the buffer forever or
+        a long period of time. To mitigate this, a timer is constantly active 
in the buffer such
+        that: while the buffer is not empty, it will flush every 
maxTimeInBufferMS milliseconds.
+        """
+        
self._j_kinesis_sink_builder.setMaxTimeInBufferMS(max_time_in_buffer_ms)
+        return self
+
+    def set_max_record_size_in_bytes(self, max_record_size_in_bytes: int) \
+            -> 'KinesisFirehoseSinkBuilder':
+        """
+        The maximum size of each records in bytes. If a record larger than 
this is passed to the
+        sink, it will throw an IllegalArgumentException.
+        """
+        
self._j_kinesis_sink_builder.setMaxRecordSizeInBytes(max_record_size_in_bytes)
+        return self
+
+    def build(self) -> 'KinesisFirehoseSink':
+        """
+        Build thd KinesisFirehoseSink.
+        """
+        return KinesisFirehoseSink(self._j_kinesis_sink_builder.build())
diff --git a/flink-python/pyflink/datastream/connectors/tests/test_kinesis.py 
b/flink-python/pyflink/datastream/connectors/tests/test_kinesis.py
new file mode 100644
index 0000000..3007711
--- /dev/null
+++ b/flink-python/pyflink/datastream/connectors/tests/test_kinesis.py
@@ -0,0 +1,108 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.common import SimpleStringSchema, Types
+from pyflink.datastream.connectors.kinesis import PartitionKeyGenerator, 
FlinkKinesisConsumer, \
+    KinesisStreamsSink, KinesisFirehoseSink
+from pyflink.testing.test_case_utils import PyFlinkUTTestCase
+from pyflink.util.java_utils import get_field_value
+
+
+class FlinkKinesisTest(PyFlinkUTTestCase):
+
+    def test_kinesis_source(self):
+        consumer_config = {
+            'aws.region': 'us-east-1',
+            'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
+            'aws.credentials.provider.basic.secretkey': 
'aws_secret_access_key',
+            'flink.stream.initpos': 'LATEST'
+        }
+
+        kinesis_source = FlinkKinesisConsumer("stream-1", 
SimpleStringSchema(), consumer_config)
+
+        ds = self.env.add_source(source_func=kinesis_source, 
source_name="kinesis source")
+        ds.print()
+        plan = eval(self.env.get_execution_plan())
+        self.assertEqual('Source: kinesis source', plan['nodes'][0]['type'])
+        self.assertEqual(
+            get_field_value(kinesis_source.get_java_function(), 'streams')[0], 
'stream-1')
+
+    def test_kinesis_streams_sink(self):
+        sink_properties = {
+            'aws.region': 'us-east-1',
+            'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key'
+        }
+
+        ds = self.env.from_collection([('ab', 1), ('bdc', 2), ('cfgs', 3), 
('deeefg', 4)],
+                                      type_info=Types.ROW([Types.STRING(), 
Types.INT()]))
+
+        kinesis_streams_sink = KinesisStreamsSink.builder() \
+            .set_kinesis_client_properties(sink_properties) \
+            .set_serialization_schema(SimpleStringSchema()) \
+            .set_partition_key_generator(PartitionKeyGenerator.fixed()) \
+            .set_stream_name("stream-1") \
+            .set_fail_on_error(False) \
+            .set_max_batch_size(500) \
+            .set_max_in_flight_requests(50) \
+            .set_max_buffered_requests(10000) \
+            .set_max_batch_size_in_bytes(5 * 1024 * 1024) \
+            .set_max_time_in_buffer_ms(5000) \
+            .set_max_record_size_in_bytes(1 * 1024 * 1024) \
+            .build()
+
+        ds.sink_to(kinesis_streams_sink).name('kinesis streams sink')
+        plan = eval(self.env.get_execution_plan())
+
+        self.assertEqual('kinesis streams sink: Writer', 
plan['nodes'][1]['type'])
+        
self.assertEqual(get_field_value(kinesis_streams_sink.get_java_function(), 
'failOnError'),
+                         False)
+        self.assertEqual(
+            get_field_value(kinesis_streams_sink.get_java_function(), 
'streamName'), 'stream-1')
+
+    def test_kinesis_firehose_sink(self):
+
+        sink_properties = {
+            'aws.region': 'eu-west-1',
+            'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
+            'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key'
+        }
+
+        ds = self.env.from_collection([('ab', 1), ('bdc', 2), ('cfgs', 3), 
('deeefg', 4)],
+                                      type_info=Types.ROW([Types.STRING(), 
Types.INT()]))
+
+        kinesis_firehose_sink = KinesisFirehoseSink.builder() \
+            .set_firehose_client_properties(sink_properties) \
+            .set_serialization_schema(SimpleStringSchema()) \
+            .set_delivery_stream_name('stream-1') \
+            .set_fail_on_error(False) \
+            .set_max_batch_size(500) \
+            .set_max_in_flight_requests(50) \
+            .set_max_buffered_requests(10000) \
+            .set_max_batch_size_in_bytes(5 * 1024 * 1024) \
+            .set_max_time_in_buffer_ms(5000) \
+            .set_max_record_size_in_bytes(1 * 1024 * 1024) \
+            .build()
+
+        ds.sink_to(kinesis_firehose_sink).name('kinesis firehose sink')
+        plan = eval(self.env.get_execution_plan())
+
+        self.assertEqual('kinesis firehose sink: Writer', 
plan['nodes'][1]['type'])
+        
self.assertEqual(get_field_value(kinesis_firehose_sink.get_java_function(), 
'failOnError'),
+                         False)
+        self.assertEqual(
+            get_field_value(kinesis_firehose_sink.get_java_function(), 
'deliveryStreamName'),
+            'stream-1')
diff --git a/flink-python/pyflink/pyflink_gateway_server.py 
b/flink-python/pyflink/pyflink_gateway_server.py
new file mode 100644
index 0000000..1cf25a5
--- /dev/null
+++ b/flink-python/pyflink/pyflink_gateway_server.py
@@ -0,0 +1,288 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This is a copy of the pyflink_gateway_server.py file from the Flink.
+# The original file which is accessible here:
+# 
https://github.com/apache/flink/blob/master/flink-python/pyflink/pyflink_gateway_server.py
+# Additional change is the handling of the FLINK_TEST_LIB_DIR environmental 
variable.
+# It could be used to add extra testing jars for the gateway classpath.
+# The plan is to remove this once Pyflink 1.19 is released
+
+import argparse
+import getpass
+import glob
+import os
+import platform
+import signal
+import socket
+import sys
+from collections import namedtuple
+from string import Template
+from subprocess import Popen, PIPE
+
+from pyflink.find_flink_home import _find_flink_home, _find_flink_source_root
+
+KEY_ENV_LOG_DIR = "env.log.dir"
+KEY_ENV_YARN_CONF_DIR = "env.yarn.conf.dir"
+KEY_ENV_HADOOP_CONF_DIR = "env.hadoop.conf.dir"
+KEY_ENV_HBASE_CONF_DIR = "env.hbase.conf.dir"
+KEY_ENV_JAVA_HOME = "env.java.home"
+KEY_ENV_JAVA_OPTS = "env.java.opts.all"
+KEY_ENV_JAVA_OPTS_DEPRECATED = "env.java.opts"
+
+
+def on_windows():
+    return platform.system() == "Windows"
+
+
+def read_from_config(key, default_value, flink_conf_file):
+    value = default_value
+    # get the realpath of tainted path value to avoid CWE22 problem that 
constructs a path or URI
+    # using the tainted value and might allow an attacker to access, modify, 
or test the existence
+    # of critical or sensitive files.
+    with open(os.path.realpath(flink_conf_file), "r") as f:
+        while True:
+            line = f.readline()
+            if not line:
+                break
+            if line.startswith("#") or len(line.strip()) == 0:
+                continue
+            k, v = line.split(":", 1)
+            if k.strip() == key:
+                value = v.strip()
+    return value
+
+
+def find_java_executable():
+    java_executable = "java.exe" if on_windows() else "java"
+    flink_home = _find_flink_home()
+    flink_conf_file = os.path.join(flink_home, "conf", "flink-conf.yaml")
+    java_home = read_from_config(KEY_ENV_JAVA_HOME, None, flink_conf_file)
+
+    if java_home is None and "JAVA_HOME" in os.environ:
+        java_home = os.environ["JAVA_HOME"]
+
+    if java_home is not None:
+        java_executable = os.path.join(java_home, "bin", java_executable)
+
+    return java_executable
+
+
+def prepare_environment_variables(env):
+    flink_home = _find_flink_home()
+    # get the realpath of tainted path value to avoid CWE22 problem that 
constructs a path or URI
+    # using the tainted value and might allow an attacker to access, modify, 
or test the existence
+    # of critical or sensitive files.
+    real_flink_home = os.path.realpath(flink_home)
+
+    if 'FLINK_CONF_DIR' in env:
+        flink_conf_directory = os.path.realpath(env['FLINK_CONF_DIR'])
+    else:
+        flink_conf_directory = os.path.join(real_flink_home, "conf")
+    env['FLINK_CONF_DIR'] = flink_conf_directory
+
+    if 'FLINK_LIB_DIR' in env:
+        flink_lib_directory = os.path.realpath(env['FLINK_LIB_DIR'])
+    else:
+        flink_lib_directory = os.path.join(real_flink_home, "lib")
+    env['FLINK_LIB_DIR'] = flink_lib_directory
+
+    if 'FLINK_OPT_DIR' in env:
+        flink_opt_directory = os.path.realpath(env['FLINK_OPT_DIR'])
+    else:
+        flink_opt_directory = os.path.join(real_flink_home, "opt")
+    env['FLINK_OPT_DIR'] = flink_opt_directory
+
+    if 'FLINK_PLUGINS_DIR' in env:
+        flink_plugins_directory = os.path.realpath(env['FLINK_PLUGINS_DIR'])
+    else:
+        flink_plugins_directory = os.path.join(real_flink_home, "plugins")
+    env['FLINK_PLUGINS_DIR'] = flink_plugins_directory
+
+    env["FLINK_BIN_DIR"] = os.path.join(real_flink_home, "bin")
+
+
+def construct_log_settings(env):
+    templates = [
+        
"-Dlog.file=${flink_log_dir}/flink-${flink_ident_string}-python-${hostname}.log",
+        "-Dlog4j.configuration=${log4j_properties}",
+        "-Dlog4j.configurationFile=${log4j_properties}",
+        "-Dlogback.configurationFile=${logback_xml}"
+    ]
+
+    flink_home = os.path.realpath(_find_flink_home())
+    flink_conf_dir = env['FLINK_CONF_DIR']
+    flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
+
+    if "FLINK_LOG_DIR" in env:
+        flink_log_dir = env["FLINK_LOG_DIR"]
+    else:
+        flink_log_dir = read_from_config(
+            KEY_ENV_LOG_DIR, os.path.join(flink_home, "log"), flink_conf_file)
+
+    if "LOG4J_PROPERTIES" in env:
+        log4j_properties = env["LOG4J_PROPERTIES"]
+    else:
+        log4j_properties = "%s/log4j-cli.properties" % flink_conf_dir
+
+    if "LOGBACK_XML" in env:
+        logback_xml = env["LOGBACK_XML"]
+    else:
+        logback_xml = "%s/logback.xml" % flink_conf_dir
+
+    if "FLINK_IDENT_STRING" in env:
+        flink_ident_string = env["FLINK_IDENT_STRING"]
+    else:
+        flink_ident_string = getpass.getuser()
+
+    hostname = socket.gethostname()
+    log_settings = []
+    for template in templates:
+        log_settings.append(Template(template).substitute(
+            log4j_properties=log4j_properties,
+            logback_xml=logback_xml,
+            flink_log_dir=flink_log_dir,
+            flink_ident_string=flink_ident_string,
+            hostname=hostname))
+    return log_settings
+
+
+def get_jvm_opts(env):
+    flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
+    jvm_opts = env.get(
+        'FLINK_ENV_JAVA_OPTS',
+        read_from_config(
+            KEY_ENV_JAVA_OPTS,
+            read_from_config(KEY_ENV_JAVA_OPTS_DEPRECATED, "", 
flink_conf_file),
+            flink_conf_file))
+
+    # Remove leading and ending double quotes (if present) of value
+    jvm_opts = jvm_opts.strip("\"")
+    return jvm_opts.split(" ")
+
+
+def construct_flink_classpath(env):
+    flink_home = _find_flink_home()
+    flink_lib_directory = env['FLINK_LIB_DIR']
+    flink_opt_directory = env['FLINK_OPT_DIR']
+
+    if on_windows():
+        # The command length is limited on Windows. To avoid the problem we 
should shorten the
+        # command length as much as possible.
+        lib_jars = os.path.join(flink_lib_directory, "*")
+    else:
+        lib_jars = os.pathsep.join(glob.glob(os.path.join(flink_lib_directory, 
"*.jar")))
+
+    flink_python_jars = glob.glob(os.path.join(flink_opt_directory, 
"flink-python*.jar"))
+    if len(flink_python_jars) < 1:
+        print("The flink-python jar is not found in the opt folder of the 
FLINK_HOME: %s" %
+              flink_home)
+        return lib_jars
+    flink_python_jar = flink_python_jars[0]
+
+    return os.pathsep.join([lib_jars, flink_python_jar])
+
+
+def construct_hadoop_classpath(env):
+    flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
+
+    hadoop_conf_dir = ""
+    if 'HADOOP_CONF_DIR' not in env and 'HADOOP_CLASSPATH' not in env:
+        if os.path.isdir("/etc/hadoop/conf"):
+            print("Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no 
HADOOP_CONF_DIR or"
+                  "HADOOP_CLASSPATH was set.")
+            hadoop_conf_dir = "/etc/hadoop/conf"
+
+    hbase_conf_dir = ""
+    if 'HBASE_CONF_DIR' not in env:
+        if os.path.isdir("/etc/hbase/conf"):
+            print("Setting HBASE_CONF_DIR=/etc/hbase/conf because no 
HBASE_CONF_DIR was set.")
+            hbase_conf_dir = "/etc/hbase/conf"
+
+    return os.pathsep.join(
+        [env.get("HADOOP_CLASSPATH", ""),
+         env.get("YARN_CONF_DIR",
+                 read_from_config(KEY_ENV_YARN_CONF_DIR, "", flink_conf_file)),
+         env.get("HADOOP_CONF_DIR",
+                 read_from_config(KEY_ENV_HADOOP_CONF_DIR, hadoop_conf_dir, 
flink_conf_file)),
+         env.get("HBASE_CONF_DIR",
+                 read_from_config(KEY_ENV_HBASE_CONF_DIR, hbase_conf_dir, 
flink_conf_file))])
+
+
+def construct_test_classpath(env):
+    test_jar_patterns = [
+        "flink-python/target/test-dependencies/*",
+        "flink-python/target/artifacts/testDataStream.jar",
+        "flink-python/target/flink-python*-tests.jar",
+    ]
+    test_jars = []
+
+    # Connector tests need to add specific jars to the gateway classpath
+    if 'FLINK_TEST_LIBS' in env:
+        test_jars += glob.glob(env['FLINK_TEST_LIBS'])
+    else:
+        flink_source_root = _find_flink_source_root()
+        for pattern in test_jar_patterns:
+            pattern = pattern.replace("/", os.path.sep)
+            test_jars += glob.glob(os.path.join(flink_source_root, pattern))
+    return os.path.pathsep.join(test_jars)
+
+
+def construct_program_args(args):
+    parser = argparse.ArgumentParser()
+    parser.add_argument("-c", "--class", required=True)
+    parser.add_argument("cluster_type", choices=["local", "remote", "yarn"])
+    parse_result, other_args = parser.parse_known_args(args)
+    main_class = getattr(parse_result, "class")
+    cluster_type = parse_result.cluster_type
+    return namedtuple(
+        "ProgramArgs", ["main_class", "cluster_type", "other_args"])(
+        main_class, cluster_type, other_args)
+
+
+def launch_gateway_server_process(env, args):
+    prepare_environment_variables(env)
+    program_args = construct_program_args(args)
+    if program_args.cluster_type == "local":
+        java_executable = find_java_executable()
+        log_settings = construct_log_settings(env)
+        jvm_args = env.get('JVM_ARGS', '')
+        jvm_opts = get_jvm_opts(env)
+        classpath = os.pathsep.join(
+            [construct_flink_classpath(env), construct_hadoop_classpath(env)])
+        if "FLINK_TESTING" in env:
+            classpath = os.pathsep.join([classpath, 
construct_test_classpath(env)])
+        command = [java_executable, jvm_args, 
"-XX:+IgnoreUnrecognizedVMOptions",
+                   "--add-opens=jdk.proxy2/jdk.proxy2=ALL-UNNAMED"] \
+            + jvm_opts + log_settings \
+            + ["-cp", classpath, program_args.main_class] + 
program_args.other_args
+    else:
+        command = [os.path.join(env["FLINK_BIN_DIR"], "flink"), "run"] + 
program_args.other_args \
+            + ["-c", program_args.main_class]
+    preexec_fn = None
+    if not on_windows():
+        def preexec_func():
+            # ignore ctrl-c / SIGINT
+            signal.signal(signal.SIGINT, signal.SIG_IGN)
+        preexec_fn = preexec_func
+    return Popen(list(filter(lambda c: len(c) != 0, command)),
+                 stdin=PIPE, stderr=PIPE, preexec_fn=preexec_fn, env=env)
+
+
+if __name__ == "__main__":
+    launch_gateway_server_process(os.environ, sys.argv[1:])
diff --git a/flink-python/setup.py b/flink-python/setup.py
new file mode 100644
index 0000000..8c37061
--- /dev/null
+++ b/flink-python/setup.py
@@ -0,0 +1,153 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from __future__ import print_function
+
+import io
+import os
+import sys
+
+from setuptools import setup
+from shutil import rmtree
+from xml.etree import ElementTree as ET
+
+PACKAGE_NAME = 'apache-flink-connector-aws'
+# Source files, directories
+CURRENT_DIR = os.path.abspath(os.path.dirname(__file__))
+POM_FILE = os.path.join(CURRENT_DIR, '../pom.xml')
+README_FILE = os.path.join(CURRENT_DIR, 'README.txt')
+
+# Generated files and directories
+VERSION_FILE = os.path.join(CURRENT_DIR, 
'pyflink/datastream/connectors/aws_connector_version.py')
+DEPENDENCY_FILE = os.path.join(CURRENT_DIR, 'dev/dev-requirements.txt')
+
+
+# Removes a file or directory if exists.
+def remove_if_exists(file_path):
+    if os.path.exists(file_path):
+        if os.path.isfile(file_path):
+            os.remove(file_path)
+        if os.path.isdir(file_path):
+            rmtree(file_path)
+
+
+# Reads the content of the README.txt file.
+def readme_content():
+    with io.open(README_FILE, 'r', encoding='utf-8') as f:
+        return f.read()
+
+
+# Reads the parameters used by the setup command.
+# The source is the aws_connector_version.py and the README.txt.
+def setup_parameters():
+    try:
+        exec(open(VERSION_FILE).read())
+        return locals()['__connector_version__'], 
locals()['__flink_dependency__'], readme_content()
+    except IOError:
+        print("Failed to load PyFlink version file for packaging. " +
+              "'%s' not found!" % VERSION_FILE,
+              file=sys.stderr)
+        sys.exit(-1)
+
+
+# Reads and parses the flink-connector-aws main pom.xml.
+# Based on the version data in the pom.xml prepares the pyflink dir:
+#  - Generates aws_connector_version.py
+#  - Generates dev-requirements.txt
+#  - Copies the flink-sql-connector-kinesis.jar to the pyflink/lib dir
+def prepare_pyflink_dir():
+    # source files
+    pom_root = ET.parse(POM_FILE).getroot()
+    flink_version = pom_root.findall(
+        "./{http://maven.apache.org/POM/4.0.0}properties/"; +
+        "{http://maven.apache.org/POM/4.0.0}flink.version";
+    )[0].text
+    connector_version = pom_root.findall(
+        
"./{http://maven.apache.org/POM/4.0.0}version";)[0].text.replace("-SNAPSHOT", 
".dev0")
+
+    flink_dependency = "apache-flink>=" + flink_version
+
+    with io.open(VERSION_FILE, 'w', encoding='utf-8') as f:
+        f.write('# Generated file, do not edit\n')
+        f.write('__connector_version__ = "' + connector_version + '"\n')
+        f.write('__flink_dependency__ = "' + flink_dependency + '"\n')
+
+    with io.open(DEPENDENCY_FILE, 'w', encoding='utf-8') as f:
+        f.write('# Generated file, do not edit\n')
+        f.write(flink_dependency + '\n')
+
+# Main
+print("Python version used to package: " + sys.version)
+
+# Python version check
+if sys.version_info < (3, 7):
+    print("Python versions prior to 3.7 are not supported for PyFlink.",
+          file=sys.stderr)
+    sys.exit(-1)
+
+# Checks the running environment:
+#  - In the connector source root directory - package preparation
+#  - Otherwise - package deployment
+in_flink_source = os.path.isfile(
+    "../flink-connector-aws/flink-connector-kinesis/src/main" +
+    
"/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java")
+
+# Cleans up the generated files and directories and regenerate them.
+if in_flink_source:
+    remove_if_exists(VERSION_FILE)
+    remove_if_exists(DEPENDENCY_FILE)
+    prepare_pyflink_dir()
+    print("\nPreparing Flink AWS connector package")
+
+# Reads the current setup data from the aws_connector_version.py file and the 
README.txt
+(connector_version, flink_dependency, long_description) = setup_parameters()
+
+print("\nConnector version: " + connector_version)
+print("Flink dependency: " + flink_dependency + "\n")
+
+if in_flink_source:
+    # Removes temporary directory used by the setup tool
+    remove_if_exists(PACKAGE_NAME.replace('-', '_') + '.egg-info')
+
+# Runs the python setup
+setup(
+    name=PACKAGE_NAME,
+    version=connector_version,
+    include_package_data=True,
+    url='https://flink.apache.org',
+    license='https://www.apache.org/licenses/LICENSE-2.0',
+    author='Apache Software Foundation',
+    author_email='[email protected]',
+    python_requires='>=3.7',
+    install_requires=[flink_dependency],
+    description='Apache Flink Python AWS Connector API',
+    long_description=long_description,
+    long_description_content_type='text/plain',
+    zip_safe=False,
+    py_modules=[
+        "pyflink"
+    ],
+    classifiers=[
+        'Development Status :: 5 - Production/Stable',
+        'License :: OSI Approved :: Apache Software License',
+        'Programming Language :: Python :: 3.7',
+        'Programming Language :: Python :: 3.8',
+        'Programming Language :: Python :: 3.9',
+        'Programming Language :: Python :: 3.10']
+)
+
+print("\nFlink AWS connector package is ready\n")
diff --git a/flink-python/tox.ini b/flink-python/tox.ini
new file mode 100644
index 0000000..acbc519
--- /dev/null
+++ b/flink-python/tox.ini
@@ -0,0 +1,51 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+[tox]
+# tox (https://tox.readthedocs.io/) is a tool for running tests
+# in multiple virtualenvs. This configuration file will run the
+# test suite on all supported python versions.
+# new environments will be excluded by default unless explicitly added to 
envlist.
+envlist = {py37, py38, py39, py310}-cython
+
+[testenv]
+whitelist_externals = /bin/bash
+deps = apache-flink
+passenv = *
+commands =
+    python --version
+    pip install pytest
+    bash ./dev/integration_test.sh
+# Replace the default installation command with a custom retry installation 
script, because on high-speed
+# networks, downloading a package may raise a ConnectionResetError: [Errno 
104] Peer reset connection.
+install_command = {toxinidir}/dev/install_command.sh {opts} {packages}
+
+[flake8]
+# We follow PEP 8 (https://www.python.org/dev/peps/pep-0008/) with one 
exception: lines can be
+# up to 100 characters in length, not 79.
+ignore=E226,E241,E305,E402,E722,E731,E741,W503,W504
+max-line-length=100
+exclude=.tox/*,dev/*,lib/*,target/*,build/*,dist/*
+
+[mypy]
+files=pyflink/datastream/connectors/*.py
+ignore_missing_imports = True
+strict_optional=False
+
+[mypy-pyflink.fn_execution.*]
+ignore_errors = True
diff --git a/pom.xml b/pom.xml
index 324a98f..f9f0ade 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,10 +74,9 @@ under the License.
 
     <modules>
         <module>flink-connector-aws-base</module>
-
         <module>flink-connector-aws</module>
         <module>flink-formats-aws</module>
-
+        <module>flink-python</module>
         <module>flink-connector-aws-e2e-tests</module>
     </modules>
 

Reply via email to