Repository: apex-malhar Updated Branches: refs/heads/master 91767c589 -> 16b15c21b
APEXMALHAR-2379 Validation of attributes for AbstractFileOutputOperator, regex on name instead of path, added @ Min constraint annotation Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/16b15c21 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/16b15c21 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/16b15c21 Branch: refs/heads/master Commit: 16b15c21be47eea7ff262cd5dd7eb44d824736b5 Parents: 91767c5 Author: francisf <[email protected]> Authored: Mon Jan 2 18:50:11 2017 +0530 Committer: francisf <[email protected]> Committed: Mon Jan 9 12:36:40 2017 +0530 ---------------------------------------------------------------------- .../lib/io/fs/AbstractFileInputOperator.java | 17 ++++++++++++++++- .../lib/io/fs/AbstractFileInputOperatorTest.java | 2 +- 2 files changed, 17 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16b15c21/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java index be156d1..bc5ebf1 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java @@ -19,6 +19,7 @@ package com.datatorrent.lib.io.fs; import java.io.BufferedReader; +import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; @@ -37,6 +38,7 @@ import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; +import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; import org.slf4j.Logger; @@ -106,14 +108,17 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par protected String directory; @NotNull protected DirectoryScanner scanner = new DirectoryScanner(); + @Min(0) protected int scanIntervalMillis = 5000; protected long offset; protected String currentFile; protected Set<String> processedFiles = new HashSet<String>(); + @Min(1) protected int emitBatchSize = 1000; protected int currentPartitions = 1; protected int partitionCount = 1; private int retryCount = 0; + @Min(0) private int maxRetryCount = 5; protected transient long skipCount = 0; private transient OperatorContext context; @@ -366,6 +371,9 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par */ public void setScanIntervalMillis(int scanIntervalMillis) { + if (scanIntervalMillis < 0) { + throw new IllegalArgumentException("scanIntervalMillis should be greater than or equal to 0."); + } this.scanIntervalMillis = scanIntervalMillis; } @@ -384,6 +392,9 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par */ public void setEmitBatchSize(int emitBatchSize) { + if (emitBatchSize <= 0) { + throw new IllegalArgumentException("emitBatchSize should be greater than 0."); + } this.emitBatchSize = emitBatchSize; } @@ -994,6 +1005,9 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par */ public void setMaxRetryCount(int maxRetryCount) { + if (maxRetryCount < 0) { + throw new IllegalArgumentException("maxRetryCount should be greater than or equal to 0."); + } this.maxRetryCount = maxRetryCount; } @@ -1100,7 +1114,8 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par } Pattern regex = this.getRegex(); if (regex != null) { - Matcher matcher = regex.matcher(filePathStr); + String fileName = new File(filePathStr).getName(); + Matcher matcher = regex.matcher(fileName); if (!matcher.matches()) { return false; } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16b15c21/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java index 1a97f5e..5ceac01 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java @@ -130,7 +130,7 @@ public class AbstractFileInputOperatorTest oper.output.setSink(sink); oper.setDirectory(testMeta.dir); - oper.getScanner().setFilePatternRegexp(".*file[\\d]"); + oper.getScanner().setFilePatternRegexp("((?!target).)*file[\\d]"); oper.getScanner().setRecursive(recursive); oper.setup(testMeta.context);
