This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 41c3984877 [INLONG-10623][Sort] Fix the Pulsar connector on flink1.18
not set audit time (#10624)
41c3984877 is described below
commit 41c3984877e43bde8b1d1624ab4130ae883d31a1
Author: XiaoYou201 <[email protected]>
AuthorDate: Mon Jul 15 14:15:09 2024 +0800
[INLONG-10623][Sort] Fix the Pulsar connector on flink1.18 not set audit
time (#10624)
---
.../sort-connectors/pulsar/pom.xml | 5 +
.../table/source/PulsarReadableMetadata.java | 163 +++++++++++++++++++++
.../table/source/PulsarRowDataConverter.java | 132 +++++++++++++++++
.../source/PulsarTableDeserializationSchema.java | 9 +-
.../PulsarTableDeserializationSchemaFactory.java | 2 -
.../pulsar/table/source/PulsarTableSource.java | 1 -
licenses/inlong-sort-connectors/LICENSE | 2 +
7 files changed, 307 insertions(+), 7 deletions(-)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/pom.xml
index 6721922c29..79c94ad40f 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/pom.xml
@@ -42,6 +42,11 @@
<artifactId>pulsar-client-all</artifactId>
<version>${pulsar.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-base</artifactId>
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarReadableMetadata.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarReadableMetadata.java
new file mode 100644
index 0000000000..8b4e6a8a79
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarReadableMetadata.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table.source;
+
+import org.apache.inlong.sort.base.metric.MetricsCollector;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Collector;
+import org.apache.pulsar.client.api.Message;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static
org.apache.inlong.sort.pulsar.table.source.PulsarReadableMetadata.ReadableMetadata.CONSUME_TIME;
+
+/**
+ * Reads metadata fields from a Pulsar message and puts them into the corresponding Flink row
+ * fields.
+ *
+ * <p>Contains the list of readable metadata and provides utility methods for metadata
+ * manipulation. Modified from {@link
+ * org.apache.flink.connector.pulsar.table.source.PulsarReadableMetadata}.
+ */
+public class PulsarReadableMetadata implements Serializable {
+
+    private static final long serialVersionUID = -4409932324481235973L;
+
+    /** Metadata keys requested by the table, in the order their columns are appended. */
+    private final List<String> connectorMetadataKeys;
+
+    /** Converters resolved from {@link #connectorMetadataKeys}; index-aligned with that list. */
+    private final List<MetadataConverter> metadataConverters;
+
+    /**
+     * Creates a metadata reader for the given metadata keys.
+     *
+     * @param connectorMetadataKeys keys of the metadata columns to read; each one must equal the
+     *     {@code key} of a {@link ReadableMetadata} constant
+     * @throws IllegalStateException if a key matches no {@link ReadableMetadata} constant
+     */
+    public PulsarReadableMetadata(List<String> connectorMetadataKeys) {
+        this.connectorMetadataKeys = connectorMetadataKeys;
+        this.metadataConverters = initializeMetadataConverters();
+    }
+
+    /** Resolves every requested key to its converter, failing fast on an unknown key. */
+    private List<MetadataConverter> initializeMetadataConverters() {
+        return connectorMetadataKeys.stream()
+                .map(
+                        k -> Stream.of(ReadableMetadata.values())
+                                .filter(rm -> rm.key.equals(k))
+                                .findFirst()
+                                // Name the offending key so a misconfigured column is diagnosable.
+                                .orElseThrow(
+                                        () -> new IllegalStateException(
+                                                "Unsupported Pulsar metadata key: " + k)))
+                .map(m -> m.converter)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Reads every configured metadata value from {@code message} and writes it into
+     * {@code producedRowData} behind the physical columns.
+     *
+     * <p>When the consume-time metadata column is requested and {@code collector} is a
+     * {@link MetricsCollector}, the value that was just read is also passed to
+     * {@link MetricsCollector#resetTimestamp}.
+     *
+     * @param producedRowData row to fill; must have room for the metadata fields
+     * @param physicalArity number of physical columns; metadata field {@code i} is written at
+     *     position {@code physicalArity + i}
+     * @param message Pulsar message the metadata is read from
+     * @param collector collector the row will be emitted to
+     */
+    public void appendProducedRowWithMetadata(
+            GenericRowData producedRowData,
+            int physicalArity,
+            Message<?> message,
+            Collector<RowData> collector) {
+        for (int metadataPos = 0; metadataPos < metadataConverters.size(); metadataPos++) {
+            Object metadata = metadataConverters.get(metadataPos).read(message);
+            producedRowData.setField(physicalArity + metadataPos, metadata);
+            if (CONSUME_TIME.key.equals(connectorMetadataKeys.get(metadataPos))
+                    && collector instanceof MetricsCollector) {
+                // The CONSUME_TIME converter yields a Long (epoch millis), hence the cast.
+                ((MetricsCollector<RowData>) collector).resetTimestamp((Long) metadata);
+            }
+        }
+    }
+
+    /** Returns the number of metadata columns this reader appends to each row. */
+    public int getConnectorMetadataArity() {
+        return metadataConverters.size();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Metadata handling
+    // --------------------------------------------------------------------------------------------
+
+    /** Extracts a single metadata value from a Pulsar message. */
+    interface MetadataConverter extends Serializable {
+
+        Object read(Message<?> message);
+    }
+
+    /** Lists the metadata that is readable from a Pulsar message. Used in SQL source connector. */
+    public enum ReadableMetadata {
+
+        TOPIC(
+                "topic",
+                DataTypes.STRING().notNull(),
+                message -> StringData.fromString(message.getTopicName())),
+
+        MESSAGE_SIZE("message_size", DataTypes.INT().notNull(), Message::size),
+
+        PRODUCER_NAME(
+                "producer_name",
+                DataTypes.STRING().notNull(),
+                message -> StringData.fromString(message.getProducerName())),
+
+        MESSAGE_ID(
+                "message_id",
+                DataTypes.BYTES().notNull(),
+                message -> message.getMessageId().toByteArray()),
+
+        SEQUENCE_ID("sequenceId", DataTypes.BIGINT().notNull(), Message::getSequenceId),
+
+        PUBLISH_TIME(
+                "publish_time",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+                message -> TimestampData.fromEpochMillis(message.getPublishTime())),
+
+        EVENT_TIME(
+                "event_time",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+                message -> TimestampData.fromEpochMillis(message.getEventTime())),
+
+        // Not read from the message: the value is the wall-clock time at which the converter runs.
+        CONSUME_TIME(
+                ExtractNode.CONSUME_AUDIT_TIME,
+                DataTypes.BIGINT().notNull(),
+                message -> System.currentTimeMillis()),
+
+        PROPERTIES(
+                "properties",
+                // key and value of the map are nullable to make handling easier in queries
+                DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable())
+                        .notNull(),
+                message -> {
+                    final Map<StringData, StringData> map = new HashMap<>();
+                    for (Map.Entry<String, String> e : message.getProperties().entrySet()) {
+                        map.put(
+                                StringData.fromString(e.getKey()),
+                                StringData.fromString(e.getValue()));
+                    }
+                    return new GenericMapData(map);
+                });
+
+        /** Metadata key, matched against the requested connector metadata keys. */
+        public final String key;
+
+        /** Flink SQL data type of the metadata column. */
+        public final DataType dataType;
+
+        /** Converter that reads this metadata value from a message. */
+        public final MetadataConverter converter;
+
+        ReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.converter = converter;
+        }
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarRowDataConverter.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarRowDataConverter.java
new file mode 100644
index 0000000000..44abbaf7c4
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarRowDataConverter.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table.source;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.DeserializationException;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+import org.apache.pulsar.client.api.Message;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Contains the projection information needed to map a Pulsar message to proper key fields, value
+ * fields and metadata fields.
+ *
+ * <p>Modified from {@link org.apache.flink.connector.pulsar.table.source.PulsarRowDataConverter}.
+ */
+public class PulsarRowDataConverter implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Number of physical (key + value) columns in the produced row. */
+    private final int physicalArity;
+
+    /** Target positions in the produced row for each field of the key row. */
+    private final int[] keyProjection;
+
+    /** Target positions in the produced row for each field of the value row. */
+    private final int[] valueProjection;
+
+    /** Reader that appends the metadata columns behind the physical columns. */
+    private final PulsarReadableMetadata readableMetadata;
+
+    /** Whether a message without a value is emitted as a {@link RowKind#DELETE} row. */
+    private final boolean upsertMode;
+
+    public PulsarRowDataConverter(
+            int physicalArity,
+            int[] keyProjection,
+            int[] valueProjection,
+            PulsarReadableMetadata readableMetadata,
+            boolean upsertMode) {
+        this.physicalArity = physicalArity;
+        this.keyProjection = keyProjection;
+        this.valueProjection = valueProjection;
+        this.readableMetadata = readableMetadata;
+        this.upsertMode = upsertMode;
+    }
+
+    /**
+     * Builds the produced rows for one message and emits them to {@code collector}.
+     *
+     * <p>Without a key projection, one row is emitted per value row. Otherwise one row is emitted
+     * for every (value row, key row) combination.
+     *
+     * @param message Pulsar message the rows were deserialized from; used for metadata columns
+     * @param keyRowDataList deserialized key rows; each element must be a {@link GenericRowData}
+     * @param valueRowDataList deserialized value rows; each element must be a
+     *     {@link GenericRowData}
+     * @param collector collector receiving the produced rows
+     */
+    public void projectToProducedRowAndCollect(
+            Message<?> message,
+            List<RowData> keyRowDataList,
+            List<RowData> valueRowDataList,
+            Collector<RowData> collector) {
+        // no key defined
+        if (hasNoKeyProjection()) {
+            valueRowDataList.forEach(
+                    valueRow -> emitRow(null, (GenericRowData) valueRow, collector, message));
+        } else {
+            // otherwise emit a value for each key
+            valueRowDataList.forEach(
+                    valueRow -> keyRowDataList.forEach(
+                            keyRow -> emitRow(
+                                    (GenericRowData) keyRow,
+                                    (GenericRowData) valueRow,
+                                    collector,
+                                    message)));
+        }
+    }
+
+    /**
+     * Emits one row per key row for a message that carries no value.
+     *
+     * @param message Pulsar message the key rows were deserialized from
+     * @param keyRowDataList deserialized key rows; each element must be a {@link GenericRowData}
+     * @param collector collector receiving the produced rows
+     * @throws DeserializationException if the converter is not in upsert mode
+     */
+    public void projectToRowWithNullValueRow(
+            Message<?> message, List<RowData> keyRowDataList, Collector<RowData> collector) {
+        for (RowData keyRow : keyRowDataList) {
+            emitRow((GenericRowData) keyRow, null, collector, message);
+        }
+    }
+
+    /**
+     * Assembles a single produced row from the key row, the value row and the message metadata,
+     * then hands it to {@code collector}.
+     *
+     * @param physicalKeyRow key row, or {@code null} when no key projection is configured
+     * @param physicalValueRow value row, or {@code null} for a message without a value
+     * @throws DeserializationException if {@code physicalValueRow} is {@code null} outside upsert
+     *     mode
+     */
+    private void emitRow(
+            @Nullable GenericRowData physicalKeyRow,
+            @Nullable GenericRowData physicalValueRow,
+            Collector<RowData> collector,
+            Message<?> message) {
+
+        final RowKind rowKind;
+        if (physicalValueRow == null) {
+            if (upsertMode) {
+                rowKind = RowKind.DELETE;
+            } else {
+                throw new DeserializationException(
+                        "Invalid null value received in non-upsert mode. Could not set row kind "
+                                + "for output record. Upsert mode is not supported yet.");
+            }
+        } else {
+            rowKind = physicalValueRow.getRowKind();
+        }
+
+        final GenericRowData producedRow =
+                new GenericRowData(
+                        rowKind, physicalArity + readableMetadata.getConnectorMetadataArity());
+
+        // A DELETE row produced in upsert mode has no value row; leave the value columns null
+        // instead of dereferencing the missing row.
+        if (physicalValueRow != null) {
+            for (int valuePos = 0; valuePos < valueProjection.length; valuePos++) {
+                producedRow.setField(
+                        valueProjection[valuePos], physicalValueRow.getField(valuePos));
+            }
+        }
+
+        for (int keyPos = 0; keyPos < keyProjection.length; keyPos++) {
+            assert physicalKeyRow != null;
+            producedRow.setField(keyProjection[keyPos], physicalKeyRow.getField(keyPos));
+        }
+
+        readableMetadata.appendProducedRowWithMetadata(
+                producedRow, physicalArity, message, collector);
+        collector.collect(producedRow);
+    }
+
+    /** Returns {@code true} when no key fields are projected into the produced row. */
+    private boolean hasNoKeyProjection() {
+        return keyProjection.length == 0;
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
index 13ec80862f..17466899d7 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
@@ -26,7 +26,6 @@ import
org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
-import org.apache.flink.connector.pulsar.table.source.PulsarRowDataConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import org.apache.pulsar.client.api.Message;
@@ -117,12 +116,14 @@ public class PulsarTableDeserializationSchema implements
PulsarDeserializationSc
rowDataConverter.projectToRowWithNullValueRow(message, keyRowData,
collector);
return;
}
+ MetricsCollector<RowData> metricsCollector =
+ new MetricsCollector<>(collector, sourceMetricData);
- valueDeserialization.deserialize(message.getData(),
- new MetricsCollector<>(new ListCollector<>(valueRowData),
sourceMetricData));
+ valueDeserialization.deserialize(message.getData(), new
ListCollector<>(valueRowData));
rowDataConverter.projectToProducedRowAndCollect(
- message, keyRowData, valueRowData, collector);
+ message, keyRowData, valueRowData, metricsCollector);
+
}
@Override
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java
index 87ce2c7541..6698e1e12b 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java
@@ -23,8 +23,6 @@ import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
-import org.apache.flink.connector.pulsar.table.source.PulsarReadableMetadata;
-import org.apache.flink.connector.pulsar.table.source.PulsarRowDataConverter;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
index 8864ca45c0..bf48356d26 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
@@ -24,7 +24,6 @@ import
org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import
org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.NeverStopCursor;
import
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
-import org.apache.flink.connector.pulsar.table.source.PulsarReadableMetadata;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
diff --git a/licenses/inlong-sort-connectors/LICENSE
b/licenses/inlong-sort-connectors/LICENSE
index b7c89b9245..c340f78668 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -851,6 +851,8 @@
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java
+
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarReadableMetadata.java
+
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarRowDataConverter.java
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchema.java
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableDeserializationSchemaFactory.java
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java