This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5674bc24f8 [INLONG-10069][Sort] Support audit metrics for
sort-connector-pulsar-1.18 (#10070)
5674bc24f8 is described below
commit 5674bc24f846609fd39e78f6f203dfb629b30af0
Author: AloysZhang <[email protected]>
AuthorDate: Fri Apr 26 18:52:26 2024 +0800
[INLONG-10069][Sort] Support audit metrics for sort-connector-pulsar-1.18
(#10070)
---
.../sort/pulsar/table/PulsarTableFactory.java | 15 +-
.../sort/pulsar/table/PulsarTableOptionUtils.java | 1 +
.../sort/pulsar/table/PulsarTableOptions.java | 1 +
.../pulsar/table/PulsarTableValidationUtils.java | 4 +-
.../source/PulsarTableDeserializationSchema.java | 132 +++++++++++
.../PulsarTableDeserializationSchemaFactory.java | 241 +++++++++++++++++++++
.../pulsar/table/source/PulsarTableSource.java | 227 +++++++++++++++++++
licenses/inlong-sort-connectors/LICENSE | 3 +
8 files changed, 620 insertions(+), 4 deletions(-)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
index a6e7caa00e..4b982f4691 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
@@ -17,6 +17,9 @@
package org.apache.inlong.sort.pulsar.table;
+import
org.apache.inlong.sort.pulsar.table.source.PulsarTableDeserializationSchemaFactory;
+import org.apache.inlong.sort.pulsar.table.source.PulsarTableSource;
+
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
@@ -31,8 +34,6 @@ import
org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import
org.apache.flink.connector.pulsar.table.sink.PulsarTableSerializationSchemaFactory;
import org.apache.flink.connector.pulsar.table.sink.PulsarTableSink;
-import
org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchemaFactory;
-import org.apache.flink.connector.pulsar.table.source.PulsarTableSource;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
@@ -99,6 +100,7 @@ import static
org.apache.pulsar.shade.org.apache.commons.lang3.RandomStringUtils
*
* <p>The main role of this class is to retrieve config options and validate
options from config and
* the table schema. It also sets default values if a config option is not
present.
+ * <p>Modified from {@link org.apache.flink.connector.pulsar.table.PulsarTableFactory}.
*/
public class PulsarTableFactory implements DynamicTableSourceFactory,
DynamicTableSinkFactory {
@@ -154,6 +156,10 @@ public class PulsarTableFactory implements
DynamicTableSourceFactory, DynamicTab
final int[] valueProjection =
createValueFormatProjection(tableOptions, physicalDataType);
final int[] keyProjection = createKeyFormatProjection(tableOptions,
physicalDataType);
+ String inlongMetric =
tableOptions.getOptional(INLONG_METRIC).orElse(null);
+ String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
+ String auditKeys = tableOptions.get(AUDIT_KEYS);
+
final PulsarTableDeserializationSchemaFactory
deserializationSchemaFactory =
new PulsarTableDeserializationSchemaFactory(
physicalDataType,
@@ -161,7 +167,10 @@ public class PulsarTableFactory implements
DynamicTableSourceFactory, DynamicTab
keyProjection,
valueDecodingFormat,
valueProjection,
- UPSERT_DISABLED);
+ UPSERT_DISABLED,
+ inlongMetric,
+ auditHostAndPorts,
+ auditKeys);
// Set default values for configuration not exposed to user.
final DecodingFormat<DeserializationSchema<RowData>>
decodingFormatForMetadataPushdown =
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java
index 12ef459280..7ff329a660 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java
@@ -80,6 +80,7 @@ import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.VALUE_FORMA
* <li>Create key and value encoding/decoding format.
* <li>Create key and value projection.
* </ul>
+ * <p>Modified from {@link org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils}.
*/
public class PulsarTableOptionUtils {
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
index ae186dc739..bc78dd2d98 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
@@ -37,6 +37,7 @@ import static
org.apache.flink.table.factories.FactoryUtil.FORMAT_SUFFIX;
* org.apache.flink.connector.pulsar.common.config.PulsarOptions}, {@link
* org.apache.flink.connector.pulsar.source.PulsarSourceOptions}, and {@link
* org.apache.flink.connector.pulsar.sink.PulsarSinkOptions}.
+ * <p>Modified from {@link org.apache.flink.connector.pulsar.table.PulsarTableOptions}.
*/
@PublicEvolving
public final class PulsarTableOptions {
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java
index 8dac83871a..68ee618fc3 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java
@@ -48,7 +48,9 @@ import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_SUBS
import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.TOPIC;
import static org.apache.pulsar.common.naming.TopicName.isValid;
-/** Util class for source and sink validation rules. */
+/** Util class for source and sink validation rules.
+ * Modified from {@link org.apache.flink.connector.pulsar.table.PulsarTableValidationUtils}.
+ * */
public class PulsarTableValidationUtils {
private PulsarTableValidationUtils() {
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
new file mode 100644
index 0000000000..13ec80862f
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table.source;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricsCollector;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.connector.pulsar.table.source.PulsarRowDataConverter;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+import org.apache.pulsar.client.api.Message;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A specific {@link PulsarDeserializationSchema} for {@link PulsarTableSource}.
+ *
+ * <p>Both Flink's key decoding format and value decoding format are wrapped in this class. It is
+ * responsible for getting metadata fields from a physical Pulsar message body, and for the final
+ * projection mapping from Pulsar message fields to a Flink row.
+ *
+ * <p>After retrieving the key and value bytes and converting them into lists of {@link RowData},
+ * it delegates metadata appending and key/value {@link RowData} combining to a
+ * {@link PulsarRowDataConverter} instance.
+ *
+ * <p>When a {@link MetricOption} is supplied, decoded value rows are routed through a
+ * {@link MetricsCollector} backed by a {@link SourceMetricData} created in
+ * {@link #open(PulsarInitializationContext, SourceConfiguration)}.
+ *
+ * <p>Modified from
+ * {@link org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchema}.
+ */
+public class PulsarTableDeserializationSchema implements PulsarDeserializationSchema<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final TypeInformation<RowData> producedTypeInfo;
+
+    @Nullable
+    private final DeserializationSchema<RowData> keyDeserialization;
+
+    private final DeserializationSchema<RowData> valueDeserialization;
+
+    private final PulsarRowDataConverter rowDataConverter;
+
+    private final boolean upsertMode;
+
+    /**
+     * Runtime-only metric holder. It is created in {@code open()} and never in the constructor,
+     * so it is excluded from the serialized form of this schema.
+     */
+    @Nullable
+    private transient SourceMetricData sourceMetricData;
+
+    /** Metric/audit settings; when {@code null} no {@link SourceMetricData} is created. */
+    @Nullable
+    private final MetricOption metricOption;
+
+    /**
+     * Creates the schema.
+     *
+     * @param keyDeserialization decoder for the message key; required when {@code upsertMode} is
+     *     {@code true}, otherwise optional
+     * @param valueDeserialization decoder for the message payload, must not be {@code null}
+     * @param producedTypeInfo type information returned by {@link #getProducedType()}
+     * @param rowDataConverter converter that projects key/value rows and metadata into the
+     *     produced row
+     * @param upsertMode whether an empty payload is emitted through
+     *     {@code projectToRowWithNullValueRow}
+     * @param metricOption metric/audit settings, may be {@code null}
+     * @throws NullPointerException if a required argument is {@code null}
+     */
+    public PulsarTableDeserializationSchema(
+            @Nullable DeserializationSchema<RowData> keyDeserialization,
+            DeserializationSchema<RowData> valueDeserialization,
+            TypeInformation<RowData> producedTypeInfo,
+            PulsarRowDataConverter rowDataConverter,
+            boolean upsertMode,
+            @Nullable MetricOption metricOption) {
+        if (upsertMode) {
+            checkNotNull(keyDeserialization, "upsert mode must specify a key format");
+        }
+        this.keyDeserialization = keyDeserialization;
+        this.valueDeserialization = checkNotNull(valueDeserialization);
+        this.rowDataConverter = checkNotNull(rowDataConverter);
+        this.producedTypeInfo = checkNotNull(producedTypeInfo);
+        this.upsertMode = upsertMode;
+        this.metricOption = metricOption;
+    }
+
+    /**
+     * Opens the wrapped key/value decoders and, if a {@link MetricOption} was given, creates the
+     * {@link SourceMetricData} used while deserializing.
+     */
+    @Override
+    public void open(PulsarInitializationContext context, SourceConfiguration configuration)
+            throws Exception {
+        if (keyDeserialization != null) {
+            keyDeserialization.open(context);
+        }
+        if (metricOption != null) {
+            sourceMetricData = new SourceMetricData(metricOption);
+        }
+        valueDeserialization.open(context);
+    }
+
+    /**
+     * Decodes the key (if a key format is configured) and the value of {@code message}, then lets
+     * the {@link PulsarRowDataConverter} emit the produced rows to {@code collector}.
+     *
+     * @throws IOException if one of the wrapped decoders fails
+     */
+    @Override
+    public void deserialize(Message<byte[]> message, Collector<RowData> collector)
+            throws IOException {
+        // Get the key row data
+        List<RowData> keyRowData = new ArrayList<>();
+        if (keyDeserialization != null) {
+            keyDeserialization.deserialize(message.getKeyBytes(), new ListCollector<>(keyRowData));
+        }
+
+        // In upsert mode an empty payload is emitted with a null value row; no value decoding
+        // happens, so nothing is allocated for it.
+        // NOTE(review): this path does not go through the MetricsCollector - confirm whether
+        // such messages should be counted in the audit metrics.
+        if (upsertMode && message.getData().length == 0) {
+            rowDataConverter.projectToRowWithNullValueRow(message, keyRowData, collector);
+            return;
+        }
+
+        // Get the value row data
+        List<RowData> valueRowData = new ArrayList<>();
+        ListCollector<RowData> valueSink = new ListCollector<>(valueRowData);
+        final Collector<RowData> valueCollector;
+        if (sourceMetricData == null) {
+            // No metric option configured (or open() not called): collect rows directly.
+            valueCollector = valueSink;
+        } else {
+            valueCollector = new MetricsCollector<>(valueSink, sourceMetricData);
+        }
+        valueDeserialization.deserialize(message.getData(), valueCollector);
+
+        rowDataConverter.projectToProducedRowAndCollect(
+                message, keyRowData, valueRowData, collector);
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return producedTypeInfo;
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java
new file mode 100644
index 0000000000..87ce2c7541
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table.source;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.connector.pulsar.table.source.PulsarReadableMetadata;
+import org.apache.flink.connector.pulsar.table.source.PulsarRowDataConverter;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Contains key/value projection and format information, and uses it to create the
+ * {@link PulsarTableDeserializationSchema} instance used by the runtime
+ * {@link org.apache.flink.connector.pulsar.source.PulsarSource}.
+ *
+ * <p>The fields of a Flink row have a strict order: Physical Fields (key + value) + Format
+ * Metadata Fields + Connector Metadata Fields. Physical Fields come directly from the Pulsar
+ * message body; Format Metadata Fields come from the extra information provided by the decoding
+ * format. Connector Metadata Fields are the ones most Pulsar messages have, such as publish time,
+ * message size and producer name.
+ *
+ * <p>In general, Physical Fields + Format Metadata Fields are contained in the RowData decoded
+ * using valueDecodingFormat. Only Connector Metadata Fields need to be appended to the decoded
+ * RowData. The tricky part is to put format metadata and connector metadata in the right
+ * location. This requires an explicit adjustment process.
+ *
+ * <p>For example, suppose Physical Fields (key + value) + Format Metadata Fields + Connector
+ * Metadata Fields have an arity of 11, the key projection is [0, 6], and the physical value
+ * projection is [1, 2, 3, 4, 5]. Then after the adjustment, the key projection should be [0, 6],
+ * the physical value projection should be [1, 2, 3, 4, 5], the format metadata projection should
+ * be [7], and the connector metadata projection should be [8, 9, 10].
+ *
+ * <p>Modified from
+ * {@link org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchemaFactory}.
+ */
+public class PulsarTableDeserializationSchemaFactory implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final DataType physicalDataType;
+
+    @Nullable
+    private final DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat;
+
+    private final int[] keyProjection;
+
+    private final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;
+
+    private final int[] valueProjection;
+
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes. Will be updated after the applyReadableMetadata()
+    // --------------------------------------------------------------------------------------------
+    private DataType producedDataType;
+
+    private List<String> connectorMetadataKeys;
+
+    private final boolean upsertMode;
+
+    // audit related
+    private final String inlongMetric;
+    private final String auditHostAndPorts;
+    private final String auditKeys;
+    // NOTE(review): never assigned or read in this class; candidate for removal together with
+    // its import.
+    private SourceMetricData sourceMetricData;
+
+    /**
+     * Creates the factory.
+     *
+     * @param physicalDataType data type of the physical (key + value) columns
+     * @param keyDecodingFormat decoding format of the message key, or {@code null} if there is none
+     * @param keyProjection positions of the key fields within the physical row
+     * @param valueDecodingFormat decoding format of the message payload
+     * @param valueProjection positions of the value fields within the physical row
+     * @param upsertMode forwarded to the converter and schema created by this factory
+     * @param inlongMetric InLong metric labels handed to the {@link MetricOption} builder
+     * @param auditHostAndPorts audit address handed to the {@link MetricOption} builder
+     * @param auditKeys audit keys handed to the {@link MetricOption} builder
+     * @throws NullPointerException if a required argument is {@code null}
+     */
+    public PulsarTableDeserializationSchemaFactory(
+            DataType physicalDataType,
+            @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
+            int[] keyProjection,
+            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
+            int[] valueProjection,
+            boolean upsertMode,
+            String inlongMetric,
+            String auditHostAndPorts,
+            String auditKeys) {
+        this.physicalDataType =
+                checkNotNull(physicalDataType, "field physicalDataType must not be null.");
+        this.keyDecodingFormat = keyDecodingFormat;
+        this.keyProjection = checkNotNull(keyProjection);
+        this.valueDecodingFormat =
+                checkNotNull(valueDecodingFormat, "field valueDecodingFormat must not be null.");
+        this.valueProjection =
+                checkNotNull(valueProjection, "field valueProjection must not be null.");
+
+        this.producedDataType = physicalDataType;
+        this.connectorMetadataKeys = Collections.emptyList();
+        this.upsertMode = upsertMode;
+
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
+        this.auditKeys = auditKeys;
+    }
+
+    /**
+     * Creates the runtime decoder for one side (key or value) of the message.
+     *
+     * @return the decoder, or {@code null} when {@code format} is {@code null}
+     */
+    private @Nullable DeserializationSchema<RowData> createDeserialization(
+            DynamicTableSource.Context context,
+            @Nullable DecodingFormat<DeserializationSchema<RowData>> format,
+            int[] projection,
+            @Nullable String prefix) {
+        if (format == null) {
+            return null;
+        }
+
+        DataType physicalFormatDataType = Projection.of(projection).project(this.physicalDataType);
+        if (prefix != null) {
+            physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
+        }
+        return format.createRuntimeDecoder(context, physicalFormatDataType);
+    }
+
+    /**
+     * Builds the {@link PulsarTableDeserializationSchema} from the current state of this factory,
+     * i.e. including the produced data type and connector metadata keys set through the setters.
+     */
+    public PulsarDeserializationSchema<RowData> createPulsarDeserialization(
+            ScanTableSource.ScanContext context) {
+        final DeserializationSchema<RowData> keyDeserialization =
+                createDeserialization(context, keyDecodingFormat, keyProjection, "");
+        final DeserializationSchema<RowData> valueDeserialization =
+                createDeserialization(context, valueDecodingFormat, valueProjection, "");
+
+        final TypeInformation<RowData> producedTypeInfo =
+                context.createTypeInformation(producedDataType);
+
+        final PulsarReadableMetadata readableMetadata =
+                new PulsarReadableMetadata(connectorMetadataKeys);
+
+        // Get Physical Fields (key + value) + Format Metadata arity
+        final int physicalPlusFormatMetadataArity =
+                DataType.getFieldDataTypes(producedDataType).size()
+                        - readableMetadata.getConnectorMetadataArity();
+        final int[] physicalValuePlusFormatMetadataProjection =
+                adjustValueProjectionByAppendConnectorMetadata(physicalPlusFormatMetadataArity);
+
+        final PulsarRowDataConverter rowDataConverter =
+                new PulsarRowDataConverter(
+                        physicalPlusFormatMetadataArity,
+                        keyProjection,
+                        physicalValuePlusFormatMetadataProjection,
+                        readableMetadata,
+                        upsertMode);
+
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withAuditAddress(auditHostAndPorts)
+                .withAuditKeys(auditKeys)
+                .build();
+
+        return new PulsarTableDeserializationSchema(
+                keyDeserialization,
+                valueDeserialization,
+                producedTypeInfo,
+                rowDataConverter,
+                upsertMode,
+                metricOption);
+    }
+
+    public void setProducedDataType(DataType producedDataType) {
+        this.producedDataType = producedDataType;
+    }
+
+    public void setConnectorMetadataKeys(List<String> metadataKeys) {
+        this.connectorMetadataKeys = metadataKeys;
+    }
+
+    /**
+     * Concatenates the physical value projection with the positions of the format metadata
+     * fields, which start right after the physical (key + value) fields.
+     */
+    private int[] adjustValueProjectionByAppendConnectorMetadata(
+            int physicalValuePlusFormatMetadataArity) {
+        return IntStream.concat(
+                IntStream.of(valueProjection),
+                IntStream.range(
+                        keyProjection.length + valueProjection.length,
+                        physicalValuePlusFormatMetadataArity))
+                .toArray();
+    }
+
+    /**
+     * Two factories are equal when all configuration they were built with matches, including the
+     * metric/audit settings, so factories that report to different audit targets are never
+     * treated as interchangeable.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        PulsarTableDeserializationSchemaFactory that = (PulsarTableDeserializationSchemaFactory) o;
+        return upsertMode == that.upsertMode
+                && Objects.equals(physicalDataType, that.physicalDataType)
+                && Objects.equals(keyDecodingFormat, that.keyDecodingFormat)
+                && Arrays.equals(keyProjection, that.keyProjection)
+                && Objects.equals(valueDecodingFormat, that.valueDecodingFormat)
+                && Arrays.equals(valueProjection, that.valueProjection)
+                && Objects.equals(producedDataType, that.producedDataType)
+                && Objects.equals(connectorMetadataKeys, that.connectorMetadataKeys)
+                && Objects.equals(inlongMetric, that.inlongMetric)
+                && Objects.equals(auditHostAndPorts, that.auditHostAndPorts)
+                && Objects.equals(auditKeys, that.auditKeys);
+    }
+
+    @Override
+    public int hashCode() {
+        int result =
+                Objects.hash(
+                        physicalDataType,
+                        keyDecodingFormat,
+                        valueDecodingFormat,
+                        producedDataType,
+                        connectorMetadataKeys,
+                        upsertMode,
+                        inlongMetric,
+                        auditHostAndPorts,
+                        auditKeys);
+        result = 31 * result + Arrays.hashCode(keyProjection);
+        result = 31 * result + Arrays.hashCode(valueProjection);
+        return result;
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
new file mode 100644
index 0000000000..281e3eb5e9
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.connector.pulsar.source.PulsarSource;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.connector.pulsar.table.source.PulsarReadableMetadata;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
+import
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ScanTableSource} implementation for Pulsar SQL Connector. It uses a
+ * {@link SourceProvider} so it doesn't need to support the
+ * {@link org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown} interface.
+ *
+ * <p>Modified from {@link org.apache.flink.connector.pulsar.table.source.PulsarTableSource}.
+ */
+public class PulsarTableSource implements ScanTableSource, SupportsReadingMetadata {
+    // --------------------------------------------------------------------------------------------
+    // Format attributes
+    // --------------------------------------------------------------------------------------------
+
+    private static final String FORMAT_METADATA_PREFIX = "value.";
+
+    private final PulsarTableDeserializationSchemaFactory deserializationSchemaFactory;
+
+    /**
+     * Usually it is the same as the valueDecodingFormat, but use a different naming to show that
+     * it is used to list all the format metadata keys.
+     */
+    private final DecodingFormat<DeserializationSchema<RowData>> decodingFormatForReadingMetadata;
+
+    private final ChangelogMode changelogMode;
+
+    // --------------------------------------------------------------------------------------------
+    // PulsarSource needed attributes
+    // --------------------------------------------------------------------------------------------
+
+    private final List<String> topics;
+
+    private final Properties properties;
+
+    private final StartCursor startCursor;
+
+    private final StopCursor stopCursor;
+
+    private final SubscriptionType subscriptionType;
+
+    /**
+     * Creates the table source.
+     *
+     * @throws NullPointerException if the factory, the metadata decoding format, the properties,
+     *     one of the cursors or the subscription type is {@code null}
+     */
+    public PulsarTableSource(
+            PulsarTableDeserializationSchemaFactory deserializationSchemaFactory,
+            DecodingFormat<DeserializationSchema<RowData>> decodingFormatForReadingMetadata,
+            ChangelogMode changelogMode,
+            List<String> topics,
+            Properties properties,
+            StartCursor startCursor,
+            StopCursor stopCursor,
+            SubscriptionType subscriptionType) {
+        // Format attributes
+        this.deserializationSchemaFactory = checkNotNull(deserializationSchemaFactory);
+        this.decodingFormatForReadingMetadata = checkNotNull(decodingFormatForReadingMetadata);
+        this.changelogMode = changelogMode;
+        // DataStream connector attributes
+        this.topics = topics;
+        this.properties = checkNotNull(properties);
+        this.startCursor = checkNotNull(startCursor);
+        this.stopCursor = checkNotNull(stopCursor);
+        this.subscriptionType = checkNotNull(subscriptionType);
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return changelogMode;
+    }
+
+    /**
+     * Builds a {@link PulsarSource} from the configured topics, cursors and properties, using the
+     * deserialization schema produced by the factory, and wraps it in a {@link SourceProvider}.
+     */
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
+        PulsarDeserializationSchema<RowData> deserializationSchema =
+                deserializationSchemaFactory.createPulsarDeserialization(context);
+        PulsarSource<RowData> source =
+                PulsarSource.builder()
+                        .setTopics(topics)
+                        .setStartCursor(startCursor)
+                        .setUnboundedStopCursor(stopCursor)
+                        .setDeserializationSchema(deserializationSchema)
+                        .setProperties(properties)
+                        .build();
+        return SourceProvider.of(source);
+    }
+
+    /**
+     * According to convention, the order of the final row must be PHYSICAL + FORMAT METADATA +
+     * CONNECTOR METADATA where the format metadata has the highest precedence.
+     *
+     * @return an insertion-ordered map of metadata key to data type: the format metadata keys,
+     *     each prefixed with {@code "value."}, followed by the connector metadata keys that are
+     *     not already present
+     */
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        final Map<String, DataType> allMetadataMap = new LinkedHashMap<>();
+
+        // add value format metadata with prefix
+        decodingFormatForReadingMetadata
+                .listReadableMetadata()
+                .forEach((key, value) -> allMetadataMap.put(FORMAT_METADATA_PREFIX + key, value));
+        // add connector metadata; putIfAbsent keeps a format entry registered under the same key
+        Stream.of(PulsarReadableMetadata.ReadableMetadata.values())
+                .forEachOrdered(m -> allMetadataMap.putIfAbsent(m.key, m.dataType));
+
+        return allMetadataMap;
+    }
+
+    /**
+     * Splits the requested metadata keys into format keys (those starting with
+     * {@code "value."}) and connector keys, pushes the former down to the decoding format and
+     * records the latter, together with the produced data type, on the schema factory.
+     */
+    @Override
+    public void applyReadableMetadata(List<String> allMetadataKeys, DataType producedDataType) {
+        // separate connector and format metadata
+        final List<String> formatMetadataKeys =
+                allMetadataKeys.stream()
+                        .filter(k -> k.startsWith(FORMAT_METADATA_PREFIX))
+                        .collect(Collectors.toList());
+
+        final List<String> connectorMetadataKeys = new ArrayList<>(allMetadataKeys);
+        connectorMetadataKeys.removeAll(formatMetadataKeys);
+
+        // push down format metadata
+        final Map<String, DataType> formatMetadata =
+                decodingFormatForReadingMetadata.listReadableMetadata();
+        if (!formatMetadata.isEmpty()) {
+            final List<String> requestedFormatMetadataKeys =
+                    formatMetadataKeys.stream()
+                            .map(k -> k.substring(FORMAT_METADATA_PREFIX.length()))
+                            .collect(Collectors.toList());
+            decodingFormatForReadingMetadata.applyReadableMetadata(requestedFormatMetadataKeys);
+        }
+
+        // update the factory attributes.
+        deserializationSchemaFactory.setConnectorMetadataKeys(connectorMetadataKeys);
+        deserializationSchemaFactory.setProducedDataType(producedDataType);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "Pulsar table source";
+    }
+
+    /** Returns a new source built from the same attribute references as this one. */
+    @Override
+    public DynamicTableSource copy() {
+        return new PulsarTableSource(
+                deserializationSchemaFactory,
+                decodingFormatForReadingMetadata,
+                changelogMode,
+                topics,
+                properties,
+                startCursor,
+                stopCursor,
+                subscriptionType);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        PulsarTableSource that = (PulsarTableSource) o;
+        return Objects.equals(deserializationSchemaFactory, that.deserializationSchemaFactory)
+                && Objects.equals(
+                        decodingFormatForReadingMetadata, that.decodingFormatForReadingMetadata)
+                && Objects.equals(changelogMode, that.changelogMode)
+                && Objects.equals(topics, that.topics)
+                && Objects.equals(properties, that.properties)
+                && Objects.equals(startCursor, that.startCursor)
+                && Objects.equals(stopCursor, that.stopCursor)
+                && subscriptionType == that.subscriptionType;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                deserializationSchemaFactory,
+                decodingFormatForReadingMetadata,
+                changelogMode,
+                topics,
+                properties,
+                startCursor,
+                stopCursor,
+                subscriptionType);
+    }
+}
diff --git a/licenses/inlong-sort-connectors/LICENSE
b/licenses/inlong-sort-connectors/LICENSE
index e0443b0fd6..5a13bc9fe5 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -835,6 +835,9 @@
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java
+
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
+
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java
+
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
Source : org.apache.flink:flink-connector-pulsar:4.1.0-1.18 (Please note
that the software have been modified.)
License : https://github.com/apache/flink-connector-pulsar/blob/main/LICENSE