Use CheckpointNotificationListener and the beforeCheckpoint callback to perform I/O more efficiently (flush output before each checkpoint instead of at the end of every window)
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/28a7e347 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/28a7e347 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/28a7e347 Branch: refs/heads/master Commit: 28a7e347691dc5dbe364888c4520f5c31139bc39 Parents: 2cf8bad Author: Pramod Immaneni <[email protected]> Authored: Wed Mar 16 20:57:39 2016 -0700 Committer: Lakshmi Prasanna Velineni <[email protected]> Committed: Wed Oct 19 22:28:34 2016 -0700 ---------------------------------------------------------------------- .../hbase/AbstractHBasePutOutputOperator.java | 57 +++++++------------- .../kinesis/AbstractKinesisInputOperator.java | 8 ++- .../kinesis/AbstractKinesisOutputOperator.java | 18 ++++++- .../rabbitmq/AbstractRabbitMQInputOperator.java | 7 ++- .../redis/AbstractRedisInputOperator.java | 9 +++- .../contrib/splunk/SplunkTcpOutputOperator.java | 17 ++++-- .../hive/AbstractFSRollingOutputOperator.java | 5 +- .../lib/io/fs/AbstractFileInputOperator.java | 10 +++- .../lib/io/fs/AbstractFileOutputOperator.java | 49 ++++++++++------- .../lib/io/fs/AbstractReconciler.java | 12 +++-- .../com/datatorrent/lib/io/fs/FileSplitter.java | 7 ++- .../lib/io/fs/FileSplitterInput.java | 7 ++- .../lib/io/jms/AbstractJMSInputOperator.java | 7 ++- .../lib/join/AbstractJoinOperator.java | 7 ++- .../io/fs/AbstractFileOutputOperatorTest.java | 47 ++++++++++------ .../AbstractWindowFileOutputOperatorTest.java | 4 +- 16 files changed, 171 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java 
b/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java index 3973008..7f93394 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java @@ -22,6 +22,7 @@ import java.io.InterruptedIOException; import javax.validation.constraints.Min; +import com.datatorrent.api.Operator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,27 +53,21 @@ import com.datatorrent.netlet.util.DTThrowable; * The tuple type * @since 1.0.2 */ -public abstract class AbstractHBasePutOutputOperator<T> extends AbstractStoreOutputOperator<T, HBaseStore> { +public abstract class AbstractHBasePutOutputOperator<T> extends AbstractStoreOutputOperator<T, HBaseStore> implements Operator.CheckpointNotificationListener { private static final transient Logger logger = LoggerFactory.getLogger(AbstractHBasePutOutputOperator.class); - public static final int DEFAULT_BATCH_SIZE = 1000; - private int batchSize = DEFAULT_BATCH_SIZE; - protected int unCommittedSize = 0; - public AbstractHBasePutOutputOperator() { + public AbstractHBasePutOutputOperator() + { store = new HBaseStore(); } @Override - public void processTuple(T tuple) { + public void processTuple(T tuple) + { HTable table = store.getTable(); Put put = operationPut(tuple); try { table.put(put); - if( ++unCommittedSize >= batchSize ) - { - table.flushCommits(); - unCommittedSize = 0; - } } catch (RetriesExhaustedWithDetailsException e) { logger.error("Could not output tuple", e); DTThrowable.rethrow(e); @@ -80,46 +75,30 @@ public abstract class AbstractHBasePutOutputOperator<T> extends AbstractStoreOut logger.error("Could not output tuple", e); DTThrowable.rethrow(e); } - } @Override - public void endWindow() + public void beforeCheckpoint(long windowId) { - try - { - if( unCommittedSize > 0 ) { - store.getTable().flushCommits(); - unCommittedSize = 0; - } - } - catch 
(RetriesExhaustedWithDetailsException e) { - logger.error("Could not output tuple", e); - DTThrowable.rethrow(e); + try { + store.getTable().flushCommits(); } catch (InterruptedIOException e) { - logger.error("Could not output tuple", e); + DTThrowable.rethrow(e); + } catch (RetriesExhaustedWithDetailsException e) { DTThrowable.rethrow(e); } } - public abstract Put operationPut(T t); + @Override + public void checkpointed(long l) { - /** - * the batch size save flush data - */ - @Min(1) - public int getBatchSize() - { - return batchSize; } - /** - * the batch size save flush data - */ - public void setBatchSize(int batchSize) - { - this.batchSize = batchSize; - } + @Override + public void committed(long l) { + } + public abstract Put operationPut(T t); + } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java index c03df21..fc10bea 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java @@ -86,7 +86,7 @@ class KinesisPair <F, S> extends Pair<F, S> * @since 2.0.0 */ @SuppressWarnings("rawtypes") -public abstract class AbstractKinesisInputOperator <T> implements InputOperator, ActivationListener<OperatorContext>, Partitioner<AbstractKinesisInputOperator>, StatsListener,Operator.CheckpointListener +public abstract class AbstractKinesisInputOperator <T> implements InputOperator, ActivationListener<OperatorContext>, Partitioner<AbstractKinesisInputOperator>, StatsListener,Operator.CheckpointNotificationListener { private static final Logger logger = 
LoggerFactory.getLogger(AbstractKinesisInputOperator.class); @@ -530,6 +530,12 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, public void checkpointed(long windowId) { } + + @Override + public void beforeCheckpoint(long windowId) + { + } + /** * Implement ActivationListener Interface. */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java index 43fc62a..d6f9d36 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java @@ -49,7 +49,7 @@ import java.util.List; * @param <T> * @since 2.0.0 */ -public abstract class AbstractKinesisOutputOperator<V, T> implements Operator +public abstract class AbstractKinesisOutputOperator<V, T> implements Operator, Operator.CheckpointNotificationListener { private static final Logger logger = LoggerFactory.getLogger( AbstractKinesisOutputOperator.class ); protected String streamName; @@ -91,6 +91,10 @@ public abstract class AbstractKinesisOutputOperator<V, T> implements Operator { } + @Override + public void endWindow() { + } + /** * Implement Component Interface. */ @@ -103,7 +107,7 @@ public abstract class AbstractKinesisOutputOperator<V, T> implements Operator * Implement Operator Interface. 
*/ @Override - public void endWindow() + public void beforeCheckpoint(long windowId) { if (isBatchProcessing && putRecordsRequestEntryList.size() != 0) { try { @@ -114,6 +118,16 @@ public abstract class AbstractKinesisOutputOperator<V, T> implements Operator } } + @Override + public void checkpointed(long windowId) + { + } + + @Override + public void committed(long windowId) + { + } + /** * Implement Component Interface. * http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java index 08157bc..672122c 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java @@ -77,7 +77,7 @@ import org.apache.apex.malhar.lib.wal.WindowDataManager; */ public abstract class AbstractRabbitMQInputOperator<T> implements InputOperator, Operator.ActivationListener<OperatorContext>, - Operator.CheckpointListener + Operator.CheckpointNotificationListener { private static final Logger logger = LoggerFactory.getLogger(AbstractRabbitMQInputOperator.class); @NotNull @@ -312,6 +312,11 @@ public abstract class AbstractRabbitMQInputOperator<T> implements } @Override + public void beforeCheckpoint(long windowId) + { + } + + @Override public void checkpointed(long windowId) { } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java 
b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java index 0b12574..092a3c6 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java @@ -31,7 +31,7 @@ import org.apache.apex.malhar.lib.wal.WindowDataManager; import redis.clients.jedis.ScanParams; import redis.clients.jedis.ScanResult; -import com.datatorrent.api.Operator.CheckpointListener; +import com.datatorrent.api.Operator.CheckpointNotificationListener; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.netlet.util.DTThrowable; import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator; @@ -47,7 +47,7 @@ import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator; * The tuple type. * @since 0.9.3 */ -public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStoreInputOperator<T, RedisStore> implements CheckpointListener +public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStoreInputOperator<T, RedisStore> implements CheckpointNotificationListener { protected transient List<String> keys = new ArrayList<String>(); protected transient Integer scanOffset; @@ -225,6 +225,11 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStor abstract public void processTuples(); @Override + public void beforeCheckpoint(long windowId) + { + } + + @Override public void checkpointed(long windowId) { } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java index cb81375..f7e98b6 100644 --- 
a/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java @@ -19,6 +19,7 @@ package com.datatorrent.contrib.splunk; import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.Operator; import com.datatorrent.lib.db.AbstractStoreOutputOperator; import com.splunk.TcpInput; @@ -34,7 +35,7 @@ import java.net.Socket; * @tags splunk * @since 1.0.4 */ -public class SplunkTcpOutputOperator<T> extends AbstractStoreOutputOperator<T, SplunkStore> { +public class SplunkTcpOutputOperator<T> extends AbstractStoreOutputOperator<T, SplunkStore> implements Operator.CheckpointNotificationListener { private String tcpPort; private transient Socket socket; @@ -75,8 +76,8 @@ public class SplunkTcpOutputOperator<T> extends AbstractStoreOutputOperator<T, S } @Override - public void endWindow() { - + public void beforeCheckpoint(long windowId) + { try { stream.flush(); } catch (IOException e) { @@ -85,6 +86,16 @@ public class SplunkTcpOutputOperator<T> extends AbstractStoreOutputOperator<T, S } @Override + public void checkpointed(long windowId) + { + } + + @Override + public void committed(long windowId) + { + } + + @Override public void teardown() { super.teardown(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java ---------------------------------------------------------------------- diff --git a/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java b/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java index 3c9c4da..37b5f2e 100755 --- a/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java +++ b/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java @@ -37,7 +37,7 @@ import org.apache.commons.lang3.mutable.MutableInt; import 
com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DAG; import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Operator.CheckpointListener; +import com.datatorrent.api.Operator.CheckpointNotificationListener; import com.datatorrent.api.annotation.Stateless; import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; import com.datatorrent.netlet.util.DTThrowable; @@ -52,8 +52,7 @@ import com.datatorrent.netlet.util.DTThrowable; * * @since 2.1.0 */ -public abstract class AbstractFSRollingOutputOperator<T> extends AbstractFileOutputOperator<T> - implements CheckpointListener +public abstract class AbstractFSRollingOutputOperator<T> extends AbstractFileOutputOperator<T> implements CheckpointNotificationListener { private transient String outputFilePath; protected MutableInt partNumber; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java index 9e80b4e..14cabfc 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java @@ -37,6 +37,7 @@ import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; +import javax.validation.OverridesAttribute; import javax.validation.constraints.NotNull; import org.slf4j.Logger; @@ -97,8 +98,8 @@ import com.datatorrent.lib.util.KryoCloneUtils; * @since 1.0.2 */ @org.apache.hadoop.classification.InterfaceStability.Evolving -public abstract class AbstractFileInputOperator<T> - implements InputOperator, Partitioner<AbstractFileInputOperator<T>>, StatsListener, Operator.CheckpointListener +public abstract class 
AbstractFileInputOperator<T> implements InputOperator, Partitioner<AbstractFileInputOperator<T>>, StatsListener, + Operator.CheckpointListener, Operator.CheckpointNotificationListener { private static final Logger LOG = LoggerFactory.getLogger(AbstractFileInputOperator.class); @@ -909,6 +910,11 @@ public abstract class AbstractFileInputOperator<T> } @Override + public void beforeCheckpoint(long windowId) + { + } + + @Override public void checkpointed(long windowId) { } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java index ab8bedc..f703a19 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java @@ -110,7 +110,7 @@ import com.datatorrent.lib.counters.BasicCounters; * @since 2.0.0 */ @OperatorAnnotation(checkpointableWithinAppWindow = false) -public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator implements Operator.CheckpointListener +public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator implements Operator.CheckpointListener, Operator.CheckpointNotificationListener { private static final Logger LOG = LoggerFactory.getLogger(AbstractFileOutputOperator.class); @@ -262,6 +262,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp private Long expireStreamAfterAccessMillis; private final Set<String> filesWithOpenStreams; + private boolean initializeContext; + /** * This input port receives incoming tuples. 
*/ @@ -966,13 +968,16 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp @Override public void beginWindow(long windowId) { - try { - Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap(); - for (FSFilterStreamContext streamContext : openStreams.values()) { - streamContext.initializeContext(); + if (initializeContext) { + try { + Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap(); + for (FSFilterStreamContext streamContext : openStreams.values()) { + streamContext.initializeContext(); + } + } catch (IOException e) { + throw new RuntimeException(e); } - } catch (IOException e) { - throw new RuntimeException(e); + initializeContext = false; } currentWindow = windowId; } @@ -980,18 +985,6 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp @Override public void endWindow() { - try { - Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap(); - for (FSFilterStreamContext streamContext: openStreams.values()) { - long start = System.currentTimeMillis(); - streamContext.finalizeContext(); - totalWritingTime += System.currentTimeMillis() - start; - //streamContext.resetFilter(); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - if (rotationWindows > 0) { if (++rotationCount == rotationWindows) { rotationCount = 0; @@ -1239,6 +1232,24 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp } @Override + public void beforeCheckpoint(long l) + { + try { + Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap(); + for (FSFilterStreamContext streamContext: openStreams.values()) { + long start = System.currentTimeMillis(); + streamContext.finalizeContext(); + totalWritingTime += System.currentTimeMillis() - start; + //streamContext.resetFilter(); + // Re-initialize context when next window starts after checkpoint + initializeContext = true; + } + } catch (IOException e) { + throw new RuntimeException(e); + 
} + } + + @Override public void checkpointed(long l) { } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java index 945c000..c12becd 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java @@ -25,17 +25,14 @@ import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import com.google.common.collect.Maps; import com.google.common.collect.Queues; - import com.datatorrent.api.Context; import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.Operator.CheckpointListener; import com.datatorrent.api.Operator.IdleTimeHandler; +import com.datatorrent.api.Operator.CheckpointNotificationListener; import com.datatorrent.common.util.BaseOperator; import com.datatorrent.common.util.NameableThreadFactory; import com.datatorrent.netlet.util.DTThrowable; @@ -55,7 +52,7 @@ import com.datatorrent.netlet.util.DTThrowable; * @since 2.0.0 */ @org.apache.hadoop.classification.InterfaceStability.Evolving -public abstract class AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator implements CheckpointListener, IdleTimeHandler +public abstract class AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator implements CheckpointNotificationListener, IdleTimeHandler { private static final Logger logger = LoggerFactory.getLogger(AbstractReconciler.class); public transient DefaultInputPort<INPUT> input = new DefaultInputPort<INPUT>() @@ -125,6 +122,11 @@ public abstract class 
AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator } @Override + public void beforeCheckpoint(long l) + { + } + + @Override public void checkpointed(long l) { } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java index 4bb53e5..b9594b3 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java @@ -79,7 +79,7 @@ import com.datatorrent.netlet.util.DTThrowable; */ @OperatorAnnotation(checkpointableWithinAppWindow = false) @Deprecated -public class FileSplitter implements InputOperator, Operator.CheckpointListener +public class FileSplitter implements InputOperator, Operator.CheckpointListener, Operator.CheckpointNotificationListener { protected Long blockSize; private int sequenceNo; @@ -380,6 +380,11 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener } @Override + public void beforeCheckpoint(long l) + { + } + + @Override public void checkpointed(long l) { } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java index 2d290cd..745f953 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java @@ -76,7 +76,7 @@ import com.datatorrent.api.annotation.Stateless; * @since 2.0.0 */ @OperatorAnnotation(checkpointableWithinAppWindow = false) -public 
class FileSplitterInput extends AbstractFileSplitter implements InputOperator, Operator.CheckpointListener +public class FileSplitterInput extends AbstractFileSplitter implements InputOperator, Operator.CheckpointListener, Operator.CheckpointNotificationListener { @NotNull private WindowDataManager windowDataManager; @@ -219,6 +219,11 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper } @Override + public void beforeCheckpoint(long l) + { + } + + @Override public void checkpointed(long l) { } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java index 2b8b58d..72bf63c 100644 --- a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java @@ -80,7 +80,7 @@ import com.datatorrent.lib.counters.BasicCounters; @OperatorAnnotation(checkpointableWithinAppWindow = false) public abstract class AbstractJMSInputOperator<T> extends JMSBase implements InputOperator, ActivationListener<OperatorContext>, MessageListener, ExceptionListener, - Operator.IdleTimeHandler, Operator.CheckpointListener + Operator.IdleTimeHandler, Operator.CheckpointListener, Operator.CheckpointNotificationListener { protected static final int DEFAULT_BUFFER_SIZE = 10 * 1024; // 10k @@ -395,6 +395,11 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase } @Override + public void beforeCheckpoint(long windowId) + { + } + + @Override public void checkpointed(long windowId) { } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java 
---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java b/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java index 89df25e..d0f722d 100644 --- a/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java +++ b/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java @@ -64,7 +64,7 @@ import com.datatorrent.common.util.BaseOperator; * @since 3.4.0 */ @InterfaceStability.Unstable -public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener +public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointNotificationListener { @AutoMetric private long tuplesJoinedPerSec; @@ -226,6 +226,11 @@ public abstract class AbstractJoinOperator<T> extends BaseOperator implements Op } @Override + public void beforeCheckpoint(long windowId) + { + } + + @Override public void checkpointed(long windowId) { leftStore.getStore().checkpointed(windowId); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java index 03f3bf6..5b58121 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java @@ -243,8 +243,11 @@ public class AbstractFileOutputOperatorTest * @param writer The writer to checkpoint. * @return new writer. 
*/ - public static AbstractFileOutputOperator checkpoint(AbstractFileOutputOperator writer) + public static AbstractFileOutputOperator checkpoint(AbstractFileOutputOperator writer, long windowId) { + if (windowId >= 0) { + writer.beforeCheckpoint(windowId); + } Kryo kryo = new Kryo(); ByteArrayOutputStream bos = new ByteArrayOutputStream(); Output loutput = new Output(bos); @@ -418,7 +421,7 @@ public class AbstractFileOutputOperatorTest writer.input.put(1); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); writer.beginWindow(1); writer.input.put(2); @@ -577,6 +580,7 @@ public class AbstractFileOutputOperatorTest writer.requestFinalize(EVEN_FILE); writer.requestFinalize(ODD_FILE); + writer.beforeCheckpoint(1); writer.committed(1); } @@ -603,6 +607,7 @@ public class AbstractFileOutputOperatorTest writer.requestFinalize(ODD_FILE); writer.requestFinalize(EVEN_FILE); + writer.beforeCheckpoint(1); writer.committed(1); } @@ -672,7 +677,7 @@ public class AbstractFileOutputOperatorTest writer.requestFinalize(EVEN_FILE); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); writer.beginWindow(1); writer.input.put(4); @@ -690,6 +695,7 @@ public class AbstractFileOutputOperatorTest writer.input.put(8); writer.input.put(9); writer.endWindow(); + writer.beforeCheckpoint(2); writer.committed(2); } @@ -815,7 +821,7 @@ public class AbstractFileOutputOperatorTest writer.input.put(2); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); writer.beginWindow(1); writer.input.put(3); @@ -862,8 +868,8 @@ public class AbstractFileOutputOperatorTest writer.input.put(4); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer); - 
AbstractFileOutputOperator checkPointWriter1 = checkpoint(writer); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); + AbstractFileOutputOperator checkPointWriter1 = checkpoint(writer, -1); LOG.debug("Checkpoint endOffsets={}", checkPointWriter.endOffsets); @@ -1128,7 +1134,7 @@ public class AbstractFileOutputOperatorTest writer.input.put(1); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); writer.beginWindow(1); writer.input.put(2); @@ -1225,7 +1231,7 @@ public class AbstractFileOutputOperatorTest writer.input.put(1); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); writer.beginWindow(1); writer.input.put(2); @@ -1272,7 +1278,7 @@ public class AbstractFileOutputOperatorTest writer.input.process(1); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); writer.beginWindow(1); writer.input.process(2); @@ -1363,7 +1369,7 @@ public class AbstractFileOutputOperatorTest writer.input.put(3); writer.input.put(4); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, -1); writer.input.put(3); writer.input.put(4); @@ -1554,8 +1560,11 @@ public class AbstractFileOutputOperatorTest writer.input.put(i); } writer.endWindow(); - evenOffsets.add(evenFile.length()); - oddOffsets.add(oddFile.length()); + if ((i % 2) == 1) { + writer.beforeCheckpoint(i); + evenOffsets.add(evenFile.length()); + oddOffsets.add(oddFile.length()); + } } writer.teardown(); @@ -1580,6 +1589,7 @@ public class AbstractFileOutputOperatorTest writer.input.put(2); writer.input.put(3); writer.endWindow(); + writer.beforeCheckpoint(0); //failure and restored 
writer.setup(testMeta.testOperatorContext); @@ -1756,11 +1766,14 @@ public class AbstractFileOutputOperatorTest for (int j = 0; j < 1000; ++j) { writer.input.put(i); } - writer.endWindow(); - evenOffsets.add(evenCounterContext.getCounter()); - oddOffsets.add(oddCounterContext.getCounter()); - //evenOffsets.add(evenFile.length()); - //oddOffsets.add(oddFile.length()); + //writer.endWindow(); + if ((i % 2) == 1) { + writer.beforeCheckpoint(i); + evenOffsets.add(evenCounterContext.getCounter()); + oddOffsets.add(oddCounterContext.getCounter()); + //evenOffsets.add(evenFile.length()); + //oddOffsets.add(oddFile.length()); + } } writer.teardown(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28a7e347/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java index 32c32f7..bcb1bc3 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java @@ -70,7 +70,7 @@ public class AbstractWindowFileOutputOperatorTest oper.input.process("window 0"); oper.endWindow(); - AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper); + AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, 0); oper.beginWindow(1); oper.input.process("window 1"); @@ -110,7 +110,7 @@ public class AbstractWindowFileOutputOperatorTest oper.beginWindow(1); oper.input.process("1"); - AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper); + AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, -1); oper.input.process("1"); oper.teardown();
