[ 
https://issues.apache.org/jira/browse/FLINK-33536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prabhu Joseph updated FLINK-33536:
----------------------------------
    Summary: Flink Table API CSV streaming sink fails with "IOException: Stream 
closed"  (was: Flink Table API CSV streaming sink throws "IOException: Stream 
closed")

> Flink Table API CSV streaming sink fails with "IOException: Stream closed"
> --------------------------------------------------------------------------
>
>                 Key: FLINK-33536
>                 URL: https://issues.apache.org/jira/browse/FLINK-33536
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystems, Table SQL / API
>    Affects Versions: 1.18.0
>            Reporter: Prabhu Joseph
>            Priority: Major
>
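> The Flink Table API CSV streaming sink throws "IOException: Stream closed". Prior 
> to Flink 1.18, the CSV streaming sink failed with 
> "S3RecoverableFsDataOutputStream cannot sync state to S3", which was fixed by 
> [FLINK-28513|https://issues.apache.org/jira/browse/FLINK-28513]. That fix 
> appears to be incomplete: the sink now fails with the error below instead. The 
> stack trace shows the exception is raised while the in-progress part file is 
> closed for commit at end of input (S3RecoverableFsDataOutputStream.closeForCommit).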
> *Repro*
> {code}
> SET 'execution.runtime-mode' = 'streaming';
> create table dummy_table (
>   id int,
>   data string
> ) with (
>   'connector' = 'filesystem',
>   'path' = 's3://prabhuflinks3/dummy_table/',
>   'format' = 'csv'
> );
> INSERT INTO dummy_table SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), 
> (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
> {code}
> *Error*
> {code}
> Caused by: java.io.IOException: Stream closed.
>       at 
> org.apache.flink.core.fs.RefCountedFileWithStream.requireOpened(RefCountedFileWithStream.java:76)
>       at 
> org.apache.flink.core.fs.RefCountedFileWithStream.write(RefCountedFileWithStream.java:52)
>       at 
> org.apache.flink.core.fs.RefCountedBufferingFileStream.flush(RefCountedBufferingFileStream.java:104)
>       at 
> org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeAndUploadPart(S3RecoverableFsDataOutputStream.java:209)
>       at 
> org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:177)
>       at 
> org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.closeForCommit(OutputStreamBasedPartFileWriter.java:75)
>       at 
> org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:65)
>       at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:263)
>       at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:379)
>       at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:338)
>       at 
> org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput(AbstractStreamingWriter.java:155)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$finish$0(StreamOperatorWrapper.java:149)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:149)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:156)
>       at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:115)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:619)
>       at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.lambda$completeProcessing$0(SourceStreamTask.java:367)
>       at 
> org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:126)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
>       at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
>       at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>       at java.lang.Thread.run(Thread.java:750)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to