This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fda2bdd05c7 [FLINK-28886][python] Support HybridSource in Python DataStream API
fda2bdd05c7 is described below

commit fda2bdd05c75587b81205e72ca010d0022a44616
Author: Dian Fu <[email protected]>
AuthorDate: Tue Aug 9 17:39:06 2022 +0800

    [FLINK-28886][python] Support HybridSource in Python DataStream API
    
    This closes #20515.
---
 .../docs/connectors/datastream/hybridsource.md     | 30 ++++++++-
 .../docs/connectors/datastream/hybridsource.md     | 30 ++++++++-
 flink-python/docs/pyflink.datastream.rst           |  6 ++
 flink-python/pyflink/datastream/__init__.py        |  2 +
 .../pyflink/datastream/connectors/hybrid_source.py | 71 ++++++++++++++++++++++
 5 files changed, 137 insertions(+), 2 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/hybridsource.md b/docs/content.zh/docs/connectors/datastream/hybridsource.md
index 7058ac43d0f..a6bf4f8d7c3 100644
--- a/docs/content.zh/docs/connectors/datastream/hybridsource.md
+++ b/docs/content.zh/docs/connectors/datastream/hybridsource.md
@@ -55,6 +55,8 @@ Here we cover the most basic and then a more complex scenario, following the Fil
 Example: Read till pre-determined switch time from files and then continue reading from Kafka.
 Each source covers an upfront known range and therefore the contained sources can be created upfront as if they were used directly:
 
+{{< tabs "1a84c6a0-0b2f-4f96-8cf8-43ec6dd3bc5d" >}}
+{{< tab "Java" >}}
 ```java
 long switchTimestamp = ...; // derive from file input paths
 FileSource<String> fileSource =
@@ -67,7 +69,26 @@ HybridSource<String> hybridSource =
           HybridSource.builder(fileSource)
                   .addSource(kafkaSource)
                   .build();
-```  
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+switch_timestamp = ... # derive from file input paths
+file_source = FileSource \
+    .for_record_stream_format(StreamFormat.text_line_format(), test_dir) \
+    .build()
+kafka_source = KafkaSource \
+    .builder() \
+    .set_bootstrap_servers('localhost:9092') \
+    .set_group_id('MY_GROUP') \
+    .set_topics('quickstart-events') \
+    .set_value_only_deserializer(SimpleStringSchema()) \
+    .set_starting_offsets(KafkaOffsetsInitializer.timestamp(switch_timestamp)) \
+    .build()
+hybrid_source = HybridSource.builder(file_source).add_source(kafka_source).build()
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 #### Dynamic start position at switch time
 
@@ -79,6 +100,8 @@ by implementing `SourceFactory`.
 Note that enumerators need to support getting the end timestamp. This may currently require a source customization.
 Adding support for dynamic end position to `FileSource` is tracked in [FLINK-23633](https://issues.apache.org/jira/browse/FLINK-23633).
 
+{{< tabs "1a84c6a0-0b2f-4f96-8cf8-43ec6dd3bc5c" >}}
+{{< tab "Java" >}}
 ```java
 FileSource<String> fileSource = CustomFileSource.readTillOneDayFromLatest();
 HybridSource<String> hybridSource =
@@ -98,3 +121,8 @@ HybridSource<String> hybridSource =
             Boundedness.CONTINUOUS_UNBOUNDED)
         .build();
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+Still not supported in Python API.
+{{< /tab >}}
+{{< /tabs >}}
diff --git a/docs/content/docs/connectors/datastream/hybridsource.md b/docs/content/docs/connectors/datastream/hybridsource.md
index 7058ac43d0f..a6bf4f8d7c3 100644
--- a/docs/content/docs/connectors/datastream/hybridsource.md
+++ b/docs/content/docs/connectors/datastream/hybridsource.md
@@ -55,6 +55,8 @@ Here we cover the most basic and then a more complex scenario, following the Fil
 Example: Read till pre-determined switch time from files and then continue reading from Kafka.
 Each source covers an upfront known range and therefore the contained sources can be created upfront as if they were used directly:
 
+{{< tabs "1a84c6a0-0b2f-4f96-8cf8-43ec6dd3bc5d" >}}
+{{< tab "Java" >}}
 ```java
 long switchTimestamp = ...; // derive from file input paths
 FileSource<String> fileSource =
@@ -67,7 +69,26 @@ HybridSource<String> hybridSource =
           HybridSource.builder(fileSource)
                   .addSource(kafkaSource)
                   .build();
-```  
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+switch_timestamp = ... # derive from file input paths
+file_source = FileSource \
+    .for_record_stream_format(StreamFormat.text_line_format(), test_dir) \
+    .build()
+kafka_source = KafkaSource \
+    .builder() \
+    .set_bootstrap_servers('localhost:9092') \
+    .set_group_id('MY_GROUP') \
+    .set_topics('quickstart-events') \
+    .set_value_only_deserializer(SimpleStringSchema()) \
+    .set_starting_offsets(KafkaOffsetsInitializer.timestamp(switch_timestamp)) \
+    .build()
+hybrid_source = HybridSource.builder(file_source).add_source(kafka_source).build()
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 #### Dynamic start position at switch time
 
@@ -79,6 +100,8 @@ by implementing `SourceFactory`.
 Note that enumerators need to support getting the end timestamp. This may currently require a source customization.
 Adding support for dynamic end position to `FileSource` is tracked in [FLINK-23633](https://issues.apache.org/jira/browse/FLINK-23633).
 
+{{< tabs "1a84c6a0-0b2f-4f96-8cf8-43ec6dd3bc5c" >}}
+{{< tab "Java" >}}
 ```java
 FileSource<String> fileSource = CustomFileSource.readTillOneDayFromLatest();
 HybridSource<String> hybridSource =
@@ -98,3 +121,8 @@ HybridSource<String> hybridSource =
             Boundedness.CONTINUOUS_UNBOUNDED)
         .build();
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+Still not supported in Python API.
+{{< /tab >}}
+{{< /tabs >}}
diff --git a/flink-python/docs/pyflink.datastream.rst b/flink-python/docs/pyflink.datastream.rst
index 646d251c563..222852c9f11 100644
--- a/flink-python/docs/pyflink.datastream.rst
+++ b/flink-python/docs/pyflink.datastream.rst
@@ -95,6 +95,12 @@ pyflink.datastream.connectors.pulsar module
     :members:
     :undoc-members:
 
