This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new cf35eecdfc [Fix][Connector-V2] Fix jdbc sink statement buffer wrong 
time to clear (#8653)
cf35eecdfc is described below

commit cf35eecdfce5b8232a5de500a6c539e23fb4891a
Author: Jia Fan <[email protected]>
AuthorDate: Thu Feb 13 21:12:46 2025 +0800

    [Fix][Connector-V2] Fix jdbc sink statement buffer wrong time to clear 
(#8653)
---
 .../BufferReducedBatchStatementExecutor.java       |   3 -
 .../executor/BufferedBatchStatementExecutor.java   |   3 -
 .../jdbc/catalog/utils/TestConnection.java         | 287 +++++++++++++++++++++
 .../jdbc/internal/executor/BufferExecutorTest.java |  59 +++++
 .../BufferReducedBatchStatementExecutorTest.java   |  69 +++++
 .../BufferedBatchStatementExecutorTest.java        |  51 ++++
 6 files changed, 466 insertions(+), 6 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferReducedBatchStatementExecutor.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferReducedBatchStatementExecutor.java
index 9f03f652f1..501898ce3c 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferReducedBatchStatementExecutor.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferReducedBatchStatementExecutor.java
@@ -101,9 +101,6 @@ public class BufferReducedBatchStatementExecutor
                 executeBatch();
             }
         } finally {
-            if (!buffer.isEmpty()) {
-                buffer.clear();
-            }
             upsertExecutor.closeStatements();
             deleteExecutor.closeStatements();
         }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferedBatchStatementExecutor.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferedBatchStatementExecutor.java
index 0ebef85f52..2ec87c432e 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferedBatchStatementExecutor.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferedBatchStatementExecutor.java
@@ -62,9 +62,6 @@ public class BufferedBatchStatementExecutor implements 
JdbcBatchStatementExecuto
                 executeBatch();
             }
         } finally {
-            if (!buffer.isEmpty()) {
-                buffer.clear();
-            }
             statementExecutor.closeStatements();
         }
     }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestConnection.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestConnection.java
