fsk119 commented on a change in pull request #12712:
URL: https://github.com/apache/flink/pull/12712#discussion_r442903979
##########
File path:
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
##########
@@ -104,6 +107,52 @@ public void testEnrichedClassCastException() {
}
}
+ @Test
+ public void testJdbcBatchingOutputFormatCloseDuringRuntime() throws
Exception{
+ JdbcOptions options = JdbcOptions.builder()
+ .setDBUrl(getDbMetadata().getUrl())
+ .setTableName(OUTPUT_TABLE)
+ .build();
+
+ RuntimeContext context = Mockito.mock(RuntimeContext.class);
+ ExecutionConfig executionConfig =
Mockito.mock(ExecutionConfig.class);
+ JdbcExecutionOptions jdbcExecutionOptions =
Mockito.mock(JdbcExecutionOptions.class);
+ JdbcBatchStatementExecutor executor =
Mockito.mock(JdbcBatchStatementExecutor.class);
+
+ doReturn(executionConfig).when(context).getExecutionConfig();
+ // use scheduledThreadPool
+ doReturn(500L).when(jdbcExecutionOptions).getBatchIntervalMs();
+ doReturn(2).when(jdbcExecutionOptions).getBatchSize();
+ doReturn(3).when(jdbcExecutionOptions).getMaxRetries();
+ // always throw Exception to trigger close() method
+ doThrow(SQLException.class).when(executor).executeBatch();
+
+ JdbcBatchingOutputFormat<Tuple2<Boolean, Row>, Row,
JdbcBatchStatementExecutor<Row>> format =
+ new JdbcBatchingOutputFormat<>(
+ new SimpleJdbcConnectionProvider(options),
+ jdbcExecutionOptions,
+ (ctx) -> executor,
+ (tuple2) -> tuple2.f1);
+
+ format.setRuntimeContext(context);
+ format.open(0, 1);
+
+ try {
+ for (JdbcTestFixture.TestEntry entry : TEST_DATA) {
+ format.writeRecord(Tuple2.of(true,
toRow(entry)));
+ }
+ } catch (Exception e) {
+ try {
+ format.close();
+ } catch (Exception realException){
+ Connection connection = format.getConnection();
+ if (connection != null &&
!connection.isClosed()){
+ throw new RuntimeException("Resource
leak!");
+ }
+ }
+ }
Review comment:
It's much better than my origin code. Thanks.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]