Fuyao created FLINK-21674:
-----------------------------

             Summary: JDBC sink can't get valid connection after 5 minutes 
using Oracle JDBC driver
                 Key: FLINK-21674
                 URL: https://issues.apache.org/jira/browse/FLINK-21674
             Project: Flink
          Issue Type: Bug
          Components: Connectors / JDBC
    Affects Versions: 1.12.1
         Environment: Flink version: 1.12.1 Scala version: 2.11 Java version: 
1.11 Flink System parallelism: 1 JDBC Driver: Oracle ojdbc10 Database: Oracle 
Autonomous Database on Oracle Cloud Infrastructure version 19c(You can regard 
this as an cloud based Oracle Database)

 

Flink user mailing list: 
http://mail-archives.apache.org/mod_mbox/flink-user/202103.mbox/%3CCH2PR10MB402466373B33A3BBC5635A0AEC8C9%40CH2PR10MB4024.namprd10.prod.outlook.com%3E
            Reporter: Fuyao


With Oracle JDBC driver. I can sink data into Oracle Autonomous database 
sucessfully. If there is IDLE time of over 5 minutes, then do a insertion, the 
retry mechanism can't reestablish the JDBC and it will run into the error below.
11:41:04,872 ERROR 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - JDBC 
executeBatch error, retry times = 0
java.sql.BatchUpdateException: IO Error: Broken pipe


It  will fail the application and restart from checkpoint. After restarting 
from checkpoint, the JDBC connection can be established correctly.

The connection timeout can be configured by 

alter system set MAX_IDLE_TIME=1440; // Connection will get timeout after 1440 
minutes.

Such timeout parameter behavior change can be verified by SQL developer. 
However, Flink still got connection error after configuring this.


Full log:


{code:java}
11:41:04,872 ERROR 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - JDBC 
executeBatch error, retry times = 0
java.sql.BatchUpdateException: IO Error: Broken pipe
        at 
oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9711)
        at 
oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447)
        at 
oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9487)
        at 
oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237)
        at 
org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:73)
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)
        at 
org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:54)
        at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
        at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)
        at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
        at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
        at 
org.myorg.quickstart.processor.InvoiceBoProcessFunction.onTimer(InvoiceBoProcessFunction.java:613)
        at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)
        at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)
        at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
        at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:193)
        at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)
        at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
        at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.base/java.lang.Thread.run(Thread.java:834)
11:41:05,725 ERROR 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - JDBC 
connection is not valid, and reestablish connection failed.
java.sql.SQLRecoverableException: Closed Connection
        at 
oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:3525)
        at 
oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1478)
        at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1461)
        at 
oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:122)
        at 
oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:98)
        at 
org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81)
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:195)
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)
        at 
org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:54)
        at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
        at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)
        at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
        at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
        at 
org.myorg.quickstart.processor.InvoiceBoProcessFunction.onTimer(InvoiceBoProcessFunction.java:613)
        at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)
        at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)
        at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
        at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:193)
        at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)
        at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
        at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.base/java.lang.Thread.run(Thread.java:834)
11:41:05,729 ERROR 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - JDBC 
executeBatch error, retry times = 0
java.sql.SQLRecoverableException: Closed Connection
        at 
oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)
        at 
oracle.jdbc.driver.OracleStatement.getQueryTimeout(OracleStatement.java:3093)
        at 
oracle.jdbc.driver.OracleStatementWrapper.getQueryTimeout(OracleStatementWrapper.java:183)
        at 
org.myorg.quickstart.boconstruction.stream.BusinessObjectConstruction.lambda$processFunction$16ee7a3d$1(BusinessObjectConstruction.java:103)
        at 
org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:70)
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)
        at 
org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)
        at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.base/java.lang.Thread.run(Thread.java:834)
11:41:05,770 ERROR 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - JDBC 
executeBatch error, retry times = 1
java.sql.SQLRecoverableException: Closed Connection
        at 
oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)
        at 
oracle.jdbc.driver.OracleStatement.getQueryTimeout(OracleStatement.java:3093)
        at 
oracle.jdbc.driver.OracleStatementWrapper.getQueryTimeout(OracleStatementWrapper.java:183)
        at 
org.myorg.quickstart.boconstruction.stream.BusinessObjectConstruction.lambda$processFunction$16ee7a3d$1(BusinessObjectConstruction.java:103)
        at 
org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:70)
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)
        at 
org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)
        at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.base/java.lang.Thread.run(Thread.java:834)
11:41:06,820 ERROR 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - JDBC 
executeBatch error, retry times = 2
java.sql.SQLRecoverableException: Closed Connection
        at 
oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)
        at 
oracle.jdbc.driver.OracleStatement.getQueryTimeout(OracleStatement.java:3093)
        at 
oracle.jdbc.driver.OracleStatementWrapper.getQueryTimeout(OracleStatementWrapper.java:183)
        at 
org.myorg.quickstart.boconstruction.stream.BusinessObjectConstruction.lambda$processFunction$16ee7a3d$1(BusinessObjectConstruction.java:103)
        at 
org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:70)
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)
        at 
