This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 4de795b3303ca146a7c7a56fd30ba84992573440 Author: Xin Gong <[email protected]> AuthorDate: Wed Aug 17 10:54:28 2022 +0800 [INLONG-5461][Sort] Add audit for MongoDB extract node (#5548) --- inlong-sort/sort-connectors/mongodb-cdc/pom.xml | 11 +++++ .../sort/cdc/mongodb/DebeziumSourceFunction.java | 22 +++++++--- .../inlong/sort/cdc/mongodb/MongoDBSource.java | 14 ++++-- .../sort/cdc/mongodb/table/MongoDBTableSource.java | 22 ++++++---- .../mongodb/table/MongoDBTableSourceFactory.java | 51 +++++++++++----------- 5 files changed, 78 insertions(+), 42 deletions(-) diff --git a/inlong-sort/sort-connectors/mongodb-cdc/pom.xml b/inlong-sort/sort-connectors/mongodb-cdc/pom.xml index f4d3e6036..077595230 100644 --- a/inlong-sort/sort-connectors/mongodb-cdc/pom.xml +++ b/inlong-sort/sort-connectors/mongodb-cdc/pom.xml @@ -41,6 +41,11 @@ <artifactId>sort-connector-base</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>audit-sdk</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> <build> @@ -98,6 +103,12 @@ </filter> </filters> <relocations> + <relocation> + <pattern>org.apache.inlong.sort.base</pattern> + <shadedPattern> + org.apache.inlong.sort.cdc.mongodb.shaded.org.apache.inlong.sort.base + </shadedPattern> + </relocation> <relocation> <pattern>org.apache.kafka</pattern> <shadedPattern> diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java index db41472ed..450990eb4 100644 --- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java +++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java @@ -58,6 +58,7 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.inlong.audit.AuditImp; import org.apache.inlong.sort.base.Constants; import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.kafka.connect.source.SourceRecord; @@ -68,7 +69,9 @@ import javax.annotation.Nullable; import java.io.IOException; import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; import java.util.Properties; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; @@ -76,8 +79,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; + import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory; import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory; +import static org.apache.inlong.sort.base.Constants.DELIMITER; /** * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data @@ -222,6 +227,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> private String inlongMetric; + private String inlongAudit; + private SourceMetricData metricData; // --------------------------------------------------------------------------------------- @@ -230,12 +237,13 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> DebeziumDeserializationSchema<T> deserializer, Properties properties, @Nullable DebeziumOffset specificOffset, - Validator validator, String inlongMetric) { + Validator validator, String inlongMetric, String inlongAudit) { this.deserializer = deserializer; this.properties = properties; this.specificOffset = specificOffset; this.validator = validator; this.inlongMetric = inlongMetric; + this.inlongAudit = inlongAudit; } @Override @@ -414,7 +422,12 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> String groupId = inlongMetricArray[0]; String streamId = inlongMetricArray[1]; String nodeId = inlongMetricArray[2]; - metricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup); + AuditImp auditImp = null; + if (inlongAudit != null) { + AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER)))); + auditImp = AuditImp.getInstance(); + } + metricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp); metricData.registerMetricsForNumRecordsIn(); metricData.registerMetricsForNumBytesIn(); metricData.registerMetricsForNumBytesInPerSecond(); @@ -458,9 +471,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> @Override public void deserialize(SourceRecord record, Collector<T> out) throws Exception { if (metricData != null) { - metricData.getNumRecordsIn().inc(1L); - metricData.getNumBytesIn() - .inc(record.value().toString().getBytes(StandardCharsets.UTF_8).length); + metricData.outputMetrics(1L, + record.value().toString().getBytes(StandardCharsets.UTF_8).length); } deserializer.deserialize(record, out); } diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java index ebe9bf4b5..dad8581d7 100644 --- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java +++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java @@ -139,7 +139,8 @@ public class MongoDBSource { private String errorsTolerance; private Integer heartbeatIntervalMillis; private DebeziumDeserializationSchema<T> deserializer; - private String inLongMetric; + private String inlongMetric; + private String inlongAudit; /** The comma-separated list of hostname and port pairs of mongodb servers. */ public Builder<T> hosts(String hosts) { @@ -317,8 +318,13 @@ public class MongoDBSource { return this; } - public Builder<T> inLongMetric(String inLongMetric) { - this.inLongMetric = inLongMetric; + public Builder<T> inlongMetric(String inlongMetric) { + this.inlongMetric = inlongMetric; + return this; + } + + public Builder<T> inlongAudit(String inlongAudit) { + this.inlongAudit = inlongAudit; return this; } @@ -441,7 +447,7 @@ public class MongoDBSource { Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), HEARTBEAT_TOPIC_NAME_DEFAULT); return new DebeziumSourceFunction<>( - deserializer, props, null, Validator.getDefaultValidator(), inLongMetric); + deserializer, props, null, Validator.getDefaultValidator(), inlongMetric, inlongAudit); } } } diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java index e037ef55a..612f8b48d 100644 --- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java +++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java @@ -76,7 +76,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad private final Integer heartbeatIntervalMillis; private final ZoneId localTimeZone; - private final String inLongMetric; + private final String inlongMetric; + private final String inlongAudit; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -106,7 +107,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad @Nullable Integer pollAwaitTimeMillis, @Nullable Integer heartbeatIntervalMillis, ZoneId localTimeZone, - String inLongMetric) { + String inlongMetric, + String inlongAudit) { this.physicalSchema = physicalSchema; this.hosts = checkNotNull(hosts); this.username = username; @@ -126,7 +128,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad this.localTimeZone = localTimeZone; this.producedDataType = physicalSchema.toPhysicalRowDataType(); this.metadataKeys = Collections.emptyList(); - this.inLongMetric = inLongMetric; + this.inlongMetric = inlongMetric; + this.inlongAudit = inlongAudit; } @Override @@ -184,8 +187,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad Optional.ofNullable(pollMaxBatchSize).ifPresent(builder::pollMaxBatchSize); Optional.ofNullable(pollAwaitTimeMillis).ifPresent(builder::pollAwaitTimeMillis); Optional.ofNullable(heartbeatIntervalMillis).ifPresent(builder::heartbeatIntervalMillis); - Optional.ofNullable(inLongMetric).ifPresent(builder::inLongMetric); - + Optional.ofNullable(inlongMetric).ifPresent(builder::inlongMetric); + Optional.ofNullable(inlongAudit).ifPresent(builder::inlongAudit); DebeziumSourceFunction<RowData> sourceFunction = builder.build(); return SourceFunctionProvider.of(sourceFunction, false); @@ -243,7 +246,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad pollAwaitTimeMillis, heartbeatIntervalMillis, localTimeZone, - inLongMetric); + inlongMetric, + inlongAudit); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; @@ -277,7 +281,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad && Objects.equals(localTimeZone, that.localTimeZone) && Objects.equals(producedDataType, that.producedDataType) && Objects.equals(metadataKeys, that.metadataKeys) - && Objects.equals(inLongMetric, that.inLongMetric); + && Objects.equals(inlongMetric, that.inlongMetric) + && Objects.equals(inlongAudit, that.inlongAudit); } @Override @@ -302,7 +307,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad localTimeZone, producedDataType, metadataKeys, - inLongMetric); + inlongMetric, + inlongAudit); } @Override diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java index d7299cf19..d235bc860 100644 --- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java +++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java @@ -27,6 +27,7 @@ import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; +import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils; import java.time.ZoneId; import java.util.HashSet; @@ -36,6 +37,8 @@ import static com.ververica.cdc.connectors.mongodb.MongoDBSource.ERROR_TOLERANCE import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_AWAIT_TIME_MILLIS_DEFAULT; import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_MAX_BATCH_SIZE_DEFAULT; import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; /** * Factory for creating configured instance of {@link MongoDBTableSource}. @@ -46,12 +49,6 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory { private static final String DOCUMENT_ID_FIELD = "_id"; - public static final ConfigOption<String> INLONG_METRIC = - ConfigOptions.key("inlong.metric") - .stringType() - .defaultValue("") - .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID"); - private static final ConfigOption<String> HOSTS = ConfigOptions.key("hosts") .stringType() @@ -199,35 +196,37 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory { final ReadableConfig config = helper.getOptions(); - String hosts = config.get(HOSTS); - String connectionOptions = config.getOptional(CONNECTION_OPTIONS).orElse(null); + final String hosts = config.get(HOSTS); + final String connectionOptions = config.getOptional(CONNECTION_OPTIONS).orElse(null); - String username = config.getOptional(USERNAME).orElse(null); - String password = config.getOptional(PASSWORD).orElse(null); + final String username = config.getOptional(USERNAME).orElse(null); + final String password = config.getOptional(PASSWORD).orElse(null); - String database = config.getOptional(DATABASE).orElse(null); - String collection = config.getOptional(COLLECTION).orElse(null); + final String database = config.getOptional(DATABASE).orElse(null); + final String collection = config.getOptional(COLLECTION).orElse(null); - String errorsTolerance = config.get(ERRORS_TOLERANCE); - Boolean errorsLogEnable = config.get(ERRORS_LOG_ENABLE); + final String errorsTolerance = config.get(ERRORS_TOLERANCE); + final Boolean errorsLogEnable = config.get(ERRORS_LOG_ENABLE); - Integer pollMaxBatchSize = config.get(POLL_MAX_BATCH_SIZE); - Integer pollAwaitTimeMillis = config.get(POLL_AWAIT_TIME_MILLIS); + final Integer pollMaxBatchSize = config.get(POLL_MAX_BATCH_SIZE); + final Integer pollAwaitTimeMillis = config.get(POLL_AWAIT_TIME_MILLIS); - Integer heartbeatIntervalMillis = + final Integer heartbeatIntervalMillis = config.getOptional(HEARTBEAT_INTERVAL_MILLIS).orElse(null); - Boolean copyExisting = config.get(COPY_EXISTING); - String copyExistingPipeline = config.getOptional(COPY_EXISTING_PIPELINE).orElse(null); - Integer copyExistingMaxThreads = config.getOptional(COPY_EXISTING_MAX_THREADS).orElse(null); - Integer copyExistingQueueSize = config.getOptional(COPY_EXISTING_QUEUE_SIZE).orElse(null); + final Boolean copyExisting = config.get(COPY_EXISTING); + final String copyExistingPipeline = config.getOptional(COPY_EXISTING_PIPELINE).orElse(null); + final Integer copyExistingMaxThreads = config.getOptional(COPY_EXISTING_MAX_THREADS).orElse(null); + final Integer copyExistingQueueSize = config.getOptional(COPY_EXISTING_QUEUE_SIZE).orElse(null); - String zoneId = context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE); - ZoneId localTimeZone = + final String zoneId = context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE); + final ZoneId localTimeZone = TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zoneId) ? ZoneId.systemDefault() : ZoneId.of(zoneId); - String inLongMetric = config.get(INLONG_METRIC); + final String inlongMetric = config.get(INLONG_METRIC); + final String inlongAudit = config.get(INLONG_AUDIT); + ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit); ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema(); checkArgument(physicalSchema.getPrimaryKey().isPresent(), "Primary key must be present"); @@ -251,7 +250,8 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory { pollAwaitTimeMillis, heartbeatIntervalMillis, localTimeZone, - inLongMetric); + inlongMetric, + inlongAudit); } private void checkPrimaryKey(UniqueConstraint pk, String message) { @@ -290,6 +290,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory { options.add(POLL_AWAIT_TIME_MILLIS); options.add(HEARTBEAT_INTERVAL_MILLIS); options.add(INLONG_METRIC); + options.add(INLONG_AUDIT); return options; } }
