This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new d3a3755a7ee [FLINK-32056][Python][Connector/Pulsar] Upgrade flink-connector-pulsar used in flink-python to v4.0.0, which is compatible with Flink 1.17.x and upwards
d3a3755a7ee is described below

commit d3a3755a7eef5708871580671169fd6bd2babf28
Author: Martijn Visser <[email protected]>
AuthorDate: Thu May 11 09:59:18 2023 +0200

    [FLINK-32056][Python][Connector/Pulsar] Upgrade flink-connector-pulsar used in flink-python to v4.0.0, which is compatible with Flink 1.17.x and upwards
    
    This closes #22571.
---
 .../reference/pyflink.datastream/connectors.rst    |   3 -
 flink-python/pom.xml                               |   2 +-
 .../pyflink/datastream/connectors/__init__.py      |   2 -
 .../pyflink/datastream/connectors/pulsar.py        | 158 +++------------------
 .../datastream/connectors/tests/test_pulsar.py     |  37 ++---
 .../examples/datastream/connectors/pulsar.py       |  12 +-
 6 files changed, 33 insertions(+), 181 deletions(-)

diff --git a/flink-python/docs/reference/pyflink.datastream/connectors.rst b/flink-python/docs/reference/pyflink.datastream/connectors.rst
index c0bce856baf..7b9ceb149ee 100644
--- a/flink-python/docs/reference/pyflink.datastream/connectors.rst
+++ b/flink-python/docs/reference/pyflink.datastream/connectors.rst
@@ -149,8 +149,6 @@ Pulsar Source
 .. autosummary::
     :toctree: api/
 
-    PulsarDeserializationSchema
-    SubscriptionType
     StartCursor
     StopCursor
     PulsarSource
@@ -165,7 +163,6 @@ Pulsar Sink
 .. autosummary::
     :toctree: api/
 
-    PulsarSerializationSchema
     TopicRoutingMode
     MessageDelayer
     PulsarSink
diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index a2b3519eccb..2fce2eeba8d 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -290,7 +290,7 @@ under the License.
                        <!-- Indirectly accessed in pyflink_gateway_server -->
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-sql-connector-pulsar</artifactId>
-                       <version>3.0.0-1.16</version>
+                       <version>4.0.0-1.17</version>
                        <scope>test</scope>
                </dependency>
 
diff --git a/flink-python/pyflink/datastream/connectors/__init__.py b/flink-python/pyflink/datastream/connectors/__init__.py
index 02681d299a8..f8a29f60978 100644
--- a/flink-python/pyflink/datastream/connectors/__init__.py
+++ b/flink-python/pyflink/datastream/connectors/__init__.py
@@ -49,8 +49,6 @@ def _install():
     from pyflink.datastream.connectors import pulsar
     setattr(connectors, 'PulsarSource', pulsar.PulsarSource)
     setattr(connectors, 'PulsarSourceBuilder', pulsar.PulsarSourceBuilder)
-    setattr(connectors, 'PulsarDeserializationSchema', 
pulsar.PulsarDeserializationSchema)
-    setattr(connectors, 'SubscriptionType', pulsar.SubscriptionType)
     setattr(connectors, 'StartCursor', pulsar.StartCursor)
     setattr(connectors, 'StopCursor', pulsar.StopCursor)
 
diff --git a/flink-python/pyflink/datastream/connectors/pulsar.py b/flink-python/pyflink/datastream/connectors/pulsar.py
index 68739821e6e..bb4b503e8cd 100644
--- a/flink-python/pyflink/datastream/connectors/pulsar.py
+++ b/flink-python/pyflink/datastream/connectors/pulsar.py
@@ -19,8 +19,8 @@ import warnings
 from enum import Enum
 from typing import Dict, Union, List
 
