Repository: apex-malhar Updated Branches: refs/heads/master 0d0e448ba -> df13332c1
APEXMALHAR-2404 Provided fixes for Kryo serialization & at-least-once semantics for recovery. Added unit test case to verify at-least-once semantics for recovery. Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/df13332c Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/df13332c Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/df13332c Branch: refs/heads/master Commit: df13332c16f4ff1c2ccc434b0fdd25321fada0fb Parents: 0d0e448 Author: Lakshmi Prasanna Velineni <[email protected]> Authored: Thu May 18 15:35:09 2017 -0700 Committer: Lakshmi Prasanna Velineni <[email protected]> Committed: Tue Jun 13 11:22:22 2017 -0700 ---------------------------------------------------------------------- .../contrib/avro/AvroFileInputOperator.java | 1 + .../datatorrent/contrib/avro/AvroToPojo.java | 6 +- .../contrib/avro/AvroFileInputOperatorTest.java | 47 +++++++++ .../io/fs/AbstractFileInputOperatorTest.java | 104 +++++++++++++------ 4 files changed, 121 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/df13332c/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java index 01e99d3..f863d41 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java @@ -161,6 +161,7 @@ public class AvroFileInputOperator extends AbstractFileInputOperator<GenericReco @Override public void beginWindow(long windowId) { + super.beginWindow(windowId); errorCount = 0; recordCount = 0; } 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/df13332c/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java index 1951c1e..2acf98c 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java +++ b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java @@ -72,9 +72,9 @@ public class AvroToPojo extends BaseOperator private String genericRecordToPOJOFieldsMapping = null; - private List<FieldInfo> fieldInfos; + private transient List<FieldInfo> fieldInfos; - private List<ActiveFieldInfo> columnFieldSetters; + private transient List<ActiveFieldInfo> columnFieldSetters; @AutoMetric @VisibleForTesting @@ -87,7 +87,7 @@ public class AvroToPojo extends BaseOperator @AutoMetric @VisibleForTesting int fieldErrorCount = 0; - + public final transient DefaultOutputPort<GenericRecord> errorPort = new DefaultOutputPort<GenericRecord>(); /** http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/df13332c/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileInputOperatorTest.java index 09507e6..17b1e2c 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileInputOperatorTest.java @@ -53,10 +53,13 @@ import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.LocalMode; +import com.datatorrent.api.Sink; import com.datatorrent.api.StreamingApplication; import 
com.datatorrent.lib.helper.TestPortContext; import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.io.fs.AbstractFileInputOperatorTest; import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext; @@ -149,7 +152,51 @@ public class AvroFileInputOperatorTest avroFileInput.teardown(); } + + @Test + public void testIdempotencyWithCheckPoint() throws Exception + { + AbstractFileInputOperatorTest.testIdempotencyWithCheckPoint(new AvroFileInputOperator(), new CollectorTestSink<String>(), new AbstractFileInputOperatorTest.IdempotencyTestDriver<AvroFileInputOperator>() + { + @Override + public void writeFile(int count, String fileName) throws IOException + { + recordList = Lists.newArrayList(); + + while (count > 0) { + GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(AVRO_SCHEMA)); + rec.put("orderId", count * 1L); + rec.put("customerId", count * 2); + rec.put("total", count * 1.5); + rec.put("customerName", "*" + count + "*"); + count--; + recordList.add(rec); + } + + writeAvroFile(new File(fileName)); + } + + @Override + public void setSink(AvroFileInputOperator operator, Sink<?> sink) + { + TestUtils.setSink(operator.output, sink); + } + + @Override + public String getDirectory() + { + return testMeta.dir; + } + @Override + public OperatorContext getContext() + { + return testMeta.context; + } + }); + } + + @Test public void testMultipleFileAvroReads() throws Exception { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/df13332c/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java index 8acd16a..1e69a89 100644 --- 
a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java @@ -20,6 +20,7 @@ package com.datatorrent.lib.io.fs; import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -56,7 +57,9 @@ import com.google.common.collect.Sets; import com.datatorrent.api.Attribute; import com.datatorrent.api.Context; import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.Operator; import com.datatorrent.api.Partitioner.Partition; +import com.datatorrent.api.Sink; import com.datatorrent.api.StatsListener; import com.datatorrent.lib.io.fs.AbstractFileInputOperator.DirectoryScanner; import com.datatorrent.lib.partitioner.StatelessPartitionerTest.PartitioningContextImpl; @@ -907,38 +910,72 @@ public class AbstractFileInputOperatorTest @Test public void testIdempotencyWithCheckPoint() throws Exception { - FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true); + testIdempotencyWithCheckPoint(new LineByLineFileInputOperator(), new CollectorTestSink<String>(), new IdempotencyTestDriver<LineByLineFileInputOperator>() + { + @Override + public void writeFile(int count, String fileName) throws IOException + { + List<String> lines = Lists.newArrayList(); + for (int line = 0; line < count; line++) { + lines.add(fileName + "l" + line); + } + FileUtils.write(new File(testMeta.dir, fileName), StringUtils.join(lines, '\n')); + } + + @Override + public void setSink(LineByLineFileInputOperator operator, Sink<?> sink) + { + TestUtils.setSink(operator.output, sink); + } + + @Override + public String getDirectory() + { + return testMeta.dir; + } + + @Override + public Context.OperatorContext getContext() + { + return testMeta.context; + } + }); + } + + public interface IdempotencyTestDriver<T extends Operator> + { + 
void writeFile(int count, String fileName) throws IOException; + + void setSink(T operator, Sink<?> sink); + + String getDirectory(); + + Context.OperatorContext getContext(); + } + + public static <S extends AbstractFileInputOperator, T> void testIdempotencyWithCheckPoint(S oper, CollectorTestSink<T> queryResults, IdempotencyTestDriver<S> driver) throws Exception + { + FileContext.getLocalFSFileContext().delete(new Path(new File(driver.getDirectory()).getAbsolutePath()), true); - List<String> lines = Lists.newArrayList(); int file = 0; - for (int line = 0; line < 5; line++) { - lines.add("f" + file + "l" + line); - } - FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n')); + driver.writeFile(5, "file" + file); file = 1; - lines = Lists.newArrayList(); - for (int line = 0; line < 6; line++) { - lines.add("f" + file + "l" + line); - } - FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n')); + driver.writeFile(6, "file" + file); // empty file file = 2; - lines = Lists.newArrayList(); - FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n')); + driver.writeFile(0, "file" + file); - - LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); FSWindowDataManager manager = new FSWindowDataManager(); - manager.setStatePath(testMeta.dir + "/recovery"); + manager.setStatePath(driver.getDirectory() + "/recovery"); oper.setWindowDataManager(manager); - oper.setDirectory(testMeta.dir); + oper.setDirectory(driver.getDirectory()); oper.getScanner().setFilePatternRegexp(".*file[\\d]"); - oper.setup(testMeta.context); + oper.setup(driver.getContext()); oper.setEmitBatchSize(3); @@ -958,41 +995,40 @@ public class AbstractFileInputOperatorTest //checkpoint the operator ByteArrayOutputStream bos = new ByteArrayOutputStream(); - LineByLineFileInputOperator checkPointOper = checkpoint(oper, bos); + S checkPointOper = checkpoint(oper, bos); // start saving output - 
CollectorTestSink<String> queryResults = new CollectorTestSink<String>(); - TestUtils.setSink(oper.output, queryResults); + driver.setSink(oper, queryResults); // emit f0l3, f0l4, and closeFile(f0) in the same window oper.beginWindow(2); oper.emitTuples(); oper.endWindow(); - List<String> beforeRecovery2 = Lists.newArrayList(queryResults.collectedTuples); + List<T> beforeRecovery2 = Lists.newArrayList(queryResults.collectedTuples); // emit f1l0, f1l1, f1l2 oper.beginWindow(3); oper.emitTuples(); oper.endWindow(); - List<String> beforeRecovery3 = Lists.newArrayList(queryResults.collectedTuples); + List<T> beforeRecovery3 = Lists.newArrayList(queryResults.collectedTuples); // emit f1l3, f1l4, f1l5 oper.beginWindow(4); oper.emitTuples(); oper.endWindow(); - List<String> beforeRecovery4 = Lists.newArrayList(queryResults.collectedTuples); + List<T> beforeRecovery4 = Lists.newArrayList(queryResults.collectedTuples); // closeFile(f1) in a new window oper.beginWindow(5); oper.emitTuples(); oper.endWindow(); - List<String> beforeRecovery5 = Lists.newArrayList(queryResults.collectedTuples); + List<T> beforeRecovery5 = Lists.newArrayList(queryResults.collectedTuples); // empty file ops, closeFile(f2) in emitTuples() only oper.beginWindow(6); oper.emitTuples(); oper.endWindow(); - List<String> beforeRecovery6 = Lists.newArrayList(queryResults.collectedTuples); + List<T> beforeRecovery6 = Lists.newArrayList(queryResults.collectedTuples); oper.teardown(); @@ -1001,11 +1037,11 @@ public class AbstractFileInputOperatorTest //idempotency part oper = restoreCheckPoint(checkPointOper, bos); - testMeta.context.getAttributes().put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1L); - oper.setup(testMeta.context); - TestUtils.setSink(oper.output, queryResults); + driver.getContext().getAttributes().put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1L); + oper.setup(driver.getContext()); + driver.setSink(oper, queryResults); - long startwid = 
testMeta.context.getAttributes().get(Context.OperatorContext.ACTIVATION_WINDOW_ID) + 1; + long startwid = driver.getContext().getAttributes().get(Context.OperatorContext.ACTIVATION_WINDOW_ID) + 1; oper.beginWindow(startwid); Assert.assertTrue(oper.currentFile == null); @@ -1046,7 +1082,7 @@ public class AbstractFileInputOperatorTest * @param bos The ByteArrayOutputStream which saves the checkpoint data temporarily. * @return new operator. */ - public static LineByLineFileInputOperator checkpoint(LineByLineFileInputOperator oper, ByteArrayOutputStream bos) throws Exception + public static <T> T checkpoint(T oper, ByteArrayOutputStream bos) throws Exception { Kryo kryo = new Kryo(); @@ -1056,7 +1092,7 @@ public class AbstractFileInputOperatorTest Input lInput = new Input(bos.toByteArray()); @SuppressWarnings("unchecked") - LineByLineFileInputOperator checkPointedOper = kryo.readObject(lInput, oper.getClass()); + T checkPointedOper = kryo.readObject(lInput, (Class<T>)oper.getClass()); lInput.close(); return checkPointedOper; @@ -1068,12 +1104,12 @@ public class AbstractFileInputOperatorTest * @param bos The ByteArrayOutputStream which saves the checkpoint data temporarily. */ @SuppressWarnings({"unchecked", "rawtypes"}) - public static LineByLineFileInputOperator restoreCheckPoint(LineByLineFileInputOperator checkPointOper, ByteArrayOutputStream bos) throws Exception + public static <T> T restoreCheckPoint(T checkPointOper, ByteArrayOutputStream bos) throws Exception { Kryo kryo = new Kryo(); Input lInput = new Input(bos.toByteArray()); - LineByLineFileInputOperator oper = kryo.readObject(lInput, checkPointOper.getClass()); + T oper = kryo.readObject(lInput, (Class<T>)checkPointOper.getClass()); lInput.close(); return oper;
