[https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371882#comment-16371882]
chris snow edited comment on FLINK-8543 at 2/21/18 7:42 PM:
------------------------------------------------------------
So all files being accompanied by .valid-length is expected behaviour when
saving to S3?
If so, then unless client applications can understand and use the .valid-length
files (which I don't think will be the case), I don't think this functionality
makes sense with S3. I.e. am I trying to do something with Flink that it wasn't
designed to do?
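For what it's worth, here is a minimal sketch (mine, not anything from Flink) of what a downstream reader would have to do with those markers. It assumes the .valid-length file simply contains the valid byte count as a decimal string and that both files have been copied locally; the file names are only illustrative:
{code:java}
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class ValidLengthReader {

    // Returns only the valid prefix of a restored part file, as recorded in its
    // .valid-length marker (assumed to hold the count as text, e.g. "128497").
    static byte[] readValidBytes(Path partFile, Path validLengthFile) throws IOException {
        long validLength = Long.parseLong(
                new String(Files.readAllBytes(validLengthFile), StandardCharsets.UTF_8).trim());
        byte[] valid = new byte[(int) validLength];
        try (InputStream in = Files.newInputStream(partFile)) {
            int read = 0;
            while (read < validLength) {
                int n = in.read(valid, read, (int) (validLength - read));
                if (n < 0) {
                    break; // part file is shorter than the recorded length
                }
                read += n;
            }
        }
        return valid;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical local copies of a part file and its marker.
        byte[] data = readValidBytes(
                Paths.get("part-0-0"),
                Paths.get("_part-0-0.valid-length"));
        System.out.println(data.length + " valid bytes");
    }
}
{code}
That is the kind of logic every consumer would need to carry, which is why I'm questioning whether this fits S3.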
was (Author: snowch):
So all files being accompanied by .valid-length is expected behaviour when
saving to S3?
If so, then unless client applications can understand and use the .valid-length
files (which I don't think will be the case), I don't think this functionality
makes sense with S3?
> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --------------------------------------------------------------------------
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
> Issue Type: Bug
> Components: Streaming Connectors
> Affects Versions: 1.4.0
> Environment: IBM Analytics Engine -
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following
> components are made available.
> Apache Spark 2.1.1
> Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2
> Jupyter Enterprise Gateway 0.5.0
> HBase 1.1.2 *
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 *
> Tez 0.7.0 *
> Pig 0.16.0 *
> Sqoop 1.4.6 *
> Slider 0.92.0 *
> Reporter: chris snow
> Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>     .setWriter(writer)
>     .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
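> For context, a rough sketch of how a sink like this might be wired into the job (the batch size and inactive-bucket settings below are illustrative guesses rather than my actual configuration; {{path}}, {{writer}}, {{formatString}} and {{stream}} are the same as above). The inactive-bucket check matters because the failure further down is triggered from {{BucketingSink#checkForInactiveBuckets()}}:
>
> {code:java}
> BucketingSink<Tuple2<String, Object>> sink = new BucketingSink<Tuple2<String, Object>>(path)
>         .setWriter(writer)
>         .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString))
>         // Illustrative values only -- not taken from the actual job:
>         .setBatchSize(1024 * 1024 * 64)             // roll part files at ~64 MB
>         .setInactiveBucketCheckInterval(60 * 1000L) // look for idle buckets every minute
>         .setInactiveBucketThreshold(60 * 1000L);    // close buckets idle for a minute
>
> stream.addSink(sink);
> {code}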
>
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!
> The Flink console output is showing an exception being thrown by
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster
> and added some additional logging to the checkOpen() method to log the 'key'
> just before the exception is thrown:
>
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
>
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
>
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream extends OutputStream {
>     private final OutputStream backupStream;
>     private final File backupFile;
>     private final AtomicBoolean closed = new AtomicBoolean(false);
>     private final String key;
>     private final Progressable progress;
>     private final S3AFileSystem fs;
>
>     public static final Logger LOG = S3AFileSystem.LOG;
>
>     public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key,
>             Progressable progress) throws IOException {
>         this.key = key;
>         this.progress = progress;
>         this.fs = fs;
>         this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
>         LOG.debug("OutputStream for key '{}' writing to tempfile: {}",
>                 (Object)key, (Object)this.backupFile);
>         this.backupStream = new BufferedOutputStream(new FileOutputStream(this.backupFile));
>     }
>
>     void checkOpen() throws IOException {
>         if (!this.closed.get()) return;
>         // vvvvvv-- Additional logging --vvvvvvv
>         LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
>         throw new IOException("Output Stream closed");
>     }
>
>     @Override
>     public void flush() throws IOException {
>         this.checkOpen();
>         this.backupStream.flush();
>     }
>
>     @Override
>     public void close() throws IOException {
>         if (this.closed.getAndSet(true)) {
>             return;
>         }
>         this.backupStream.close();
>         LOG.debug("OutputStream for key '{}' closed. Now beginning upload", (Object)this.key);
>         try {
>             ObjectMetadata om = this.fs.newObjectMetadata(this.backupFile.length());
>             Upload upload = this.fs.putObject(this.fs.newPutObjectRequest(this.key, om, this.backupFile));
>             ProgressableProgressListener listener = new ProgressableProgressListener(this.fs, this.key, upload, this.progress);
>             upload.addProgressListener((ProgressListener)listener);
>             upload.waitForUploadResult();
>             listener.uploadCompleted();
>             this.fs.finishedWrite(this.key);
>         }
>         catch (InterruptedException e) {
>             throw (InterruptedIOException)new InterruptedIOException(e.toString()).initCause(e);
>         }
>         catch (AmazonClientException e) {
>             throw S3AUtils.translateException("saving output", this.key, e);
>         }
>         finally {
>             if (!this.backupFile.delete()) {
>                 LOG.warn("Could not delete temporary s3a file: {}", (Object)this.backupFile);
>             }
>             super.close();
>         }
>         LOG.debug("OutputStream for key '{}' upload complete", (Object)this.key);
>     }
>
>     @Override
>     public void write(int b) throws IOException {
>         this.checkOpen();
>         this.backupStream.write(b);
>     }
>
>     @Override
>     public void write(byte[] b, int off, int len) throws IOException {
>         this.checkOpen();
>         this.backupStream.write(b, off, len);
>     }
>
>     static {
>     }
> }
> {code}
>
> You can see from this additional log output that the S3AOutputStream#close()
> method *appears* to be called before the S3AOutputStream#flush() method:
>
> {code:java}
> 2018-02-01 12:42:20,698 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 128497 bytes
> 2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
> 2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
> 2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
> 2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
> 2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
> 2018-02-01 12:42:20,911 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - Finished write to landingzone/2018-02-01--1240/_part-0-0.in-progress
> 2018-02-01 12:42:20,911 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - object_delete_requests += 1 -> 3
> vvvvv- close() is called here? -vvvvv
> 2018-02-01 12:42:21,212 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - OutputStream for key 'landingzone/2018-02-01--1240/_part-0-0.in-progress' upload complete
> vvvvv- flush() is called here? -vvvvv
> 2018-02-01 12:42:21,212 ERROR org.apache.hadoop.fs.s3a.S3AFileSystem - OutputStream for key 'landingzone/2018-02-01--1240/_part-0-0.in-progress' closed.
> 2018-02-01 12:42:21,212 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: Custom Source -> Map -> Sink: Unnamed (1/2) (510c8316d3a249e5ea5b8d8e693f7beb).
> 2018-02-01 12:42:21,214 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Map -> Sink: Unnamed (1/2) (510c8316d3a249e5ea5b8d8e693f7beb) switched from RUNNING to FAILED.
> TimerException{java.io.IOException: Output Stream closed}
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Output Stream closed
>     at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen(S3AOutputStream.java:83)
>     at org.apache.hadoop.fs.s3a.S3AOutputStream.flush(S3AOutputStream.java:89)
>     at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
>     at java.io.DataOutputStream.flush(DataOutputStream.java:123)
>     at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
>     at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:141)
>     at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerFlush(BufferedBinaryEncoder.java:220)
>     at org.apache.avro.io.BufferedBinaryEncoder.flush(BufferedBinaryEncoder.java:85)
>     at org.apache.avro.file.DataFileWriter.flush(DataFileWriter.java:368)
>     at org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:375)
>     at org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter$AvroKeyValueWriter.close(AvroKeyValueSinkWriter.java:251)
>     at org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.close(AvroKeyValueSinkWriter.java:163)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:551)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:493)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:476)
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
>     ... 7 more
> {code}
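> To sanity-check that ordering, one option would be a throwaway diagnostic wrapper around the stream (hypothetical, not part of Flink or Hadoop) that records where close() was first called, so a later flush() can print the closer's stack trace instead of only the "Output Stream closed" message:
>
> {code:java}
> import java.io.FilterOutputStream;
> import java.io.IOException;
> import java.io.OutputStream;
>
> // Throwaway diagnostic wrapper: remembers the stack trace of the first close()
> // so that a flush() arriving afterwards can report who closed the stream.
> public class CloseTracingOutputStream extends FilterOutputStream {
>
>     private volatile Throwable closedAt;
>
>     public CloseTracingOutputStream(OutputStream out) {
>         super(out);
>     }
>
>     @Override
>     public void close() throws IOException {
>         if (closedAt == null) {
>             closedAt = new Throwable("stream first closed here");
>         }
>         super.close();
>     }
>
>     @Override
>     public void flush() throws IOException {
>         if (closedAt != null) {
>             closedAt.printStackTrace(); // shows the earlier close() call site
>         }
>         super.flush();
>     }
> }
> {code}
>
> That wouldn't fix anything, but it would confirm whether close() really is completing before the flush() triggered by the processing-time timer.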
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)