This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new c38a0406 [FLINK-33559] Externalize Kafka Python connector code
c38a0406 is described below

commit c38a0406104646a7ea8199bb64244310e344ce2b
Author: pvary <peter.vary.apa...@gmail.com>
AuthorDate: Mon Dec 11 09:40:50 2023 +0100

    [FLINK-33559] Externalize Kafka Python connector code
---
 .github/workflows/push_pr.yml                      |    8 +
 .gitignore                                         |   18 +-
 .../push_pr.yml => flink-python/MANIFEST.in        |   16 +-
 flink-python/README.txt                            |   14 +
 flink-python/dev/integration_test.sh               |   54 +
 flink-python/pom.xml                               |  222 ++++
 .../pyflink/datastream/connectors/kafka.py         | 1163 ++++++++++++++++++++
 .../datastream/connectors/tests/test_kafka.py      |  669 +++++++++++
 flink-python/pyflink/pyflink_gateway_server.py     |  288 +++++
 flink-python/setup.py                              |  158 +++
 flink-python/tox.ini                               |   51 +
 pom.xml                                            |    1 +
 12 files changed, 2648 insertions(+), 14 deletions(-)

diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index ddc50ab8..8f53a5bd 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -29,3 +29,11 @@ jobs:
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
       flink_version: ${{ matrix.flink }}
+    
+  python_test:
+    strategy:
+      matrix:
+        flink: [ 1.17.1, 1.18.0 ]
+    uses: 
apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
+    with:
+      flink_version: ${{ matrix.flink }}
diff --git a/.gitignore b/.gitignore
index 5f0068cd..901fd674 100644
--- a/.gitignore
+++ b/.gitignore
@@ -35,4 +35,20 @@ out/
 tools/flink
 tools/flink-*
 tools/releasing/release
-tools/japicmp-output
\ No newline at end of file
+tools/japicmp-output
+
+# Generated file, do not store in git
+flink-python/pyflink/datastream/connectors/kafka_connector_version.py
+flink-python/apache_flink_connectors_kafka.egg-info/
+flink-python/.tox/
+flink-python/build
+flink-python/dist
+flink-python/dev/download
+flink-python/dev/.conda/
+flink-python/dev/log/
+flink-python/dev/.stage.txt
+flink-python/dev/install_command.sh
+flink-python/dev/lint-python.sh
+flink-python/dev/build-wheels.sh
+flink-python/dev/glibc_version_fix.h
+flink-python/dev/dev-requirements.txt
diff --git a/.github/workflows/push_pr.yml b/flink-python/MANIFEST.in
similarity index 73%
copy from .github/workflows/push_pr.yml
copy to flink-python/MANIFEST.in
index ddc50ab8..3578d2df 100644
--- a/.github/workflows/push_pr.yml
+++ b/flink-python/MANIFEST.in
@@ -16,16 +16,6 @@
 # limitations under the License.
 
################################################################################
 
