Repository: apex-malhar Updated Branches: refs/heads/master 2cf8bade8 -> 0f8442472
APEXMALHAR-2017 Fixed failing tests. Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/0f844247 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/0f844247 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/0f844247 Branch: refs/heads/master Commit: 0f84424720268e1eebb06992bcda00d2e7d591d4 Parents: 28a7e34 Author: Lakshmi Prasanna Velineni <[email protected]> Authored: Mon Oct 3 09:01:30 2016 -0700 Committer: Lakshmi Prasanna Velineni <[email protected]> Committed: Wed Oct 19 22:28:34 2016 -0700 ---------------------------------------------------------------------- .../datatorrent/contrib/hive/HiveMockTest.java | 16 ++++++++++++- .../lib/io/fs/AbstractFileInputOperator.java | 3 +-- .../lib/io/fs/AbstractFileOutputOperator.java | 6 +++-- .../lib/io/fs/AbstractReconciler.java | 2 +- .../io/fs/AbstractFileOutputOperatorTest.java | 25 ++++++++++---------- .../AbstractWindowFileOutputOperatorTest.java | 5 ++-- .../lib/io/fs/FSInputModuleAppTest.java | 1 + .../lib/fs/GenericFileOutputOperatorTest.java | 6 +++++ 8 files changed, 43 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java ---------------------------------------------------------------------- diff --git a/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java b/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java index 3c64bdf..4ec92c9 100755 --- a/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java +++ b/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java @@ -50,7 +50,6 @@ import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator.FilePartitio import com.datatorrent.contrib.hive.FSPojoToHiveOperator.FIELD_TYPE; import com.datatorrent.lib.helper.OperatorContextTestHelper; - 
public class HiveMockTest extends HiveTestService { public static final String APP_ID = "HiveOperatorTest"; @@ -250,6 +249,11 @@ public class HiveMockTest extends HiveTestService } fsRolling.endWindow(); + + if (wid == 6) { + fsRolling.beforeCheckpoint(wid); + fsRolling.checkpointed(wid); + } } fsRolling.teardown(); @@ -353,6 +357,11 @@ public class HiveMockTest extends HiveTestService } fsRolling.endWindow(); + + if (wid == 6) { + fsRolling.beforeCheckpoint(wid); + fsRolling.checkpointed(wid); + } } fsRolling.teardown(); @@ -521,6 +530,11 @@ public class HiveMockTest extends HiveTestService fsRolling.endWindow(); + if ((wid == 6) || (wid == 9)) { + fsRolling.beforeCheckpoint(wid); + fsRolling.checkpointed(wid); + } + if (wid == 9) { Kryo kryo = new Kryo(); FieldSerializer<HiveOperator> f1 = (FieldSerializer<HiveOperator>)kryo.getSerializer(HiveOperator.class); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java index 14cabfc..d4ee03b 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java @@ -37,7 +37,6 @@ import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; -import javax.validation.OverridesAttribute; import javax.validation.constraints.NotNull; import org.slf4j.Logger; @@ -99,7 +98,7 @@ import com.datatorrent.lib.util.KryoCloneUtils; */ @org.apache.hadoop.classification.InterfaceStability.Evolving public abstract class AbstractFileInputOperator<T> implements InputOperator, Partitioner<AbstractFileInputOperator<T>>, StatsListener, - Operator.CheckpointListener, 
Operator.CheckpointNotificationListener + Operator.CheckpointListener, Operator.CheckpointNotificationListener { private static final Logger LOG = LoggerFactory.getLogger(AbstractFileInputOperator.class); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java index f703a19..27a56cd 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java @@ -262,7 +262,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp private Long expireStreamAfterAccessMillis; private final Set<String> filesWithOpenStreams; - private boolean initializeContext; + private transient boolean initializeContext; /** * This input port receives incoming tuples. @@ -968,6 +968,9 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp @Override public void beginWindow(long windowId) { + // All the filter state needs to be flushed to the disk. Not all filters allow a flush option, so the filters have + // to be closed and reopened. If no filter is being used then it is essentially a noop as the underlying + // FSDataOutputStream is not being closed in this operation.
if (initializeContext) { try { Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap(); @@ -1240,7 +1243,6 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp long start = System.currentTimeMillis(); streamContext.finalizeContext(); totalWritingTime += System.currentTimeMillis() - start; - //streamContext.resetFilter(); // Re-initialize context when next window starts after checkpoint initializeContext = true; } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java index c12becd..0d67c31 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java @@ -31,8 +31,8 @@ import com.google.common.collect.Maps; import com.google.common.collect.Queues; import com.datatorrent.api.Context; import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.Operator.IdleTimeHandler; import com.datatorrent.api.Operator.CheckpointNotificationListener; +import com.datatorrent.api.Operator.IdleTimeHandler; import com.datatorrent.common.util.BaseOperator; import com.datatorrent.common.util.NameableThreadFactory; import com.datatorrent.netlet.util.DTThrowable; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java index 5b58121..8f0fbb0 100644 --- 
a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java @@ -66,6 +66,7 @@ import com.datatorrent.api.Context; import com.datatorrent.api.DAG; import com.datatorrent.api.LocalMode; import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.Stateless; import com.datatorrent.lib.helper.OperatorContextTestHelper; import com.datatorrent.lib.testbench.RandomWordGenerator; import com.datatorrent.lib.util.TestUtils; @@ -245,7 +246,7 @@ public class AbstractFileOutputOperatorTest */ public static AbstractFileOutputOperator checkpoint(AbstractFileOutputOperator writer, long windowId) { - if (windowId >= 0) { + if (windowId >= Stateless.WINDOW_ID) { writer.beforeCheckpoint(windowId); } Kryo kryo = new Kryo(); @@ -421,7 +422,7 @@ public class AbstractFileOutputOperatorTest writer.input.put(1); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID); writer.beginWindow(1); writer.input.put(2); @@ -677,7 +678,7 @@ public class AbstractFileOutputOperatorTest writer.requestFinalize(EVEN_FILE); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID); writer.beginWindow(1); writer.input.put(4); @@ -821,7 +822,7 @@ public class AbstractFileOutputOperatorTest writer.input.put(2); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID); writer.beginWindow(1); writer.input.put(3); @@ -868,8 +869,8 @@ public class AbstractFileOutputOperatorTest writer.input.put(4); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); - 
AbstractFileOutputOperator checkPointWriter1 = checkpoint(writer, -1); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID); + AbstractFileOutputOperator checkPointWriter1 = checkpoint(writer, Stateless.WINDOW_ID); LOG.debug("Checkpoint endOffsets={}", checkPointWriter.endOffsets); @@ -1134,7 +1135,7 @@ public class AbstractFileOutputOperatorTest writer.input.put(1); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID); writer.beginWindow(1); writer.input.put(2); @@ -1231,7 +1232,7 @@ public class AbstractFileOutputOperatorTest writer.input.put(1); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID); writer.beginWindow(1); writer.input.put(2); @@ -1278,7 +1279,7 @@ public class AbstractFileOutputOperatorTest writer.input.process(1); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID); writer.beginWindow(1); writer.input.process(2); @@ -1369,7 +1370,7 @@ public class AbstractFileOutputOperatorTest writer.input.put(3); writer.input.put(4); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer, -1); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID); writer.input.put(3); writer.input.put(4); @@ -1766,13 +1767,11 @@ public class AbstractFileOutputOperatorTest for (int j = 0; j < 1000; ++j) { writer.input.put(i); } - //writer.endWindow(); + writer.endWindow(); if ((i % 2) == 1) { writer.beforeCheckpoint(i); evenOffsets.add(evenCounterContext.getCounter()); oddOffsets.add(oddCounterContext.getCounter()); - //evenOffsets.add(evenFile.length()); - //oddOffsets.add(oddFile.length()); } } 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java index bcb1bc3..daebecb 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java @@ -22,6 +22,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.runner.Description; +import com.datatorrent.api.annotation.Stateless; import com.datatorrent.lib.helper.OperatorContextTestHelper; import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher; import com.datatorrent.lib.util.TestUtils.TestInfo; @@ -70,7 +71,7 @@ public class AbstractWindowFileOutputOperatorTest oper.input.process("window 0"); oper.endWindow(); - AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, 0); + AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, Stateless.WINDOW_ID); oper.beginWindow(1); oper.input.process("window 1"); @@ -110,7 +111,7 @@ public class AbstractWindowFileOutputOperatorTest oper.beginWindow(1); oper.input.process("1"); - AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, -1); + AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, Stateless.WINDOW_ID); oper.input.process("1"); oper.teardown(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java ---------------------------------------------------------------------- diff --git 
a/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java index 4213a00..4042d3c 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java @@ -99,6 +99,7 @@ public class FSInputModuleAppTest conf.set("dt.operator.hdfsInputModule.prop.blockSize", "10"); conf.set("dt.operator.hdfsInputModule.prop.blocksThreshold", "4"); conf.set("dt.operator.hdfsInputModule.prop.scanIntervalMillis", "10000"); + conf.set("dt.attr.CHECKPOINT_WINDOW_COUNT","10"); LocalMode lma = LocalMode.newInstance(); lma.prepareDAG(app, conf); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0f844247/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java index 6082f57..8b8ed01 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java @@ -62,6 +62,7 @@ public class GenericFileOutputOperatorTest extends AbstractFileOutputOperatorTes } writer.endWindow(); } + checkpoint(writer, 10); writer.committed(10); for (int i = 13; i <= 26; i++) { @@ -71,7 +72,10 @@ public class GenericFileOutputOperatorTest extends AbstractFileOutputOperatorTes } writer.endWindow(); } + checkpoint(writer, 20); writer.committed(20); + + checkpoint(writer, 26); writer.committed(26); String[] expected = {"0a\n0b\n1a\n1b\n6a\n6b\n7a\n7b\n", "13a\n13b\n14a\n14b\n18a\n18b\n19a\n19b\n", @@ -108,8 +112,10 @@ public class GenericFileOutputOperatorTest extends AbstractFileOutputOperatorTes } writer.endWindow(); if 
(i % 10 == 0) { + checkpoint(writer, 10); writer.committed(10); } + checkpoint(writer, 24); } writer.committed(tuples.length);
