e-mhui opened a new pull request, #7443: URL: https://github.com/apache/inlong/pull/7443
### Prepare a Pull Request

Support metrics for Oracle CDC Connector with incremental snapshot enabled

- Fixes #7437

### Motivation

Support metrics for the Oracle CDC connector when incremental snapshot is enabled.

### Modifications

1. Copy some code from [flink-cdc-connectors:release-2.3](https://github.com/ververica/flink-cdc-connectors/tree/release-2.3).

2. Add `inlongMetric` and `inlongAudit` variables to `OracleSourceBuilder`, `JdbcSourceConfig` and `JdbcSourceConfigFactory` (see the sketch after the code in step 3).

3. Register the metrics in the `createReader` method.

position: `inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java`

```java
@Override
public IncrementalSourceReader<T, C> createReader(SourceReaderContext readerContext) throws Exception {
    // create source config for the given subtask (e.g. unique server id)
    C sourceConfig = configFactory.create(readerContext.getIndexOfSubtask());
    JdbcSourceConfig jdbcSourceConfig = (JdbcSourceConfig) sourceConfig;
    FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementsQueue =
            new FutureCompletingBlockingQueue<>();

    // Forward compatible with flink 1.13
    final Method metricGroupMethod = readerContext.getClass().getMethod("metricGroup");
    metricGroupMethod.setAccessible(true);
    final MetricGroup metricGroup = (MetricGroup) metricGroupMethod.invoke(readerContext);
    final SourceReaderMetrics sourceReaderMetrics = new SourceReaderMetrics(metricGroup);

    // register the InLong metrics (and audit, if configured) for this reader
    MetricOption metricOption = MetricOption.builder()
            .withInlongLabels(jdbcSourceConfig.getInlongMetric())
            .withInlongAudit(jdbcSourceConfig.getInlongAudit())
            .withRegisterMetric(RegisteredMetric.ALL)
            .build();
    sourceReaderMetrics.registerMetrics(metricOption);

    Supplier<IncrementalSourceSplitReader<C>> splitReaderSupplier =
            () -> new IncrementalSourceSplitReader<>(
                    readerContext.getIndexOfSubtask(), dataSourceDialect, sourceConfig);
    return new IncrementalSourceReader<>(
            elementsQueue,
            splitReaderSupplier,
            createRecordEmitter(sourceConfig, sourceReaderMetrics),
            readerContext.getConfiguration(),
            readerContext,
            sourceConfig,
            sourceSplitSerializer,
            dataSourceDialect,
            sourceReaderMetrics);
}
```
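Step 2 does not include a snippet; as a rough illustration, the factory side could look like the sketch below. Only the field names `inlongMetric`/`inlongAudit` come from this description; the setter shape and everything else are assumptions.

```java
// Sketch only: new fields and fluent setters for JdbcSourceConfigFactory;
// the class's existing members are omitted and its supertypes are assumed.
public abstract class JdbcSourceConfigFactory {

    // InLong metric labels, later passed to MetricOption.withInlongLabels(...)
    protected String inlongMetric;

    // InLong audit address, later passed to MetricOption.withInlongAudit(...)
    protected String inlongAudit;

    /** Sets the InLong metric labels reported by the source. */
    public JdbcSourceConfigFactory inlongMetric(String inlongMetric) {
        this.inlongMetric = inlongMetric;
        return this;
    }

    /** Sets the InLong audit address the source reports to. */
    public JdbcSourceConfigFactory inlongAudit(String inlongAudit) {
        this.inlongAudit = inlongAudit;
        return this;
    }
}
```

Both values would then be copied into the `JdbcSourceConfig` instances the factory creates, which is how `createReader` above reads them back via `getInlongMetric()`/`getInlongAudit()`.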
4. Store the metrics in the `snapshotState` method.

position: `inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/reader/IncrementalSourceReader.java`

```java
@Override
public List<SourceSplitBase> snapshotState(long checkpointId) {
    // unfinished splits
    List<SourceSplitBase> stateSplits = super.snapshotState(checkpointId);

    // add finished snapshot splits that didn't receive ack yet
    stateSplits.addAll(finishedUnackedSplits.values());

    // add stream splits that are uncompleted
    stateSplits.addAll(uncompletedStreamSplits.values());

    SourceTableMetricData sourceMetricData = sourceReaderMetrics.getSourceMetricData();
    LOG.info("metric-states snapshot sourceMetricData:{}", sourceMetricData);
    if (sourceMetricData != null) {
        long countNumBytesIn = sourceMetricData.getNumBytesIn().getCount();
        long countNumRecordsIn = sourceMetricData.getNumRecordsIn().getCount();
        Map<String, Long> readPhaseMetricMap = sourceMetricData.getReadPhaseMetricMap().entrySet().stream()
                .collect(Collectors.toMap(v -> v.getKey().getPhase(),
                        e -> e.getValue().getReadPhase().getCount()));
        Map<String, TableMetric> tableMetricMap = sourceMetricData.getSubSourceMetricMap().entrySet().stream()
                .collect(Collectors.toMap(Entry::getKey,
                        e -> new TableMetric(e.getValue().getNumRecordsIn().getCount(),
                                e.getValue().getNumBytesIn().getCount())));
        stateSplits.add(new MetricSplit(countNumBytesIn, countNumRecordsIn,
                readPhaseMetricMap, tableMetricMap));
    }
    return stateSplits;
}
```

5. Restore the metrics from the split list in the `addSplits` method (the `MetricSplit` used by both methods is sketched after this snippet).

position: `inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/reader/IncrementalSourceReader.java`

```java
@Override
public void addSplits(List<SourceSplitBase> splits) {
    // restore for finishedUnackedSplits
    List<SourceSplitBase> unfinishedSplits = new ArrayList<>();
    for (SourceSplitBase split : splits) {
        if (split instanceof MetricSplit) {
            MetricSplit metricSplit = (MetricSplit) split;
            LOG.info("inlong-metric-states restore metricSplit:{}", metricSplit);
            sourceReaderMetrics.initMetrics(metricSplit.getNumRecordsIn(), metricSplit.getNumBytesIn(),
                    metricSplit.getReadPhaseMetricMap(), metricSplit.getTableMetricMap());
            LOG.info("inlong-metric-states restore sourceReaderMetrics:{}",
                    sourceReaderMetrics.getSourceMetricData());
            continue;
        }
        if (split.isSnapshotSplit()) {
            SnapshotSplit snapshotSplit = split.asSnapshotSplit();
            if (snapshotSplit.isSnapshotReadFinished()) {
                finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
            } else {
                unfinishedSplits.add(split);
            }
        } else {
            // the stream split is uncompleted
            if (!split.asStreamSplit().isCompletedSplit()) {
                uncompletedStreamSplits.put(split.splitId(), split.asStreamSplit());
                requestStreamSplitMetaIfNeeded(split.asStreamSplit());
            } else {
                uncompletedStreamSplits.remove(split.splitId());
                StreamSplit streamSplit = discoverTableSchemasForStreamSplit(split.asStreamSplit());
                unfinishedSplits.add(streamSplit);
            }
        }
    }
    // notify split enumerator again about the finished unacked snapshot splits
    reportFinishedSnapshotSplitsIfNeed();
    // add all un-finished splits (including stream split) to SourceReaderBase
    super.addSplits(unfinishedSplits);
}
```
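The `MetricSplit` referenced above is not shown in this description. A minimal sketch that matches the constructor and getters used in the two snippets (the `SourceSplitBase` wiring and the split id are assumptions) could be:

```java
import java.util.Map;

// Sketch only: shape of MetricSplit inferred from the snapshotState/addSplits
// snippets above; a real implementation also needs split-serializer support
// so the metric state survives checkpoints.
public class MetricSplit extends SourceSplitBase {

    private final long numBytesIn;
    private final long numRecordsIn;
    // read-phase name -> record count for that phase
    private final Map<String, Long> readPhaseMetricMap;
    // table identifier -> per-table record/byte counters
    private final Map<String, TableMetric> tableMetricMap;

    public MetricSplit(long numBytesIn, long numRecordsIn,
            Map<String, Long> readPhaseMetricMap, Map<String, TableMetric> tableMetricMap) {
        super("metric-split"); // a fixed split id is assumed here
        this.numBytesIn = numBytesIn;
        this.numRecordsIn = numRecordsIn;
        this.readPhaseMetricMap = readPhaseMetricMap;
        this.tableMetricMap = tableMetricMap;
    }

    public long getNumBytesIn() {
        return numBytesIn;
    }

    public long getNumRecordsIn() {
        return numRecordsIn;
    }

    public Map<String, Long> getReadPhaseMetricMap() {
        return readPhaseMetricMap;
    }

    public Map<String, TableMetric> getTableMetricMap() {
        return tableMetricMap;
    }
}
```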
6. Implement the `OracleRecordEmitter` class, which inherits from `IncrementalSourceRecordEmitter`, and override the `processElement` method to collect the Oracle metrics.

position: `inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/OracleRecordEmitter.java`

```java
@Override
protected void processElement(
        SourceRecord element, SourceOutput<T> output, SourceSplitState splitState) throws Exception {
    if (isWatermarkEvent(element)) {
        Offset watermark = super.getOffsetPosition(element.sourceOffset());
        if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
            splitState.asSnapshotSplitState().setHighWatermark(watermark);
        }
    } else if (isSchemaChangeEvent(element) && splitState.isStreamSplitState()) {
        HistoryRecord historyRecord = getHistoryRecord(element);
        Array tableChanges = historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
        TableChanges changes = TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
        for (TableChanges.TableChange tableChange : changes) {
            splitState.asStreamSplitState().recordSchema(tableChange.getId(), tableChange);
        }
        if (includeSchemaChanges) {
            emitElement(element, output, splitState);
        }
    } else if (isDataChangeRecord(element)) {
        if (splitState.isStreamSplitState()) {
            Offset position = getOffsetPosition(element);
            splitState.asStreamSplitState().setStartingOffset(position);
        }
        reportMetrics(element);
        emitElement(element, output, splitState);
    } else {
        // unknown element
        LOG.info("Meet unknown element {}, just skip.", element);
    }
}

protected void emitElement(SourceRecord element, SourceOutput<T> output, SourceSplitState splitState)
        throws Exception {
    debeziumDeserializationSchema.deserialize(element, new Collector<T>() {

        @Override
        public void collect(T record) {
            // extract database/schema/table from the Debezium source struct,
            // then report per-table metrics before forwarding the record
            Struct value = (Struct) element.value();
            Struct source = value.getStruct(Envelope.FieldName.SOURCE);
            String dbName = source.getString(AbstractSourceInfo.DATABASE_NAME_KEY);
            String schemaName = source.getString(AbstractSourceInfo.SCHEMA_NAME_KEY);
            String tableName = source.getString(AbstractSourceInfo.TABLE_NAME_KEY);
            sourceReaderMetrics.outputMetrics(dbName, schemaName, tableName,
                    splitState.isSnapshotSplitState(), value);
            output.collect(record);
        }

        @Override
        public void close() {
        }
    });
}
```
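For context, a job would wire the new options through the builder roughly as below. Apart from the `inlongMetric`/`inlongAudit` setters named in step 2, the builder methods, the deserializer, and the label format are assumptions based on the flink-cdc 2.3 `OracleSourceBuilder` API rather than taken from this patch:

```java
// Sketch only: end-to-end usage of the new options; everything except the
// inlongMetric/inlongAudit setters is assumed from the flink-cdc 2.3 builder API.
OracleSourceBuilder.OracleIncrementalSource<String> source =
        new OracleSourceBuilder<String>()
                .hostname("localhost")
                .port(1521)
                .databaseList("ORCLCDB")
                .schemaList("DEBEZIUM")
                .tableList("DEBEZIUM.PRODUCTS")
                .username("flinkuser")
                .password("flinkpw")
                .deserializer(new JsonDebeziumDeserializationSchema())
                // new in this PR: metric labels and audit address
                // (the "groupId=...&streamId=...&nodeId=..." label format is assumed)
                .inlongMetric("groupId=g1&streamId=s1&nodeId=n1")
                .inlongAudit("127.0.0.1:10081")
                .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpoints exercise the MetricSplit snapshot/restore path added above
env.enableCheckpointing(3000L);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "OracleIncrementalSource").print();
env.execute("oracle-cdc-metrics-demo");
```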