-from pyflink.common import DeserializationSchema, TypeInformation, 
ExecutionConfig, \
-    ConfigOptions, Duration, SerializationSchema, ConfigOption
+from pyflink.common import DeserializationSchema, ConfigOptions, Duration, 
SerializationSchema, \
+    ConfigOption
 from pyflink.datastream.connectors import Source, Sink, DeliveryGuarantee
 from pyflink.java_gateway import get_gateway
 from pyflink.util.java_utils import load_java_class
@@ -29,13 +29,10 @@ from pyflink.util.java_utils import load_java_class
 __all__ = [
     'PulsarSource',
     'PulsarSourceBuilder',
-    'PulsarDeserializationSchema',
-    'SubscriptionType',
     'StartCursor',
     'StopCursor',
     'PulsarSink',
     'PulsarSinkBuilder',
-    'PulsarSerializationSchema',
     'MessageDelayer',
     'TopicRoutingMode'
 ]
@@ -43,86 +40,6 @@ __all__ = [
 
 # ---- PulsarSource ----
 
-
-class PulsarDeserializationSchema(object):
-    """
-    A schema bridge for deserializing the pulsar's Message into a flink 
managed instance. We
-    support both the pulsar's self managed schema and flink managed schema.
-    """
-
-    def __init__(self, _j_pulsar_deserialization_schema):
-        self._j_pulsar_deserialization_schema = 
_j_pulsar_deserialization_schema
-
-    @staticmethod
-    def flink_schema(deserialization_schema: DeserializationSchema) \
-            -> 'PulsarDeserializationSchema':
-        """
-        Create a PulsarDeserializationSchema by using the flink's 
DeserializationSchema. It would
-        consume the pulsar message as byte array and decode the message by 
using flink's logic.
-        """
-        JPulsarDeserializationSchema = get_gateway().jvm.org.apache.flink \
-            
.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
-        _j_pulsar_deserialization_schema = 
JPulsarDeserializationSchema.flinkSchema(
-            deserialization_schema._j_deserialization_schema)
-        return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)
-
-    @staticmethod
-    def flink_type_info(type_information: TypeInformation,
-                        execution_config: ExecutionConfig = None) -> 
'PulsarDeserializationSchema':
-        """
-        Create a PulsarDeserializationSchema by using the given 
TypeInformation. This method is
-        only used for treating message that was written into pulsar by 
TypeInformation.
-        """
-        JPulsarDeserializationSchema = get_gateway().jvm.org.apache.flink \
-            
.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
-        JExecutionConfig = 
get_gateway().jvm.org.apache.flink.api.common.ExecutionConfig
-        _j_execution_config = execution_config._j_execution_config \
-            if execution_config is not None else JExecutionConfig()
-        _j_pulsar_deserialization_schema = 
JPulsarDeserializationSchema.flinkTypeInfo(
-            type_information.get_java_type_info(), _j_execution_config)
-        return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)
-
-
-class SubscriptionType(Enum):
-    """
-    Types of subscription supported by Pulsar.
-
-    :data: `Exclusive`:
-
-    There can be only 1 consumer on the same topic with the same subscription 
name.
-
-    :data: `Shared`:
-
-    Multiple consumer will be able to use the same subscription name and the 
messages will be
-    dispatched according to a round-robin rotation between the connected 
consumers. In this mode,
-    the consumption order is not guaranteed.
-
-    :data: `Failover`:
-
-    Multiple consumer will be able to use the same subscription name but only 
1 consumer will
-    receive the messages. If that consumer disconnects, one of the other 
connected consumers will
-    start receiving messages. In failover mode, the consumption ordering is 
guaranteed. In case of
-    partitioned topics, the ordering is guaranteed on a per-partition basis. 
The partitions
-    assignments will be split across the available consumers. On each 
partition, at most one
-    consumer will be active at a given point in time.
-
-    :data: `Key_Shared`:
-
-    Multiple consumer will be able to use the same subscription and all 
messages with the same key
-    will be dispatched to only one consumer. Use ordering_key to overwrite the 
message key for
-    message ordering.
-    """
-
-    Exclusive = 0,
-    Shared = 1,
-    Failover = 2,
-    Key_Shared = 3
-
-    def _to_j_subscription_type(self):
-        JSubscriptionType = 
get_gateway().jvm.org.apache.pulsar.client.api.SubscriptionType
-        return getattr(JSubscriptionType, self.name)
-
-
 class StartCursor(object):
     """
     A factory class for users to specify the start position of a pulsar 
subscription.
@@ -311,8 +228,7 @@ class PulsarSource(Source):
         ...     .set_service_url(get_service_url()) \\
         ...     .set_admin_url(get_admin_url()) \\
         ...     .set_subscription_name("test") \\
-        ...     .set_deserialization_schema(
-        ...         
PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \\
+        ...     .set_deserialization_schema(SimpleStringSchema()) \\
         ...     .set_bounded_stop_cursor(StopCursor.default_stop_cursor()) \\
         ...     .build()
 
@@ -346,8 +262,7 @@ class PulsarSourceBuilder(object):
         ...     .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
         ...     .set_subscription_name("flink-source-1") \\
         ...     .set_topics([TOPIC1, TOPIC2]) \\
-        ...     .set_deserialization_schema(
-        ...         
PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \\
+        ...     .set_deserialization_schema(SimpleStringSchema()) \\
         ...     .build()
 
     The service url, admin url, subscription name, topics to consume, and the 
record deserializer
@@ -372,8 +287,7 @@ class PulsarSourceBuilder(object):
         ...     .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
         ...     .set_subscription_name("flink-source-1") \\
         ...     .set_topics([TOPIC1, TOPIC2]) \\
-        ...     .set_deserialization_schema(
-        ...         
PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \\
+        ...     .set_deserialization_schema(SimpleStringSchema()) \\
         ...     
.set_bounded_stop_cursor(StopCursor.at_publish_time(int(time.time() * 1000)))
         ...     .build()
     """
