This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push:
new b7f2e26 [FLINK-33560][Connectors/AWS] Externalize AWS Python connectors from Flink to AWS project
b7f2e26 is described below
commit b7f2e268e377ee18e9133ef1367bdecf04d62d71
Author: Danny Cranmer <[email protected]>
AuthorDate: Mon Dec 18 13:33:42 2023 +0000
[FLINK-33560][Connectors/AWS] Externalize AWS Python connectors from Flink to AWS project
---
.github/workflows/nightly.yml | 10 +-
.github/workflows/push_pr.yml | 8 +
.gitignore | 17 +-
.../nightly.yml => flink-python/MANIFEST.in | 18 +-
flink-python/README.txt | 14 +
flink-python/dev/integration_test.sh | 50 ++
flink-python/pom.xml | 173 +++++++
.../pyflink/datastream/connectors/kinesis.py | 548 +++++++++++++++++++++
.../datastream/connectors/tests/test_kinesis.py | 108 ++++
flink-python/pyflink/pyflink_gateway_server.py | 288 +++++++++++
flink-python/setup.py | 153 ++++++
flink-python/tox.ini | 51 ++
pom.xml | 3 +-
13 files changed, 1421 insertions(+), 20 deletions(-)
diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml
index d50e7a9..a30d95a 100644
--- a/.github/workflows/nightly.yml
+++ b/.github/workflows/nightly.yml
@@ -31,4 +31,12 @@ jobs:
flink_version: ${{ matrix.flink }}
flink_url: https://s3.amazonaws.com/flink-nightly/flink-${{ matrix.flink }}-bin-scala_2.12.tgz
cache_flink_binary: false
- secrets: inherit
\ No newline at end of file
+ secrets: inherit
+
+ python_test:
+ strategy:
+ matrix:
+ flink: [1.17-SNAPSHOT, 1.18-SNAPSHOT, 1.19-SNAPSHOT]
+ uses: apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
+ with:
+ flink_version: ${{ matrix.flink }}
diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index 82da4df..cfbe8c0 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -32,3 +32,11 @@ jobs:
flink_url: https://archive.apache.org/dist/flink/flink-${{ matrix.flink }}/flink-${{ matrix.flink }}-bin-scala_2.12.tgz
cache_flink_binary: true
secrets: inherit
+
+ python_test:
+ strategy:
+ matrix:
+ flink: [ 1.17.1, 1.18.0 ]
+ uses: apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
+ with:
+ flink_version: ${{ matrix.flink }}
diff --git a/.gitignore b/.gitignore
index 973e8d5..2deddc5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -36,4 +36,19 @@ tools/flink
tools/flink-*
tools/releasing/release
tools/japicmp-output
-*/.idea/
\ No newline at end of file
+*/.idea/
+
+flink-python/pyflink/datastream/connectors/aws_connector_version.py
+flink-python/apache_flink_connector_aws.egg-info/
+flink-python/.tox/
+flink-python/build
+flink-python/dist
+flink-python/dev/download
+flink-python/dev/.conda/
+flink-python/dev/log/
+flink-python/dev/.stage.txt
+flink-python/dev/install_command.sh
+flink-python/dev/lint-python.sh
+flink-python/dev/build-wheels.sh
+flink-python/dev/glibc_version_fix.h
+flink-python/dev/dev-requirements.txt
\ No newline at end of file
diff --git a/.github/workflows/nightly.yml b/flink-python/MANIFEST.in
similarity index 66%
copy from .github/workflows/nightly.yml
copy to flink-python/MANIFEST.in
index d50e7a9..720decf 100644
--- a/.github/workflows/nightly.yml
+++ b/flink-python/MANIFEST.in
@@ -16,19 +16,5 @@
# limitations under the License.
################################################################################
-name: "flink-connector-aws: nightly build"
-on:
- schedule:
- - cron: "0 0 * * *"
-jobs:
- compile_and_test:
- if: github.repository_owner == 'apache'
- strategy:
- matrix:
- flink: [1.17-SNAPSHOT, 1.18-SNAPSHOT, 1.19-SNAPSHOT]
- uses: ./.github/workflows/common.yml
- with:
- flink_version: ${{ matrix.flink }}
- flink_url: https://s3.amazonaws.com/flink-nightly/flink-${{ matrix.flink }}-bin-scala_2.12.tgz
- cache_flink_binary: false
- secrets: inherit
\ No newline at end of file
+graft pyflink
+global-exclude *.py[cod] __pycache__ .DS_Store
diff --git a/flink-python/README.txt b/flink-python/README.txt
new file mode 100644
index 0000000..e792225
--- /dev/null
+++ b/flink-python/README.txt
@@ -0,0 +1,14 @@
+These are the official AWS Python connectors for Apache Flink
+
+For the latest information about the Flink connectors, please visit our website at:
+
+https://flink.apache.org
+
+and our GitHub Account for the AWS connectors
+
+https://github.com/apache/flink-connector-aws
+
+If you have any questions, ask on our Mailing lists:
+
[email protected]
[email protected]
\ No newline at end of file
diff --git a/flink-python/dev/integration_test.sh b/flink-python/dev/integration_test.sh
new file mode 100755
index 0000000..751cf6c
--- /dev/null
+++ b/flink-python/dev/integration_test.sh
@@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+function test_module() {
+ module="$FLINK_PYTHON_DIR/pyflink/$1"
+ echo "test module $module"
+ pytest --durations=20 ${module} $2
+ if [[ $? -ne 0 ]]; then
+ echo "test module $module failed"
+ exit 1
+ fi
+}
+
+function test_all_modules() {
+ # test datastream module
+ test_module "datastream"
+}
+
+# CURRENT_DIR is "<root-project-folder>/flink-python/dev/"
+CURRENT_DIR="$(cd "$( dirname "$0" )" && pwd)"
+
+# FLINK_PYTHON_DIR is "<root-project-folder>/flink-python/"
+FLINK_PYTHON_DIR=$(dirname "$CURRENT_DIR")
+
+# set FLINK_TEST_LIBS to "<root-project-folder>/flink-connector-aws/flink-python-connector-aws/target/dep..."
+export FLINK_TEST_LIBS="${FLINK_PYTHON_DIR}/target/test-dependencies/*"
+
+# Temporarily update the installed 'pyflink_gateway_server.py' files with the new one.
+# Needed only until the Flink 1.19 release.
+echo "Checking ${FLINK_PYTHON_DIR} for 'pyflink_gateway_server.py'"
+find "${FLINK_PYTHON_DIR}/.tox" -name pyflink_gateway_server.py -exec cp
"${FLINK_PYTHON_DIR}/pyflink/pyflink_gateway_server.py" {} \;
+
+# python test
+test_all_modules
diff --git a/flink-python/pom.xml b/flink-python/pom.xml
new file mode 100644
index 0000000..7af0852
--- /dev/null
+++ b/flink-python/pom.xml
@@ -0,0 +1,173 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-aws</artifactId>
+ <version>4.3-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flink-python-connector-aws</artifactId>
+ <name>Flink : Connectors : AWS : Python</name>
+
+ <packaging>pom</packaging>
+
+ <properties>
+ <python.infra.download.skip>false</python.infra.download.skip>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-sql-connector-kinesis</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-sql-connector-aws-kinesis-firehose</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>clean</id>
+ <phase>clean</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <target>
+ <delete dir="${project.basedir}/.tox"/>
+ <delete
dir="${project.basedir}/apache_flink_connector_aws.egg-info"/>
+ <delete dir="${project.basedir}/dev/.conda"/>
+ <delete dir="${project.basedir}/dev/download"/>
+ <delete dir="${project.basedir}/dev/log"/>
+ <delete dir="${project.basedir}/build"/>
+ <delete dir="${project.basedir}/dist"/>
+ <delete dir="${project.basedir}/pyflink/lib"/>
+ <delete dir="${project.basedir}/target"/>
+ <delete
file="${project.basedir}/dev/.stage.txt"/>
+ <delete
file="${project.basedir}/dev/install_command.sh"/>
+ <delete
file="${project.basedir}/dev/lint-python.sh"/>
+ <delete
file="${project.basedir}/dev/build-wheels.sh"/>
+ <delete
file="${project.basedir}/dev/glibc_version_fix.h"/>
+ <delete
file="${project.basedir}/dev/dev-requirements.txt"/>
+ <delete
file="${project.basedir}/pyflink/datastream/connectors/aws_connector_version.py"/>
+ </target>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-sql-connector-kinesis</artifactId>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-sql-connector-aws-kinesis-firehose</artifactId>
+ </artifactItem>
+ </artifactItems>
+ <outputDirectory>${project.build.directory}/test-dependencies</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>wagon-maven-plugin</artifactId>
+ <version>2.0.2</version>
+ <executions>
+ <execution>
+ <id>download-install</id>
+ <phase>validate</phase>
+ <goals>
+ <goal>download-single</goal>
+ </goals>
+ <configuration>
+ <url>https://raw.githubusercontent.com/apache/flink-connector-shared-utils/ci_utils/python/install_command.sh</url>
+ <toDir>${project.basedir}/dev</toDir>
+ <skip>${python.infra.download.skip}</skip>
+ </configuration>
+ </execution>
+ <execution>
+ <id>download-lint</id>
+ <phase>validate</phase>
+ <goals>
+ <goal>download-single</goal>
+ </goals>
+ <configuration>
+ <url>https://raw.githubusercontent.com/apache/flink-connector-shared-utils/ci_utils/python/lint-python.sh</url>
+ <toDir>${project.basedir}/dev</toDir>
+ <skip>${python.infra.download.skip}</skip>
+ </configuration>
+ </execution>
+ <execution>
+ <id>download-build-wheels</id>
+ <phase>validate</phase>
+ <goals>
+ <goal>download-single</goal>
+ </goals>
+ <configuration>
+ <url>https://raw.githubusercontent.com/apache/flink-connector-shared-utils/ci_utils/python/build-wheels.sh</url>
+ <toDir>${project.basedir}/dev</toDir>
+ <skip>${python.infra.download.skip}</skip>
+ </configuration>
+ </execution>
+ <execution>
+ <id>download-build-version-header</id>
+ <phase>validate</phase>
+ <goals>
+ <goal>download-single</goal>
+ </goals>
+ <configuration>
+ <url>https://raw.githubusercontent.com/apache/flink-connector-shared-utils/ci_utils/python/glibc_version_fix.h</url>
+ <toDir>${project.basedir}/dev</toDir>
+ <skip>${python.infra.download.skip}</skip>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git a/flink-python/pyflink/datastream/connectors/kinesis.py b/flink-python/pyflink/datastream/connectors/kinesis.py
new file mode 100644
index 0000000..355c954
--- /dev/null
+++ b/flink-python/pyflink/datastream/connectors/kinesis.py
@@ -0,0 +1,548 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Dict, Union, List
+
+from pyflink.common import SerializationSchema, DeserializationSchema, \
+ AssignerWithPeriodicWatermarksWrapper
+from pyflink.datastream.functions import SourceFunction
+from pyflink.datastream.connectors import Sink
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+ 'KinesisShardAssigner',
+ 'KinesisDeserializationSchema',
+ 'WatermarkTracker',
+ 'PartitionKeyGenerator',
+ 'FlinkKinesisConsumer',
+ 'KinesisStreamsSink',
+ 'KinesisStreamsSinkBuilder',
+ 'KinesisFirehoseSink',
+ 'KinesisFirehoseSinkBuilder'
+]
+
+
+# ---- KinesisSource ----
+
+
+class KinesisShardAssigner(object):
+ """
+ Utility to map Kinesis shards to Flink subtask indices. Users can provide a Java
+ KinesisShardAssigner in Python if they want to provide a custom shard assigner.
+ """
+ def __init__(self, j_kinesis_shard_assigner):
+ self._j_kinesis_shard_assigner = j_kinesis_shard_assigner
+
+ @staticmethod
+ def default_shard_assigner() -> 'KinesisShardAssigner':
+ """
+ A Default KinesisShardAssigner that maps Kinesis shard hash-key ranges to Flink subtasks.
+ """
+ return KinesisShardAssigner(get_gateway().jvm.org.apache.flink.streaming.connectors.
+ kinesis.internals.KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER)
+
+ @staticmethod
+ def uniform_shard_assigner() -> 'KinesisShardAssigner':
+ """
+ A KinesisShardAssigner that maps Kinesis shard hash-key ranges to Flink subtasks.
+ It creates a more uniform distribution of shards across subtasks than org.apache.flink. \
+ streaming.connectors.kinesis.internals.KinesisDataFetcher.DEFAULT_SHARD_ASSIGNER when the
+ Kinesis records in the stream have hash keys that are uniformly distributed over all
+ possible hash keys, which is the case if records have randomly-generated partition keys.
+ (This is the same assumption made if you use the Kinesis UpdateShardCount operation with
+ UNIFORM_SCALING.)
+ """
+ return KinesisShardAssigner(get_gateway().jvm.org.apache.flink.streaming.connectors.
+ kinesis.util.UniformShardAssigner())
+
+
+class KinesisDeserializationSchema(object):
+ """
+ This is a deserialization schema specific to the Flink Kinesis Consumer. Unlike the
+ basic DeserializationSchema, this schema offers additional Kinesis-specific information
+ about the record that may be useful to the user application.
+ """
+
+ def __init__(self, j_kinesis_deserialization_schema):
+ self._j_kinesis_deserialization_schema = j_kinesis_deserialization_schema
+
+
+class WatermarkTracker(object):
+ """
+ The watermark tracker is responsible for aggregating watermarks across distributed
+ operators. It can be used for subtasks of a single Flink source as well as multiple
+ heterogeneous sources or other operators. The class essentially functions like a
+ distributed hash table that enclosing operators can use to adapt their processing / IO rates.
+ """
+
+ def __init__(self, j_watermark_tracker):
+ self._j_watermark_tracker = j_watermark_tracker
+
+ @staticmethod
+ def job_manager_watermark_tracker(
+ aggregate_name: str, log_accumulator_interval_millis: int = -1) -> 'WatermarkTracker':
+ j_watermark_tracker = get_gateway().jvm.org.apache.flink.streaming.connectors.kinesis.util \
+ .JobManagerWatermarkTracker(aggregate_name, log_accumulator_interval_millis)
+ return WatermarkTracker(j_watermark_tracker)
+
+
+class FlinkKinesisConsumer(SourceFunction):
+ """
+ The Flink Kinesis Consumer is an exactly-once parallel streaming data source that subscribes
+ to multiple AWS Kinesis streams within the same AWS service region, and can handle resharding
+ of streams. Each subtask of the consumer is responsible for fetching data records from
+ multiple Kinesis shards. The number of shards fetched by each subtask will change as shards
+ are closed and created by Kinesis.
+
+ To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees,
+ the Flink Kinesis consumer is implemented with the AWS Java SDK, instead of the officially
+ recommended AWS Kinesis Client Library, for low-level control on the management of stream
+ state. The Flink Kinesis Connector also supports setting the initial starting points of
+ Kinesis streams, namely TRIM_HORIZON and LATEST.
+
+ Kinesis and the Flink consumer support dynamic re-sharding, and shard IDs, while sequential,
+ cannot be assumed to be consecutive. There is no perfect generic default assignment function.
+ The default shard-to-subtask assignment, which is based on hash code, may result in skew,
+ with some subtasks having many shards assigned and others none.
+
+ It is recommended to monitor the shard distribution and adjust assignment appropriately.
+ A custom assigner implementation can be set via setShardAssigner(KinesisShardAssigner) to
+ optimize the hash function or use static overrides to limit skew.
+
+ In order for the consumer to emit watermarks, a timestamp assigner needs to be set via
+ setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks) and the auto watermark emit
+ interval configured via ExecutionConfig.setAutoWatermarkInterval(long).
+
+ Watermarks can only advance when all shards of a subtask continuously deliver records.
+ To prevent an inactive or closed shard from blocking the watermark progress, the idle timeout
+ should be configured via the configuration property
+ ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS. By default, shards won't be considered
+ idle and watermark calculation will wait for newer records to arrive from all shards.
+
+ Note that re-sharding of the Kinesis stream while an application (that relies on the Kinesis
+ records for watermarking) is running can lead to incorrect late events. This depends on how
+ shards are assigned to subtasks and applies regardless of whether watermarks are generated
+ in the source or a downstream operator.
+ """
+
+ def __init__(self,
+ streams: Union[str, List[str]],
+ deserializer: Union[DeserializationSchema, KinesisDeserializationSchema],
+ config_props: Dict
+ ):
+ gateway = get_gateway()
+ j_properties = gateway.jvm.java.util.Properties()
+ for key, value in config_props.items():
+ j_properties.setProperty(key, value)
+
+ JFlinkKinesisConsumer = gateway.jvm.org.apache.flink.streaming.connectors.kinesis. \
+ FlinkKinesisConsumer
+ JKinesisDeserializationSchemaWrapper = get_gateway().jvm.org.apache.flink.streaming. \
+ connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper
+
+ if isinstance(streams, str):
+ streams = [streams]
+
+ if isinstance(deserializer, DeserializationSchema):
+ deserializer = JKinesisDeserializationSchemaWrapper(
+ deserializer._j_deserialization_schema)
+
+ self._j_kinesis_consumer = JFlinkKinesisConsumer(streams, deserializer, j_properties)
+ super(FlinkKinesisConsumer, self).__init__(self._j_kinesis_consumer)
+
+ def set_shard_assigner(self, shard_assigner: KinesisShardAssigner) -> 'FlinkKinesisConsumer':
+ """
+ Provide a custom assigner to influence how shards are distributed over subtasks.
+ """
+ self._j_kinesis_consumer.setShardAssigner(shard_assigner._j_kinesis_shard_assigner)
+ return self
+
+ def set_periodic_watermark_assigner(
+ self,
+ periodic_watermark_assigner: AssignerWithPeriodicWatermarksWrapper) \
+ -> 'FlinkKinesisConsumer':
+ """
+ Set the assigner that will extract the timestamp from T and calculate the watermark.
+ """
+ self._j_kinesis_consumer.setPeriodicWatermarkAssigner(
+ periodic_watermark_assigner._j_assigner_with_periodic_watermarks)
+ return self
+
+ def set_watermark_tracker(self, watermark_tracker: WatermarkTracker) -> 'FlinkKinesisConsumer':
+ """
+ Set the global watermark tracker. When set, it will be used by the fetcher to align the
+ shard consumers by event time.
+ """
+ self._j_kinesis_consumer.setWatermarkTracker(watermark_tracker._j_watermark_tracker)
+ return self
+
+
+# ---- KinesisSink ----
+
+class PartitionKeyGenerator(object):
+ """
+ This is a generator that converts an input element to the partition key, a string.
+ """
+ def __init__(self, j_partition_key_generator):
+ self._j_partition_key_generator = j_partition_key_generator
+
+ @staticmethod
+ def fixed() -> 'PartitionKeyGenerator':
+ """
+ A partitioner ensuring that each internal Flink partition ends up in the same Kinesis
+ partition. This is achieved by using the index of the producer task as a PartitionKey.
+ """
+ return PartitionKeyGenerator(get_gateway().jvm.org.apache.flink.connector.kinesis.table.
+ FixedKinesisPartitionKeyGenerator())
+
+ @staticmethod
+ def random() -> 'PartitionKeyGenerator':
+ """
+ A PartitionKeyGenerator that maps an arbitrary input element to a random partition ID.
+ """
+ return PartitionKeyGenerator(get_gateway().jvm.org.apache.flink.connector.kinesis.table.
+ RandomKinesisPartitionKeyGenerator())
+
+
+class KinesisStreamsSink(Sink):
+ """
+ A Kinesis Data Streams (KDS) Sink that performs async requests against a destination stream
+ using the buffering protocol.
+
+ The sink internally uses a software.amazon.awssdk.services.kinesis.KinesisAsyncClient to
+ communicate with the AWS endpoint.
+
+ The behaviour of the buffering may be specified by providing configuration during the sink
+ build time.
+
+ - maxBatchSize: the maximum size of a batch of entries that may be sent to KDS
+ - maxInFlightRequests: the maximum number of in flight requests that may exist; if any more
+   in flight requests need to be initiated once the maximum has been reached, then it will be
+   blocked until some have completed
+ - maxBufferedRequests: the maximum number of elements held in the buffer; requests to add
+   elements will be blocked while the number of elements in the buffer is at the maximum
+ - maxBatchSizeInBytes: the maximum size of a batch of entries that may be sent to KDS,
+   measured in bytes
+ - maxTimeInBufferMS: the maximum amount of time an entry is allowed to live in the buffer;
+   if any element reaches this age, the entire buffer will be flushed immediately
+ - maxRecordSizeInBytes: the maximum size of a record the sink will accept into the buffer;
+   a record of size larger than this will be rejected when passed to the sink
+ - failOnError: when an exception is encountered while persisting to Kinesis Data Streams,
+   the job will fail immediately if failOnError is set
+ """
+
+ def __init__(self, j_kinesis_streams_sink):
+ super(KinesisStreamsSink, self).__init__(sink=j_kinesis_streams_sink)
+
+ @staticmethod
+ def builder() -> 'KinesisStreamsSinkBuilder':
+ return KinesisStreamsSinkBuilder()
+
+
+class KinesisStreamsSinkBuilder(object):
+ """
+ Builder to construct KinesisStreamsSink.
+
+ The following example shows the minimum setup to create a KinesisStreamsSink that writes
+ String values to a Kinesis Data Streams stream named "your_stream_name".
+
+ Example:
+ ::
+
+ >>> from pyflink.common.serialization import SimpleStringSchema
+ >>> sink_properties = {"aws.region": "eu-west-1"}
+ >>> sink = KinesisStreamsSink.builder() \\
+ ... .set_kinesis_client_properties(sink_properties) \\
+ ... .set_stream_name("your_stream_name") \\
+ ... .set_serialization_schema(SimpleStringSchema()) \\
+ ... .set_partition_key_generator(PartitionKeyGenerator.random()) \\
+ ... .build()
+
+ If the following parameters are not set in this builder, the following defaults will be used:
+
+ - maxBatchSize will be 500
+ - maxInFlightRequests will be 50
+ - maxBufferedRequests will be 10000
+ - maxBatchSizeInBytes will be 5 MB i.e. 5 * 1024 * 1024
+ - maxTimeInBufferMS will be 5000ms
+ - maxRecordSizeInBytes will be 1 MB i.e. 1 * 1024 * 1024
+ - failOnError will be false
+ """
+
+ def __init__(self):
+ JKinesisStreamsSink =
get_gateway().jvm.org.apache.flink.connector.kinesis.sink.\
+ KinesisStreamsSink
+ self._j_kinesis_sink_builder = JKinesisStreamsSink.builder()
+
+ def set_stream_name(self, stream_name: Union[str, List[str]]) ->
'KinesisStreamsSinkBuilder':
+ """
+ Sets the name of the KDS stream that the sink will connect to. There is no default for
+ this parameter; therefore, it must be provided at sink creation time or the build will fail.
+ """
+ self._j_kinesis_sink_builder.setStreamName(stream_name)
+ return self
+
+ def set_serialization_schema(self, serialization_schema:
SerializationSchema) \
+ -> 'KinesisStreamsSinkBuilder':
+ """
+ Sets the SerializationSchema of the KinesisSinkBuilder.
+ """
+ self._j_kinesis_sink_builder.setSerializationSchema(
+ serialization_schema._j_serialization_schema)
+ return self
+
+ def set_partition_key_generator(self, partition_key_generator:
PartitionKeyGenerator) \
+ -> 'KinesisStreamsSinkBuilder':
+ """
+ Sets the PartitionKeyGenerator of the KinesisSinkBuilder.
+ """
+ self._j_kinesis_sink_builder.setPartitionKeyGenerator(
+ partition_key_generator._j_partition_key_generator)
+ return self
+
+ def set_fail_on_error(self, fail_on_error: bool) ->
'KinesisStreamsSinkBuilder':
+ """
+ Sets the failOnError of the KinesisSinkBuilder. If failOnError is on, then a runtime
+ exception will be raised. Otherwise, failed records will be kept in the buffer for retry.
+ """
+ self._j_kinesis_sink_builder.setFailOnError(fail_on_error)
+ return self
+
+ def set_kinesis_client_properties(self, kinesis_client_properties: Dict) \
+ -> 'KinesisStreamsSinkBuilder':
+ """
+ Sets the kinesisClientProperties of the KinesisSinkBuilder.
+ """
+ j_properties = get_gateway().jvm.java.util.Properties()
+ for key, value in kinesis_client_properties.items():
+ j_properties.setProperty(key, value)
+ self._j_kinesis_sink_builder.setKinesisClientProperties(j_properties)
+ return self
+
+ def set_max_batch_size(self, max_batch_size: int) ->
'KinesisStreamsSinkBuilder':
+ """
+ Maximum number of elements that may be passed in a list to be written
downstream.
+ """
+ self._j_kinesis_sink_builder.setMaxBatchSize(max_batch_size)
+ return self
+
+ def set_max_in_flight_requests(self, max_in_flight_requests: int) \
+ -> 'KinesisStreamsSinkBuilder':
+ """
+ Maximum number of uncompleted calls to submitRequestEntries that the SinkWriter will allow
+ at any given point. Once this point has been reached, writes and callbacks to add elements
+ to the buffer may block until one or more requests to submitRequestEntries complete.
+ """
+
self._j_kinesis_sink_builder.setMaxInFlightRequests(max_in_flight_requests)
+ return self
+
+ def set_max_buffered_requests(self, max_buffered_requests: int) ->
'KinesisStreamsSinkBuilder':
+ """
+ The maximum buffer length. Callbacks to add elements to the buffer and
calls to write will
+ block if this length has been reached and will only unblock if
elements from the buffer have
+ been removed for flushing.
+ """
+
self._j_kinesis_sink_builder.setMaxBufferedRequests(max_buffered_requests)
+ return self
+
+ def set_max_batch_size_in_bytes(self, max_batch_size_in_bytes: int) \
+ -> 'KinesisStreamsSinkBuilder':
+ """
+ A flush will be attempted if the most recent call to write introduces an element to the
+ buffer such that the total size of the buffer is greater than or equal to this threshold
+ value. If this happens, the maximum number of elements from the head of the buffer, whose
+ combined size is smaller than maxBatchSizeInBytes, will be selected and flushed.
+ """
+
self._j_kinesis_sink_builder.setMaxBatchSizeInBytes(max_batch_size_in_bytes)
+ return self
+
+ def set_max_time_in_buffer_ms(self, max_time_in_buffer_ms: int) ->
'KinesisStreamsSinkBuilder':
+ """
+ The maximum amount of time an element may remain in the buffer. In most cases elements are
+ flushed as a result of the batch size (in bytes or number) being reached or during a
+ snapshot. However, there are scenarios where an element may remain in the buffer forever
+ or for a long period of time. To mitigate this, a timer is constantly active in the buffer
+ such that, while the buffer is not empty, it will flush every maxTimeInBufferMS milliseconds.
+ """
+
self._j_kinesis_sink_builder.setMaxTimeInBufferMS(max_time_in_buffer_ms)
+ return self
+
+ def set_max_record_size_in_bytes(self, max_record_size_in_bytes: int) \
+ -> 'KinesisStreamsSinkBuilder':
+ """
+ The maximum size of each record in bytes. If a record larger than this is passed to the
+ sink, it will throw an IllegalArgumentException.
+ """
+
self._j_kinesis_sink_builder.setMaxRecordSizeInBytes(max_record_size_in_bytes)
+ return self
+
+ def build(self) -> 'KinesisStreamsSink':
+ """
+ Build the KinesisStreamsSink.
+ """
+ return KinesisStreamsSink(self._j_kinesis_sink_builder.build())
+
+
+class KinesisFirehoseSink(Sink):
+ """
+ A Kinesis Data Firehose (KDF) Sink that performs async requests against a destination
+ delivery stream using the buffering protocol.
+ """
+ def __init__(self, j_kinesis_firehose_sink):
+ super(KinesisFirehoseSink, self).__init__(sink=j_kinesis_firehose_sink)
+
+ @staticmethod
+ def builder() -> 'KinesisFirehoseSinkBuilder':
+ return KinesisFirehoseSinkBuilder()
+
+
+class KinesisFirehoseSinkBuilder(object):
+ """
+ Builder to construct KinesisFirehoseSink.
+
+ The following example shows the minimum setup to create a KinesisFirehoseSink that writes
+ String values to a Kinesis Data Firehose delivery stream named "delivery-stream-name".
+
+ Example:
+ ::
+
+ >>> from pyflink.common.serialization import SimpleStringSchema
+ >>> sink_properties = {"aws.region": "eu-west-1"}
+ >>> sink = KinesisFirehoseSink.builder() \\
+ ... .set_firehose_client_properties(sink_properties) \\
+ ... .set_delivery_stream_name("delivery-stream-name") \\
+ ... .set_serialization_schema(SimpleStringSchema()) \\
+ ... .set_max_batch_size(20) \\
+ ... .build()
+
+ If the following parameters are not set in this builder, the following defaults will be used:
+ - maxBatchSize will be 500
+ - maxInFlightRequests will be 50
+ - maxBufferedRequests will be 10000
+ - maxBatchSizeInBytes will be 4 MB i.e. 4 * 1024 * 1024
+ - maxTimeInBufferMS will be 5000ms
+ - maxRecordSizeInBytes will be 1000 KB i.e. 1000 * 1024
+ - failOnError will be false
+ """
+ def __init__(self):
+ JKinesisFirehoseSink =
get_gateway().jvm.org.apache.flink.connector.firehose.sink. \
+ KinesisFirehoseSink
+ self._j_kinesis_sink_builder = JKinesisFirehoseSink.builder()
+
+ def set_delivery_stream_name(self, delivery_stream_name: str) ->
'KinesisFirehoseSinkBuilder':
+ """
+ Sets the name of the KDF delivery stream that the sink will connect to. There is no default
+ for this parameter; therefore, it must be provided at sink creation time or the build will
+ fail.
+ """
+
self._j_kinesis_sink_builder.setDeliveryStreamName(delivery_stream_name)
+ return self
+
+ def set_serialization_schema(self, serialization_schema:
SerializationSchema) \
+ -> 'KinesisFirehoseSinkBuilder':
+ """
+ Allows the user to specify a serialization schema to serialize each
record to persist to
+ Firehose.
+ """
+ self._j_kinesis_sink_builder.setSerializationSchema(
+ serialization_schema._j_serialization_schema)
+ return self
+
+ def set_fail_on_error(self, fail_on_error: bool) ->
'KinesisFirehoseSinkBuilder':
+ """
+ If writing to Kinesis Data Firehose results in a partial or full failure being returned,
+ the job will fail.
+ """
+ self._j_kinesis_sink_builder.setFailOnError(fail_on_error)
+ return self
+
+ def set_firehose_client_properties(self, firehose_client_properties: Dict)
\
+ -> 'KinesisFirehoseSinkBuilder':
+ """
+ A set of properties used by the sink to create the Firehose client. This may be used to
+ set the AWS region, credentials, etc. See the docs for usage and syntax.
+ """
+ j_properties = get_gateway().jvm.java.util.Properties()
+ for key, value in firehose_client_properties.items():
+ j_properties.setProperty(key, value)
+ self._j_kinesis_sink_builder.setFirehoseClientProperties(j_properties)
+ return self
+
+ def set_max_batch_size(self, max_batch_size: int) ->
'KinesisFirehoseSinkBuilder':
+ """
+ Maximum number of elements that may be passed in a list to be written
downstream.
+ """
+ self._j_kinesis_sink_builder.setMaxBatchSize(max_batch_size)
+ return self
+
+ def set_max_in_flight_requests(self, max_in_flight_requests: int) \
+ -> 'KinesisFirehoseSinkBuilder':
+ """
+ Maximum number of uncompleted calls to submitRequestEntries that the SinkWriter will allow
+ at any given point. Once this point has been reached, writes and callbacks to add elements
+ to the buffer may block until one or more requests to submitRequestEntries complete.
+ """
+
self._j_kinesis_sink_builder.setMaxInFlightRequests(max_in_flight_requests)
+ return self
+
+ def set_max_buffered_requests(self, max_buffered_requests: int) ->
'KinesisFirehoseSinkBuilder':
+ """
+ The maximum buffer length. Callbacks to add elements to the buffer and
calls to write will
+ block if this length has been reached and will only unblock if
elements from the buffer have
+ been removed for flushing.
+ """
+
self._j_kinesis_sink_builder.setMaxBufferedRequests(max_buffered_requests)
+ return self
+
+ def set_max_batch_size_in_bytes(self, max_batch_size_in_bytes: int) \
+ -> 'KinesisFirehoseSinkBuilder':
+ """
+ A flush will be attempted if the most recent call to write introduces an element to the
+ buffer such that the total size of the buffer is greater than or equal to this threshold
+ value. If this happens, the maximum number of elements from the head of the buffer, whose
+ combined size is smaller than maxBatchSizeInBytes, will be selected and flushed.
+ """
+
self._j_kinesis_sink_builder.setMaxBatchSizeInBytes(max_batch_size_in_bytes)
+ return self
+
+ def set_max_time_in_buffer_ms(self, max_time_in_buffer_ms: int) ->
'KinesisFirehoseSinkBuilder':
+ """
+ The maximum amount of time an element may remain in the buffer. In most cases elements are
+ flushed as a result of the batch size (in bytes or number) being reached or during a
+ snapshot. However, there are scenarios where an element may remain in the buffer forever
+ or for a long period of time. To mitigate this, a timer is constantly active in the buffer
+ such that, while the buffer is not empty, it will flush every maxTimeInBufferMS milliseconds.
+ """
+
self._j_kinesis_sink_builder.setMaxTimeInBufferMS(max_time_in_buffer_ms)
+ return self
+
+ def set_max_record_size_in_bytes(self, max_record_size_in_bytes: int) \
+ -> 'KinesisFirehoseSinkBuilder':
+ """
+ The maximum size of each record in bytes. If a record larger than this is passed to the
+ sink, it will throw an IllegalArgumentException.
+ """
+
self._j_kinesis_sink_builder.setMaxRecordSizeInBytes(max_record_size_in_bytes)
+ return self
+
+ def build(self) -> 'KinesisFirehoseSink':
+ """
+ Build the KinesisFirehoseSink.
+ """
+ return KinesisFirehoseSink(self._j_kinesis_sink_builder.build())
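For reference, the classes above compose into a short end-to-end pipeline. The following is a minimal sketch, not part of the commit itself; the region and the stream names "input-stream" and "output-stream" are placeholders, and it assumes the connector jars from this module are on the gateway classpath.

    from pyflink.common import SimpleStringSchema
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.kinesis import (
        FlinkKinesisConsumer, KinesisStreamsSink, PartitionKeyGenerator)

    env = StreamExecutionEnvironment.get_execution_environment()

    # Consumer properties: region and the initial position in the stream (placeholders).
    consumer_config = {
        'aws.region': 'us-east-1',
        'flink.stream.initpos': 'LATEST'
    }
    source = FlinkKinesisConsumer("input-stream", SimpleStringSchema(), consumer_config)

    # Sink built via the builder API defined above; credentials come from the environment.
    sink = KinesisStreamsSink.builder() \
        .set_kinesis_client_properties({'aws.region': 'us-east-1'}) \
        .set_stream_name("output-stream") \
        .set_serialization_schema(SimpleStringSchema()) \
        .set_partition_key_generator(PartitionKeyGenerator.random()) \
        .build()

    env.add_source(source).sink_to(sink)
    env.execute("kinesis-passthrough")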
diff --git a/flink-python/pyflink/datastream/connectors/tests/test_kinesis.py b/flink-python/pyflink/datastream/connectors/tests/test_kinesis.py
new file mode 100644
index 0000000..3007711
--- /dev/null
+++ b/flink-python/pyflink/datastream/connectors/tests/test_kinesis.py
@@ -0,0 +1,108 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.common import SimpleStringSchema, Types
+from pyflink.datastream.connectors.kinesis import PartitionKeyGenerator,
FlinkKinesisConsumer, \
+ KinesisStreamsSink, KinesisFirehoseSink
+from pyflink.testing.test_case_utils import PyFlinkUTTestCase
+from pyflink.util.java_utils import get_field_value
+
+
+class FlinkKinesisTest(PyFlinkUTTestCase):
+
+ def test_kinesis_source(self):
+ consumer_config = {
+ 'aws.region': 'us-east-1',
+ 'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
+ 'aws.credentials.provider.basic.secretkey':
'aws_secret_access_key',
+ 'flink.stream.initpos': 'LATEST'
+ }
+
+ kinesis_source = FlinkKinesisConsumer("stream-1",
SimpleStringSchema(), consumer_config)
+
+ ds = self.env.add_source(source_func=kinesis_source,
source_name="kinesis source")
+ ds.print()
+ plan = eval(self.env.get_execution_plan())
+ self.assertEqual('Source: kinesis source', plan['nodes'][0]['type'])
+ self.assertEqual(
+ get_field_value(kinesis_source.get_java_function(), 'streams')[0],
'stream-1')
+
+ def test_kinesis_streams_sink(self):
+ sink_properties = {
+ 'aws.region': 'us-east-1',
+ 'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key'
+ }
+
+ ds = self.env.from_collection([('ab', 1), ('bdc', 2), ('cfgs', 3),
('deeefg', 4)],
+ type_info=Types.ROW([Types.STRING(),
Types.INT()]))
+
+ kinesis_streams_sink = KinesisStreamsSink.builder() \
+ .set_kinesis_client_properties(sink_properties) \
+ .set_serialization_schema(SimpleStringSchema()) \
+ .set_partition_key_generator(PartitionKeyGenerator.fixed()) \
+ .set_stream_name("stream-1") \
+ .set_fail_on_error(False) \
+ .set_max_batch_size(500) \
+ .set_max_in_flight_requests(50) \
+ .set_max_buffered_requests(10000) \
+ .set_max_batch_size_in_bytes(5 * 1024 * 1024) \
+ .set_max_time_in_buffer_ms(5000) \
+ .set_max_record_size_in_bytes(1 * 1024 * 1024) \
+ .build()
+
+ ds.sink_to(kinesis_streams_sink).name('kinesis streams sink')
+ plan = eval(self.env.get_execution_plan())
+
+ self.assertEqual('kinesis streams sink: Writer',
plan['nodes'][1]['type'])
+
self.assertEqual(get_field_value(kinesis_streams_sink.get_java_function(),
'failOnError'),
+ False)
+ self.assertEqual(
+ get_field_value(kinesis_streams_sink.get_java_function(),
'streamName'), 'stream-1')
+
+ def test_kinesis_firehose_sink(self):
+
+ sink_properties = {
+ 'aws.region': 'eu-west-1',
+ 'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
+ 'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key'
+ }
+
+ ds = self.env.from_collection([('ab', 1), ('bdc', 2), ('cfgs', 3),
('deeefg', 4)],
+ type_info=Types.ROW([Types.STRING(),
Types.INT()]))
+
+ kinesis_firehose_sink = KinesisFirehoseSink.builder() \
+ .set_firehose_client_properties(sink_properties) \
+ .set_serialization_schema(SimpleStringSchema()) \
+ .set_delivery_stream_name('stream-1') \
+ .set_fail_on_error(False) \
+ .set_max_batch_size(500) \
+ .set_max_in_flight_requests(50) \
+ .set_max_buffered_requests(10000) \
+ .set_max_batch_size_in_bytes(5 * 1024 * 1024) \
+ .set_max_time_in_buffer_ms(5000) \
+ .set_max_record_size_in_bytes(1 * 1024 * 1024) \
+ .build()
+
+ ds.sink_to(kinesis_firehose_sink).name('kinesis firehose sink')
+ plan = eval(self.env.get_execution_plan())
+
+ self.assertEqual('kinesis firehose sink: Writer',
plan['nodes'][1]['type'])
+
self.assertEqual(get_field_value(kinesis_firehose_sink.get_java_function(),
'failOnError'),
+ False)
+ self.assertEqual(
+ get_field_value(kinesis_firehose_sink.get_java_function(),
'deliveryStreamName'),
+ 'stream-1')
diff --git a/flink-python/pyflink/pyflink_gateway_server.py b/flink-python/pyflink/pyflink_gateway_server.py
new file mode 100644
index 0000000..1cf25a5
--- /dev/null
+++ b/flink-python/pyflink/pyflink_gateway_server.py
@@ -0,0 +1,288 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This is a copy of the pyflink_gateway_server.py file from Flink.
+# The original file is accessible here:
+# https://github.com/apache/flink/blob/master/flink-python/pyflink/pyflink_gateway_server.py
+# The additional change is the handling of the FLINK_TEST_LIBS environment variable,
+# which can be used to add extra testing jars to the gateway classpath.
+# The plan is to remove this once PyFlink 1.19 is released.
+
+import argparse
+import getpass
+import glob
+import os
+import platform
+import signal
+import socket
+import sys
+from collections import namedtuple
+from string import Template
+from subprocess import Popen, PIPE
+
+from pyflink.find_flink_home import _find_flink_home, _find_flink_source_root
+
+KEY_ENV_LOG_DIR = "env.log.dir"
+KEY_ENV_YARN_CONF_DIR = "env.yarn.conf.dir"
+KEY_ENV_HADOOP_CONF_DIR = "env.hadoop.conf.dir"
+KEY_ENV_HBASE_CONF_DIR = "env.hbase.conf.dir"
+KEY_ENV_JAVA_HOME = "env.java.home"
+KEY_ENV_JAVA_OPTS = "env.java.opts.all"
+KEY_ENV_JAVA_OPTS_DEPRECATED = "env.java.opts"
+
+
+def on_windows():
+ return platform.system() == "Windows"
+
+
+def read_from_config(key, default_value, flink_conf_file):
+ value = default_value
+ # get the realpath of tainted path value to avoid CWE22 problem that
constructs a path or URI
+ # using the tainted value and might allow an attacker to access, modify,
or test the existence
+ # of critical or sensitive files.
+ with open(os.path.realpath(flink_conf_file), "r") as f:
+ while True:
+ line = f.readline()
+ if not line:
+ break
+ if line.startswith("#") or len(line.strip()) == 0:
+ continue
+ k, v = line.split(":", 1)
+ if k.strip() == key:
+ value = v.strip()
+ return value
+
+
+def find_java_executable():
+ java_executable = "java.exe" if on_windows() else "java"
+ flink_home = _find_flink_home()
+ flink_conf_file = os.path.join(flink_home, "conf", "flink-conf.yaml")
+ java_home = read_from_config(KEY_ENV_JAVA_HOME, None, flink_conf_file)
+
+ if java_home is None and "JAVA_HOME" in os.environ:
+ java_home = os.environ["JAVA_HOME"]
+
+ if java_home is not None:
+ java_executable = os.path.join(java_home, "bin", java_executable)
+
+ return java_executable
+
+
+def prepare_environment_variables(env):
+ flink_home = _find_flink_home()
+ # get the realpath of tainted path value to avoid CWE22 problem that
constructs a path or URI
+ # using the tainted value and might allow an attacker to access, modify,
or test the existence
+ # of critical or sensitive files.
+ real_flink_home = os.path.realpath(flink_home)
+
+ if 'FLINK_CONF_DIR' in env:
+ flink_conf_directory = os.path.realpath(env['FLINK_CONF_DIR'])
+ else:
+ flink_conf_directory = os.path.join(real_flink_home, "conf")
+ env['FLINK_CONF_DIR'] = flink_conf_directory
+
+ if 'FLINK_LIB_DIR' in env:
+ flink_lib_directory = os.path.realpath(env['FLINK_LIB_DIR'])
+ else:
+ flink_lib_directory = os.path.join(real_flink_home, "lib")
+ env['FLINK_LIB_DIR'] = flink_lib_directory
+
+ if 'FLINK_OPT_DIR' in env:
+ flink_opt_directory = os.path.realpath(env['FLINK_OPT_DIR'])
+ else:
+ flink_opt_directory = os.path.join(real_flink_home, "opt")
+ env['FLINK_OPT_DIR'] = flink_opt_directory
+
+ if 'FLINK_PLUGINS_DIR' in env:
+ flink_plugins_directory = os.path.realpath(env['FLINK_PLUGINS_DIR'])
+ else:
+ flink_plugins_directory = os.path.join(real_flink_home, "plugins")
+ env['FLINK_PLUGINS_DIR'] = flink_plugins_directory
+
+ env["FLINK_BIN_DIR"] = os.path.join(real_flink_home, "bin")
+
+
+def construct_log_settings(env):
+ templates = [
+
"-Dlog.file=${flink_log_dir}/flink-${flink_ident_string}-python-${hostname}.log",
+ "-Dlog4j.configuration=${log4j_properties}",
+ "-Dlog4j.configurationFile=${log4j_properties}",
+ "-Dlogback.configurationFile=${logback_xml}"
+ ]
+
+ flink_home = os.path.realpath(_find_flink_home())
+ flink_conf_dir = env['FLINK_CONF_DIR']
+ flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
+
+ if "FLINK_LOG_DIR" in env:
+ flink_log_dir = env["FLINK_LOG_DIR"]
+ else:
+ flink_log_dir = read_from_config(
+ KEY_ENV_LOG_DIR, os.path.join(flink_home, "log"), flink_conf_file)
+
+ if "LOG4J_PROPERTIES" in env:
+ log4j_properties = env["LOG4J_PROPERTIES"]
+ else:
+ log4j_properties = "%s/log4j-cli.properties" % flink_conf_dir
+
+ if "LOGBACK_XML" in env:
+ logback_xml = env["LOGBACK_XML"]
+ else:
+ logback_xml = "%s/logback.xml" % flink_conf_dir
+
+ if "FLINK_IDENT_STRING" in env:
+ flink_ident_string = env["FLINK_IDENT_STRING"]
+ else:
+ flink_ident_string = getpass.getuser()
+
+ hostname = socket.gethostname()
+ log_settings = []
+ for template in templates:
+ log_settings.append(Template(template).substitute(
+ log4j_properties=log4j_properties,
+ logback_xml=logback_xml,
+ flink_log_dir=flink_log_dir,
+ flink_ident_string=flink_ident_string,
+ hostname=hostname))
+ return log_settings
+
+
+def get_jvm_opts(env):
+ flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
+ jvm_opts = env.get(
+ 'FLINK_ENV_JAVA_OPTS',
+ read_from_config(
+ KEY_ENV_JAVA_OPTS,
+ read_from_config(KEY_ENV_JAVA_OPTS_DEPRECATED, "",
flink_conf_file),
+ flink_conf_file))
+
+ # Remove leading and ending double quotes (if present) of value
+ jvm_opts = jvm_opts.strip("\"")
+ return jvm_opts.split(" ")
+
+
+def construct_flink_classpath(env):
+ flink_home = _find_flink_home()
+ flink_lib_directory = env['FLINK_LIB_DIR']
+ flink_opt_directory = env['FLINK_OPT_DIR']
+
+ if on_windows():
+ # The command length is limited on Windows. To avoid the problem we
should shorten the
+ # command length as much as possible.
+ lib_jars = os.path.join(flink_lib_directory, "*")
+ else:
+ lib_jars = os.pathsep.join(glob.glob(os.path.join(flink_lib_directory,
"*.jar")))
+
+ flink_python_jars = glob.glob(os.path.join(flink_opt_directory,
"flink-python*.jar"))
+ if len(flink_python_jars) < 1:
+ print("The flink-python jar is not found in the opt folder of the
FLINK_HOME: %s" %
+ flink_home)
+ return lib_jars
+ flink_python_jar = flink_python_jars[0]
+
+ return os.pathsep.join([lib_jars, flink_python_jar])
+
+
+def construct_hadoop_classpath(env):
+ flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
+
+ hadoop_conf_dir = ""
+ if 'HADOOP_CONF_DIR' not in env and 'HADOOP_CLASSPATH' not in env:
+ if os.path.isdir("/etc/hadoop/conf"):
+ print("Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no
HADOOP_CONF_DIR or"
+ "HADOOP_CLASSPATH was set.")
+ hadoop_conf_dir = "/etc/hadoop/conf"
+
+ hbase_conf_dir = ""
+ if 'HBASE_CONF_DIR' not in env:
+ if os.path.isdir("/etc/hbase/conf"):
+ print("Setting HBASE_CONF_DIR=/etc/hbase/conf because no
HBASE_CONF_DIR was set.")
+ hbase_conf_dir = "/etc/hbase/conf"
+
+ return os.pathsep.join(
+ [env.get("HADOOP_CLASSPATH", ""),
+ env.get("YARN_CONF_DIR",
+ read_from_config(KEY_ENV_YARN_CONF_DIR, "", flink_conf_file)),
+ env.get("HADOOP_CONF_DIR",
+ read_from_config(KEY_ENV_HADOOP_CONF_DIR, hadoop_conf_dir,
flink_conf_file)),
+ env.get("HBASE_CONF_DIR",
+ read_from_config(KEY_ENV_HBASE_CONF_DIR, hbase_conf_dir,
flink_conf_file))])
+
+
+def construct_test_classpath(env):
+ test_jar_patterns = [
+ "flink-python/target/test-dependencies/*",
+ "flink-python/target/artifacts/testDataStream.jar",
+ "flink-python/target/flink-python*-tests.jar",
+ ]
+ test_jars = []
+
+ # Connector tests need to add specific jars to the gateway classpath
+ if 'FLINK_TEST_LIBS' in env:
+ test_jars += glob.glob(env['FLINK_TEST_LIBS'])
+ else:
+ flink_source_root = _find_flink_source_root()
+ for pattern in test_jar_patterns:
+ pattern = pattern.replace("/", os.path.sep)
+ test_jars += glob.glob(os.path.join(flink_source_root, pattern))
+ return os.path.pathsep.join(test_jars)
+
+
+def construct_program_args(args):
+ parser = argparse.ArgumentParser()
+ parser.add_argument("-c", "--class", required=True)
+ parser.add_argument("cluster_type", choices=["local", "remote", "yarn"])
+ parse_result, other_args = parser.parse_known_args(args)
+ main_class = getattr(parse_result, "class")
+ cluster_type = parse_result.cluster_type
+ return namedtuple(
+ "ProgramArgs", ["main_class", "cluster_type", "other_args"])(
+ main_class, cluster_type, other_args)
+
+
+def launch_gateway_server_process(env, args):
+ prepare_environment_variables(env)
+ program_args = construct_program_args(args)
+ if program_args.cluster_type == "local":
+ java_executable = find_java_executable()
+ log_settings = construct_log_settings(env)
+ jvm_args = env.get('JVM_ARGS', '')
+ jvm_opts = get_jvm_opts(env)
+ classpath = os.pathsep.join(
+ [construct_flink_classpath(env), construct_hadoop_classpath(env)])
+ if "FLINK_TESTING" in env:
+ classpath = os.pathsep.join([classpath,
construct_test_classpath(env)])
+ command = [java_executable, jvm_args,
"-XX:+IgnoreUnrecognizedVMOptions",
+ "--add-opens=jdk.proxy2/jdk.proxy2=ALL-UNNAMED"] \
+ + jvm_opts + log_settings \
+ + ["-cp", classpath, program_args.main_class] +
program_args.other_args
+ else:
+ command = [os.path.join(env["FLINK_BIN_DIR"], "flink"), "run"] +
program_args.other_args \
+ + ["-c", program_args.main_class]
+ preexec_fn = None
+ if not on_windows():
+ def preexec_func():
+ # ignore ctrl-c / SIGINT
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+ preexec_fn = preexec_func
+ return Popen(list(filter(lambda c: len(c) != 0, command)),
+ stdin=PIPE, stderr=PIPE, preexec_fn=preexec_fn, env=env)
+
+
+if __name__ == "__main__":
+ launch_gateway_server_process(os.environ, sys.argv[1:])
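For reference, a minimal sketch of how a test run might point this gateway at the staged connector jars, mirroring what dev/integration_test.sh exports before running pytest; the jar directory below is a placeholder.

    import os

    # Hypothetical test bootstrap (placeholder path): once these are set, any test that
    # launches the local gateway picks up the connector jars, because
    # construct_test_classpath() globs FLINK_TEST_LIBS when it is present.
    os.environ["FLINK_TESTING"] = "1"
    os.environ["FLINK_TEST_LIBS"] = "/path/to/flink-python/target/test-dependencies/*"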
diff --git a/flink-python/setup.py b/flink-python/setup.py
new file mode 100644
index 0000000..8c37061
--- /dev/null
+++ b/flink-python/setup.py
@@ -0,0 +1,153 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from __future__ import print_function
+
+import io
+import os
+import sys
+
+from setuptools import setup
+from shutil import rmtree
+from xml.etree import ElementTree as ET
+
+PACKAGE_NAME = 'apache-flink-connector-aws'
+# Source files, directories
+CURRENT_DIR = os.path.abspath(os.path.dirname(__file__))
+POM_FILE = os.path.join(CURRENT_DIR, '../pom.xml')
+README_FILE = os.path.join(CURRENT_DIR, 'README.txt')
+
+# Generated files and directories
+VERSION_FILE = os.path.join(CURRENT_DIR,
'pyflink/datastream/connectors/aws_connector_version.py')
+DEPENDENCY_FILE = os.path.join(CURRENT_DIR, 'dev/dev-requirements.txt')
+
+
+# Removes a file or directory if exists.
+def remove_if_exists(file_path):
+ if os.path.exists(file_path):
+ if os.path.isfile(file_path):
+ os.remove(file_path)
+ if os.path.isdir(file_path):
+ rmtree(file_path)
+
+
+# Reads the content of the README.txt file.
+def readme_content():
+ with io.open(README_FILE, 'r', encoding='utf-8') as f:
+ return f.read()
+
+
+# Reads the parameters used by the setup command.
+# The source is the aws_connector_version.py and the README.txt.
+def setup_parameters():
+ try:
+ exec(open(VERSION_FILE).read())
+ return locals()['__connector_version__'],
locals()['__flink_dependency__'], readme_content()
+ except IOError:
+ print("Failed to load PyFlink version file for packaging. " +
+ "'%s' not found!" % VERSION_FILE,
+ file=sys.stderr)
+ sys.exit(-1)
+
+
+# Reads and parses the flink-connector-aws main pom.xml.
+# Based on the version data in the pom.xml prepares the pyflink dir:
+# - Generates aws_connector_version.py
+# - Generates dev-requirements.txt
+# - Copies the flink-sql-connector-kinesis.jar to the pyflink/lib dir
+def prepare_pyflink_dir():
+ # source files
+ pom_root = ET.parse(POM_FILE).getroot()
+ flink_version = pom_root.findall(
+ "./{http://maven.apache.org/POM/4.0.0}properties/" +
+ "{http://maven.apache.org/POM/4.0.0}flink.version"
+ )[0].text
+ connector_version = pom_root.findall(
+
"./{http://maven.apache.org/POM/4.0.0}version")[0].text.replace("-SNAPSHOT",
".dev0")
+
+ flink_dependency = "apache-flink>=" + flink_version
+
+ with io.open(VERSION_FILE, 'w', encoding='utf-8') as f:
+ f.write('# Generated file, do not edit\n')
+ f.write('__connector_version__ = "' + connector_version + '"\n')
+ f.write('__flink_dependency__ = "' + flink_dependency + '"\n')
+
+ with io.open(DEPENDENCY_FILE, 'w', encoding='utf-8') as f:
+ f.write('# Generated file, do not edit\n')
+ f.write(flink_dependency + '\n')
+
+# Main
+print("Python version used to package: " + sys.version)
+
+# Python version check
+if sys.version_info < (3, 7):
+ print("Python versions prior to 3.7 are not supported for PyFlink.",
+ file=sys.stderr)
+ sys.exit(-1)
+
+# Checks the running environment:
+# - In the connector source root directory - package preparation
+# - Otherwise - package deployment
+in_flink_source = os.path.isfile(
+ "../flink-connector-aws/flink-connector-kinesis/src/main" +
+
"/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java")
+
+# Cleans up the generated files and directories and regenerate them.
+if in_flink_source:
+ remove_if_exists(VERSION_FILE)
+ remove_if_exists(DEPENDENCY_FILE)
+ prepare_pyflink_dir()
+ print("\nPreparing Flink AWS connector package")
+
+# Reads the current setup data from the aws_connector_version.py file and the
README.txt
+(connector_version, flink_dependency, long_description) = setup_parameters()
+
+print("\nConnector version: " + connector_version)
+print("Flink dependency: " + flink_dependency + "\n")
+
+if in_flink_source:
+ # Removes temporary directory used by the setup tool
+ remove_if_exists(PACKAGE_NAME.replace('-', '_') + '.egg-info')
+
+# Runs the python setup
+setup(
+ name=PACKAGE_NAME,
+ version=connector_version,
+ include_package_data=True,
+ url='https://flink.apache.org',
+ license='https://www.apache.org/licenses/LICENSE-2.0',
+ author='Apache Software Foundation',
+ author_email='[email protected]',
+ python_requires='>=3.7',
+ install_requires=[flink_dependency],
+ description='Apache Flink Python AWS Connector API',
+ long_description=long_description,
+ long_description_content_type='text/plain',
+ zip_safe=False,
+ py_modules=[
+ "pyflink"
+ ],
+ classifiers=[
+ 'Development Status :: 5 - Production/Stable',
+ 'License :: OSI Approved :: Apache Software License',
+ 'Programming Language :: Python :: 3.7',
+ 'Programming Language :: Python :: 3.8',
+ 'Programming Language :: Python :: 3.9',
+ 'Programming Language :: Python :: 3.10']
+)
+
+print("\nFlink AWS connector package is ready\n")
diff --git a/flink-python/tox.ini b/flink-python/tox.ini
new file mode 100644
index 0000000..acbc519
--- /dev/null
+++ b/flink-python/tox.ini
@@ -0,0 +1,51 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+[tox]
+# tox (https://tox.readthedocs.io/) is a tool for running tests
+# in multiple virtualenvs. This configuration file will run the
+# test suite on all supported python versions.
+# new environments will be excluded by default unless explicitly added to envlist.
+envlist = {py37, py38, py39, py310}-cython
+
+[testenv]
+whitelist_externals = /bin/bash
+deps = apache-flink
+passenv = *
+commands =
+ python --version
+ pip install pytest
+ bash ./dev/integration_test.sh
+# Replace the default installation command with a custom retry installation script, because
+# on high-speed networks, downloading a package may raise a ConnectionResetError:
+# [Errno 104] Connection reset by peer.
+install_command = {toxinidir}/dev/install_command.sh {opts} {packages}
+
+[flake8]
+# We follow PEP 8 (https://www.python.org/dev/peps/pep-0008/) with one exception: lines can
+# be up to 100 characters in length, not 79.
+ignore=E226,E241,E305,E402,E722,E731,E741,W503,W504
+max-line-length=100
+exclude=.tox/*,dev/*,lib/*,target/*,build/*,dist/*
+
+[mypy]
+files=pyflink/datastream/connectors/*.py
+ignore_missing_imports = True
+strict_optional=False
+
+[mypy-pyflink.fn_execution.*]
+ignore_errors = True
diff --git a/pom.xml b/pom.xml
index 324a98f..f9f0ade 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,10 +74,9 @@ under the License.
<modules>
<module>flink-connector-aws-base</module>
-
<module>flink-connector-aws</module>
<module>flink-formats-aws</module>
-
+ <module>flink-python</module>
<module>flink-connector-aws-e2e-tests</module>
</modules>