This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new cf35eecdfc [Fix][Connector-V2] Fix jdbc sink statement buffer wrong
time to clear (#8653)
cf35eecdfc is described below
commit cf35eecdfce5b8232a5de500a6c539e23fb4891a
Author: Jia Fan <[email protected]>
AuthorDate: Thu Feb 13 21:12:46 2025 +0800
[Fix][Connector-V2] Fix jdbc sink statement buffer wrong time to clear
(#8653)
---
.../BufferReducedBatchStatementExecutor.java | 3 -
.../executor/BufferedBatchStatementExecutor.java | 3 -
.../jdbc/catalog/utils/TestConnection.java | 287 +++++++++++++++++++++
.../jdbc/internal/executor/BufferExecutorTest.java | 59 +++++
.../BufferReducedBatchStatementExecutorTest.java | 69 +++++
.../BufferedBatchStatementExecutorTest.java | 51 ++++
6 files changed, 466 insertions(+), 6 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferReducedBatchStatementExecutor.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferReducedBatchStatementExecutor.java
index 9f03f652f1..501898ce3c 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferReducedBatchStatementExecutor.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferReducedBatchStatementExecutor.java
@@ -101,9 +101,6 @@ public class BufferReducedBatchStatementExecutor
executeBatch();
}
} finally {
- if (!buffer.isEmpty()) {
- buffer.clear();
- }
upsertExecutor.closeStatements();
deleteExecutor.closeStatements();
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferedBatchStatementExecutor.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferedBatchStatementExecutor.java
index 0ebef85f52..2ec87c432e 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferedBatchStatementExecutor.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferedBatchStatementExecutor.java
@@ -62,9 +62,6 @@ public class BufferedBatchStatementExecutor implements
JdbcBatchStatementExecuto
executeBatch();
}
} finally {
- if (!buffer.isEmpty()) {
- buffer.clear();
- }
statementExecutor.closeStatements();
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestConnection.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestConnection.java
new file mode 100644
index 0000000000..4f1213f912
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestConnection.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils;
+
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+public class TestConnection implements Connection {
+ @Override
+ public Statement createStatement() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public CallableStatement prepareCall(String sql) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public String nativeSQL(String sql) throws SQLException {
+ return "";
+ }
+
+ @Override
+ public void setAutoCommit(boolean autoCommit) throws SQLException {}
+
+ @Override
+ public boolean getAutoCommit() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void commit() throws SQLException {}
+
+ @Override
+ public void rollback() throws SQLException {}
+
+ @Override
+ public void close() throws SQLException {}
+
+ @Override
+ public boolean isClosed() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public DatabaseMetaData getMetaData() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void setReadOnly(boolean readOnly) throws SQLException {}
+
+ @Override
+ public boolean isReadOnly() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void setCatalog(String catalog) throws SQLException {}
+
+ @Override
+ public String getCatalog() throws SQLException {
+ return "";
+ }
+
+ @Override
+ public void setTransactionIsolation(int level) throws SQLException {}
+
+ @Override
+ public int getTransactionIsolation() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public SQLWarning getWarnings() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void clearWarnings() throws SQLException {}
+
+ @Override
+ public Statement createStatement(int resultSetType, int
resultSetConcurrency)
+ throws SQLException {
+ return null;
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(
+ String sql, int resultSetType, int resultSetConcurrency) throws
SQLException {
+ return null;
+ }
+
+ @Override
+ public CallableStatement prepareCall(String sql, int resultSetType, int
resultSetConcurrency)
+ throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Map<String, Class<?>> getTypeMap() throws SQLException {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public void setTypeMap(Map<String, Class<?>> map) throws SQLException {}
+
+ @Override
+ public void setHoldability(int holdability) throws SQLException {}
+
+ @Override
+ public int getHoldability() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public Savepoint setSavepoint() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Savepoint setSavepoint(String name) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void rollback(Savepoint savepoint) throws SQLException {}
+
+ @Override
+ public void releaseSavepoint(Savepoint savepoint) throws SQLException {}
+
+ @Override
+ public Statement createStatement(
+ int resultSetType, int resultSetConcurrency, int
resultSetHoldability)
+ throws SQLException {
+ return null;
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(
+ String sql, int resultSetType, int resultSetConcurrency, int
resultSetHoldability)
+ throws SQLException {
+ return null;
+ }
+
+ @Override
+ public CallableStatement prepareCall(
+ String sql, int resultSetType, int resultSetConcurrency, int
resultSetHoldability)
+ throws SQLException {
+ return null;
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int
autoGeneratedKeys)
+ throws SQLException {
+ return null;
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int[] columnIndexes)
throws SQLException {
+ return null;
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, String[] columnNames)
+ throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Clob createClob() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Blob createBlob() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public NClob createNClob() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public SQLXML createSQLXML() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public boolean isValid(int timeout) throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void setClientInfo(String name, String value) throws
SQLClientInfoException {}
+
+ @Override
+ public void setClientInfo(Properties properties) throws
SQLClientInfoException {}
+
+ @Override
+ public String getClientInfo(String name) throws SQLException {
+ return "";
+ }
+
+ @Override
+ public Properties getClientInfo() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Array createArrayOf(String typeName, Object[] elements) throws
SQLException {
+ return null;
+ }
+
+ @Override
+ public Struct createStruct(String typeName, Object[] attributes) throws
SQLException {
+ return null;
+ }
+
+ @Override
+ public void setSchema(String schema) throws SQLException {}
+
+ @Override
+ public String getSchema() throws SQLException {
+ return "";
+ }
+
+ @Override
+ public void abort(Executor executor) throws SQLException {}
+
+ @Override
+ public void setNetworkTimeout(Executor executor, int milliseconds) throws
SQLException {}
+
+ @Override
+ public int getNetworkTimeout() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> iface) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ return false;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferExecutorTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferExecutorTest.java
new file mode 100644
index 0000000000..1ca5c5f6b4
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferExecutorTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.TestConnection;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class BufferExecutorTest {
+
+ abstract JdbcBatchStatementExecutor<SeaTunnelRow>
getExecutorWithBatchRecorder(
+ List<SeaTunnelRow> recorder);
+
+ @Test
+ void testCacheAlwaysExistWhenInsertFailed() throws SQLException {
+ List<SeaTunnelRow> recorder = new ArrayList<>();
+
+ JdbcBatchStatementExecutor<SeaTunnelRow> executor =
getExecutorWithBatchRecorder(recorder);
+ executor.prepareStatements(new TestConnection());
+ executor.addToBatch(new SeaTunnelRow(new Object[] {"test"}));
+
+ SQLException exception =
+ Assertions.assertThrows(SQLException.class,
executor::executeBatch);
+ Assertions.assertEquals("test", exception.getMessage());
+ // the main point of this test is to check if the buffer is cleared
after closeStatements
+ // and prepareStatements when executeBatch failed
+ Assertions.assertThrows(SQLException.class, executor::closeStatements);
+ executor.prepareStatements(new TestConnection());
+ SQLException exception2 =
+ Assertions.assertThrows(SQLException.class,
executor::executeBatch);
+ Assertions.assertEquals("test", exception2.getMessage());
+
+ // three times of addToBatch, 1. executeBatch, 2. closeStatements, 3.
executeBatch
+ Assertions.assertEquals(3, recorder.size());
+ // same row to executeBatch
+ Assertions.assertEquals(recorder.get(0), recorder.get(2));
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferReducedBatchStatementExecutorTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferReducedBatchStatementExecutorTest.java
new file mode 100644
index 0000000000..3f72235389
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferReducedBatchStatementExecutorTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.function.Function;
+
+public class BufferReducedBatchStatementExecutorTest extends
BufferExecutorTest {
+ @Override
+ JdbcBatchStatementExecutor<SeaTunnelRow> getExecutorWithBatchRecorder(
+ List<SeaTunnelRow> recorder) {
+ return new BufferReducedBatchStatementExecutor(
+ new JdbcBatchStatementExecutor<SeaTunnelRow>() {
+ @Override
+ public void prepareStatements(Connection connection)
throws SQLException {}
+
+ @Override
+ public void addToBatch(SeaTunnelRow record) throws
SQLException {
+ recorder.add(record);
+ }
+
+ @Override
+ public void executeBatch() throws SQLException {
+ throw new SQLException("test");
+ }
+
+ @Override
+ public void closeStatements() throws SQLException {}
+ },
+ new JdbcBatchStatementExecutor<SeaTunnelRow>() {
+ @Override
+ public void prepareStatements(Connection connection)
throws SQLException {}
+
+ @Override
+ public void addToBatch(SeaTunnelRow record) throws
SQLException {
+ recorder.add(record);
+ }
+
+ @Override
+ public void executeBatch() throws SQLException {
+ throw new SQLException("test");
+ }
+
+ @Override
+ public void closeStatements() throws SQLException {}
+ },
+ Function.identity(),
+ Function.identity());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferedBatchStatementExecutorTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferedBatchStatementExecutorTest.java
new file mode 100644
index 0000000000..c4a5b38481
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferedBatchStatementExecutorTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.function.Function;
+
+public class BufferedBatchStatementExecutorTest extends BufferExecutorTest {
+ @Override
+ JdbcBatchStatementExecutor<SeaTunnelRow> getExecutorWithBatchRecorder(
+ List<SeaTunnelRow> recorder) {
+ return new BufferedBatchStatementExecutor(
+ new JdbcBatchStatementExecutor<SeaTunnelRow>() {
+ @Override
+ public void prepareStatements(Connection connection)
throws SQLException {}
+
+ @Override
+ public void addToBatch(SeaTunnelRow record) throws
SQLException {
+ recorder.add(record);
+ }
+
+ @Override
+ public void executeBatch() throws SQLException {
+ throw new SQLException("test");
+ }
+
+ @Override
+ public void closeStatements() throws SQLException {}
+ },
+ Function.identity());
+ }
+}