@@ -404,16 +318,6 @@ class PulsarSourceBuilder(object):
         self._j_pulsar_source_builder.setSubscriptionName(subscription_name)
         return self
 
-    def set_subscription_type(self, subscription_type: SubscriptionType) -> 
'PulsarSourceBuilder':
-        """
-        SubscriptionType is the consuming behavior for pulsar, we would 
generator different split
-        by the given subscription type. Please take some time to consider 
which subscription type
-        matches your application best. Default is SubscriptionType.Shared.
-        """
-        self._j_pulsar_source_builder.setSubscriptionType(
-            subscription_type._to_j_subscription_type())
-        return self
-
     def set_topics(self, topics: Union[str, List[str]]) -> 
'PulsarSourceBuilder':
         """
         Set a pulsar topic list for flink source. Some topic may not exist 
currently, consuming this
@@ -483,18 +387,18 @@ class PulsarSourceBuilder(object):
         
self._j_pulsar_source_builder.setBoundedStopCursor(stop_cursor._j_stop_cursor)
         return self
 
-    def set_deserialization_schema(self,
-                                   pulsar_deserialization_schema: 
PulsarDeserializationSchema) \
+    def set_deserialization_schema(self, deserialization_schema: 
DeserializationSchema) \
             -> 'PulsarSourceBuilder':
         """
-        DeserializationSchema is required for getting the Schema for 
deserialize message from
-        pulsar and getting the TypeInformation for message serialization in 
flink.
+        Sets the :class:`~pyflink.common.serialization.DeserializationSchema` 
for deserializing the
+        value of Pulsars message.
 
