This is an automated email from the ASF dual-hosted git repository.
pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 21b612a7a [INLONG-6896][Sort] PostgreSQL-CDC supports table level
metrics (#6897)
21b612a7a is described below
commit 21b612a7a2c93fdebb9457227285749229cca310
Author: Liao Rui <[email protected]>
AuthorDate: Fri Dec 16 19:28:09 2022 +0800
[INLONG-6896][Sort] PostgreSQL-CDC supports table level metrics (#6897)
---
.../sort/cdc/base/util/CallbackCollector.java | 2 +-
.../oracle/debezium/DebeziumSourceFunction.java | 2 +-
.../sort/cdc/postgres}/DebeziumSourceFunction.java | 51 +++++++++++++++++-----
.../sort/cdc/postgres}/PostgreSQLSource.java | 10 ++++-
.../PostgreSQLJdbcConnectionIProvider.java | 0
.../PostgreSQLJdbcConnectionOptions.java | 0
.../PostgreSQLJdbcConnectionProvider.java | 0
.../postgres}/debezium/internal/ColumnImpl.java | 0
.../debezium/internal/DebeziumChangeFetcher.java | 0
.../debezium/internal/TableEditorImpl.java | 0
.../cdc/postgres}/debezium/internal/TableImpl.java | 0
.../postgres}/manager/PostgreSQLQueryVisitor.java | 0
.../cdc/postgres/table/PostgreSQLTableSource.java | 1 +
13 files changed, 51 insertions(+), 15 deletions(-)
diff --git
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/CallbackCollector.java
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/CallbackCollector.java
index 289f470bf..83c89a15d 100644
---
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/CallbackCollector.java
+++
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/CallbackCollector.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.cdc.oracle.debezium.utils;
+package org.apache.inlong.sort.cdc.base.util;
import org.apache.flink.util.Collector;
import org.apache.flink.util.function.ThrowingConsumer;
diff --git
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
index 31021e3de..d114d5834 100644
---
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
+++
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
@@ -80,11 +80,11 @@ import
org.apache.inlong.sort.cdc.base.debezium.internal.FlinkDatabaseHistory;
import
org.apache.inlong.sort.cdc.base.debezium.internal.FlinkOffsetBackingStore;
import org.apache.inlong.sort.cdc.base.debezium.internal.Handover;
import org.apache.inlong.sort.cdc.base.debezium.internal.SchemaRecord;
+import org.apache.inlong.sort.cdc.base.util.CallbackCollector;
import org.apache.inlong.sort.cdc.base.util.DatabaseHistoryUtil;
import
org.apache.inlong.sort.cdc.oracle.debezium.internal.DebeziumChangeFetcher;
import
org.apache.inlong.sort.cdc.oracle.debezium.internal.DebeziumChangeConsumer;
import
org.apache.inlong.sort.cdc.oracle.debezium.internal.FlinkDatabaseSchemaHistory;
-import org.apache.inlong.sort.cdc.oracle.debezium.utils.CallbackCollector;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
diff --git
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java
similarity index 91%
rename from
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
rename to
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java
index 41fa64327..cc7f4c46b 100644
---
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
+++
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java
@@ -31,6 +31,9 @@ import
com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore;
import com.ververica.cdc.debezium.internal.Handover;
import com.ververica.cdc.debezium.internal.SchemaRecord;
import com.ververica.cdc.debezium.utils.DatabaseHistoryUtil;
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.connector.SnapshotRecord;
+import io.debezium.data.Envelope;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.embedded.Connect;
@@ -73,13 +76,16 @@ import
org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.base.metric.sub.SourceTableMetricData;
import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.apache.inlong.sort.cdc.base.util.CallbackCollector;
import
org.apache.inlong.sort.cdc.postgres.debezium.internal.DebeziumChangeFetcher;
+import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -231,12 +237,14 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
private String inlongAudit;
- private SourceMetricData sourceMetricData;
+ private SourceTableMetricData sourceMetricData;
private transient ListState<MetricState> metricStateListState;
private MetricState metricState;
+ private boolean migrateAll;
+
//
---------------------------------------------------------------------------------------
public DebeziumSourceFunction(
@@ -245,13 +253,15 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
@Nullable DebeziumOffset specificOffset,
Validator validator,
String inlongMetric,
- String inlongAudit) {
+ String inlongAudit,
+ boolean migrateAll) {
this.deserializer = deserializer;
this.properties = properties;
this.specificOffset = specificOffset;
this.validator = validator;
this.inlongMetric = inlongMetric;
this.inlongAudit = inlongAudit;
+ this.migrateAll = migrateAll;
}
@Override
@@ -446,7 +456,11 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
.withRegisterMetric(RegisteredMetric.ALL)
.build();
if (metricOption != null) {
- sourceMetricData = new SourceMetricData(metricOption, metricGroup);
+ sourceMetricData = new SourceTableMetricData(metricOption, metricGroup);
+ if (migrateAll) {
+ // register sub source metric data from metric state
+ sourceMetricData.registerSubMetricsGroup(metricState);
+ }
}
properties.setProperty("name", "engine");
@@ -487,19 +501,34 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
@Override
public void deserialize(SourceRecord record,
Collector<T> out) throws Exception {
- if (sourceMetricData != null) {
- sourceMetricData.outputMetricsWithEstimate(record.value());
- }
- deserializer.deserialize(record, out);
+ deserializer.deserialize(record, new CallbackCollector<>(inputRow -> {
+ if (sourceMetricData != null) {
+ sourceMetricData.outputMetricsWithEstimate(record.value());
+ }
+ out.collect(inputRow);
+ }));
}
@Override
public void deserialize(SourceRecord record,
Collector<T> out,
TableChange tableSchema) throws Exception {
- if (sourceMetricData != null) {
- sourceMetricData.outputMetricsWithEstimate(record.value());
- }
- deserializer.deserialize(record, out, tableSchema);
+ deserializer.deserialize(record, new CallbackCollector<>(inputRow -> {
+ if (sourceMetricData != null && record != null && migrateAll) {
+ Struct value = (Struct) record.value();
+ Struct source = value.getStruct(Envelope.FieldName.SOURCE);
+ String dbName = source.getString(AbstractSourceInfo.DATABASE_NAME_KEY);
+ String schemaName = source.getString(AbstractSourceInfo.SCHEMA_NAME_KEY);
+ String tableName = source.getString(AbstractSourceInfo.TABLE_NAME_KEY);
+ SnapshotRecord snapshotRecord = SnapshotRecord.fromSource(source);
+ boolean isSnapshotRecord = (SnapshotRecord.TRUE == snapshotRecord);
+ sourceMetricData
+ .outputMetricsWithEstimate(dbName, schemaName, tableName,
+ isSnapshotRecord, value);
+ } else if (sourceMetricData != null && record != null) {
+ sourceMetricData.outputMetricsWithEstimate(record.value());
+ }
+ out.collect(inputRow);
+ }), tableSchema);
}
@Override
diff --git
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/PostgreSQLSource.java
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/PostgreSQLSource.java
similarity index 95%
rename from
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/PostgreSQLSource.java
rename to
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/PostgreSQLSource.java
index d2135a64f..045395159 100644
---
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/PostgreSQLSource.java
+++
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/PostgreSQLSource.java
@@ -54,6 +54,7 @@ public class PostgreSQLSource {
private DebeziumDeserializationSchema<T> deserializer;
private String inlongMetric;
private String inlongAudit;
+ private boolean migrateAll = false;
/**
* The name of the Postgres logical decoding plug-in installed on the
server. Supported
@@ -157,6 +158,11 @@ public class PostgreSQLSource {
return this;
}
+ public Builder<T> migrateAll(boolean migrateAll) {
+ this.migrateAll = migrateAll;
+ return this;
+ }
+
public DebeziumSourceFunction<T> build() {
Properties props = new Properties();
props.setProperty("connector.class",
PostgresConnector.class.getCanonicalName());
@@ -187,8 +193,8 @@ public class PostgreSQLSource {
if (dbzProperties != null) {
props.putAll(dbzProperties);
}
- return new DebeziumSourceFunction<>(
- deserializer, props, null, Validator.getDefaultValidator(), inlongMetric, inlongAudit);
+ return new DebeziumSourceFunction<>(deserializer, props, null, Validator.getDefaultValidator(),
+ inlongMetric, inlongAudit, migrateAll);
}
}
}
diff --git
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/connection/PostgreSQLJdbcConnectionIProvider.java
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/connection/PostgreSQLJdbcConnectionIProvider.java
similarity index 100%
rename from
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/connection/PostgreSQLJdbcConnectionIProvider.java
rename to
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/connection/PostgreSQLJdbcConnectionIProvider.java
diff --git
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/connection/PostgreSQLJdbcConnectionOptions.java
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/connection/PostgreSQLJdbcConnectionOptions.java
similarity index 100%
rename from
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/connection/PostgreSQLJdbcConnectionOptions.java
rename to
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/connection/PostgreSQLJdbcConnectionOptions.java
diff --git
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/connection/PostgreSQLJdbcConnectionProvider.java
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/connection/PostgreSQLJdbcConnectionProvider.java
similarity index 100%
rename from
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/connection/PostgreSQLJdbcConnectionProvider.java
rename to
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/connection/PostgreSQLJdbcConnectionProvider.java
diff --git
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/ColumnImpl.java
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/ColumnImpl.java
similarity index 100%
rename from
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/ColumnImpl.java
rename to
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/ColumnImpl.java
diff --git
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/DebeziumChangeFetcher.java
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/DebeziumChangeFetcher.java
similarity index 100%
rename from
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/DebeziumChangeFetcher.java
rename to
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/DebeziumChangeFetcher.java
diff --git
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/TableEditorImpl.java
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/TableEditorImpl.java
similarity index 100%
rename from
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/TableEditorImpl.java
rename to
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/TableEditorImpl.java
diff --git
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/TableImpl.java
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/TableImpl.java
similarity index 100%
rename from
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/TableImpl.java
rename to
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/TableImpl.java
diff --git
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/manager/PostgreSQLQueryVisitor.java
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/manager/PostgreSQLQueryVisitor.java
similarity index 100%
rename from
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/manager/PostgreSQLQueryVisitor.java
rename to
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/manager/PostgreSQLQueryVisitor.java
diff --git
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableSource.java
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableSource.java
index 1cf40425f..032094898 100644
---
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableSource.java
+++
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableSource.java
@@ -169,6 +169,7 @@ public class PostgreSQLTableSource implements
ScanTableSource, SupportsReadingMe
.deserializer(deserializer)
.inlongMetric(inlongMetric)
.inlongAudit(inlongAudit)
+ .migrateAll(sourceMultipleEnable)
.build();
return SourceFunctionProvider.of(sourceFunction, false);
}