Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 cdc8b28af -> 30e4b1ff6
MLHR-1825 #comment handling case when no open part is updated but stream is not created Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/c6da4a05 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/c6da4a05 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/c6da4a05 Branch: refs/heads/devel-3 Commit: c6da4a05e3df215a39eb06752e31760bb206528b Parents: 91321ce Author: Chandni Singh <[email protected]> Authored: Tue Sep 1 15:57:10 2015 -0700 Committer: Chandni Singh <[email protected]> Committed: Wed Sep 2 13:48:12 2015 -0700 ---------------------------------------------------------------------- .../lib/io/fs/AbstractFileOutputOperator.java | 36 +++++++++++++------- 1 file changed, 23 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c6da4a05/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java index 3f45afb..a589751 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java @@ -484,34 +484,40 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp //of this operator. for(String seenFileName: endOffsets.keySet()) { try { - Integer part = openPart.get(seenFileName).getValue() + 1; + Integer fileOpenPart = this.openPart.get(seenFileName).getValue(); + int nextPart = fileOpenPart + 1; String seenPartFileName; while (true) { - seenPartFileName = getPartFileName(seenFileName, part); - Path activePath; + seenPartFileName = getPartFileName(seenFileName, nextPart); + Path activePath = null; if (alwaysWriteToTmp) { String tmpFileName = fileNameToTmpName.get(seenPartFileName); - activePath = new Path(filePath + Path.SEPARATOR + tmpFileName); + if (tmpFileName != null) { + activePath = new Path(filePath + Path.SEPARATOR + tmpFileName); + } } else { activePath = new Path(filePath + Path.SEPARATOR + seenPartFileName); } - if (!fs.exists(activePath)) { + if (activePath == null || !fs.exists(activePath)) { break; } fs.delete(activePath, true); - part = part + 1; + nextPart++; } - seenPartFileName = getPartFileName(seenFileName, openPart.get(seenFileName).intValue()); - Path activeFilePath; + seenPartFileName = getPartFileName(seenFileName, fileOpenPart); + Path activePath = null; if (alwaysWriteToTmp) { - activeFilePath = new Path(filePath + Path.SEPARATOR + fileNameToTmpName.get(seenPartFileName)); + String tmpFileName = fileNameToTmpName.get(seenPartFileName); + if (tmpFileName != null) { + activePath = new Path(filePath + Path.SEPARATOR + fileNameToTmpName.get(seenPartFileName)); + } } else { - activeFilePath = new Path(filePath + Path.SEPARATOR + seenPartFileName); + activePath = new Path(filePath + Path.SEPARATOR + seenPartFileName); } - if (fs.getFileStatus(activeFilePath).getLen() > maxLength) { + if (activePath != null && fs.getFileStatus(activePath).getLen() > maxLength) { //Handle the case when restoring to a checkpoint where the current rolling file //already has a length greater than max length. LOG.debug("rotating file at setup."); @@ -1143,8 +1149,12 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp for (FileStatus status : statuses) { String statusName = status.getPath().getName(); if (statusName.endsWith(TMP_EXTENSION) && statusName.startsWith(fileName)) { - LOG.debug("deleting vagrant file {}", statusName); - fs.delete(status.getPath(), true); + //a tmp file has tmp extension always preceded by timestamp + String actualFileName = statusName.substring(0, statusName.lastIndexOf('.', statusName.lastIndexOf('.') - 1)); + if (fileName.equals(actualFileName)) { + LOG.debug("deleting vagrant file {}", statusName); + fs.delete(status.getPath(), true); + } } } }
