APEXMALHAR-2063 Made window data manager use file system WAL
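This change moves per-window idempotency state onto a file system WAL: FSWindowDataManager is rebuilt on top of the new FSWindowReplayWAL and the reworked FileSystemWAL, and the operator-facing API drops the explicit operator id. Throughout the diffs below, getLargestRecoveryWindow() becomes getLargestCompletedWindow(), load(operatorId, windowId) becomes retrieve(windowId), save(state, operatorId, windowId) becomes save(state, windowId), deleteUpTo(operatorId, windowId) becomes committed(windowId), reading every physical partition's state becomes retrieveAllPartitions(windowId), and FSWindowDataManager's path setter is renamed from setRecoveryPath() to setStatePath(). A minimal sketch of the resulting idempotent-input pattern, assuming a hypothetical operator class (only the WindowDataManager calls are taken from this commit):

import java.io.IOException;

import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.apex.malhar.lib.wal.WindowDataManager;

// Hypothetical skeleton; the WindowDataManager usage mirrors the operators below.
public class IdempotentInputSketch
{
  private final WindowDataManager windowDataManager = new FSWindowDataManager();
  private transient long currentWindowId;
  private Object currentWindowRecoveryState;

  public void beginWindow(long windowId)
  {
    currentWindowId = windowId;
    if (windowId <= windowDataManager.getLargestCompletedWindow()) { // was getLargestRecoveryWindow()
      try {
        Object recovered = windowDataManager.retrieve(windowId); // was load(operatorId, windowId)
        // ... re-emit the recovered tuples so replay is deterministic ...
      } catch (IOException e) {
        throw new RuntimeException("replay", e);
      }
    }
  }

  public void endWindow()
  {
    if (currentWindowId > windowDataManager.getLargestCompletedWindow()) {
      try {
        windowDataManager.save(currentWindowRecoveryState, currentWindowId); // operator id no longer passed
      } catch (IOException e) {
        throw new RuntimeException("saving recovery", e);
      }
    }
  }

  public void committed(long windowId)
  {
    try {
      windowDataManager.committed(windowId); // was deleteUpTo(operatorId, windowId)
    } catch (IOException e) {
      throw new RuntimeException("deleting state", e);
    }
  }
}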
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/1b4536ca Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/1b4536ca Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/1b4536ca Branch: refs/heads/master Commit: 1b4536ca0224eae2452d3a052d6f67b67bdec893 Parents: 7d9386d Author: Chandni Singh <[email protected]> Authored: Fri Aug 26 20:16:17 2016 -0700 Committer: Chandni Singh <[email protected]> Committed: Fri Aug 26 21:10:48 2016 -0700 ---------------------------------------------------------------------- .../kafka/AbstractKafkaInputOperator.java | 66 +- .../kinesis/AbstractKinesisInputOperator.java | 102 ++- .../contrib/nifi/AbstractNiFiInputOperator.java | 8 +- .../nifi/AbstractNiFiOutputOperator.java | 4 +- .../rabbitmq/AbstractRabbitMQInputOperator.java | 8 +- .../AbstractRabbitMQOutputOperator.java | 4 +- .../redis/AbstractRedisInputOperator.java | 12 +- .../contrib/kafka/KafkaInputOperatorTest.java | 4 +- .../kinesis/KinesisInputOperatorTest.java | 2 +- .../nifi/NiFiSinglePortInputOperatorTest.java | 4 +- .../rabbitmq/RabbitMQInputOperatorTest.java | 4 +- .../contrib/redis/RedisInputOperatorTest.java | 4 +- .../kafka/AbstractKafkaInputOperator.java | 9 +- ...afkaSinglePortExactlyOnceOutputOperator.java | 16 +- .../malhar/kafka/KafkaOutputOperatorTest.java | 2 +- .../db/jdbc/AbstractJdbcPollInputOperator.java | 16 +- .../lib/io/fs/AbstractFileInputOperator.java | 19 +- .../com/datatorrent/lib/io/fs/FileSplitter.java | 16 +- .../lib/io/fs/FileSplitterInput.java | 33 +- .../lib/io/jms/AbstractJMSInputOperator.java | 60 +- .../AbstractManagedStateInnerJoinOperator.java | 4 +- .../state/managed/AbstractManagedStateImpl.java | 37 +- .../managed/IncrementalCheckpointManager.java | 51 +- .../malhar/lib/wal/FSWindowDataManager.java | 723 ++++++++++++++----- .../apex/malhar/lib/wal/FSWindowReplayWAL.java | 188 +++++ .../apex/malhar/lib/wal/FileSystemWAL.java | 292 +++++--- .../org/apache/apex/malhar/lib/wal/WAL.java | 9 +- .../apex/malhar/lib/wal/WindowDataManager.java | 103 ++- .../db/jdbc/JdbcPojoPollableOpeartorTest.java | 4 +- .../io/fs/AbstractFileInputOperatorTest.java | 32 +- .../lib/io/fs/FileSplitterInputTest.java | 201 ++++-- .../datatorrent/lib/io/fs/FileSplitterTest.java | 2 +- .../lib/io/jms/JMSStringInputOperatorTest.java | 37 +- .../lib/io/jms/SQSStringInputOperatorTest.java | 4 +- .../IncrementalCheckpointManagerTest.java | 19 +- .../malhar/lib/wal/FSWindowDataManagerTest.java | 259 ++++--- .../apex/malhar/lib/wal/FileSystemWALTest.java | 4 +- 37 files changed, 1581 insertions(+), 781 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java index 21ff181..abf3fad 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java @@ -273,7 +273,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem public void beginWindow(long windowId) { currentWindowId = windowId; - if (windowId <= 
windowDataManager.getLargestRecoveryWindow()) { + if (windowId <= windowDataManager.getLargestCompletedWindow()) { replay(windowId); } emitCount = 0; @@ -285,7 +285,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem try { @SuppressWarnings("unchecked") Map<KafkaPartition, MutablePair<Long, Integer>> recoveredData = (Map<KafkaPartition, MutablePair<Long, Integer>>) - windowDataManager.load(operatorId, windowId); + windowDataManager.retrieve(windowId); if (recoveredData != null) { Map<String, List<PartitionMetadata>> pms = KafkaMetadataUtil.getPartitionsForTopic(getConsumer().brokers, getConsumer().topic); if (pms != null) { @@ -325,7 +325,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem } } } - if(windowId == windowDataManager.getLargestRecoveryWindow()) { + if(windowId == windowDataManager.getLargestCompletedWindow()) { // Start the consumer at the largest recovery window SimpleKafkaConsumer cons = (SimpleKafkaConsumer)getConsumer(); // Set the offset positions to the consumer @@ -351,9 +351,9 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem Map<KafkaPartition, Long> carryOn = new HashMap<>(offsetStats); offsetTrackHistory.add(Pair.of(currentWindowId, carryOn)); } - if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) { + if (currentWindowId > windowDataManager.getLargestCompletedWindow()) { try { - windowDataManager.save(currentWindowRecoveryState, operatorId, currentWindowId); + windowDataManager.save(currentWindowRecoveryState, currentWindowId); } catch (IOException e) { throw new RuntimeException("saving recovery", e); @@ -396,7 +396,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem } try { - windowDataManager.deleteUpTo(operatorId, windowId); + windowDataManager.committed(windowId); } catch (IOException e) { throw new RuntimeException("deleting state", e); @@ -407,7 +407,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem public void activate(OperatorContext ctx) { if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID && - context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestRecoveryWindow()) { + context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestCompletedWindow()) { // If it is a replay state, don't start the consumer return; } @@ -426,7 +426,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem @Override public void emitTuples() { - if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) { + if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) { return; } int count = consumer.messageSize() + ((pendingMessage != null) ? 
1 : 0); @@ -521,9 +521,10 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem logger.info("Initial offsets: {} ", "{ " + Joiner.on(", ").useForNull("").withKeyValueSeparator(": ").join(initOffset) + " }"); } - Collection<WindowDataManager> newManagers = Sets.newHashSet(); - Set<Integer> deletedOperators = Sets.newHashSet(); - + Set<Integer> deletedOperators = Sets.newHashSet(); + Collection<Partition<AbstractKafkaInputOperator<K>>> resultPartitions = partitions; + boolean numPartitionsChanged = false; + switch (strategy) { // For the 1 to 1 mapping The framework will create number of operator partitions based on kafka topic partitions @@ -545,19 +546,21 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem String clusterId = kp.getKey(); for (PartitionMetadata pm : kp.getValue()) { logger.info("[ONE_TO_ONE]: Create operator partition for cluster {}, topic {}, kafka partition {} ", clusterId, getConsumer().topic, pm.partitionId()); - newPartitions.add(createPartition(Sets.newHashSet(new KafkaPartition(clusterId, consumer.topic, pm.partitionId())), initOffset, newManagers)); + newPartitions.add(createPartition(Sets.newHashSet(new KafkaPartition(clusterId, consumer.topic, pm.partitionId())), initOffset)); } } - windowDataManager.partitioned(newManagers, deletedOperators); - return newPartitions; + resultPartitions = newPartitions; + numPartitionsChanged = true; } else if (newWaitingPartition.size() != 0) { // add partition for new kafka partition for (KafkaPartition newPartition : newWaitingPartition) { logger.info("[ONE_TO_ONE]: Add operator partition for cluster {}, topic {}, partition {}", newPartition.getClusterId(), getConsumer().topic, newPartition.getPartitionId()); - partitions.add(createPartition(Sets.newHashSet(newPartition), null, newManagers)); + partitions.add(createPartition(Sets.newHashSet(newPartition), null)); } newWaitingPartition.clear(); + resultPartitions = partitions; + numPartitionsChanged = true; } break; // For the 1 to N mapping The initial partition number is defined by stream application @@ -595,7 +598,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem newPartitions = new ArrayList<Partitioner.Partition<AbstractKafkaInputOperator<K>>>(size); for (i = 0; i < size; i++) { logger.info("[ONE_TO_MANY]: Create operator partition for kafka partition(s): {} ", StringUtils.join(kps[i], ", ")); - newPartitions.add(createPartition(kps[i], initOffset, newManagers)); + newPartitions.add(createPartition(kps[i], initOffset)); } // Add the existing partition Ids to the deleted operators for (Partition<AbstractKafkaInputOperator<K>> op : partitions) @@ -604,8 +607,8 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem } newWaitingPartition.clear(); - windowDataManager.partitioned(newManagers, deletedOperators); - return newPartitions; + resultPartitions = newPartitions; + numPartitionsChanged = true; } break; @@ -614,13 +617,33 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem default: break; } + + if (numPartitionsChanged) { + List<WindowDataManager> managers = windowDataManager.partition(resultPartitions.size(), deletedOperators); + int i = 0; + for (Partition<AbstractKafkaInputOperator<K>> partition : partitions) { + partition.getPartitionedInstance().setWindowDataManager(managers.get(i++)); + } + } + return resultPartitions; + } - windowDataManager.partitioned(newManagers, deletedOperators); - return partitions; + /** 
+ * Create a new partition with the partition Ids and initial offset positions + * + * @deprecated use {@link #createPartition(Set, Map)} + */ + @Deprecated + protected Partitioner.Partition<AbstractKafkaInputOperator<K>> createPartition(Set<KafkaPartition> pIds, + Map<KafkaPartition, Long> initOffsets, + @SuppressWarnings("UnusedParameters") Collection<WindowDataManager> newManagers) + { + return createPartition(pIds, initOffsets); } // Create a new partition with the partition Ids and initial offset positions - protected Partitioner.Partition<AbstractKafkaInputOperator<K>> createPartition(Set<KafkaPartition> pIds, Map<KafkaPartition, Long> initOffsets, Collection<WindowDataManager> newManagers) + protected Partitioner.Partition<AbstractKafkaInputOperator<K>> createPartition(Set<KafkaPartition> pIds, + Map<KafkaPartition, Long> initOffsets) { Partitioner.Partition<AbstractKafkaInputOperator<K>> p = new DefaultPartition<>(KryoCloneUtils.cloneObject(this)); @@ -632,7 +655,6 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem p.getPartitionedInstance().offsetStats.putAll(p.getPartitionedInstance().getConsumer().getCurrentOffsets()); } } - newManagers.add(p.getPartitionedInstance().windowDataManager); PartitionInfo pif = new PartitionInfo(); pif.kpids = pIds; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java index 2352236..c03df21 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java @@ -18,32 +18,46 @@ */ package com.datatorrent.contrib.kinesis; -import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.model.Shard; -import com.amazonaws.services.kinesis.model.ShardIteratorType; -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.*; -import com.datatorrent.api.Operator.ActivationListener; -import com.datatorrent.common.util.Pair; -import com.datatorrent.lib.util.KryoCloneUtils; +import java.io.IOException; +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; -import com.esotericsoftware.kryo.DefaultSerializer; -import com.esotericsoftware.kryo.serializers.JavaSerializer; -import com.google.common.collect.Sets; +import javax.validation.Valid; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.apex.malhar.lib.wal.FSWindowDataManager; import org.apache.apex.malhar.lib.wal.WindowDataManager; import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import javax.validation.Valid; -import javax.validation.constraints.Min; -import javax.validation.constraints.NotNull; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.Shard; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import 
com.esotericsoftware.kryo.DefaultSerializer; +import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.google.common.collect.Sets; -import java.io.IOException; -import java.lang.reflect.Array; -import java.util.*; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.Partitioner; +import com.datatorrent.api.Stats; +import com.datatorrent.api.StatsListener; +import com.datatorrent.common.util.Pair; +import com.datatorrent.lib.util.KryoCloneUtils; @DefaultSerializer(JavaSerializer.class) class KinesisPair <F, S> extends Pair<F, S> @@ -168,7 +182,6 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, // Operator partitions List<Partition<AbstractKinesisInputOperator>> newPartitions = null; - Collection<WindowDataManager> newManagers = Sets.newHashSet(); Set<Integer> deletedOperators = Sets.newHashSet(); // initialize the shard positions @@ -188,7 +201,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, newPartitions = new ArrayList<Partition<AbstractKinesisInputOperator>>(shards.size()); for (int i = 0; i < shards.size(); i++) { logger.info("[ONE_TO_ONE]: Create operator partition for kinesis partition: " + shards.get(i).getShardId() + ", StreamName: " + this.getConsumer().streamName); - newPartitions.add(createPartition(Sets.newHashSet(shards.get(i).getShardId()), initShardPos, newManagers)); + newPartitions.add(createPartition(Sets.newHashSet(shards.get(i).getShardId()), initShardPos)); } } else if (newWaitingPartition.size() != 0) { // Remove the partitions for the closed shards @@ -196,10 +209,15 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, // add partition for new kinesis shard for (String pid : newWaitingPartition) { logger.info("[ONE_TO_ONE]: Add operator partition for kinesis partition " + pid); - partitions.add(createPartition(Sets.newHashSet(pid), null, newManagers)); + partitions.add(createPartition(Sets.newHashSet(pid), null)); } newWaitingPartition.clear(); - windowDataManager.partitioned(newManagers, deletedOperators); + List<WindowDataManager> managers = windowDataManager.partition(partitions.size(), deletedOperators); + int i = 0; + for (Partition<AbstractKinesisInputOperator> partition : partitions) { + partition.getPartitionedInstance().setWindowDataManager(managers.get(i)); + i++; + } return partitions; } break; @@ -211,11 +229,10 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, 2. Static Partition: Number of DT partitions is fixed, whether the number of shards are increased/decreased. 
*/ int size = initialPartitionCount; - if(newWaitingPartition.size() != 0) - { + if (newWaitingPartition.size() != 0) { // Get the list of open shards shards = getOpenShards(partitions); - if(shardsPerPartition > 1) + if (shardsPerPartition > 1) size = (int)Math.ceil(shards.size() / (shardsPerPartition * 1.0)); initShardPos = shardManager.loadInitialShardPositions(); } @@ -238,20 +255,26 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, newWaitingPartition.clear(); } // Add the existing partition Ids to the deleted operators - for(Partition<AbstractKinesisInputOperator> op : partitions) - { + for (Partition<AbstractKinesisInputOperator> op : partitions) { deletedOperators.add(op.getPartitionedInstance().operatorId); } + for (int i = 0; i < pIds.length; i++) { logger.info("[MANY_TO_ONE]: Create operator partition for kinesis partition(s): " + StringUtils.join(pIds[i], ", ") + ", StreamName: " + this.getConsumer().streamName); - if(pIds[i] != null) - newPartitions.add(createPartition(pIds[i], initShardPos, newManagers)); + + if (pIds[i] != null) { + newPartitions.add(createPartition(pIds[i], initShardPos)); + } } break; default: break; } - windowDataManager.partitioned(newManagers, deletedOperators); + int i = 0; + List<WindowDataManager> managers = windowDataManager.partition(partitions.size(), deletedOperators); + for (Partition<AbstractKinesisInputOperator> partition : partitions) { + partition.getPartitionedInstance().setWindowDataManager(managers.get(i++)); + } return newPartitions; } @@ -371,10 +394,9 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, } // Create a new partition with the shardIds and initial shard positions private - Partition<AbstractKinesisInputOperator> createPartition(Set<String> shardIds, Map<String, String> initShardPos, Collection<WindowDataManager> newManagers) + Partition<AbstractKinesisInputOperator> createPartition(Set<String> shardIds, Map<String, String> initShardPos) { Partition<AbstractKinesisInputOperator> p = new DefaultPartition<AbstractKinesisInputOperator>(KryoCloneUtils.cloneObject(this)); - newManagers.add(p.getPartitionedInstance().windowDataManager); p.getPartitionedInstance().getConsumer().setShardIds(shardIds); p.getPartitionedInstance().getConsumer().resetShardPositions(initShardPos); @@ -403,7 +425,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, operatorId = context.getId(); windowDataManager.setup(context); shardPosition.clear(); - if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestRecoveryWindow()) { + if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestCompletedWindow()) { isReplayState = true; } } @@ -426,7 +448,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, { emitCount = 0; currentWindowId = windowId; - if (windowId <= windowDataManager.getLargestRecoveryWindow()) { + if (windowId <= windowDataManager.getLargestCompletedWindow()) { replay(windowId); } } @@ -436,7 +458,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, try { @SuppressWarnings("unchecked") Map<String, KinesisPair<String, Integer>> recoveredData = - (Map<String, KinesisPair<String, Integer>>)windowDataManager.load(operatorId, windowId); + (Map<String, KinesisPair<String, Integer>>)windowDataManager.retrieve(windowId); if (recoveredData == null) { return; } @@ -466,10 +488,10 @@ public abstract class 
AbstractKinesisInputOperator <T> implements InputOperator, @Override public void endWindow() { - if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) { + if (currentWindowId > windowDataManager.getLargestCompletedWindow()) { context.setCounters(getConsumer().getConsumerStats(shardPosition)); try { - windowDataManager.save(currentWindowRecoveryState, operatorId, currentWindowId); + windowDataManager.save(currentWindowRecoveryState, currentWindowId); } catch (IOException e) { throw new RuntimeException("saving recovery", e); @@ -497,7 +519,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, public void committed(long windowId) { try { - windowDataManager.deleteUpTo(operatorId, windowId); + windowDataManager.committed(windowId); } catch (IOException e) { throw new RuntimeException("deleting state", e); @@ -523,7 +545,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, @Override public void emitTuples() { - if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) { + if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) { return; } int count = consumer.getQueueSize(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java index 1bc81d6..b040d87 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java @@ -105,9 +105,9 @@ public abstract class AbstractNiFiInputOperator<T> implements InputOperator currentWindowId = windowId; // if the current window is now less than the largest window, then we need to replay data - if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) { + if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) { try { - List<T> recoveredData = (List<T>)this.windowDataManager.load(operatorContextId, windowId); + List<T> recoveredData = (List<T>)this.windowDataManager.retrieve(windowId); if (recoveredData == null) { return; } @@ -159,7 +159,7 @@ public abstract class AbstractNiFiInputOperator<T> implements InputOperator // ensure we have the data saved before proceeding in case anything goes wrong currentWindowTuples.addAll(tuples); - windowDataManager.save(currentWindowTuples, operatorContextId, currentWindowId); + windowDataManager.save(currentWindowTuples, currentWindowId); // we now have the data saved so we can complete the transaction transaction.complete(); @@ -192,7 +192,7 @@ public abstract class AbstractNiFiInputOperator<T> implements InputOperator { // save the final state of the window and clear the current window list try { - windowDataManager.save(currentWindowTuples, operatorContextId, currentWindowId); + windowDataManager.save(currentWindowTuples, currentWindowId); } catch (IOException e) { DTThrowable.rethrow(e); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java 
b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java index d0dc23f..53cf302 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java @@ -104,7 +104,7 @@ public abstract class AbstractNiFiOutputOperator<T> extends BaseOperator public void beginWindow(long windowId) { currentWindowId = windowId; - largestRecoveryWindowId = windowDataManager.getLargestRecoveryWindow(); + largestRecoveryWindowId = windowDataManager.getLargestCompletedWindow(); // if processing a window we've already seen, don't resend the tuples if (currentWindowId <= largestRecoveryWindowId) { @@ -127,7 +127,7 @@ public abstract class AbstractNiFiOutputOperator<T> extends BaseOperator // mark that we processed the window try { - windowDataManager.save("processedWindow", operatorContextId, currentWindowId); + windowDataManager.save("processedWindow", currentWindowId); } catch (IOException e) { DTThrowable.rethrow(e); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java index b842698..847602e 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java @@ -190,7 +190,7 @@ public abstract class AbstractRabbitMQInputOperator<T> implements public void beginWindow(long windowId) { currentWindowId = windowId; - if (windowId <= this.windowDataManager.getLargestRecoveryWindow()) { + if (windowId <= this.windowDataManager.getLargestCompletedWindow()) { replay(windowId); } } @@ -199,7 +199,7 @@ public abstract class AbstractRabbitMQInputOperator<T> implements private void replay(long windowId) { Map<Long, byte[]> recoveredData; try { - recoveredData = (Map<Long, byte[]>) this.windowDataManager.load(operatorContextId, windowId); + recoveredData = (Map<Long, byte[]>)this.windowDataManager.retrieve(windowId); if (recoveredData == null) { return; } @@ -225,7 +225,7 @@ public abstract class AbstractRabbitMQInputOperator<T> implements } try { - this.windowDataManager.save(currentWindowRecoveryState, operatorContextId, currentWindowId); + this.windowDataManager.save(currentWindowRecoveryState, currentWindowId); } catch (IOException e) { DTThrowable.rethrow(e); } @@ -320,7 +320,7 @@ public abstract class AbstractRabbitMQInputOperator<T> implements public void committed(long windowId) { try { - windowDataManager.deleteUpTo(operatorContextId, windowId); + windowDataManager.committed(windowId); } catch (IOException e) { throw new RuntimeException("committing", e); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java index 6043c5b..a19417c 100644 --- 
a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java @@ -109,7 +109,7 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator public void beginWindow(long windowId) { currentWindowId = windowId; - largestRecoveryWindowId = windowDataManager.getLargestRecoveryWindow(); + largestRecoveryWindowId = windowDataManager.getLargestCompletedWindow(); if (windowId <= largestRecoveryWindowId) { // Do not resend already sent tuples skipProcessingTuple = true; @@ -132,7 +132,7 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator return; } try { - windowDataManager.save("processedWindow", operatorContextId, currentWindowId); + windowDataManager.save("processedWindow", currentWindowId); } catch (IOException e) { DTThrowable.rethrow(e); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java index fd0a885..59b320d 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java @@ -94,7 +94,7 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStor currentWindowId = windowId; scanCallsInCurrentWindow = 0; replay = false; - if (currentWindowId <= getWindowDataManager().getLargestRecoveryWindow()) { + if (currentWindowId <= getWindowDataManager().getLargestCompletedWindow()) { replay(windowId); } } @@ -107,11 +107,11 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStor if (!skipOffsetRecovery) { // Begin offset for this window is recovery offset stored for the last // window - RecoveryState recoveryStateForLastWindow = (RecoveryState) getWindowDataManager().load(context.getId(), windowId - 1); + RecoveryState recoveryStateForLastWindow = (RecoveryState) getWindowDataManager().retrieve(windowId - 1); recoveryState.scanOffsetAtBeginWindow = recoveryStateForLastWindow.scanOffsetAtBeginWindow; } skipOffsetRecovery = false; - RecoveryState recoveryStateForCurrentWindow = (RecoveryState) getWindowDataManager().load(context.getId(), windowId); + RecoveryState recoveryStateForCurrentWindow = (RecoveryState) getWindowDataManager().retrieve(windowId); recoveryState.numberOfScanCallsInWindow = recoveryStateForCurrentWindow.numberOfScanCallsInWindow; if (recoveryState.scanOffsetAtBeginWindow != null) { scanOffset = recoveryState.scanOffsetAtBeginWindow; @@ -183,9 +183,9 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStor recoveryState.scanOffsetAtBeginWindow = scanOffset; recoveryState.numberOfScanCallsInWindow = scanCallsInCurrentWindow; - if (currentWindowId > getWindowDataManager().getLargestRecoveryWindow()) { + if (currentWindowId > getWindowDataManager().getLargestCompletedWindow()) { try { - getWindowDataManager().save(recoveryState, context.getId(), currentWindowId); + getWindowDataManager().save(recoveryState, currentWindowId); } catch (IOException e) { DTThrowable.rethrow(e); } @@ -233,7 +233,7 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStor public void 
committed(long windowId) { try { - getWindowDataManager().deleteUpTo(context.getId(), windowId); + getWindowDataManager().committed(windowId); } catch (IOException e) { throw new RuntimeException("committing", e); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java index e4a4dec..2b4b0f5 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java @@ -305,7 +305,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase operator.deactivate(); operator = createAndDeployOperator(true); - Assert.assertEquals("largest recovery window", 2, operator.getWindowDataManager().getLargestRecoveryWindow()); + Assert.assertEquals("largest recovery window", 2, operator.getWindowDataManager().getLargestCompletedWindow()); operator.beginWindow(1); operator.emitTuples(); @@ -390,7 +390,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase if (isIdempotency) { FSWindowDataManager storageManager = new FSWindowDataManager(); - storageManager.setRecoveryPath(testMeta.recoveryDir); + storageManager.setStatePath(testMeta.recoveryDir); testMeta.operator.setWindowDataManager(storageManager); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java index e8eff5d..a79d03a 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java @@ -232,7 +232,7 @@ public class KinesisInputOperatorTest extends KinesisOperatorTestBase testMeta.operator.setup(testMeta.context); testMeta.operator.activate(testMeta.context); - Assert.assertEquals("largest recovery window", 1, testMeta.operator.getWindowDataManager().getLargestRecoveryWindow()); + Assert.assertEquals("largest recovery window", 1, testMeta.operator.getWindowDataManager().getLargestCompletedWindow()); testMeta.operator.beginWindow(1); testMeta.operator.endWindow(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java index dc56e1c..a1a26ab 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java @@ -34,6 +34,7 @@ import org.junit.Before; import org.junit.Test; import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; import org.apache.nifi.remote.protocol.DataPacket; import 
org.apache.nifi.stream.io.ByteArrayInputStream; import org.apache.nifi.util.file.FileUtils; @@ -45,7 +46,6 @@ import com.datatorrent.contrib.nifi.mock.MockDataPacket; import com.datatorrent.contrib.nifi.mock.MockSiteToSiteClient; import com.datatorrent.lib.helper.OperatorContextTestHelper; import com.datatorrent.lib.testbench.CollectorTestSink; -import org.apache.apex.malhar.lib.wal.WindowDataManager; public class NiFiSinglePortInputOperatorTest { @@ -115,7 +115,7 @@ public class NiFiSinglePortInputOperatorTest windowDataManager.setup(context); // verify that all the data packets were saved for window #1 - List<StandardNiFiDataPacket> windowData = (List<StandardNiFiDataPacket>) windowDataManager.load(context.getId(), 1); + List<StandardNiFiDataPacket> windowData = (List<StandardNiFiDataPacket>)windowDataManager.retrieve(1); Assert.assertNotNull("Should have recovered data", windowData); Assert.assertEquals("Size of recovered data should equal size of mock data packets", dataPackets.size(), windowData.size()); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java index 4fccffa..ebe4a90 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java @@ -221,7 +221,7 @@ public class RabbitMQInputOperatorTest operator.setup(context); operator.activate(context); - Assert.assertEquals("largest recovery window", 1, operator.getWindowDataManager().getLargestRecoveryWindow()); + Assert.assertEquals("largest recovery window", 1, operator.getWindowDataManager().getLargestCompletedWindow()); operator.beginWindow(1); operator.endWindow(); Assert.assertEquals("num of messages in window 1", 15, sink.collectedTuples.size()); @@ -229,7 +229,7 @@ public class RabbitMQInputOperatorTest operator.deactivate(); operator.teardown(); - operator.getWindowDataManager().deleteUpTo(context.getId(), 1); + operator.getWindowDataManager().committed(1); publisher.teardown(); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java index bee170d..010c534 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java @@ -169,7 +169,7 @@ public class RedisInputOperatorTest operator.outputPort.setSink(sink); operator.setup(context); - Assert.assertEquals("largest recovery window", 2, operator.getWindowDataManager().getLargestRecoveryWindow()); + Assert.assertEquals("largest recovery window", 2, operator.getWindowDataManager().getLargestCompletedWindow()); operator.beginWindow(1); operator.emitTuples(); @@ -189,7 +189,7 @@ public class RedisInputOperatorTest testStore.remove(entry.getKey()); } sink.collectedTuples.clear(); - operator.getWindowDataManager().deleteUpTo(context.getId(), 5); + 
operator.getWindowDataManager().committed(5); operator.teardown(); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java index b05d877..4cf2888 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java @@ -223,7 +223,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera } if (isIdempotent()) { try { - windowDataManager.deleteUpTo(operatorId, windowId); + windowDataManager.committed(windowId); } catch (IOException e) { DTThrowable.rethrow(e); } @@ -259,7 +259,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera emitCount = 0; currentWindowId = wid; windowStartOffset.clear(); - if (isIdempotent() && wid <= windowDataManager.getLargestRecoveryWindow()) { + if (isIdempotent() && wid <= windowDataManager.getLargestCompletedWindow()) { replay(wid); } else { consumerWrapper.afterReplay(); @@ -269,8 +269,9 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera private void replay(long windowId) { try { + @SuppressWarnings("unchecked") Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> windowData = - (Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>>)windowDataManager.load(operatorId, windowId); + (Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>>)windowDataManager.retrieve(windowId); consumerWrapper.emitImmediately(windowData); } catch (IOException e) { DTThrowable.rethrow(e); @@ -294,7 +295,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera for (Map.Entry<AbstractKafkaPartitioner.PartitionMeta, Long> e : windowStartOffset.entrySet()) { windowData.put(e.getKey(), new MutablePair<>(e.getValue(), offsetTrack.get(e.getKey()) - e.getValue())); } - windowDataManager.save(windowData, operatorId, currentWindowId); + windowDataManager.save(windowData, currentWindowId); } catch (IOException e) { DTThrowable.rethrow(e); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java index 09ae1cb..29b2584 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java @@ -133,7 +133,7 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu { this.windowId = windowId; - if (windowId == windowDataManager.getLargestRecoveryWindow()) { + if (windowId == windowDataManager.getLargestCompletedWindow()) { rebuildPartialWindow(); } } @@ -147,7 +147,7 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu public void committed(long windowId) { try { - windowDataManager.deleteUpTo(operatorId, windowId); + 
windowDataManager.committed(windowId); } catch (IOException e) { throw new RuntimeException(e); } @@ -168,7 +168,7 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu @Override public void endWindow() { - if (!partialWindowTuples.isEmpty() && windowId > windowDataManager.getLargestRecoveryWindow()) { + if (!partialWindowTuples.isEmpty() && windowId > windowDataManager.getLargestCompletedWindow()) { throw new RuntimeException("Violates Exactly once. Not all the tuples received after operator reset."); } @@ -176,7 +176,7 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu getProducer().flush(); try { - this.windowDataManager.save(getPartitionsAndOffsets(true), operatorId, windowId); + this.windowDataManager.save(getPartitionsAndOffsets(true), windowId); } catch (IOException | InterruptedException | ExecutionException e) { throw new RuntimeException(e); } @@ -209,7 +209,7 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu private boolean alreadyInKafka(T message) { - if ( windowId <= windowDataManager.getLargestRecoveryWindow() ) { + if ( windowId <= windowDataManager.getLargestCompletedWindow() ) { return true; } @@ -265,20 +265,20 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu private void rebuildPartialWindow() { - logger.info("Rebuild the partial window after " + windowDataManager.getLargestRecoveryWindow()); + logger.info("Rebuild the partial window after " + windowDataManager.getLargestCompletedWindow()); Map<Integer,Long> storedOffsets; Map<Integer,Long> currentOffsets; try { - storedOffsets = (Map<Integer,Long>)this.windowDataManager.load(operatorId, windowId); + storedOffsets = (Map<Integer,Long>)this.windowDataManager.retrieve(windowId); currentOffsets = getPartitionsAndOffsets(true); } catch (IOException | ExecutionException | InterruptedException e) { throw new RuntimeException(e); } if (currentOffsets == null) { - logger.debug("No tuples found while building partial window " + windowDataManager.getLargestRecoveryWindow()); + logger.debug("No tuples found while building partial window " + windowDataManager.getLargestCompletedWindow()); return; } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java index db27be0..5d0e59a 100644 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java +++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java @@ -242,7 +242,7 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase WindowDataManager windowDataManager = new FSWindowDataManager(); windowDataManager.setup(operatorContext); try { - windowDataManager.deleteUpTo(operatorContext.getId(),windowDataManager.getLargestRecoveryWindow()); + windowDataManager.committed(windowDataManager.getLargestCompletedWindow()); } catch (IOException e) { e.printStackTrace(); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---------------------------------------------------------------------- diff --git 
a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java index 234e28c..edb3aaa 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java @@ -161,7 +161,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu public void activate(OperatorContext context) { initializePreparedStatement(); - long largestRecoveryWindow = windowManager.getLargestRecoveryWindow(); + long largestRecoveryWindow = windowManager.getLargestCompletedWindow(); if (largestRecoveryWindow == Stateless.WINDOW_ID || context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow) { scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS); @@ -191,7 +191,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu public void beginWindow(long windowId) { currentWindowId = windowId; - if (currentWindowId <= windowManager.getLargestRecoveryWindow()) { + if (currentWindowId <= windowManager.getLargestCompletedWindow()) { try { replay(currentWindowId); return; @@ -228,7 +228,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu @Override public void emitTuples() { - if (currentWindowId <= windowManager.getLargestRecoveryWindow()) { + if (currentWindowId <= windowManager.getLargestCompletedWindow()) { return; } int pollSize = (emitQueue.size() < batchSize) ? emitQueue.size() : batchSize; @@ -247,9 +247,9 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu public void endWindow() { try { - if (currentWindowId > windowManager.getLargestRecoveryWindow()) { + if (currentWindowId > windowManager.getLargestCompletedWindow()) { currentWindowRecoveryState = new MutablePair<>(lowerBound, lastEmittedRow); - windowManager.save(currentWindowRecoveryState, operatorId, currentWindowId); + windowManager.save(currentWindowRecoveryState, currentWindowId); } } catch (IOException e) { throw new RuntimeException("saving recovery", e); @@ -301,8 +301,8 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu { try { - MutablePair<Integer, Integer> recoveredData = (MutablePair<Integer, Integer>)windowManager.load(operatorId, - windowId); + @SuppressWarnings("unchecked") + MutablePair<Integer, Integer> recoveredData = (MutablePair<Integer, Integer>)windowManager.retrieve(windowId); if (recoveredData != null && shouldReplayWindow(recoveredData)) { LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", windowId, recoveredData.left, @@ -317,7 +317,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu } - if (currentWindowId == windowManager.getLargestRecoveryWindow()) { + if (currentWindowId == windowManager.getLargestCompletedWindow()) { try { if (!isPollerPartition && rangeQueryPair.getValue() != null) { ps = store.getConnection().prepareStatement( http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java index a3de5f3..f0e3fbb 100644 --- 
a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java @@ -457,7 +457,7 @@ public abstract class AbstractFileInputOperator<T> fileCounters.setCounter(FileCounters.PENDING_FILES, pendingFileCount); windowDataManager.setup(context); - if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestRecoveryWindow()) { + if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestCompletedWindow()) { //reset current file and offset in case of replay currentFile = null; offset = 0; @@ -519,7 +519,7 @@ public abstract class AbstractFileInputOperator<T> public void beginWindow(long windowId) { currentWindowId = windowId; - if (windowId <= windowDataManager.getLargestRecoveryWindow()) { + if (windowId <= windowDataManager.getLargestCompletedWindow()) { replay(windowId); } } @@ -527,9 +527,9 @@ public abstract class AbstractFileInputOperator<T> @Override public void endWindow() { - if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) { + if (currentWindowId > windowDataManager.getLargestCompletedWindow()) { try { - windowDataManager.save(currentWindowRecoveryState, operatorId, currentWindowId); + windowDataManager.save(currentWindowRecoveryState, currentWindowId); } catch (IOException e) { throw new RuntimeException("saving recovery", e); } @@ -553,7 +553,7 @@ public abstract class AbstractFileInputOperator<T> //all the recovery data for a window and then processes only those files which would be hashed //to it in the current run. try { - Map<Integer, Object> recoveryDataPerOperator = windowDataManager.load(windowId); + Map<Integer, Object> recoveryDataPerOperator = windowDataManager.retrieveAllPartitions(windowId); for (Object recovery : recoveryDataPerOperator.values()) { @SuppressWarnings("unchecked") @@ -615,7 +615,7 @@ public abstract class AbstractFileInputOperator<T> @Override public void emitTuples() { - if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) { + if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) { return; } @@ -836,7 +836,7 @@ public abstract class AbstractFileInputOperator<T> List<DirectoryScanner> scanners = scanner.partition(totalCount, oldscanners); Collection<Partition<AbstractFileInputOperator<T>>> newPartitions = Lists.newArrayListWithExpectedSize(totalCount); - Collection<WindowDataManager> newManagers = Lists.newArrayListWithExpectedSize(totalCount); + List<WindowDataManager> newManagers = windowDataManager.partition(totalCount, deletedOperators); KryoCloneUtils<AbstractFileInputOperator<T>> cloneUtils = KryoCloneUtils.createCloneUtils(this); for (int i = 0; i < scanners.size(); i++) { @@ -888,11 +888,10 @@ public abstract class AbstractFileInputOperator<T> pendingFilesIterator.remove(); } } + oper.setWindowDataManager(newManagers.get(i)); newPartitions.add(new DefaultPartition<AbstractFileInputOperator<T>>(oper)); - newManagers.add(oper.windowDataManager); } - windowDataManager.partitioned(newManagers, deletedOperators); LOG.info("definePartitions called returning {} partitions", newPartitions.size()); return newPartitions; } @@ -917,7 +916,7 @@ public abstract class AbstractFileInputOperator<T> public void committed(long windowId) { try { - windowDataManager.deleteUpTo(operatorId, windowId); + windowDataManager.committed(windowId); } catch (IOException e) { throw new RuntimeException(e); } 
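Repartitioning is inverted in the same way: instead of each new child adding its manager to a collection and the parent calling windowDataManager.partitioned(newManagers, deletedOperators), the parent's manager now hands out per-partition instances via partition(int, Set<Integer>), as in the AbstractFileInputOperator change just above. A sketch of that pattern inside a definePartitions() implementation, assuming a hypothetical MyOperator (the partition()/setWindowDataManager() calls mirror this commit's Kafka, Kinesis, and file-input changes; java.util and Guava imports are assumed):

// Partitions being replaced release their stored window state.
Set<Integer> deletedOperators = Sets.newHashSet();
for (Partition<MyOperator> old : partitions) {
  deletedOperators.add(old.getPartitionedInstance().operatorId);
}
// The parent manager creates one WindowDataManager per new physical partition.
List<WindowDataManager> managers = windowDataManager.partition(newPartitions.size(), deletedOperators);
int i = 0;
for (Partition<MyOperator> p : newPartitions) {
  p.getPartitionedInstance().setWindowDataManager(managers.get(i++));
}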
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java index cd79a2b..4bb53e5 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java @@ -145,7 +145,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener blockSize = fs.getDefaultBlockSize(new Path(scanner.files.iterator().next())); } - if (context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestRecoveryWindow()) { + if (context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestCompletedWindow()) { blockMetadataIterator = null; } else { //don't setup scanner while recovery @@ -175,7 +175,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener { blockCount = 0; currentWindowId = windowId; - if (windowId <= windowDataManager.getLargestRecoveryWindow()) { + if (windowId <= windowDataManager.getLargestCompletedWindow()) { replay(windowId); } } @@ -184,7 +184,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener { try { @SuppressWarnings("unchecked") - LinkedList<FileInfo> recoveredData = (LinkedList<FileInfo>)windowDataManager.load(operatorId, windowId); + LinkedList<FileInfo> recoveredData = (LinkedList<FileInfo>)windowDataManager.retrieve(windowId); if (recoveredData == null) { //This could happen when there are multiple physical instances and one of them is ahead in processing windows. 
return; @@ -209,7 +209,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener } } - if (windowId == windowDataManager.getLargestRecoveryWindow()) { + if (windowId == windowDataManager.getLargestCompletedWindow()) { scanner.setup(context); } } catch (IOException e) { @@ -220,7 +220,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener @Override public void emitTuples() { - if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) { + if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) { return; } @@ -259,9 +259,9 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener @Override public void endWindow() { - if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) { + if (currentWindowId > windowDataManager.getLargestCompletedWindow()) { try { - windowDataManager.save(currentWindowRecoveryState, operatorId, currentWindowId); + windowDataManager.save(currentWindowRecoveryState, currentWindowId); } catch (IOException e) { throw new RuntimeException("saving recovery", e); } @@ -388,7 +388,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener public void committed(long l) { try { - windowDataManager.deleteUpTo(operatorId, l); + windowDataManager.committed(l); } catch (IOException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java index b8b21cb..3763ef0 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.net.URI; import java.util.Collections; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.Map; import java.util.Set; @@ -110,7 +111,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper windowDataManager.setup(context); super.setup(context); - long largestRecoveryWindow = windowDataManager.getLargestRecoveryWindow(); + long largestRecoveryWindow = windowDataManager.getLargestCompletedWindow(); if (largestRecoveryWindow == Stateless.WINDOW_ID || context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow) { scanner.startScanning(Collections.unmodifiableMap(referenceTimes)); @@ -121,7 +122,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper public void beginWindow(long windowId) { super.beginWindow(windowId); - if (windowId <= windowDataManager.getLargestRecoveryWindow()) { + if (windowId <= windowDataManager.getLargestCompletedWindow()) { replay(windowId); } } @@ -130,7 +131,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper { try { @SuppressWarnings("unchecked") - LinkedList<ScannedFileInfo> recoveredData = (LinkedList<ScannedFileInfo>)windowDataManager.load(operatorId, windowId); + LinkedList<ScannedFileInfo> recoveredData = (LinkedList<ScannedFileInfo>)windowDataManager.retrieve(windowId); if (recoveredData == null) { //This could happen when there are multiple physical instances and one of them is ahead in processing windows. 
return; @@ -151,7 +152,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper } catch (IOException e) { throw new RuntimeException("replay", e); } - if (windowId == windowDataManager.getLargestRecoveryWindow()) { + if (windowId == windowDataManager.getLargestCompletedWindow()) { scanner.startScanning(Collections.unmodifiableMap(referenceTimes)); } } @@ -159,7 +160,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper @Override public void emitTuples() { - if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) { + if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) { return; } @@ -206,9 +207,9 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper @Override public void endWindow() { - if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) { + if (currentWindowId > windowDataManager.getLargestCompletedWindow()) { try { - windowDataManager.save(currentWindowRecoveryState, operatorId, currentWindowId); + windowDataManager.save(currentWindowRecoveryState, currentWindowId); } catch (IOException e) { throw new RuntimeException("saving recovery", e); } @@ -237,7 +238,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper public void committed(long l) { try { - windowDataManager.deleteUpTo(operatorId, l); + windowDataManager.committed(l); } catch (IOException e) { throw new RuntimeException(e); } @@ -274,16 +275,16 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper private static long DEF_SCAN_INTERVAL_MILLIS = 5000; private static String FILE_BEING_COPIED = "_COPYING_"; - private boolean recursive; + private boolean recursive = true; private transient volatile boolean trigger; @NotNull @Size(min = 1) - private final Set<String> files; + private final Set<String> files = new LinkedHashSet<>(); @Min(0) - private long scanIntervalMillis; + private long scanIntervalMillis = DEF_SCAN_INTERVAL_MILLIS; private String filePatternRegularExp; private String ignoreFilePatternRegularExp; @@ -306,19 +307,9 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper private transient ScannedFileInfo lastScannedInfo; private transient int numDiscoveredPerIteration; - public TimeBasedDirectoryScanner() - { - recursive = true; - scanIntervalMillis = DEF_SCAN_INTERVAL_MILLIS; - files = Sets.newLinkedHashSet(); - } - @Override public void setup(Context.OperatorContext context) { - if (scanService != null) { - throw new RuntimeException("multiple calls to setup() detected!"); - } scanService = Executors.newSingleThreadExecutor(); discoveredFiles = new LinkedBlockingDeque<>(); atomicThrowable = new AtomicReference<>(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java index bf0fe5c..2b8b58d 100644 --- a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java @@ -19,7 +19,6 @@ package com.datatorrent.lib.io.jms; import java.io.IOException; -import java.util.Arrays; import java.util.Map; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; @@ 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
index bf0fe5c..2b8b58d 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
@@ -19,7 +19,6 @@
 package com.datatorrent.lib.io.jms;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -43,6 +42,7 @@ import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
 import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.commons.lang.mutable.MutableLong;
 
+import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
@@ -54,7 +54,6 @@ import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.ActivationListener;
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.lib.counters.BasicCounters;
-import com.datatorrent.netlet.util.DTThrowable;
 
 /**
  * This is the base implementation of a JMS input operator.<br/>
@@ -107,7 +106,6 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase
   @NotNull
   protected WindowDataManager windowDataManager;
 
-  private transient long[] operatorRecoveredWindows;
   protected transient long currentWindowId;
   protected transient int emitCount;
@@ -202,14 +200,6 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase
     counters.setCounter(CounterKeys.RECEIVED, new MutableLong());
     counters.setCounter(CounterKeys.REDELIVERED, new MutableLong());
     windowDataManager.setup(context);
-    try {
-      operatorRecoveredWindows = windowDataManager.getWindowIds(context.getId());
-      if (operatorRecoveredWindows != null) {
-        Arrays.sort(operatorRecoveredWindows);
-      }
-    } catch (IOException e) {
-      throw new RuntimeException("fetching windows", e);
-    }
   }
 
   /**
@@ -262,7 +252,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase
   public void beginWindow(long windowId)
   {
     currentWindowId = windowId;
-    if (windowId <= windowDataManager.getLargestRecoveryWindow()) {
+    if (windowId <= windowDataManager.getLargestCompletedWindow()) {
       replay(windowId);
     }
   }
@@ -271,11 +261,17 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase
   {
     try {
       @SuppressWarnings("unchecked")
-      Map<String, T> recoveredData = (Map<String, T>)windowDataManager.load(context.getId(), windowId);
+      Map<String, T> recoveredData = (Map<String, T>)windowDataManager.retrieve(windowId);
       if (recoveredData == null) {
         return;
       }
       for (Map.Entry<String, T> recoveredEntry : recoveredData.entrySet()) {
+        /*
+        It is important to add the recovered message ids to the pendingAck set because there is no guarantee
+        that acknowledgement completed after state was persisted by windowDataManager. In that case, the messages are
+        re-delivered by the message bus. Therefore, we compare each message against this set and ignore re-delivered
+        messages.
+        */
         pendingAck.add(recoveredEntry.getKey());
         emit(recoveredEntry.getValue());
       }
@@ -287,7 +283,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase
   @Override
   public void emitTuples()
   {
-    if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) {
+    if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
       return;
     }
@@ -329,7 +325,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase
         throw new RuntimeException(ie);
       }
     } else {
-      DTThrowable.rethrow(lthrowable);
+      Throwables.propagate(lthrowable);
     }
   }
@@ -347,10 +343,8 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase
   @Override
   public void endWindow()
   {
-    if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) {
+    if (currentWindowId > windowDataManager.getLargestCompletedWindow()) {
       synchronized (lock) {
-        boolean stateSaved = false;
-        boolean ackCompleted = false;
         try {
           //No more messages can be consumed now. so we will call emit tuples once more
           //so that any pending messages can be emitted.
@@ -360,33 +354,25 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase
             emitCount++;
             lastMsg = msg;
           }
-          windowDataManager.save(currentWindowRecoveryState, context.getId(), currentWindowId);
-          stateSaved = true;
-
+          windowDataManager.save(currentWindowRecoveryState, currentWindowId);
           currentWindowRecoveryState.clear();
           if (lastMsg != null) {
             acknowledge();
           }
-          ackCompleted = true;
           pendingAck.clear();
         } catch (Throwable t) {
-          if (!ackCompleted) {
-            LOG.info("confirm recovery of {} for {} does not exist", context.getId(), currentWindowId, t);
-          }
-          DTThrowable.rethrow(t);
-        } finally {
-          if (stateSaved && !ackCompleted) {
-            try {
-              windowDataManager.delete(context.getId(), currentWindowId);
-            } catch (IOException e) {
-              LOG.error("unable to delete corrupted state", e);
-            }
-          }
+          /*
+          When acknowledgement fails after state is persisted by windowDataManager, this window is considered
+          completed by the operator instance after recovery. However, since the acknowledgement failed, the
+          messages will be re-sent by the message bus. In order to address that, while re-playing, we add the messages
+          to the pendingAck set. When these messages are re-delivered, we compare them against this set and ignore
+          them if their id is already in the set.
+          */
+          Throwables.propagate(t);
         }
       }
       emitCount = 0; //reset emit count
-    } else if (operatorRecoveredWindows != null &&
-        currentWindowId < operatorRecoveredWindows[operatorRecoveredWindows.length - 1]) {
+    } else if (currentWindowId < windowDataManager.getLargestCompletedWindow()) {
       //pendingAck is not cleared for the last replayed window of this operator. This is because there is
       //still a chance that in the previous run the operator crashed after saving the state but before acknowledgement.
       pendingAck.clear();
@@ -417,7 +403,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase
   public void committed(long windowId)
   {
     try {
-      windowDataManager.deleteUpTo(context.getId(), windowId);
+      windowDataManager.committed(windowId);
     } catch (IOException e) {
       throw new RuntimeException("committing", e);
    }
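The comments added to replay() and endWindow() above describe how at-least-once delivery is reconciled with idempotent replay: message ids recovered from the window data manager go back into the pendingAck set, and re-delivered duplicates are filtered against it. A compact sketch of that dedupe, with hypothetical method names (replayed, onDelivery, processMessage) standing in for the operator's real delivery path:

import java.util.Set;

import com.google.common.collect.Sets;

public class PendingAckSketch<T>
{
  private final Set<String> pendingAck = Sets.newHashSet();

  /** Called while replaying a completed window. */
  public void replayed(String messageId, T message)
  {
    // The broker may re-deliver these messages if the previous run crashed
    // between save() and acknowledge(); remember their ids so duplicates can
    // be recognized later.
    pendingAck.add(messageId);
  }

  /** Called for every message delivered by the message bus. */
  public void onDelivery(String messageId, T message)
  {
    if (pendingAck.remove(messageId)) {
      return; // re-delivered duplicate: its tuple was already emitted during replay
    }
    processMessage(message); // hypothetical downstream handling
  }

  private void processMessage(T message)
  {
  }
}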
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
index dbf903d..d0652ef 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
@@ -145,8 +145,8 @@ public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends Abstrac
     super.setup(context);
     ((FileAccessFSImpl)stream1Store.getFileAccess()).setBasePath(context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + stateDir + Path.SEPARATOR + String.valueOf(context.getId()) + Path.SEPARATOR + stream1State);
     ((FileAccessFSImpl)stream2Store.getFileAccess()).setBasePath(context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + stateDir + Path.SEPARATOR + String.valueOf(context.getId()) + Path.SEPARATOR + stream2State);
-    stream1Store.getCheckpointManager().setRecoveryPath("managed_state_" + stream1State);
-    stream1Store.getCheckpointManager().setRecoveryPath("managed_state_" + stream2State);
+    stream1Store.getCheckpointManager().setStatePath("managed_state_" + stream1State);
+    stream2Store.getCheckpointManager().setStatePath("managed_state_" + stream2State);
     stream1Store.setup(context);
     stream2Store.setup(context);
   }
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
index 25b3f8b..1e378ec 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -202,28 +202,23 @@ public abstract class AbstractManagedStateImpl
     long activationWindow = context.getValue(OperatorContext.ACTIVATION_WINDOW_ID);
     if (activationWindow != Stateless.WINDOW_ID) {
-      //delete all the wal files with windows > activationWindow.
       //All the wal files with windows <= activationWindow are loaded and kept separately as recovered data.
       try {
-        long[] recoveredWindows = checkpointManager.getWindowIds(operatorContext.getId());
-        if (recoveredWindows != null) {
-          for (long recoveredWindow : recoveredWindows) {
-            if (recoveredWindow <= activationWindow) {
-              @SuppressWarnings("unchecked")
-              Map<Long, Map<Slice, Bucket.BucketedValue>> recoveredData = (Map<Long, Map<Slice, Bucket.BucketedValue>>)
-                  checkpointManager.load(operatorContext.getId(), recoveredWindow);
-              if (recoveredData != null && !recoveredData.isEmpty()) {
-                for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> entry : recoveredData.entrySet()) {
-                  int bucketIdx = prepareBucket(entry.getKey());
-                  buckets[bucketIdx].recoveredData(recoveredWindow, entry.getValue());
-                }
-              }
-              checkpointManager.save(recoveredData, operatorContext.getId(), recoveredWindow,
-                  true /*skipWritingToWindowFile*/);
-            } else {
-              checkpointManager.delete(operatorContext.getId(), recoveredWindow);
+
+        Map<Long, Object> statePerWindow = checkpointManager.retrieveAllWindows();
+        for (Map.Entry<Long, Object> stateEntry : statePerWindow.entrySet()) {
+          Preconditions.checkArgument(stateEntry.getKey() <= activationWindow,
+              stateEntry.getKey() + " greater than " + activationWindow);
+          @SuppressWarnings("unchecked")
+          Map<Long, Map<Slice, Bucket.BucketedValue>> state = (Map<Long, Map<Slice, Bucket.BucketedValue>>)
+              stateEntry.getValue();
+          if (state != null && !state.isEmpty()) {
+            for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> bucketEntry : state.entrySet()) {
+              int bucketIdx = prepareBucket(bucketEntry.getKey());
+              buckets[bucketIdx].recoveredData(stateEntry.getKey(), bucketEntry.getValue());
             }
           }
+          checkpointManager.save(state, stateEntry.getKey(), true /*skipWritingToWindowFile*/);
         }
       } catch (IOException e) {
         throw new RuntimeException("recovering", e);
@@ -354,7 +349,7 @@ public abstract class AbstractManagedStateImpl
     }
     if (!flashData.isEmpty()) {
       try {
-        checkpointManager.save(flashData, operatorContext.getId(), windowId, false);
+        checkpointManager.save(flashData, windowId, false);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
@@ -379,8 +374,8 @@ public abstract class AbstractManagedStateImpl
         }
       }
     }
-      checkpointManager.committed(operatorContext.getId(), windowId);
-    } catch (IOException | InterruptedException e) {
+      checkpointManager.committed(windowId);
+    } catch (IOException e) {
       throw new RuntimeException("committing " + windowId, e);
     }
   }
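The rewritten recovery block above replaces the old per-window getWindowIds()/load() iteration with a single retrieveAllWindows() scan (defined in the IncrementalCheckpointManager diff below) and turns the delete branch into a precondition; presumably, with relyOnCheckpoints set, the file-system WAL is already rewound to the checkpoint on recovery, so no window beyond the activation window can surface. An illustrative recovery loop, with restoreWindow as a hypothetical consumer of the recovered state:

import java.io.IOException;
import java.util.Map;

import org.apache.apex.malhar.lib.state.managed.IncrementalCheckpointManager;

import com.google.common.base.Preconditions;

public class RecoverySketch
{
  public void recover(IncrementalCheckpointManager checkpointManager, long activationWindow) throws IOException
  {
    // One WAL scan returns the saved artifact for every completed window.
    Map<Long, Object> statePerWindow = checkpointManager.retrieveAllWindows();
    for (Map.Entry<Long, Object> stateEntry : statePerWindow.entrySet()) {
      // With the WAL truncated to the checkpoint, anything beyond the
      // activation window would indicate a bug, hence an assertion rather
      // than a delete.
      Preconditions.checkArgument(stateEntry.getKey() <= activationWindow,
          stateEntry.getKey() + " greater than " + activationWindow);
      restoreWindow(stateEntry.getKey(), stateEntry.getValue());
    }
  }

  private void restoreWindow(long windowId, Object state)
  {
    // hypothetical: feed the recovered per-window state back into managed-state buckets
  }
}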
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
index 4852b50..237f4b9 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
@@ -19,6 +19,7 @@
 package org.apache.apex.malhar.lib.state.managed;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
@@ -33,10 +34,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.FileSystemWAL;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Queues;
+import com.google.common.primitives.Longs;
 
 import com.datatorrent.api.Context;
 import com.datatorrent.api.annotation.Stateless;
@@ -78,7 +81,8 @@ public class IncrementalCheckpointManager extends FSWindowDataManager
   public IncrementalCheckpointManager()
   {
     super();
-    setRecoveryPath(WAL_RELATIVE_PATH);
+    setStatePath(WAL_RELATIVE_PATH);
+    setRelyOnCheckpoints(true);
   }
 
   @Override
@@ -132,7 +136,7 @@ public class IncrementalCheckpointManager extends FSWindowDataManager
             managedStateContext.getBucketsFileSystem().writeBucketData(windowId, singleBucket.getKey(),
                 singleBucket.getValue());
           }
-          storageAgent.delete(managedStateContext.getOperatorContext().getId(), windowId);
+          committed(windowId);
         } catch (Throwable t) {
           throwable.set(t);
           LOG.debug("transfer window {}", windowId, t);
@@ -150,7 +154,7 @@ public class IncrementalCheckpointManager extends FSWindowDataManager
   }
 
   @Override
-  public void save(Object object, int operatorId, long windowId) throws IOException
+  public void save(Object object, long windowId) throws IOException
   {
     throw new UnsupportedOperationException("doesn't support saving any object");
   }
@@ -159,15 +163,13 @@ public class IncrementalCheckpointManager extends FSWindowDataManager
    * The unsaved state combines data received in multiple windows. This window data manager persists this data
    * on disk by the window id in which it was requested.
    * @param unsavedData un-saved data of all buckets.
-   * @param operatorId operator id.
    * @param windowId window id.
    * @param skipWriteToWindowFile flag that enables/disables saving the window file.
   *
   * @throws IOException
   */
-  public void save(Map<Long, Map<Slice, Bucket.BucketedValue>> unsavedData, int operatorId, long windowId,
-      boolean skipWriteToWindowFile)
-      throws IOException
+  public void save(Map<Long, Map<Slice, Bucket.BucketedValue>> unsavedData, long windowId,
+      boolean skipWriteToWindowFile) throws IOException
   {
     Throwable lthrowable;
     if ((lthrowable = throwable.get()) != null) {
@@ -177,25 +179,46 @@ public class IncrementalCheckpointManager extends FSWindowDataManager
     savedWindows.put(windowId, unsavedData);
 
     if (!skipWriteToWindowFile) {
-      super.save(unsavedData, operatorId, windowId);
+      super.save(unsavedData, windowId);
     }
   }
 
   /**
+   * Retrieves artifacts available for all the windows saved by the enclosing partitions.
+   * @return artifact saved per window.
+   * @throws IOException
+   */
+  public Map<Long, Object> retrieveAllWindows() throws IOException
+  {
+    Map<Long, Object> artifactPerWindow = new HashMap<>();
+    FileSystemWAL.FileSystemWALReader reader = getWal().getReader();
+    reader.seek(getWal().getWalStartPointer());
+
+    Slice windowSlice = readNext(reader);
+    while (reader.getCurrentPointer().compareTo(getWal().getWalEndPointerAfterRecovery()) < 0 && windowSlice != null) {
+      long window = Longs.fromByteArray(windowSlice.toByteArray());
+      Object data = fromSlice(readNext(reader));
+      artifactPerWindow.put(window, data);
+      windowSlice = readNext(reader); //null or next window
+    }
+    reader.seek(getWal().getWalStartPointer());
+    return artifactPerWindow;
+  }
+
+  /**
    * Transfers the data which has been committed till windowId to data files.
    *
-   * @param operatorId operator id
-   * @param windowId window id
+   * @param committedWindowId window id
    */
-  @SuppressWarnings("UnusedParameters")
-  protected void committed(int operatorId, long windowId) throws IOException, InterruptedException
+  @Override
+  public void committed(long committedWindowId) throws IOException
   {
-    LOG.debug("data manager committed {}", windowId);
+    LOG.debug("data manager committed {}", committedWindowId);
     for (Long currentWindow : savedWindows.keySet()) {
       if (currentWindow <= largestWindowAddedToTransferQueue) {
         continue;
       }
-      if (currentWindow <= windowId) {
+      if (currentWindow <= committedWindowId) {
         LOG.debug("to transfer {}", currentWindow);
         largestWindowAddedToTransferQueue = currentWindow;
         windowsToTransfer.add(currentWindow);
