MLHR-1886 #resolve #comment optimizing recovery and updating rotation states only when new part is open
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/7156400b Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/7156400b Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/7156400b Branch: refs/heads/master Commit: 7156400be10c2fbda73a33ffe8c6fc03a580a569 Parents: c86e50a Author: Chandni Singh <[email protected]> Authored: Fri Oct 30 19:00:42 2015 -0700 Committer: Chandni Singh <[email protected]> Committed: Thu Nov 5 17:16:15 2015 -0800 ---------------------------------------------------------------------- .../lib/io/fs/AbstractFileOutputOperator.java | 361 ++++++++++--------- 1 file changed, 199 insertions(+), 162 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7156400b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java index 3819a33..09294a2 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java @@ -240,6 +240,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp * This isn't the most effective way but adds a little bit of optimization. */ private Long expireStreamAfterAcessMillis; + private final Set<String> filesWithOpenStreams; /** * This input port receives incoming tuples. @@ -264,7 +265,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp } }; - private static class RotationState { + private static class RotationState + { boolean notEmpty; boolean rotated; } @@ -278,6 +280,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp fileNameToTmpName = Maps.newHashMap(); finalizedFiles = Maps.newTreeMap(); finalizedPart = Maps.newHashMap(); + filesWithOpenStreams = Sets.newHashSet(); } /** @@ -303,7 +306,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp { LOG.debug("setup initiated"); if (expireStreamAfterAcessMillis == null) { - expireStreamAfterAcessMillis = (long)(context.getValue(OperatorContext.SPIN_MILLIS) * context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT)); + expireStreamAfterAcessMillis = (long)(context.getValue(OperatorContext.SPIN_MILLIS) * + context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT)); } rollingFile = (maxLength < Long.MAX_VALUE) || (rotationWindows > 0); @@ -321,126 +325,16 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp LOG.debug("FS class {}", fs.getClass()); - //When an entry is removed from the cache, removal listener is notified and it closes the output stream. - RemovalListener<String, FSFilterStreamContext> removalListener = new RemovalListener<String, FSFilterStreamContext>() - { - @Override - public void onRemoval(@Nonnull RemovalNotification<String, FSFilterStreamContext> notification) - { - FSFilterStreamContext streamContext = notification.getValue(); - if (streamContext != null) { - - //FilterOutputStream filterStream = streamContext.getFilterStream(); - try { - String filename = notification.getKey(); - String partFileName = getPartFileNamePri(filename); - - LOG.debug("closing {}", partFileName); - long start = System.currentTimeMillis(); - streamContext.close(); - //filterStream.close(); - totalWritingTime += System.currentTimeMillis() - start; - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - } - }; - - //Define cache - CacheLoader<String, FSFilterStreamContext> loader = new CacheLoader<String, FSFilterStreamContext>() - { - @Override - public FSFilterStreamContext load(@Nonnull String filename) - { - String partFileName = getPartFileNamePri(filename); - Path originalFilePath = new Path(filePath + Path.SEPARATOR + partFileName); - - Path activeFilePath; - if (!alwaysWriteToTmp) { - activeFilePath = originalFilePath; - } else { - //MLHR-1776 : writing to tmp file - String tmpFileName = fileNameToTmpName.get(partFileName); - if (tmpFileName == null) { - tmpFileName = partFileName + '.' + System.currentTimeMillis() + TMP_EXTENSION; - fileNameToTmpName.put(partFileName, tmpFileName); - } - activeFilePath = new Path(filePath + Path.SEPARATOR + tmpFileName); - } - - FSDataOutputStream fsOutput; - - boolean sawThisFileBefore = endOffsets.containsKey(filename); - - try { - if (fs.exists(originalFilePath) || (alwaysWriteToTmp && fs.exists(activeFilePath))) { - if(sawThisFileBefore) { - FileStatus fileStatus = fs.getFileStatus(activeFilePath); - MutableLong endOffset = endOffsets.get(filename); - - if (endOffset != null) { - endOffset.setValue(fileStatus.getLen()); - } - else { - endOffsets.put(filename, new MutableLong(fileStatus.getLen())); - } - - fsOutput = fs.append(activeFilePath); - LOG.debug("appending to {}", activeFilePath); - } - else { - //We never saw this file before and we don't want to append - //If the file is rolling we need to delete all its parts. - if(rollingFile) { - int part = 0; - - while (true) { - Path seenPartFilePath = new Path(filePath + Path.SEPARATOR + getPartFileName(filename, part)); - if (!fs.exists(seenPartFilePath)) { - break; - } - - fs.delete(seenPartFilePath, true); - part = part + 1; - } - - fsOutput = fs.create(activeFilePath, (short) replication); - } - else { - //Not rolling is easy, just delete the file and create it again. - fs.delete(activeFilePath, true); - if(alwaysWriteToTmp){ - //we need to delete original file if that exists - if(fs.exists(originalFilePath)){ - fs.delete(originalFilePath, true); - } - } - fsOutput = fs.create(activeFilePath, (short) replication); - } - } - } - else { - fsOutput = fs.create(activeFilePath, (short) replication); - fs.setPermission(activeFilePath, FsPermission.createImmutable(filePermission)); - } - - LOG.info("opened {}, active {}", partFileName, activeFilePath); - return new FSFilterStreamContext(fsOutput); - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - }; + //building cache + RemovalListener<String, FSFilterStreamContext> removalListener = createCacheRemoveListener(); + CacheLoader<String, FSFilterStreamContext> loader = createCacheLoader(); + streamsCache = CacheBuilder.newBuilder().maximumSize(maxOpenFiles).expireAfterAccess(expireStreamAfterAcessMillis, + TimeUnit.MILLISECONDS).removalListener(removalListener).build(loader); - streamsCache = CacheBuilder.newBuilder().maximumSize(maxOpenFiles).expireAfterAccess(expireStreamAfterAcessMillis, TimeUnit.MILLISECONDS).removalListener(removalListener).build(loader); + LOG.debug("File system class: {}", fs.getClass()); + LOG.debug("end-offsets {}", endOffsets); try { - LOG.debug("File system class: {}", fs.getClass()); - LOG.debug("end-offsets {}", endOffsets); - //Restore the files in case they were corrupted and the operator was re-deployed. Path writerPath = new Path(filePath); if (fs.exists(writerPath)) { @@ -467,7 +361,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp byte[] buffer = new byte[COPY_BUFFER_SIZE]; String recoveryFileName = seenFileNamePart + '.' + System.currentTimeMillis() + TMP_EXTENSION; Path recoveryFilePath = new Path(filePath + Path.SEPARATOR + recoveryFileName); - FSDataOutputStream fsOutput = fs.create(recoveryFilePath, (short) replication); + FSDataOutputStream fsOutput = openStream(recoveryFilePath, false); while (inputStream.getPos() < offset) { long remainingBytes = offset - inputStream.getPos(); @@ -492,9 +386,9 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp fileContext.rename(recoveryFilePath, status.getPath(), Options.Rename.OVERWRITE); } } else { - if (alwaysWriteToTmp) { + if (alwaysWriteToTmp && filesWithOpenStreams.contains(seenFileName)) { String currentTmp = seenFileNamePart + '.' + System.currentTimeMillis() + TMP_EXTENSION; - FSDataOutputStream outputStream = fs.create(new Path(filePath + Path.SEPARATOR + currentTmp)); + FSDataOutputStream outputStream = openStream(new Path(filePath + Path.SEPARATOR + currentTmp), false); IOUtils.copy(inputStream, outputStream); outputStream.close(); fileNameToTmpName.put(seenFileNamePart, currentTmp); @@ -506,8 +400,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp } if (rollingFile) { - //delete the left over future rolling files produced from the previous crashed instance - //of this operator. + //delete the left over future rolling files produced from the previous crashed instance of this operator. for(String seenFileName: endOffsets.keySet()) { try { Integer fileOpenPart = this.openPart.get(seenFileName).getValue(); @@ -550,10 +443,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp rotate(seenFileName); } } - catch (IOException e) { - throw new RuntimeException(e); - } - catch (ExecutionException e) { + catch (IOException | ExecutionException e) { throw new RuntimeException(e); } } @@ -561,17 +451,178 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp LOG.debug("setup completed"); LOG.debug("end-offsets {}", endOffsets); - } - catch (IOException e) { + + } catch (IOException e) { throw new RuntimeException(e); } this.context = context; - fileCounters.setCounter(Counters.TOTAL_BYTES_WRITTEN, - new MutableLong()); - fileCounters.setCounter(Counters.TOTAL_TIME_WRITING_MILLISECONDS, - new MutableLong()); + fileCounters.setCounter(Counters.TOTAL_BYTES_WRITTEN, new MutableLong()); + fileCounters.setCounter(Counters.TOTAL_TIME_WRITING_MILLISECONDS, new MutableLong()); + } + + /** + * Creates the {@link CacheLoader} for loading an output stream when it is not present in the cache. + * @return cache loader + */ + private CacheLoader<String, FSFilterStreamContext> createCacheLoader() + { + return new CacheLoader<String, FSFilterStreamContext>() + { + @Override + public FSFilterStreamContext load(@Nonnull String filename) + { + if (rollingFile) { + RotationState state = getRotationState(filename); + if (rollingFile && state.rotated) { + openPart.get(filename).add(1); + state.rotated = false; + MutableLong offset = endOffsets.get(filename); + offset.setValue(0); + } + } + + String partFileName = getPartFileNamePri(filename); + Path originalFilePath = new Path(filePath + Path.SEPARATOR + partFileName); + + Path activeFilePath; + if (!alwaysWriteToTmp) { + activeFilePath = originalFilePath; + } else { + //MLHR-1776 : writing to tmp file + String tmpFileName = fileNameToTmpName.get(partFileName); + if (tmpFileName == null) { + tmpFileName = partFileName + '.' + System.currentTimeMillis() + TMP_EXTENSION; + fileNameToTmpName.put(partFileName, tmpFileName); + } + activeFilePath = new Path(filePath + Path.SEPARATOR + tmpFileName); + } + + FSDataOutputStream fsOutput; + + boolean sawThisFileBefore = endOffsets.containsKey(filename); + + try { + if (fs.exists(originalFilePath) || (alwaysWriteToTmp && fs.exists(activeFilePath))) { + if (sawThisFileBefore) { + FileStatus fileStatus = fs.getFileStatus(activeFilePath); + MutableLong endOffset = endOffsets.get(filename); + + if (endOffset != null) { + endOffset.setValue(fileStatus.getLen()); + } else { + endOffsets.put(filename, new MutableLong(fileStatus.getLen())); + } + + fsOutput = openStream(activeFilePath, true); + LOG.debug("appending to {}", activeFilePath); + } else { + //We never saw this file before and we don't want to append + //If the file is rolling we need to delete all its parts. + if (rollingFile) { + int part = 0; + + while (true) { + Path seenPartFilePath = new Path(filePath + Path.SEPARATOR + getPartFileName(filename, part)); + if (!fs.exists(seenPartFilePath)) { + break; + } + + fs.delete(seenPartFilePath, true); + part = part + 1; + } + + fsOutput = openStream(activeFilePath, false); + } else { + //Not rolling is easy, just delete the file and create it again. + fs.delete(activeFilePath, true); + if (alwaysWriteToTmp) { + //we need to delete original file if that exists + if (fs.exists(originalFilePath)) { + fs.delete(originalFilePath, true); + } + } + fsOutput = openStream(activeFilePath, false); + } + } + } else { + fsOutput = openStream(activeFilePath, false); + } + filesWithOpenStreams.add(filename); + + LOG.info("opened {}, active {}", partFileName, activeFilePath); + return new FSFilterStreamContext(fsOutput); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }; + } + + /** + * Creates the removal listener which is attached to the cache. + * + * @return cache entry removal listener. + */ + private RemovalListener<String, FSFilterStreamContext> createCacheRemoveListener() + { + //When an entry is removed from the cache, removal listener is notified and it closes the output stream. + return new RemovalListener<String, FSFilterStreamContext>() + { + @Override + public void onRemoval(@Nonnull RemovalNotification<String, FSFilterStreamContext> notification) + { + FSFilterStreamContext streamContext = notification.getValue(); + if (streamContext != null) { + try { + String filename = notification.getKey(); + String partFileName = getPartFileNamePri(filename); + + LOG.debug("closing {}", partFileName); + long start = System.currentTimeMillis(); + + closeStream(streamContext); + filesWithOpenStreams.remove(filename); + + totalWritingTime += System.currentTimeMillis() - start; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + }; + } + + /** + * Opens the stream for the specified file path in either append mode or create mode. + * + * @param filepath this is the path of either the actual file or the corresponding temporary file. + * @param append true for opening the file in append mode; false otherwise. + * @return output stream. + * @throws IOException + */ + protected FSDataOutputStream openStream(Path filepath, boolean append) throws IOException + { + FSDataOutputStream fsOutput; + if (append) { + fsOutput = fs.append(filepath); + } else { + fsOutput = fs.create(filepath, (short)replication); + fs.setPermission(filepath, FsPermission.createImmutable(filePermission)); + } + return fsOutput; + } + + /** + * Closes the stream which has been removed from the cache. + * + * @param streamContext stream context which is removed from the cache. + * @throws IOException + */ + protected void closeStream(FSFilterStreamContext streamContext) throws IOException + { + streamContext.close(); } /** @@ -592,12 +643,12 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp MutableInt part = finalizedPart.get(fileName); if (part == null) { - part = new MutableInt(); + part = new MutableInt(-1); finalizedPart.put(fileName, part); } MutableInt currentOpenPart = openPart.get(fileName); - for (int x = part.getValue() + 1; x < currentOpenPart.getValue(); x++) { + for (int x = part.getValue() + 1; x <= currentOpenPart.getValue(); x++) { String prevPartNotFinalized = getPartFileName(fileName, x); LOG.debug("request finalize {}", prevPartNotFinalized); filesPerWindow.add(prevPartNotFinalized); @@ -619,12 +670,11 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp //Close all the streams you can Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap(); for(String seenFileName: openStreams.keySet()) { - //FilterOutputStream filterStream = openStreams.get(seenFileName).getFilterStream(); FSFilterStreamContext fsFilterStreamContext = openStreams.get(seenFileName); try { long start = System.currentTimeMillis(); - //filterStream.close(); - fsFilterStreamContext.close(); + closeStream(fsFilterStreamContext); + filesWithOpenStreams.remove(seenFileName); totalWritingTime += System.currentTimeMillis() - start; } catch (IOException ex) { @@ -727,11 +777,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp } count.add(1); - } - catch (IOException ex) { - throw new RuntimeException(ex); - } - catch (ExecutionException ex) { + } catch (IOException | ExecutionException ex) { throw new RuntimeException(ex); } } @@ -743,24 +789,19 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp * @throws IOException * @throws ExecutionException */ - protected void rotate(String fileName) throws IllegalArgumentException, - IOException, - ExecutionException + protected void rotate(String fileName) throws IllegalArgumentException, IOException, ExecutionException { requestFinalize(fileName); counts.remove(fileName); streamsCache.invalidate(fileName); MutableInt mi = openPart.get(fileName); - int rotatedFileIndex = mi.getValue(); - mi.add(1); - LOG.debug("Part file index: {}", openPart); - endOffsets.get(fileName).setValue(0L); - String partFileName = getPartFileName(fileName, rotatedFileIndex); + LOG.debug("Part file rotated {} : {}", fileName, mi.getValue()); + + //TODO: remove this as rotateHook is deprecated. + String partFileName = getPartFileName(fileName, mi.getValue()); rotateHook(partFileName); - if (rotationWindows > 0) { - getRotationState(fileName).rotated = true; - } + getRotationState(fileName).rotated = true; } private RotationState getRotationState(String fileName) @@ -778,6 +819,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp * the name of the file part that has just completed closed. * @param finishedFile The name of the file part that has just completed and closed. */ + @Deprecated protected void rotateHook(@SuppressWarnings("unused") String finishedFile) { //Do nothing by default @@ -887,15 +929,10 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp if (rotate) { try { rotate(filename); - } catch (IOException e) { - throw new RuntimeException(e); - } catch (ExecutionException e) { + } catch (IOException | ExecutionException e) { throw new RuntimeException(e); } } - if (rotationState != null) { - rotationState.rotated = false; - } } } } @@ -1047,7 +1084,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp TOTAL_TIME_WRITING_MILLISECONDS } - private class FSFilterStreamContext implements FilterStreamContext<FilterOutputStream> + protected class FSFilterStreamContext implements FilterStreamContext<FilterOutputStream> { private FSDataOutputStream outputStream;
