Repository: incubator-apex-malhar Updated Branches: refs/heads/master 2138512bb -> 72de84005
APEXMALHAR-2080 APEXMALHAR-2079 Fixed the typo in the field name expireStreamAfterAcessMillis (renamed to expireStreamAfterAccessMillis) and increased the default value of expireStreamAfterAccessMillis, which is now derived from STREAMING_WINDOW_SIZE_MILLIS instead of SPIN_MILLIS Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/72de8400 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/72de8400 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/72de8400 Branch: refs/heads/master Commit: 72de84005945ce6e588c85a699183380016791f2 Parents: 2138512 Author: Chandni Singh <csi...@apache.org> Authored: Thu May 5 10:23:48 2016 -0700 Committer: Chandni Singh <csi...@apache.org> Committed: Thu May 5 10:23:48 2016 -0700 ---------------------------------------------------------------------- .../lib/io/fs/AbstractFileOutputOperator.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/72de8400/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java index 15a208f..0195f7f 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java @@ -257,7 +257,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp * operations, or during occasional read operations if writes are rare.<br/> * This isn't the most effective way but adds a little bit of optimization. 
*/ - private Long expireStreamAfterAcessMillis; + private Long expireStreamAfterAccessMillis; private final Set<String> filesWithOpenStreams; /** @@ -321,8 +321,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp public void setup(Context.OperatorContext context) { LOG.debug("setup initiated"); - if (expireStreamAfterAcessMillis == null) { - expireStreamAfterAcessMillis = (long)(context.getValue(OperatorContext.SPIN_MILLIS) * + if (expireStreamAfterAccessMillis == null) { + expireStreamAfterAccessMillis = (long)(context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS) * context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT)); } rollingFile = (maxLength < Long.MAX_VALUE) || (rotationWindows > 0); @@ -343,7 +343,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp //building cache RemovalListener<String, FSFilterStreamContext> removalListener = createCacheRemoveListener(); CacheLoader<String, FSFilterStreamContext> loader = createCacheLoader(); - streamsCache = CacheBuilder.newBuilder().maximumSize(maxOpenFiles).expireAfterAccess(expireStreamAfterAcessMillis, + streamsCache = CacheBuilder.newBuilder().maximumSize(maxOpenFiles).expireAfterAccess(expireStreamAfterAccessMillis, TimeUnit.MILLISECONDS).removalListener(removalListener).build(loader); LOG.debug("File system class: {}", fs.getClass()); @@ -608,7 +608,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp String filename = notification.getKey(); String partFileName = getPartFileNamePri(filename); - LOG.debug("closing {}", partFileName); + LOG.info("closing {}", partFileName); long start = System.currentTimeMillis(); closeStream(streamContext); @@ -616,6 +616,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp totalWritingTime += System.currentTimeMillis() - start; } catch (IOException e) { + LOG.error("removing {}", notification.getValue(), e); throw new 
RuntimeException(e); } } @@ -1292,7 +1293,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp public Long getExpireStreamAfterAccessMillis() { - return expireStreamAfterAcessMillis; + return expireStreamAfterAccessMillis; } /** @@ -1303,7 +1304,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp */ public void setExpireStreamAfterAccessMillis(Long millis) { - this.expireStreamAfterAcessMillis = millis; + this.expireStreamAfterAccessMillis = millis; } /**