[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16377027#comment-16377027
 ] 

ASF GitHub Bot commented on FLINK-8543:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5563#discussion_r170626581
  
    --- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
 ---
    @@ -158,7 +158,7 @@ public void open(FileSystem fs, Path path) throws 
IOException {
     
        @Override
        public void close() throws IOException {
    -           super.close(); //the order is important since super.close 
flushes inside
    +           flush(); // super.close() also does a flush
                if (keyValueWriter != null) {
    --- End diff --
    
    Ah yeah, I would call super if the `keyValueWriter` is `null`. That should 
do it, right?


> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --------------------------------------------------------------------------
>
>                 Key: FLINK-8543
>                 URL: https://issues.apache.org/jira/browse/FLINK-8543
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.4.0
>         Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>            Reporter: chris snow
>            Priority: Blocker
>             Fix For: 1.5.0
>
>         Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>          .setWriter(writer)
>          .setBucketer(new DateTimeBucketer<Tuple2<String, 
> Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
>     private final OutputStream backupStream;
>     private final File backupFile;
>     private final AtomicBoolean closed = new AtomicBoolean(false);
>     private final String key;
>     private final Progressable progress;
>     private final S3AFileSystem fs;
>     public static final Logger LOG = S3AFileSystem.LOG;
>     public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
>         this.key = key;
>         this.progress = progress;
>         this.fs = fs;
>         this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
>         LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
>         this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
>     }
>     void checkOpen() throws IOException {
>         if (!this.closed.get()) return;
>         // vvvvvv-- Additional logging --vvvvvvv
>         LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
>         throw new IOException("Output Stream closed");
>     }
>     @Override
>     public void flush() throws IOException {
>         this.checkOpen();
>         this.backupStream.flush();
>     }
>     @Override
>     public void close() throws IOException {
>         if (this.closed.getAndSet(true)) {
>             return;
>         }
>         this.backupStream.close();
>         LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
> (Object)this.key);
>         try {
>             ObjectMetadata om = 
> this.fs.newObjectMetadata(this.backupFile.length());
>             Upload upload = 
> this.fs.putObject(this.fs.newPutObjectRequest(this.key, om, this.backupFile));
>             ProgressableProgressListener listener = new 
> ProgressableProgressListener(this.fs, this.key, upload, this.progress);
>             upload.addProgressListener((ProgressListener)listener);
>             upload.waitForUploadResult();
>             listener.uploadCompleted();
>             this.fs.finishedWrite(this.key);
>         }
>         catch (InterruptedException e) {
>             throw (InterruptedIOException)new 
> InterruptedIOException(e.toString()).initCause(e);
>         }
>         catch (AmazonClientException e) {
>             throw S3AUtils.translateException("saving output", this.key, e);
>         }
>         finally {
>             if (!this.backupFile.delete()) {
>                 LOG.warn("Could not delete temporary s3a file: {}", 
> (Object)this.backupFile);
>             }
>             super.close();
>         }
>         LOG.debug("OutputStream for key '{}' upload complete", 
> (Object)this.key);
>     }
>     @Override
>     public void write(int b) throws IOException {
>         this.checkOpen();
>         this.backupStream.write(b);
>     }
>     @Override
>     public void write(byte[] b, int off, int len) throws IOException {
>         this.checkOpen();
>         this.backupStream.write(b, off, len);
>     }
>     static {
>     }
> }
> {code}
>  
> You can see from this addition log output that the S3AOutputStream#close() 
> method **appears** to be called before the S3AOutputStream#flush() method:
>  
> {code:java}
> 2018-02-01 12:42:20,698 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress 
>               - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 
> 128497 bytes
> 2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress 
>               - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 
> bytes
> 2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress 
>               - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 
> bytes
> 2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress 
>               - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 
> bytes
> 2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress 
>               - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 
> bytes
> 2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress 
>               - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 
> bytes
> 2018-02-01 12:42:20,911 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem          
>               - Finished write to 
> landingzone/2018-02-01--1240/_part-0-0.in-progress
> 2018-02-01 12:42:20,911 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem          
>               - object_delete_requests += 1  ->  3
> vvvvv- close() is called here? -vvvvv
> 2018-02-01 12:42:21,212 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem          
>               
> - OutputStream for key 'landingzone/2018-02-01--1240/_part-0-0.in-progress' 
> upload complete
> vvvvv- flush() is called here? -vvvvv
> 2018-02-01 12:42:21,212 ERROR org.apache.hadoop.fs.s3a.S3AFileSystem          
>               
> - OutputStream for key 'landingzone/2018-02-01--1240/_part-0-0.in-progress' 
> closed.
> 2018-02-01 12:42:21,212 INFO  org.apache.flink.runtime.taskmanager.Task       
>               
> - Attempting to fail task externally Source: Custom Source -> Map -> Sink: 
> Unnamed (1/2) (510c8316d3a249e5ea5b8d8e693f7beb).
> 2018-02-01 12:42:21,214 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Source: Custom Source -> Map -> Sink: Unnamed (1/2) 
> (510c8316d3a249e5ea5b8d8e693f7beb) switched from RUNNING to FAILED.
> TimerException{java.io.IOException: Output Stream closed}
>       at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Output Stream closed
>       at 
> org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen(S3AOutputStream.java:83)
>       at 
> org.apache.hadoop.fs.s3a.S3AOutputStream.flush(S3AOutputStream.java:89)
>       at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
>       at java.io.DataOutputStream.flush(DataOutputStream.java:123)
>       at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
>       at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:141)
>       at 
> org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerFlush(BufferedBinaryEncoder.java:220)
>       at 
> org.apache.avro.io.BufferedBinaryEncoder.flush(BufferedBinaryEncoder.java:85)
>       at org.apache.avro.file.DataFileWriter.flush(DataFileWriter.java:368)
>       at org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:375)
>       at 
> org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter$AvroKeyValueWriter.close(AvroKeyValueSinkWriter.java:251)
>       at 
> org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.close(AvroKeyValueSinkWriter.java:163)
>       at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:551)
>       at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:493)
>       at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:476)
>       at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
>       ... 7 more
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to