APEX-54 #resolve Added code to copy the local file to HDFS with the overwrite option in AsyncFSStorageAgent
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/be4af0af Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/be4af0af Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/be4af0af Branch: refs/heads/devel-3 Commit: be4af0af1fe9800c690e98f770c1059ac2168143 Parents: 9d08532 Author: ishark <[email protected]> Authored: Mon Aug 17 18:28:47 2015 -0700 Committer: ishark <[email protected]> Committed: Fri Aug 21 14:28:40 2015 -0700 ---------------------------------------------------------------------- .../common/util/AsyncFSStorageAgent.java | 50 +++++++++++++++----- 1 file changed, 39 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/be4af0af/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java index d5de61c..f6077a7 100644 --- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java +++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java @@ -15,19 +15,16 @@ */ package com.datatorrent.common.util; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.ObjectStreamException; +import java.io.*; +import java.util.EnumSet; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Options; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datatorrent.netlet.util.DTThrowable; 
public class AsyncFSStorageAgent extends FSStorageAgent { private final transient FileSystem fs; @@ -85,9 +82,40 @@ public class AsyncFSStorageAgent extends FSStorageAgent String operatorIdStr = String.valueOf(operatorId); File directory = new File(localBasePath, operatorIdStr); String window = Long.toHexString(windowId); - Path lPath = new Path(path + Path.SEPARATOR + operatorIdStr + Path.SEPARATOR + System.currentTimeMillis() + TMP_FILE); - FileUtil.copy(new File(directory, String.valueOf(windowId)), fs, lPath, true, conf); - fileContext.rename(lPath, new Path(path + Path.SEPARATOR + operatorIdStr + Path.SEPARATOR + window), Options.Rename.OVERWRITE); + Path lPath = new Path(path + Path.SEPARATOR + operatorIdStr + Path.SEPARATOR + TMP_FILE); + File srcFile = new File(directory, String.valueOf(windowId)); + FSDataOutputStream stream = null; + boolean stateSaved = false; + try { + // Create the temporary file with OverWrite option to avoid dangling lease issue and avoid exception if file already exists + stream = fileContext.create(lPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.CreateParent.createParent()); + InputStream in = null; + try { + in = new FileInputStream(srcFile); + IOUtils.copyBytes(in, stream, conf, false); + } finally { + IOUtils.closeStream(in); + } + stateSaved = true; + } catch (Throwable t) { + logger.debug("while saving {} {}", operatorId, window, t); + stateSaved = false; + DTThrowable.rethrow(t); + } finally { + try { + if (stream != null) { + stream.close(); + } + } catch (IOException ie) { + stateSaved = false; + throw new RuntimeException(ie); + } finally { + if (stateSaved) { + fileContext.rename(lPath, new Path(path + Path.SEPARATOR + operatorIdStr + Path.SEPARATOR + window), Options.Rename.OVERWRITE); + } + FileUtil.fullyDelete(srcFile); + } + } } @Override
