Repository: apex-malhar
Updated Branches:
  refs/heads/master c89e63621 -> 999efccba
APEXMALHAR-2254, APEXMALHAR-2269, APEXMALHAR-2270 Fix bugs in replay tuple skipping and idempotency.

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/0abb698e
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/0abb698e
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/0abb698e

Branch: refs/heads/master
Commit: 0abb698e5c784c8813943065acccda1993f8cb5d
Parents: c92ca15
Author: Matt Zhang <[email protected]>
Authored: Mon Oct 10 09:49:47 2016 -0700
Committer: Matt Zhang <[email protected]>
Committed: Sun Oct 23 19:25:51 2016 -0700

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileInputOperator.java    |  26 ++-
 .../io/fs/AbstractFileInputOperatorTest.java    | 201 +++++++++++++++++++
 2 files changed, 223 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0abb698e/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index d4ee03b..0f3cc48 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -592,17 +592,27 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
         pendingFiles.remove(recoveryEntry.file);
       }
       inputStream = retryFailedFile(new FailedFile(recoveryEntry.file, recoveryEntry.startOffset));
+
+      while (--skipCount >= 0) {
+        readEntity();
+      }
       while (offset < recoveryEntry.endOffset) {
         T line = readEntity();
         offset++;
         emit(line);
       }
+      if (recoveryEntry.fileClosed) {
+        closeFile(inputStream);
+      }
     } else {
       while (offset < recoveryEntry.endOffset) {
         T line = readEntity();
         offset++;
         emit(line);
       }
+      if (recoveryEntry.fileClosed) {
+        closeFile(inputStream);
+      }
     }
   }
 }
@@ -654,6 +664,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
     if (inputStream != null) {
       long startOffset = offset;
       String file = currentFile; //current file is reset to null when closed.
+      boolean fileClosed = false;

       try {
         int counterForTuple = 0;
@@ -662,6 +673,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
           if (line == null) {
             LOG.info("done reading file ({} entries).", offset);
             closeFile(inputStream);
+            fileClosed = true;
             break;
           }

@@ -679,9 +691,9 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
       } catch (IOException e) {
         failureHandling(e);
       }
-      //Only record an entry when something was emitted from the file.
-      if (offset > startOffset) {
-        currentWindowRecoveryState.add(new RecoveryEntry(file, startOffset, offset));
+      //Record an entry when something was emitted from the file or when we have a closeFile().
+      if (offset >= startOffset) {
+        currentWindowRecoveryState.add(new RecoveryEntry(file, startOffset, offset, fileClosed));
       }
     }
   }
@@ -1138,6 +1150,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
     final String file;
     final long startOffset;
     final long endOffset;
+    final boolean fileClosed;

     @SuppressWarnings("unused")
     private RecoveryEntry()
@@ -1145,13 +1158,15 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
       file = null;
       startOffset = -1;
       endOffset = -1;
+      fileClosed = false;
     }

-    RecoveryEntry(String file, long startOffset, long endOffset)
+    RecoveryEntry(String file, long startOffset, long endOffset, boolean fileClosed)
     {
       this.file = Preconditions.checkNotNull(file, "file");
       this.startOffset = startOffset;
       this.endOffset = endOffset;
+      this.fileClosed = fileClosed;
    }

     @Override
@@ -1172,6 +1187,9 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
       if (startOffset != that.startOffset) {
         return false;
       }
+      if (fileClosed != that.fileClosed) {
+        return false;
+      }
       return file.equals(that.file);
     }
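
The gist of the operator change, for reviewers: a RecoveryEntry now also remembers
whether its window closed the file, and replay first skips entities consumed before
the entry's range instead of re-emitting them. Below is a minimal, self-contained
sketch of that replay contract; Entry, readEntity, and source are simplified
stand-ins for illustration only, not Malhar classes.

  import java.util.ArrayList;
  import java.util.List;

  public class ReplaySketch
  {
    /** Mirrors RecoveryEntry: an emitted range plus the new fileClosed flag. */
    static class Entry
    {
      final long startOffset;
      final long endOffset;
      final boolean fileClosed;

      Entry(long startOffset, long endOffset, boolean fileClosed)
      {
        this.startOffset = startOffset;
        this.endOffset = endOffset;
        this.fileClosed = fileClosed;
      }
    }

    private final List<String> source; // stands in for the reopened stream
    private int cursor;                // stream position
    boolean closed;

    ReplaySketch(List<String> source)
    {
      this.source = source;
    }

    String readEntity()
    {
      return cursor < source.size() ? source.get(cursor++) : null;
    }

    /**
     * Replay one recovered window: skip entities consumed before this entry's
     * range, re-emit exactly [startOffset, endOffset), and close the "file" at
     * the same point the original run did.
     */
    List<String> replay(Entry entry, long skipCount)
    {
      while (--skipCount >= 0) {
        readEntity(); // advance without emitting, as in the patch
      }
      List<String> emitted = new ArrayList<>();
      for (long offset = entry.startOffset; offset < entry.endOffset; offset++) {
        emitted.add(readEntity());
      }
      if (entry.fileClosed) {
        closed = true; // the real operator calls closeFile(inputStream) here
      }
      return emitted;
    }
  }

In this sketch's simplified model, replaying Entry(3, 5, true) with skipCount 3
re-emits only lines 3 and 4 and then marks the file closed, which is what window 2
of the new test expects (f0l3, f0l4, then closeFile(f0)). Closing at the same point
during replay is what keeps replay idempotent when a file is closed at a window
boundary or without emitting anything, as in the empty-file window of the test.
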
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0abb698e/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
index 2f926d3..e9346ec 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
@@ -18,15 +18,19 @@
  */
 package com.datatorrent.lib.io.fs;

+import java.io.ByteArrayOutputStream;
 import java.io.File;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.TreeSet;

 import org.junit.Assert;
 import org.junit.Rule;
@@ -44,6 +48,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;

 import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -816,6 +822,181 @@ public class AbstractFileInputOperatorTest
   }

   @Test
+  public void testIdempotencyWithCheckPoint() throws Exception
+  {
+    FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
+
+    List<String> lines = Lists.newArrayList();
+    int file = 0;
+    for (int line = 0; line < 5; line++) {
+      lines.add("f" + file + "l" + line);
+    }
+    FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
+
+    file = 1;
+    lines = Lists.newArrayList();
+    for (int line = 0; line < 6; line++) {
+      lines.add("f" + file + "l" + line);
+    }
+    FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
+
+    // empty file
+    file = 2;
+    lines = Lists.newArrayList();
+    FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
+
+    LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
+    FSWindowDataManager manager = new FSWindowDataManager();
+    manager.setStatePath(testMeta.dir + "/recovery");
+
+    oper.setWindowDataManager(manager);
+
+    oper.setDirectory(testMeta.dir);
+    oper.getScanner().setFilePatternRegexp(".*file[\\d]");
+
+    oper.setup(testMeta.context);
+
+    oper.setEmitBatchSize(3);
+
+    // sort the pendingFiles and ensure the ordering of the files scanned
+    DirectoryScannerNew newScanner = new DirectoryScannerNew();
+    oper.setScanner(newScanner);
+
+    // scan directory
+    oper.beginWindow(0);
+    oper.emitTuples();
+    oper.endWindow();
+
+    // emit f0l0, f0l1, f0l2
+    oper.beginWindow(1);
+    oper.emitTuples();
+    oper.endWindow();
+
+    //checkpoint the operator
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    LineByLineFileInputOperator checkPointOper = checkpoint(oper, bos);
+
+    // start saving output
+    CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
+    TestUtils.setSink(oper.output, queryResults);
+
+    // emit f0l3, f0l4, and closeFile(f0) in the same window
+    oper.beginWindow(2);
+    oper.emitTuples();
+    oper.endWindow();
+    List<String> beforeRecovery2 = Lists.newArrayList(queryResults.collectedTuples);
+
+    // emit f1l0, f1l1, f1l2
+    oper.beginWindow(3);
+    oper.emitTuples();
+    oper.endWindow();
+    List<String> beforeRecovery3 = Lists.newArrayList(queryResults.collectedTuples);
+
+    // emit f1l3, f1l4, f1l5
+    oper.beginWindow(4);
+    oper.emitTuples();
+    oper.endWindow();
+    List<String> beforeRecovery4 = Lists.newArrayList(queryResults.collectedTuples);
+
+    // closeFile(f1) in a new window
+    oper.beginWindow(5);
+    oper.emitTuples();
+    oper.endWindow();
+    List<String> beforeRecovery5 = Lists.newArrayList(queryResults.collectedTuples);
+
+    // empty file ops, closeFile(f2) in emitTuples() only
+    oper.beginWindow(6);
+    oper.emitTuples();
+    oper.endWindow();
+    List<String> beforeRecovery6 = Lists.newArrayList(queryResults.collectedTuples);
+
+    oper.teardown();
+
+    queryResults.clear();
+
+    //idempotency part
+
+    oper = restoreCheckPoint(checkPointOper, bos);
+    testMeta.context.getAttributes().put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1L);
+    oper.setup(testMeta.context);
+    TestUtils.setSink(oper.output, queryResults);
+
+    long startwid = testMeta.context.getAttributes().get(Context.OperatorContext.ACTIVATION_WINDOW_ID) + 1;
+
+    oper.beginWindow(startwid);
+    Assert.assertTrue(oper.currentFile == null);
+    oper.emitTuples();
+    oper.endWindow();
+    Assert.assertEquals("lines", beforeRecovery2, queryResults.collectedTuples);
+
+    oper.beginWindow(++startwid);
+    oper.emitTuples();
+    oper.endWindow();
+    Assert.assertEquals("lines", beforeRecovery3, queryResults.collectedTuples);
+
+    oper.beginWindow(++startwid);
+    oper.emitTuples();
+    oper.endWindow();
+    Assert.assertEquals("lines", beforeRecovery4, queryResults.collectedTuples);
+
+    oper.beginWindow(++startwid);
+    Assert.assertTrue(oper.currentFile == null);
+    oper.emitTuples();
+    oper.endWindow();
+    Assert.assertEquals("lines", beforeRecovery5, queryResults.collectedTuples);
+
+    oper.beginWindow(++startwid);
+    Assert.assertTrue(oper.currentFile == null);
+    oper.emitTuples();
+    oper.endWindow();
+    Assert.assertEquals("lines", beforeRecovery6, queryResults.collectedTuples);
+
+    Assert.assertEquals("number tuples", 8, queryResults.collectedTuples.size());
+
+    oper.teardown();
+  }
+
+  /**
+   * This method checkpoints the given operator.
+   * @param oper The operator to checkpoint.
+   * @param bos The ByteArrayOutputStream which saves the checkpoint data temporarily.
+   * @return new operator.
+   */
+  public static LineByLineFileInputOperator checkpoint(LineByLineFileInputOperator oper, ByteArrayOutputStream bos) throws Exception
+  {
+    Kryo kryo = new Kryo();
+
+    Output loutput = new Output(bos);
+    kryo.writeObject(loutput, oper);
+    loutput.close();
+
+    Input lInput = new Input(bos.toByteArray());
+    @SuppressWarnings("unchecked")
+    LineByLineFileInputOperator checkPointedOper = kryo.readObject(lInput, oper.getClass());
+    lInput.close();
+
+    return checkPointedOper;
+  }
+
+  /**
+   * Restores the checkpointed operator.
+   * @param checkPointOper The checkpointed operator.
+   * @param bos The ByteArrayOutputStream which saves the checkpoint data temporarily.
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public static LineByLineFileInputOperator restoreCheckPoint(LineByLineFileInputOperator checkPointOper, ByteArrayOutputStream bos) throws Exception
+  {
+    Kryo kryo = new Kryo();
+
+    Input lInput = new Input(bos.toByteArray());
+    LineByLineFileInputOperator oper = kryo.readObject(lInput, checkPointOper.getClass());
+    lInput.close();
+
+    return oper;
+  }
+
+  @Test
   public void testWindowDataManagerPartitioning() throws Exception
   {
     LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
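
The checkpoint/restoreCheckPoint helpers above are a plain Kryo round trip through
an in-memory buffer. A standalone sketch of the same pattern, using a hypothetical
POJO (State) instead of the operator so it runs without the Malhar test harness,
and relying on Kryo's 2016-era defaults (no class registration required), exactly
as the test does:

  import java.io.ByteArrayOutputStream;

  import com.esotericsoftware.kryo.Kryo;
  import com.esotericsoftware.kryo.io.Input;
  import com.esotericsoftware.kryo.io.Output;

  public class KryoRoundTripSketch
  {
    // Hypothetical stand-in for operator state; not a Malhar class.
    static class State
    {
      String currentFile;
      long offset;
    }

    public static void main(String[] args)
    {
      State before = new State();
      before.currentFile = "file0";
      before.offset = 3;

      Kryo kryo = new Kryo();

      // "checkpoint": serialize the object into an in-memory buffer
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      Output output = new Output(bos);
      kryo.writeObject(output, before);
      output.close();

      // "restore": deserialize an independent copy from the saved bytes
      Input input = new Input(bos.toByteArray());
      State after = kryo.readObject(input, State.class);
      input.close();

      System.out.println(after.currentFile + "@" + after.offset); // file0@3
    }
  }

Restoring from bos.toByteArray() rather than reusing the live object is what makes
the second half of the test a true cold restart from the checkpoint taken after
window 1, so the replayed windows must reproduce the recorded output exactly.
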
@@ -926,4 +1107,24 @@ public class AbstractFileInputOperatorTest
     accepted = scanner.acceptFile("1_file");
     Assert.assertFalse("File should not be accepted by this partition ", accepted);
   }
+
+  private static class DirectoryScannerNew extends DirectoryScanner
+  {
+    public LinkedHashSet<Path> scan(FileSystem fs, Path filePath, Set<String> consumedFiles)
+    {
+      LinkedHashSet<Path> pathSet;
+      pathSet = super.scan(fs, filePath, consumedFiles);
+
+      TreeSet<Path> orderFiles = new TreeSet<>();
+      orderFiles.addAll(pathSet);
+      pathSet.clear();
+      Iterator<Path> fileIterator = orderFiles.iterator();
+      while (fileIterator.hasNext()) {
+        pathSet.add(fileIterator.next());
+      }
+
+      return pathSet;
+    }
+  }
 }
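
One design note on DirectoryScannerNew: replay can only match the recorded windows
if files are consumed in the same order on every run, so the test subclass sorts
the scanned paths before returning them. The TreeSet copy-back above is equivalent
to this shorter form, sketched here with String paths instead of Hadoop Path so it
is self-contained:

  import java.util.LinkedHashSet;
  import java.util.TreeSet;

  public class SortedScanSketch
  {
    // Same effect as DirectoryScannerNew.scan(): return the scan result in
    // sorted, deterministic iteration order.
    static LinkedHashSet<String> sortScan(LinkedHashSet<String> scanned)
    {
      return new LinkedHashSet<>(new TreeSet<>(scanned));
    }

    public static void main(String[] args)
    {
      LinkedHashSet<String> scanned = new LinkedHashSet<>();
      scanned.add("file1");
      scanned.add("file0");
      scanned.add("file2");
      // prints [file0, file1, file2] regardless of scan order
      System.out.println(sortScan(scanned));
    }
  }
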
