This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new d3a3755a7ee [FLINK-32056][Python][Connector/Pulsar] Upgrade
flink-connector-pulsar used in flink-python to v4.0.0, which is compatible with
Flink 1.17.x and upwards
d3a3755a7ee is described below
commit d3a3755a7eef5708871580671169fd6bd2babf28
Author: Martijn Visser <[email protected]>
AuthorDate: Thu May 11 09:59:18 2023 +0200
[FLINK-32056][Python][Connector/Pulsar] Upgrade flink-connector-pulsar used
in flink-python to v4.0.0, which is compatible with Flink 1.17.x and upwards
This closes #22571.
---
.../reference/pyflink.datastream/connectors.rst | 3 -
flink-python/pom.xml | 2 +-
.../pyflink/datastream/connectors/__init__.py | 2 -
.../pyflink/datastream/connectors/pulsar.py | 158 +++------------------
.../datastream/connectors/tests/test_pulsar.py | 37 ++---
.../examples/datastream/connectors/pulsar.py | 12 +-
6 files changed, 33 insertions(+), 181 deletions(-)
diff --git a/flink-python/docs/reference/pyflink.datastream/connectors.rst b/flink-python/docs/reference/pyflink.datastream/connectors.rst
index c0bce856baf..7b9ceb149ee 100644
--- a/flink-python/docs/reference/pyflink.datastream/connectors.rst
+++ b/flink-python/docs/reference/pyflink.datastream/connectors.rst
@@ -149,8 +149,6 @@ Pulsar Source
.. autosummary::
:toctree: api/
- PulsarDeserializationSchema
- SubscriptionType
StartCursor
StopCursor
PulsarSource
@@ -165,7 +163,6 @@ Pulsar Sink
.. autosummary::
:toctree: api/
- PulsarSerializationSchema
TopicRoutingMode
MessageDelayer
PulsarSink
diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index a2b3519eccb..2fce2eeba8d 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -290,7 +290,7 @@ under the License.
<!-- Indirectly accessed in pyflink_gateway_server -->
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-pulsar</artifactId>
- <version>3.0.0-1.16</version>
+ <version>4.0.0-1.17</version>
<scope>test</scope>
</dependency>
diff --git a/flink-python/pyflink/datastream/connectors/__init__.py b/flink-python/pyflink/datastream/connectors/__init__.py
index 02681d299a8..f8a29f60978 100644
--- a/flink-python/pyflink/datastream/connectors/__init__.py
+++ b/flink-python/pyflink/datastream/connectors/__init__.py
@@ -49,8 +49,6 @@ def _install():
from pyflink.datastream.connectors import pulsar
setattr(connectors, 'PulsarSource', pulsar.PulsarSource)
setattr(connectors, 'PulsarSourceBuilder', pulsar.PulsarSourceBuilder)
- setattr(connectors, 'PulsarDeserializationSchema',
pulsar.PulsarDeserializationSchema)
- setattr(connectors, 'SubscriptionType', pulsar.SubscriptionType)
setattr(connectors, 'StartCursor', pulsar.StartCursor)
setattr(connectors, 'StopCursor', pulsar.StopCursor)
diff --git a/flink-python/pyflink/datastream/connectors/pulsar.py b/flink-python/pyflink/datastream/connectors/pulsar.py
index 68739821e6e..bb4b503e8cd 100644
--- a/flink-python/pyflink/datastream/connectors/pulsar.py
+++ b/flink-python/pyflink/datastream/connectors/pulsar.py
@@ -19,8 +19,8 @@ import warnings
from enum import Enum
from typing import Dict, Union, List
-from pyflink.common import DeserializationSchema, TypeInformation,
ExecutionConfig, \
- ConfigOptions, Duration, SerializationSchema, ConfigOption
+from pyflink.common import DeserializationSchema, ConfigOptions, Duration,
SerializationSchema, \
+ ConfigOption
from pyflink.datastream.connectors import Source, Sink, DeliveryGuarantee
from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import load_java_class
@@ -29,13 +29,10 @@ from pyflink.util.java_utils import load_java_class
__all__ = [
'PulsarSource',
'PulsarSourceBuilder',
- 'PulsarDeserializationSchema',
- 'SubscriptionType',
'StartCursor',
'StopCursor',
'PulsarSink',
'PulsarSinkBuilder',
- 'PulsarSerializationSchema',
'MessageDelayer',
'TopicRoutingMode'
]
@@ -43,86 +40,6 @@ __all__ = [
# ---- PulsarSource ----
-
-class PulsarDeserializationSchema(object):
- """
- A schema bridge for deserializing the pulsar's Message into a flink
managed instance. We
- support both the pulsar's self managed schema and flink managed schema.
- """
-
- def __init__(self, _j_pulsar_deserialization_schema):
- self._j_pulsar_deserialization_schema =
_j_pulsar_deserialization_schema
-
- @staticmethod
- def flink_schema(deserialization_schema: DeserializationSchema) \
- -> 'PulsarDeserializationSchema':
- """
- Create a PulsarDeserializationSchema by using the flink's
DeserializationSchema. It would
- consume the pulsar message as byte array and decode the message by
using flink's logic.
- """
- JPulsarDeserializationSchema = get_gateway().jvm.org.apache.flink \
-
.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
- _j_pulsar_deserialization_schema =
JPulsarDeserializationSchema.flinkSchema(
- deserialization_schema._j_deserialization_schema)
- return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)
-
- @staticmethod
- def flink_type_info(type_information: TypeInformation,
- execution_config: ExecutionConfig = None) ->
'PulsarDeserializationSchema':
- """
- Create a PulsarDeserializationSchema by using the given
TypeInformation. This method is
- only used for treating message that was written into pulsar by
TypeInformation.
- """
- JPulsarDeserializationSchema = get_gateway().jvm.org.apache.flink \
-
.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
- JExecutionConfig =
get_gateway().jvm.org.apache.flink.api.common.ExecutionConfig
- _j_execution_config = execution_config._j_execution_config \
- if execution_config is not None else JExecutionConfig()
- _j_pulsar_deserialization_schema =
JPulsarDeserializationSchema.flinkTypeInfo(
- type_information.get_java_type_info(), _j_execution_config)
- return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)
-
-
-class SubscriptionType(Enum):
- """
- Types of subscription supported by Pulsar.
-
- :data: `Exclusive`:
-
- There can be only 1 consumer on the same topic with the same subscription
name.
-
- :data: `Shared`:
-
- Multiple consumer will be able to use the same subscription name and the
messages will be
- dispatched according to a round-robin rotation between the connected
consumers. In this mode,
- the consumption order is not guaranteed.
-
- :data: `Failover`:
-
- Multiple consumer will be able to use the same subscription name but only
1 consumer will
- receive the messages. If that consumer disconnects, one of the other
connected consumers will
- start receiving messages. In failover mode, the consumption ordering is
guaranteed. In case of
- partitioned topics, the ordering is guaranteed on a per-partition basis.
The partitions
- assignments will be split across the available consumers. On each
partition, at most one
- consumer will be active at a given point in time.
-
- :data: `Key_Shared`:
-
- Multiple consumer will be able to use the same subscription and all
messages with the same key
- will be dispatched to only one consumer. Use ordering_key to overwrite the
message key for
- message ordering.
- """
-
- Exclusive = 0,
- Shared = 1,
- Failover = 2,
- Key_Shared = 3
-
- def _to_j_subscription_type(self):
- JSubscriptionType =
get_gateway().jvm.org.apache.pulsar.client.api.SubscriptionType
- return getattr(JSubscriptionType, self.name)
-
-
class StartCursor(object):
"""
A factory class for users to specify the start position of a pulsar
subscription.
@@ -311,8 +228,7 @@ class PulsarSource(Source):
... .set_service_url(get_service_url()) \\
... .set_admin_url(get_admin_url()) \\
... .set_subscription_name("test") \\
- ... .set_deserialization_schema(
- ...
PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \\
+ ... .set_deserialization_schema(SimpleStringSchema()) \\
... .set_bounded_stop_cursor(StopCursor.default_stop_cursor()) \\
... .build()
@@ -346,8 +262,7 @@ class PulsarSourceBuilder(object):
... .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
... .set_subscription_name("flink-source-1") \\
... .set_topics([TOPIC1, TOPIC2]) \\
- ... .set_deserialization_schema(
- ...
PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \\
+ ... .set_deserialization_schema(SimpleStringSchema()) \\
... .build()
The service url, admin url, subscription name, topics to consume, and the
record deserializer
@@ -372,8 +287,7 @@ class PulsarSourceBuilder(object):
... .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
... .set_subscription_name("flink-source-1") \\
... .set_topics([TOPIC1, TOPIC2]) \\
- ... .set_deserialization_schema(
- ...
PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \\
+ ... .set_deserialization_schema(SimpleStringSchema()) \\
...
.set_bounded_stop_cursor(StopCursor.at_publish_time(int(time.time() * 1000)))
... .build()
"""
@@ -404,16 +318,6 @@ class PulsarSourceBuilder(object):
self._j_pulsar_source_builder.setSubscriptionName(subscription_name)
return self
- def set_subscription_type(self, subscription_type: SubscriptionType) ->
'PulsarSourceBuilder':
- """
- SubscriptionType is the consuming behavior for pulsar, we would
generator different split
- by the given subscription type. Please take some time to consider
which subscription type
- matches your application best. Default is SubscriptionType.Shared.
- """
- self._j_pulsar_source_builder.setSubscriptionType(
- subscription_type._to_j_subscription_type())
- return self
-
def set_topics(self, topics: Union[str, List[str]]) ->
'PulsarSourceBuilder':
"""
Set a pulsar topic list for flink source. Some topic may not exist
currently, consuming this
@@ -483,18 +387,18 @@ class PulsarSourceBuilder(object):
self._j_pulsar_source_builder.setBoundedStopCursor(stop_cursor._j_stop_cursor)
return self
- def set_deserialization_schema(self,
- pulsar_deserialization_schema:
PulsarDeserializationSchema) \
+ def set_deserialization_schema(self, deserialization_schema:
DeserializationSchema) \
-> 'PulsarSourceBuilder':
"""
- DeserializationSchema is required for getting the Schema for
deserialize message from
- pulsar and getting the TypeInformation for message serialization in
flink.
+ Sets the :class:`~pyflink.common.serialization.DeserializationSchema` for deserializing the
+ value of each Pulsar message.
- We have defined a set of implementations, using
PulsarDeserializationSchema#flink_type_info
- or PulsarDeserializationSchema#flink_schema for creating the desired
schema.
+ :param deserialization_schema: the :class:`DeserializationSchema` to
use for
+ deserialization.
+ :return: this PulsarSourceBuilder.
"""
self._j_pulsar_source_builder.setDeserializationSchema(
- pulsar_deserialization_schema._j_pulsar_deserialization_schema)
+ deserialization_schema._j_deserialization_schema)
return self
def set_config(self, key: Union[str, ConfigOption], value) ->
'PulsarSourceBuilder':
@@ -543,29 +447,6 @@ class PulsarSourceBuilder(object):
# ---- PulsarSink ----
-
-class PulsarSerializationSchema(object):
- """
- The serialization schema for how to serialize records into Pulsar.
- """
-
- def __init__(self, _j_pulsar_serialization_schema):
- self._j_pulsar_serialization_schema = _j_pulsar_serialization_schema
-
- @staticmethod
- def flink_schema(serialization_schema: SerializationSchema) \
- -> 'PulsarSerializationSchema':
- """
- Create a PulsarSerializationSchema by using the flink's
SerializationSchema. It would
- serialize the message into byte array and send it to Pulsar with
Schema#BYTES.
- """
- JPulsarSerializationSchema = get_gateway().jvm.org.apache.flink \
- .connector.pulsar.sink.writer.serializer.PulsarSerializationSchema
- _j_pulsar_serialization_schema =
JPulsarSerializationSchema.flinkSchema(
- serialization_schema._j_serialization_schema)
- return PulsarSerializationSchema(_j_pulsar_serialization_schema)
-
-
class TopicRoutingMode(Enum):
"""
The routing policy for choosing the desired topic by the given message.
@@ -642,8 +523,7 @@ class PulsarSink(Sink):
... .set_service_url(PULSAR_BROKER_URL) \\
... .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
... .set_topics(topic) \\
- ... .set_serialization_schema(
- ...
PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \\
+ ... .set_serialization_schema(SimpleStringSchema()) \\
... .build()
The sink supports all delivery guarantees described by DeliveryGuarantee.
@@ -693,8 +573,7 @@ class PulsarSinkBuilder(object):
... .set_service_url(PULSAR_BROKER_URL) \\
... .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
... .set_topics([TOPIC1, TOPIC2]) \\
- ... .set_serialization_schema(
- ...
PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \\
+ ... .set_serialization_schema(SimpleStringSchema()) \\
... .build()
The service url, admin url, and the record serializer are required fields
that must be set. If
@@ -713,8 +592,7 @@ class PulsarSinkBuilder(object):
... .set_service_url(PULSAR_BROKER_URL) \\
... .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
... .set_topics([TOPIC1, TOPIC2]) \\
- ... .set_serialization_schema(
- ...
PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \\
+ ... .set_serialization_schema(SimpleStringSchema()) \\
... .set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE)
... .build()
"""
@@ -780,13 +658,13 @@ class PulsarSinkBuilder(object):
self._j_pulsar_sink_builder.setTopicRouter(j_topic_router)
return self
- def set_serialization_schema(self, pulsar_serialization_schema:
PulsarSerializationSchema) \
+ def set_serialization_schema(self, serialization_schema:
SerializationSchema) \
-> 'PulsarSinkBuilder':
"""
- Sets the PulsarSerializationSchema that transforms incoming records to
bytes.
+ Sets the SerializationSchema used to serialize incoming records into Pulsar messages.
"""
self._j_pulsar_sink_builder.setSerializationSchema(
- pulsar_serialization_schema._j_pulsar_serialization_schema)
+ serialization_schema._j_serialization_schema)
return self
def delay_sending_message(self, message_delayer: MessageDelayer) ->
'PulsarSinkBuilder':
diff --git a/flink-python/pyflink/datastream/connectors/tests/test_pulsar.py b/flink-python/pyflink/datastream/connectors/tests/test_pulsar.py
index c82d9a14993..64c625ef0d1 100644
--- a/flink-python/pyflink/datastream/connectors/tests/test_pulsar.py
+++ b/flink-python/pyflink/datastream/connectors/tests/test_pulsar.py
@@ -17,9 +17,8 @@
################################################################################
from pyflink.common import WatermarkStrategy, SimpleStringSchema, Types,
ConfigOptions, Duration
from pyflink.datastream.connectors import DeliveryGuarantee
-from pyflink.datastream.connectors.pulsar import PulsarSerializationSchema,
TopicRoutingMode, \
- MessageDelayer, PulsarSink, PulsarSource, StartCursor,
PulsarDeserializationSchema, \
- StopCursor, SubscriptionType
+from pyflink.datastream.connectors.pulsar import TopicRoutingMode,
MessageDelayer, PulsarSink, \
+ PulsarSource, StartCursor, StopCursor
from pyflink.testing.test_case_utils import PyFlinkUTTestCase
from pyflink.util.java_utils import get_field_value, is_instance_of
@@ -36,11 +35,7 @@ class FlinkPulsarTest(PyFlinkUTTestCase):
.set_unbounded_stop_cursor(StopCursor.never()) \
.set_bounded_stop_cursor(StopCursor.at_publish_time(22)) \
.set_subscription_name('ff') \
- .set_subscription_type(SubscriptionType.Exclusive) \
- .set_deserialization_schema(
- PulsarDeserializationSchema.flink_type_info(Types.STRING())) \
- .set_deserialization_schema(
-
PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
+ .set_deserialization_schema(SimpleStringSchema()) \
.set_config(TEST_OPTION_NAME, True) \
.set_properties({'pulsar.source.autoCommitCursorInterval':
'1000'}) \
.build()
@@ -68,11 +63,6 @@ class FlinkPulsarTest(PyFlinkUTTestCase):
ConfigOptions.key('pulsar.consumer.subscriptionName')
.string_type()
.no_default_value()._j_config_option), 'ff')
- self.assertEqual(
- configuration.getString(
- ConfigOptions.key('pulsar.consumer.subscriptionType')
- .string_type()
- .no_default_value()._j_config_option),
SubscriptionType.Exclusive.name)
test_option =
ConfigOptions.key(TEST_OPTION_NAME).boolean_type().no_default_value()
self.assertEqual(
configuration.getBoolean(
@@ -89,8 +79,7 @@ class FlinkPulsarTest(PyFlinkUTTestCase):
.set_admin_url('http://localhost:8080') \
.set_topics(['ada', 'beta']) \
.set_subscription_name('ff') \
- .set_deserialization_schema(
-
PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
+ .set_deserialization_schema(SimpleStringSchema()) \
.build()
def test_source_set_topics_pattern(self):
@@ -99,8 +88,7 @@ class FlinkPulsarTest(PyFlinkUTTestCase):
.set_admin_url('http://localhost:8080') \
.set_topic_pattern('ada.*') \
.set_subscription_name('ff') \
- .set_deserialization_schema(
-
PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
+ .set_deserialization_schema(SimpleStringSchema()) \
.build()
def test_source_deprecated_method(self):
@@ -110,8 +98,7 @@ class FlinkPulsarTest(PyFlinkUTTestCase):
.set_service_url('pulsar://localhost:6650') \
.set_admin_url('http://localhost:8080') \
.set_topic_pattern('ada.*') \
- .set_deserialization_schema(
- PulsarDeserializationSchema.flink_type_info(Types.STRING())) \
+ .set_deserialization_schema(SimpleStringSchema()) \
.set_unbounded_stop_cursor(StopCursor.at_publish_time(4444)) \
.set_subscription_name('ff') \
.set_config(test_option, True) \
@@ -133,8 +120,7 @@ class FlinkPulsarTest(PyFlinkUTTestCase):
.set_admin_url('http://localhost:8080') \
.set_topics('ada') \
.set_subscription_name('ff') \
- .set_deserialization_schema(
-
PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
+ .set_deserialization_schema(SimpleStringSchema()) \
.set_start_cursor(StartCursor.from_publish_time(2)) \
.set_bounded_stop_cursor(StopCursor.at_publish_time(14)) \
.set_bounded_stop_cursor(StopCursor.after_publish_time(24)) \
@@ -146,8 +132,7 @@ class FlinkPulsarTest(PyFlinkUTTestCase):
.set_admin_url('http://localhost:8080') \
.set_topics('ada') \
.set_subscription_name('ff') \
- .set_deserialization_schema(
-
PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
+ .set_deserialization_schema(SimpleStringSchema()) \
.set_bounded_stop_cursor(StopCursor.after_event_time(14)) \
.set_bounded_stop_cursor(StopCursor.at_event_time(24)) \
.build()
@@ -162,8 +147,7 @@ class FlinkPulsarTest(PyFlinkUTTestCase):
.set_admin_url('http://localhost:8080') \
.set_producer_name('fo') \
.set_topics('ada') \
- .set_serialization_schema(
- PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \
+ .set_serialization_schema(SimpleStringSchema()) \
.set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
.set_topic_routing_mode(TopicRoutingMode.ROUND_ROBIN) \
.delay_sending_message(MessageDelayer.fixed(Duration.of_seconds(12))) \
@@ -232,6 +216,5 @@ class FlinkPulsarTest(PyFlinkUTTestCase):
.set_service_url('pulsar://localhost:6650') \
.set_admin_url('http://localhost:8080') \
.set_topics(['ada', 'beta']) \
- .set_serialization_schema(
- PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \
+ .set_serialization_schema(SimpleStringSchema()) \
.build()
diff --git a/flink-python/pyflink/examples/datastream/connectors/pulsar.py b/flink-python/pyflink/examples/datastream/connectors/pulsar.py
index 8e0d60879f7..f35ed99d196 100644
--- a/flink-python/pyflink/examples/datastream/connectors/pulsar.py
+++ b/flink-python/pyflink/examples/datastream/connectors/pulsar.py
@@ -21,9 +21,8 @@ import sys
from pyflink.common import SimpleStringSchema, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors.pulsar import PulsarSource, PulsarSink,\
- PulsarSerializationSchema, StartCursor, StopCursor, SubscriptionType, \
- PulsarDeserializationSchema, DeliveryGuarantee, TopicRoutingMode
+from pyflink.datastream.connectors.pulsar import PulsarSource, PulsarSink,
StartCursor, \
+ StopCursor, DeliveryGuarantee, TopicRoutingMode
if __name__ == '__main__':
logging.basicConfig(stream=sys.stdout, level=logging.INFO,
format="%(message)s")
@@ -43,9 +42,7 @@ if __name__ == '__main__':
.set_start_cursor(StartCursor.latest()) \
.set_unbounded_stop_cursor(StopCursor.never()) \
.set_subscription_name('pyflink_subscription') \
- .set_subscription_type(SubscriptionType.Exclusive) \
- .set_deserialization_schema(
- PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
+ .set_deserialization_schema(SimpleStringSchema()) \
.set_config('pulsar.source.enableAutoAcknowledgeMessage', True) \
.set_properties({'pulsar.source.autoCommitCursorInterval': '1000'}) \
.build()
@@ -59,8 +56,7 @@ if __name__ == '__main__':
.set_admin_url(ADMIN_URL) \
.set_producer_name('pyflink_producer') \
.set_topics('beta') \
- .set_serialization_schema(
- PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \
+ .set_serialization_schema(SimpleStringSchema()) \
.set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
.set_topic_routing_mode(TopicRoutingMode.ROUND_ROBIN) \
.set_config('pulsar.producer.maxPendingMessages', 1000) \