This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ec6a8dadd6 [INLONG-9223][Sort] TubeMQ source support InlongAudit (#9236)
ec6a8dadd6 is described below
commit ec6a8dadd698d0733f545ea0d1cbee196b549c92
Author: vernedeng <[email protected]>
AuthorDate: Thu Nov 9 10:22:19 2023 +0800
[INLONG-9223][Sort] TubeMQ source support InlongAudit (#9236)
* [INLONG-9223][Sort] TubeMQ source support InlongAudit
---
.../protocol/node/extract/TubeMQExtractNode.java | 31 +++++++++++++-
.../sort-connectors/tubemq/pom.xml | 11 +++++
.../inlong/sort/tubemq/FlinkTubeMQConsumer.java | 31 +++++++-------
.../table/DynamicTubeMQDeserializationSchema.java | 29 ++++++++++++-
.../tubemq/table/TubeMQDynamicTableFactory.java | 43 +++++++++++++++----
.../sort/tubemq/table/TubeMQTableSource.java | 49 ++++++++++++++++------
.../sort-connectors/tubemq/pom.xml | 6 +++
.../inlong/sort/tubemq/FlinkTubeMQConsumer.java | 1 +
.../table/DynamicTubeMQDeserializationSchema.java | 29 ++++++++++++-
.../tubemq/table/TubeMQDynamicTableFactory.java | 25 +++++++++--
.../sort/tubemq/table/TubeMQTableSource.java | 29 +++++++++++--
11 files changed, 238 insertions(+), 46 deletions(-)
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
index cc6ac2b5f1..d67ef15a09 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
@@ -17,8 +17,11 @@
package org.apache.inlong.sort.protocol.node.extract;
+import org.apache.inlong.common.enums.MetaField;
import org.apache.inlong.sort.formats.util.StringUtils;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.InlongMetric;
+import org.apache.inlong.sort.protocol.Metadata;
import org.apache.inlong.sort.protocol.constant.TubeMQConstant;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.format.Format;
@@ -35,8 +38,10 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.Serializable;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeSet;
/**
@@ -45,7 +50,7 @@ import java.util.TreeSet;
@EqualsAndHashCode(callSuper = true)
@JsonTypeName("tubeMQExtract")
@Data
-public class TubeMQExtractNode extends ExtractNode implements Serializable {
+public class TubeMQExtractNode extends ExtractNode implements Serializable,
InlongMetric, Metadata {
private static final long serialVersionUID = -2544747886429528474L;
@@ -119,4 +124,28 @@ public class TubeMQExtractNode extends ExtractNode
implements Serializable {
return String.format("table_%s", super.getId());
}
+ @Override
+ public String getMetadataKey(MetaField metaField) {
+ String metadataKey;
+ switch (metaField) {
+ case AUDIT_DATA_TIME:
+ metadataKey = "value.data-time";
+ break;
+ default:
+ throw new
UnsupportedOperationException(String.format("Unsupport meta field for %s: %s",
+ this.getClass().getSimpleName(), metaField));
+ }
+ return metadataKey;
+ }
+
+ @Override
+ public boolean isVirtual(MetaField metaField) {
+ return true;
+ }
+
+ @Override
+ public Set<MetaField> supportedMetaFields() {
+ return EnumSet.of(MetaField.AUDIT_DATA_TIME);
+ }
+
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/pom.xml
index d0cca1e07b..19dd75f435 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/pom.xml
@@ -46,6 +46,17 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-base</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
index 96c27d2b9b..abd69f8ecb 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
@@ -68,8 +68,6 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
private static final Logger LOG =
LoggerFactory.getLogger(FlinkTubeMQConsumer.class);
private static final String TUBE_OFFSET_STATE = "tube-offset-state";
- private static final String SPLIT_COMMA = ",";
- private static final String SPLIT_COLON = ":";
/**
* The address of TubeMQ master, format eg: 127.0.0.1:8715,127.0.0.2:8715.
@@ -82,9 +80,9 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
private final String topic;
/**
- * The tubemq consumers use this tid set to filter records reading from
server.
+ * The tubemq consumers use this streamId set to filter records reading
from server.
*/
- private final TreeSet<String> tidSet;
+ private final TreeSet<String> streamIdSet;
/**
* The consumer group name.
@@ -130,7 +128,7 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
/**
* The current offsets of partitions which are stored in {@link
#offsetsState}
* once a checkpoint is triggered.
- *
+ * <p>
* NOTE: The offsets are populated in the main thread and saved in the
* checkpoint thread. Its usage must be guarded by the checkpoint lock.</p>
*/
@@ -147,18 +145,18 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
/**
* Build a TubeMQ source function
*
- * @param masterAddress the master address of TubeMQ
- * @param topic the topic name
- * @param tidSet the topic's filter condition items
- * @param consumerGroup the consumer group name
+ * @param masterAddress the master address of TubeMQ
+ * @param topic the topic name
+ * @param streamIdSet the topic's filter condition items
+ * @param consumerGroup the consumer group name
* @param deserializationSchema the deserialize schema
- * @param configuration the configure
- * @param sessionKey the tube session key
+ * @param configuration the configure
+ * @param sessionKey the tube session key
*/
public FlinkTubeMQConsumer(
String masterAddress,
String topic,
- TreeSet<String> tidSet,
+ TreeSet<String> streamIdSet,
String consumerGroup,
DeserializationSchema<T> deserializationSchema,
Configuration configuration,
@@ -166,14 +164,14 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
Boolean innerFormat) {
checkNotNull(masterAddress, "The master address must not be null.");
checkNotNull(topic, "The topic must not be null.");
- checkNotNull(tidSet, "The tid set must not be null.");
+ checkNotNull(streamIdSet, "The streamId set must not be null.");
checkNotNull(consumerGroup, "The consumer group must not be null.");
checkNotNull(deserializationSchema, "The deserialization schema must
not be null.");
checkNotNull(configuration, "The configuration must not be null.");
this.masterAddress = masterAddress;
this.topic = topic;
- this.tidSet = tidSet;
+ this.streamIdSet = streamIdSet;
this.consumerGroup = consumerGroup;
this.deserializationSchema = deserializationSchema;
this.sessionKey = sessionKey;
@@ -210,6 +208,7 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
@Override
public void open(Configuration parameters) throws Exception {
+ deserializationSchema.open(null);
ConsumerConfig consumerConfig = new ConsumerConfig(masterAddress,
consumerGroup);
consumerConfig.setConsumePosition(consumeFromMax
? ConsumePosition.CONSUMER_FROM_LATEST_OFFSET
@@ -220,7 +219,7 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
final int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
messagePullConsumer =
messageSessionFactory.createPullConsumer(consumerConfig);
- messagePullConsumer.subscribe(topic, tidSet);
+ messagePullConsumer.subscribe(topic, streamIdSet);
String jobId = getRuntimeContext().getJobId().toString();
messagePullConsumer.completeSubscribe(sessionKey.concat(jobId),
numTasks, true, currentOffsets);
@@ -305,7 +304,9 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
rowDataList.forEach(data -> records.add((T) data));
}
}
+
return lastConsumeInstant;
+
}
@Override
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
index f5880f1a78..f68bc4cf5e 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
@@ -17,22 +17,31 @@
package org.apache.inlong.sort.tubemq.table;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricsCollector;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.inlong.tubemq.corebase.Message;
import com.google.common.base.Objects;
+import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.stream.Collectors;
public class DynamicTubeMQDeserializationSchema implements
DeserializationSchema<RowData> {
+ private static final Logger LOG =
LoggerFactory.getLogger(DynamicTubeMQDeserializationSchema.class);
/**
* data buffer message
*/
@@ -53,15 +62,28 @@ public class DynamicTubeMQDeserializationSchema implements
DeserializationSchema
*/
private final boolean ignoreErrors;
+ private SourceMetricData sourceMetricData;
+
+ private MetricOption metricOption;
+
public DynamicTubeMQDeserializationSchema(
DeserializationSchema<RowData> schema,
MetadataConverter[] metadataConverters,
TypeInformation<RowData> producedTypeInfo,
- boolean ignoreErrors) {
+ boolean ignoreErrors,
+ MetricOption metricOption) {
this.deserializationSchema = schema;
this.metadataConverters = metadataConverters;
this.producedTypeInfo = producedTypeInfo;
this.ignoreErrors = ignoreErrors;
+ this.metricOption = metricOption;
+ }
+
+ @Override
+ public void open(InitializationContext context) {
+ if (metricOption != null) {
+ sourceMetricData = new SourceMetricData(metricOption);
+ }
}
@Override
@@ -71,7 +93,10 @@ public class DynamicTubeMQDeserializationSchema implements
DeserializationSchema
@Override
public void deserialize(byte[] message, Collector<RowData> out) throws
IOException {
- deserializationSchema.deserialize(message, out);
+ List<RowData> rows = new ArrayList<>();
+ deserializationSchema.deserialize(message,
+ new MetricsCollector<>(new ListCollector<>(rows),
sourceMetricData));
+ rows.forEach(out::collect);
}
@Override
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
index 43fb3e198e..6af6f8f645 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sort.tubemq.table;
import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
@@ -25,6 +26,7 @@ import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.format.Format;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
@@ -32,6 +34,7 @@ import
org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
@@ -41,6 +44,9 @@ import java.util.Set;
import java.util.TreeSet;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static
org.apache.inlong.sort.tubemq.table.TubeMQOptions.BOOTSTRAP_FROM_MAX;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.CONSUME_GROUP;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.KEY_FORMAT;
@@ -68,13 +74,18 @@ public class TubeMQDynamicTableFactory implements
DynamicTableSourceFactory {
.orElseGet(() ->
helper.discoverDecodingFormat(DeserializationFormatFactory.class, FORMAT));
}
+ private static EncodingFormat<SerializationSchema<RowData>>
getValueEncodingFormat(
+ TableFactoryHelper helper) {
+ return
helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, FORMAT)
+ .orElseGet(() ->
helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT));
+ }
+
private static void validatePKConstraints(
ObjectIdentifier tableName, CatalogTable catalogTable, Format
format) {
if (catalogTable.getSchema().getPrimaryKey().isPresent()
&& format.getChangelogMode().containsOnly(RowKind.INSERT)) {
Configuration options =
Configuration.fromMap(catalogTable.getOptions());
String formatName =
options.getOptional(FORMAT).orElse(options.get(FORMAT));
- innerFormat = INNERFORMATTYPE.equals(formatName);
throw new ValidationException(String.format(
"The TubeMQ table '%s' with '%s' format doesn't support
defining PRIMARY KEY constraint"
+ " on the table, because it can't guarantee the
semantic of primary key.",
@@ -110,10 +121,15 @@ public class TubeMQDynamicTableFactory implements
DynamicTableSourceFactory {
helper.validateExcept(INNERFORMATTYPE);
validatePKConstraints(context.getObjectIdentifier(),
context.getCatalogTable(), valueDecodingFormat);
+ innerFormat = INNERFORMATTYPE.equals(tableOptions.get(FORMAT));
final Configuration properties =
getTubeMQProperties(context.getCatalogTable().getOptions());
- final DataType physicalDataType =
context.getCatalogTable().getSchema().toPhysicalRowDataType();
+ final DataType physicalDataType =
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
+
+ String inlongMetric =
tableOptions.getOptional(INLONG_METRIC).orElse(null);
+ String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
+ String auditKeys = tableOptions.get(AUDIT_KEYS);
return createTubeMQTableSource(
physicalDataType,
@@ -123,7 +139,10 @@ public class TubeMQDynamicTableFactory implements
DynamicTableSourceFactory {
TubeMQOptions.getTiSet(tableOptions),
TubeMQOptions.getConsumerGroup(tableOptions),
TubeMQOptions.getSessionKey(tableOptions),
- properties);
+ properties,
+ inlongMetric,
+ auditHostAndPorts,
+ auditKeys);
}
protected TubeMQTableSource createTubeMQTableSource(
@@ -131,23 +150,29 @@ public class TubeMQDynamicTableFactory implements
DynamicTableSourceFactory {
DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
String topic,
String url,
- TreeSet<String> tid,
+ TreeSet<String> streamId,
String consumerGroup,
String sessionKey,
- Configuration properties) {
+ Configuration properties,
+ String inlongMetric,
+ String auditHostAndPorts,
+ String auditKeys) {
return new TubeMQTableSource(
physicalDataType,
valueDecodingFormat,
url,
topic,
- tid,
+ streamId,
consumerGroup,
sessionKey,
properties,
null,
null,
false,
- innerFormat);
+ innerFormat,
+ inlongMetric,
+ auditHostAndPorts,
+ auditKeys);
}
@Override
@@ -172,6 +197,10 @@ public class TubeMQDynamicTableFactory implements
DynamicTableSourceFactory {
options.add(SESSION_KEY);
options.add(BOOTSTRAP_FROM_MAX);
options.add(TOPIC_PATTERN);
+ options.add(AUDIT_KEYS);
+ options.add(INLONG_METRIC);
+ options.add(INLONG_AUDIT);
return options;
}
+
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
index c2642fd351..4a1d332ca8 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.tubemq.table;
+import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.tubemq.FlinkTubeMQConsumer;
import
org.apache.inlong.sort.tubemq.table.DynamicTubeMQDeserializationSchema.MetadataConverter;
import org.apache.inlong.tubemq.corebase.Message;
@@ -40,6 +41,8 @@ import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
@@ -62,6 +65,8 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
private static final String VALUE_METADATA_PREFIX = "value.";
+ private static final Logger LOG =
LoggerFactory.getLogger(TubeMQTableSource.class);
+
//
--------------------------------------------------------------------------------------------
// Mutable attributes
//
--------------------------------------------------------------------------------------------
@@ -84,9 +89,9 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
*/
private final String topic;
/**
- * The TubeMQ tid filter collection.
+ * The TubeMQ streamId filter collection.
*/
- private final TreeSet<String> tidSet;
+ private final TreeSet<String> streamIdSet;
/**
* The TubeMQ consumer group name.
*/
@@ -120,6 +125,11 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
* Metadata that is appended at the end of a physical source row.
*/
protected List<String> metadataKeys;
+
+ private String inlongMetric;
+ private String auditHostAndPorts;
+ private String auditKeys;
+
/**
* Watermark strategy that is used to generate per-partition watermark.
*/
@@ -129,15 +139,16 @@ public class TubeMQTableSource implements
ScanTableSource, SupportsReadingMetada
public TubeMQTableSource(DataType physicalDataType,
DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
String masterAddress, String topic,
- TreeSet<String> tidSet, String consumerGroup, String sessionKey,
+ TreeSet<String> streamIdSet, String consumerGroup, String
sessionKey,
Configuration configuration, @Nullable WatermarkStrategy<RowData>
watermarkStrategy,
- Optional<String> proctimeAttribute, Boolean ignoreErrors, Boolean
innerFormat) {
+ Optional<String> proctimeAttribute, Boolean ignoreErrors, Boolean
innerFormat,
+ String inlongMetric, String auditHostAndPorts, String auditKeys) {
Preconditions.checkNotNull(physicalDataType, "Physical data type must
not be null.");
Preconditions.checkNotNull(valueDecodingFormat, "The deserialization
schema must not be null.");
Preconditions.checkNotNull(masterAddress, "The master address must not
be null.");
Preconditions.checkNotNull(topic, "The topic must not be null.");
- Preconditions.checkNotNull(tidSet, "The tid set must not be null.");
+ Preconditions.checkNotNull(streamIdSet, "The streamId set must not be
null.");
Preconditions.checkNotNull(consumerGroup, "The consumer group must not
be null.");
Preconditions.checkNotNull(configuration, "The configuration must not
be null.");
@@ -147,7 +158,7 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
this.valueDecodingFormat = valueDecodingFormat;
this.masterAddress = masterAddress;
this.topic = topic;
- this.tidSet = tidSet;
+ this.streamIdSet = streamIdSet;
this.consumerGroup = consumerGroup;
this.sessionKey = sessionKey;
this.configuration = configuration;
@@ -155,6 +166,9 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
this.proctimeAttribute = proctimeAttribute;
this.ignoreErrors = ignoreErrors;
this.innerFormat = innerFormat;
+ this.inlongMetric = inlongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
+ this.auditKeys = auditKeys;
}
@Override
@@ -167,6 +181,7 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
final LogicalType physicalType = physicalDataType.getLogicalType();
final int physicalFieldCount =
LogicalTypeChecks.getFieldCount(physicalType);
final IntStream physicalFields = IntStream.range(0,
physicalFieldCount);
+
final DeserializationSchema<RowData> deserialization =
createDeserialization(context,
valueDecodingFormat, physicalFields.toArray(), null);
@@ -182,8 +197,9 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
public DynamicTableSource copy() {
return new TubeMQTableSource(
physicalDataType, valueDecodingFormat, masterAddress,
- topic, tidSet, consumerGroup, sessionKey, configuration,
- watermarkStrategy, proctimeAttribute, ignoreErrors,
innerFormat);
+ topic, streamIdSet, consumerGroup, sessionKey, configuration,
+ watermarkStrategy, proctimeAttribute, ignoreErrors,
innerFormat,
+ inlongMetric, auditHostAndPorts, auditKeys);
}
@Override
@@ -247,7 +263,7 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
&& Objects.equals(valueDecodingFormat,
that.valueDecodingFormat)
&& Objects.equals(masterAddress, that.masterAddress)
&& Objects.equals(topic, that.topic)
- && Objects.equals(String.valueOf(tidSet),
String.valueOf(that.tidSet))
+ && Objects.equals(String.valueOf(streamIdSet),
String.valueOf(that.streamIdSet))
&& Objects.equals(consumerGroup, that.consumerGroup)
&& Objects.equals(proctimeAttribute, that.proctimeAttribute)
&& Objects.equals(watermarkStrategy, that.watermarkStrategy);
@@ -260,7 +276,7 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
valueDecodingFormat,
masterAddress,
topic,
- tidSet,
+ streamIdSet,
consumerGroup,
configuration,
watermarkStrategy,
@@ -273,7 +289,7 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
@Nullable
private DeserializationSchema<RowData> createDeserialization(
- DynamicTableSource.Context context,
+ Context context,
@Nullable DecodingFormat<DeserializationSchema<RowData>> format,
int[] projection,
@Nullable String prefix) {
@@ -299,10 +315,17 @@ public class TubeMQTableSource implements
ScanTableSource, SupportsReadingMetada
.orElseThrow(IllegalStateException::new))
.map(m -> m.converter)
.toArray(MetadataConverter[]::new);
+
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withAuditAddress(auditHostAndPorts)
+ .withAuditKeys(auditKeys)
+ .build();
+
final DeserializationSchema<RowData> tubeMQDeserializer = new
DynamicTubeMQDeserializationSchema(
- deserialization, metadataConverters, producedTypeInfo,
ignoreErrors);
+ deserialization, metadataConverters, producedTypeInfo,
ignoreErrors, metricOption);
- final FlinkTubeMQConsumer<RowData> tubeMQConsumer = new
FlinkTubeMQConsumer(masterAddress, topic, tidSet,
+ final FlinkTubeMQConsumer<RowData> tubeMQConsumer = new
FlinkTubeMQConsumer(masterAddress, topic, streamIdSet,
consumerGroup, tubeMQDeserializer, configuration, sessionKey,
innerFormat);
return tubeMQConsumer;
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
index aec6e19919..5cc11b3783 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
@@ -51,6 +51,12 @@
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-base</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
index cafa653f65..abd69f8ecb 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
@@ -208,6 +208,7 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
@Override
public void open(Configuration parameters) throws Exception {
+ deserializationSchema.open(null);
ConsumerConfig consumerConfig = new ConsumerConfig(masterAddress,
consumerGroup);
consumerConfig.setConsumePosition(consumeFromMax
? ConsumePosition.CONSUMER_FROM_LATEST_OFFSET
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
index f5880f1a78..f68bc4cf5e 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
@@ -17,22 +17,31 @@
package org.apache.inlong.sort.tubemq.table;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricsCollector;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.inlong.tubemq.corebase.Message;
import com.google.common.base.Objects;
+import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.stream.Collectors;
public class DynamicTubeMQDeserializationSchema implements
DeserializationSchema<RowData> {
+ private static final Logger LOG =
LoggerFactory.getLogger(DynamicTubeMQDeserializationSchema.class);
/**
* data buffer message
*/
@@ -53,15 +62,28 @@ public class DynamicTubeMQDeserializationSchema implements
DeserializationSchema
*/
private final boolean ignoreErrors;
+ private SourceMetricData sourceMetricData;
+
+ private MetricOption metricOption;
+
public DynamicTubeMQDeserializationSchema(
DeserializationSchema<RowData> schema,
MetadataConverter[] metadataConverters,
TypeInformation<RowData> producedTypeInfo,
- boolean ignoreErrors) {
+ boolean ignoreErrors,
+ MetricOption metricOption) {
this.deserializationSchema = schema;
this.metadataConverters = metadataConverters;
this.producedTypeInfo = producedTypeInfo;
this.ignoreErrors = ignoreErrors;
+ this.metricOption = metricOption;
+ }
+
+ @Override
+ public void open(InitializationContext context) {
+ if (metricOption != null) {
+ sourceMetricData = new SourceMetricData(metricOption);
+ }
}
@Override
@@ -71,7 +93,10 @@ public class DynamicTubeMQDeserializationSchema implements
DeserializationSchema
@Override
public void deserialize(byte[] message, Collector<RowData> out) throws
IOException {
- deserializationSchema.deserialize(message, out);
+ List<RowData> rows = new ArrayList<>();
+ deserializationSchema.deserialize(message,
+ new MetricsCollector<>(new ListCollector<>(rows),
sourceMetricData));
+ rows.forEach(out::collect);
}
@Override
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
index 0472353037..e3a0a0c6d7 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
@@ -46,6 +46,9 @@ import java.util.Set;
import java.util.TreeSet;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static
org.apache.inlong.sort.tubemq.table.TubeMQOptions.BOOTSTRAP_FROM_MAX;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.CONSUME_GROUP;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.KEY_FORMAT;
@@ -126,6 +129,10 @@ public class TubeMQDynamicTableFactory implements
DynamicTableSourceFactory, Dyn
final DataType physicalDataType =
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
+ String inlongMetric =
tableOptions.getOptional(INLONG_METRIC).orElse(null);
+ String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
+ String auditKeys = tableOptions.get(AUDIT_KEYS);
+
return createTubeMQTableSource(
physicalDataType,
valueDecodingFormat,
@@ -134,7 +141,10 @@ public class TubeMQDynamicTableFactory implements
DynamicTableSourceFactory, Dyn
TubeMQOptions.getTiSet(tableOptions),
TubeMQOptions.getConsumerGroup(tableOptions),
TubeMQOptions.getSessionKey(tableOptions),
- properties);
+ properties,
+ inlongMetric,
+ auditHostAndPorts,
+ auditKeys);
}
@Override
@@ -171,7 +181,10 @@ public class TubeMQDynamicTableFactory implements
DynamicTableSourceFactory, Dyn
TreeSet<String> streamId,
String consumerGroup,
String sessionKey,
- Configuration properties) {
+ Configuration properties,
+ String inlongMetric,
+ String auditHostAndPorts,
+ String auditKeys) {
return new TubeMQTableSource(
physicalDataType,
valueDecodingFormat,
@@ -184,7 +197,10 @@ public class TubeMQDynamicTableFactory implements
DynamicTableSourceFactory, Dyn
null,
null,
false,
- innerFormat);
+ innerFormat,
+ inlongMetric,
+ auditHostAndPorts,
+ auditKeys);
}
protected TubeMQTableSink createTubeMQTableSink(
@@ -225,6 +241,9 @@ public class TubeMQDynamicTableFactory implements
DynamicTableSourceFactory, Dyn
options.add(SESSION_KEY);
options.add(BOOTSTRAP_FROM_MAX);
options.add(TOPIC_PATTERN);
+ options.add(AUDIT_KEYS);
+ options.add(INLONG_METRIC);
+ options.add(INLONG_AUDIT);
return options;
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
index e79685ff70..4a1d332ca8 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.tubemq.table;
+import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.tubemq.FlinkTubeMQConsumer;
import
org.apache.inlong.sort.tubemq.table.DynamicTubeMQDeserializationSchema.MetadataConverter;
import org.apache.inlong.tubemq.corebase.Message;
@@ -40,6 +41,8 @@ import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
@@ -62,6 +65,8 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
private static final String VALUE_METADATA_PREFIX = "value.";
+ private static final Logger LOG =
LoggerFactory.getLogger(TubeMQTableSource.class);
+
//
--------------------------------------------------------------------------------------------
// Mutable attributes
//
--------------------------------------------------------------------------------------------
@@ -120,6 +125,11 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
* Metadata that is appended at the end of a physical source row.
*/
protected List<String> metadataKeys;
+
+ private String inlongMetric;
+ private String auditHostAndPorts;
+ private String auditKeys;
+
/**
* Watermark strategy that is used to generate per-partition watermark.
*/
@@ -131,7 +141,8 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
String masterAddress, String topic,
TreeSet<String> streamIdSet, String consumerGroup, String
sessionKey,
Configuration configuration, @Nullable WatermarkStrategy<RowData>
watermarkStrategy,
- Optional<String> proctimeAttribute, Boolean ignoreErrors, Boolean
innerFormat) {
+ Optional<String> proctimeAttribute, Boolean ignoreErrors, Boolean
innerFormat,
+ String inlongMetric, String auditHostAndPorts, String auditKeys) {
Preconditions.checkNotNull(physicalDataType, "Physical data type must
not be null.");
Preconditions.checkNotNull(valueDecodingFormat, "The deserialization
schema must not be null.");
@@ -155,6 +166,9 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
this.proctimeAttribute = proctimeAttribute;
this.ignoreErrors = ignoreErrors;
this.innerFormat = innerFormat;
+ this.inlongMetric = inlongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
+ this.auditKeys = auditKeys;
}
@Override
@@ -167,6 +181,7 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
final LogicalType physicalType = physicalDataType.getLogicalType();
final int physicalFieldCount =
LogicalTypeChecks.getFieldCount(physicalType);
final IntStream physicalFields = IntStream.range(0,
physicalFieldCount);
+
final DeserializationSchema<RowData> deserialization =
createDeserialization(context,
valueDecodingFormat, physicalFields.toArray(), null);
@@ -183,7 +198,8 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
return new TubeMQTableSource(
physicalDataType, valueDecodingFormat, masterAddress,
topic, streamIdSet, consumerGroup, sessionKey, configuration,
- watermarkStrategy, proctimeAttribute, ignoreErrors,
innerFormat);
+ watermarkStrategy, proctimeAttribute, ignoreErrors,
innerFormat,
+ inlongMetric, auditHostAndPorts, auditKeys);
}
@Override
@@ -299,8 +315,15 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
.orElseThrow(IllegalStateException::new))
.map(m -> m.converter)
.toArray(MetadataConverter[]::new);
+
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withAuditAddress(auditHostAndPorts)
+ .withAuditKeys(auditKeys)
+ .build();
+
final DeserializationSchema<RowData> tubeMQDeserializer = new
DynamicTubeMQDeserializationSchema(
- deserialization, metadataConverters, producedTypeInfo,
ignoreErrors);
+ deserialization, metadataConverters, producedTypeInfo,
ignoreErrors, metricOption);
final FlinkTubeMQConsumer<RowData> tubeMQConsumer = new
FlinkTubeMQConsumer(masterAddress, topic, streamIdSet,
consumerGroup, tubeMQDeserializer, configuration, sessionKey,
innerFormat);