This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new fa13965020 [INLONG-8436][Sort] Fix the backfill task not running bug in oracle cdc connector (#8437)
fa13965020 is described below
commit fa139650206bd65cd2b8982d295bb065bd4dc391
Author: emhui <[email protected]>
AuthorDate: Wed Aug 16 11:25:45 2023 +0800
[INLONG-8436][Sort] Fix the backfill task not running bug in oracle cdc connector (#8437)
---
.../source/reader/fetch/OracleScanFetchTask.java | 53 ++++++++++++----------
.../source/reader/fetch/OracleStreamFetchTask.java | 10 ++--
2 files changed, 34 insertions(+), 29 deletions(-)
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleScanFetchTask.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleScanFetchTask.java
index eb7111a8c7..ef12d46842 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleScanFetchTask.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleScanFetchTask.java
@@ -114,17 +114,17 @@ public class OracleScanFetchTask implements
FetchTask<SourceSplitBase> {
snapshotSplitReadTask.execute(
changeEventSourceContext,
sourceFetchContext.getOffsetContext());
- final StreamSplit backfillBinlogSplit =
+ final StreamSplit backfillRedoLogSplit =
createBackfillRedoLogSplit(changeEventSourceContext);
- // optimization that skip the binlog read when the low watermark
equals high
+ // optimization that skip the redo log read when the low watermark
equals high
// watermark
- final boolean binlogBackfillRequired =
- backfillBinlogSplit
+ final boolean redoLogBackfillRequired =
+ backfillRedoLogSplit
.getEndingOffset()
- .isAfter(backfillBinlogSplit.getStartingOffset());
- if (!binlogBackfillRequired) {
- dispatchBinlogEndEvent(
- backfillBinlogSplit,
+ .isAfter(backfillRedoLogSplit.getStartingOffset());
+ if (!redoLogBackfillRequired) {
+ dispatchRedoLogEndEvent(
+ backfillRedoLogSplit,
((OracleSourceFetchTaskContext)
context).getOffsetContext().getPartition(),
((OracleSourceFetchTaskContext) context).getDispatcher());
taskRunning = false;
@@ -132,11 +132,16 @@ public class OracleScanFetchTask implements
FetchTask<SourceSplitBase> {
}
// execute redoLog read task
if (snapshotResult.isCompletedOrSkipped()) {
- final RedoLogSplitReadTask backfillBinlogReadTask =
- createBackfillRedoLogReadTask(backfillBinlogSplit,
sourceFetchContext);
- backfillBinlogReadTask.execute(
- new SnapshotBinlogSplitChangeEventSourceContext(),
- sourceFetchContext.getOffsetContext());
+ final RedoLogSplitReadTask backfillRedoLogReadReTask =
+ createBackfillRedoLogReadTask(backfillRedoLogSplit,
sourceFetchContext);
+ final LogMinerOracleOffsetContextLoader loader =
+ new LogMinerOracleOffsetContextLoader(
+ ((OracleSourceFetchTaskContext)
context).getDbzConnectorConfig());
+ final OracleOffsetContext oracleOffsetContext =
+
loader.load(backfillRedoLogSplit.getStartingOffset().getOffset());
+ backfillRedoLogReadReTask.execute(
+ new SnapshotStreamSplitChangeEventSourceContext(),
oracleOffsetContext);
+ taskRunning = false;
} else {
taskRunning = false;
throw new IllegalStateException(
@@ -156,13 +161,13 @@ public class OracleScanFetchTask implements
FetchTask<SourceSplitBase> {
}
private RedoLogSplitReadTask createBackfillRedoLogReadTask(
- StreamSplit backfillBinlogSplit, OracleSourceFetchTaskContext
context) {
+ StreamSplit backfillRedoLogSplit, OracleSourceFetchTaskContext
context) {
OracleConnectorConfig oracleConnectorConfig =
context.getSourceConfig().getDbzConnectorConfig();
final OffsetContext.Loader<OracleOffsetContext> loader =
new LogMinerOracleOffsetContextLoader(oracleConnectorConfig);
final OracleOffsetContext oracleOffsetContext =
-
loader.load(backfillBinlogSplit.getStartingOffset().getOffset());
+
loader.load(backfillRedoLogSplit.getStartingOffset().getOffset());
// we should only capture events for the current table,
// otherwise, we may can't find corresponding schema
Configuration dezConf =
@@ -173,7 +178,7 @@ public class OracleScanFetchTask implements
FetchTask<SourceSplitBase> {
// Disable heartbeat event in snapshot split fetcher
.with(Heartbeat.HEARTBEAT_INTERVAL, 0)
.build();
- // task to read binlog and backfill for current split
+ // task to read redo log and backfill for current split
return new RedoLogSplitReadTask(
new OracleConnectorConfig(dezConf),
createOracleConnection(context.getSourceConfig().getDbzConfiguration()),
@@ -182,18 +187,18 @@ public class OracleScanFetchTask implements
FetchTask<SourceSplitBase> {
context.getDatabaseSchema(),
context.getSourceConfig().getOriginDbzConnectorConfig(),
context.getStreamingChangeEventSourceMetrics(),
- backfillBinlogSplit);
+ backfillRedoLogSplit);
}
- private void dispatchBinlogEndEvent(
- StreamSplit backFillBinlogSplit,
+ private void dispatchRedoLogEndEvent(
+ StreamSplit backFillRedoLogSplit,
Map<String, ?> sourcePartition,
JdbcSourceEventDispatcher eventDispatcher)
throws InterruptedException {
eventDispatcher.dispatchWatermarkEvent(
sourcePartition,
- backFillBinlogSplit,
- backFillBinlogSplit.getEndingOffset(),
+ backFillRedoLogSplit,
+ backFillRedoLogSplit.getEndingOffset(),
WatermarkKind.END);
}
@@ -284,7 +289,7 @@ public class OracleScanFetchTask implements
FetchTask<SourceSplitBase> {
"Snapshot step 3 - Determining high watermark {} for split
{}",
highWatermark,
snapshotSplit);
- ((SnapshotSplitChangeEventSourceContext)
(context)).setHighWatermark(lowWatermark);
+ ((SnapshotSplitChangeEventSourceContext)
(context)).setHighWatermark(highWatermark);
dispatcher.dispatchWatermarkEvent(
offsetContext.getPartition(), snapshotSplit,
highWatermark, WatermarkKind.HIGH);
return SnapshotResult.completed(ctx.offset);
@@ -462,10 +467,10 @@ public class OracleScanFetchTask implements
FetchTask<SourceSplitBase> {
}
/**
- * The {@link ChangeEventSource.ChangeEventSourceContext} implementation
for bounded binlog task
+ * The {@link ChangeEventSource.ChangeEventSourceContext} implementation
for bounded stream task
* of a snapshot split task.
*/
- public class SnapshotBinlogSplitChangeEventSourceContext
+ public class SnapshotStreamSplitChangeEventSourceContext
implements
ChangeEventSource.ChangeEventSourceContext {
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleStreamFetchTask.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleStreamFetchTask.java
index 300a007350..e5af201f89 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleStreamFetchTask.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleStreamFetchTask.java
@@ -87,7 +87,7 @@ public class OracleStreamFetchTask implements
FetchTask<SourceSplitBase> {
}
/**
- * A wrapped task to read all binlog for table and also supports read
bounded (from lowWatermark
+ * A wrapped task to read all redo log for table and also supports read
bounded (from lowWatermark
* to highWatermark) binlog.
*/
public static class RedoLogSplitReadTask extends
LogMinerStreamingChangeEventSource {
@@ -130,11 +130,11 @@ public class OracleStreamFetchTask implements
FetchTask<SourceSplitBase> {
@Override
public void afterHandleScn(OracleOffsetContext offsetContext) {
super.afterHandleScn(offsetContext);
- // check do we need to stop for fetch binlog for snapshot split.
+ // check do we need to stop for fetch redo log for snapshot split.
if (isBoundedRead()) {
final RedoLogOffset currentRedoLogOffset =
getCurrentRedoLogOffset(offsetContext.getOffset());
- // reach the high watermark, the binlog fetcher should be
finished
+ // reach the high watermark, the redo log fetcher should be
finished
if
(currentRedoLogOffset.isAtOrAfter(redoLogSplit.getEndingOffset())) {
// send binlog end event
try {
@@ -148,8 +148,8 @@ public class OracleStreamFetchTask implements
FetchTask<SourceSplitBase> {
errorHandler.setProducerThrowable(
new DebeziumException("Error processing binlog
signal event", e));
}
- // tell fetcher the binlog task finished
-
((OracleScanFetchTask.SnapshotBinlogSplitChangeEventSourceContext) context)
+ // tell fetcher the redo log task finished
+
((OracleScanFetchTask.SnapshotStreamSplitChangeEventSourceContext) context)
.finished();
}
}