This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e78b239ab7 [INLONG-10682][Sort] Make pulsar source support to send
audit information exactly once and add ExtractNode.INLONG_MSG to helper
validate (#10686)
e78b239ab7 is described below
commit e78b239ab793641b3ebf67e851020b291b0aece9
Author: XiaoYou201 <[email protected]>
AuthorDate: Mon Jul 22 17:38:32 2024 +0800
[INLONG-10682][Sort] Make pulsar source support to send audit information
exactly once and add ExtractNode.INLONG_MSG to helper validate (#10686)
---
.../inlong/sort/pulsar/source/PulsarSource.java | 187 ++++++
.../sort/pulsar/source/PulsarSourceBuilder.java | 647 +++++++++++++++++++++
.../pulsar/source/reader/PulsarSourceReader.java | 314 ++++++++++
.../sort/pulsar/table/PulsarTableFactory.java | 4 +-
.../source/PulsarTableDeserializationSchema.java | 26 +-
.../pulsar/table/source/PulsarTableSource.java | 5 +-
licenses/inlong-sort-connectors/LICENSE | 3 +
7 files changed, 1179 insertions(+), 7 deletions(-)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java
new file mode 100644
index 0000000000..e26ee645aa
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.source;
+
+import org.apache.inlong.sort.pulsar.source.reader.PulsarSourceReader;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer;
+import
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import
org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
+import
org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
+import
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import
org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * The Source implementation of Pulsar. Please use a {@link PulsarSourceBuilder} to construct a
+ * {@link PulsarSource}. The following example shows how to create a PulsarSource emitting
+ * records of <code>String</code> type.
+ *
+ * <pre>{@code
+ * PulsarSource<String> source = PulsarSource
+ * .builder()
+ * .setTopics(TOPIC1, TOPIC2)
+ * .setServiceUrl(getServiceUrl())
+ * .setSubscriptionName("test")
+ * .setDeserializationSchema(new SimpleStringSchema())
+ * .setBoundedStopCursor(StopCursor.defaultStopCursor())
+ * .build();
+ * }</pre>
+ *
+ * <p>See {@link PulsarSourceBuilder} for more details.
+ *
+ * <p>Modified from {@link org.apache.flink.connector.pulsar.source.PulsarSource}; readers are
+ * created through the InLong {@link PulsarSourceReader}.
+ *
+ * @param <OUT> The output type of the source.
+ */
+@PublicEvolving
+public final class PulsarSource<OUT>
+ implements
+ Source<OUT, PulsarPartitionSplit, PulsarSourceEnumState>,
+ ResultTypeQueryable<OUT> {
+
+ private static final long serialVersionUID = 7773108631275567433L;
+
+ /**
+ * The configuration for the Pulsar source. Pulsar's own configuration classes are not
+ * supported directly.
+ */
+ private final SourceConfiguration sourceConfiguration;
+
+ private final PulsarSubscriber subscriber;
+
+ private final RangeGenerator rangeGenerator;
+
+ private final StartCursor startCursor;
+
+ private final StopCursor stopCursor;
+
+ private final Boundedness boundedness;
+
+ /** The Pulsar deserialization schema used for deserializing messages. */
+ private final PulsarDeserializationSchema<OUT> deserializationSchema;
+
+ private final PulsarCrypto pulsarCrypto;
+
+ /**
+ * The constructor for PulsarSource. It is package-private to force the use of {@link
+ * PulsarSourceBuilder}. The arguments are stored as given; no validation happens here.
+ */
+ PulsarSource(
+ SourceConfiguration sourceConfiguration,
+ PulsarSubscriber subscriber,
+ RangeGenerator rangeGenerator,
+ StartCursor startCursor,
+ StopCursor stopCursor,
+ Boundedness boundedness,
+ PulsarDeserializationSchema<OUT> deserializationSchema,
+ PulsarCrypto pulsarCrypto) {
+ this.sourceConfiguration = sourceConfiguration;
+ this.subscriber = subscriber;
+ this.rangeGenerator = rangeGenerator;
+ this.startCursor = startCursor;
+ this.stopCursor = stopCursor;
+ this.boundedness = boundedness;
+ this.deserializationSchema = deserializationSchema;
+ this.pulsarCrypto = pulsarCrypto;
+ }
+
+ /**
+ * Get a PulsarSourceBuilder to build a {@link PulsarSource}.
+ *
+ * @param <OUT> The output type of the source to build.
+ * @return a new Pulsar source builder.
+ */
+ public static <OUT> PulsarSourceBuilder<OUT> builder() {
+ return new PulsarSourceBuilder<>();
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return boundedness;
+ }
+
+ @Internal
+ @Override
+ public SourceReader<OUT, PulsarPartitionSplit>
createReader(SourceReaderContext readerContext)
+ throws Exception {
+ return PulsarSourceReader.create(
+ sourceConfiguration, deserializationSchema, pulsarCrypto,
readerContext);
+ }
+
+ @Internal
+ @Override
+ public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState>
createEnumerator(
+ SplitEnumeratorContext<PulsarPartitionSplit> enumContext) throws
PulsarClientException {
+ return new PulsarSourceEnumerator(
+ subscriber,
+ startCursor,
+ stopCursor,
+ rangeGenerator,
+ sourceConfiguration,
+ enumContext);
+ }
+
+ @Internal
+ @Override
+ public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState>
restoreEnumerator(
+ SplitEnumeratorContext<PulsarPartitionSplit> enumContext,
+ PulsarSourceEnumState checkpoint)
+ throws PulsarClientException {
+ return new PulsarSourceEnumerator(
+ subscriber,
+ startCursor,
+ stopCursor,
+ rangeGenerator,
+ sourceConfiguration,
+ enumContext,
+ checkpoint);
+ }
+
+ @Internal
+ @Override
+ public SimpleVersionedSerializer<PulsarPartitionSplit>
getSplitSerializer() {
+ return PulsarPartitionSplitSerializer.INSTANCE;
+ }
+
+ @Internal
+ @Override
+ public SimpleVersionedSerializer<PulsarSourceEnumState>
getEnumeratorCheckpointSerializer() {
+ return PulsarSourceEnumStateSerializer.INSTANCE;
+ }
+
+ @Override
+ public TypeInformation<OUT> getProducedType() {
+ return deserializationSchema.getProducedType();
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java
new file mode 100644
index 0000000000..cfabe671a4
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigBuilder;
+import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
+import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
+import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import
org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
+import
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
+import
org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator;
+import
org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
+import
org.apache.flink.connector.pulsar.source.reader.deserializer.GenericRecordDeserializationSchema;
+import
org.apache.flink.connector.pulsar.source.reader.deserializer.GenericRecordDeserializer;
+import
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchemaWrapper;
+import
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarSchemaWrapper;
+import
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarTypeInformationWrapper;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
+import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
+import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
+import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_NAME;
+import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CRYPTO_FAILURE_ACTION;
+import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
+import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_READ_SCHEMA_EVOLUTION;
+import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
+import static
org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.SOURCE_CONFIG_VALIDATOR;
+import static org.apache.flink.util.InstantiationUtil.isSerializable;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The builder class for {@link PulsarSource} to make it easier for the users
to construct a {@link
+ * PulsarSource}.
+ *
+ * <p>The following example shows the minimum setup to create a PulsarSource
that reads the String
+ * values from a Pulsar topic.
+ *
+ * <pre>{@code
+ * PulsarSource<String> source = PulsarSource
+ * .builder()
+ * .setServiceUrl(PULSAR_BROKER_URL)
+ * .setSubscriptionName("flink-source-1")
+ * .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ * .setDeserializationSchema(new SimpleStringSchema())
+ * .build();
+ * }</pre>
+ *
+ * <p>The service url, subscription name, topics to consume, and the record
deserializer are
+ * required fields that must be set.
+ *
+ * <p>To specify the starting position of PulsarSource, one can call {@link
+ * #setStartCursor(StartCursor)}.
+ *
+ * <p>By default, the PulsarSource runs in an {@link
Boundedness#CONTINUOUS_UNBOUNDED} mode and
+ * never stop until the Flink job is canceled or fails. To let the
PulsarSource run in {@link
+ * Boundedness#CONTINUOUS_UNBOUNDED} but stops at some given offsets, one can
call {@link
+ * #setUnboundedStopCursor(StopCursor)} and disable auto partition discovery
as described below. For
+ * example the following PulsarSource stops after it consumes up to a event
time when the Flink
+ * started.
+ *
+ * <p>To stop the connector user has to disable the auto partition discovery.
As auto partition
+ * discovery always expected new splits to come and not exiting. To disable
auto partition
+ * discovery, use builder.setConfig({@link
+ * PulsarSourceOptions#PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
+ *
+ * <pre>{@code
+ * PulsarSource<String> source = PulsarSource
+ * .builder()
+ * .setServiceUrl(PULSAR_BROKER_URL)
+ * .setSubscriptionName("flink-source-1")
+ * .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ * .setDeserializationSchema(new SimpleStringSchema())
+ *
.setUnboundedStopCursor(StopCursor.atEventTime(System.currentTimeMillis()))
+ * .build();
+ * }</pre>
+ *
+ * @param <OUT> The output type of the source.
+ * Modify from {@link
org.apache.flink.connector.pulsar.source.PulsarSourceBuilder}
+ */
+@PublicEvolving
+public final class PulsarSourceBuilder<OUT> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PulsarSourceBuilder.class);
+
+ private final PulsarConfigBuilder configBuilder;
+
+ private PulsarSubscriber subscriber;
+ private RangeGenerator rangeGenerator;
+ private StartCursor startCursor;
+ private StopCursor stopCursor;
+ private Boundedness boundedness;
+ private PulsarDeserializationSchema<OUT> deserializationSchema;
+ private PulsarCrypto pulsarCrypto;
+
+ // Package-private constructor: starts with the default cursors and unbounded mode.
+ PulsarSourceBuilder() {
+ this.configBuilder = new PulsarConfigBuilder();
+ this.startCursor = StartCursor.defaultStartCursor();
+ this.stopCursor = StopCursor.defaultStopCursor();
+ this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+ }
+
+ /**
+ * No-op: the given admin URL is ignored and this builder is returned unchanged.
+ *
+ * @param adminUrl the url for the PulsarAdmin; not used by this method.
+ * @return this PulsarSourceBuilder.
+ * @deprecated this method does nothing and only returns the builder directly.
+ */
+ @Deprecated
+ public PulsarSourceBuilder<OUT> setAdminUrl(String adminUrl) {
+ return this;
+ }
+
+ /**
+ * Sets the server's link for the PulsarConsumer of the PulsarSource. Delegates to {@code
+ * setConfig} with the {@link PulsarOptions#PULSAR_SERVICE_URL} option.
+ *
+ * @param serviceUrl the server url of the Pulsar cluster.
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder<OUT> setServiceUrl(String serviceUrl) {
+ return setConfig(PULSAR_SERVICE_URL, serviceUrl);
+ }
+
+ /**
+ * Sets the name for this pulsar subscription. Delegates to {@code setConfig} with the {@link
+ * PulsarSourceOptions#PULSAR_SUBSCRIPTION_NAME} option.
+ *
+ * @param subscriptionName the name of the Pulsar subscription.
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder<OUT> setSubscriptionName(String
subscriptionName) {
+ return setConfig(PULSAR_SUBSCRIPTION_NAME, subscriptionName);
+ }
+
+ /**
+ * Set a pulsar topic list for the flink source. Some topics may not exist yet; consuming
+ * such a non-existent topic won't throw any exception, but the better solution is to consume
+ * by using a topic regex. Topics can be set only once in this builder, either with {@link
+ * #setTopics} or {@link #setTopicPattern}.
+ *
+ * <p>This overload wraps the array with {@link Arrays#asList} and delegates to the list-based
+ * overload.
+ *
+ * @param topics The topic list you would like to consume messages from.
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder<OUT> setTopics(String... topics) {
+ return setTopics(Arrays.asList(topics));
+ }
+
+ /**
+ * Set a pulsar topic list for the flink source. Some topics may not exist yet; consuming
+ * such a non-existent topic won't throw any exception, but the better solution is to consume
+ * by using a topic regex. Topics can be set only once in this builder, either with {@link
+ * #setTopics} or {@link #setTopicPattern}.
+ *
+ * <p>The given list is passed through {@link TopicNameUtils#distinctTopics} before the topic
+ * list subscriber is created from it.
+ *
+ * @param topics The topic list you would like to consume messages from.
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder<OUT> setTopics(List<String> topics) {
+ ensureSubscriberIsNull("topics");
+ List<String> distinctTopics = TopicNameUtils.distinctTopics(topics);
+ this.subscriber =
PulsarSubscriber.getTopicListSubscriber(distinctTopics);
+ return this;
+ }
+
+ /**
+ * Set a topic pattern to consume from the java regex str. Topics can be set only once in
+ * this builder, either with {@link #setTopics} or {@link #setTopicPattern}.
+ *
+ * <p>Remember that we will only subscribe to one tenant and one namespace by using a regular
+ * expression. If the given topic pattern does not provide the tenant and namespace, the
+ * default ones are used instead.
+ *
+ * @param topicsPattern the pattern of the topic name to consume from; it is compiled with
+ * {@link Pattern#compile(String)}.
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder<OUT> setTopicPattern(String topicsPattern) {
+ return setTopicPattern(Pattern.compile(topicsPattern));
+ }
+
+ /**
+ * Set a topic pattern to consume from the java {@link Pattern}. Topics can be set only once
+ * in this builder, either with {@link #setTopics} or {@link #setTopicPattern}. This overload
+ * uses {@link RegexSubscriptionMode#AllTopics} as the subscription mode.
+ *
+ * <p>Remember that we will only subscribe to one tenant and one namespace by using a regular
+ * expression. If the given topic pattern does not provide the tenant and namespace, the
+ * default ones are used instead.
+ *
+ * @param topicsPattern the pattern of the topic name to consume from.
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder<OUT> setTopicPattern(Pattern topicsPattern) {
+ return setTopicPattern(topicsPattern, RegexSubscriptionMode.AllTopics);
+ }
+
+ /**
+ * Set a topic pattern to consume from the java regex str. Topics can be set only once in
+ * this builder, either with {@link #setTopics} or {@link #setTopicPattern}.
+ *
+ * <p>Remember that we will only subscribe to one tenant and one namespace by using a regular
+ * expression. If the given topic pattern does not provide the tenant and namespace, the
+ * default ones are used instead.
+ *
+ * @param topicsPattern the pattern of the topic name to consume from; it is compiled with
+ * {@link Pattern#compile(String)}.
+ * @param regexSubscriptionMode The topic filter for regex subscription.
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder<OUT> setTopicPattern(
+ String topicsPattern, RegexSubscriptionMode regexSubscriptionMode)
{
+ return setTopicPattern(Pattern.compile(topicsPattern),
regexSubscriptionMode);
+ }
+
+ /**
+ * Set a topic pattern to consume from the java {@link Pattern}. Topics can be set only once
+ * in this builder, either with {@link #setTopics} or {@link #setTopicPattern}.
+ *
+ * <p>Remember that we will only subscribe to one tenant and one namespace by using a regular
+ * expression. If the given topic pattern does not provide the tenant and namespace, the
+ * default ones are used instead.
+ *
+ * @param topicsPattern the pattern of the topic name to consume from.
+ * @param regexSubscriptionMode When subscribing to a topic using a regular expression, you
+ * can pick a certain type of topic.
+ * <ul>
+ * <li>PersistentOnly: only subscribe to persistent topics.
+ * <li>NonPersistentOnly: only subscribe to non-persistent topics.
+ * <li>AllTopics: subscribe to both persistent and non-persistent topics.
+ * </ul>
+ *
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder<OUT> setTopicPattern(
+ Pattern topicsPattern, RegexSubscriptionMode
regexSubscriptionMode) {
+ ensureSubscriberIsNull("topic pattern");
+ this.subscriber =
+ PulsarSubscriber.getTopicPatternSubscriber(topicsPattern,
regexSubscriptionMode);
+ return this;
+ }
+
+ /**
+ * The consumer name is informative, and it can be used to identify a particular consumer
+ * instance from the topic stats. Delegates to {@code setConfig} with the {@link
+ * PulsarSourceOptions#PULSAR_CONSUMER_NAME} option.
+ *
+ * @param consumerName the name to set for the consumer.
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder<OUT> setConsumerName(String consumerName) {
+ return setConfig(PULSAR_CONSUMER_NAME, consumerName);
+ }
+
+ /**
+ * Sets {@link PulsarSourceOptions#PULSAR_READ_SCHEMA_EVOLUTION} to {@code true}. If you
+ * enable this option, messages are consumed and deserialized by using the Pulsar {@link
+ * Schema}.
+ *
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder<OUT> enableSchemaEvolution() {
+ configBuilder.set(PULSAR_READ_SCHEMA_EVOLUTION, true);
+ return this;
+ }
+
+ /**
+ * Set a topic range generator for consuming a subset of keys.
+ *
+ * @param rangeGenerator A generator which would generate a set of {@link TopicRange} for a
+ * given topic; must not be null.
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder<OUT> setRangeGenerator(RangeGenerator
rangeGenerator) {
+ this.rangeGenerator = checkNotNull(rangeGenerator);
+ return this;
+ }
+
+ /**
+ * Specify from which offsets the PulsarSource should start consuming by providing a {@link
+ * StartCursor}.
+ *
+ * @param startCursor set the starting offsets for the Source; must not be null.
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder<OUT> setStartCursor(StartCursor startCursor) {
+ this.startCursor = checkNotNull(startCursor);
+ return this;
+ }
+
+ /**
+ * By default, the PulsarSource runs in {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and
+ * never stops until the Flink job is canceled or fails. To let the PulsarSource run in {@link
+ * Boundedness#CONTINUOUS_UNBOUNDED} mode but stop at some given offsets, one can call this
+ * method and disable auto partition discovery as described below.
+ *
+ * <p>This method differs from {@link #setBoundedStopCursor(StopCursor)} in that, after
+ * setting the stopping offsets with this method, {@link PulsarSource#getBoundedness()} will
+ * still return {@link Boundedness#CONTINUOUS_UNBOUNDED} even though the source will stop at
+ * the offsets specified by the {@link StopCursor}.
+ *
+ * <p>To stop the connector, the user has to disable auto partition discovery, because auto
+ * partition discovery always expects new splits to arrive and never exits. To disable auto
+ * partition discovery, use builder.setConfig({@link
+ * PulsarSourceOptions#PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
+ *
+ * @param stopCursor The {@link StopCursor} to specify the stopping offset; must not be null.
+ * @return this PulsarSourceBuilder.
+ * @see #setBoundedStopCursor(StopCursor)
+ */
+ public PulsarSourceBuilder<OUT> setUnboundedStopCursor(StopCursor
stopCursor) {
+ this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+ this.stopCursor = checkNotNull(stopCursor);
+ return this;
+ }
+
+ /**
+ * By default, the PulsarSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED}
+ * manner and thus never stops until the Flink job fails or is canceled. To let the
+ * PulsarSource run in {@link Boundedness#BOUNDED} manner and stop at some point, one can set
+ * a {@link StopCursor} to specify the stopping offsets for each partition. When all the
+ * partitions have reached their stopping offsets, the PulsarSource will then exit.
+ *
+ * <p>This method differs from {@link #setUnboundedStopCursor(StopCursor)} in that, after
+ * setting the stopping offsets with this method, {@link PulsarSource#getBoundedness()} will
+ * return {@link Boundedness#BOUNDED} instead of {@link Boundedness#CONTINUOUS_UNBOUNDED}.
+ *
+ * @param stopCursor the {@link StopCursor} to specify the stopping offsets; must not be null.
+ * @return this PulsarSourceBuilder.
+ * @see #setUnboundedStopCursor(StopCursor)
+ */
+ public PulsarSourceBuilder<OUT> setBoundedStopCursor(StopCursor
stopCursor) {
+ this.boundedness = Boundedness.BOUNDED;
+ this.stopCursor = checkNotNull(stopCursor);
+ return this;
+ }
+
+ /**
+ * Deserialize messages from Pulsar by using flink's {@link DeserializationSchema}. It would
+ * consume the pulsar message as a byte array and decode the message by using flink's logic.
+ * The given schema is wrapped in a {@link PulsarDeserializationSchemaWrapper}.
+ *
+ * @param deserializationSchema the flink schema used to decode the message bytes.
+ * @return the builder typed to the schema's output type.
+ */
+ public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(
+ DeserializationSchema<T> deserializationSchema) {
+ return setDeserializationSchema(
+ new
PulsarDeserializationSchemaWrapper<>(deserializationSchema));
+ }
+
+ /**
+ * Deserialize the messages from Pulsar by using {@link Schema#AUTO_CONSUME()}. It will turn
+ * the pulsar message into a {@link GenericRecord} first. Using this method, one can consume
+ * messages with multiple schemas in the same topic. The given deserializer is wrapped in a
+ * {@link GenericRecordDeserializationSchema}.
+ *
+ * @param deserializer the deserializer applied to the generic records.
+ * @return the builder typed to the deserializer's output type.
+ */
+ public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(
+ GenericRecordDeserializer<T> deserializer) {
+ return setDeserializationSchema(new
GenericRecordDeserializationSchema<>(deserializer));
+ }
+
+ /**
+ * Deserialize messages from Pulsar by using the Pulsar {@link Schema} instance. It would
+ * consume the pulsar message as a byte array and decode the message by using flink's logic.
+ * The schema is first checked with {@code ensureSchemaTypeIsValid} and then wrapped in a
+ * {@link PulsarSchemaWrapper}.
+ *
+ * <p>We only support <a
+ *
href="https://pulsar.apache.org/docs/en/schema-understand/#primitive-type">primitive
+ * types</a> here.
+ *
+ * @param schema the Pulsar schema of the messages.
+ * @return the builder typed to the schema's output type.
+ */
+ public <T extends OUT> PulsarSourceBuilder<T>
setDeserializationSchema(Schema<T> schema) {
+ ensureSchemaTypeIsValid(schema);
+ return setDeserializationSchema(new PulsarSchemaWrapper<>(schema));
+ }
+
+ /**
+ * Deserialize messages from Pulsar by using the Pulsar {@link Schema} instance. It would
+ * consume the pulsar message as a byte array and decode the message by using flink's logic.
+ * The schema is first checked with {@code ensureSchemaTypeIsValid} and then wrapped, together
+ * with the type class, in a {@link PulsarSchemaWrapper}.
+ *
+ * <p>We only support <a
+ *
href="https://pulsar.apache.org/docs/en/schema-understand/#struct">struct
types</a> here.
+ *
+ * @param schema the Pulsar schema of the messages.
+ * @param typeClass the class of the deserialized type.
+ * @return the builder typed to the schema's output type.
+ */
+ public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(
+ Schema<T> schema, Class<T> typeClass) {
+ ensureSchemaTypeIsValid(schema);
+ return setDeserializationSchema(new PulsarSchemaWrapper<>(schema,
typeClass));
+ }
+
+ /**
+ * Deserialize messages from Pulsar by using the Pulsar {@link Schema} instance. It would
+ * consume the pulsar message as a byte array and decode the message by using flink's logic.
+ * The schema is first checked with {@code ensureSchemaTypeIsValid} and then wrapped, together
+ * with the key and value classes, in a {@link PulsarSchemaWrapper}.
+ *
+ * <p>We only support <a
+ *
href="https://pulsar.apache.org/docs/en/schema-understand/#keyvalue">keyvalue
types</a> here.
+ *
+ * @param schema the Pulsar {@link KeyValue} schema of the messages.
+ * @param keyClass the class of the key type.
+ * @param valueClass the class of the value type.
+ * @return the builder typed to the output type.
+ */
+ public <K, V, T extends OUT> PulsarSourceBuilder<T>
setDeserializationSchema(
+ Schema<KeyValue<K, V>> schema, Class<K> keyClass, Class<V>
valueClass) {
+ ensureSchemaTypeIsValid(schema);
+ return setDeserializationSchema(new PulsarSchemaWrapper<>(schema,
keyClass, valueClass));
+ }
+
+    /**
+     * Deserialize messages from Pulsar by using flink's {@link TypeInformation}. This method is
+     * only meant for messages that were written into pulsar by {@link TypeInformation}.
+     *
+     * @param information the type information describing the messages.
+     * @param config the execution config handed to the {@link PulsarTypeInformationWrapper}.
+     * @return this builder, typed with the type of the given type information.
+     */
+    public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(
+            TypeInformation<T> information, ExecutionConfig config) {
+        PulsarTypeInformationWrapper<T> typeInfoWrapper =
+                new PulsarTypeInformationWrapper<>(information, config);
+        return setDeserializationSchema(typeInfoWrapper);
+    }
+
+    /**
+     * Sets the {@link PulsarDeserializationSchema}, which is required for deserializing messages
+     * from Pulsar and for getting the {@link TypeInformation} for message serialization in flink.
+     *
+     * @param deserializationSchema the schema to store on this builder.
+     * @return this builder, re-typed to the output type of the schema.
+     */
+    public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(
+            PulsarDeserializationSchema<T> deserializationSchema) {
+        // The builder instance stays the same; only its generic view is narrowed to T.
+        PulsarSourceBuilder<T> typedBuilder = specialized();
+        typedBuilder.deserializationSchema = deserializationSchema;
+        return typedBuilder;
+    }
+
+    /**
+     * Sets a {@link PulsarCrypto}, i.e. the key reader and keys used for the encrypted message
+     * payloads, together with the crypto failure action.
+     *
+     * @param pulsarCrypto PulsarCrypto object, must not be null.
+     * @param action the value stored under {@code PULSAR_CRYPTO_FAILURE_ACTION}; presumably what
+     *     the consumer does when a message cannot be decrypted (confirm against the option's
+     *     documentation).
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setPulsarCrypto(
+            PulsarCrypto pulsarCrypto, ConsumerCryptoFailureAction action) {
+        // Validate first so that nothing is modified when the crypto object is missing.
+        checkNotNull(pulsarCrypto);
+        this.pulsarCrypto = pulsarCrypto;
+        configBuilder.set(PULSAR_CRYPTO_FAILURE_ACTION, action);
+        return this;
+    }
+
+ /**
+ * Configure the authentication provider to use in the Pulsar client
instance.
+ *
+ * @param authPluginClassName name of the Authentication-Plugin you want
to use
+ * @param authParamsString string which represents parameters for the
Authentication-Plugin,
+ * e.g., "key1:val1,key2:val2"
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder<OUT> setAuthentication(
+ String authPluginClassName, String authParamsString) {
+ checkArgument(
+ !configBuilder.contains(PULSAR_AUTH_PARAM_MAP),
+ "Duplicated authentication setting.");
+ configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName);
+ configBuilder.set(PULSAR_AUTH_PARAMS, authParamsString);
+ return this;
+ }
+
+    /**
+     * Configure the authentication provider to use in the Pulsar client instance, with the
+     * plugin parameters given as a map.
+     *
+     * <p>This variant cannot be combined with the string-based one: the {@code checkArgument}
+     * precondition fails if {@code PULSAR_AUTH_PARAMS} has already been set.
+     *
+     * @param authPluginClassName name of the Authentication-Plugin you want to use
+     * @param authParams map which represents parameters for the Authentication-Plugin
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setAuthentication(
+            String authPluginClassName, Map<String, String> authParams) {
+        boolean stringParamsAlreadySet = configBuilder.contains(PULSAR_AUTH_PARAMS);
+        checkArgument(!stringParamsAlreadySet, "Duplicated authentication setting.");
+
+        configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName);
+        configBuilder.set(PULSAR_AUTH_PARAM_MAP, authParams);
+        return this;
+    }
+
+    /**
+     * Set a single, arbitrary option for the PulsarSource and Pulsar Consumer. The valid keys
+     * can be found in {@link PulsarSourceOptions} and {@link PulsarOptions}.
+     *
+     * <p>Make sure the option is set only once, or always with the same value.
+     *
+     * @param key the key of the property.
+     * @param value the value of the property.
+     * @return this PulsarSourceBuilder.
+     */
+    public <T> PulsarSourceBuilder<OUT> setConfig(ConfigOption<T> key, T value) {
+        // The option is only recorded here; it is validated when build() is called.
+        configBuilder.set(key, value);
+        return this;
+    }
+
+    /**
+     * Copy the options of the given configuration into this builder. The valid keys can be found
+     * in {@link PulsarSourceOptions} and {@link PulsarOptions}.
+     *
+     * @param config the config to set for the PulsarSource.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setConfig(Configuration config) {
+        // The options are only recorded here; they are validated when build() is called.
+        configBuilder.set(config);
+        return this;
+    }
+
+    /**
+     * Copy the given properties into this builder. The valid keys can be found in
+     * {@link PulsarSourceOptions} and {@link PulsarOptions}.
+     *
+     * <p>This method is mainly used for future flink SQL binding.
+     *
+     * @param properties the config properties to set for the PulsarSource.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setProperties(Properties properties) {
+        // The properties are only recorded here; they are validated when build() is called.
+        configBuilder.set(properties);
+        return this;
+    }
+
+ /**
+ * Build the {@link PulsarSource}.
+ *
+ * <p>Before the source is created this method fills in defaults and validates the builder
+ * state, in this order:
+ *
+ * <ul>
+ * <li>the subscriber must have been set;
+ * <li>a missing range generator defaults to {@code FullRangeGenerator};
+ * <li>a missing boundedness defaults to {@code CONTINUOUS_UNBOUNDED};
+ * <li>for a bounded source, a non-negative partition discovery interval is overridden to -1;
+ * <li>the deserialization schema must have been set and, when schema evolution is enabled,
+ * must be a {@code PulsarSchemaWrapper};
+ * <li>a missing {@code PulsarCrypto} defaults to {@code PulsarCrypto.disabled()};
+ * <li>a configured consumer name gets a {@code %s} placeholder appended unless it already
+ * contains one;
+ * <li>the deserialization schema, cursors, range generator and crypto are checked for
+ * serializability;
+ * <li>the collected options are validated and turned into a {@code SourceConfiguration}.
+ * </ul>
+ *
+ * @return a PulsarSource with the settings made for this builder.
+ */
+ @SuppressWarnings("java:S3776")
+ public PulsarSource<OUT> build() {
+ // Ensure the topic subscriber for pulsar.
+ checkNotNull(subscriber, "No topic names or topic pattern are
provided.");
+
+ if (rangeGenerator == null) {
+ LOG.warn(
+ "No range generator provided, we would use the
FullRangeGenerator as the default range generator.");
+ this.rangeGenerator = new FullRangeGenerator();
+ }
+
+ if (boundedness == null) {
+ LOG.warn("No boundedness was set, mark it as a endless stream.");
+ this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+ }
+ if (boundedness == Boundedness.BOUNDED
+ && configBuilder.get(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS)
>= 0) {
+ LOG.warn(
+ "{} property is overridden to -1 because the source is
bounded.",
+ PULSAR_PARTITION_DISCOVERY_INTERVAL_MS);
+ configBuilder.override(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS,
-1L);
+ }
+
+ checkNotNull(deserializationSchema, "deserializationSchema should be
set.");
+ // Schema evolution validation.
+ if
(Boolean.TRUE.equals(configBuilder.get(PULSAR_READ_SCHEMA_EVOLUTION))) {
+ checkState(
+ deserializationSchema instanceof PulsarSchemaWrapper,
+ "When enabling schema evolution, you must provide a Pulsar
Schema in builder's setDeserializationSchema method.");
+ } else if (deserializationSchema instanceof PulsarSchemaWrapper) {
+ LOG.info(
+ "It seems like you are consuming messages by using Pulsar
Schema."
+ + " You can builder.enableSchemaEvolution() to
enable schema evolution for better Pulsar Schema check."
+ + " We would use bypass Schema check by default.");
+ }
+
+ if (pulsarCrypto == null) {
+ this.pulsarCrypto = PulsarCrypto.disabled();
+ }
+
+ if (!configBuilder.contains(PULSAR_CONSUMER_NAME)) {
+ LOG.warn(
+ "We recommend set a readable consumer name through
setConsumerName(String) in production mode.");
+ } else {
+ String consumerName = configBuilder.get(PULSAR_CONSUMER_NAME);
+ if (!consumerName.contains("%s")) {
+ configBuilder.override(PULSAR_CONSUMER_NAME, consumerName + "
- %s");
+ }
+ }
+
+ // Make sure they are serializable.
+ checkState(
+ isSerializable(deserializationSchema),
+ "PulsarDeserializationSchema isn't serializable");
+ checkState(isSerializable(startCursor), "StartCursor isn't
serializable");
+ checkState(isSerializable(stopCursor), "StopCursor isn't
serializable");
+ checkState(isSerializable(rangeGenerator), "RangeGenerator isn't
serializable");
+ checkState(isSerializable(pulsarCrypto), "PulsarCrypto isn't
serializable");
+
+ // Check builder configuration.
+ SourceConfiguration sourceConfiguration =
+ configBuilder.build(SOURCE_CONFIG_VALIDATOR,
SourceConfiguration::new);
+
+ return new PulsarSource<>(
+ sourceConfiguration,
+ subscriber,
+ rangeGenerator,
+ startCursor,
+ stopCursor,
+ boundedness,
+ deserializationSchema,
+ pulsarCrypto);
+ }
+
+ // ------------- private helpers --------------
+
+    /**
+     * Returns this very builder, viewed with a narrower output type, so that the generic setters
+     * can hand back a correctly typed builder without the compiler complaining.
+     */
+    @SuppressWarnings("unchecked")
+    private <T extends OUT> PulsarSourceBuilder<T> specialized() {
+        PulsarSourceBuilder<T> typedSelf = (PulsarSourceBuilder<T>) this;
+        return typedSelf;
+    }
+
+    /**
+     * Topic names and a topic pattern conflict with each other; this guard makes sure a
+     * subscriber is configured only once.
+     *
+     * @param attemptingSubscribeMode description of the subscribe mode being configured, used in
+     *     the error message.
+     * @throws IllegalStateException if a subscriber has already been set.
+     */
+    private void ensureSubscriberIsNull(String attemptingSubscribeMode) {
+        if (subscriber == null) {
+            return;
+        }
+
+        String existingSubscriber = subscriber.getClass().getSimpleName();
+        String message =
+                String.format(
+                        "Cannot use %s for consumption because a %s is already set for consumption",
+                        attemptingSubscribeMode, existingSubscriber);
+        throw new IllegalStateException(message);
+    }
+
+    /**
+     * Rejects the Pulsar auto schemas, which cannot be passed to the schema-based setters.
+     *
+     * @param schema the schema whose type is inspected.
+     * @throws IllegalArgumentException if the schema type is {@code AUTO_CONSUME}.
+     * @throws IllegalStateException if the schema type is {@code AUTO_PUBLISH}.
+     */
+    private void ensureSchemaTypeIsValid(Schema<?> schema) {
+        SchemaType schemaType = schema.getSchemaInfo().getType();
+
+        if (schemaType == SchemaType.AUTO_CONSUME) {
+            throw new IllegalArgumentException(
+                    "Auto schema is only supported by providing a GenericRecordDeserializer");
+        }
+        if (schemaType == SchemaType.AUTO_PUBLISH) {
+            throw new IllegalStateException(
+                    "Auto produce schema is not supported in consuming messages");
+        }
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReader.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReader.java
new file mode 100644
index 0000000000..10139a000e
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReader.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.source.reader;
+
+import
org.apache.inlong.sort.pulsar.table.source.PulsarTableDeserializationSchema;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SourceReaderBase;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
+import org.apache.flink.connector.pulsar.common.schema.BytesSchema;
+import org.apache.flink.connector.pulsar.common.schema.PulsarSchema;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import
org.apache.flink.connector.pulsar.source.reader.PulsarPartitionSplitReader;
+import org.apache.flink.connector.pulsar.source.reader.PulsarRecordEmitter;
+import
org.apache.flink.connector.pulsar.source.reader.PulsarSourceFetcherManager;
+import
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchemaInitializationContext;
+import
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarSchemaWrapper;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import
org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import static
org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
+
+/**
+ * The source reader for pulsar subscription Failover and Exclusive, which
consumes the ordered
+ * messages.
+ *
+ * @param <OUT> The output message type for flink.
+ * Modified from {@link
org.apache.flink.connector.pulsar.source.reader.PulsarSourceReader}
+ */
+@Internal
+public class PulsarSourceReader<OUT>
+ extends
+ SourceReaderBase<Message<byte[]>, OUT, PulsarPartitionSplit,
PulsarPartitionSplitState> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PulsarSourceReader.class);
+
+ private final SourceConfiguration sourceConfiguration;
+ private final PulsarClient pulsarClient;
+ @VisibleForTesting
+ final SortedMap<Long, Map<TopicPartition, MessageId>> cursorsToCommit;
+ private final ConcurrentMap<TopicPartition, MessageId>
cursorsOfFinishedSplits;
+ private final AtomicReference<Throwable> cursorCommitThrowable;
+ private final PulsarDeserializationSchema<OUT> deserializationSchema;
+ private ScheduledExecutorService cursorScheduler;
+
+    /**
+     * Private constructor, use {@code create(...)} instead. Besides wiring the base reader it
+     * only stores the collaborators and initializes the empty cursor bookkeeping structures.
+     */
+    private PulsarSourceReader(
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue,
+            PulsarSourceFetcherManager fetcherManager,
+            PulsarDeserializationSchema<OUT> deserializationSchema,
+            SourceConfiguration sourceConfiguration,
+            PulsarClient pulsarClient,
+            SourceReaderContext context) {
+        super(
+                elementsQueue,
+                fetcherManager,
+                new PulsarRecordEmitter<>(deserializationSchema),
+                sourceConfiguration,
+                context);
+
+        // Collaborators handed in by the factory method.
+        this.pulsarClient = pulsarClient;
+        this.sourceConfiguration = sourceConfiguration;
+        this.deserializationSchema = deserializationSchema;
+
+        // Bookkeeping for the cursor acknowledgement.
+        this.cursorCommitThrowable = new AtomicReference<>();
+        this.cursorsOfFinishedSplits = new ConcurrentHashMap<>();
+        this.cursorsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
+    }
+
+ @Override
+ public void start() {
+ super.start();
+ if (sourceConfiguration.isEnableAutoAcknowledgeMessage()) {
+ this.cursorScheduler =
Executors.newSingleThreadScheduledExecutor();
+
+ // Auto commit cursor, this could be enabled when checkpoint is
also enabled.
+ cursorScheduler.scheduleAtFixedRate(
+ this::cumulativeAcknowledgmentMessage,
+ sourceConfiguration.getMaxFetchTime().toMillis(),
+ sourceConfiguration.getAutoCommitCursorInterval(),
+ TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @Override
+ public InputStatus pollNext(ReaderOutput<OUT> output) throws Exception {
+ Throwable cause = cursorCommitThrowable.get();
+ if (cause != null) {
+ throw new FlinkRuntimeException("An error occurred in acknowledge
message.", cause);
+ }
+
+ return super.pollNext(output);
+ }
+
+    /**
+     * Closes the fetchers of the finished splits and remembers the latest consumed message id of
+     * each of them, so that it can still be acknowledged later on.
+     */
+    @Override
+    protected void onSplitFinished(Map<String, PulsarPartitionSplitState> finishedSplitIds) {
+        // Close all the finished splits.
+        PulsarSourceFetcherManager fetcherManager =
+                (PulsarSourceFetcherManager) splitFetcherManager;
+        for (String finishedSplitId : finishedSplitIds.keySet()) {
+            fetcherManager.closeFetcher(finishedSplitId);
+        }
+
+        // We don't require new splits, all the splits are pre-assigned by source enumerator.
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("onSplitFinished event: {}", finishedSplitIds);
+        }
+
+        for (PulsarPartitionSplitState finishedState : finishedSplitIds.values()) {
+            MessageId consumedId = finishedState.getLatestConsumedId();
+            if (consumedId == null) {
+                // Nothing was consumed from this split, so there is nothing to acknowledge.
+                continue;
+            }
+            cursorsOfFinishedSplits.put(finishedState.getPartition(), consumedId);
+        }
+    }
+
+    /** Wraps a newly assigned split into its mutable state object. */
+    @Override
+    protected PulsarPartitionSplitState initializedState(PulsarPartitionSplit split) {
+        PulsarPartitionSplitState freshState = new PulsarPartitionSplitState(split);
+        return freshState;
+    }
+
+    /** Converts the split state back into a split; the split id itself is not needed here. */
+    @Override
+    protected PulsarPartitionSplit toSplitType(
+            String splitId, PulsarPartitionSplitState splitState) {
+        PulsarPartitionSplit currentSplit = splitState.toPulsarPartitionSplit();
+        return currentSplit;
+    }
+
+    /** Forwards the pause / resume request unchanged to the split fetcher manager. */
+    @Override
+    public void pauseOrResumeSplits(
+            Collection<String> splitsToPause, Collection<String> splitsToResume) {
+        // No reader-side bookkeeping is needed, the fetcher manager does all the work.
+        splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume);
+    }
+
+    /**
+     * Snapshots the splits and records, under the given checkpoint id, the cursors that have to
+     * be acknowledged once that checkpoint completes. A {@link PulsarTableDeserializationSchema}
+     * is additionally told about the current checkpoint id.
+     *
+     * @param checkpointId the id of the checkpoint being taken.
+     * @return the splits returned by the base reader's snapshot.
+     */
+    @Override
+    public List<PulsarPartitionSplit> snapshotState(long checkpointId) {
+        List<PulsarPartitionSplit> activeSplits = super.snapshotState(checkpointId);
+
+        // Perform a snapshot for these splits.
+        Map<TopicPartition, MessageId> pendingCursors =
+                cursorsToCommit.computeIfAbsent(checkpointId, ignored -> new HashMap<>());
+
+        // Cursors of the active splits first ...
+        for (PulsarPartitionSplit activeSplit : activeSplits) {
+            MessageId consumedId = activeSplit.getLatestConsumedId();
+            if (consumedId == null) {
+                continue;
+            }
+            pendingCursors.put(activeSplit.getPartition(), consumedId);
+        }
+        // ... then the cursors of all the finished splits.
+        pendingCursors.putAll(cursorsOfFinishedSplits);
+
+        if (deserializationSchema instanceof PulsarTableDeserializationSchema) {
+            PulsarTableDeserializationSchema tableSchema =
+                    (PulsarTableDeserializationSchema) deserializationSchema;
+            tableSchema.updateCurrentCheckpointId(checkpointId);
+        }
+        return activeSplits;
+    }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ LOG.debug("Committing cursors for checkpoint {}", checkpointId);
+ Map<TopicPartition, MessageId> cursors =
cursorsToCommit.get(checkpointId);
+ try {
+ ((PulsarSourceFetcherManager)
splitFetcherManager).acknowledgeMessages(cursors);
+ LOG.debug("Successfully acknowledge cursors for checkpoint {}",
checkpointId);
+
+ // Clean up the cursors.
+ cursorsOfFinishedSplits.keySet().removeAll(cursors.keySet());
+ cursorsToCommit.headMap(checkpointId + 1).clear();
+ if (deserializationSchema instanceof
PulsarTableDeserializationSchema) {
+ PulsarTableDeserializationSchema schema =
(PulsarTableDeserializationSchema) deserializationSchema;
+ schema.flushAudit();
+ schema.updateLastCheckpointId(checkpointId);
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to acknowledge cursors for checkpoint {}",
checkpointId, e);
+ cursorCommitThrowable.compareAndSet(null, e);
+ }
+
+ }
+
+    /**
+     * Stops the auto-commit scheduler (if one was started), closes all the consumers through the
+     * base reader and finally shuts down the shared Pulsar client.
+     *
+     * <p>The client is shut down in a {@code finally} block so that it is released even when
+     * closing the base reader fails.
+     *
+     * @throws Exception if closing the base reader or shutting down the client fails.
+     */
+    @Override
+    public void close() throws Exception {
+        if (cursorScheduler != null) {
+            cursorScheduler.shutdown();
+        }
+
+        try {
+            // Close all the consumers.
+            super.close();
+        } finally {
+            // Close shared pulsar resources.
+            pulsarClient.shutdown();
+        }
+    }
+
+ // ----------------- helper methods --------------
+
+    /**
+     * Acknowledge the pulsar topic partition cursors by the last consumed message ids. This is
+     * the task run periodically by the scheduler created in {@code start()}; failures are logged
+     * and stored so that the next {@code pollNext} call can rethrow them.
+     */
+    private void cumulativeAcknowledgmentMessage() {
+        // Start from the finished splits, then add the current position of the active ones.
+        Map<TopicPartition, MessageId> pendingCursors = new HashMap<>(cursorsOfFinishedSplits);
+
+        // We reuse snapshotState for acquiring a consume status snapshot.
+        // So the checkpoint didn't really happen, so we just pass a fake checkpoint id.
+        for (PulsarPartitionSplit activeSplit : super.snapshotState(1L)) {
+            MessageId consumedId = activeSplit.getLatestConsumedId();
+            if (consumedId == null) {
+                continue;
+            }
+            pendingCursors.put(activeSplit.getPartition(), consumedId);
+        }
+
+        try {
+            PulsarSourceFetcherManager fetcherManager =
+                    (PulsarSourceFetcherManager) splitFetcherManager;
+            fetcherManager.acknowledgeMessages(pendingCursors);
+            // Clean up the finish splits.
+            cursorsOfFinishedSplits.keySet().removeAll(pendingCursors.keySet());
+        } catch (Exception e) {
+            LOG.error("Fail in auto cursor commit.", e);
+            cursorCommitThrowable.compareAndSet(null, e);
+        }
+    }
+
+    /**
+     * Factory method for creating PulsarSourceReader.
+     *
+     * <p>The method creates the Pulsar client, opens the deserialization schema, chooses the
+     * byte schema to consume with and wires the split readers and the fetcher manager. If any
+     * step after the client creation fails, the client is shut down again before the failure is
+     * propagated, so that it does not leak.
+     *
+     * @param sourceConfiguration the configuration of the source.
+     * @param deserializationSchema the schema used for deserializing the messages; when schema
+     *     evolution is enabled it is cast to {@link PulsarSchemaWrapper}.
+     * @param pulsarCrypto the crypto settings handed to the split readers.
+     * @param readerContext the context of the source reader.
+     * @return the ready-to-start reader.
+     * @throws Exception if the client creation, the schema initialization or the wiring fails.
+     */
+    public static <OUT> PulsarSourceReader<OUT> create(
+            SourceConfiguration sourceConfiguration,
+            PulsarDeserializationSchema<OUT> deserializationSchema,
+            PulsarCrypto pulsarCrypto,
+            SourceReaderContext readerContext)
+            throws Exception {
+
+        // Create a message queue with the predefined source option.
+        int queueCapacity = sourceConfiguration.getMessageQueueCapacity();
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue =
+                new FutureCompletingBlockingQueue<>(queueCapacity);
+
+        PulsarClient pulsarClient = createClient(sourceConfiguration);
+        try {
+            // Initialize the deserialization schema before creating the pulsar reader.
+            PulsarDeserializationSchemaInitializationContext initializationContext =
+                    new PulsarDeserializationSchemaInitializationContext(
+                            readerContext, pulsarClient);
+            deserializationSchema.open(initializationContext, sourceConfiguration);
+
+            // Choose the right schema bytes to use.
+            final Schema<byte[]> schema;
+            if (sourceConfiguration.isEnableSchemaEvolution()) {
+                // Wrap the schema into a byte array schema with extra schema info check.
+                PulsarSchema<?> pulsarSchema =
+                        ((PulsarSchemaWrapper<?>) deserializationSchema).pulsarSchema();
+                schema = new BytesSchema(pulsarSchema);
+            } else {
+                schema = Schema.BYTES;
+            }
+
+            // Create an ordered split reader supplier.
+            Supplier<SplitReader<Message<byte[]>, PulsarPartitionSplit>> splitReaderSupplier =
+                    () -> new PulsarPartitionSplitReader(
+                            pulsarClient,
+                            sourceConfiguration,
+                            schema,
+                            pulsarCrypto,
+                            readerContext.metricGroup());
+
+            PulsarSourceFetcherManager fetcherManager =
+                    new PulsarSourceFetcherManager(
+                            elementsQueue, splitReaderSupplier, readerContext.getConfiguration());
+
+            return new PulsarSourceReader<>(
+                    elementsQueue,
+                    fetcherManager,
+                    deserializationSchema,
+                    sourceConfiguration,
+                    pulsarClient,
+                    readerContext);
+        } catch (Exception e) {
+            // No reader owns the client yet, so it has to be released here.
+            try {
+                pulsarClient.shutdown();
+            } catch (Exception shutdownFailure) {
+                e.addSuppressed(shutdownFailure);
+            }
+            throw e;
+        }
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
index deb240c7ed..e528f983e1 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.pulsar.table;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
import
org.apache.inlong.sort.pulsar.table.source.PulsarTableDeserializationSchemaFactory;
import org.apache.inlong.sort.pulsar.table.source.PulsarTableSource;
@@ -129,7 +130,8 @@ public class PulsarTableFactory implements
DynamicTableSourceFactory, DynamicTab
PulsarSourceOptions.SOURCE_CONFIG_PREFIX,
PulsarSourceOptions.CONSUMER_CONFIG_PREFIX,
PulsarSinkOptions.PRODUCER_CONFIG_PREFIX,
- PulsarSinkOptions.SINK_CONFIG_PREFIX);
+ PulsarSinkOptions.SINK_CONFIG_PREFIX,
+ ExtractNode.INLONG_MSG);
validatePrimaryKeyConstraints(
context.getObjectIdentifier(), context.getPrimaryKeyIndexes(),
helper);
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
index 17466899d7..eeeafa6872 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
@@ -19,7 +19,7 @@ package org.apache.inlong.sort.pulsar.table.source;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricsCollector;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -65,7 +65,7 @@ public class PulsarTableDeserializationSchema implements
PulsarDeserializationSc
private final boolean upsertMode;
- private SourceMetricData sourceMetricData;
+ private SourceExactlyMetric sourceExactlyMetric;
private MetricOption metricOption;
@@ -94,7 +94,7 @@ public class PulsarTableDeserializationSchema implements
PulsarDeserializationSc
keyDeserialization.open(context);
}
if (metricOption != null) {
- sourceMetricData = new SourceMetricData(metricOption);
+ sourceExactlyMetric = new SourceExactlyMetric(metricOption);
}
valueDeserialization.open(context);
}
@@ -117,7 +117,7 @@ public class PulsarTableDeserializationSchema implements
PulsarDeserializationSc
return;
}
MetricsCollector<RowData> metricsCollector =
- new MetricsCollector<>(collector, sourceMetricData);
+ new MetricsCollector<>(collector, sourceExactlyMetric);
valueDeserialization.deserialize(message.getData(), new
ListCollector<>(valueRowData));
@@ -130,4 +130,22 @@ public class PulsarTableDeserializationSchema implements
PulsarDeserializationSc
public TypeInformation<RowData> getProducedType() {
return producedTypeInfo;
}
+
+    /** Flushes the audit data of the source metric; does nothing when no metric was created. */
+    public void flushAudit() {
+        if (sourceExactlyMetric == null) {
+            return;
+        }
+        sourceExactlyMetric.flushAudit();
+    }
+
+    /**
+     * Passes the id of the checkpoint currently being taken on to the source metric; does nothing
+     * when no metric was created.
+     *
+     * @param checkpointId the id of the current checkpoint.
+     */
+    public void updateCurrentCheckpointId(long checkpointId) {
+        if (sourceExactlyMetric == null) {
+            return;
+        }
+        sourceExactlyMetric.updateCurrentCheckpointId(checkpointId);
+    }
+
+    /**
+     * Passes the id of the last completed checkpoint on to the source metric; does nothing when
+     * no metric was created.
+     *
+     * @param checkpointId the id of the completed checkpoint.
+     */
+    public void updateLastCheckpointId(long checkpointId) {
+        if (sourceExactlyMetric == null) {
+            return;
+        }
+        sourceExactlyMetric.updateLastCheckpointId(checkpointId);
+    }
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
index bf48356d26..0445739212 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
@@ -17,9 +17,10 @@
package org.apache.inlong.sort.pulsar.table.source;
+import org.apache.inlong.sort.pulsar.source.PulsarSource;
+import org.apache.inlong.sort.pulsar.source.PulsarSourceBuilder;
+
import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.connector.pulsar.source.PulsarSource;
-import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import
org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.NeverStopCursor;
diff --git a/licenses/inlong-sort-connectors/LICENSE
b/licenses/inlong-sort-connectors/LICENSE
index c340f78668..8d1fd1ede9 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -856,6 +856,9 @@
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
+
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java
+
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java
+
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReader.java
Source : org.apache.flink:flink-connector-pulsar:4.1.0-1.18 (Please note
that the software have been modified.)
License : https://github.com/apache/flink-connector-pulsar/blob/main/LICENSE