liuxiao2shf commented on code in PR #3619:
URL: https://github.com/apache/flink-cdc/pull/3619#discussion_r1792984158
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/metrics/SourceReaderMetrics.java:
##########
@@ -60,4 +110,146 @@ public void recordFetchDelay(long fetchDelay) {
public void addNumRecordsInErrors(long delta) {
this.numRecordsInErrorsCounter.inc(delta);
}
+
+ public void updateLastReceivedEventTime(Long eventTimestamp) {
+ if (eventTimestamp != null && eventTimestamp > 0L) {
+ lastReceivedEventTime = eventTimestamp;
+ }
+ }
+
+ public void markRecord() {
+ metricGroup.getIOMetricGroup().getNumRecordsInCounter().inc();
+ }
+
+ public void updateRecordCounters(SourceRecord record) {
+ catchAndWarnLogAllExceptions(
+ () -> {
+ // Increase reader and table level input counters
+ if (isDataChangeRecord(record)) {
+ TableMetrics tableMetrics =
getTableMetrics(getTableId(record));
+ Envelope.Operation op = Envelope.operationFor(record);
+ switch (op) {
+ case READ:
+ snapshotCounter.inc();
+ tableMetrics.markSnapshotRecord();
+ break;
+ case CREATE:
+ insertCounter.inc();
+ tableMetrics.markInsertRecord();
+ break;
+ case DELETE:
+ deleteCounter.inc();
+ tableMetrics.markDeleteRecord();
+ break;
+ case UPDATE:
+ updateCounter.inc();
+ tableMetrics.markUpdateRecord();
+ break;
+ }
+ } else if (isSchemaChangeEvent(record)) {
+ schemaChangeCounter.inc();
+ TableId tableId = getTableId(record);
+ if (tableId != null) {
+ getTableMetrics(tableId).markSchemaChangeRecord();
+ }
+ }
+ });
+ }
+
+ private TableMetrics getTableMetrics(TableId tableId) {
+ return tableMetricsMap.computeIfAbsent(
+ tableId,
+ id -> new TableMetrics(id.catalog(), id.schema(), id.table(),
metricGroup));
+ }
+
+ // ------------------------------- Helper functions -----------------------------
+
+ private void catchAndWarnLogAllExceptions(Runnable runnable) {
+ try {
+ runnable.run();
+ } catch (Exception e) {
+ // Catch all exceptions as errors in metric handling should not
fail the job
+ LOG.warn("Failed to update metrics", e);
+ }
+ }
+
+ private long getCurrentEventTimeLag() {
+ if (lastReceivedEventTime == UNDEFINED) {
+ return UNDEFINED;
+ }
+ return SystemClock.getInstance().absoluteTimeMillis() -
lastReceivedEventTime;
+ }
+
+ // ----------------------------------- Helper classes --------------------------------
+
+ /**
+ * Collection class for managing metrics of a table.
+ *
+ * <p>Metrics of table level are registered in its corresponding subgroup
under the {@link
+ * SourceReaderMetricGroup}.
+ */
+ private static class TableMetrics {
+ // Snapshot + Stream
+ private final Counter recordsCounter;
+
+ // Snapshot phase
+ private final Counter snapshotCounter;
+
+ // Stream phase
+ private final Counter insertCounter;
+ private final Counter updateCounter;
+ private final Counter deleteCounter;
+ private final Counter schemaChangeCounter;
+
+ public TableMetrics(
+ String databaseName, String schemaName, String tableName,
MetricGroup parentGroup) {
+ databaseName = processNull(databaseName);
+ schemaName = processNull(schemaName);
+ tableName = processNull(tableName);
Review Comment:
> Will there be a problem if we provide an empty string for the group?

`MetricGroup` does not allow a null value. To stay compatible with the
multi-level (catalog/schema/table) naming models of different databases,
null is uniformly converted to an empty string.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]