deadwind4 commented on a change in pull request #18388:
URL: https://github.com/apache/flink/pull/18388#discussion_r790406471



##########
File path: flink-python/pyflink/datastream/connectors.py
##########
@@ -1079,6 +1085,320 @@ def for_row_format(base_path: str, encoder: Encoder) -> 
'FileSink.RowFormatBuild
             JFileSink.forRowFormat(JPath(base_path), encoder._j_encoder))
 
 
+class PulsarDeserializationSchema(object):
+    """
+    A schema bridge for deserializing a Pulsar Message into a Flink-managed instance. We
+    support both Pulsar's self-managed schema and the Flink-managed schema.
+    """
+
+    def __init__(self, _j_pulsar_deserialization_schema):
+        self._j_pulsar_deserialization_schema = 
_j_pulsar_deserialization_schema
+
+    @staticmethod
+    def flink_schema(deserialization_schema: DeserializationSchema) \
+            -> 'PulsarDeserializationSchema':
+        """
+        Create a PulsarDeserializationSchema by using Flink's DeserializationSchema. It
+        consumes the Pulsar message as a byte array and decodes it using Flink's logic.
+        """
+        JPulsarDeserializationSchema = get_gateway().jvm.org.apache.flink \
+            
.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
+        _j_pulsar_deserialization_schema = 
JPulsarDeserializationSchema.flinkSchema(
+            deserialization_schema._j_deserialization_schema)
+        return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)
+
+    @staticmethod
+    def flink_type_info(type_information: TypeInformation, execution_config: 
ExecutionConfig) \
+            -> 'PulsarDeserializationSchema':
+        """
+        Create a PulsarDeserializationSchema by using the given TypeInformation. This method is
+        only intended for messages that were written into Pulsar using TypeInformation.
+        """
+        JPulsarDeserializationSchema = get_gateway().jvm.org.apache.flink \
+            
.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
+        _j_execution_config = execution_config._j_execution_config \
+            if execution_config is not None else None
+        _j_pulsar_deserialization_schema = 
JPulsarDeserializationSchema.flinkTypeInfo(
+            type_information.get_java_type_info(), _j_execution_config)
+        return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)
+
+
+class SubscriptionType(Enum):
+    """
+    Types of subscription supported by Pulsar.
+
+    :data: `Exclusive`:
+
+    There can be only one consumer on the same topic with the same subscription name.
+
+    :data: `Shared`:
+
+    Multiple consumers can use the same subscription name, and the messages will be
+    dispatched according to a round-robin rotation between the connected consumers. In this mode,
+    the consumption order is not guaranteed.
+
+    :data: `Failover`:
+
+    Multiple consumers can use the same subscription name, but only one consumer will
+    receive the messages. If that consumer disconnects, one of the other connected consumers will
+    start receiving messages. In failover mode, the consumption ordering is guaranteed. In the case
+    of partitioned topics, the ordering is guaranteed on a per-partition basis. The partition
+    assignments will be split across the available consumers. On each partition, at most one
+    consumer will be active at a given point in time.
+
+    :data: `Key_Shared`:
+
+    Multiple consumers can use the same subscription, and all messages with the same key
+    will be dispatched to only one consumer. Use ordering_key to overwrite the message key for
+    message ordering.
+    """
+
+    Exclusive = 0,
+    Shared = 1,
+    Failover = 2,
+    Key_Shared = 3
+
+    def _to_j_subscription_type(self):
+        JSubscriptionType = 
get_gateway().jvm.org.apache.pulsar.client.api.SubscriptionType
+        return getattr(JSubscriptionType, self.name)
+
+
+class StartCursor(object):
+    """
+    An interface for users to specify the start position of a Pulsar subscription.
+    Since it is serialized into the split, implementations of this interface should be
+    carefully considered; adding extra internal state to an implementation is not recommended.
+
+    This class is used only for SubscriptionType.Exclusive and SubscriptionType.Failover.
+    """
+
+    def __init__(self, _j_start_cursor):
+        self._j_start_cursor = _j_start_cursor
+
+    @staticmethod
+    def default_start_cursor() -> 'StartCursor':
+        return StartCursor.earliest()
+
+    @staticmethod
+    def earliest() -> 'StartCursor':
+        JStartCursor = get_gateway().jvm \
+            
.org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
+        return StartCursor(JStartCursor.earliest())
+
+    @staticmethod
+    def latest() -> 'StartCursor':
+        JStartCursor = get_gateway().jvm \
+            
.org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
+        return StartCursor(JStartCursor.latest())
+
+    @staticmethod
+    def from_message_time(timestamp: int) -> 'StartCursor':
+        JStartCursor = get_gateway().jvm \
+            
.org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
+        return StartCursor(JStartCursor.fromMessageTime(timestamp))
+
+
+class StopCursor(object):
+    """
+    An interface for users to specify the stop position of a Pulsar subscription. Since it is
+    serialized into the split, implementations of this interface should be carefully considered;
+    adding extra internal state to an implementation is not recommended.
+    """
+
+    def __init__(self, _j_stop_cursor):
+        self._j_stop_cursor = _j_stop_cursor
+
+    @staticmethod
+    def default_stop_cursor() -> 'StopCursor':
+        return StopCursor.never()
+
+    @staticmethod
+    def never() -> 'StopCursor':
+        JStopCursor = get_gateway().jvm \
+            
.org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
+        return StopCursor(JStopCursor.never())
+
+    @staticmethod
+    def latest() -> 'StopCursor':
+        JStopCursor = get_gateway().jvm \
+            
.org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
+        return StopCursor(JStopCursor.latest())
+
+    @staticmethod
+    def at_event_time(timestamp: int) -> 'StopCursor':
+        JStopCursor = get_gateway().jvm \
+            
.org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
+        return StopCursor(JStopCursor.atEventTime(timestamp))
+
+
+class PulsarSource(Source):
+    """
+    The Source implementation of Pulsar. Please use a PulsarSourceBuilder to 
construct a
+    PulsarSource.
+    """
+
+    def __init__(self, j_pulsar_source):
+        super(PulsarSource, self).__init__(source=j_pulsar_source)
+
+    @staticmethod
+    def builder() -> 'PulsarSourceBuilder':
+        """
+        Get a PulsarSourceBuilder to build a PulsarSource.
+        """
+        return PulsarSourceBuilder()
+
+
+class PulsarSourceBuilder(object):
+    """
+    The builder class for PulsarSource to make it easier for the users to 
construct a PulsarSource.
+
+    The service url, admin url, subscription name, topics to consume, and the 
record deserializer
+    are required fields that must be set.
+
+    To specify the starting position of PulsarSource, one can call 
set_start_cursor(StartCursor).
+
+    By default the PulsarSource runs in Boundedness.CONTINUOUS_UNBOUNDED mode and never stops
+    until the Flink job is canceled or fails. To let the PulsarSource run in
+    Boundedness.CONTINUOUS_UNBOUNDED mode but stop at some given offsets, one can call
+    set_unbounded_stop_cursor(StopCursor).

Review comment:
       I have added examples.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to