[FLINK-9841][rest] Close log file channel after response was fully written. This closes #6329.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1ec89302 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1ec89302 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1ec89302 Branch: refs/heads/release-1.5 Commit: 1ec893027b0b24545509ff9715039fc3580130a0 Parents: aa770ba Author: yanghua <yanghua1...@gmail.com> Authored: Fri Jul 13 16:48:04 2018 +0800 Committer: zentol <ches...@apache.org> Committed: Mon Jul 23 09:19:24 2018 +0200 ---------------------------------------------------------------------- .../AbstractTaskManagerFileHandler.java | 25 +++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1ec89302/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java index 265813f..303f7d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java @@ -63,6 +63,7 @@ import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureList import javax.annotation.Nonnull; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.channels.FileChannel; @@ -208,11 +209,20 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag } private void transferFile(ChannelHandlerContext ctx, File file, 
HttpRequest httpRequest) throws FlinkException { - try (final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) { - final long fileLength = randomAccessFile.length(); + final RandomAccessFile randomAccessFile; - try (final FileChannel fileChannel = randomAccessFile.getChannel()) { + try { + randomAccessFile = new RandomAccessFile(file, "r"); + } catch (FileNotFoundException e) { + throw new FlinkException("Can not find file " + file + ".", e); + } + + try { + final long fileLength = randomAccessFile.length(); + final FileChannel fileChannel = randomAccessFile.getChannel(); + + try { HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); response.headers().set(CONTENT_TYPE, "text/plain"); @@ -251,8 +261,17 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag if (!HttpHeaders.isKeepAlive(httpRequest)) { lastContentFuture.addListener(ChannelFutureListener.CLOSE); } + } catch (IOException ex) { + fileChannel.close(); + throw ex; } } catch (IOException ioe) { + try { + randomAccessFile.close(); + } catch (IOException e) { + throw new FlinkException("Close file or channel error.", e); + } + throw new FlinkException("Could not transfer file " + file + " to the client.", ioe); } }