dailai commented on code in PR #6740:
URL: https://github.com/apache/seatunnel/pull/6740#discussion_r1579365909


##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java:
##########
@@ -110,265 +119,285 @@ public LogMinerStreamingChangeEventSource(
         this.schema = schema;
         this.connectorConfig = connectorConfig;
         this.strategy = connectorConfig.getLogMiningStrategy();
-        this.isContinuousMining =
-                jdbcConnection.getOracleVersion().getMajor() >= 19
-                        ? false
-                        : connectorConfig.isContinuousMining();
+        this.isContinuousMining = connectorConfig.isContinuousMining();
         this.errorHandler = errorHandler;
         this.streamingMetrics = streamingMetrics;
         this.jdbcConfiguration = JdbcConfiguration.adapt(jdbcConfig);
-        this.isRac = connectorConfig.isRacSystem();
-        if (this.isRac) {
-            this.racHosts.addAll(
-                    connectorConfig.getRacNodes().stream()
-                            .map(String::toUpperCase)
-                            .collect(Collectors.toSet()));
-            instantiateFlushConnections(jdbcConfiguration, racHosts);
-        }
         this.archiveLogRetention = 
connectorConfig.getLogMiningArchiveLogRetention();
         this.archiveLogOnlyMode = connectorConfig.isArchiveLogOnlyMode();
         this.archiveDestinationName = 
connectorConfig.getLogMiningArchiveDestinationName();
+        this.logFileQueryMaxRetries = 
connectorConfig.getMaximumNumberOfLogQueryRetries();
+        this.initialDelay = connectorConfig.getLogMiningInitialDelay();
+        this.maxDelay = connectorConfig.getLogMiningMaxDelay();
     }
 
     /**
-     * This is the loop to get changes from LogMiner.
+     * This is the loop to get changes from LogMiner
      *
      * @param context change event source context
      */
     @Override
