Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 30e4b1ff6 -> d910102d7
MLHR-1837 #resolve #comment creating a new file even when it is not corrupted and also closing stale streams Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/2a7d36dc Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/2a7d36dc Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/2a7d36dc Branch: refs/heads/devel-3 Commit: 2a7d36dc4157aa6837b6e5706f0c014ce9389524 Parents: 30e4b1f Author: Chandni Singh <[email protected]> Authored: Thu Sep 3 22:48:25 2015 -0700 Committer: Chandni Singh <[email protected]> Committed: Thu Sep 3 22:52:32 2015 -0700 ---------------------------------------------------------------------- .../lib/io/fs/AbstractFileOutputOperator.java | 44 ++++++++++++- .../io/fs/AbstractFileOutputOperatorTest.java | 65 ++++++++++---------- 2 files changed, 75 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2a7d36dc/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java index a589751..aef0739 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.OutputStream; import java.util.*; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import javax.validation.constraints.Min; @@ -31,6 +32,7 @@ import com.google.common.cache.*; import com.google.common.collect.Maps; import 
com.google.common.collect.Sets; +import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -226,6 +228,17 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp protected long currentWindow; /** + * The stream is expired (closed and evicted from cache) after the specified duration has passed since it was last + * accessed by a read or write. + * <p/> + * https://code.google.com/p/guava-libraries/wiki/CachesExplained <br/> + * Caches built with CacheBuilder do not perform cleanup and evict values "automatically," or instantly after a value expires, or anything of the sort. + * Instead, it performs small amounts of maintenance during write operations, or during occasional read operations if writes are rare.<br/> + * This isn't the most effective way but adds a little bit of optimization. + */ + private Long expireStreamAfterAcessMillis; + + /** * This input port receives incoming tuples. */ public final transient DefaultInputPort<INPUT> input = new DefaultInputPort<INPUT>() @@ -286,6 +299,9 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp public void setup(Context.OperatorContext context) { LOG.debug("setup initiated"); + if (expireStreamAfterAcessMillis == null) { + expireStreamAfterAcessMillis = (long)(context.getValue(OperatorContext.SPIN_MILLIS) * context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT)); + } rollingFile = (maxLength < Long.MAX_VALUE) || (rotationWindows > 0); //Getting required file system instance. 
@@ -416,13 +432,13 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp } }; - streamsCache = CacheBuilder.newBuilder().maximumSize(maxOpenFiles).removalListener(removalListener).build(loader); + streamsCache = CacheBuilder.newBuilder().maximumSize(maxOpenFiles).expireAfterAccess(expireStreamAfterAcessMillis, TimeUnit.MILLISECONDS).removalListener(removalListener).build(loader); try { LOG.debug("File system class: {}", fs.getClass()); LOG.debug("end-offsets {}", endOffsets); - //Restore the files in case they were corrupted and the operator + //Restore the files in case they were corrupted and the operator was re-deployed. Path writerPath = new Path(filePath); if (fs.exists(writerPath)) { for (String seenFileName : endOffsets.keySet()) { @@ -473,6 +489,13 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp fileContext.rename(recoveryFilePath, status.getPath(), Options.Rename.OVERWRITE); } } else { + if (alwaysWriteToTmp) { + String currentTmp = seenFileNamePart + '.' + System.currentTimeMillis() + TMP_EXTENSION; + FSDataOutputStream outputStream = fs.create(new Path(filePath + Path.SEPARATOR + currentTmp)); + IOUtils.copy(inputStream, outputStream); + outputStream.close(); + fileNameToTmpName.put(seenFileNamePart, currentTmp); + } inputStream.close(); } } @@ -1188,6 +1211,23 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp { return finalizedFiles; } + + public Long getExpireStreamAfterAccessMillis() + { + return expireStreamAfterAcessMillis; + } + + /** + * Sets the duration after which the stream is expired (closed and removed from the cache) since it was last accessed + * by a read or write. + * + * @param millis time in millis. + */ + public void setExpireStreamAfterAccessMillis(Long millis) + { + this.expireStreamAfterAcessMillis = millis; + } + /** * Return the filter to use. 
If this method returns a filter the filter is applied to data before the data is stored * in the file. If it returns null no filter is applied and data is written as is. Override this method to provide http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2a7d36dc/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java index 98f0152..b900af2 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java @@ -45,9 +45,7 @@ import com.datatorrent.lib.helper.OperatorContextTestHelper; import com.datatorrent.lib.testbench.RandomWordGenerator; import com.datatorrent.lib.util.TestUtils.TestInfo; -import com.datatorrent.api.DAG; -import com.datatorrent.api.LocalMode; -import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.*; import com.datatorrent.netlet.util.DTThrowable; @@ -61,12 +59,10 @@ public class AbstractFileOutputOperatorTest @Rule public FSTestWatcher testMeta = new FSTestWatcher(); - public static OperatorContextTestHelper.TestIdOperatorContext testOperatorContext = - new OperatorContextTestHelper.TestIdOperatorContext(0); - public static class FSTestWatcher extends TestInfo { public boolean writeToTmp = false; + public OperatorContextTestHelper.TestIdOperatorContext testOperatorContext; @Override protected void starting(Description description) @@ -74,6 +70,11 @@ public class AbstractFileOutputOperatorTest super.starting(description); try { FileUtils.forceMkdir(new File(getDir())); + Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap(); + 
attributeMap.put(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 60); + attributeMap.put(Context.OperatorContext.SPIN_MILLIS, 500); + + testOperatorContext = new OperatorContextTestHelper.TestIdOperatorContext(0, attributeMap); } catch (IOException e) { throw new RuntimeException(e); } @@ -347,7 +348,7 @@ public class AbstractFileOutputOperatorTest { writer.setFilePath(testMeta.getDir()); writer.setAlwaysWriteToTmp(testMeta.writeToTmp); - writer.setup(testOperatorContext); + writer.setup(testMeta.testOperatorContext); writer.beginWindow(0); writer.input.put(0); @@ -428,7 +429,7 @@ public class AbstractFileOutputOperatorTest File meta = new File(testMeta.getDir()); writer.setFilePath(meta.getAbsolutePath()); writer.setAlwaysWriteToTmp(testMeta.writeToTmp); - writer.setup(testOperatorContext); + writer.setup(testMeta.testOperatorContext); writer.beginWindow(0); writer.input.put(0); @@ -444,7 +445,7 @@ public class AbstractFileOutputOperatorTest writer.teardown(); restoreCheckPoint(checkPointWriter, writer); - writer.setup(testOperatorContext); + writer.setup(testMeta.testOperatorContext); writer.beginWindow(1); writer.input.put(4); @@ -635,7 +636,7 @@ public class AbstractFileOutputOperatorTest File meta = new File(testMeta.getDir()); writer.setFilePath(meta.getAbsolutePath()); writer.setAlwaysWriteToTmp(testMeta.writeToTmp); - writer.setup(testOperatorContext); + writer.setup(testMeta.testOperatorContext); writer.beginWindow(0); writer.input.put(0); @@ -661,7 +662,7 @@ public class AbstractFileOutputOperatorTest File meta = new File(testMeta.getDir()); writer.setFilePath(meta.getAbsolutePath()); writer.setAlwaysWriteToTmp(testMeta.writeToTmp); - writer.setup(testOperatorContext); + writer.setup(testMeta.testOperatorContext); writer.beginWindow(0); writer.input.put(0); @@ -762,7 +763,7 @@ public class AbstractFileOutputOperatorTest File meta = new File(testMeta.getDir()); writer.setFilePath(meta.getAbsolutePath()); writer.setAlwaysWriteToTmp(testMeta.writeToTmp); - 
writer.setup(testOperatorContext); + writer.setup(testMeta.testOperatorContext); writer.beginWindow(0); writer.input.put(0); @@ -783,7 +784,7 @@ public class AbstractFileOutputOperatorTest restoreCheckPoint(checkPointWriter, writer); - writer.setup(testOperatorContext); + writer.setup(testMeta.testOperatorContext); writer.beginWindow(2); writer.input.put(6); @@ -889,7 +890,7 @@ public class AbstractFileOutputOperatorTest writer.setFilePath(testMeta.getDir()); writer.setMaxLength(4); writer.setAlwaysWriteToTmp(testMeta.writeToTmp); - writer.setup(testOperatorContext); + writer.setup(testMeta.testOperatorContext); writer.beginWindow(0); writer.input.put(0); @@ -960,7 +961,7 @@ public class AbstractFileOutputOperatorTest writer.setMaxLength(4); writer.setFilePath(testMeta.getDir()); writer.setAlwaysWriteToTmp(testMeta.writeToTmp); - writer.setup(testOperatorContext); + writer.setup(testMeta.testOperatorContext); writer.beginWindow(0); writer.input.put(0); @@ -978,7 +979,7 @@ public class AbstractFileOutputOperatorTest restoreCheckPoint(checkPointWriter, writer); - writer.setup(testOperatorContext); + writer.setup(testMeta.testOperatorContext); writer.beginWindow(1); writer.input.put(3); @@ -1003,7 +1004,7 @@ public class AbstractFileOutputOperatorTest writer.setFilePath(testMeta.getDir()); writer.setMaxLength(4); writer.setAlwaysWriteToTmp(testMeta.writeToTmp); - writer.setup(testOperatorContext); + writer.setup(testMeta.testOperatorContext); writer.beginWindow(0); writer.input.put(0); @@ -1028,7 +1029,7 @@ public class AbstractFileOutputOperatorTest restoreCheckPoint(checkPointWriter, writer); LOG.debug("Checkpoint endOffsets={}", checkPointWriter.endOffsets); - writer.setup(testOperatorContext); + writer.setup(testMeta.testOperatorContext); writer.beginWindow(2); writer.input.put(5); @@ -1043,7 +1044,7 @@ public class AbstractFileOutputOperatorTest writer.teardown(); restoreCheckPoint(checkPointWriter1, writer); - writer.setup(testOperatorContext); + 
writer.setup(testMeta.testOperatorContext); writer.committed(2); String singleFilePath = testMeta.getDir() + File.separator + SINGLE_FILE; @@ -1143,7 +1144,7 @@ public class AbstractFileOutputOperatorTest File meta = new File(testMeta.getDir()); writer.setFilePath(meta.getAbsolutePath()); - writer.setup(testOperatorContext); + writer.setup(testMeta.testOperatorContext); writer.beginWindow(0); writer.input.put(0); @@ -1211,7 +1212,7 @@ public class AbstractFileOutputOperatorTest File meta = new File(testMeta.getDir()); writer.setFilePath(meta.getAbsolutePath()); writer.setAlwaysWriteToTmp(testMeta.writeToTmp); - writer.setup(testOperatorContext); + writer.setup(testMeta.testOperatorContext); writer.beginWindow(0); writer.input.put(0); @@ -1356,7 +1357,7 @@ public class AbstractFileOutputOperatorTest writer.setMaxLength(4); writer.setAlwaysWriteToTmp(testMeta.writeToTmp); - writer.setup(testOperatorContext); + writer.setup(testMeta.testOperatorContext); writer.beginWindow(0); writer.input.put(0); @@ -1372,7 +1373,7 @@ public class AbstractFileOutputOperatorTest restoreCheckPoint(checkPointWriter, writer); - writer.setup(testOperatorContext); + writer.setup(testMeta.testOperatorContext); writer.beginWindow(1); writer.input.put(2); @@ -1476,7 +1477,7 @@ public class AbstractFileOutputOperatorTest writer.setFilePath(meta.getAbsolutePath()); writer.setMaxLength(4); writer.setAlwaysWriteToTmp(testMeta.writeToTmp); - writer.setup(testOperatorContext); + writer.setup(testMeta.testOperatorContext); writer.beginWindow(0); writer.input.put(0); @@ -1492,7 +1493,7 @@ public class AbstractFileOutputOperatorTest restoreCheckPoint(checkPointWriter, writer); - writer.setup(testOperatorContext); + writer.setup(testMeta.testOperatorContext); writer.beginWindow(1); writer.input.put(4); @@ -1528,7 +1529,7 @@ public class AbstractFileOutputOperatorTest writer.setFilePath(meta.getAbsolutePath()); writer.setMaxLength(4); writer.setAlwaysWriteToTmp(testMeta.writeToTmp); - 
writer.setup(testOperatorContext); + writer.setup(testMeta.testOperatorContext); writer.beginWindow(0); writer.input.process(0); @@ -1544,7 +1545,7 @@ public class AbstractFileOutputOperatorTest restoreCheckPoint(checkPointWriter, writer); - writer.setup(testOperatorContext); + writer.setup(testMeta.testOperatorContext); writer.beginWindow(1); writer.input.process(4); @@ -1662,7 +1663,7 @@ public class AbstractFileOutputOperatorTest private void singleFileMultiRollingFailureHelper(SingleHDFSExactlyOnceWriter writer) { writer.setAlwaysWriteToTmp(testMeta.writeToTmp); - writer.setup(testOperatorContext); + writer.setup(testMeta.testOperatorContext); writer.beginWindow(0); writer.input.put(0); @@ -1690,7 +1691,7 @@ public class AbstractFileOutputOperatorTest restoreCheckPoint(checkPointWriter, writer); - writer.setup(testOperatorContext); + writer.setup(testMeta.testOperatorContext); writer.beginWindow(1); writer.input.put(0); @@ -1746,7 +1747,7 @@ public class AbstractFileOutputOperatorTest writer.setFilePath(testMeta.getDir()); writer.setRotationWindows(30); writer.setAlwaysWriteToTmp(testMeta.writeToTmp); - writer.setup(testOperatorContext); + writer.setup(testMeta.testOperatorContext); // Check that rotation doesn't happen prematurely for (int i = 0; i < 30; ++i) { @@ -1823,7 +1824,7 @@ public class AbstractFileOutputOperatorTest writer.setFilePath(testMeta.getDir()); writer.setAlwaysWriteToTmp(false); - writer.setup(testOperatorContext); + writer.setup(testMeta.testOperatorContext); for (int i = 0; i < 10; ++i) { writer.beginWindow(i); @@ -1982,7 +1983,7 @@ public class AbstractFileOutputOperatorTest writer.setFilePath(testMeta.getDir()); writer.setAlwaysWriteToTmp(false); - writer.setup(testOperatorContext); + writer.setup(testMeta.testOperatorContext); for (int i = 0; i < 10; ++i) { writer.beginWindow(i);
