Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/release-3.2 dca248462 -> 73c8abf7f

MLHR-1886: moving restoring of a file in its own method and added a test that covers re-writing the file when it isn't closed before failure


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/e80b7dfd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/e80b7dfd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/e80b7dfd

Branch: refs/heads/release-3.2
Commit: e80b7dfd3763424b8bade84d31ba5dc4f649111a
Parents: dca2484
Author: Chandni Singh <[email protected]>
Authored: Tue Nov 3 13:06:25 2015 -0800
Committer: Chandni Singh <[email protected]>
Committed: Thu Nov 5 17:26:56 2015 -0800

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileOutputOperator.java   | 113 +++++++++++--------
 .../io/fs/AbstractFileOutputOperatorTest.java   |  52 +++++++++
 2 files changed, 116 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e80b7dfd/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index 09294a2..744f024 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -351,50 +351,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
         }
 
         if (fs.exists(activeFilePath)) {
-          LOG.debug("path exists {}", activeFilePath);
-          long offset = endOffsets.get(seenFileName).longValue();
-          FSDataInputStream inputStream = fs.open(activeFilePath);
-          FileStatus status = fs.getFileStatus(activeFilePath);
-
-          if (status.getLen() != offset) {
-            LOG.info("path corrupted {} {} {}", activeFilePath, offset, status.getLen());
-            byte[] buffer = new byte[COPY_BUFFER_SIZE];
-            String recoveryFileName = seenFileNamePart + '.' + System.currentTimeMillis() + TMP_EXTENSION;
-            Path recoveryFilePath = new Path(filePath + Path.SEPARATOR + recoveryFileName);
-            FSDataOutputStream fsOutput = openStream(recoveryFilePath, false);
-
-            while (inputStream.getPos() < offset) {
-              long remainingBytes = offset - inputStream.getPos();
-              int bytesToWrite = remainingBytes < COPY_BUFFER_SIZE ? (int) remainingBytes : COPY_BUFFER_SIZE;
-              inputStream.read(buffer);
-              fsOutput.write(buffer, 0, bytesToWrite);
-            }
-
-            flush(fsOutput);
-            fsOutput.close();
-            inputStream.close();
-
-            FileContext fileContext = FileContext.getFileContext(fs.getUri());
-            LOG.debug("active {} recovery {} ", activeFilePath, recoveryFilePath);
-
-            if (alwaysWriteToTmp) {
-              //recovery file is used as the new tmp file and we cannot delete the old tmp file because when the operator
-              //is restored to an earlier check-pointed window, it will look for an older tmp.
-              fileNameToTmpName.put(seenFileNamePart, recoveryFileName);
-            } else {
-              LOG.debug("recovery path {} actual path {} ", recoveryFilePath, status.getPath());
-              fileContext.rename(recoveryFilePath, status.getPath(), Options.Rename.OVERWRITE);
-            }
-          } else {
-            if (alwaysWriteToTmp && filesWithOpenStreams.contains(seenFileName)) {
-              String currentTmp = seenFileNamePart + '.' + System.currentTimeMillis() + TMP_EXTENSION;
-              FSDataOutputStream outputStream = openStream(new Path(filePath + Path.SEPARATOR + currentTmp), false);
-              IOUtils.copy(inputStream, outputStream);
-              outputStream.close();
-              fileNameToTmpName.put(seenFileNamePart, currentTmp);
-            }
-            inputStream.close();
-          }
+          recoverFile(seenFileName, seenFileNamePart, activeFilePath);
         }
       }
     }
@@ -448,10 +405,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
           }
         }
       }
-      LOG.debug("setup completed");
-      LOG.debug("end-offsets {}", endOffsets);
-
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
 
@@ -463,6 +417,68 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
   }
 
   /**
+   * Recovers a file which exists on the disk. If the length of the file is not same as the
+   * length which the operator remembers then the file is truncated. <br/>
+   * When always writing to a temporary file, then a file is restored even when the length is same as what the
+   * operator remembers however this is done only for files which had open streams that weren't closed before
+   * failure.
+   *
+   * @param filename name of the actual file.
+   * @param partFileName name of the part file. When not rolling this is same as filename; otherwise this is the
+   * latest open part file name.
+   * @param filepath path of the file. When always writing to temp file, this is the path of the temp file; otherwise
+   * path of the actual file.
+   * @throws IOException
+   */
+  private void recoverFile(String filename, String partFileName, Path filepath) throws IOException
+  {
+    LOG.debug("path exists {}", filepath);
+    long offset = endOffsets.get(filename).longValue();
+    FSDataInputStream inputStream = fs.open(filepath);
+    FileStatus status = fs.getFileStatus(filepath);
+
+    if (status.getLen() != offset) {
+      LOG.info("path corrupted {} {} {}", filepath, offset, status.getLen());
+      byte[] buffer = new byte[COPY_BUFFER_SIZE];
+      String recoveryFileName = partFileName + '.' + System.currentTimeMillis() + TMP_EXTENSION;
+      Path recoveryFilePath = new Path(filePath + Path.SEPARATOR + recoveryFileName);
+      FSDataOutputStream fsOutput = openStream(recoveryFilePath, false);
+
+      while (inputStream.getPos() < offset) {
+        long remainingBytes = offset - inputStream.getPos();
+        int bytesToWrite = remainingBytes < COPY_BUFFER_SIZE ? (int)remainingBytes : COPY_BUFFER_SIZE;
+        inputStream.read(buffer);
+        fsOutput.write(buffer, 0, bytesToWrite);
+      }
+
+      flush(fsOutput);
+      fsOutput.close();
+      inputStream.close();
+
+      FileContext fileContext = FileContext.getFileContext(fs.getUri());
+      LOG.debug("active {} recovery {} ", filepath, recoveryFilePath);
+
+      if (alwaysWriteToTmp) {
+        //recovery file is used as the new tmp file and we cannot delete the old tmp file because when the operator
+        //is restored to an earlier check-pointed window, it will look for an older tmp.
+        fileNameToTmpName.put(partFileName, recoveryFileName);
+      } else {
+        LOG.debug("recovery path {} actual path {} ", recoveryFilePath, status.getPath());
+        fileContext.rename(recoveryFilePath, status.getPath(), Options.Rename.OVERWRITE);
+      }
+    } else {
+      if (alwaysWriteToTmp && filesWithOpenStreams.contains(filename)) {
+        String currentTmp = partFileName + '.' + System.currentTimeMillis() + TMP_EXTENSION;
+        FSDataOutputStream outputStream = openStream(new Path(filePath + Path.SEPARATOR + currentTmp), false);
+        IOUtils.copy(inputStream, outputStream);
+        streamsCache.put(filename, new FSFilterStreamContext(outputStream));
+        fileNameToTmpName.put(partFileName, currentTmp);
+      }
+      inputStream.close();
+    }
+  }
+
+  /**
    * Creates the {@link CacheLoader} for loading an output stream when it is not present in the cache.
    * @return cache loader
    */
@@ -656,7 +672,6 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
       fileName = getPartFileNamePri(fileName);
       part.setValue(currentOpenPart.getValue());
     }
-    LOG.debug("request finalize {}", fileName);
     filesPerWindow.add(fileName);
   }
 
@@ -1215,7 +1230,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
           //a tmp file has tmp extension always preceded by timestamp
           String actualFileName = statusName.substring(0, statusName.lastIndexOf('.', statusName.lastIndexOf('.') - 1));
           if (fileName.equals(actualFileName)) {
-            LOG.debug("deleting vagrant file {}", statusName);
+            LOG.debug("deleting stray file {}", statusName);
             fs.delete(status.getPath(), true);
           }
         }


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e80b7dfd/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
index f7d1731..cbcc8b4 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
@@ -1883,6 +1883,58 @@ public class AbstractFileOutputOperatorTest
     checkCompressedFile(oddFile, oddOffsets, 1, 5, 1000, null, null);
   }
 
+  @Test
+  public void testRecoveryOfOpenFiles()
+  {
+    EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
+    writer.setMaxLength(4);
+    File meta = new File(testMeta.getDir());
+    writer.setFilePath(meta.getAbsolutePath());
+    writer.setAlwaysWriteToTmp(true);
+    writer.setup(testMeta.testOperatorContext);
+
+    writer.beginWindow(0);
+    writer.input.put(0);
+    writer.input.put(1);
+    writer.input.put(2);
+    writer.input.put(3);
+    writer.endWindow();
+
+    //failure and restored
+    writer.setup(testMeta.testOperatorContext);
+    writer.input.put(4);
+    writer.input.put(5);
+    writer.endWindow();
+
+    writer.beginWindow(1);
+    writer.input.put(6);
+    writer.input.put(7);
+    writer.input.put(8);
+    writer.input.put(9);
+    writer.input.put(6);
+    writer.input.put(7);
+    writer.endWindow();
+
+    writer.committed(1);
+
+    //Part 0 checks
+    String evenFileName = testMeta.getDir() + File.separator + EVEN_FILE;
+    String correctContents = "0\n" + "2\n" + "4\n";
+    checkOutput(0, evenFileName, correctContents);
+
+    String oddFileName = testMeta.getDir() + File.separator + ODD_FILE;
+    correctContents = "1\n" + "3\n" + "5\n";
+    checkOutput(0, oddFileName, correctContents);
+
+
+    //Part 1 checks
+    correctContents = "6\n" + "8\n" + "6\n";
+    checkOutput(1, evenFileName, correctContents);
+
+    correctContents = "7\n" + "9\n" + "7\n";
+    checkOutput(1, oddFileName, correctContents);
+  }
+
   private void checkCompressedFile(File file, List<Long> offsets, int startVal, int totalWindows, int totalRecords, SecretKey secretKey, byte[] iv) throws IOException
   {
     FileInputStream fis;
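
The corrupted-length branch of recoverFile() amounts to a truncate-by-copy: compare the file's on-disk length with the offset remembered in endOffsets, copy exactly that many bytes into a fresh recovery file, then either rename the copy over the original or, when alwaysWriteToTmp is set, adopt it as the new tmp file. Below is a minimal sketch of that idea against the local filesystem using plain java.io; the names TruncateByCopySketch and recoverToRememberedOffset are illustrative stand-ins and not part of this commit, which performs the copy through the Hadoop FSDataInputStream/FSDataOutputStream and FileContext APIs shown in the diff.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

public class TruncateByCopySketch
{
  private static final int COPY_BUFFER_SIZE = 1024;

  /**
   * Rewrites 'file' so that it contains exactly the first 'rememberedOffset' bytes,
   * by copying them into a recovery file and then moving that copy over the original.
   * Mirrors the corrupted-length branch of recoverFile(); illustrative only.
   */
  public static void recoverToRememberedOffset(File file, long rememberedOffset) throws IOException
  {
    if (file.length() == rememberedOffset) {
      return; // lengths agree; nothing to recover
    }
    File recovery = new File(file.getParent(), file.getName() + '.' + System.currentTimeMillis() + ".tmp");
    byte[] buffer = new byte[COPY_BUFFER_SIZE];
    long copied = 0;
    try (FileInputStream in = new FileInputStream(file);
        FileOutputStream out = new FileOutputStream(recovery)) {
      while (copied < rememberedOffset) {
        int toRead = (int)Math.min(buffer.length, rememberedOffset - copied);
        int read = in.read(buffer, 0, toRead);
        if (read < 0) {
          break; // file is shorter than the remembered offset; keep what exists
        }
        out.write(buffer, 0, read);
        copied += read;
      }
      out.flush();
    }
    // replace the original with the truncated copy (the operator renames via FileContext instead)
    Files.move(recovery.toPath(), file.toPath(), StandardCopyOption.REPLACE_EXISTING);
  }
}

One likely reason for copying into a new file rather than truncating in place is that it does not depend on the underlying filesystem supporting truncate.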
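
The else branch, together with the new streamsCache.put(...) call, covers the case the added test exercises: the file length matches what the operator remembers, but the file still had an open stream when the failure happened and alwaysWriteToTmp is set. The existing contents are copied into a brand-new tmp file and a fresh stream for it is registered, so writes issued after restore land in the new tmp. A rough stand-alone sketch of that bookkeeping follows; OpenStreamRecoverySketch, reopenAsNewTmp and the plain HashMap standing in for streamsCache are assumptions for illustration, not the operator's actual types.

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;

public class OpenStreamRecoverySketch
{
  // stand-in for the operator's streamsCache: file name -> currently open output stream
  private final Map<String, OutputStream> openStreams = new HashMap<>();

  /**
   * Illustrative only: copy the surviving tmp file into a fresh tmp and register a new
   * appending stream for it, so subsequent writes go to the new tmp (mirrors the else
   * branch of recoverFile() plus the added streamsCache.put(...) call).
   */
  public void reopenAsNewTmp(String fileName, File existingTmp) throws IOException
  {
    File newTmp = new File(existingTmp.getParent(), fileName + '.' + System.currentTimeMillis() + ".tmp");
    Files.copy(existingTmp.toPath(), newTmp.toPath());
    OutputStream out = new FileOutputStream(newTmp, true); // append after the copied contents
    openStreams.put(fileName, out);
  }
}

The added testRecoveryOfOpenFiles drives this path by simulating a failure after window 0 (the "//failure and restored" step calls setup() again), continuing to write, and then checking the final contents of both part files.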