-name: CI
-on: [push, pull_request]
-concurrency:
-  group: ${{ github.workflow }}-${{ github.ref }}
-  cancel-in-progress: true
-jobs:
-  compile_and_test:
-    strategy:
-      matrix:
-        flink: [ 1.17.1, 1.18.0 ]
-    uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
-    with:
-      flink_version: ${{ matrix.flink }}
+graft pyflink
+global-exclude *.py[cod] __pycache__ .DS_Store
+
diff --git a/flink-python/README.txt b/flink-python/README.txt
new file mode 100644
index 00000000..a12c13e5
--- /dev/null
+++ b/flink-python/README.txt
@@ -0,0 +1,14 @@
+This is the official Apache Flink Kafka Python connector.
+
+For the latest information about Flink connectors, please visit our website at:
+
+   https://flink.apache.org
+
+and our GitHub repository for the Kafka connector:
+
+   https://github.com/apache/flink-connector-kafka
+
+If you have any questions, ask on our mailing lists:
+
+   user@flink.apache.org
+   dev@flink.apache.org
diff --git a/flink-python/dev/integration_test.sh 
b/flink-python/dev/integration_test.sh
new file mode 100755
index 00000000..19816725
--- /dev/null
+++ b/flink-python/dev/integration_test.sh
@@ -0,0 +1,54 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+function test_module() {
+    module="$FLINK_PYTHON_DIR/pyflink/$1"
+    echo "test module $module"
+    pytest --durations=20 ${module} $2
+    if [[ $? -ne 0 ]]; then
+        echo "test module $module failed"
+        exit 1
+    fi
+}
+
+function test_all_modules() {
+    # test datastream module
+    test_module "datastream"
+}
+
+# CURRENT_DIR is "flink-connector-kafka/flink-python/dev/"
+CURRENT_DIR="$(cd "$( dirname "$0" )" && pwd)"
+
+# FLINK_PYTHON_DIR is "flink-connector-kafka/flink-python"
+FLINK_PYTHON_DIR=$(dirname "$CURRENT_DIR")
+
+# FLINK_SOURCE_DIR is "flink-connector-kafka"
+FLINK_SOURCE_DIR=$(dirname "$FLINK_PYTHON_DIR")
+
+# set FLINK_TEST_LIBS to the jars in "flink-connector-kafka/flink-python/target/test-dependencies"
+export 
FLINK_TEST_LIBS="${FLINK_SOURCE_DIR}/flink-python/target/test-dependencies/*"
+
+# Temporarily update the installed 'pyflink_gateway_server.py' files with the 
new one
+# Needed only until Flink 1.19 release
+echo "Checking ${FLINK_SOURCE_DIR} for 'pyflink_gateway_server.py'"
+find "${FLINK_SOURCE_DIR}/flink-python" -name pyflink_gateway_server.py
+find "${FLINK_SOURCE_DIR}/flink-python/.tox" -name pyflink_gateway_server.py 
-exec cp "${FLINK_SOURCE_DIR}/flink-python/pyflink/pyflink_gateway_server.py" 
{} \;
+
+# python test
+test_all_modules
diff --git a/flink-python/pom.xml b/flink-python/pom.xml
new file mode 100644
index 00000000..cb3f70ca
--- /dev/null
+++ b/flink-python/pom.xml
@@ -0,0 +1,222 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-kafka-parent</artifactId>
+        <version>3.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-connector-kafka-python</artifactId>
+    <name>Flink : Connectors : SQL : Kafka : Python</name>
+
+    <packaging>pom</packaging>
+
+    <properties>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-sql-connector-kafka</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-test-utils</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-sql-avro</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>clean</id>
+                        <phase>clean</phase>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                        <configuration>
+                            <target>
+                                <delete dir="${project.basedir}/.tox"/>
+                                <delete 
dir="${project.basedir}/apache_flink_connectors_kafka.egg-info"/>
+                                <delete dir="${project.basedir}/dev/.conda"/>
+                                <delete dir="${project.basedir}/dev/download"/>
+                                <delete dir="${project.basedir}/dev/log"/>
+                                <delete dir="${project.basedir}/build"/>
+                                <delete dir="${project.basedir}/dist"/>
+                                <delete dir="${project.basedir}/pyflink/lib"/>
+                                <delete 
file="${project.basedir}/dev/.stage.txt"/>
+                                <delete 
file="${project.basedir}/dev/install_command.sh"/>
+                                <delete 
file="${project.basedir}/dev/lint-python.sh"/>
+                                <delete 
file="${project.basedir}/dev/build-wheels.sh"/>
+                                <delete 
file="${project.basedir}/dev/glibc_version_fix.h"/>
+                                <delete 
file="${project.basedir}/dev/dev-requirements.txt"/>
+                                <delete 
file="${project.basedir}/pyflink/datastream/connectors/kafka_connector_version.py"/>
+                            </target>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy</goal>
+                        </goals>
+                        <configuration>
+                            <artifactItems>
+                                <artifactItem>
+                                    <groupId>org.apache.flink</groupId>
+                                    
<artifactId>flink-sql-connector-kafka</artifactId>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.apache.flink</groupId>
+                                    <artifactId>flink-runtime</artifactId>
+                                    <!-- Don't use test-jar type because of a 
bug in the plugin (MDEP-587). -->
+                                    <classifier>tests</classifier>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.apache.flink</groupId>
+                                    
<artifactId>flink-streaming-java</artifactId>
+                                    <!-- Don't use test-jar type because of a 
bug in the plugin (MDEP-587). -->
+                                    <classifier>tests</classifier>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.apache.flink</groupId>
+                                    
<artifactId>flink-connector-test-utils</artifactId>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.apache.flink</groupId>
+                                    <artifactId>flink-test-utils</artifactId>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.apache.flink</groupId>
+                                    <artifactId>flink-sql-avro</artifactId>
+                                </artifactItem>
+                            </artifactItems>
+                            
<outputDirectory>${project.build.directory}/test-dependencies</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy-dependencies</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            <includeGroupIds>junit</includeGroupIds>
+                            
<outputDirectory>${project.build.directory}/test-dependencies</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <!-- Download the testing infra sources from the Flink main 
repository -->
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>wagon-maven-plugin</artifactId>
+                <version>2.0.2</version>
+                <executions>
+                    <execution>
+                        <id>download-install</id>
+                        <phase>validate</phase>
+                        <goals>
+                            <goal>download-single</goal>
+                        </goals>
+                        <configuration>
+                            
<url>https://raw.githubusercontent.com/apache/flink-connector-shared-utils/ci_utils/python/install_command.sh</url>
+                            <toDir>${project.basedir}/dev</toDir>
+                            <skip>${python.infra.download.skip}</skip>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>download-lint</id>
+                        <phase>validate</phase>
+                        <goals>
+                            <goal>download-single</goal>
+                        </goals>
+                        <configuration>
+                            
<url>https://raw.githubusercontent.com/apache/flink-connector-shared-utils/ci_utils/python/lint-python.sh</url>
+                            <toDir>${project.basedir}/dev</toDir>
+                            <skip>${python.infra.download.skip}</skip>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>download-build-wheels</id>
+                        <phase>validate</phase>
+                        <goals>
+                            <goal>download-single</goal>
+                        </goals>
+                        <configuration>
+                            
<url>https://raw.githubusercontent.com/apache/flink-connector-shared-utils/ci_utils/python/build-wheels.sh</url>
+                            <toDir>${project.basedir}/dev</toDir>
+                            <skip>${python.infra.download.skip}</skip>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>download-build-version-header</id>
+                        <phase>validate</phase>
+                        <goals>
+                            <goal>download-single</goal>
+                        </goals>
+                        <configuration>
+                            
<url>https://raw.githubusercontent.com/apache/flink-connector-shared-utils/ci_utils/python/glibc_version_fix.h</url>
+                            <toDir>${project.basedir}/dev</toDir>
+                            <skip>${python.infra.download.skip}</skip>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/flink-python/pyflink/datastream/connectors/kafka.py 
b/flink-python/pyflink/datastream/connectors/kafka.py
new file mode 100644
index 00000000..0e0a1289
--- /dev/null
+++ b/flink-python/pyflink/datastream/connectors/kafka.py
@@ -0,0 +1,1163 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import warnings
+from abc import ABC, abstractmethod
+from enum import Enum
+from typing import Dict, Union, List, Set, Callable, Any, Optional
+
+from py4j.java_gateway import JavaObject, get_java_class
+
+from pyflink.common import DeserializationSchema, TypeInformation, typeinfo, 
SerializationSchema, \
+    Types, Row
+from pyflink.datastream.connectors import Source, Sink
+from pyflink.datastream.connectors.base import DeliveryGuarantee, 
SupportsPreprocessing, \
+    StreamTransformer
+from pyflink.datastream.functions import SinkFunction, SourceFunction
+from pyflink.java_gateway import get_gateway
+from pyflink.util.java_utils import to_jarray, get_field, get_field_value
+
+__all__ = [
+    'FlinkKafkaConsumer',
+    'FlinkKafkaProducer',
+    'KafkaSource',
+    'KafkaSourceBuilder',
+    'KafkaSink',
+    'KafkaSinkBuilder',
+    'Semantic',
+    'KafkaTopicPartition',
+    'KafkaOffsetsInitializer',
+    'KafkaOffsetResetStrategy',
+    'KafkaRecordSerializationSchema',
+    'KafkaRecordSerializationSchemaBuilder',
+    'KafkaTopicSelector'
+]
+
+
+# ---- FlinkKafkaConsumer ----
+
+class FlinkKafkaConsumerBase(SourceFunction, ABC):
+    """
+    Base class of all Flink Kafka Consumer data sources. This implements the 
common behavior across
+    all kafka versions.
+
+    The Kafka version specific behavior is defined mainly in the specific 
subclasses.
+    """
+
+    def __init__(self, j_flink_kafka_consumer):
+        super(FlinkKafkaConsumerBase, 
self).__init__(source_func=j_flink_kafka_consumer)
+
+    def set_commit_offsets_on_checkpoints(self,
+                                          commit_on_checkpoints: bool) -> 
'FlinkKafkaConsumerBase':
+        """
+        Specifies whether or not the consumer should commit offsets back to 
kafka on checkpoints.
+        This setting will only have effect if checkpointing is enabled for the 
job. If checkpointing
+        isn't enabled, only the "auto.commit.enable" (for 0.8) / 
"enable.auto.commit" (for 0.9+)
+        property settings will be used.
+        """
+        self._j_function = self._j_function \
+            .setCommitOffsetsOnCheckpoints(commit_on_checkpoints)
+        return self
+
+    def set_start_from_earliest(self) -> 'FlinkKafkaConsumerBase':
+        """
+        Specifies the consumer to start reading from the earliest offset for 
all partitions. This
+        lets the consumer ignore any committed group offsets in Zookeeper/ 
Kafka brokers.
+
+        This method does not affect where partitions are read from when the 
consumer is restored
+        from a checkpoint or savepoint. When the consumer is restored from a 
checkpoint or
+        savepoint, only the offsets in the restored state will be used.
+        """
+        self._j_function = self._j_function.setStartFromEarliest()
+        return self
+
+    def set_start_from_latest(self) -> 'FlinkKafkaConsumerBase':
+        """
+        Specifies the consumer to start reading from the latest offset for all 
partitions. This lets
+        the consumer ignore any committed group offsets in Zookeeper / Kafka 
brokers.
+
+        This method does not affect where partitions are read from when the 
consumer is restored
+        from a checkpoint or savepoint. When the consumer is restored from a 
checkpoint or
+        savepoint, only the offsets in the restored state will be used.
+        """
+        self._j_function = self._j_function.setStartFromLatest()
+        return self
+
+    def set_start_from_timestamp(self, startup_offsets_timestamp: int) -> 
'FlinkKafkaConsumerBase':
+        """
+        Specifies the consumer to start reading partitions from a specified 
timestamp. The specified
+        timestamp must be before the current timestamp. This lets the consumer 
ignore any committed
+        group offsets in Zookeeper / Kafka brokers.
+
+        The consumer will look up the earliest offset whose timestamp is 
greater than or equal to
+        the specific timestamp from Kafka. If there's no such offset, the 
consumer will use the
+        latest offset to read data from Kafka.
+
+        This method does not affect where partitions are read from when the 
consumer is restored
+        from a checkpoint or savepoint. When the consumer is restored from a 
checkpoint or
+        savepoint, only the offsets in the restored state will be used.
+
+        :param startup_offsets_timestamp: timestamp for the startup offsets, 
as milliseconds since the
+                                          epoch.
+        """
+        self._j_function = self._j_function.setStartFromTimestamp(
+            startup_offsets_timestamp)
+        return self
+
+    def set_start_from_group_offsets(self) -> 'FlinkKafkaConsumerBase':
+        """
+        Specifies the consumer to start reading from any committed group 
offsets found in Zookeeper/
+        Kafka brokers. The 'group.id' property must be set in the 
configuration properties. If no
+        offset can be found for a partition, the behaviour in 
'auto.offset.reset' set in the
+        configuration properties will be used for the partition.
+
+        This method does not affect where partitions are read from when the 
consumer is restored
+        from a checkpoint or savepoint. When the consumer is restored from a 
checkpoint or
+        savepoint, only the offsets in the restored state will be used.
+        """
+        self._j_function = self._j_function.setStartFromGroupOffsets()
+        return self
+
+    def disable_filter_restored_partitions_with_subscribed_topics(self) -> 
'FlinkKafkaConsumerBase':
+        """
+        By default, when restoring from a checkpoint / savepoint, the consumer 
always ignores
+        restored partitions that are no longer associated with the current 
specified topics or topic
+        pattern to subscribe to.
+
+        This method configures the consumer to not filter the restored 
partitions, therefore
+        always attempting to consume whatever partition was present in the 
previous execution,
+        regardless of the topics specified to subscribe to in the current execution.
+        """
+        self._j_function = self._j_function \
+            .disableFilterRestoredPartitionsWithSubscribedTopics()
+        return self
+
+    def get_produced_type(self) -> TypeInformation:
+        return typeinfo._from_java_type(self._j_function.getProducedType())
+
+
+def _get_kafka_consumer(topics, properties, deserialization_schema, 
j_consumer_clz):
+    if not isinstance(topics, list):
+        topics = [topics]
+    gateway = get_gateway()
+    j_properties = gateway.jvm.java.util.Properties()
+    for key, value in properties.items():
+        j_properties.setProperty(key, value)
+
+    j_flink_kafka_consumer = j_consumer_clz(topics,
+                                            
deserialization_schema._j_deserialization_schema,
+                                            j_properties)
+    return j_flink_kafka_consumer
+
+
+class FlinkKafkaConsumer(FlinkKafkaConsumerBase):
+    """
+    The Flink Kafka Consumer is a streaming data source that pulls a parallel 
data stream from
+    Apache Kafka. The consumer can run in multiple parallel instances, each of 
which will
+    pull data from one or more Kafka partitions.
+
+    The Flink Kafka Consumer participates in checkpointing and guarantees that 
no data is lost
+    during a failure, and that the computation processes elements 'exactly 
once'. (These guarantees
+    naturally assume that Kafka itself does not lose any data.)
+
+    Please note that Flink snapshots the offsets internally as part of its 
distributed checkpoints.
+    The offsets committed to Kafka / Zookeeper are only to bring the outside 
view of progress in
+    sync with Flink's view of the progress. That way, monitoring and other 
jobs can get a view of
+    how far the Flink Kafka consumer has consumed a topic.
+
+    Please refer to Kafka's documentation for the available configuration 
properties:
+    http://kafka.apache.org/documentation.html#newconsumerconfigs
+    """
+
+    def __init__(self, topics: Union[str, List[str]], deserialization_schema: 
DeserializationSchema,
+                 properties: Dict):
+        """
+        Creates a new Kafka streaming source consumer for Kafka 0.10.x.
+
+        This constructor allows passing multiple topics to the consumer.
+
+        :param topics: The Kafka topics to read from.
+        :param deserialization_schema: The de-/serializer used to convert 
between Kafka's byte
+                                       messages and Flink's objects.
+        :param properties: The properties that are used to configure both the 
fetcher and the offset
+                           handler.
+        """
+
+        warnings.warn("Deprecated in 1.16. Use KafkaSource instead.", 
DeprecationWarning)
+        JFlinkKafkaConsumer = get_gateway().jvm \
+            .org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
+        j_flink_kafka_consumer = _get_kafka_consumer(topics, properties, 
deserialization_schema,
+                                                     JFlinkKafkaConsumer)
+        super(FlinkKafkaConsumer, 
self).__init__(j_flink_kafka_consumer=j_flink_kafka_consumer)
+
+
+# ---- FlinkKafkaProducer ----
+
+
+class Semantic(Enum):
+    """
+    Semantics that can be chosen.
+
+    :data: `EXACTLY_ONCE`:
+
+    The Flink producer will write all messages in a Kafka transaction that 
will be committed to
+    the Kafka on a checkpoint. In this mode FlinkKafkaProducer sets up a pool 
of
+    FlinkKafkaProducer. Between each checkpoint a new Kafka transaction is 
created, which is
+    committed on FlinkKafkaProducer.notifyCheckpointComplete(long). If 
checkpoint
+    complete notifications are running late, FlinkKafkaProducer can run out of
+    FlinkKafkaProducers in the pool. In that case any subsequent 
FlinkKafkaProducer.snapshotState()
+    requests will fail and the FlinkKafkaProducer will keep using the
+    FlinkKafkaProducer from previous checkpoint. To decrease chances of 
failing checkpoints
+    there are four options:
+
+        1. decrease number of max concurrent checkpoints
+        2. make checkpoints more reliable (so that they complete faster)
+        3. increase delay between checkpoints
+        4. increase size of FlinkKafkaProducers pool
+
+    :data: `AT_LEAST_ONCE`:
+
+    The Flink producer will wait for all outstanding messages in the Kafka 
buffers to be
+    acknowledged by the Kafka producer on a checkpoint.
+
+    :data: `NONE`:
+
+    Means that nothing will be guaranteed. Messages can be lost and/or 
duplicated in case of
+    failure.
+
+    """
+
+    EXACTLY_ONCE = 0,
+    AT_LEAST_ONCE = 1,
+    NONE = 2
+
+    def _to_j_semantic(self):
+        JSemantic = get_gateway().jvm \
+            
.org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
+        return getattr(JSemantic, self.name)
+
+
+class FlinkKafkaProducerBase(SinkFunction, ABC):
+    """
+    Flink Sink to produce data into a Kafka topic.
+
+    Please note that this producer provides at-least-once reliability 
guarantees when checkpoints
+    are enabled and set_flush_on_checkpoint(True) is set. Otherwise, the 
producer doesn't
+    provide any reliability guarantees.
+    """
+
+    def __init__(self, j_flink_kafka_producer):
+        super(FlinkKafkaProducerBase, 
self).__init__(sink_func=j_flink_kafka_producer)
+
+    def set_log_failures_only(self, log_failures_only: bool) -> 
'FlinkKafkaProducerBase':
+        """
+        Defines whether the producer should fail on errors, or only log them. 
If this is set to
+        true, then exceptions will be only logged, if set to false, exceptions 
will be eventually
+        thrown and cause the streaming program to fail (and enter recovery).
+
+        :param log_failures_only: The flag to indicate logging-only on 
exceptions.
+        """
+        self._j_function.setLogFailuresOnly(log_failures_only)
+        return self
+
+    def set_flush_on_checkpoint(self, flush_on_checkpoint: bool) -> 
'FlinkKafkaProducerBase':
+        """
+        If set to true, the Flink producer will wait for all outstanding 
messages in the Kafka
+        buffers to be acknowledged by the Kafka producer on a checkpoint.
+
+        This way, the producer can guarantee that messages in the Kafka 
buffers are part of the
+        checkpoint.
+
+        :param flush_on_checkpoint: Flag indicating the flush mode (true = 
flush on checkpoint)
+        """
+        self._j_function.setFlushOnCheckpoint(flush_on_checkpoint)
+        return self
+
+    def set_write_timestamp_to_kafka(self,
+                                     write_timestamp_to_kafka: bool) -> 
'FlinkKafkaProducerBase':
+        """
+        If set to true, Flink will write the (event time) timestamp attached 
to each record into
+        Kafka. Timestamps must be positive for Kafka to accept them.
+
+        :param write_timestamp_to_kafka: Flag indicating if Flink's internal 
timestamps are written
+                                         to Kafka.
+        """
+        self._j_function.setWriteTimestampToKafka(write_timestamp_to_kafka)
+        return self
+
+
+class FlinkKafkaProducer(FlinkKafkaProducerBase):
+    """
+    Flink Sink to produce data into a Kafka topic. By
+    default producer will use AT_LEAST_ONCE semantic. Before using 
EXACTLY_ONCE please refer to
+    Flink's Kafka connector documentation.
+    """
+
+    def __init__(self, topic: str, serialization_schema: SerializationSchema,
+                 producer_config: Dict, kafka_producer_pool_size: int = 5,
+                 semantic=Semantic.AT_LEAST_ONCE):
+        """
+        Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to the topic.
+
+        Using this constructor, the default FlinkFixedPartitioner will be used 
as the partitioner.
+        This default partitioner maps each sink subtask to a single Kafka 
partition (i.e. all
+        records received by a sink subtask will end up in the same Kafka 
partition).
+
+        :param topic: ID of the Kafka topic.
+        :param serialization_schema: User defined key-less serialization 
schema.
+        :param producer_config: Properties with the producer configuration.
+        """
+        gateway = get_gateway()
+        j_properties = gateway.jvm.java.util.Properties()
+        for key, value in producer_config.items():
+            j_properties.setProperty(key, value)
+
+        JFlinkKafkaProducer = gateway.jvm \
+            .org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
+
+        j_flink_kafka_producer = JFlinkKafkaProducer(
+            topic, serialization_schema._j_serialization_schema, j_properties, 
None,
+            semantic._to_j_semantic(), kafka_producer_pool_size)
+        super(FlinkKafkaProducer, 
self).__init__(j_flink_kafka_producer=j_flink_kafka_producer)
+
+    def ignore_failures_after_transaction_timeout(self) -> 
'FlinkKafkaProducer':
+        """
+        Disables the propagation of exceptions thrown when committing 
presumably timed out Kafka
+        transactions during recovery of the job. If a Kafka transaction is 
timed out, a commit will
+        never be successful. Hence, use this feature to avoid recovery loops 
of the Job. Exceptions
+        will still be logged to inform the user that data loss might have 
occurred.
+
+        Note that we use the System.currentTimeMillis() to track the age of a 
transaction. Moreover,
+        only exceptions thrown during the recovery are caught, i.e., the 
producer will attempt at
+        least one commit of the transaction before giving up.
+
+        :return: This FlinkKafkaProducer.
+        """
+        self._j_function.ignoreFailuresAfterTransactionTimeout()
+        return self
+
+
+# ---- KafkaSource ----
+
+
+class KafkaSource(Source):
+    """
+    The Source implementation of Kafka. Please use a 
:class:`KafkaSourceBuilder` to construct a
+    :class:`KafkaSource`. The following example shows how to create a 
KafkaSource emitting records
+    of String type.
+
+    ::
+
+        >>> source = KafkaSource \\
+        ...     .builder() \\
+        ...     .set_bootstrap_servers('MY_BOOTSTRAP_SERVERS') \\
+        ...     .set_group_id('MY_GROUP') \\
+        ...     .set_topics('TOPIC1', 'TOPIC2') \\
+        ...     .set_value_only_deserializer(SimpleStringSchema()) \\
+        ...     .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \\
+        ...     .build()
+
+    .. versionadded:: 1.16.0
+    """
+
+    def __init__(self, j_kafka_source: JavaObject):
+        super().__init__(j_kafka_source)
+
+    @staticmethod
+    def builder() -> 'KafkaSourceBuilder':
+        """
+        Get a :class:`KafkaSourceBuilder` to build a :class:`KafkaSource`.
+
+        :return: a Kafka source builder.
+        """
+        return KafkaSourceBuilder()
+
+
+class KafkaSourceBuilder(object):
+    """
+    The builder class for :class:`KafkaSource` to make it easier for the users 
to construct a
+    :class:`KafkaSource`.
+
+    The following example shows the minimum setup to create a KafkaSource that 
reads the String
+    values from a Kafka topic.
+
+    ::
+
+        >>> source = KafkaSource.builder() \\
+        ...     .set_bootstrap_servers('MY_BOOTSTRAP_SERVERS') \\
+        ...     .set_topics('TOPIC1', 'TOPIC2') \\
+        ...     .set_value_only_deserializer(SimpleStringSchema()) \\
+        ...     .build()
+
+    The bootstrap servers, topics/partitions to consume, and the record 
deserializer are required
+    fields that must be set.
+
+    To specify the starting offsets of the KafkaSource, one can call 
:meth:`set_starting_offsets`.
+
+    By default, the KafkaSource runs in CONTINUOUS_UNBOUNDED mode and never 
stops until the Flink
+    job is canceled or fails. To let the KafkaSource run in 
CONTINUOUS_UNBOUNDED mode but stop at some
+    given offsets, one can call :meth:`set_unbounded`. For example, the 
following KafkaSource
+    stops after it consumes up to the latest partition offsets at the point 
when the Flink job started.
+
+    ::
+
+        >>> source = KafkaSource.builder() \\
+        ...     .set_bootstrap_servers('MY_BOOTSTRAP_SERVERS') \\
+        ...     .set_topics('TOPIC1', 'TOPIC2') \\
+        ...     .set_value_only_deserializer(SimpleStringSchema()) \\
+        ...     .set_unbounded(KafkaOffsetsInitializer.latest()) \\
+        ...     .build()
+
+    .. versionadded:: 1.16.0
+    """
+
+    def __init__(self):
+        self._j_builder = 
get_gateway().jvm.org.apache.flink.connector.kafka.source \
+            .KafkaSource.builder()
+
+    def build(self) -> 'KafkaSource':
+        return KafkaSource(self._j_builder.build())
+
+    def set_bootstrap_servers(self, bootstrap_servers: str) -> 
'KafkaSourceBuilder':
+        """
+        Sets the bootstrap servers for the KafkaConsumer of the KafkaSource.
+
+        :param bootstrap_servers: the bootstrap servers of the Kafka cluster.
+        :return: this KafkaSourceBuilder.
+        """
+        self._j_builder.setBootstrapServers(bootstrap_servers)
+        return self
+
+    def set_group_id(self, group_id: str) -> 'KafkaSourceBuilder':
+        """
+        Sets the consumer group id of the KafkaSource.
+
+        :param group_id: the group id of the KafkaSource.
+        :return: this KafkaSourceBuilder.
+        """
+        self._j_builder.setGroupId(group_id)
+        return self
+
+    def set_topics(self, *topics: str) -> 'KafkaSourceBuilder':
+        """
+        Set a list of topics the KafkaSource should consume from. All the 
topics in the list must
+        already exist in the Kafka cluster; otherwise, an exception will be 
thrown. To allow some
+        topics to be created lazily, please use :meth:`set_topic_pattern` 
instead.
+
+        :param topics: the list of topics to consume from.
+        :return: this KafkaSourceBuilder.
+        """
+        
self._j_builder.setTopics(to_jarray(get_gateway().jvm.java.lang.String, topics))
+        return self
+
+    def set_topic_pattern(self, topic_pattern: str) -> 'KafkaSourceBuilder':
+        """
+        Set a topic pattern to consume from, using the Java regex Pattern syntax. For grammar, 
check out
+        `JavaDoc 
<https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html>`_ .
+
+        :param topic_pattern: the pattern of the topic name to consume from.
+        :return: this KafkaSourceBuilder.
+        """
+        self._j_builder.setTopicPattern(get_gateway().jvm.java.util.regex
+                                        .Pattern.compile(topic_pattern))
+        return self
+
+    def set_partitions(self, partitions: Set['KafkaTopicPartition']) -> 
'KafkaSourceBuilder':
+        """
+        Set a set of partitions to consume from.
+
+        Example:
+        ::
+
+            >>> KafkaSource.builder().set_partitions({
+            ...     KafkaTopicPartition('TOPIC1', 0),
+            ...     KafkaTopicPartition('TOPIC1', 1),
+            ... })
+
+        :param partitions: the set of partitions to consume from.
+        :return: this KafkaSourceBuilder.
+        """
+        j_set = get_gateway().jvm.java.util.HashSet()
+        for tp in partitions:
+            j_set.add(tp._to_j_topic_partition())
+        self._j_builder.setPartitions(j_set)
+        return self
+
+    def set_starting_offsets(self, starting_offsets_initializer: 
'KafkaOffsetsInitializer') \
+            -> 'KafkaSourceBuilder':
+        """
+        Specify the offsets from which the KafkaSource should start consuming 
by providing a
+        :class:`KafkaOffsetsInitializer`.
+
+        The following :class:`KafkaOffsetsInitializer` s are commonly used and 
provided out of the
+        box. Currently, customized offset initializer is not supported in 
PyFlink.
+
+        * :meth:`KafkaOffsetsInitializer.earliest` - starting from the 
earliest offsets. This is
+          also the default offset initializer of the KafkaSource for starting 
offsets.
+        * :meth:`KafkaOffsetsInitializer.latest` - starting from the latest 
offsets.
+        * :meth:`KafkaOffsetsInitializer.committed_offsets` - starting from the 
committed offsets of
+          the consumer group. If there are no committed offsets, starting from 
the offsets
+          specified by the :class:`KafkaOffsetResetStrategy`.
+        * :meth:`KafkaOffsetsInitializer.offsets` - starting from the 
specified offsets for each
+          partition.
+        * :meth:`KafkaOffsetsInitializer.timestamp` - starting from the 
specified timestamp for each
+          partition. Note that the guarantee here is that all the records in 
Kafka whose timestamp
+          is greater than the given starting timestamp will be consumed. 
However, it is possible
+          that some consumer records whose timestamp is smaller than the given 
starting timestamp
+          are also consumed.
+
+        :param starting_offsets_initializer: the 
:class:`KafkaOffsetsInitializer` setting the
+            starting offsets for the Source.
+        :return: this KafkaSourceBuilder.
+        """
+        
self._j_builder.setStartingOffsets(starting_offsets_initializer._j_initializer)
+        return self
+
+    def set_unbounded(self, stopping_offsets_initializer: 
'KafkaOffsetsInitializer') \
+            -> 'KafkaSourceBuilder':
+        """
+        By default, the KafkaSource is set to run in CONTINUOUS_UNBOUNDED 
manner and thus never
+        stops until the Flink job fails or is canceled. To let the KafkaSource 
run as a streaming
+        source but still stops at some point, one can set an 
:class:`KafkaOffsetsInitializer`
+        to specify the stopping offsets for each partition. When all the 
partitions have reached
+        their stopping offsets, the KafkaSource will then exit.
+
+        This method is different from :meth:`set_bounded` that after setting 
the stopping offsets
+        with this method, KafkaSource will still be CONTINUOUS_UNBOUNDED even 
though it will stop at
+        the stopping offsets specified by the stopping offset initializer.
+
+        The following :class:`KafkaOffsetsInitializer` s are commonly used and 
provided out of the
+        box. Currently, customized offset initializer is not supported in 
PyFlink.
+
+        * :meth:`KafkaOffsetsInitializer.latest` - stopping at the latest 
offsets.
+        * :meth:`KafkaOffsetsInitializer.committed_offsets` - stopping at the 
committed offsets of
+          the consumer group. If there are no committed offsets, the offsets 
are determined by
+          the specified :class:`KafkaOffsetResetStrategy`.
+        * :meth:`KafkaOffsetsInitializer.offsets` - stopping at the 
specified offsets for each
+          partition.
+        * :meth:`KafkaOffsetsInitializer.timestamp` - stopping at the 
specified timestamp for each
+          partition. Note that the guarantee here is that no record in 
Kafka whose timestamp
+          is greater than the given stopping timestamp will be consumed. 
However, it is possible
+          that some records whose timestamp is smaller than the given 
stopping timestamp
+          are not consumed.
+
+        :param stopping_offsets_initializer: the 
:class:`KafkaOffsetsInitializer` to specify the
+            stopping offsets.
+        :return: this KafkaSourceBuilder
+        """
+        
self._j_builder.setUnbounded(stopping_offsets_initializer._j_initializer)
+        return self
+
+    def set_bounded(self, stopping_offsets_initializer: 
'KafkaOffsetsInitializer') \
+            -> 'KafkaSourceBuilder':
+        """
+        By default, the KafkaSource is set to run in CONTINUOUS_UNBOUNDED 
manner and thus never
+        stops until the Flink job fails or is canceled. To let the KafkaSource 
run in BOUNDED manner
+        and stop at some point, one can set an 
:class:`KafkaOffsetsInitializer` to specify the
+        stopping offsets for each partition. When all the partitions have 
reached their stopping
+        offsets, the KafkaSource will then exit.
+
+        This method is different from :meth:`set_unbounded` that after setting 
the stopping offsets
+        with this method, :meth:`KafkaSource.get_boundedness` will return 
BOUNDED instead of
+        CONTINUOUS_UNBOUNDED.
+
+        The following :class:`KafkaOffsetsInitializer` s are commonly used and 
provided out of the
+        box. Currently, customized offset initializer is not supported in 
PyFlink.
+
+        * :meth:`KafkaOffsetsInitializer.latest` - stopping at the latest 
offsets.
+        * :meth:`KafkaOffsetsInitializer.committed_offsets` - stopping at the 
committed offsets of
+          the consumer group. If there are no committed offsets, the offsets 
are determined by
+          the specified :class:`KafkaOffsetResetStrategy`.
+        * :meth:`KafkaOffsetsInitializer.offsets` - stopping at the 
specified offsets for each
+          partition.
+        * :meth:`KafkaOffsetsInitializer.timestamp` - stopping at the 
specified timestamp for each
+          partition. Note that the guarantee here is that no record in 
Kafka whose timestamp
+          is greater than the given stopping timestamp will be consumed. 
However, it is possible
+          that some records whose timestamp is smaller than the given 
stopping timestamp
+          are not consumed.
+
+        :param stopping_offsets_initializer: the 
:class:`KafkaOffsetsInitializer` to specify the
+            stopping offsets.
+        :return: this KafkaSourceBuilder
+        """
+        self._j_builder.setBounded(stopping_offsets_initializer._j_initializer)
+        return self
+
+    def set_value_only_deserializer(self, deserialization_schema: 
DeserializationSchema) \
+            -> 'KafkaSourceBuilder':
+        """
+        Sets the :class:`~pyflink.common.serialization.DeserializationSchema` 
for deserializing the
+        value of Kafka's ConsumerRecord. The other information (e.g. key) in a 
ConsumerRecord will
+        be ignored.
+
+        :param deserialization_schema: the :class:`DeserializationSchema` to 
use for
+            deserialization.
+        :return: this KafkaSourceBuilder.
+        """
+        
self._j_builder.setValueOnlyDeserializer(deserialization_schema._j_deserialization_schema)
+        return self
+
+    def set_client_id_prefix(self, prefix: str) -> 'KafkaSourceBuilder':
+        """
+        Sets the client id prefix of this KafkaSource.
+
+        :param prefix: the client id prefix to use for this KafkaSource.
+        :return: this KafkaSourceBuilder.
+        """
+        self._j_builder.setClientIdPrefix(prefix)
+        return self
+
+    def set_property(self, key: str, value: str) -> 'KafkaSourceBuilder':
+        """
+        Set an arbitrary property for the KafkaSource and KafkaConsumer. The 
valid keys can be found
+        in ConsumerConfig and KafkaSourceOptions.
+
+        Note that the following keys will be overridden by the builder when 
the KafkaSource is
+        created.
+
+        * ``key.deserializer`` is always set to ByteArrayDeserializer.
+        * ``value.deserializer`` is always set to ByteArrayDeserializer.
+        * ``auto.offset.reset.strategy`` is overridden by 
AutoOffsetResetStrategy returned by
+          :class:`KafkaOffsetsInitializer` for the starting offsets, which is 
by default
+          :meth:`KafkaOffsetsInitializer.earliest`.
+        * ``partition.discovery.interval.ms`` is overridden to -1 when 
:meth:`set_bounded` has been
+          invoked.
+
+        :param key: the key of the property.
+        :param value: the value of the property.
+        :return: this KafkaSourceBuilder.
+        """
+        self._j_builder.setProperty(key, value)
+        return self
+
+    def set_properties(self, props: Dict) -> 'KafkaSourceBuilder':
+        """
+        Set arbitrary properties for the KafkaSource and KafkaConsumer. The 
valid keys can be found
+        in ConsumerConfig and KafkaSourceOptions.
+
+        Note that the following keys will be overridden by the builder when 
the KafkaSource is
+        created.
+
+        * ``key.deserializer`` is always set to ByteArrayDeserializer.
+        * ``value.deserializer`` is always set to ByteArrayDeserializer.
+        * ``auto.offset.reset.strategy`` is overridden by 
AutoOffsetResetStrategy returned by
+          :class:`KafkaOffsetsInitializer` for the starting offsets, which is 
by default
+          :meth:`KafkaOffsetsInitializer.earliest`.
+        * ``partition.discovery.interval.ms`` is overridden to -1 when 
:meth:`set_bounded` has been
+          invoked.
+        * ``client.id`` is overridden to "client.id.prefix-RANDOM_LONG", or 
"group.id-RANDOM_LONG"
+          if the client id prefix is not set.
+
+        :param props: the properties to set for the KafkaSource.
+        :return: this KafkaSourceBuilder.
+        """
+        gateway = get_gateway()
+        j_properties = gateway.jvm.java.util.Properties()
+        for key, value in props.items():
+            j_properties.setProperty(key, value)
+        self._j_builder.setProperties(j_properties)
+        return self
+
+
+class KafkaTopicPartition(object):
+    """
+    Corresponding to Java ``org.apache.kafka.common.TopicPartition`` class.
+
+    Example:
+    ::
+
+        >>> topic_partition = KafkaTopicPartition('TOPIC1', 0)
+
+    .. versionadded:: 1.16.0
+    """
+
+    def __init__(self, topic: str, partition: int):
+        self._topic = topic
+        self._partition = partition
+
+    def _to_j_topic_partition(self):
+        jvm = get_gateway().jvm
+        return 
jvm.org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition(
+            self._topic, self._partition)
+
+    def __eq__(self, other):
+        if not isinstance(other, KafkaTopicPartition):
+            return False
+        return self._topic == other._topic and self._partition == 
other._partition
+
+    def __hash__(self):
+        return 31 * (31 + self._partition) + hash(self._topic)
+
+
+class KafkaOffsetResetStrategy(Enum):
+    """
+    Corresponding to Java 
``org.apache.kafka.clients.consumer.OffsetResetStrategy`` class.
+
+    .. versionadded:: 1.16.0
+    """
+
+    LATEST = 0
+    EARLIEST = 1
+    NONE = 2
+
+    def _to_j_offset_reset_strategy(self):
+        JOffsetResetStrategy = 
get_gateway().jvm.org.apache.flink.kafka.shaded.org.apache.kafka.\
+            clients.consumer.OffsetResetStrategy
+        return getattr(JOffsetResetStrategy, self.name)
+
+
+class KafkaOffsetsInitializer(object):
+    """
+    An interface for users to specify the starting / stopping offset of a 
KafkaPartitionSplit.
+
+    .. versionadded:: 1.16.0
+    """
+
+    def __init__(self, j_initializer: JavaObject):
+        self._j_initializer = j_initializer
+
+    @staticmethod
+    def committed_offsets(
+            offset_reset_strategy: 'KafkaOffsetResetStrategy' = 
KafkaOffsetResetStrategy.NONE) -> \
+            'KafkaOffsetsInitializer':
+        """
+        Get a :class:`KafkaOffsetsInitializer` which initializes the offsets 
to the committed
+        offsets. An exception will be thrown at runtime if there are no 
committed offsets.
+
+        An optional :class:`KafkaOffsetResetStrategy` can be specified to 
initialize the offsets if
+        the committed offsets do not exist.
+
+        :param offset_reset_strategy: the offset reset strategy to use when 
the committed offsets do
+            not exist.
+        :return: an offset initializer which initializes the offsets to the 
committed offsets.
+        """
+        JOffsetsInitializer = 
get_gateway().jvm.org.apache.flink.connector.kafka.source.\
+            enumerator.initializer.OffsetsInitializer
+        return KafkaOffsetsInitializer(JOffsetsInitializer.committedOffsets(
+            offset_reset_strategy._to_j_offset_reset_strategy()))
+
+    @staticmethod
+    def timestamp(timestamp: int) -> 'KafkaOffsetsInitializer':
+        """
+        Get a :class:`KafkaOffsetsInitializer` which initializes the offsets 
in each partition so
+        that the initialized offset is the offset of the first record whose 
record timestamp is
+        greater than or equal to the given timestamp.
+
+        :param timestamp: the timestamp to start the consumption.
+        :return: a :class:`KafkaOffsetsInitializer` which initializes the offsets 
based on the given
+            timestamp.
+        """
+        JOffsetsInitializer = 
get_gateway().jvm.org.apache.flink.connector.kafka.source. \
+            enumerator.initializer.OffsetsInitializer
+        return 
KafkaOffsetsInitializer(JOffsetsInitializer.timestamp(timestamp))
+
+    @staticmethod
+    def earliest() -> 'KafkaOffsetsInitializer':
+        """
+        Get an :class:`KafkaOffsetsInitializer` which initializes the offsets 
to the earliest
+        available offsets of each partition.
+
+        :return: an :class:`KafkaOffsetsInitializer` which initializes the 
offsets to the earliest
+            available offsets.
+        """
+        JOffsetsInitializer = 
get_gateway().jvm.org.apache.flink.connector.kafka.source. \
+            enumerator.initializer.OffsetsInitializer
+        return KafkaOffsetsInitializer(JOffsetsInitializer.earliest())
+
+    @staticmethod
+    def latest() -> 'KafkaOffsetsInitializer':
+        """
+        Get an :class:`KafkaOffsetsInitializer` which initializes the offsets 
to the latest offsets
+        of each partition.
+
+        :return: an :class:`KafkaOffsetsInitializer` which initializes the 
offsets to the latest
+            offsets.
+        """
+        JOffsetsInitializer = 
get_gateway().jvm.org.apache.flink.connector.kafka.source. \
+            enumerator.initializer.OffsetsInitializer
+        return KafkaOffsetsInitializer(JOffsetsInitializer.latest())
+
+    @staticmethod
+    def offsets(offsets: Dict['KafkaTopicPartition', int],
+                offset_reset_strategy: 'KafkaOffsetResetStrategy' =
+                KafkaOffsetResetStrategy.EARLIEST) -> 
'KafkaOffsetsInitializer':
+        """
+        Get an :class:`KafkaOffsetsInitializer` which initializes the offsets 
to the specified
+        offsets.
+
+        An optional :class:`KafkaOffsetResetStrategy` can be specified to 
initialize the offsets in
+        case the specified offset is out of range.
+
+        Example:
+        ::
+
+            >>> KafkaOffsetsInitializer.offsets({
+            ...     KafkaTopicPartition('TOPIC1', 0): 0,
+            ...     KafkaTopicPartition('TOPIC1', 1): 10000
+            ... }, KafkaOffsetResetStrategy.EARLIEST)
+
+        :param offsets: the specified offsets for each partition.
+        :param offset_reset_strategy: the :class:`KafkaOffsetResetStrategy` to 
use when the
+            specified offset is out of range.
+        :return: an :class:`KafkaOffsetsInitializer` which initializes the 
offsets to the specified
+            offsets.
+        """
+        jvm = get_gateway().jvm
+        j_map_wrapper = jvm.org.apache.flink.python.util.HashMapWrapper(
+            None, get_java_class(jvm.Long))
+        for tp, offset in offsets.items():
+            j_map_wrapper.put(tp._to_j_topic_partition(), offset)
+
+        JOffsetsInitializer = 
get_gateway().jvm.org.apache.flink.connector.kafka.source. \
+            enumerator.initializer.OffsetsInitializer
+        return KafkaOffsetsInitializer(JOffsetsInitializer.offsets(
+            j_map_wrapper.asMap(), 
offset_reset_strategy._to_j_offset_reset_strategy()))
+
+
+class KafkaSink(Sink, SupportsPreprocessing):
+    """
+    Flink Sink to produce data into a Kafka topic. The sink supports all 
delivery guarantees
+    described by :class:`DeliveryGuarantee`.
+
+    * :attr:`DeliveryGuarantee.NONE` does not provide any guarantees: messages 
may be lost in case
+      of issues on the Kafka broker and messages may be duplicated in case of 
a Flink failure.
+    * :attr:`DeliveryGuarantee.AT_LEAST_ONCE` the sink will wait for all 
outstanding records in the
+      Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. 
No messages will be
+      lost in case of any issue with the Kafka brokers but messages may be 
duplicated when Flink
+      restarts.
+    * :attr:`DeliveryGuarantee.EXACTLY_ONCE`: In this mode the KafkaSink will 
write all messages in
+      a Kafka transaction that will be committed to Kafka on a checkpoint. 
Thus, if the consumer
+      reads only committed data (see Kafka consumer config 
``isolation.level``), no duplicates
+      will be seen in case of a Flink restart. However, this delays record 
writing effectively
+      until a checkpoint is written, so adjust the checkpoint duration 
accordingly. Please ensure
+      that you use unique transactional id prefixes across your applications 
running on the same
+      Kafka cluster such that multiple running jobs do not interfere in their 
transactions!
+      Additionally, it is highly recommended to set the Kafka transaction 
timeout (producer config
+      ``transaction.timeout.ms``) much larger than the maximum checkpoint duration plus the maximum 
restart duration, or data loss may happen when Kafka expires an
+      uncommitted transaction.
+
+    .. versionadded:: 1.16.0
+    """
+
+    def __init__(self, j_kafka_sink, transformer: Optional[StreamTransformer] 
= None):
+        super().__init__(j_kafka_sink)
+        self._transformer = transformer
+
+    @staticmethod
+    def builder() -> 'KafkaSinkBuilder':
+        """
+        Create a :class:`KafkaSinkBuilder` to construct :class:`KafkaSink`.
+        """
+        return KafkaSinkBuilder()
+
+    def get_transformer(self) -> Optional[StreamTransformer]:
+        return self._transformer
+
+
+class KafkaSinkBuilder(object):
+    """
+    Builder to construct :class:`KafkaSink`.
+
+    The following example shows the minimum setup to create a KafkaSink that 
writes String values
+    to a Kafka topic.
+
+    ::
+
+        >>> record_serializer = KafkaRecordSerializationSchema.builder() \\
+        ...     .set_topic(MY_SINK_TOPIC) \\
+        ...     .set_value_serialization_schema(SimpleStringSchema()) \\
+        ...     .build()
+        >>> sink = KafkaSink.builder() \\
+        ...     .set_bootstrap_servers(MY_BOOTSTRAP_SERVERS) \\
+        ...     .set_record_serializer(record_serializer) \\
+        ...     .build()
+
+    One can also configure different :class:`DeliveryGuarantee` by using
+    :meth:`set_delivery_guarantee` but keep in mind when using
+    :attr:`DeliveryGuarantee.EXACTLY_ONCE`, one must set the transactional id 
prefix
+    :meth:`set_transactional_id_prefix`.
+
+    .. versionadded:: 1.16.0
+    """
+
+    def __init__(self):
+        jvm = get_gateway().jvm
+        self._j_builder = 
jvm.org.apache.flink.connector.kafka.sink.KafkaSink.builder()
+        self._preprocessing = None
+
+    def build(self) -> 'KafkaSink':
+        """
+        Constructs the :class:`KafkaSink` with the configured properties.
+        """
+        return KafkaSink(self._j_builder.build(), self._preprocessing)
+
+    def set_bootstrap_servers(self, bootstrap_servers: str) -> 
'KafkaSinkBuilder':
+        """
+        Sets the Kafka bootstrap servers.
+
+        :param bootstrap_servers: A comma separated list of valid URIs to 
reach the Kafka broker.
+        """
+        self._j_builder.setBootstrapServers(bootstrap_servers)
+        return self
+
+    def set_delivery_guarantee(self, delivery_guarantee: DeliveryGuarantee) -> 
'KafkaSinkBuilder':
+        """
+        Sets the wanted :class:`DeliveryGuarantee`. The default delivery 
guarantee is
+        :attr:`DeliveryGuarantee.NONE`.
+
+        :param delivery_guarantee: The wanted :class:`DeliveryGuarantee`.
+        """
+        
self._j_builder.setDeliveryGuarantee(delivery_guarantee._to_j_delivery_guarantee())
+        return self
+
+    def set_transactional_id_prefix(self, transactional_id_prefix: str) -> 
'KafkaSinkBuilder':
+        """
+        Sets the prefix for all created transactionalIds if 
:attr:`DeliveryGuarantee.EXACTLY_ONCE`
+        is configured.
+
+        It is mandatory to always set this value with 
:attr:`DeliveryGuarantee.EXACTLY_ONCE` to
+        prevent corrupted transactions if multiple jobs using the KafkaSink 
run against the same
+        Kafka Cluster. The default prefix is ``"kafka-sink"``.
+
+        The size of the prefix is capped by MAXIMUM_PREFIX_BYTES (6400) 
formatted with UTF-8.
+
+        It is important to keep the prefix stable across application restarts. 
If the prefix changes
+        it might happen that lingering transactions are not correctly aborted 
and newly written
+        messages are not immediately consumable until transactions timeout.
+
+        :param transactional_id_prefix: The transactional id prefix.
+        """
+        self._j_builder.setTransactionalIdPrefix(transactional_id_prefix)
+        return self
+
+    def set_record_serializer(self, record_serializer: 
'KafkaRecordSerializationSchema') \
+            -> 'KafkaSinkBuilder':
+        """
+        Sets the :class:`KafkaRecordSerializationSchema` that transforms 
incoming records to kafka
+        producer records.
+
+        :param record_serializer: The :class:`KafkaRecordSerializationSchema`.
+        """
+        # NOTE: If topic selector is a generated first-column selector, do 
extra preprocessing
+        j_topic_selector = 
get_field_value(record_serializer._j_serialization_schema,
+                                           'topicSelector')
+        if (
+            j_topic_selector.getClass().getCanonicalName() ==
+            
'org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaBuilder.'
+            'CachingTopicSelector'
+        ) and (
+            get_field_value(j_topic_selector, 
'topicSelector').getClass().getCanonicalName()
+            is not None and
+            (get_field_value(j_topic_selector, 
'topicSelector').getClass().getCanonicalName()
+             .startswith('com.sun.proxy') or
+             get_field_value(j_topic_selector, 
'topicSelector').getClass().getCanonicalName()
+             .startswith('jdk.proxy'))
+        ):
+            record_serializer._wrap_serialization_schema()
+            self._preprocessing = record_serializer._build_preprocessing()
+
+        
self._j_builder.setRecordSerializer(record_serializer._j_serialization_schema)
+        return self
+
+    def set_property(self, key: str, value: str) -> 'KafkaSinkBuilder':
+        """
+        Sets kafka producer config.
+
+        :param key: Kafka producer config key.
+        :param value: Kafka producer config value.
+        """
+        self._j_builder.setProperty(key, value)
+        return self
+
+
+class KafkaRecordSerializationSchema(SerializationSchema):
+    """
+    A serialization schema which defines how to convert the stream record to 
kafka producer record.
+
+    .. versionadded:: 1.16.0
+    """
+
+    def __init__(self, j_serialization_schema,
+                 topic_selector: Optional['KafkaTopicSelector'] = None):
+        super().__init__(j_serialization_schema)
+        self._topic_selector = topic_selector
+
+    @staticmethod
+    def builder() -> 'KafkaRecordSerializationSchemaBuilder':
+        """
+        Creates a default schema builder to provide common building blocks 
i.e. key serialization,
+        value serialization, topic selection.
+        """
+        return KafkaRecordSerializationSchemaBuilder()
+
+    def _wrap_serialization_schema(self):
+        jvm = get_gateway().jvm
+
+        def _wrap_schema(field_name):
+            j_schema_field = 
get_field(self._j_serialization_schema.getClass(), field_name)
+            if j_schema_field.get(self._j_serialization_schema) is not None:
+                j_schema_field.set(
+                    self._j_serialization_schema,
+                    jvm.org.apache.flink.python.util.PythonConnectorUtils
+                    .SecondColumnSerializationSchema(
+                        j_schema_field.get(self._j_serialization_schema)
+                    )
+                )
+
+        _wrap_schema('keySerializationSchema')
+        _wrap_schema('valueSerializationSchema')
+
+    def _build_preprocessing(self) -> StreamTransformer:
+        class SelectTopicTransformer(StreamTransformer):
+
+            def __init__(self, topic_selector: KafkaTopicSelector):
+                self._topic_selector = topic_selector
+
+            def apply(self, ds):
+                output_type = Types.ROW([Types.STRING(), ds.get_type()])
+                return ds.map(lambda v: Row(self._topic_selector.apply(v), v),
+                              output_type=output_type)
+
+        return SelectTopicTransformer(self._topic_selector)
+
+
+class KafkaRecordSerializationSchemaBuilder(object):
+    """
+    Builder to construct :class:`KafkaRecordSerializationSchema`.
+
+    Example:
+    ::
+
+        >>> KafkaRecordSerializationSchema.builder() \\
+        ...     .set_topic('topic') \\
+        ...     .set_key_serialization_schema(SimpleStringSchema()) \\
+        ...     .set_value_serialization_schema(SimpleStringSchema()) \\
+        ...     .build()
+
+    And the sink topic can be calculated dynamically from each record:
+    ::
+
+        >>> KafkaRecordSerializationSchema.builder() \\
+        ...     .set_topic_selector(lambda row: 'topic-' + row['category']) \\
+        ...     .set_value_serialization_schema(
+        ...         
JsonRowSerializationSchema.builder().with_type_info(ROW_TYPE).build()) \\
+        ...     .build()
+
+    It is necessary to configure exactly one serialization method for the 
value and a topic.
+
+    .. versionadded:: 1.16.0
+    """
+
+    def __init__(self):
+        jvm = get_gateway().jvm
+        self._j_builder = jvm.org.apache.flink.connector.kafka.sink \
+            .KafkaRecordSerializationSchemaBuilder()
+        self._fixed_topic = True  # type: bool
+        self._topic_selector = None  # type: Optional[KafkaTopicSelector]
+        self._key_serialization_schema = None  # type: 
Optional[SerializationSchema]
+        self._value_serialization_schema = None  # type: 
Optional[SerializationSchema]
+
+    def build(self) -> 'KafkaRecordSerializationSchema':
+        """
+        Constructs the :class:`KafkaRecordSerializationSchema` with the 
configured
+        properties.
+        """
+        if self._fixed_topic:
+            return KafkaRecordSerializationSchema(self._j_builder.build())
+        else:
+            return KafkaRecordSerializationSchema(self._j_builder.build(), 
self._topic_selector)
+
+    def set_topic(self, topic: str) -> 'KafkaRecordSerializationSchemaBuilder':
+        """
+        Sets a fixed topic which is used as the destination for all records.
+
+        :param topic: The fixed topic.
+        """
+        self._j_builder.setTopic(topic)
+        self._fixed_topic = True
+        return self
+
+    def set_topic_selector(self, topic_selector: Union[Callable[[Any], str], 
'KafkaTopicSelector'])\
+            -> 'KafkaRecordSerializationSchemaBuilder':
+        """
+        Sets a topic selector which computes the target topic for every 
incoming record.
+
+        :param topic_selector: A :class:`KafkaTopicSelector` implementation or 
a function that
+            consumes each incoming record and returns the topic string.
+        """
+        if not isinstance(topic_selector, KafkaTopicSelector) and not 
callable(topic_selector):
+            raise TypeError('topic_selector must be KafkaTopicSelector or a 
callable')
+        if not isinstance(topic_selector, KafkaTopicSelector):
+            class TopicSelectorFunctionAdapter(KafkaTopicSelector):
+
+                def __init__(self, f: Callable[[Any], str]):
+                    self._f = f
+
+                def apply(self, data) -> str:
+                    return self._f(data)
+
+            topic_selector = TopicSelectorFunctionAdapter(topic_selector)
+
+        jvm = get_gateway().jvm
+        self._j_builder.setTopicSelector(
+            
jvm.org.apache.flink.python.util.PythonConnectorUtils.createFirstColumnTopicSelector(
+                
get_java_class(jvm.org.apache.flink.connector.kafka.sink.TopicSelector)
+            )
+        )
+        self._fixed_topic = False
+        self._topic_selector = topic_selector
+        return self
+
+    def set_key_serialization_schema(self, key_serialization_schema: 
SerializationSchema) \
+            -> 'KafkaRecordSerializationSchemaBuilder':
+        """
+        Sets a :class:`SerializationSchema` which is used to serialize the 
incoming element to the
+        key of the producer record. The key serialization is optional, if not 
set, the key of the
+        producer record will be null.
+
+        :param key_serialization_schema: The :class:`SerializationSchema` to 
serialize each incoming
+            record as the key of producer record.
+        """
+        self._key_serialization_schema = key_serialization_schema
+        
self._j_builder.setKeySerializationSchema(key_serialization_schema._j_serialization_schema)
+        return self
+
+    def set_value_serialization_schema(self, value_serialization_schema: 
SerializationSchema) \
+            -> 'KafkaRecordSerializationSchemaBuilder':
+        """
+        Sets a :class:`SerializationSchema` which is used to serialize the 
incoming element to the
+        value of the producer record. The value serialization is required.
+
+        :param value_serialization_schema: The :class:`SerializationSchema` to 
serialize each data
+            record as the value of the producer record.
+        """
+        self._value_serialization_schema = value_serialization_schema
+        self._j_builder.setValueSerializationSchema(
+            value_serialization_schema._j_serialization_schema)
+        return self
+
+
+class KafkaTopicSelector(ABC):
+    """
+    Selects the target topic for an incoming record.
+
+    .. versionadded:: 1.16.0
+    """
+
+    @abstractmethod
+    def apply(self, data) -> str:
+        pass
diff --git a/flink-python/pyflink/datastream/connectors/tests/test_kafka.py 
b/flink-python/pyflink/datastream/connectors/tests/test_kafka.py
new file mode 100644
index 00000000..dea06b3e
--- /dev/null
+++ b/flink-python/pyflink/datastream/connectors/tests/test_kafka.py
@@ -0,0 +1,669 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import json
+from typing import Dict
+
+import pyflink.datastream.data_stream as data_stream
+from pyflink.common import typeinfo
+
+from pyflink.common.configuration import Configuration
+from pyflink.common.serialization import SimpleStringSchema, 
DeserializationSchema
+from pyflink.common.typeinfo import Types
+from pyflink.common.types import Row
+from pyflink.common.watermark_strategy import WatermarkStrategy
+from pyflink.datastream.connectors.base import DeliveryGuarantee
+from pyflink.datastream.connectors.kafka import KafkaSource, 
KafkaTopicPartition, \
+    KafkaOffsetsInitializer, KafkaOffsetResetStrategy, 
KafkaRecordSerializationSchema, KafkaSink, \
+    FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream.formats.avro import AvroRowDeserializationSchema, 
AvroRowSerializationSchema
+from pyflink.datastream.formats.csv import CsvRowDeserializationSchema, 
CsvRowSerializationSchema
+from pyflink.datastream.formats.json import JsonRowDeserializationSchema, 
JsonRowSerializationSchema
+from pyflink.java_gateway import get_gateway
+from pyflink.testing.test_case_utils import (
+    PyFlinkStreamingTestCase,
+    PyFlinkTestCase,
+    invoke_java_object_method,
+    to_java_data_structure,
+)
+from pyflink.util.java_utils import to_jarray, is_instance_of, get_field_value
+
+
+class KafkaSourceTests(PyFlinkStreamingTestCase):
+
+    def test_legacy_kafka_connector(self):
+        source_topic = 'test_source_topic'
+        sink_topic = 'test_sink_topic'
+        props = {'bootstrap.servers': 'localhost:9092', 'group.id': 
'test_group'}
+        type_info = Types.ROW([Types.INT(), Types.STRING()])
+
+        # Test for kafka consumer
+        deserialization_schema = JsonRowDeserializationSchema.builder() \
+            .type_info(type_info=type_info).build()
+
+        flink_kafka_consumer = FlinkKafkaConsumer(source_topic, 
deserialization_schema, props)
+        flink_kafka_consumer.set_start_from_earliest()
+        flink_kafka_consumer.set_commit_offsets_on_checkpoints(True)
+
+        j_properties = 
get_field_value(flink_kafka_consumer.get_java_function(), 'properties')
+        self.assertEqual('localhost:9092', 
j_properties.getProperty('bootstrap.servers'))
+        self.assertEqual('test_group', j_properties.getProperty('group.id'))
+        
self.assertTrue(get_field_value(flink_kafka_consumer.get_java_function(),
+                                        'enableCommitOnCheckpoints'))
+        j_start_up_mode = 
get_field_value(flink_kafka_consumer.get_java_function(), 'startupMode')
+
+        j_deserializer = 
get_field_value(flink_kafka_consumer.get_java_function(), 'deserializer')
+        j_deserialize_type_info = invoke_java_object_method(j_deserializer, 
"getProducedType")
+        deserialize_type_info = 
typeinfo._from_java_type(j_deserialize_type_info)
+        self.assertTrue(deserialize_type_info == type_info)
+        self.assertTrue(j_start_up_mode.equals(get_gateway().jvm
+                                               
.org.apache.flink.streaming.connectors
+                                               
.kafka.config.StartupMode.EARLIEST))
+        j_topic_desc = 
get_field_value(flink_kafka_consumer.get_java_function(),
+                                       'topicsDescriptor')
+        j_topics = invoke_java_object_method(j_topic_desc, 'getFixedTopics')
+        self.assertEqual(['test_source_topic'], list(j_topics))
+
+        # Test for kafka producer
+        serialization_schema = 
JsonRowSerializationSchema.builder().with_type_info(type_info) \
+            .build()
+        flink_kafka_producer = FlinkKafkaProducer(sink_topic, 
serialization_schema, props)
+        flink_kafka_producer.set_write_timestamp_to_kafka(False)
+
+        j_producer_config = 
get_field_value(flink_kafka_producer.get_java_function(),
+                                            'producerConfig')
+        self.assertEqual('localhost:9092', 
j_producer_config.getProperty('bootstrap.servers'))
+        self.assertEqual('test_group', 
j_producer_config.getProperty('group.id'))
+        
self.assertFalse(get_field_value(flink_kafka_producer.get_java_function(),
+                                         'writeTimestampToKafka'))
+
+    def test_compiling(self):
+        source = KafkaSource.builder() \
+            .set_bootstrap_servers('localhost:9092') \
+            .set_topics('test_topic') \
+            .set_value_only_deserializer(SimpleStringSchema()) \
+            .build()
+
+        ds = self.env.from_source(source=source,
+                                  
watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
+                                  source_name='kafka source')
+        ds.print()
+        plan = json.loads(self.env.get_execution_plan())
+        self.assertEqual('Source: kafka source', plan['nodes'][0]['type'])
+
+    def test_set_properties(self):
+        source = KafkaSource.builder() \
+            .set_bootstrap_servers('localhost:9092') \
+            .set_group_id('test_group_id') \
+            .set_client_id_prefix('test_client_id_prefix') \
+            .set_property('test_property', 'test_value') \
+            .set_topics('test_topic') \
+            .set_value_only_deserializer(SimpleStringSchema()) \
+            .build()
+        conf = self._get_kafka_source_configuration(source)
+        self.assertEqual(conf.get_string('bootstrap.servers', ''), 
'localhost:9092')
+        self.assertEqual(conf.get_string('group.id', ''), 'test_group_id')
+        self.assertEqual(conf.get_string('client.id.prefix', ''), 
'test_client_id_prefix')
+        self.assertEqual(conf.get_string('test_property', ''), 'test_value')
+
+    def test_set_topics(self):
+        source = KafkaSource.builder() \
+            .set_bootstrap_servers('localhost:9092') \
+            .set_topics('test_topic1', 'test_topic2') \
+            .set_value_only_deserializer(SimpleStringSchema()) \
+            .build()
+        kafka_subscriber = get_field_value(source.get_java_function(), 
'subscriber')
+        self.assertEqual(
+            kafka_subscriber.getClass().getCanonicalName(),
+            
'org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber'
+        )
+        topics = get_field_value(kafka_subscriber, 'topics')
+        self.assertTrue(is_instance_of(topics, 
get_gateway().jvm.java.util.List))
+        self.assertEqual(topics.size(), 2)
+        self.assertEqual(topics[0], 'test_topic1')
+        self.assertEqual(topics[1], 'test_topic2')
+
+    def test_set_topic_pattern(self):
+        source = KafkaSource.builder() \
+            .set_bootstrap_servers('localhost:9092') \
+            .set_topic_pattern('test_topic*') \
+            .set_value_only_deserializer(SimpleStringSchema()) \
+            .build()
+        kafka_subscriber = get_field_value(source.get_java_function(), 
'subscriber')
+        self.assertEqual(
+            kafka_subscriber.getClass().getCanonicalName(),
+            
'org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicPatternSubscriber'
+        )
+        topic_pattern = get_field_value(kafka_subscriber, 'topicPattern')
+        self.assertTrue(is_instance_of(topic_pattern, 
get_gateway().jvm.java.util.regex.Pattern))
+        self.assertEqual(topic_pattern.toString(), 'test_topic*')
+
+    def test_set_partitions(self):
+        topic_partition_1 = KafkaTopicPartition('test_topic', 1)
+        topic_partition_2 = KafkaTopicPartition('test_topic', 2)
+        source = KafkaSource.builder() \
+            .set_bootstrap_servers('localhost:9092') \
+            .set_partitions({topic_partition_1, topic_partition_2}) \
+            .set_value_only_deserializer(SimpleStringSchema()) \
+            .build()
+        kafka_subscriber = get_field_value(source.get_java_function(), 
'subscriber')
+        self.assertEqual(
+            kafka_subscriber.getClass().getCanonicalName(),
+            
'org.apache.flink.connector.kafka.source.enumerator.subscriber.PartitionSetSubscriber'
+        )
+        partitions = get_field_value(kafka_subscriber, 'subscribedPartitions')
+        self.assertTrue(is_instance_of(partitions, 
get_gateway().jvm.java.util.Set))
+        self.assertTrue(topic_partition_1._to_j_topic_partition() in 
partitions)
+        self.assertTrue(topic_partition_2._to_j_topic_partition() in 
partitions)
+
+    def test_set_starting_offsets(self):
+        def _build_source(initializer: KafkaOffsetsInitializer):
+            return KafkaSource.builder() \
+                .set_bootstrap_servers('localhost:9092') \
+                .set_topics('test_topic') \
+                .set_value_only_deserializer(SimpleStringSchema()) \
+                .set_group_id('test_group') \
+                .set_starting_offsets(initializer) \
+                .build()
+
+        self._check_latest_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.latest()))
+        self._check_reader_handled_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.earliest()), -2,
+            KafkaOffsetResetStrategy.EARLIEST
+        )
+        self._check_reader_handled_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.committed_offsets()), -3,
+            KafkaOffsetResetStrategy.NONE
+        )
+        self._check_reader_handled_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.committed_offsets(
+                KafkaOffsetResetStrategy.LATEST
+            )), -3, KafkaOffsetResetStrategy.LATEST
+        )
+        self._check_timestamp_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.timestamp(100)), 100
+        )
+        specified_offsets = {
+            KafkaTopicPartition('test_topic1', 1): 1000,
+            KafkaTopicPartition('test_topic2', 2): 2000
+        }
+        self._check_specified_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.offsets(specified_offsets)), 
specified_offsets,
+            KafkaOffsetResetStrategy.EARLIEST
+        )
+        self._check_specified_offsets_initializer(
+            _build_source(KafkaOffsetsInitializer.offsets(
+                specified_offsets,
+                KafkaOffsetResetStrategy.LATEST
+            )),
+            specified_offsets,
+            KafkaOffsetResetStrategy.LATEST
+        )
+
+    def test_bounded(self):
+        def _build_source(initializer: KafkaOffsetsInitializer):
+            return KafkaSource.builder() \
+                .set_bootstrap_servers('localhost:9092') \
+                .set_topics('test_topic') \
+                .set_value_only_deserializer(SimpleStringSchema()) \
+                .set_group_id('test_group') \
+                .set_bounded(initializer) \
+                .build()
+
+        def _check_bounded(source: KafkaSource):
+            self.assertEqual(
+                get_field_value(source.get_java_function(), 
'boundedness').toString(), 'BOUNDED'
+            )
+
+        self._test_set_bounded_or_unbounded(_build_source, _check_bounded)
+
+    def test_unbounded(self):
+        def _build_source(initializer: KafkaOffsetsInitializer):
+            return KafkaSource.builder() \
+                .set_bootstrap_servers('localhost:9092') \
+                .set_topics('test_topic') \
+                .set_value_only_deserializer(SimpleStringSchema()) \
+                .set_group_id('test_group') \
+                .set_unbounded(initializer) \
+                .build()
+
+        def _check_bounded(source: KafkaSource):
+            self.assertEqual(
+                get_field_value(source.get_java_function(), 
'boundedness').toString(),
+                'CONTINUOUS_UNBOUNDED'
+            )
+
+        self._test_set_bounded_or_unbounded(_build_source, _check_bounded)
+
+    def _test_set_bounded_or_unbounded(self, _build_source, 
_check_boundedness):
+        source = _build_source(KafkaOffsetsInitializer.latest())
+        _check_boundedness(source)
+        self._check_latest_offsets_initializer(source, False)
+        source = _build_source(KafkaOffsetsInitializer.earliest())
+        _check_boundedness(source)
+        self._check_reader_handled_offsets_initializer(
+            source, -2, KafkaOffsetResetStrategy.EARLIEST, False
+        )
+        source = _build_source(KafkaOffsetsInitializer.committed_offsets())
+        _check_boundedness(source)
+        self._check_reader_handled_offsets_initializer(
+            source, -3, KafkaOffsetResetStrategy.NONE, False
+        )
+        source = _build_source(KafkaOffsetsInitializer.committed_offsets(
+            KafkaOffsetResetStrategy.LATEST
+        ))
+        _check_boundedness(source)
+        self._check_reader_handled_offsets_initializer(
+            source, -3, KafkaOffsetResetStrategy.LATEST, False
+        )
+        source = _build_source(KafkaOffsetsInitializer.timestamp(100))
+        _check_boundedness(source)
+        self._check_timestamp_offsets_initializer(source, 100, False)
+        specified_offsets = {
+            KafkaTopicPartition('test_topic1', 1): 1000,
+            KafkaTopicPartition('test_topic2', 2): 2000
+        }
+        source = 
_build_source(KafkaOffsetsInitializer.offsets(specified_offsets))
+        _check_boundedness(source)
+        self._check_specified_offsets_initializer(
+            source, specified_offsets, KafkaOffsetResetStrategy.EARLIEST, False
+        )
+        source = _build_source(KafkaOffsetsInitializer.offsets(
+            specified_offsets,
+            KafkaOffsetResetStrategy.LATEST)
+        )
+        _check_boundedness(source)
+        self._check_specified_offsets_initializer(
+            source, specified_offsets, KafkaOffsetResetStrategy.LATEST, False
+        )
+
+    def test_set_value_only_deserializer(self):
+        def _check(schema: DeserializationSchema, class_name: str):
+            source = KafkaSource.builder() \
+                .set_bootstrap_servers('localhost:9092') \
+                .set_topics('test_topic') \
+                .set_value_only_deserializer(schema) \
+                .build()
+            deserialization_schema_wrapper = 
get_field_value(source.get_java_function(),
+                                                             
'deserializationSchema')
+            self.assertEqual(
+                deserialization_schema_wrapper.getClass().getCanonicalName(),
+                'org.apache.flink.connector.kafka.source.reader.deserializer'
+                '.KafkaValueOnlyDeserializationSchemaWrapper'
+            )
+            deserialization_schema = 
get_field_value(deserialization_schema_wrapper,
+                                                     'deserializationSchema')
+            
self.assertEqual(deserialization_schema.getClass().getCanonicalName(),
+                             class_name)
+
+        _check(SimpleStringSchema(), 
'org.apache.flink.api.common.serialization.SimpleStringSchema')
+        _check(
+            
JsonRowDeserializationSchema.builder().type_info(Types.ROW([Types.STRING()])).build(),
+            'org.apache.flink.formats.json.JsonRowDeserializationSchema'
+        )
+        _check(
+            
CsvRowDeserializationSchema.Builder(Types.ROW([Types.STRING()])).build(),
+            'org.apache.flink.formats.csv.CsvRowDeserializationSchema'
+        )
+        avro_schema_string = """
+        {
+            "type": "record",
+            "name": "test_record",
+            "fields": []
+        }
+        """
+        _check(
+            
AvroRowDeserializationSchema(avro_schema_string=avro_schema_string),
+            'org.apache.flink.formats.avro.AvroRowDeserializationSchema'
+        )
+
+    def _check_reader_handled_offsets_initializer(self,
+                                                  source: KafkaSource,
+                                                  offset: int,
+                                                  reset_strategy: 
KafkaOffsetResetStrategy,
+                                                  is_start: bool = True):
+        if is_start:
+            field_name = 'startingOffsetsInitializer'
+        else:
+            field_name = 'stoppingOffsetsInitializer'
+        offsets_initializer = get_field_value(source.get_java_function(), 
field_name)
+        self.assertEqual(
+            offsets_initializer.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.source.enumerator.initializer'
+            '.ReaderHandledOffsetsInitializer'
+        )
+
+        starting_offset = get_field_value(offsets_initializer, 
'startingOffset')
+        self.assertEqual(starting_offset, offset)
+
+        offset_reset_strategy = get_field_value(offsets_initializer, 
'offsetResetStrategy')
+        self.assertTrue(
+            
offset_reset_strategy.equals(reset_strategy._to_j_offset_reset_strategy())
+        )
+
+    def _check_latest_offsets_initializer(self,
+                                          source: KafkaSource,
+                                          is_start: bool = True):
+        if is_start:
+            field_name = 'startingOffsetsInitializer'
+        else:
+            field_name = 'stoppingOffsetsInitializer'
+        offsets_initializer = get_field_value(source.get_java_function(), 
field_name)
+        self.assertEqual(
+            offsets_initializer.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.source.enumerator.initializer'
+            '.LatestOffsetsInitializer'
+        )
+
+    def _check_timestamp_offsets_initializer(self,
+                                             source: KafkaSource,
+                                             timestamp: int,
+                                             is_start: bool = True):
+        if is_start:
+            field_name = 'startingOffsetsInitializer'
+        else:
+            field_name = 'stoppingOffsetsInitializer'
+        offsets_initializer = get_field_value(source.get_java_function(), 
field_name)
+        self.assertEqual(
+            offsets_initializer.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.source.enumerator.initializer'
+            '.TimestampOffsetsInitializer'
+        )
+
+        starting_timestamp = get_field_value(offsets_initializer, 
'startingTimestamp')
+        self.assertEqual(starting_timestamp, timestamp)
+
+    def _check_specified_offsets_initializer(self,
+                                             source: KafkaSource,
+                                             offsets: 
Dict[KafkaTopicPartition, int],
+                                             reset_strategy: 
KafkaOffsetResetStrategy,
+                                             is_start: bool = True):
+        if is_start:
+            field_name = 'startingOffsetsInitializer'
+        else:
+            field_name = 'stoppingOffsetsInitializer'
+        offsets_initializer = get_field_value(source.get_java_function(), 
field_name)
+        self.assertEqual(
+            offsets_initializer.getClass().getCanonicalName(),
+            'org.apache.flink.connector.kafka.source.enumerator.initializer'
+            '.SpecifiedOffsetsInitializer'
+        )
+
+        initial_offsets = get_field_value(offsets_initializer, 
'initialOffsets')
+        self.assertTrue(is_instance_of(initial_offsets, 
get_gateway().jvm.java.util.Map))
+        self.assertEqual(initial_offsets.size(), len(offsets))
+        for j_topic_partition in initial_offsets:
+            topic_partition = KafkaTopicPartition(j_topic_partition.topic(),
+                                                  
j_topic_partition.partition())
+            self.assertIsNotNone(offsets.get(topic_partition))
+            self.assertEqual(initial_offsets[j_topic_partition], 
offsets[topic_partition])
+
+        offset_reset_strategy = get_field_value(offsets_initializer, 
'offsetResetStrategy')
+        self.assertTrue(
+            
offset_reset_strategy.equals(reset_strategy._to_j_offset_reset_strategy())
+        )
+
+    @staticmethod
+    def _get_kafka_source_configuration(source: KafkaSource):
+        jvm = get_gateway().jvm
+        j_source = source.get_java_function()
+        j_to_configuration = j_source.getClass().getDeclaredMethod(
+            'getConfiguration', to_jarray(jvm.java.lang.Class, [])
+        )
+        j_to_configuration.setAccessible(True)
+        j_configuration = j_to_configuration.invoke(j_source, 
to_jarray(jvm.java.lang.Object, []))
+        return Configuration(j_configuration=j_configuration)
+
+
+class KafkaSinkTests(PyFlinkStreamingTestCase):
+
+    def test_compile(self):
+        sink = KafkaSink.builder() \
+            .set_bootstrap_servers('localhost:9092') \
+            .set_record_serializer(self._build_serialization_schema()) \
+            .build()
+
+        ds = self.env.from_collection([], type_info=Types.STRING())
+        ds.sink_to(sink)
+
+        plan = json.loads(self.env.get_execution_plan())
+        self.assertEqual(plan['nodes'][1]['type'], 'Sink: Writer')
+        self.assertEqual(plan['nodes'][2]['type'], 'Sink: Committer')
+
+    def test_set_bootstrap_severs(self):
+        sink = KafkaSink.builder() \
+            .set_bootstrap_servers('localhost:9092,localhost:9093') \
+            .set_record_serializer(self._build_serialization_schema()) \
+            .build()
+        config = get_field_value(sink.get_java_function(), 
'kafkaProducerConfig')
+        self.assertEqual(config.get('bootstrap.servers'), 
'localhost:9092,localhost:9093')
+
+    def test_set_delivery_guarantee(self):
+        sink = KafkaSink.builder() \
+            .set_bootstrap_servers('localhost:9092') \
+            .set_record_serializer(self._build_serialization_schema()) \
+            .build()
+        guarantee = get_field_value(sink.get_java_function(), 
'deliveryGuarantee')
+        self.assertEqual(guarantee.toString(), 'none')
+
+        sink = KafkaSink.builder() \
+            .set_bootstrap_servers('localhost:9092') \
+            .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
+            .set_record_serializer(self._build_serialization_schema()) \
+            .build()
+        guarantee = get_field_value(sink.get_java_function(), 
'deliveryGuarantee')
+        self.assertEqual(guarantee.toString(), 'at-least-once')
+
+        sink = KafkaSink.builder() \
+            .set_bootstrap_servers('localhost:9092') \
+            .set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE) \
+            .set_record_serializer(self._build_serialization_schema()) \
+            .build()
+        guarantee = get_field_value(sink.get_java_function(), 
'deliveryGuarantee')
+        self.assertEqual(guarantee.toString(), 'exactly-once')
+
+    def test_set_transactional_id_prefix(self):
+        sink = KafkaSink.builder() \
+            .set_bootstrap_servers('localhost:9092') \
+            .set_transactional_id_prefix('test-prefix') \
+            .set_record_serializer(self._build_serialization_schema()) \
+            .build()
+        prefix = get_field_value(sink.get_java_function(), 
'transactionalIdPrefix')
+        self.assertEqual(prefix, 'test-prefix')
+
+    def test_set_property(self):
+        sink = KafkaSink.builder() \
+            .set_bootstrap_servers('localhost:9092') \
+            .set_record_serializer(self._build_serialization_schema()) \
+            .set_property('test-key', 'test-value') \
+            .build()
+        config = get_field_value(sink.get_java_function(), 
'kafkaProducerConfig')
+        self.assertEqual(config.get('test-key'), 'test-value')
+
+    def test_set_record_serializer(self):
+        sink = KafkaSink.builder() \
+            .set_bootstrap_servers('localhost:9092') \
+            .set_record_serializer(self._build_serialization_schema()) \
+            .build()
+        serializer = get_field_value(sink.get_java_function(), 'recordSerializer')
+        self.assertEqual(serializer.getClass().getCanonicalName(),
+                         'org.apache.flink.connector.kafka.sink.'
+                         'KafkaRecordSerializationSchemaBuilder.'
+                         'KafkaRecordSerializationSchemaWrapper')
+        topic_selector = get_field_value(serializer, 'topicSelector')
+        self.assertEqual(topic_selector.apply(None), 'test-topic')
+        value_serializer = get_field_value(serializer, 'valueSerializationSchema')
+        self.assertEqual(value_serializer.getClass().getCanonicalName(),
+                         'org.apache.flink.api.common.serialization.SimpleStringSchema')
+
+    @staticmethod
+    def _build_serialization_schema() -> KafkaRecordSerializationSchema:
+        return KafkaRecordSerializationSchema.builder() \
+            .set_topic('test-topic') \
+            .set_value_serialization_schema(SimpleStringSchema()) \
+            .build()
+
+
+class KafkaRecordSerializationSchemaTests(PyFlinkTestCase):
+
+    def test_set_topic(self):
+        input_type = Types.ROW([Types.STRING()])
+
+        serialization_schema = KafkaRecordSerializationSchema.builder() \
+            .set_topic('test-topic') \
+            .set_value_serialization_schema(
+                JsonRowSerializationSchema.builder().with_type_info(input_type).build()) \
+            .build()
+        jvm = get_gateway().jvm
+        serialization_schema._j_serialization_schema.open(
+            jvm.org.apache.flink.connector.testutils.formats.DummyInitializationContext(),
+            jvm.org.apache.flink.connector.kafka.sink.DefaultKafkaSinkContext(
+                0, 1, jvm.java.util.Properties()))
+
+        j_record = serialization_schema._j_serialization_schema.serialize(
+            to_java_data_structure(Row('test')), None, None
+        )
+        self.assertEqual(j_record.topic(), 'test-topic')
+        self.assertIsNone(j_record.key())
+        self.assertEqual(j_record.value(), b'{"f0":"test"}')
+
+    def test_set_topic_selector(self):
+        def _select(data):
+            data = data[0]
+            if data == 'a':
+                return 'topic-a'
+            elif data == 'b':
+                return 'topic-b'
+            else:
+                return 'topic-dead-letter'
+
+        def _check_record(data, topic, serialized_data):
+            input_type = Types.ROW([Types.STRING()])
+
+            serialization_schema = KafkaRecordSerializationSchema.builder() \
+                .set_topic_selector(_select) \
+                .set_value_serialization_schema(
+                    JsonRowSerializationSchema.builder().with_type_info(input_type).build()) \
+                .build()
+            jvm = get_gateway().jvm
+            serialization_schema._j_serialization_schema.open(
+                jvm.org.apache.flink.connector.testutils.formats.DummyInitializationContext(),
+                jvm.org.apache.flink.connector.kafka.sink.DefaultKafkaSinkContext(
+                    0, 1, jvm.java.util.Properties()))
+            sink = KafkaSink.builder() \
+                .set_bootstrap_servers('localhost:9092') \
+                .set_record_serializer(serialization_schema) \
+                .build()
+
+            ds = MockDataStream(Types.ROW([Types.STRING()]))
+            ds.sink_to(sink)
+            row = Row(data)
+            topic_row = ds.feed(row)  # type: Row
+            j_record = serialization_schema._j_serialization_schema.serialize(
+                to_java_data_structure(topic_row), None, None
+            )
+            self.assertEqual(j_record.topic(), topic)
+            self.assertIsNone(j_record.key())
+            self.assertEqual(j_record.value(), serialized_data)
+
+        _check_record('a', 'topic-a', b'{"f0":"a"}')
+        _check_record('b', 'topic-b', b'{"f0":"b"}')
+        _check_record('c', 'topic-dead-letter', b'{"f0":"c"}')
+        _check_record('d', 'topic-dead-letter', b'{"f0":"d"}')
+
+    def test_set_key_serialization_schema(self):
+        def _check_key_serialization_schema(key_serialization_schema, expected_class):
+            serialization_schema = KafkaRecordSerializationSchema.builder() \
+                .set_topic('test-topic') \
+                .set_key_serialization_schema(key_serialization_schema) \
+                .set_value_serialization_schema(SimpleStringSchema()) \
+                .build()
+            schema_field = get_field_value(serialization_schema._j_serialization_schema,
+                                           'keySerializationSchema')
+            self.assertIsNotNone(schema_field)
+            self.assertEqual(schema_field.getClass().getCanonicalName(), expected_class)
+
+        self._check_serialization_schema_implementations(_check_key_serialization_schema)
+
+    def test_set_value_serialization_schema(self):
+        def _check_value_serialization_schema(value_serialization_schema, expected_class):
+            serialization_schema = KafkaRecordSerializationSchema.builder() \
+                .set_topic('test-topic') \
+                .set_value_serialization_schema(value_serialization_schema) \
+                .build()
+            schema_field = get_field_value(serialization_schema._j_serialization_schema,
+                                           'valueSerializationSchema')
+            self.assertIsNotNone(schema_field)
+            self.assertEqual(schema_field.getClass().getCanonicalName(), expected_class)
+
+        self._check_serialization_schema_implementations(_check_value_serialization_schema)
+
+    @staticmethod
+    def _check_serialization_schema_implementations(check_function):
+        input_type = Types.ROW([Types.STRING()])
+
+        check_function(
+            JsonRowSerializationSchema.builder().with_type_info(input_type).build(),
+            'org.apache.flink.formats.json.JsonRowSerializationSchema'
+        )
+        check_function(
+            CsvRowSerializationSchema.Builder(input_type).build(),
+            'org.apache.flink.formats.csv.CsvRowSerializationSchema'
+        )
+        avro_schema_string = """
+        {
+            "type": "record",
+            "name": "test_record",
+            "fields": []
+        }
+        """
+        check_function(
+            AvroRowSerializationSchema(avro_schema_string=avro_schema_string),
+            'org.apache.flink.formats.avro.AvroRowSerializationSchema'
+        )
+        check_function(
+            SimpleStringSchema(),
+            'org.apache.flink.api.common.serialization.SimpleStringSchema'
+        )
+
+
+class MockDataStream(data_stream.DataStream):
+
+    def __init__(self, original_type=None):
+        super().__init__(None)
+        self._operators = []
+        self._type = original_type
+
+    def feed(self, data):
+        for op in self._operators:
+            data = op(data)
+        return data
+
+    def get_type(self):
+        return self._type
+
+    def map(self, f, output_type=None):
+        self._operators.append(f)
+        self._type = output_type
+
+    def sink_to(self, sink):
+        ds = self
+        from pyflink.datastream.connectors.base import SupportsPreprocessing
+        if isinstance(sink, SupportsPreprocessing) and sink.get_transformer() is not None:
+            ds = sink.get_transformer().apply(self)
+        return ds
diff --git a/flink-python/pyflink/pyflink_gateway_server.py b/flink-python/pyflink/pyflink_gateway_server.py
new file mode 100644
index 00000000..1cf25a54
--- /dev/null
+++ b/flink-python/pyflink/pyflink_gateway_server.py
@@ -0,0 +1,288 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This is a copy of the pyflink_gateway_server.py file from Flink.
+# The original file is accessible here:
+# https://github.com/apache/flink/blob/master/flink-python/pyflink/pyflink_gateway_server.py
+# The additional change is the handling of the FLINK_TEST_LIBS environment variable.
+# It can be used to add extra testing jars to the gateway classpath.
+# The plan is to remove this copy once PyFlink 1.19 is released.
+
+import argparse
+import getpass
+import glob
+import os
+import platform
+import signal
+import socket
+import sys
+from collections import namedtuple
+from string import Template
+from subprocess import Popen, PIPE
+
+from pyflink.find_flink_home import _find_flink_home, _find_flink_source_root
+
+KEY_ENV_LOG_DIR = "env.log.dir"
+KEY_ENV_YARN_CONF_DIR = "env.yarn.conf.dir"
+KEY_ENV_HADOOP_CONF_DIR = "env.hadoop.conf.dir"
+KEY_ENV_HBASE_CONF_DIR = "env.hbase.conf.dir"
+KEY_ENV_JAVA_HOME = "env.java.home"
+KEY_ENV_JAVA_OPTS = "env.java.opts.all"
+KEY_ENV_JAVA_OPTS_DEPRECATED = "env.java.opts"
+
+
+def on_windows():
+    return platform.system() == "Windows"
+
+
+def read_from_config(key, default_value, flink_conf_file):
+    value = default_value
+    # get the realpath of tainted path value to avoid CWE22 problem that constructs a path or URI
+    # using the tainted value and might allow an attacker to access, modify, or test the existence
+    # of critical or sensitive files.
+    with open(os.path.realpath(flink_conf_file), "r") as f:
+        while True:
+            line = f.readline()
+            if not line:
+                break
+            if line.startswith("#") or len(line.strip()) == 0:
+                continue
+            k, v = line.split(":", 1)
+            if k.strip() == key:
+                value = v.strip()
+    return value
+
+
+def find_java_executable():
+    java_executable = "java.exe" if on_windows() else "java"
+    flink_home = _find_flink_home()
+    flink_conf_file = os.path.join(flink_home, "conf", "flink-conf.yaml")
+    java_home = read_from_config(KEY_ENV_JAVA_HOME, None, flink_conf_file)
+
+    if java_home is None and "JAVA_HOME" in os.environ:
+        java_home = os.environ["JAVA_HOME"]
+
+    if java_home is not None:
+        java_executable = os.path.join(java_home, "bin", java_executable)
+
+    return java_executable
+
+
+def prepare_environment_variables(env):
+    flink_home = _find_flink_home()
+    # get the realpath of tainted path value to avoid CWE22 problem that constructs a path or URI
+    # using the tainted value and might allow an attacker to access, modify, or test the existence
+    # of critical or sensitive files.
+    real_flink_home = os.path.realpath(flink_home)
+
+    if 'FLINK_CONF_DIR' in env:
+        flink_conf_directory = os.path.realpath(env['FLINK_CONF_DIR'])
+    else:
+        flink_conf_directory = os.path.join(real_flink_home, "conf")
+    env['FLINK_CONF_DIR'] = flink_conf_directory
+
+    if 'FLINK_LIB_DIR' in env:
+        flink_lib_directory = os.path.realpath(env['FLINK_LIB_DIR'])
+    else:
+        flink_lib_directory = os.path.join(real_flink_home, "lib")
+    env['FLINK_LIB_DIR'] = flink_lib_directory
+
+    if 'FLINK_OPT_DIR' in env:
+        flink_opt_directory = os.path.realpath(env['FLINK_OPT_DIR'])
+    else:
+        flink_opt_directory = os.path.join(real_flink_home, "opt")
+    env['FLINK_OPT_DIR'] = flink_opt_directory
+
+    if 'FLINK_PLUGINS_DIR' in env:
+        flink_plugins_directory = os.path.realpath(env['FLINK_PLUGINS_DIR'])
+    else:
+        flink_plugins_directory = os.path.join(real_flink_home, "plugins")
+    env['FLINK_PLUGINS_DIR'] = flink_plugins_directory
+
+    env["FLINK_BIN_DIR"] = os.path.join(real_flink_home, "bin")
+
+
+def construct_log_settings(env):
+    templates = [
+        "-Dlog.file=${flink_log_dir}/flink-${flink_ident_string}-python-${hostname}.log",
+        "-Dlog4j.configuration=${log4j_properties}",
+        "-Dlog4j.configurationFile=${log4j_properties}",
+        "-Dlogback.configurationFile=${logback_xml}"
+    ]
+
+    flink_home = os.path.realpath(_find_flink_home())
+    flink_conf_dir = env['FLINK_CONF_DIR']
+    flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
+
+    if "FLINK_LOG_DIR" in env:
+        flink_log_dir = env["FLINK_LOG_DIR"]
+    else:
+        flink_log_dir = read_from_config(
+            KEY_ENV_LOG_DIR, os.path.join(flink_home, "log"), flink_conf_file)
+
+    if "LOG4J_PROPERTIES" in env:
+        log4j_properties = env["LOG4J_PROPERTIES"]
+    else:
+        log4j_properties = "%s/log4j-cli.properties" % flink_conf_dir
+
+    if "LOGBACK_XML" in env:
+        logback_xml = env["LOGBACK_XML"]
+    else:
+        logback_xml = "%s/logback.xml" % flink_conf_dir
+
+    if "FLINK_IDENT_STRING" in env:
+        flink_ident_string = env["FLINK_IDENT_STRING"]
+    else:
+        flink_ident_string = getpass.getuser()
+
+    hostname = socket.gethostname()
+    log_settings = []
+    for template in templates:
+        log_settings.append(Template(template).substitute(
+            log4j_properties=log4j_properties,
+            logback_xml=logback_xml,
+            flink_log_dir=flink_log_dir,
+            flink_ident_string=flink_ident_string,
+            hostname=hostname))
+    return log_settings
+
+
+def get_jvm_opts(env):
+    flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
+    jvm_opts = env.get(
+        'FLINK_ENV_JAVA_OPTS',
+        read_from_config(
+            KEY_ENV_JAVA_OPTS,
+            read_from_config(KEY_ENV_JAVA_OPTS_DEPRECATED, "", flink_conf_file),
+            flink_conf_file))
+
+    # Remove leading and ending double quotes (if present) of value
+    jvm_opts = jvm_opts.strip("\"")
+    return jvm_opts.split(" ")
+
+
+def construct_flink_classpath(env):
+    flink_home = _find_flink_home()
+    flink_lib_directory = env['FLINK_LIB_DIR']
+    flink_opt_directory = env['FLINK_OPT_DIR']
+
+    if on_windows():
+        # The command length is limited on Windows. To avoid the problem we should shorten the
+        # command length as much as possible.
+        lib_jars = os.path.join(flink_lib_directory, "*")
+    else:
+        lib_jars = os.pathsep.join(glob.glob(os.path.join(flink_lib_directory, "*.jar")))
+
+    flink_python_jars = glob.glob(os.path.join(flink_opt_directory, "flink-python*.jar"))
+    if len(flink_python_jars) < 1:
+        print("The flink-python jar is not found in the opt folder of the FLINK_HOME: %s" %
+              flink_home)
+        return lib_jars
+    flink_python_jar = flink_python_jars[0]
+
+    return os.pathsep.join([lib_jars, flink_python_jar])
+
+
+def construct_hadoop_classpath(env):
+    flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
+
+    hadoop_conf_dir = ""
+    if 'HADOOP_CONF_DIR' not in env and 'HADOOP_CLASSPATH' not in env:
+        if os.path.isdir("/etc/hadoop/conf"):
+            print("Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or"
+                  "HADOOP_CLASSPATH was set.")
+            hadoop_conf_dir = "/etc/hadoop/conf"
+
+    hbase_conf_dir = ""
+    if 'HBASE_CONF_DIR' not in env:
+        if os.path.isdir("/etc/hbase/conf"):
+            print("Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.")
+            hbase_conf_dir = "/etc/hbase/conf"
+
+    return os.pathsep.join(
+        [env.get("HADOOP_CLASSPATH", ""),
+         env.get("YARN_CONF_DIR",
+                 read_from_config(KEY_ENV_YARN_CONF_DIR, "", flink_conf_file)),
+         env.get("HADOOP_CONF_DIR",
+                 read_from_config(KEY_ENV_HADOOP_CONF_DIR, hadoop_conf_dir, flink_conf_file)),
+         env.get("HBASE_CONF_DIR",
+                 read_from_config(KEY_ENV_HBASE_CONF_DIR, hbase_conf_dir, flink_conf_file))])
+
+
+def construct_test_classpath(env):
+    test_jar_patterns = [
+        "flink-python/target/test-dependencies/*",
+        "flink-python/target/artifacts/testDataStream.jar",
+        "flink-python/target/flink-python*-tests.jar",
+    ]
+    test_jars = []
+
+    # Connector tests need to add specific jars to the gateway classpath
+    if 'FLINK_TEST_LIBS' in env:
+        test_jars += glob.glob(env['FLINK_TEST_LIBS'])
+    else:
+        flink_source_root = _find_flink_source_root()
+        for pattern in test_jar_patterns:
+            pattern = pattern.replace("/", os.path.sep)
+            test_jars += glob.glob(os.path.join(flink_source_root, pattern))
+    return os.path.pathsep.join(test_jars)
+
+
+def construct_program_args(args):
+    parser = argparse.ArgumentParser()
+    parser.add_argument("-c", "--class", required=True)
+    parser.add_argument("cluster_type", choices=["local", "remote", "yarn"])
+    parse_result, other_args = parser.parse_known_args(args)
+    main_class = getattr(parse_result, "class")
+    cluster_type = parse_result.cluster_type
+    return namedtuple(
+        "ProgramArgs", ["main_class", "cluster_type", "other_args"])(
+        main_class, cluster_type, other_args)
+
+
+def launch_gateway_server_process(env, args):
+    prepare_environment_variables(env)
+    program_args = construct_program_args(args)
+    if program_args.cluster_type == "local":
+        java_executable = find_java_executable()
+        log_settings = construct_log_settings(env)
+        jvm_args = env.get('JVM_ARGS', '')
+        jvm_opts = get_jvm_opts(env)
+        classpath = os.pathsep.join(
+            [construct_flink_classpath(env), construct_hadoop_classpath(env)])
+        if "FLINK_TESTING" in env:
+            classpath = os.pathsep.join([classpath, construct_test_classpath(env)])
+        command = [java_executable, jvm_args, "-XX:+IgnoreUnrecognizedVMOptions",
+                   "--add-opens=jdk.proxy2/jdk.proxy2=ALL-UNNAMED"] \
+            + jvm_opts + log_settings \
+            + ["-cp", classpath, program_args.main_class] + program_args.other_args
+    else:
+        command = [os.path.join(env["FLINK_BIN_DIR"], "flink"), "run"] + program_args.other_args \
+            + ["-c", program_args.main_class]
+    preexec_fn = None
+    if not on_windows():
+        def preexec_func():
+            # ignore ctrl-c / SIGINT
+            signal.signal(signal.SIGINT, signal.SIG_IGN)
+        preexec_fn = preexec_func
+    return Popen(list(filter(lambda c: len(c) != 0, command)),
+                 stdin=PIPE, stderr=PIPE, preexec_fn=preexec_fn, env=env)
+
+
+if __name__ == "__main__":
+    launch_gateway_server_process(os.environ, sys.argv[1:])
diff --git a/flink-python/setup.py b/flink-python/setup.py
new file mode 100644
index 00000000..8e788d4e
--- /dev/null
+++ b/flink-python/setup.py
@@ -0,0 +1,158 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from __future__ import print_function
+
+import glob
+import io
+import os
+import sys
+
+from setuptools import setup
+from shutil import copy, rmtree
+from xml.etree import ElementTree as ET
+
+PACKAGE_NAME = 'apache-flink-connector-kafka'
+# Source files, directories
+CURRENT_DIR = os.path.abspath(os.path.dirname(__file__))
+POM_FILE = os.path.join(CURRENT_DIR, '../pom.xml')
+README_FILE = os.path.join(CURRENT_DIR, 'README.txt')
+
+# Generated files and directories
+VERSION_FILE = os.path.join(CURRENT_DIR, 'pyflink/datastream/connectors/kafka_connector_version.py')
+LIB_PATH = os.path.join(CURRENT_DIR, 'pyflink/lib')
+DEPENDENCY_FILE = os.path.join(CURRENT_DIR, 'dev/dev-requirements.txt')
+
+
+# Removes a file or directory if exists.
+def remove_if_exists(file_path):
+    if os.path.exists(file_path):
+        if os.path.isfile(file_path):
+            os.remove(file_path)
+        if os.path.isdir(file_path):
+            rmtree(file_path)
+
+
+# Reads the content of the README.txt file.
+def readme_content():
+    with io.open(README_FILE, 'r', encoding='utf-8') as f:
+        return f.read()
+
+
+# Reads the parameters used by the setup command.
+# The source is the kafka_connector_version.py and the README.txt.
+def setup_parameters():
+    try:
+        exec(open(VERSION_FILE).read())
+        return locals()['__connector_version__'], locals()['__flink_dependency__'], readme_content()
+    except IOError:
+        print("Failed to load PyFlink version file for packaging. " +
+              "'%s' not found!" % VERSION_FILE,
+              file=sys.stderr)
+        sys.exit(-1)
+
+
+# Reads and parses the flink-connector-kafka main pom.xml.
+# Based on the version data in the pom.xml prepares the pyflink dir:
+#  - Generates kafka_connector_version.py
+#  - Generates dev-requirements.txt
+#  - Copies the flink-sql-connector-kafka*.jar to the pyflink/lib dir
+def prepare_pyflink_dir():
+    # source files
+    pom_root = ET.parse(POM_FILE).getroot()
+    flink_version = pom_root.findall(
+        "./{http://maven.apache.org/POM/4.0.0}properties/" +
+        "{http://maven.apache.org/POM/4.0.0}flink.version"
+    )[0].text
+    connector_version = pom_root.findall(
+        "./{http://maven.apache.org/POM/4.0.0}version")[0].text.replace("-SNAPSHOT", ".dev0")
+
+    flink_dependency = "apache-flink>=" + flink_version
+
+    os.makedirs(LIB_PATH)
+    connector_jar = \
+        glob.glob(CURRENT_DIR + '/target/test-dependencies/flink-sql-connector-kafka*.jar')[0]
+    copy(connector_jar, LIB_PATH)
+
+    with io.open(VERSION_FILE, 'w', encoding='utf-8') as f:
+        f.write('# Generated file, do not edit\n')
+        f.write('__connector_version__ = "' + connector_version + '"\n')
+        f.write('__flink_dependency__ = "' + flink_dependency + '"\n')
+
+    with io.open(DEPENDENCY_FILE, 'w', encoding='utf-8') as f:
+        f.write('# Generated file, do not edit\n')
+        f.write(flink_dependency + '\n')
+
+
+# Main
+print("Python version used to package: " + sys.version)
+
+# Python version check
+if sys.version_info < (3, 7):
+    print("Python versions prior to 3.7 are not supported for PyFlink.",
+          file=sys.stderr)
+    sys.exit(-1)
+
+# Checks the running environment:
+#  - In the connector source root directory - package preparation
+#  - Otherwise - package deployment
+in_flink_source = os.path.isfile("../flink-connector-kafka/src/main/" +
+                                 "java/org/apache/flink/connector/kafka/source/KafkaSource.java")
+
+# Cleans up the generated files and directories and regenerate them.
+if in_flink_source:
+    remove_if_exists(VERSION_FILE)
+    remove_if_exists(DEPENDENCY_FILE)
+    remove_if_exists(LIB_PATH)
+    prepare_pyflink_dir()
+    print("\nPreparing Flink Kafka connector package")
+
+# Reads the current setup data from the kafka_connector_version.py file and the README.txt
+(connector_version, flink_dependency, long_description) = setup_parameters()
+
+print("\nConnector version: " + connector_version)
+print("Flink dependency: " + flink_dependency + "\n")
+
+if in_flink_source:
+    # Removes temporary directory used by the setup tool
+    remove_if_exists(PACKAGE_NAME.replace('-', '_') + '.egg-info')
+
+# Runs the python setup
+setup(
+    name=PACKAGE_NAME,
+    version=connector_version,
+    include_package_data=True,
+    url='https://flink.apache.org',
+    license='https://www.apache.org/licenses/LICENSE-2.0',
+    author='Apache Software Foundation',
+    author_email='dev@flink.apache.org',
+    python_requires='>=3.8',
+    install_requires=[flink_dependency],
+    description='Apache Flink Python Kafka Connector API',
+    long_description=long_description,
+    long_description_content_type='text/plain',
+    zip_safe=False,
+    classifiers=[
+        'Development Status :: 5 - Production/Stable',
+        'License :: OSI Approved :: Apache Software License',
+        'Programming Language :: Python :: 3.7',
+        'Programming Language :: Python :: 3.8',
+        'Programming Language :: Python :: 3.9',
+        'Programming Language :: Python :: 3.10']
+)
+
+print("\nFlink Kafka connector package is ready\n")
diff --git a/flink-python/tox.ini b/flink-python/tox.ini
new file mode 100644
index 00000000..c21c00f7
--- /dev/null
+++ b/flink-python/tox.ini
@@ -0,0 +1,51 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+[tox]
+# tox (https://tox.readthedocs.io/) is a tool for running tests
+# in multiple virtualenvs. This configuration file will run the
+# test suite on all supported python versions.
+# new environments will be excluded by default unless explicitly added to envlist.
+envlist = {py38, py39, py310}-cython
+
+[testenv]
+whitelist_externals = /bin/bash
+deps = apache-flink
+passenv = *
+commands =
+    python --version
+    pip install pytest
+    bash ./dev/integration_test.sh
+# Replace the default installation command with a custom retry installation script, because on high-speed
+# networks, downloading a package may raise a ConnectionResetError: [Errno 104] Peer reset connection.
+install_command = {toxinidir}/dev/install_command.sh {opts} {packages}
+
+[flake8]
+# We follow PEP 8 (https://www.python.org/dev/peps/pep-0008/) with one exception: lines can be
+# up to 100 characters in length, not 79.
+ignore=E226,E241,E305,E402,E722,E731,E741,W503,W504
+max-line-length=100
+exclude=.tox/*,dev/*,lib/*,target/*,build/*,dist/*
+
+[mypy]
+files=pyflink/datastream/connectors/*.py
+ignore_missing_imports = True
+strict_optional=False
+
+[mypy-pyflink.fn_execution.*]
+ignore_errors = True
diff --git a/pom.xml b/pom.xml
index ff8efac3..d4cd2552 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,6 +46,7 @@ under the License.
         <module>flink-connector-kafka</module>
         <module>flink-sql-connector-kafka</module>
         <module>flink-connector-kafka-e2e-tests</module>
+        <module>flink-python</module>
     </modules>
 
     <properties>

Reply via email to