This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 6aebdc0384 [Improve][Oracle-CDC] Support
ReadOnlyLogWriterFlushStrategy (#8912)
6aebdc0384 is described below
commit 6aebdc0384a47140a38211f4b4e6aeeeb6dec18a
Author: hailin0 <[email protected]>
AuthorDate: Mon Mar 10 14:10:09 2025 +0800
[Improve][Oracle-CDC] Support ReadOnlyLogWriterFlushStrategy (#8912)
---
.../LogMinerStreamingChangeEventSource.java | 10 +++-
.../logwriter/ReadOnlyLogWriterFlushStrategy.java | 38 +++++++++++++++
.../oracle/config/OracleSourceConfigFactory.java | 2 +
.../ReadOnlyLogWriterFlushStrategyTest.java | 54 ++++++++++++++++++++++
4 files changed, 103 insertions(+), 1 deletion(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
index a77da296fc..9ad42f1b6b 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
@@ -17,6 +17,8 @@
package io.debezium.connector.oracle.logminer;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfigFactory;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +35,7 @@ import io.debezium.connector.oracle.Scn;
import
io.debezium.connector.oracle.logminer.logwriter.CommitLogWriterFlushStrategy;
import io.debezium.connector.oracle.logminer.logwriter.LogWriterFlushStrategy;
import
io.debezium.connector.oracle.logminer.logwriter.RacCommitLogWriterFlushStrategy;
+import
io.debezium.connector.oracle.logminer.logwriter.ReadOnlyLogWriterFlushStrategy;
import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.pipeline.ErrorHandler;
@@ -1012,7 +1015,12 @@ public class LogMinerStreamingChangeEventSource
*
* @return the strategy to be used to flush Oracle's LGWR process, never
{@code null}.
*/
- private LogWriterFlushStrategy resolveFlushStrategy() {
+ public LogWriterFlushStrategy resolveFlushStrategy() {
+ if (connectorConfig
+ .getConfig()
+ .getBoolean(OracleSourceConfigFactory.LOG_MINING_READONLY_KEY,
false)) {
+ return new ReadOnlyLogWriterFlushStrategy();
+ }
if (connectorConfig.isRacSystem()) {
return new RacCommitLogWriterFlushStrategy(
connectorConfig, jdbcConfiguration, streamingMetrics);
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/logwriter/ReadOnlyLogWriterFlushStrategy.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/logwriter/ReadOnlyLogWriterFlushStrategy.java
new file mode 100644
index 0000000000..6e80421a33
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/logwriter/ReadOnlyLogWriterFlushStrategy.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.oracle.logminer.logwriter;
+
+import io.debezium.DebeziumException;
+import io.debezium.connector.oracle.Scn;
+
+public class ReadOnlyLogWriterFlushStrategy implements LogWriterFlushStrategy {
+ @Override
+ public String getHost() {
+ throw new DebeziumException("Not applicable when using read-only
flushing strategy");
+ }
+
+ @Override
+ public void flush(Scn currentScn) throws InterruptedException {
+ // no operation
+ }
+
+ @Override
+ public void close() throws Exception {
+ // no operation
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
index d7d6c70061..fa240bc158 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
@@ -41,6 +41,7 @@ public class OracleSourceConfigFactory extends
JdbcSourceConfigFactory {
public static final String SCHEMA_CHANGE_KEY = "include.schema.changes";
public static final String LOG_MINING_STRATEGY_KEY = "log.mining.strategy";
public static final String LOG_MINING_STRATEGY_DEFAULT = "online_catalog";
+ public static final String LOG_MINING_READONLY_KEY =
"log.mining.read.only";
private List<String> schemaList;
@@ -106,6 +107,7 @@ public class OracleSourceConfigFactory extends
JdbcSourceConfigFactory {
props.setProperty("connect.timeout.ms",
String.valueOf(connectTimeoutMillis));
// disable tombstones
props.setProperty("tombstones.on.delete", String.valueOf(false));
+ props.setProperty(LOG_MINING_READONLY_KEY, "true");
if (originUrl != null) {
props.setProperty("database.url", originUrl);
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/test/java/io/debezium/connector/oracle/logminer/logwriter/ReadOnlyLogWriterFlushStrategyTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/test/java/io/debezium/connector/oracle/logminer/logwriter/ReadOnlyLogWriterFlushStrategyTest.java
new file mode 100644
index 0000000000..c0d5a7e68b
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/test/java/io/debezium/connector/oracle/logminer/logwriter/ReadOnlyLogWriterFlushStrategyTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.oracle.logminer.logwriter;
+
+import
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfigFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.DebeziumException;
+import io.debezium.config.Configuration;
+import io.debezium.connector.oracle.OracleConnectorConfig;
+import
io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ReadOnlyLogWriterFlushStrategyTest {
+
+ @Test
+ void returnsReadOnlyLogWriterFlushStrategyWhenReadOnlyKeyIsTrue() throws
Exception {
+ OracleConnectorConfig config = mock(OracleConnectorConfig.class);
+ Configuration configuration = mock(Configuration.class);
+ when(config.getConfig()).thenReturn(configuration);
+
when(configuration.getBoolean(OracleSourceConfigFactory.LOG_MINING_READONLY_KEY,
false))
+ .thenReturn(true);
+
+ LogMinerStreamingChangeEventSource source =
+ new LogMinerStreamingChangeEventSource(
+ config, null, null, null, null, null, null, null);
+ LogWriterFlushStrategy strategy = source.resolveFlushStrategy();
+ assertTrue(strategy instanceof ReadOnlyLogWriterFlushStrategy);
+
+ Assertions.assertThrows(DebeziumException.class, () ->
strategy.getHost());
+ strategy.flush(null);
+ strategy.close();
+ }
+}