-        We have defined a set of implementations, using 
PulsarDeserializationSchema#flink_type_info
-        or PulsarDeserializationSchema#flink_schema for creating the desired 
schema.
+        :param deserialization_schema: the :class:`DeserializationSchema` to 
use for
+            deserialization.
+        :return: this PulsarSourceBuilder.
         """
         self._j_pulsar_source_builder.setDeserializationSchema(
-            pulsar_deserialization_schema._j_pulsar_deserialization_schema)
+            deserialization_schema._j_deserialization_schema)
         return self
 
     def set_config(self, key: Union[str, ConfigOption], value) -> 
'PulsarSourceBuilder':
@@ -543,29 +447,6 @@ class PulsarSourceBuilder(object):
 
 # ---- PulsarSink ----
 
-
-class PulsarSerializationSchema(object):
-    """
-    The serialization schema for how to serialize records into Pulsar.
-    """
-
-    def __init__(self, _j_pulsar_serialization_schema):
-        self._j_pulsar_serialization_schema = _j_pulsar_serialization_schema
-
-    @staticmethod
-    def flink_schema(serialization_schema: SerializationSchema) \
-            -> 'PulsarSerializationSchema':
-        """
-        Create a PulsarSerializationSchema by using the flink's 
SerializationSchema. It would
-        serialize the message into byte array and send it to Pulsar with 
Schema#BYTES.
-        """
-        JPulsarSerializationSchema = get_gateway().jvm.org.apache.flink \
-            .connector.pulsar.sink.writer.serializer.PulsarSerializationSchema
-        _j_pulsar_serialization_schema = 
JPulsarSerializationSchema.flinkSchema(
-            serialization_schema._j_serialization_schema)
-        return PulsarSerializationSchema(_j_pulsar_serialization_schema)
-
-
 class TopicRoutingMode(Enum):
     """
     The routing policy for choosing the desired topic by the given message.
@@ -642,8 +523,7 @@ class PulsarSink(Sink):
         ...     .set_service_url(PULSAR_BROKER_URL) \\
         ...     .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
         ...     .set_topics(topic) \\
