Repository: apex-malhar Updated Branches: refs/heads/master abb3900c9 -> 2f308aa21
APEXMALHAR-2263 change offset in AbstractFileInputOperator from int to long Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/2f308aa2 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/2f308aa2 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/2f308aa2 Branch: refs/heads/master Commit: 2f308aa21dacff082f854d583df6dff9d1ec30ec Parents: abb3900 Author: Matt Zhang <[email protected]> Authored: Fri Oct 14 16:06:04 2016 -0700 Committer: Matt Zhang <[email protected]> Committed: Tue Oct 18 22:37:01 2016 -0700 ---------------------------------------------------------------------- .../lib/io/fs/AbstractFileInputOperator.java | 25 ++++++++++---------- pom.xml | 1 + 2 files changed, 14 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f308aa2/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java index f0e3fbb..9e80b4e 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java @@ -96,6 +96,7 @@ import com.datatorrent.lib.util.KryoCloneUtils; * @param <T> The type of the object that this input operator reads. 
* @since 1.0.2 */ +@org.apache.hadoop.classification.InterfaceStability.Evolving public abstract class AbstractFileInputOperator<T> implements InputOperator, Partitioner<AbstractFileInputOperator<T>>, StatsListener, Operator.CheckpointListener { @@ -106,7 +107,7 @@ public abstract class AbstractFileInputOperator<T> @NotNull protected DirectoryScanner scanner = new DirectoryScanner(); protected int scanIntervalMillis = 5000; - protected int offset; + protected long offset; protected String currentFile; protected Set<String> processedFiles = new HashSet<String>(); protected int emitBatchSize = 1000; @@ -114,7 +115,7 @@ public abstract class AbstractFileInputOperator<T> protected int partitionCount = 1; private int retryCount = 0; private int maxRetryCount = 5; - protected transient int skipCount = 0; + protected transient long skipCount = 0; private transient OperatorContext context; private final BasicCounters<MutableLong> fileCounters = new BasicCounters<MutableLong>(MutableLong.class); @@ -143,7 +144,7 @@ public abstract class AbstractFileInputOperator<T> protected static class FailedFile { String path; - int offset; + long offset; int retryCount; long lastFailedTime; @@ -151,14 +152,14 @@ public abstract class AbstractFileInputOperator<T> @SuppressWarnings("unused") protected FailedFile() {} - protected FailedFile(String path, int offset) + protected FailedFile(String path, long offset) { this.path = path; this.offset = offset; this.retryCount = 0; } - protected FailedFile(String path, int offset, int retryCount) + protected FailedFile(String path, long offset, int retryCount) { this.path = path; this.offset = offset; @@ -623,7 +624,7 @@ public abstract class AbstractFileInputOperator<T> try { if (currentFile != null && offset > 0) { //open file resets offset to 0 so this a way around it.
- int tmpOffset = offset; + long tmpOffset = offset; if (fs.exists(new Path(currentFile))) { this.inputStream = openFile(new Path(currentFile)); offset = tmpOffset; @@ -651,7 +652,7 @@ public abstract class AbstractFileInputOperator<T> } } if (inputStream != null) { - int startOffset = offset; + long startOffset = offset; String file = currentFile; //current file is reset to null when closed. try { @@ -1130,8 +1131,8 @@ public abstract class AbstractFileInputOperator<T> protected static class RecoveryEntry { final String file; - final int startOffset; - final int endOffset; + final long startOffset; + final long endOffset; @SuppressWarnings("unused") private RecoveryEntry() @@ -1141,7 +1142,7 @@ public abstract class AbstractFileInputOperator<T> endOffset = -1; } - RecoveryEntry(String file, int startOffset, int endOffset) + RecoveryEntry(String file, long startOffset, long endOffset) { this.file = Preconditions.checkNotNull(file, "file"); this.startOffset = startOffset; @@ -1174,8 +1175,8 @@ public abstract class AbstractFileInputOperator<T> public int hashCode() { int result = file.hashCode(); - result = 31 * result + startOffset; - result = 31 * result + endOffset; + result = 31 * result + (int)(startOffset & 0xFFFFFFFF); + result = 31 * result + (int)(endOffset & 0xFFFFFFFF); return result; } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2f308aa2/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index cb8bd93..7a392d0 100644 --- a/pom.xml +++ b/pom.xml @@ -135,6 +135,7 @@ <exclude>@org.apache.hadoop.classification.InterfaceStability$Evolving</exclude> <exclude>@org.apache.hadoop.classification.InterfaceStability$Unstable</exclude> <exclude>com.datatorrent.lib.io.fs.FSInputModule</exclude> + <exclude>com.datatorrent.lib.io.fs.AbstractFileInputOperator</exclude> </excludes> </parameter> <skip>${semver.plugin.skip}</skip>
