wuchong commented on a change in pull request #12712:
URL: https://github.com/apache/flink/pull/12712#discussion_r442693336
##########
File path:
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
##########
@@ -64,6 +64,19 @@ public void setup() {
keyFields = new String[]{"id"};
}
+ @Test
+ public void testUpsertFormatCloseBeforeOpen() throws Exception{
+ JdbcOptions options = JdbcOptions.builder()
+ .setDBUrl(getDbMetadata().getUrl())
+ .setTableName(OUTPUT_TABLE)
+ .build();
+ JdbcDmlOptions dmlOptions = JdbcDmlOptions.builder()
+ .withTableName(options.getTableName()).withDialect(options.getDialect())
+ .withFieldNames(fieldNames).withKeyFields(keyFields).build();
+ format = new TableJdbcUpsertOutputFormat(new SimpleJdbcConnectionProvider(options), dmlOptions, JdbcExecutionOptions.defaults());
+ format.close();
Review comment:
Please add a comment on this:
```java
// FLINK-17544: There should be no NPE thrown from this method
```
##########
File path:
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
##########
@@ -104,6 +107,52 @@ public void testEnrichedClassCastException() {
}
}
+ @Test
+ public void testJdbcBatchingOutputFormatCloseDuringRuntime() throws Exception{
+ JdbcOptions options = JdbcOptions.builder()
+ .setDBUrl(getDbMetadata().getUrl())
+ .setTableName(OUTPUT_TABLE)
+ .build();
+
+ RuntimeContext context = Mockito.mock(RuntimeContext.class);
+ ExecutionConfig executionConfig = Mockito.mock(ExecutionConfig.class);
+ JdbcExecutionOptions jdbcExecutionOptions = Mockito.mock(JdbcExecutionOptions.class);
+ JdbcBatchStatementExecutor executor = Mockito.mock(JdbcBatchStatementExecutor.class);
+
+ doReturn(executionConfig).when(context).getExecutionConfig();
+ // use scheduledThreadPool
+ doReturn(500L).when(jdbcExecutionOptions).getBatchIntervalMs();
+ doReturn(2).when(jdbcExecutionOptions).getBatchSize();
+ doReturn(3).when(jdbcExecutionOptions).getMaxRetries();
Review comment:
Using Mockito is considered bad practice and should be avoided as much as possible.
You can
```java
JdbcExecutionOptions jdbcExecutionOptions = JdbcExecutionOptions.builder()
.withBatchIntervalMs(1000_000L)
.withBatchSize(1)
.withMaxRetries(1)
.build();
ExecutionConfig executionConfig = new ExecutionConfig();
```
to create them.
##########
File path:
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
##########
@@ -104,6 +107,52 @@ public void testEnrichedClassCastException() {
}
}
+ @Test
+ public void testJdbcBatchingOutputFormatCloseDuringRuntime() throws Exception{
+ JdbcOptions options = JdbcOptions.builder()
+ .setDBUrl(getDbMetadata().getUrl())
+ .setTableName(OUTPUT_TABLE)
+ .build();
+
+ RuntimeContext context = Mockito.mock(RuntimeContext.class);
+ ExecutionConfig executionConfig = Mockito.mock(ExecutionConfig.class);
+ JdbcExecutionOptions jdbcExecutionOptions = Mockito.mock(JdbcExecutionOptions.class);
+ JdbcBatchStatementExecutor executor = Mockito.mock(JdbcBatchStatementExecutor.class);
+
+ doReturn(executionConfig).when(context).getExecutionConfig();
+ // use scheduledThreadPool
+ doReturn(500L).when(jdbcExecutionOptions).getBatchIntervalMs();
+ doReturn(2).when(jdbcExecutionOptions).getBatchSize();
+ doReturn(3).when(jdbcExecutionOptions).getMaxRetries();
Review comment:
I suggest using 1000s for the interval and 1 for the batch size, so the exception is thrown quickly and an async exception is avoided.
##########
File path:
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
##########
@@ -104,6 +107,52 @@ public void testEnrichedClassCastException() {
}
}
+ @Test
+ public void testJdbcBatchingOutputFormatCloseDuringRuntime() throws Exception{
+ JdbcOptions options = JdbcOptions.builder()
+ .setDBUrl(getDbMetadata().getUrl())
+ .setTableName(OUTPUT_TABLE)
+ .build();
+
+ RuntimeContext context = Mockito.mock(RuntimeContext.class);
+ ExecutionConfig executionConfig = Mockito.mock(ExecutionConfig.class);
+ JdbcExecutionOptions jdbcExecutionOptions = Mockito.mock(JdbcExecutionOptions.class);
+ JdbcBatchStatementExecutor executor = Mockito.mock(JdbcBatchStatementExecutor.class);
+
+ doReturn(executionConfig).when(context).getExecutionConfig();
+ // use scheduledThreadPool
+ doReturn(500L).when(jdbcExecutionOptions).getBatchIntervalMs();
+ doReturn(2).when(jdbcExecutionOptions).getBatchSize();
+ doReturn(3).when(jdbcExecutionOptions).getMaxRetries();
+ // always throw Exception to trigger close() method
+ doThrow(SQLException.class).when(executor).executeBatch();
+
+ JdbcBatchingOutputFormat<Tuple2<Boolean, Row>, Row, JdbcBatchStatementExecutor<Row>> format =
+ new JdbcBatchingOutputFormat<>(
+ new SimpleJdbcConnectionProvider(options),
+ jdbcExecutionOptions,
+ (ctx) -> executor,
+ (tuple2) -> tuple2.f1);
+
+ format.setRuntimeContext(context);
+ format.open(0, 1);
+
+ try {
+ for (JdbcTestFixture.TestEntry entry : TEST_DATA) {
+ format.writeRecord(Tuple2.of(true, toRow(entry)));
+ }
+ } catch (Exception e) {
+ try {
+ format.close();
+ } catch (Exception realException){
+ Connection connection = format.getConnection();
+ if (connection != null && !connection.isClosed()){
+ throw new RuntimeException("Resource leak!");
+ }
+ }
+ }
Review comment:
```suggestion
} catch (Exception e) {
// artificial failure
format.close();
} finally {
assertNull(format.getConnection());
}
```
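The pattern the suggestion encodes (force a failure, close the format, then verify the connection was released) can be sketched with a plain stand-in resource, independent of Flink's `JdbcBatchingOutputFormat`. All names below (`FlakyFormat`, `writeThenVerifyNoLeak`) are illustrative, not from the PR:

```java
public class CloseOnFailureSketch {

    /** Minimal stand-in for an output format holding a connection-like resource. */
    static class FlakyFormat implements AutoCloseable {
        // Pretend JDBC connection; non-null means "still open".
        private Object connection = new Object();

        Object getConnection() {
            return connection;
        }

        void writeRecord(int record) throws Exception {
            // Always fails, mimicking the mocked executor whose executeBatch()
            // is stubbed to throw SQLException.
            throw new Exception("simulated batch failure");
        }

        @Override
        public void close() {
            // Release the resource so the caller can assert no leak.
            connection = null;
        }
    }

    /** Runs the write-fail-close sequence and reports whether the resource was freed. */
    static boolean writeThenVerifyNoLeak() {
        FlakyFormat format = new FlakyFormat();
        try {
            format.writeRecord(1);
        } catch (Exception e) {
            // Expected: the artificial failure triggers cleanup.
            format.close();
        }
        // The review suggestion boils down to this single check.
        return format.getConnection() == null;
    }

    public static void main(String[] args) {
        System.out.println(writeThenVerifyNoLeak()); // prints true
    }
}
```

Asserting on the released resource in a `finally` block, as the suggestion does, keeps the leak check unconditional rather than buried inside nested catch clauses.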
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]