Repository: apex-malhar
Updated Branches:
  refs/heads/master 154ff3ccb -> a5ffae6fb
APEXMALHAR-2227 Index out of Bound Exception fix in Kafka input

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/a5ffae6f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/a5ffae6f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/a5ffae6f

Branch: refs/heads/master
Commit: a5ffae6fb6620f374c75e1a0760c4dd6f8cf1c62
Parents: 154ff3c
Author: Chandni Singh <[email protected]>
Authored: Wed Sep 7 08:58:09 2016 -0700
Committer: Chandni Singh <[email protected]>
Committed: Wed Sep 7 11:34:56 2016 -0700

----------------------------------------------------------------------
 .../kafka/AbstractKafkaInputOperator.java | 2 +-
 .../malhar/lib/wal/FSWindowDataManager.java | 21 +++++++++++--------
 2 files changed, 13 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a5ffae6f/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index abf3fad..fc11bf7 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -621,7 +621,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
     if (numPartitionsChanged) {
       List<WindowDataManager> managers = windowDataManager.partition(resultPartitions.size(), deletedOperators);
       int i = 0;
-      for (Partition<AbstractKafkaInputOperator<K>> partition : partitions) {
+      for (Partition<AbstractKafkaInputOperator<K>> partition : resultPartitions) {
         partition.getPartitionedInstance().setWindowDataManager(managers.get(i++));
       }
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a5ffae6f/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
index 81f6aa0..2b85580 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
@@ -237,15 +237,18 @@ public class FSWindowDataManager implements WindowDataManager

   protected void createReadOnlyWals() throws IOException
   {
-    RemoteIterator<FileStatus> operatorsIter = fileContext.listStatus(new Path(fullStatePath));
-    while (operatorsIter.hasNext()) {
-      FileStatus status = operatorsIter.next();
-      int operatorId = Integer.parseInt(status.getPath().getName());
-
-      if (operatorId != this.operatorId) {
-        //create read-only wal for other partitions
-        FSWindowReplayWAL wal = new FSWindowReplayWAL(true);
-        readOnlyWals.put(operatorId, wal);
+    Path statePath = new Path(fullStatePath);
+    if (fileContext.util().exists(statePath)) {
+      RemoteIterator<FileStatus> operatorsIter = fileContext.listStatus(statePath);
+      while (operatorsIter.hasNext()) {
+        FileStatus status = operatorsIter.next();
+        int operatorId = Integer.parseInt(status.getPath().getName());
+
+        if (operatorId != this.operatorId) {
+          //create read-only wal for other partitions
+          FSWindowReplayWAL wal = new FSWindowReplayWAL(true);
+          readOnlyWals.put(operatorId, wal);
+        }
       }
     }
   }
