Repository: apex-malhar
Updated Branches:
  refs/heads/master 12d6183cf -> 1333910fd
APEXMALHAR-2226 Fixed the Not supported exception while re-deploying the AbstractFileOutput Operator

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/1333910f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/1333910f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/1333910f

Branch: refs/heads/master
Commit: 1333910fd8b8a79e74a503fc0a9fa337e93beba0
Parents: 12d6183
Author: chaitanya <[email protected]>
Authored: Thu Sep 29 12:18:37 2016 +0530
Committer: chaitanya <[email protected]>
Committed: Mon Oct 3 16:52:38 2016 +0530

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileOutputOperator.java | 45 +++++++++++++++++++-
 1 file changed, 44 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1333910f/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index 0195f7f..ab8bedc 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -116,6 +116,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
 
   private static final String TMP_EXTENSION = ".tmp";
 
+  private static final String APPEND_TMP_FILE = "_APPENDING";
+
   private static final int MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION = 25;
 
   /**
@@ -636,7 +638,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
   {
     FSDataOutputStream fsOutput;
     if (append) {
-      fsOutput = fs.append(filepath);
+      fsOutput = openStreamInAppendMode(filepath);
     } else {
       fsOutput = fs.create(filepath, (short)replication);
       fs.setPermission(filepath, FsPermission.createImmutable(filePermission));
@@ -645,6 +647,47 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
   }
 
   /**
+   * Opens the stream for the given file path in append mode. Catch the exception if the FS doesnt support
+   * append operation and calls the openStreamForNonAppendFS().
+   * @param filepath given file path
+   * @return output stream
+   */
+  protected FSDataOutputStream openStreamInAppendMode(Path filepath)
+  {
+    FSDataOutputStream fsOutput = null;
+    try {
+      fsOutput = fs.append(filepath);
+    } catch (IOException e) {
+      if (e.getMessage().equals("Not supported")) {
+        fsOutput = openStreamForNonAppendFS(filepath);
+      }
+    }
+    return fsOutput;
+  }
+
+  /**
+   * Opens the stream for the given file path for the file systems which are not supported append operation.
+   * @param filepath given file path
+   * @return output stream
+   */
+  protected FSDataOutputStream openStreamForNonAppendFS(Path filepath)
+  {
+    try {
+      Path appendTmpFile = new Path(filepath + APPEND_TMP_FILE);
+      fs.rename(filepath, appendTmpFile);
+      FSDataInputStream fsIn = fs.open(appendTmpFile);
+      FSDataOutputStream fsOut = fs.create(filepath);
+      IOUtils.copy(fsIn, fsOut);
+      flush(fsOut);
+      fsIn.close();
+      fs.delete(appendTmpFile);
+      return fsOut;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
    * Closes the stream which has been removed from the cache.
    *
    * @param streamContext stream context which is removed from the cache.
