This is an automated email from the ASF dual-hosted git repository.
yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3126faaf5 [INLONG-6751][Sort] Add read phase metric and table level metric for Oracle (#6808)
3126faaf5 is described below
commit 3126faaf5e8851aa62a8772eeb4096bf47034613
Author: emhui <[email protected]>
AuthorDate: Tue Dec 13 16:49:37 2022 +0800
[INLONG-6751][Sort] Add read phase metric and table level metric for Oracle (#6808)
---
.../base/metric/sub/SourceTableMetricData.java | 56 ++++++++++++++++---
.../sort/cdc/base/util/CallbackCollector.java | 47 ++++++++++++++++
.../inlong/sort/cdc/oracle/OracleSource.java | 9 +++-
.../oracle/debezium/DebeziumSourceFunction.java | 63 ++++++++++++++++------
.../sort/cdc/oracle/table/OracleTableSource.java | 3 +-
5 files changed, 152 insertions(+), 26 deletions(-)
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java
index 112000903..a11d73711 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java
@@ -122,9 +122,14 @@ public class SourceTableMetricData extends
SourceMetricData implements SourceSub
String metricGroupLabels = labels.entrySet().stream().map(entry ->
entry.getKey() + "=" + entry.getValue())
.collect(Collectors.joining(DELIMITER));
StringBuilder labelBuilder = new StringBuilder(metricGroupLabels);
-
labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0])
-
.append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[1]);
-
+ if (schemaInfoArray.length == 2) {
+
labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0])
+
.append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[1]);
+ } else if (schemaInfoArray.length == 3) {
+
labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0])
+
.append(DELIMITER).append(Constants.SCHEMA_NAME).append("=").append(schemaInfoArray[1])
+
.append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[2]);
+ }
MetricOption metricOption = MetricOption.builder()
.withInitRecords(subMetricState != null ?
subMetricState.getMetricValue(NUM_RECORDS_IN) : 0L)
.withInitBytes(subMetricState != null ?
subMetricState.getMetricValue(NUM_BYTES_IN) : 0L)
@@ -135,14 +140,18 @@ public class SourceTableMetricData extends
SourceMetricData implements SourceSub
}
/**
- * build record schema identify,in the form of database.table
+ * build record schema identify,in the form of database.schema.table or
database.table
*
* @param database the database name of record
+ * @param schema the schema name of record
* @param table the table name of record
* @return the record schema identify
*/
- public String buildSchemaIdentify(String database, String table) {
- return database + Constants.SEMICOLON + table;
+ public String buildSchemaIdentify(String database, String schema, String
table) {
+ if (schema == null) {
+ return database + Constants.SEMICOLON + table;
+ }
+ return database + Constants.SEMICOLON + schema + Constants.SEMICOLON +
table;
}
/**
@@ -168,7 +177,7 @@ public class SourceTableMetricData extends SourceMetricData
implements SourceSub
outputMetricsWithEstimate(data);
return;
}
- String identify = buildSchemaIdentify(database, table);
+ String identify = buildSchemaIdentify(database, null, table);
SourceMetricData subSourceMetricData;
if (subSourceMetricMap.containsKey(identify)) {
subSourceMetricData = subSourceMetricMap.get(identify);
@@ -186,6 +195,39 @@ public class SourceTableMetricData extends
SourceMetricData implements SourceSub
outputReadPhaseMetrics((isSnapshotRecord) ? ReadPhase.SNAPSHOT_PHASE :
ReadPhase.INCREASE_PHASE);
}
+ /**
+ * output metrics with estimate
+ *
+ * @param database the database name of record
+ * @param schema the schema name of record
+ * @param table the table name of record
+ * @param isSnapshotRecord is it snapshot record
+ * @param data the data of record
+ */
+ public void outputMetricsWithEstimate(String database, String schema,
String table,
+ boolean isSnapshotRecord, Object data) {
+ if (StringUtils.isBlank(database) || StringUtils.isBlank(schema) ||
StringUtils.isBlank(table)) {
+ outputMetricsWithEstimate(data);
+ return;
+ }
+ String identify = buildSchemaIdentify(database, schema, table);
+ SourceMetricData subSourceMetricData;
+ if (subSourceMetricMap.containsKey(identify)) {
+ subSourceMetricData = subSourceMetricMap.get(identify);
+ } else {
+ subSourceMetricData = buildSubSourceMetricData(new
String[]{database, schema, table}, this);
+ subSourceMetricMap.put(identify, subSourceMetricData);
+ }
+ // source metric and sub source metric output metrics
+ long rowCountSize = 1L;
+ long rowDataSize =
data.toString().getBytes(StandardCharsets.UTF_8).length;
+ this.outputMetrics(rowCountSize, rowDataSize);
+ subSourceMetricData.outputMetrics(rowCountSize, rowDataSize);
+
+ // output read phase metric
+ outputReadPhaseMetrics((isSnapshotRecord) ? ReadPhase.SNAPSHOT_PHASE :
ReadPhase.INCREASE_PHASE);
+ }
+
/**
* output read phase metric
*
diff --git a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/CallbackCollector.java b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/CallbackCollector.java
new file mode 100644
index 000000000..289f470bf
--- /dev/null
+++ b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/CallbackCollector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.oracle.debezium.utils;
+
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+/**
+ * A collector supporting callback.
+ */
+public class CallbackCollector<T> implements Collector<T> {
+
+ private final ThrowingConsumer<T, Exception> callback;
+
+ public CallbackCollector(ThrowingConsumer<T, Exception> callback) {
+ this.callback = callback;
+ }
+
+ @Override
+ public void collect(T t) {
+ try {
+ callback.accept(t);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/OracleSource.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/OracleSource.java
index 5aacc1dcc..921bee3f1 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/OracleSource.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/OracleSource.java
@@ -55,6 +55,7 @@ public class OracleSource {
private DebeziumDeserializationSchema<T> deserializer;
private String inlongMetric;
private String inlongAudit;
+ private boolean sourceMultipleEnable;
public Builder<T> hostname(String hostname) {
this.hostname = hostname;
@@ -141,6 +142,11 @@ public class OracleSource {
return this;
}
+ public Builder<T> sourceMultipleEnable(boolean sourceMultipleEnable) {
+ this.sourceMultipleEnable = sourceMultipleEnable;
+ return this;
+ }
+
public DebeziumSourceFunction<T> build() {
Properties props = new Properties();
props.setProperty("connector.class",
OracleConnector.class.getCanonicalName());
@@ -184,7 +190,8 @@ public class OracleSource {
}
return new DebeziumSourceFunction<>(
- deserializer, props, specificOffset, new
OracleValidator(props), inlongMetric, inlongAudit);
+ deserializer, props, specificOffset, new
OracleValidator(props),
+ inlongMetric, inlongAudit, sourceMultipleEnable);
}
}
}
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
index c67538bf5..8966b87d5 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
@@ -23,6 +23,9 @@ import static
org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
import com.ververica.cdc.debezium.Validator;
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.connector.SnapshotRecord;
+import io.debezium.data.Envelope;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.embedded.Connect;
@@ -65,6 +68,12 @@ import
org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.sub.SourceTableMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
import org.apache.inlong.sort.cdc.base.debezium.internal.DebeziumOffset;
import
org.apache.inlong.sort.cdc.base.debezium.internal.DebeziumOffsetSerializer;
@@ -72,15 +81,12 @@ import
org.apache.inlong.sort.cdc.base.debezium.internal.FlinkDatabaseHistory;
import
org.apache.inlong.sort.cdc.base.debezium.internal.FlinkOffsetBackingStore;
import org.apache.inlong.sort.cdc.base.debezium.internal.Handover;
import org.apache.inlong.sort.cdc.base.debezium.internal.SchemaRecord;
-import org.apache.inlong.sort.base.metric.MetricOption;
-import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
-import org.apache.inlong.sort.base.metric.MetricState;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.inlong.sort.cdc.base.util.DatabaseHistoryUtil;
-import org.apache.inlong.sort.base.util.MetricStateUtils;
-import
org.apache.inlong.sort.cdc.oracle.debezium.internal.DebeziumChangeConsumer;
import
org.apache.inlong.sort.cdc.oracle.debezium.internal.DebeziumChangeFetcher;
+import
org.apache.inlong.sort.cdc.oracle.debezium.internal.DebeziumChangeConsumer;
import
org.apache.inlong.sort.cdc.oracle.debezium.internal.FlinkDatabaseSchemaHistory;
+import org.apache.inlong.sort.cdc.oracle.debezium.utils.CallbackCollector;
+import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -232,7 +238,9 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
private String inlongAudit;
- private SourceMetricData sourceMetricData;
+ private boolean sourceMultipleEnable;
+
+ private SourceTableMetricData sourceMetricData;
private transient ListState<MetricState> metricStateListState;
@@ -246,13 +254,15 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
@Nullable DebeziumOffset specificOffset,
Validator validator,
String inlongMetric,
- String inlongAudit) {
+ String inlongAudit,
+ boolean sourceMultipleEnable) {
this.deserializer = deserializer;
this.properties = properties;
this.specificOffset = specificOffset;
this.validator = validator;
this.inlongMetric = inlongMetric;
this.inlongAudit = inlongAudit;
+ this.sourceMultipleEnable = sourceMultipleEnable;
}
@Override
@@ -447,7 +457,11 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
.withRegisterMetric(RegisteredMetric.ALL)
.build();
if (metricOption != null) {
- sourceMetricData = new SourceMetricData(metricOption, metricGroup);
+ sourceMetricData = new SourceTableMetricData(metricOption,
metricGroup);
+ if (sourceMultipleEnable) {
+ // register sub source metric data from metric state
+ sourceMetricData.registerSubMetricsGroup(metricState);
+ }
}
properties.setProperty("name", "engine");
@@ -488,19 +502,34 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
@Override
public void deserialize(SourceRecord record,
Collector<T> out) throws Exception {
- if (sourceMetricData != null) {
-
sourceMetricData.outputMetricsWithEstimate(record.value());
- }
- deserializer.deserialize(record, out);
+ deserializer.deserialize(record, new
CallbackCollector<>(inputRow -> {
+ if (sourceMetricData != null) {
+
sourceMetricData.outputMetricsWithEstimate(record.value());
+ }
+ out.collect(inputRow);
+ }));
}
@Override
public void deserialize(SourceRecord record,
Collector<T> out,
TableChange tableSchema) throws Exception {
- if (sourceMetricData != null) {
-
sourceMetricData.outputMetricsWithEstimate(record.value());
- }
- deserializer.deserialize(record, out,
tableSchema);
+ deserializer.deserialize(record, new
CallbackCollector<>(inputRow -> {
+ if (sourceMetricData != null && record !=
null && sourceMultipleEnable) {
+ Struct value = (Struct) record.value();
+ Struct source =
value.getStruct(Envelope.FieldName.SOURCE);
+ String dbName =
source.getString(AbstractSourceInfo.DATABASE_NAME_KEY);
+ String schemaName =
source.getString(AbstractSourceInfo.SCHEMA_NAME_KEY);
+ String tableName =
source.getString(AbstractSourceInfo.TABLE_NAME_KEY);
+ SnapshotRecord snapshotRecord =
SnapshotRecord.fromSource(source);
+ boolean isSnapshotRecord =
(SnapshotRecord.TRUE == snapshotRecord);
+ sourceMetricData
+
.outputMetricsWithEstimate(dbName, schemaName, tableName,
+ isSnapshotRecord,
value);
+ } else if (sourceMetricData != null &&
record != null) {
+
sourceMetricData.outputMetricsWithEstimate(record.value());
+ }
+ out.collect(inputRow);
+ }), tableSchema);
}
@Override
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java
index cbb586cb7..41c9ff75c 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java
@@ -145,7 +145,8 @@ public class OracleTableSource implements ScanTableSource,
SupportsReadingMetada
.startupOptions(startupOptions)
.deserializer(deserializer)
.inlongMetric(inlongMetric)
- .inlongAudit(inlongAudit);
+ .inlongAudit(inlongAudit)
+ .sourceMultipleEnable(sourceMultipleEnable);
DebeziumSourceFunction<RowData> sourceFunction = builder.build();
return SourceFunctionProvider.of(sourceFunction, false);