This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ca0abf2438 [INLONG-9247][Sort] TubeMQ source support audit when the
deserialized type is not InlongMsg (#9258)
ca0abf2438 is described below
commit ca0abf243863cac31fe9f89da2c01a812e347b65
Author: vernedeng <[email protected]>
AuthorDate: Mon Nov 13 10:51:32 2023 +0800
[INLONG-9247][Sort] TubeMQ source support audit when the deserialized type
is not InlongMsg (#9258)
---
.../inlong/sort/protocol/node/ExtractNode.java | 4 +
.../protocol/node/extract/TubeMQExtractNode.java | 9 +-
.../sort-connectors/tubemq/pom.xml | 5 +
.../inlong/sort/tubemq/FlinkTubeMQConsumer.java | 12 +-
.../table/DynamicTubeMQDeserializationSchema.java | 148 ++++-----------------
...> DynamicTubeMQTableDeserializationSchema.java} | 41 +++---
.../tubemq/table/TubeMQDynamicTableFactory.java | 10 +-
.../sort/tubemq/table/TubeMQTableSource.java | 13 +-
8 files changed, 85 insertions(+), 157 deletions(-)
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
index b9d649d621..fc68f0f356 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
@@ -73,6 +73,10 @@ public abstract class ExtractNode implements Node {
public static final String INLONG_MSG = "inlong-msg";
+ public static final String INLONG_MSG_AUDIT_TIME = "value.data-time";
+
+ public static final String CONSUME_AUDIT_TIME = "consume_time";
+
@JsonProperty("id")
private String id;
@JsonInclude(Include.NON_NULL)
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
index d67ef15a09..327cb522a2 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
@@ -25,6 +25,7 @@ import org.apache.inlong.sort.protocol.Metadata;
import org.apache.inlong.sort.protocol.constant.TubeMQConstant;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
import org.apache.inlong.sort.protocol.transformation.WatermarkField;
import com.google.common.base.Preconditions;
@@ -129,10 +130,14 @@ public class TubeMQExtractNode extends ExtractNode
implements Serializable, Inlo
String metadataKey;
switch (metaField) {
case AUDIT_DATA_TIME:
- metadataKey = "value.data-time";
+ if (format instanceof InLongMsgFormat) {
+ metadataKey = INLONG_MSG_AUDIT_TIME;
+ } else {
+ metadataKey = CONSUME_AUDIT_TIME;
+ }
break;
default:
- throw new
UnsupportedOperationException(String.format("Unsupport meta field for %s: %s",
+ throw new
UnsupportedOperationException(String.format("Unsupported meta field for %s: %s",
this.getClass().getSimpleName(), metaField));
}
return metadataKey;
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
index 5cc11b3783..898c56643c 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
@@ -58,6 +58,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
index abd69f8ecb..b9fb6d1b0d 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.tubemq;
+import org.apache.inlong.sort.tubemq.table.DynamicTubeMQDeserializationSchema;
import org.apache.inlong.sort.tubemq.table.TubeMQOptions;
import org.apache.inlong.tubemq.client.config.ConsumerConfig;
import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
@@ -27,7 +28,6 @@ import org.apache.inlong.tubemq.corebase.Message;
import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
import org.apache.flink.api.common.functions.util.ListCollector;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
@@ -92,7 +92,7 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
/**
* The deserializer for records.
*/
- private final DeserializationSchema<T> deserializationSchema;
+ private final DynamicTubeMQDeserializationSchema<T> deserializationSchema;
/**
* The random key for TubeMQ consumer group when startup.
@@ -158,7 +158,7 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
String topic,
TreeSet<String> streamIdSet,
String consumerGroup,
- DeserializationSchema<T> deserializationSchema,
+ DynamicTubeMQDeserializationSchema<T> deserializationSchema,
Configuration configuration,
String sessionKey,
Boolean innerFormat) {
@@ -208,7 +208,7 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
@Override
public void open(Configuration parameters) throws Exception {
- deserializationSchema.open(null);
+ deserializationSchema.open();
ConsumerConfig consumerConfig = new ConsumerConfig(masterAddress,
consumerGroup);
consumerConfig.setConsumePosition(consumeFromMax
? ConsumePosition.CONSUMER_FROM_LATEST_OFFSET
@@ -292,14 +292,14 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
lastConsumeInstant = Instant.now();
if (!innerFormat) {
for (Message message : messageList) {
- T record =
deserializationSchema.deserialize(message.getData());
+ T record = deserializationSchema.deserialize(message);
records.add(record);
}
} else {
List<RowData> rowDataList = new ArrayList<>();
ListCollector<RowData> out = new ListCollector<>(rowDataList);
for (Message message : messageList) {
- deserializationSchema.deserialize(message.getData(),
(Collector<T>) out);
+ deserializationSchema.deserialize(message, (Collector<T>)
out);
}
rowDataList.forEach(data -> records.add((T) data));
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
index f68bc4cf5e..4c4eaac841 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
@@ -17,143 +17,45 @@
package org.apache.inlong.sort.tubemq.table;
-import org.apache.inlong.sort.base.metric.MetricOption;
-import org.apache.inlong.sort.base.metric.MetricsCollector;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.inlong.tubemq.corebase.Message;
-import com.google.common.base.Objects;
-import org.apache.flink.api.common.functions.util.ListCollector;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-public class DynamicTubeMQDeserializationSchema implements
DeserializationSchema<RowData> {
+public interface DynamicTubeMQDeserializationSchema<T> extends Serializable,
ResultTypeQueryable<T> {
- private static final Logger LOG =
LoggerFactory.getLogger(DynamicTubeMQDeserializationSchema.class);
- /**
- * data buffer message
- */
- private final DeserializationSchema<RowData> deserializationSchema;
-
- /**
- * {@link MetadataConverter} of how to produce metadata from message.
- */
- private final MetadataConverter[] metadataConverters;
+ @PublicEvolving
+ default void open() throws Exception {
+ }
/**
- * {@link TypeInformation} of the produced {@link RowData} (physical +
meta data).
+ * Deserializes a TubeMQ message.
+ *
+ * @param message The TubeMQ {@link Message} to deserialize.
+ * @return The deserialized message as an object (null if the message
cannot be deserialized).
*/
- private final TypeInformation<RowData> producedTypeInfo;
+ T deserialize(Message message) throws IOException;
/**
- * status of error
+ * Deserializes a TubeMQ message.
+ *
+ * <p>Can output multiple records through the {@link Collector}. Note that
the number and size of
+ * the produced records should be relatively small. Depending on the
source implementation
+ * records can be buffered in memory or collecting records might delay
emitting the checkpoint
+ * barrier.
+ *
+ * @param message The TubeMQ {@link Message} to deserialize.
+ * @param out The collector to put the resulting messages.
*/
- private final boolean ignoreErrors;
-
- private SourceMetricData sourceMetricData;
-
- private MetricOption metricOption;
-
- public DynamicTubeMQDeserializationSchema(
- DeserializationSchema<RowData> schema,
- MetadataConverter[] metadataConverters,
- TypeInformation<RowData> producedTypeInfo,
- boolean ignoreErrors,
- MetricOption metricOption) {
- this.deserializationSchema = schema;
- this.metadataConverters = metadataConverters;
- this.producedTypeInfo = producedTypeInfo;
- this.ignoreErrors = ignoreErrors;
- this.metricOption = metricOption;
- }
-
- @Override
- public void open(InitializationContext context) {
- if (metricOption != null) {
- sourceMetricData = new SourceMetricData(metricOption);
+ @PublicEvolving
+ default void deserialize(Message message, Collector<T> out) throws
IOException {
+ T deserialize = deserialize(message);
+ if (deserialize != null) {
+ out.collect(deserialize);
}
}
-
- @Override
- public RowData deserialize(byte[] bytes) throws IOException {
- return deserializationSchema.deserialize(bytes);
- }
-
- @Override
- public void deserialize(byte[] message, Collector<RowData> out) throws
IOException {
- List<RowData> rows = new ArrayList<>();
- deserializationSchema.deserialize(message,
- new MetricsCollector<>(new ListCollector<>(rows),
sourceMetricData));
- rows.forEach(out::collect);
- }
-
- @Override
- public boolean isEndOfStream(RowData rowData) {
- return false;
- }
-
- @Override
- public TypeInformation<RowData> getProducedType() {
- return producedTypeInfo;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof DynamicTubeMQDeserializationSchema)) {
- return false;
- }
- DynamicTubeMQDeserializationSchema that =
(DynamicTubeMQDeserializationSchema) o;
- return ignoreErrors == that.ignoreErrors
- &&
Objects.equal(Arrays.stream(metadataConverters).collect(Collectors.toList()),
-
Arrays.stream(that.metadataConverters).collect(Collectors.toList()))
- && Objects.equal(deserializationSchema,
that.deserializationSchema)
- && Objects.equal(producedTypeInfo, that.producedTypeInfo);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(deserializationSchema, metadataConverters,
producedTypeInfo, ignoreErrors);
- }
-
- /**
- * add metadata column
- */
- private void emitRow(Message head, GenericRowData physicalRow,
Collector<RowData> out) {
- if (metadataConverters.length == 0) {
- out.collect(physicalRow);
- return;
- }
- final int physicalArity = physicalRow.getArity();
- final int metadataArity = metadataConverters.length;
- final GenericRowData producedRow =
- new GenericRowData(physicalRow.getRowKind(), physicalArity +
metadataArity);
- for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
- producedRow.setField(physicalPos,
physicalRow.getField(physicalPos));
- }
- for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
- producedRow.setField(
- physicalArity + metadataPos,
metadataConverters[metadataPos].read(head));
- }
- out.collect(producedRow);
- }
-
- interface MetadataConverter extends Serializable {
-
- Object read(Message head);
- }
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
similarity index 81%
copy from
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
copy to
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
index f68bc4cf5e..8ee154c535 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
@@ -29,8 +29,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
@@ -39,9 +37,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
-public class DynamicTubeMQDeserializationSchema implements
DeserializationSchema<RowData> {
+public class DynamicTubeMQTableDeserializationSchema implements
DynamicTubeMQDeserializationSchema<RowData> {
- private static final Logger LOG =
LoggerFactory.getLogger(DynamicTubeMQDeserializationSchema.class);
/**
* data buffer message
*/
@@ -62,46 +59,54 @@ public class DynamicTubeMQDeserializationSchema implements
DeserializationSchema
*/
private final boolean ignoreErrors;
+ private final boolean innerFormat;
+
private SourceMetricData sourceMetricData;
private MetricOption metricOption;
- public DynamicTubeMQDeserializationSchema(
+ public DynamicTubeMQTableDeserializationSchema(
DeserializationSchema<RowData> schema,
MetadataConverter[] metadataConverters,
TypeInformation<RowData> producedTypeInfo,
boolean ignoreErrors,
+ boolean innerFormat,
MetricOption metricOption) {
this.deserializationSchema = schema;
this.metadataConverters = metadataConverters;
this.producedTypeInfo = producedTypeInfo;
this.ignoreErrors = ignoreErrors;
+ this.innerFormat = innerFormat;
this.metricOption = metricOption;
}
@Override
- public void open(InitializationContext context) {
+ public void open() {
if (metricOption != null) {
sourceMetricData = new SourceMetricData(metricOption);
}
}
@Override
- public RowData deserialize(byte[] bytes) throws IOException {
- return deserializationSchema.deserialize(bytes);
+ public RowData deserialize(Message message) throws IOException {
+ return deserializationSchema.deserialize(message.getData());
}
@Override
- public void deserialize(byte[] message, Collector<RowData> out) throws
IOException {
+ public void deserialize(Message message, Collector<RowData> out) throws
IOException {
List<RowData> rows = new ArrayList<>();
- deserializationSchema.deserialize(message,
- new MetricsCollector<>(new ListCollector<>(rows),
sourceMetricData));
- rows.forEach(out::collect);
- }
- @Override
- public boolean isEndOfStream(RowData rowData) {
- return false;
+ MetricsCollector<RowData> metricsCollector =
+ new MetricsCollector<>(new ListCollector<>(rows),
sourceMetricData);
+
+ // reset the timestamp if the deserialization schema does not use the inner format
+ if (!innerFormat) {
+ metricsCollector.resetTimestamp(System.currentTimeMillis());
+ }
+ deserializationSchema.deserialize(message.getData(), metricsCollector);
+
+ rows.forEach(row -> emitRow(message, (GenericRowData) row, out));
+
}
@Override
@@ -114,10 +119,10 @@ public class DynamicTubeMQDeserializationSchema
implements DeserializationSchema
if (this == o) {
return true;
}
- if (!(o instanceof DynamicTubeMQDeserializationSchema)) {
+ if (!(o instanceof DynamicTubeMQTableDeserializationSchema)) {
return false;
}
- DynamicTubeMQDeserializationSchema that =
(DynamicTubeMQDeserializationSchema) o;
+ DynamicTubeMQTableDeserializationSchema that =
(DynamicTubeMQTableDeserializationSchema) o;
return ignoreErrors == that.ignoreErrors
&&
Objects.equal(Arrays.stream(metadataConverters).collect(Collectors.toList()),
Arrays.stream(that.metadataConverters).collect(Collectors.toList()))
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
index e3a0a0c6d7..4962364a9c 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sort.tubemq.table;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
@@ -66,8 +68,6 @@ public class TubeMQDynamicTableFactory implements
DynamicTableSourceFactory, Dyn
public static final String IDENTIFIER = "tubemq-inlong";
- public static final String INNERFORMATTYPE = "inlong-msg";
-
public static boolean innerFormat = false;
private static DecodingFormat<DeserializationSchema<RowData>>
getValueDecodingFormat(
@@ -120,10 +120,10 @@ public class TubeMQDynamicTableFactory implements
DynamicTableSourceFactory, Dyn
final DecodingFormat<DeserializationSchema<RowData>>
valueDecodingFormat = getValueDecodingFormat(helper);
// validate all options
- helper.validateExcept(INNERFORMATTYPE);
+ helper.validateExcept(ExtractNode.INLONG_MSG);
validatePKConstraints(context.getObjectIdentifier(),
context.getCatalogTable(), valueDecodingFormat);
- innerFormat = INNERFORMATTYPE.equals(tableOptions.get(FORMAT));
+ innerFormat = ExtractNode.INLONG_MSG.equals(tableOptions.get(FORMAT));
final Configuration properties =
getTubeMQProperties(context.getCatalogTable().getOptions());
@@ -156,7 +156,7 @@ public class TubeMQDynamicTableFactory implements
DynamicTableSourceFactory, Dyn
final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat
= getValueEncodingFormat(helper);
// validate all options
- helper.validateExcept(INNERFORMATTYPE);
+ helper.validateExcept(ExtractNode.INLONG_MSG);
validatePKConstraints(context.getObjectIdentifier(),
context.getCatalogTable(), valueEncodingFormat);
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
index 4a1d332ca8..2d5ddbb3d5 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
@@ -18,8 +18,9 @@
package org.apache.inlong.sort.tubemq.table;
import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.tubemq.FlinkTubeMQConsumer;
-import
org.apache.inlong.sort.tubemq.table.DynamicTubeMQDeserializationSchema.MetadataConverter;
+import
org.apache.inlong.sort.tubemq.table.DynamicTubeMQTableDeserializationSchema.MetadataConverter;
import org.apache.inlong.tubemq.corebase.Message;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -322,8 +323,9 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
.withAuditKeys(auditKeys)
.build();
- final DeserializationSchema<RowData> tubeMQDeserializer = new
DynamicTubeMQDeserializationSchema(
- deserialization, metadataConverters, producedTypeInfo,
ignoreErrors, metricOption);
+ final DynamicTubeMQDeserializationSchema<RowData> tubeMQDeserializer =
+ new DynamicTubeMQTableDeserializationSchema(
+ deserialization, metadataConverters, producedTypeInfo,
ignoreErrors, innerFormat, metricOption);
final FlinkTubeMQConsumer<RowData> tubeMQConsumer = new
FlinkTubeMQConsumer(masterAddress, topic, streamIdSet,
consumerGroup, tubeMQDeserializer, configuration, sessionKey,
innerFormat);
@@ -336,6 +338,11 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
enum ReadableMetadata {
+ CONSUME_TIME(
+ ExtractNode.CONSUME_AUDIT_TIME,
+ DataTypes.BIGINT().notNull(),
+ m -> System.currentTimeMillis()),
+
TOPIC(
"topic",
DataTypes.STRING().notNull(),