-        ...     .set_serialization_schema(
-        ...         
PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \\
+        ...     .set_serialization_schema(SimpleStringSchema()) \\
         ...     .build()
 
     The sink supports all delivery guarantees described by DeliveryGuarantee.
@@ -693,8 +573,7 @@ class PulsarSinkBuilder(object):
         ...     .set_service_url(PULSAR_BROKER_URL) \\
         ...     .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
         ...     .set_topics([TOPIC1, TOPIC2]) \\
-        ...     .set_serialization_schema(
-        ...         
PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \\
+        ...     .set_serialization_schema(SimpleStringSchema()) \\
         ...     .build()
 
     The service url, admin url, and the record serializer are required fields 
that must be set. If
@@ -713,8 +592,7 @@ class PulsarSinkBuilder(object):
         ...     .set_service_url(PULSAR_BROKER_URL) \\
         ...     .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
         ...     .set_topics([TOPIC1, TOPIC2]) \\
-        ...     .set_serialization_schema(
-        ...         
PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \\
+        ...     .set_serialization_schema(SimpleStringSchema()) \\
         ...     .set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE)
         ...     .build()
     """
@@ -780,13 +658,13 @@ class PulsarSinkBuilder(object):
         self._j_pulsar_sink_builder.setTopicRouter(j_topic_router)
         return self
 
-    def set_serialization_schema(self, pulsar_serialization_schema: 
PulsarSerializationSchema) \
+    def set_serialization_schema(self, serialization_schema: 
SerializationSchema) \
             -> 'PulsarSinkBuilder':
         """
-        Sets the PulsarSerializationSchema that transforms incoming records to 
bytes.
+        Sets the SerializationSchema of the PulsarSinkBuilder.
         """
         self._j_pulsar_sink_builder.setSerializationSchema(
-            pulsar_serialization_schema._j_pulsar_serialization_schema)
+            serialization_schema._j_serialization_schema)
         return self
 
     def delay_sending_message(self, message_delayer: MessageDelayer) -> 
'PulsarSinkBuilder':
diff --git a/flink-python/pyflink/datastream/connectors/tests/test_pulsar.py b/flink-python/pyflink/datastream/connectors/tests/test_pulsar.py
index c82d9a14993..64c625ef0d1 100644
--- a/flink-python/pyflink/datastream/connectors/tests/test_pulsar.py
+++ b/flink-python/pyflink/datastream/connectors/tests/test_pulsar.py
@@ -17,9 +17,8 @@
 
################################################################################
 from pyflink.common import WatermarkStrategy, SimpleStringSchema, Types, 
ConfigOptions, Duration
 from pyflink.datastream.connectors import DeliveryGuarantee
-from pyflink.datastream.connectors.pulsar import PulsarSerializationSchema, 
TopicRoutingMode, \
-    MessageDelayer, PulsarSink, PulsarSource, StartCursor, 
PulsarDeserializationSchema, \
-    StopCursor, SubscriptionType
+from pyflink.datastream.connectors.pulsar import TopicRoutingMode, 
MessageDelayer, PulsarSink, \
+    PulsarSource, StartCursor, StopCursor
 from pyflink.testing.test_case_utils import PyFlinkUTTestCase
 from pyflink.util.java_utils import get_field_value, is_instance_of
 
@@ -36,11 +35,7 @@ class FlinkPulsarTest(PyFlinkUTTestCase):
             .set_unbounded_stop_cursor(StopCursor.never()) \
             .set_bounded_stop_cursor(StopCursor.at_publish_time(22)) \
             .set_subscription_name('ff') \
-            .set_subscription_type(SubscriptionType.Exclusive) \
-            .set_deserialization_schema(
-                PulsarDeserializationSchema.flink_type_info(Types.STRING())) \
-            .set_deserialization_schema(
-                
PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
+            .set_deserialization_schema(SimpleStringSchema()) \
             .set_config(TEST_OPTION_NAME, True) \
             .set_properties({'pulsar.source.autoCommitCursorInterval': 
'1000'}) \
             .build()
@@ -68,11 +63,6 @@ class FlinkPulsarTest(PyFlinkUTTestCase):
                 ConfigOptions.key('pulsar.consumer.subscriptionName')
                 .string_type()
                 .no_default_value()._j_config_option), 'ff')
-        self.assertEqual(
-            configuration.getString(
-                ConfigOptions.key('pulsar.consumer.subscriptionType')
-                .string_type()
-                .no_default_value()._j_config_option), 
SubscriptionType.Exclusive.name)
         test_option = 
ConfigOptions.key(TEST_OPTION_NAME).boolean_type().no_default_value()
         self.assertEqual(
             configuration.getBoolean(
@@ -89,8 +79,7 @@ class FlinkPulsarTest(PyFlinkUTTestCase):
             .set_admin_url('http://localhost:8080') \
             .set_topics(['ada', 'beta']) \
             .set_subscription_name('ff') \
-            .set_deserialization_schema(
-                
PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
+            .set_deserialization_schema(SimpleStringSchema()) \
             .build()
 
     def test_source_set_topics_pattern(self):
@@ -99,8 +88,7 @@ class FlinkPulsarTest(PyFlinkUTTestCase):
             .set_admin_url('http://localhost:8080') \
             .set_topic_pattern('ada.*') \
             .set_subscription_name('ff') \
-            .set_deserialization_schema(
-                
PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
+            .set_deserialization_schema(SimpleStringSchema()) \
             .build()
 
     def test_source_deprecated_method(self):
@@ -110,8 +98,7 @@ class FlinkPulsarTest(PyFlinkUTTestCase):
             .set_service_url('pulsar://localhost:6650') \
             .set_admin_url('http://localhost:8080') \
             .set_topic_pattern('ada.*') \
-            .set_deserialization_schema(
-                PulsarDeserializationSchema.flink_type_info(Types.STRING())) \
+            .set_deserialization_schema(SimpleStringSchema()) \
             .set_unbounded_stop_cursor(StopCursor.at_publish_time(4444)) \
             .set_subscription_name('ff') \
             .set_config(test_option, True) \
@@ -133,8 +120,7 @@ class FlinkPulsarTest(PyFlinkUTTestCase):
             .set_admin_url('http://localhost:8080') \
             .set_topics('ada') \
             .set_subscription_name('ff') \
-            .set_deserialization_schema(
-                
PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
+            .set_deserialization_schema(SimpleStringSchema()) \
             .set_start_cursor(StartCursor.from_publish_time(2)) \
             .set_bounded_stop_cursor(StopCursor.at_publish_time(14)) \
             .set_bounded_stop_cursor(StopCursor.after_publish_time(24)) \
@@ -146,8 +132,7 @@ class FlinkPulsarTest(PyFlinkUTTestCase):
             .set_admin_url('http://localhost:8080') \
             .set_topics('ada') \
             .set_subscription_name('ff') \
-            .set_deserialization_schema(
-                
PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
+            .set_deserialization_schema(SimpleStringSchema()) \
             .set_bounded_stop_cursor(StopCursor.after_event_time(14)) \
             .set_bounded_stop_cursor(StopCursor.at_event_time(24)) \
             .build()
@@ -162,8 +147,7 @@ class FlinkPulsarTest(PyFlinkUTTestCase):
             .set_admin_url('http://localhost:8080') \
             .set_producer_name('fo') \
             .set_topics('ada') \
-            .set_serialization_schema(
-                PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \
+            .set_serialization_schema(SimpleStringSchema()) \
             .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
             .set_topic_routing_mode(TopicRoutingMode.ROUND_ROBIN) \
             
.delay_sending_message(MessageDelayer.fixed(Duration.of_seconds(12))) \
@@ -232,6 +216,5 @@ class FlinkPulsarTest(PyFlinkUTTestCase):
             .set_service_url('pulsar://localhost:6650') \
             .set_admin_url('http://localhost:8080') \
             .set_topics(['ada', 'beta']) \
-            .set_serialization_schema(
-                PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \
+            .set_serialization_schema(SimpleStringSchema()) \
             .build()
diff --git a/flink-python/pyflink/examples/datastream/connectors/pulsar.py b/flink-python/pyflink/examples/datastream/connectors/pulsar.py
index 8e0d60879f7..f35ed99d196 100644
--- a/flink-python/pyflink/examples/datastream/connectors/pulsar.py
+++ b/flink-python/pyflink/examples/datastream/connectors/pulsar.py
@@ -21,9 +21,8 @@ import sys
 
 from pyflink.common import SimpleStringSchema, WatermarkStrategy
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors.pulsar import PulsarSource, PulsarSink,\
-    PulsarSerializationSchema, StartCursor, StopCursor, SubscriptionType, \
-    PulsarDeserializationSchema, DeliveryGuarantee, TopicRoutingMode
+from pyflink.datastream.connectors.pulsar import PulsarSource, PulsarSink, 
StartCursor, \
+    StopCursor, DeliveryGuarantee, TopicRoutingMode
 
 if __name__ == '__main__':
     logging.basicConfig(stream=sys.stdout, level=logging.INFO, 
format="%(message)s")
@@ -43,9 +42,7 @@ if __name__ == '__main__':
         .set_start_cursor(StartCursor.latest()) \
         .set_unbounded_stop_cursor(StopCursor.never()) \
         .set_subscription_name('pyflink_subscription') \
-        .set_subscription_type(SubscriptionType.Exclusive) \
-        .set_deserialization_schema(
-            PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
+        .set_deserialization_schema(SimpleStringSchema()) \
         .set_config('pulsar.source.enableAutoAcknowledgeMessage', True) \
         .set_properties({'pulsar.source.autoCommitCursorInterval': '1000'}) \
         .build()
@@ -59,8 +56,7 @@ if __name__ == '__main__':
         .set_admin_url(ADMIN_URL) \
         .set_producer_name('pyflink_producer') \
         .set_topics('beta') \
-        .set_serialization_schema(
-            PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \
+        .set_serialization_schema(SimpleStringSchema()) \
         .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
         .set_topic_routing_mode(TopicRoutingMode.ROUND_ROBIN) \
         .set_config('pulsar.producer.maxPendingMessages', 1000) \

Reply via email to