This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 4bf1ed2  [FLINK-17544][jdbc] Fix NPE and resource leak problem in 
JdbcOutputFormat
4bf1ed2 is described below

commit 4bf1ed2ac4006ee5c4af54c028fe4eede07ffcc6
Author: Shengkai <[email protected]>
AuthorDate: Mon Jun 22 14:15:32 2020 +0800

    [FLINK-17544][jdbc] Fix NPE and resource leak problem in JdbcOutputFormat
    
    This closes #12712
---
 .../jdbc/internal/JdbcBatchingOutputFormat.java    |  6 +--
 .../jdbc/internal/TableJdbcUpsertOutputFormat.java |  4 +-
 .../connector/jdbc/internal/JdbcFullTest.java      | 47 ++++++++++++++++++++++
 .../jdbc/internal/JdbcTableOutputFormatTest.java   | 14 +++++++
 4 files changed, 66 insertions(+), 5 deletions(-)

diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java
index af41ed1..c7afc31 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java
@@ -209,8 +209,6 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec 
extends JdbcBatchStat
                if (!closed) {
                        closed = true;
 
-                       checkFlushException();
-
                        if (this.scheduledFuture != null) {
                                scheduledFuture.cancel(false);
                                this.scheduler.shutdown();
@@ -220,7 +218,7 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec 
extends JdbcBatchStat
                                try {
                                        flush();
                                } catch (Exception e) {
-                                       throw new RuntimeException("Writing 
records to JDBC failed.", e);
+                                       LOG.warn("Writing records to JDBC 
failed.", e);
                                }
                        }
 
@@ -233,6 +231,7 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec 
extends JdbcBatchStat
                        }
                }
                super.close();
+               checkFlushException();
        }
 
        public static Builder builder() {
@@ -348,5 +347,4 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec 
extends JdbcBatchStat
        static JdbcStatementBuilder<Row> createRowJdbcStatementBuilder(int[] 
types) {
                return (st, record) -> setRecordToStatement(st, types, record);
        }
-
 }
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/TableJdbcUpsertOutputFormat.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/TableJdbcUpsertOutputFormat.java
index 1865ebc..4afea82 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/TableJdbcUpsertOutputFormat.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/TableJdbcUpsertOutputFormat.java
@@ -83,7 +83,9 @@ class TableJdbcUpsertOutputFormat extends 
JdbcBatchingOutputFormat<Tuple2<Boolea
                        super.close();
                } finally {
                        try {
-                               deleteExecutor.closeStatements();
+                               if (deleteExecutor != null){
+                                       deleteExecutor.closeStatements();
+                               }
                        } catch (SQLException e) {
                                LOG.warn("unable to close delete statement 
runner", e);
                        }
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
index f2aec19..15e6425 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.connector.jdbc.JdbcDataTestBase;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
 import org.apache.flink.connector.jdbc.JdbcInputFormat;
 import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.JdbcTestFixture;
 import 
org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
 import 
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
 import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
@@ -43,6 +44,7 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
 import java.util.function.Function;
@@ -57,8 +59,10 @@ import static 
org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
 import static 
org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 
 /**
  * Tests using both {@link JdbcInputFormat} and {@link 
JdbcBatchingOutputFormat}.
@@ -104,6 +108,49 @@ public class JdbcFullTest extends JdbcDataTestBase {
                }
        }
 
+       @Test
+       public void testJdbcBatchingOutputFormatCloseDuringRuntime() throws 
Exception{
+               JdbcOptions options = JdbcOptions.builder()
+                       .setDBUrl(getDbMetadata().getUrl())
+                       .setTableName(OUTPUT_TABLE)
+                       .build();
+               // use scheduledThreadPool
+               JdbcExecutionOptions jdbcExecutionOptions = 
JdbcExecutionOptions.builder()
+                       .withBatchIntervalMs(1000_000L)
+                       .withBatchSize(2)
+                       .withMaxRetries(1)
+                       .build();
+               ExecutionConfig executionConfig = new ExecutionConfig();
+
+               RuntimeContext context = Mockito.mock(RuntimeContext.class);
+               JdbcBatchStatementExecutor executor = 
Mockito.mock(JdbcBatchStatementExecutor.class);
+
+               doReturn(executionConfig).when(context).getExecutionConfig();
+               // always throw Exception to trigger close() method
+               doThrow(SQLException.class).when(executor).executeBatch();
+
+               JdbcBatchingOutputFormat<Tuple2<Boolean, Row>, Row, 
JdbcBatchStatementExecutor<Row>> format =
+                       new JdbcBatchingOutputFormat<>(
+                               new SimpleJdbcConnectionProvider(options),
+                               jdbcExecutionOptions,
+                               (ctx) -> executor,
+                               (tuple2) -> tuple2.f1);
+
+               format.setRuntimeContext(context);
+               format.open(0, 1);
+
+               try {
+                       for (JdbcTestFixture.TestEntry entry : TEST_DATA) {
+                               format.writeRecord(Tuple2.of(true, 
toRow(entry)));
+                       }
+               } catch (Exception e) {
+                       // artifact failure
+                       format.close();
+               } finally {
+                       assertNull(format.getConnection());
+               }
+       }
+
        private void runTest(boolean exploitParallelism) throws Exception {
                ExecutionEnvironment environment = 
ExecutionEnvironment.getExecutionEnvironment();
                JdbcInputFormat.JdbcInputFormatBuilder inputBuilder = 
JdbcInputFormat.buildJdbcInputFormat()
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
index 2d67a4d..677db91 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
@@ -65,6 +65,20 @@ public class JdbcTableOutputFormatTest extends 
JdbcDataTestBase {
        }
 
        @Test
+       public void testUpsertFormatCloseBeforeOpen() throws Exception{
+               JdbcOptions options = JdbcOptions.builder()
+                       .setDBUrl(getDbMetadata().getUrl())
+                       .setTableName(OUTPUT_TABLE)
+                       .build();
+               JdbcDmlOptions dmlOptions = JdbcDmlOptions.builder()
+                       
.withTableName(options.getTableName()).withDialect(options.getDialect())
+                       
.withFieldNames(fieldNames).withKeyFields(keyFields).build();
+               format = new TableJdbcUpsertOutputFormat(new 
SimpleJdbcConnectionProvider(options), dmlOptions, 
JdbcExecutionOptions.defaults());
+               // FLINK-17544: There should be no NPE thrown from this method
+               format.close();
+       }
+
+       @Test
        public void testJdbcOutputFormat() throws Exception {
                JdbcOptions options = JdbcOptions.builder()
                                .setDBUrl(getDbMetadata().getUrl())

Reply via email to