-    public void execute(ChangeEventSourceContext context, OracleOffsetContext 
offsetContext) {
-        try (TransactionalBuffer transactionalBuffer =
-                new TransactionalBuffer(
-                        connectorConfig, schema, clock, errorHandler, 
streamingMetrics)) {
-            try {
-                startScn = offsetContext.getScn();
-
-                Scn firstOnlineLogScn;
-                if (!isContinuousMining
-                        && startScn.compareTo(
-                                        (firstOnlineLogScn =
-                                                getFirstOnlineLogScn(
-                                                        jdbcConnection,
-                                                        archiveLogRetention,
-                                                        
archiveDestinationName)))
-                                < 0) {
-                    // throw new DebeziumException(
-                    //        "Online REDO LOG files or archive log files do 
not contain the offset
-                    // scn "
-                    //                + startScn
-                    //                + ".  Please perform a new snapshot.");
-
-                    //
-                    LOGGER.warn(
-                            "Online REDO LOG files or archive log files do not 
contain the offset scn {}."
-                                    + "Turn start scn to online log first scn: 
{}.",
-                            startScn,
-                            firstOnlineLogScn);
-                    startScn = firstOnlineLogScn;
+    public void execute(
+            ChangeEventSourceContext context,
+            OraclePartition partition,
+            OracleOffsetContext offsetContext) {
+        if (!connectorConfig.getSnapshotMode().shouldStream()) {
+            LOGGER.info("Streaming is not enabled in current configuration");
+            return;
+        }
+        try {
+            // We explicitly expect auto-commit to be disabled
+            jdbcConnection.setAutoCommit(false);
+
+            startScn = offsetContext.getScn();
+            snapshotScn = offsetContext.getSnapshotScn();
+            Scn firstScn = getFirstScnInLogs(jdbcConnection);
+            if (startScn.compareTo(snapshotScn) == 0) {
+                // This is the initial run of the streaming change event 
source.
+                // We need to compute the correct start offset for mining. 
That is not the snapshot
+                // offset,
+                // but the start offset of the oldest transaction that was 
still pending when the
+                // snapshot
+                // was taken.
+                computeStartScnForFirstMiningSession(offsetContext, firstScn);
+            }
+
+            try (LogWriterFlushStrategy flushStrategy = 
resolveFlushStrategy()) {
+                if (!isContinuousMining && 
startScn.compareTo(firstScn.subtract(Scn.ONE)) < 0) {
+                    // startScn is the exclusive lower bound, so must be >= 
(firstScn - 1)
+                    throw new DebeziumException(
+                            "Online REDO LOG files or archive log files do not 
contain the offset scn "
+                                    + startScn
+                                    + ".  Please perform a new snapshot.");
                 }
 
                 setNlsSessionParameters(jdbcConnection);
-                checkSupplementalLogging(jdbcConnection, 
connectorConfig.getPdbName(), schema);
+                checkDatabaseAndTableState(jdbcConnection, 
connectorConfig.getPdbName(), schema);
 
-                if (archiveLogOnlyMode && 
!waitForStartScnInArchiveLogs(context, startScn)) {
-                    return;
-                }
+                try (LogMinerEventProcessor processor =
+                        createProcessor(context, partition, offsetContext)) {
 
-                initializeRedoLogsForMining(jdbcConnection, false, startScn);
-
-                HistoryRecorder historyRecorder = 
connectorConfig.getLogMiningHistoryRecorder();
-
-                try {
-                    // todo: why can't OracleConnection be used rather than a
-                    // Factory+JdbcConfiguration?
-                    historyRecorder.prepare(
-                            streamingMetrics,
-                            jdbcConfiguration,
-                            
connectorConfig.getLogMinerHistoryRetentionHours());
-
-                    final LogMinerQueryResultProcessor processor =
-                            new LogMinerQueryResultProcessor(
-                                    context,
-                                    connectorConfig,
-                                    streamingMetrics,
-                                    transactionalBuffer,
-                                    offsetContext,
-                                    schema,
-                                    dispatcher,
-                                    historyRecorder);
-
-                    final String query =
-                            LogMinerQueryBuilder.build(
-                                    connectorConfig, schema, 
jdbcConnection.username());
-                    try (PreparedStatement miningView =
-                            jdbcConnection
-                                    .connection()
-                                    .prepareStatement(
-                                            query,
-                                            ResultSet.TYPE_FORWARD_ONLY,
-                                            ResultSet.CONCUR_READ_ONLY,
-                                            
ResultSet.HOLD_CURSORS_OVER_COMMIT)) {
-
-                        currentRedoLogSequences = getCurrentRedoLogSequences();
-                        Stopwatch stopwatch = Stopwatch.reusable();
-                        while (context.isRunning()) {
-                            // Calculate time difference before each mining 
session to detect time
-                            // zone offset changes (e.g. DST) on database 
server
-                            
streamingMetrics.calculateTimeDifference(getSystime(jdbcConnection));
-
-                            if (archiveLogOnlyMode
-                                    && !waitForStartScnInArchiveLogs(context, 
startScn)) {
-                                break;
-                            }
+                    if (archiveLogOnlyMode && 
!waitForStartScnInArchiveLogs(context, startScn)) {
+                        return;
+                    }
 
-                            Instant start = Instant.now();
-                            endScn =
-                                    getEndScn(
-                                            jdbcConnection,
-                                            startScn,
-                                            endScn,
-                                            streamingMetrics,
-                                            
connectorConfig.getLogMiningBatchSizeDefault(),
-                                            connectorConfig.isLobEnabled(),
-                                            
connectorConfig.isArchiveLogOnlyMode(),
-                                            
connectorConfig.getLogMiningArchiveDestinationName());
-
-                            // This is a small window where when archive log 
only mode has
-                            // completely caught up to the last
-                            // record in the archive logs that both the start 
and end values are
-                            // identical. In this use
-                            // case we want to pause and restart the loop 
waiting for a new archive
-                            // log before proceeding.
-                            if (archiveLogOnlyMode && startScn.equals(endScn)) 
{
-                                pauseBetweenMiningSessions();
-                                
dispatcher.dispatchHeartbeatEvent(offsetContext);
-                                continue;
-                            }
+                    initializeRedoLogsForMining(jdbcConnection, false, 
startScn);
 
-                            if (hasLogSwitchOccurred()) {
-                                // This is the way to mitigate PGA leaks.
-                                // With one mining session, it grows and maybe 
there is another way
-                                // to flush PGA.
-                                // At this point we use a new mining session
-                                LOGGER.trace(
-                                        "Ending log mining startScn={}, 
endScn={}, offsetContext.getScn={}, strategy={}, continuous={}",
-                                        startScn,
-                                        endScn,
-                                        offsetContext.getScn(),
-                                        strategy,
-                                        isContinuousMining);
-                                endMining(jdbcConnection);
-
-                                initializeRedoLogsForMining(jdbcConnection, 
true, startScn);
-
-                                abandonOldTransactionsIfExist(
-                                        jdbcConnection, offsetContext, 
transactionalBuffer);
-
-                                // This needs to be re-calculated because 
building the data
-                                // dictionary will force the
-                                // current redo log sequence to be advanced 
due to a complete log
-                                // switch of all logs.
-                                currentRedoLogSequences = 
getCurrentRedoLogSequences();
-                            }
+                    int retryAttempts = 1;
+                    Stopwatch sw = Stopwatch.accumulating().start();
+                    while (context.isRunning()) {
+                        // Calculate time difference before each mining 
session to detect time zone
+                        // offset changes (e.g. DST) on database server
+                        streamingMetrics.calculateTimeDifference(
+                                getDatabaseSystemTime(jdbcConnection));
 
-                            startLogMining(
-                                    jdbcConnection,
-                                    startScn,
-                                    endScn,
-                                    strategy,
-                                    isContinuousMining,
-                                    streamingMetrics);
-
-                            LOGGER.trace(
-                                    "Fetching LogMiner view results SCN {} to 
{}",
-                                    startScn,
-                                    endScn);
-                            stopwatch.start();
-                            
miningView.setFetchSize(connectorConfig.getMaxQueueSize());
-                            
miningView.setFetchDirection(ResultSet.FETCH_FORWARD);
-                            miningView.setString(1, startScn.toString());
-                            miningView.setString(2, endScn.toString());
-                            try (ResultSet rs = miningView.executeQuery()) {
-                                Duration lastDurationOfBatchCapturing =
-                                        
stopwatch.stop().durations().statistics().getTotal();
-                                
streamingMetrics.setLastDurationOfBatchCapturing(
-                                        lastDurationOfBatchCapturing);
-                                processor.processResult(rs);
-                                if (connectorConfig.isLobEnabled()) {
-                                    startScn =
-                                            
transactionalBuffer.updateOffsetContext(
-                                                    offsetContext, dispatcher);
-                                } else {
-
-                                    final Scn lastProcessedScn = 
processor.getLastProcessedScn();
-                                    if (!lastProcessedScn.isNull()
-                                            && 
lastProcessedScn.compareTo(endScn) < 0) {
-                                        // If the last processed SCN is before 
the endScn we need to
-                                        // use the last processed SCN as the
-                                        // next starting point as the LGWR 
buffer didn't flush all
-                                        // entries from memory to disk yet.
-                                        endScn = lastProcessedScn;
-                                    }
-
-                                    if (transactionalBuffer.isEmpty()) {
-                                        LOGGER.debug(
-                                                "Buffer is empty, updating 
offset SCN to {}",
-                                                endScn);
-                                        offsetContext.setScn(endScn);
-                                    } else {
-                                        final Scn minStartScn = 
transactionalBuffer.getMinimumScn();
-                                        if (!minStartScn.isNull()) {
-                                            offsetContext.setScn(
-                                                    
minStartScn.subtract(Scn.valueOf(1)));
-                                            
dispatcher.dispatchHeartbeatEvent(offsetContext);
-                                        }
-                                    }
-                                    startScn = endScn;
-                                }
+                        if (archiveLogOnlyMode
+                                && !waitForStartScnInArchiveLogs(context, 
startScn)) {
+                            break;
+                        }
+
+                        Instant start = Instant.now();
+                        endScn = calculateEndScn(jdbcConnection, startScn, 
endScn);
+
+                        // This is a small window where when archive log only 
mode has completely
+                        // caught up to the last
+                        // record in the archive logs that both the start and 
end values are
+                        // identical. In this use
+                        // case we want to pause and restart the loop waiting 
for a new archive log
+                        // before proceeding.
+                        if (archiveLogOnlyMode && startScn.equals(endScn)) {
+                            pauseBetweenMiningSessions();
+                            continue;
+                        }
+
+                        flushStrategy.flush(jdbcConnection.getCurrentScn());
+
+                        boolean restartRequired = false;
+                        if 
(connectorConfig.getLogMiningMaximumSession().isPresent()) {
+                            final Duration totalDuration =
+                                    
sw.stop().durations().statistics().getTotal();
+                            if (totalDuration.toMillis()
+                                    >= connectorConfig
+                                            .getLogMiningMaximumSession()
+                                            .get()
+                                            .toMillis()) {
+                                LOGGER.info(
+                                        "LogMiner session has exceeded maximum 
session time of '{}', forcing restart.",
+                                        
connectorConfig.getLogMiningMaximumSession());
+                                restartRequired = true;
+                            } else {
+                                // resume the existing stop watch, we haven't 
met the criteria yet
+                                sw.start();
                             }
+                        }
 
-                            afterHandleScn(offsetContext);
-                            streamingMetrics.setCurrentBatchProcessingTime(
-                                    Duration.between(start, Instant.now()));
+                        if (restartRequired || hasLogSwitchOccurred()) {
+                            // This is the way to mitigate PGA leaks.
+                            // With one mining session, it grows and maybe 
there is another way to
+                            // flush PGA.
+                            // At this point we use a new mining session
+                            endMiningSession(jdbcConnection, offsetContext);
+                            initializeRedoLogsForMining(jdbcConnection, true, 
startScn);
+
+                            // log switch or restart required, re-create a new 
stop watch
+                            sw = Stopwatch.accumulating().start();
+                        }
+
+                        if (context.isRunning()) {
+                            if (!startMiningSession(
+                                    jdbcConnection, startScn, endScn, 
retryAttempts)) {
+                                retryAttempts++;
+                            } else {
+                                retryAttempts = 1;
+                                startScn = processor.process(partition, 
startScn, endScn);
+                                streamingMetrics.setCurrentBatchProcessingTime(
+                                        Duration.between(start, 
Instant.now()));
+                                captureSessionMemoryStatistics(jdbcConnection);
+                            }
                             pauseBetweenMiningSessions();
                         }
                     }
-                } finally {
-                    historyRecorder.close();
                 }
-            } catch (Throwable t) {
-                logError(streamingMetrics, "Mining session stopped due to the 
{}", t);
-                errorHandler.setProducerThrowable(t);
-            } finally {
+            }
+        } catch (Throwable t) {
+            logError(streamingMetrics, "Mining session stopped due to the {}", 
t);
+            errorHandler.setProducerThrowable(t);
+        } finally {
+            LOGGER.info("startScn={}, endScn={}", startScn, endScn);
+            LOGGER.info("Streaming metrics dump: {}", 
streamingMetrics.toString());
+            LOGGER.info("Offsets: {}", offsetContext);
+        }
+    }
+
+    /**
+     * Computes the start SCN for the first mining session.
+     *
+     * <p>Normally, this would be the snapshot SCN, but if there were pending 
transactions at the
+     * time the snapshot was taken, we'd miss the events in those transactions 
that have an SCN
+     * smaller than the snapshot SCN.
+     *
+     * @param offsetContext the offset context
+     * @param firstScn the oldest SCN still available in the REDO logs
+     */
+    private void computeStartScnForFirstMiningSession(
+            OracleOffsetContext offsetContext, Scn firstScn) {
+        // This is the initial run of the streaming change event source.
+        // We need to compute the correct start offset for mining. That is not 
the snapshot offset,
+        // but the start offset of the oldest transaction that was still 
pending when the snapshot
+        // was taken.
+        Map<String, Scn> snapshotPendingTransactions =
+                offsetContext.getSnapshotPendingTransactions();
+        if (snapshotPendingTransactions == null || 
snapshotPendingTransactions.isEmpty()) {
+            // no pending transactions, we can start mining from the snapshot 
SCN
+            startScn = snapshotScn;
+        } else {
+            // find the oldest transaction we can still fully process, and 
start from there.
+            Scn minScn = snapshotScn;
+            for (Map.Entry<String, Scn> entry : 
snapshotPendingTransactions.entrySet()) {
+                String transactionId = entry.getKey();
+                Scn scn = entry.getValue();
+                LOGGER.info(
+                        "Transaction {} was pending across snapshot boundary. 
Start SCN = {}, snapshot SCN = {}",
+                        transactionId,
+                        scn,
+                        startScn);
+                if (scn.compareTo(firstScn) < 0) {
+                    LOGGER.warn(
+                            "Transaction {} was still ongoing while snapshot 
was taken, but is no longer completely recorded in the archive logs. Events 
will be lost. Oldest SCN in logs = {}, TX start SCN = {}",
+                            transactionId,
+                            firstScn,
+                            scn);
+                    minScn = firstScn;
+                } else if (scn.compareTo(minScn) < 0) {
+                    minScn = scn;
+                }
+            }
+
+            // Make sure the commit SCN is at least the snapshot SCN - 1.
+            // This ensures we'll never emit events for transactions that were 
complete before the
+            // snapshot was
+            // taken.
+            if (offsetContext.getCommitScn().compareTo(snapshotScn) < 0) {
+                LOGGER.info(
+                        "Setting commit SCN to {} (snapshot SCN - 1) to ensure 
we don't double-emit events from pre-snapshot transactions.",
+                        snapshotScn.subtract(Scn.ONE));
+                offsetContext
+                        .getCommitScn()
+                        
.setCommitScnOnAllThreads(snapshotScn.subtract(Scn.ONE));
+            }
+
+            // set start SCN to minScn
+            if (minScn.compareTo(startScn) < 0) {

Review Comment:
   We rely on Debezium as the underlying layer, so I think the CDC upgrade 
should also be based on Debezium until a problem is discovered.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to