Repository: apex-malhar
Updated Branches:
  refs/heads/release-3.5 9bcffafe4 -> ebd4f376d
APEXMALHAR-2227 Index out of Bound Exception fix in Kafka input

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/ebd4f376
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/ebd4f376
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/ebd4f376

Branch: refs/heads/release-3.5
Commit: ebd4f376d5502c2c2dd82f143887b0c1ac796f57
Parents: 9bcffaf
Author: Chandni Singh <[email protected]>
Authored: Wed Sep 7 08:58:09 2016 -0700
Committer: Thomas Weise <[email protected]>
Committed: Wed Sep 7 12:59:47 2016 -0700

----------------------------------------------------------------------
 .../kafka/AbstractKafkaInputOperator.java | 2 +-
 .../malhar/lib/wal/FSWindowDataManager.java | 21 +++++++++++---------
 2 files changed, 13 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ebd4f376/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index abf3fad..fc11bf7 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -621,7 +621,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
     if (numPartitionsChanged) {
       List<WindowDataManager> managers = windowDataManager.partition(resultPartitions.size(), deletedOperators);
       int i = 0;
-      for (Partition<AbstractKafkaInputOperator<K>> partition : partitions) {
+      for (Partition<AbstractKafkaInputOperator<K>> partition : resultPartitions) {
         partition.getPartitionedInstance().setWindowDataManager(managers.get(i++));
       }
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ebd4f376/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
index 81f6aa0..2b85580 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
@@ -237,15 +237,18 @@ public class FSWindowDataManager implements WindowDataManager
 
   protected void createReadOnlyWals() throws IOException
   {
-    RemoteIterator<FileStatus> operatorsIter = fileContext.listStatus(new Path(fullStatePath));
-    while (operatorsIter.hasNext()) {
-      FileStatus status = operatorsIter.next();
-      int operatorId = Integer.parseInt(status.getPath().getName());
-
-      if (operatorId != this.operatorId) {
-        //create read-only wal for other partitions
-        FSWindowReplayWAL wal = new FSWindowReplayWAL(true);
-        readOnlyWals.put(operatorId, wal);
+    Path statePath = new Path(fullStatePath);
+    if (fileContext.util().exists(statePath)) {
+      RemoteIterator<FileStatus> operatorsIter = fileContext.listStatus(statePath);
+      while (operatorsIter.hasNext()) {
+        FileStatus status = operatorsIter.next();
+        int operatorId = Integer.parseInt(status.getPath().getName());
+
+        if (operatorId != this.operatorId) {
+          //create read-only wal for other partitions
+          FSWindowReplayWAL wal = new FSWindowReplayWAL(true);
+          readOnlyWals.put(operatorId, wal);
+        }
       }
     }
   }