org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)
        at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.base/java.lang.Thread.run(Thread.java:834)
11:41:08,865 ERROR 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - JDBC 
executeBatch error, retry times = 3
java.sql.SQLRecoverableException: Closed Connection
        at 
oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)
        at 
oracle.jdbc.driver.OracleStatement.getQueryTimeout(OracleStatement.java:3093)
        at 
oracle.jdbc.driver.OracleStatementWrapper.getQueryTimeout(OracleStatementWrapper.java:183)
        at 
org.myorg.quickstart.boconstruction.stream.BusinessObjectConstruction.lambda$processFunction$16ee7a3d$1(BusinessObjectConstruction.java:103)
        at 
org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:70)
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)
        at 
org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)
        at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.base/java.lang.Thread.run(Thread.java:834)
11:41:08,866 WARN  
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - Writing 
records to JDBC failed.
java.io.IOException: java.sql.SQLRecoverableException: Closed Connection
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:190)
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)
        at 
org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)
        at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLRecoverableException: Closed Connection
        at 
oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)
        at 
oracle.jdbc.driver.OracleStatement.getQueryTimeout(OracleStatement.java:3093)
        at 
oracle.jdbc.driver.OracleStatementWrapper.getQueryTimeout(OracleStatementWrapper.java:183)
        at 
org.myorg.quickstart.boconstruction.stream.BusinessObjectConstruction.lambda$processFunction$16ee7a3d$1(BusinessObjectConstruction.java:103)
        at 
org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:70)
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
        ... 11 more
11:41:08,866 WARN  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Error closing 
producer.
java.lang.NoSuchMethodError: 
org.apache.kafka.clients.producer.KafkaProducer.close(Ljava/time/Duration;)V
        at 
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.close(FlinkKafkaInternalProducer.java:172)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:949)
        at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.base/java.lang.Thread.run(Thread.java:834)
11:41:08,868 WARN  org.apache.flink.runtime.taskmanager.Task                    
 - ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink: 
invoice-notification, Sink: Print to Std. Out, Sink: header-notification, Sink: 
Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out, Sink: 
distributions-notification) (1/1)#0 (95b16d216f03759f0f0c131ba188b338) switched 
from RUNNING to FAILED.
java.io.IOException: Writing records to JDBC failed.
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:170)
        at 
org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:54)
        at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
        at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)
        at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
        at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
        at 
org.myorg.quickstart.processor.InvoiceBoProcessFunction.onTimer(InvoiceBoProcessFunction.java:613)
        at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)
        at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)
        at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
        at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:193)
        at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)
        at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
        at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: Reestablish JDBC connection failed
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:202)
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)
        ... 29 more
Caused by: java.sql.SQLRecoverableException: Closed Connection
        at 
oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:3525)
        at 
oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1478)
        at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1461)
        at 
oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:122)
        at 
oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:98)
        at 
org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81)
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:195)
        ... 30 more
11:41:08,869 INFO  org.apache.flink.runtime.taskmanager.Task                    
 - Freeing task resources for ProcessTableOutput -> (Sink: adwSink, Sink: Print 
to Std. Out, Sink: invoice-notification, Sink: Print to Std. Out, Sink: 
header-notification, Sink: Print to Std. Out, Sink: lines-notification, Sink: 
Print to Std. Out, Sink: distributions-notification) (1/1)#0 
(95b16d216f03759f0f0c131ba188b338).
11:41:08,876 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           
 - Un-registering task and sending final execution state FAILED to JobManager 
for task ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink: 
invoice-notification, Sink: Print to Std. Out, Sink: header-notification, Sink: 
Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out, Sink: 
distributions-notification) (1/1)#0 95b16d216f03759f0f0c131ba188b338.
11:41:08,880 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       
 - ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink: 
invoice-notification, Sink: Print to Std. Out, Sink: header-notification, Sink: 
Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out, Sink: 
distributions-notification) (1/1) (95b16d216f03759f0f0c131ba188b338) switched 
from RUNNING to FAILED on ac2c0e70-42f9-4d5d-820a-19f6561a6297 @ localhost 
(dataPort=-1).
java.io.IOException: Writing records to JDBC failed.
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:170)
        at 
org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:54)
        at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
        at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
        at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)
        at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
        at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
        at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
        at 
org.myorg.quickstart.processor.InvoiceBoProcessFunction.onTimer(InvoiceBoProcessFunction.java:613)
        at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)
        at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)
        at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
        at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:193)
        at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)
        at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
        at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: Reestablish JDBC connection failed
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:202)
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)
        ... 29 more
Caused by: java.sql.SQLRecoverableException: Closed Connection
        at 
oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:3525)
        at 
oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1478)
        at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1461)
        at 
oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:122)
        at 
oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:98)
        at 
org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81)
        at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:195)
        ... 30 more
11:41:08,886 INFO  
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
  - Calculating tasks to restart to recover the failed task 
91f695b4d2df74b06fe58043ee03541f_0.
11:41:08,887 INFO  
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
  - 9 tasks should be restarted to recover the failed task 
91f695b4d2df74b06fe58043ee03541f_0. 
{code}






--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to