e-mhui opened a new pull request, #7443:
URL: https://github.com/apache/inlong/pull/7443

   ### Prepare a Pull Request
   
   Support metrics for Oracle CDC Connector with incremental snapshot enabled
   
   - Fixes #7437 
   
   ### Motivation
   
   Support metrics for Oracle CDC Connector with incremental snapshot enabled.
   
   ### Modifications
   
   1. Copy some codes from 
[flink-cdc-connectors:release-2.3](https://github.com/ververica/flink-cdc-connectors/tree/release-2.3).
   2. Add `inlongMetric` and `inlongAudit` variables in `OracleSourceBuilder`, 
`JdbcSourceConfig` and `JdbcSourceConfigFactory`.
   3. Register the mertics in the `createReader` method.
   
   position: 
`inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java`
   
   ```java
       @Override
       public IncrementalSourceReader<T, C> createReader(SourceReaderContext 
readerContext)
               throws Exception {
           // create source config for the given subtask (e.g. unique server id)
           C sourceConfig = 
configFactory.create(readerContext.getIndexOfSubtask());
           JdbcSourceConfig jdbcSourceConfig = (JdbcSourceConfig) sourceConfig;
           FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> 
elementsQueue =
                   new FutureCompletingBlockingQueue<>();
   
           // Forward compatible with flink 1.13
           final Method metricGroupMethod = 
readerContext.getClass().getMethod("metricGroup");
           metricGroupMethod.setAccessible(true);
           final MetricGroup metricGroup = (MetricGroup) 
metricGroupMethod.invoke(readerContext);
           final SourceReaderMetrics sourceReaderMetrics = new 
SourceReaderMetrics(metricGroup);
   
           // create source config for the given subtask (e.g. unique server id)
           MetricOption metricOption = MetricOption.builder()
                   .withInlongLabels(jdbcSourceConfig.getInlongMetric())
                   .withInlongAudit(jdbcSourceConfig.getInlongAudit())
                   .withRegisterMetric(RegisteredMetric.ALL)
                   .build();
   
           sourceReaderMetrics.registerMetrics(metricOption);
           Supplier<IncrementalSourceSplitReader<C>> splitReaderSupplier =
                   () -> new IncrementalSourceSplitReader<>(
                           readerContext.getIndexOfSubtask(), 
dataSourceDialect, sourceConfig);
           return new IncrementalSourceReader<>(
                   elementsQueue,
                   splitReaderSupplier,
                   createRecordEmitter(sourceConfig, sourceReaderMetrics),
                   readerContext.getConfiguration(),
                   readerContext,
                   sourceConfig,
                   sourceSplitSerializer,
                   dataSourceDialect,
                   sourceReaderMetrics);
       }
   ```
   
   4. Store the metrics in the `snapshotState` method, .
   
   position: 
`inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/reader/IncrementalSourceReader.java`
   
   ```java
       @Override
       public List<SourceSplitBase> snapshotState(long checkpointId) {
           // unfinished splits
           List<SourceSplitBase> stateSplits = 
super.snapshotState(checkpointId);
   
           // add finished snapshot splits that didn't receive ack yet
           stateSplits.addAll(finishedUnackedSplits.values());
   
           // add stream splits who are uncompleted
           stateSplits.addAll(uncompletedStreamSplits.values());
   
           SourceTableMetricData sourceMetricData = 
sourceReaderMetrics.getSourceMetricData();
           LOG.info("metric-states snapshot sourceMetricData:{}", 
sourceMetricData);
           if (sourceMetricData != null) {
               long countNumBytesIn = 
sourceMetricData.getNumBytesIn().getCount();
               long countNumRecordsIn = 
sourceMetricData.getNumRecordsIn().getCount();
               Map<String, Long> readPhaseMetricMap = 
sourceMetricData.getReadPhaseMetricMap().entrySet().stream().collect(
                       Collectors.toMap(v -> v.getKey().getPhase(), e -> 
e.getValue().getReadPhase().getCount()));
               Map<String, TableMetric> tableMetricMap = 
sourceMetricData.getSubSourceMetricMap().entrySet().stream()
                       .collect(Collectors.toMap(Entry::getKey,
                               e -> new 
TableMetric(e.getValue().getNumRecordsIn().getCount(),
                                       
e.getValue().getNumBytesIn().getCount())));
               stateSplits
                       .add(new MetricSplit(countNumBytesIn, countNumRecordsIn, 
readPhaseMetricMap, tableMetricMap));
           }
   
           return stateSplits;
       }
   ```
   
   5. Restore the metrics from SourceSplitBase in the `addSplits` method.
   
   position: 
`inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/reader/IncrementalSourceReader.java`
   
   ```java
   @Override
       public void addSplits(List<SourceSplitBase> splits) {
           // restore for finishedUnackedSplits
           List<SourceSplitBase> unfinishedSplits = new ArrayList<>();
           for (SourceSplitBase split : splits) {
               if (split instanceof MetricSplit) {
                   MetricSplit metricSplit = (MetricSplit) split;
                   LOG.info("inlong-metric-states restore metricSplit:{}", 
metricSplit);
                   
sourceReaderMetrics.initMetrics(metricSplit.getNumRecordsIn(),
                           metricSplit.getNumBytesIn(), 
metricSplit.getReadPhaseMetricMap(),
                           metricSplit.getTableMetricMap());
                   LOG.info("inlong-metric-states restore 
sourceReaderMetrics:{}",
                           sourceReaderMetrics.getSourceMetricData());
                   continue;
               }
               if (split.isSnapshotSplit()) {
                   SnapshotSplit snapshotSplit = split.asSnapshotSplit();
                   if (snapshotSplit.isSnapshotReadFinished()) {
                       finishedUnackedSplits.put(snapshotSplit.splitId(), 
snapshotSplit);
                   } else {
                       unfinishedSplits.add(split);
                   }
               } else {
                   // the stream split is uncompleted
                   if (!split.asStreamSplit().isCompletedSplit()) {
                       uncompletedStreamSplits.put(split.splitId(), 
split.asStreamSplit());
                       requestStreamSplitMetaIfNeeded(split.asStreamSplit());
                   } else {
                       uncompletedStreamSplits.remove(split.splitId());
                       StreamSplit streamSplit =
                               
discoverTableSchemasForStreamSplit(split.asStreamSplit());
                       unfinishedSplits.add(streamSplit);
                   }
               }
           }
           // notify split enumerator again about the finished unacked snapshot 
splits
           reportFinishedSnapshotSplitsIfNeed();
           // add all un-finished splits (including stream split) to 
SourceReaderBase
           super.addSplits(unfinishedSplits);
       }
   ```
   
   6. Implement the `OracleRecordEmitter` class, inheriting from 
`IncrementalSourceRecordEmitter`, and override the `processElement` method to 
collect metric of Oracle.
   
   position: 
`inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/OracleRecordEmitter.java`
   
   ```java
       @Override
       protected void processElement(
               SourceRecord element, SourceOutput<T> output, SourceSplitState 
splitState)
               throws Exception {
           if (isWatermarkEvent(element)) {
               Offset watermark = 
super.getOffsetPosition(element.sourceOffset());
               if (isHighWatermarkEvent(element) && 
splitState.isSnapshotSplitState()) {
                   
splitState.asSnapshotSplitState().setHighWatermark(watermark);
               }
           } else if (isSchemaChangeEvent(element) && 
splitState.isStreamSplitState()) {
               HistoryRecord historyRecord = getHistoryRecord(element);
               Array tableChanges =
                       
historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
               TableChanges changes = 
TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
               for (TableChanges.TableChange tableChange : changes) {
                   
splitState.asStreamSplitState().recordSchema(tableChange.getId(), tableChange);
               }
               if (includeSchemaChanges) {
                   emitElement(element, output, splitState);
               }
           } else if (isDataChangeRecord(element)) {
               if (splitState.isStreamSplitState()) {
                   Offset position = getOffsetPosition(element);
                   splitState.asStreamSplitState().setStartingOffset(position);
               }
               reportMetrics(element);
               emitElement(element, output, splitState);
           } else {
               // unknown element
               LOG.info("Meet unknown element {}, just skip.", element);
           }
       }
   
       protected void emitElement(SourceRecord element, SourceOutput<T> output,
               SourceSplitState splitState) throws Exception {
           debeziumDeserializationSchema.deserialize(element, new 
Collector<T>() {
   
               @Override
               public void collect(T record) {
                   Struct value = (Struct) element.value();
                   Struct source = value.getStruct(Envelope.FieldName.SOURCE);
                   String dbName = 
source.getString(AbstractSourceInfo.DATABASE_NAME_KEY);
                   String schemaName = 
source.getString(AbstractSourceInfo.SCHEMA_NAME_KEY);
                   String tableName = 
source.getString(AbstractSourceInfo.TABLE_NAME_KEY);
                   sourceReaderMetrics
                           .outputMetrics(dbName, schemaName, tableName, 
splitState.isSnapshotSplitState(), value);
                   output.collect(record);
               }
   
               @Override
               public void close() {
   
               }
           });
       }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to