new file mode 100644
index 0000000000..4f1213f912
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestConnection.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils;
+
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+public class TestConnection implements Connection {
+    @Override
+    public Statement createStatement() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(String sql) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public CallableStatement prepareCall(String sql) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public String nativeSQL(String sql) throws SQLException {
+        return "";
+    }
+
+    @Override
+    public void setAutoCommit(boolean autoCommit) throws SQLException {}
+
+    @Override
+    public boolean getAutoCommit() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public void commit() throws SQLException {}
+
+    @Override
+    public void rollback() throws SQLException {}
+
+    @Override
+    public void close() throws SQLException {}
+
+    @Override
+    public boolean isClosed() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public DatabaseMetaData getMetaData() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public void setReadOnly(boolean readOnly) throws SQLException {}
+
+    @Override
+    public boolean isReadOnly() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public void setCatalog(String catalog) throws SQLException {}
+
+    @Override
+    public String getCatalog() throws SQLException {
+        return "";
+    }
+
+    @Override
+    public void setTransactionIsolation(int level) throws SQLException {}
+
+    @Override
+    public int getTransactionIsolation() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public SQLWarning getWarnings() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public void clearWarnings() throws SQLException {}
+
+    @Override
+    public Statement createStatement(int resultSetType, int 
resultSetConcurrency)
+            throws SQLException {
+        return null;
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(
+            String sql, int resultSetType, int resultSetConcurrency) throws 
SQLException {
+        return null;
+    }
+
+    @Override
+    public CallableStatement prepareCall(String sql, int resultSetType, int 
resultSetConcurrency)
+            throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Map<String, Class<?>> getTypeMap() throws SQLException {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public void setTypeMap(Map<String, Class<?>> map) throws SQLException {}
+
+    @Override
+    public void setHoldability(int holdability) throws SQLException {}
+
+    @Override
+    public int getHoldability() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public Savepoint setSavepoint() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Savepoint setSavepoint(String name) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public void rollback(Savepoint savepoint) throws SQLException {}
+
+    @Override
+    public void releaseSavepoint(Savepoint savepoint) throws SQLException {}
+
+    @Override
+    public Statement createStatement(
+            int resultSetType, int resultSetConcurrency, int 
resultSetHoldability)
+            throws SQLException {
+        return null;
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(
+            String sql, int resultSetType, int resultSetConcurrency, int 
resultSetHoldability)
+            throws SQLException {
+        return null;
+    }
+
+    @Override
+    public CallableStatement prepareCall(
+            String sql, int resultSetType, int resultSetConcurrency, int 
resultSetHoldability)
+            throws SQLException {
+        return null;
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(String sql, int 
autoGeneratedKeys)
+            throws SQLException {
+        return null;
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(String sql, int[] columnIndexes) 
throws SQLException {
+        return null;
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(String sql, String[] columnNames)
+            throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Clob createClob() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Blob createBlob() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public NClob createNClob() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public SQLXML createSQLXML() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean isValid(int timeout) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public void setClientInfo(String name, String value) throws 
SQLClientInfoException {}
+
+    @Override
+    public void setClientInfo(Properties properties) throws 
SQLClientInfoException {}
+
+    @Override
+    public String getClientInfo(String name) throws SQLException {
+        return "";
+    }
+
+    @Override
+    public Properties getClientInfo() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public Array createArrayOf(String typeName, Object[] elements) throws 
SQLException {
+        return null;
+    }
+
+    @Override
+    public Struct createStruct(String typeName, Object[] attributes) throws 
SQLException {
+        return null;
+    }
+
+    @Override
+    public void setSchema(String schema) throws SQLException {}
+
+    @Override
+    public String getSchema() throws SQLException {
+        return "";
+    }
+
+    @Override
+    public void abort(Executor executor) throws SQLException {}
+
+    @Override
+    public void setNetworkTimeout(Executor executor, int milliseconds) throws 
SQLException {}
+
+    @Override
+    public int getNetworkTimeout() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public <T> T unwrap(Class<T> iface) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean isWrapperFor(Class<?> iface) throws SQLException {
+        return false;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferExecutorTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferExecutorTest.java
new file mode 100644
index 0000000000..1ca5c5f6b4
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferExecutorTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.TestConnection;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class BufferExecutorTest {
+
+    abstract JdbcBatchStatementExecutor<SeaTunnelRow> 
getExecutorWithBatchRecorder(
+            List<SeaTunnelRow> recorder);
+
+    @Test
+    void testCacheAlwaysExistWhenInsertFailed() throws SQLException {
+        List<SeaTunnelRow> recorder = new ArrayList<>();
+
+        JdbcBatchStatementExecutor<SeaTunnelRow> executor = 
getExecutorWithBatchRecorder(recorder);
+        executor.prepareStatements(new TestConnection());
+        executor.addToBatch(new SeaTunnelRow(new Object[] {"test"}));
+
+        SQLException exception =
+                Assertions.assertThrows(SQLException.class, 
executor::executeBatch);
+        Assertions.assertEquals("test", exception.getMessage());
+        // the main point of this test is to check if the buffer is cleared 
after closeStatements
+        // and prepareStatements when executeBatch failed
+        Assertions.assertThrows(SQLException.class, executor::closeStatements);
+        executor.prepareStatements(new TestConnection());
+        SQLException exception2 =
+                Assertions.assertThrows(SQLException.class, 
executor::executeBatch);
+        Assertions.assertEquals("test", exception2.getMessage());
+
+        // three times of addToBatch, 1. executeBatch, 2. closeStatements, 3. 
executeBatch
+        Assertions.assertEquals(3, recorder.size());
+        // same row to executeBatch
+        Assertions.assertEquals(recorder.get(0), recorder.get(2));
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferReducedBatchStatementExecutorTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferReducedBatchStatementExecutorTest.java
new file mode 100644
index 0000000000..3f72235389
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferReducedBatchStatementExecutorTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.function.Function;
+
+public class BufferReducedBatchStatementExecutorTest extends 
BufferExecutorTest {
+    @Override
+    JdbcBatchStatementExecutor<SeaTunnelRow> getExecutorWithBatchRecorder(
+            List<SeaTunnelRow> recorder) {
+        return new BufferReducedBatchStatementExecutor(
+                new JdbcBatchStatementExecutor<SeaTunnelRow>() {
+                    @Override
+                    public void prepareStatements(Connection connection) 
throws SQLException {}
+
+                    @Override
+                    public void addToBatch(SeaTunnelRow record) throws 
SQLException {
+                        recorder.add(record);
+                    }
+
+                    @Override
+                    public void executeBatch() throws SQLException {
+                        throw new SQLException("test");
+                    }
+
+                    @Override
+                    public void closeStatements() throws SQLException {}
+                },
+                new JdbcBatchStatementExecutor<SeaTunnelRow>() {
+                    @Override
+                    public void prepareStatements(Connection connection) 
throws SQLException {}
+
+                    @Override
+                    public void addToBatch(SeaTunnelRow record) throws 
SQLException {
+                        recorder.add(record);
+                    }
+
+                    @Override
+                    public void executeBatch() throws SQLException {
+                        throw new SQLException("test");
+                    }
+
+                    @Override
+                    public void closeStatements() throws SQLException {}
+                },
+                Function.identity(),
+                Function.identity());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferedBatchStatementExecutorTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferedBatchStatementExecutorTest.java
new file mode 100644
index 0000000000..c4a5b38481
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferedBatchStatementExecutorTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.function.Function;
+
+public class BufferedBatchStatementExecutorTest extends BufferExecutorTest {
+    @Override
+    JdbcBatchStatementExecutor<SeaTunnelRow> getExecutorWithBatchRecorder(
+            List<SeaTunnelRow> recorder) {
+        return new BufferedBatchStatementExecutor(
+                new JdbcBatchStatementExecutor<SeaTunnelRow>() {
+                    @Override
+                    public void prepareStatements(Connection connection) 
throws SQLException {}
+
+                    @Override
+                    public void addToBatch(SeaTunnelRow record) throws 
SQLException {
+                        recorder.add(record);
+                    }
+
+                    @Override
+                    public void executeBatch() throws SQLException {
+                        throw new SQLException("test");
+                    }
+
+                    @Override
+                    public void closeStatements() throws SQLException {}
+                },
+                Function.identity());
+    }
+}

Reply via email to