+pyflink.datastream.connectors.hybrid_source module
+--------------------------------------------------
+.. automodule:: pyflink.datastream.connectors.hybrid_source
+    :members:
+    :undoc-members:
+
 pyflink.datastream.connectors.cassandra module
 --------------------------------------------------
 .. automodule:: pyflink.datastream.connectors.cassandra
diff --git a/flink-python/pyflink/datastream/__init__.py b/flink-python/pyflink/datastream/__init__.py
index 4bcf0050446..6684a7120c0 100644
--- a/flink-python/pyflink/datastream/__init__.py
+++ b/flink-python/pyflink/datastream/__init__.py
@@ -206,6 +206,8 @@ Classes to define source & sink:
     - :class:`connectors.kinesis.KinesisFirehoseSink`:
       A Kinesis Data Firehose (KDF) Sink that performs async requests against a destination delivery
       stream using the buffering protocol.
+    - :class:`connectors.hybrid_source.HybridSource`:
+      A Hybrid source that switches underlying sources based on configured source chain.
 
 
 Classes to define formats used together with source & sink:
diff --git a/flink-python/pyflink/datastream/connectors/hybrid_source.py b/flink-python/pyflink/datastream/connectors/hybrid_source.py
new file mode 100644
index 00000000000..fa716b66877
--- /dev/null
+++ b/flink-python/pyflink/datastream/connectors/hybrid_source.py
@@ -0,0 +1,71 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from py4j.java_gateway import JavaObject
+
+from pyflink.datastream.connectors import Source
+from pyflink.java_gateway import get_gateway
+
+
+__all__ = [
+    'HybridSource',
+    'HybridSourceBuilder'
+]
+
+
+class HybridSource(Source):
+    """
+    Hybrid source that switches underlying sources based on configured source chain.
+
+    A simple example with FileSource and KafkaSource with fixed Kafka start position:
+
+    ::
+
+        >>> file_source = FileSource \\
+        ...      .for_record_stream_format(StreamFormat.text_line_format(), test_dir) \\
+        ...      .build()
+        >>> kafka_source = KafkaSource \\
+        ...     .builder() \\
+        ...     .set_bootstrap_servers('localhost:9092') \\
+        ...     .set_group_id('MY_GROUP') \\
+        ...     .set_topics('quickstart-events') \\
+        ...     .set_value_only_deserializer(SimpleStringSchema()) \\
+        ...     .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \\
+        ...     .build()
+        >>> hybrid_source = HybridSource.builder(file_source).add_source(kafka_source).build()
+    """
+
+    def __init__(self, j_hybrid_source: JavaObject):
+        super(HybridSource, self).__init__(j_hybrid_source)
+
+    @staticmethod
+    def builder(first_source: Source) -> 'HybridSourceBuilder':
+        JHybridSource = get_gateway().jvm.org.apache.flink.connector.base.source.hybrid.HybridSource
+        return HybridSourceBuilder(JHybridSource.builder(first_source.get_java_function()))
+
+
+class HybridSourceBuilder(object):
+
+    def __init__(self, j_hybrid_source_builder):
+        self._j_hybrid_source_builder = j_hybrid_source_builder
+
+    def add_source(self, source: Source) -> 'HybridSourceBuilder':
+        self._j_hybrid_source_builder.addSource(source.get_java_function())
+        return self
+
+    def build(self) -> 'HybridSource':
+        return HybridSource(self._j_hybrid_source_builder.build())

Reply via email to