Repository: apex-malhar Updated Branches: refs/heads/master a46ec1dcf -> 157fb8230
APEXMALHAR-2422 WAL should rely directly on the file and not on the size of the file returned by NameNode, because HSYNC on HDFS without the SyncFlag.UPDATE_LENGTH flag will not update the length of the file with NameNode. Also made the tests compatible with the Hadoop filesystem API. Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/157fb823 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/157fb823 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/157fb823 Branch: refs/heads/master Commit: 157fb82308c737dca1fca803c33f7f7b496cf95c Parents: a46ec1d Author: Sandesh Hegde <[email protected]> Authored: Mon Feb 27 16:33:29 2017 -0800 Committer: Sandesh Hegde <[email protected]> Committed: Tue Mar 7 19:08:01 2017 -0800 ---------------------------------------------------------------------- .../apex/malhar/lib/wal/FileSystemWAL.java | 37 +++++++++++++++++--- .../apex/malhar/lib/wal/FileSystemWALTest.java | 30 +++++++++++----- 2 files changed, 54 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/157fb823/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java index 1ae039b..72a1564 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java @@ -20,6 +20,7 @@ package org.apache.apex.malhar.lib.wal; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.EOFException; import java.io.IOException; import java.util.EnumSet; import java.util.HashSet; @@ -426,6 +427,36 @@ public class FileSystemWAL implements 
WAL<FileSystemWAL.FileSystemWALReader, Fil readOrSkip(true); } + private static int readLen(final DataInputStream inputStream) throws IOException + { + if (inputStream == null) { + return -1; + } + + int len = inputStream.read(); + + if (len < 0) { + return len; + } + + len = len << 24; + + for (int i = 2;i >= 0;--i) { + int ch = inputStream.read(); + if (ch < 0) { + throw new EOFException(); + } + + len += (ch << 8 * i); + } + + if (len < 0) { + throw new IOException("Negative length"); + } + + return len; + } + private Slice readOrSkip(boolean skip) throws IOException { if (currentPointer == null) { @@ -442,11 +473,9 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil inputStream = getInputStream(currentPointer); } - if (inputStream != null && currentPointer.offset < - fileSystemWAL.fileContext.getFileStatus(currentOpenPath).getLen()) { - int len = inputStream.readInt(); - Preconditions.checkState(len >= 0, "negative length"); + int len = readLen(inputStream); + if (len != -1) { if (!skip) { byte[] data = new byte[len]; inputStream.readFully(data); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/157fb823/library/src/test/java/org/apache/apex/malhar/lib/wal/FileSystemWALTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/wal/FileSystemWALTest.java b/library/src/test/java/org/apache/apex/malhar/lib/wal/FileSystemWALTest.java index aefaac9..697d5fd 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/wal/FileSystemWALTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/wal/FileSystemWALTest.java @@ -18,8 +18,9 @@ */ package org.apache.apex.malhar.lib.wal; -import java.io.File; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import java.util.Random; import org.junit.Assert; @@ -31,11 +32,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
org.apache.apex.malhar.lib.utils.FileContextUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import com.datatorrent.lib.util.KryoCloneUtils; -import com.datatorrent.lib.util.TestUtils; import com.datatorrent.netlet.util.Slice; public class FileSystemWALTest @@ -53,12 +55,21 @@ public class FileSystemWALTest { private String targetDir; FileSystemWAL fsWAL = new FileSystemWAL(); + Configuration conf = new Configuration(); + FileSystem fs; @Override protected void starting(Description description) { - TestUtils.deleteTargetTestClassFolder(description); targetDir = "target/" + description.getClassName() + "/" + description.getMethodName(); + + try { + fs = FileSystem.get(new URI(targetDir + "/WAL"), conf); + fs.delete(new Path(targetDir), true); + } catch (IOException | URISyntaxException e) { + throw new RuntimeException(e); + } + fsWAL = new FileSystemWAL(); fsWAL.setFilePath(targetDir + "/WAL"); } @@ -66,7 +77,11 @@ public class FileSystemWALTest @Override protected void finished(Description description) { - TestUtils.deleteTargetTestClassFolder(description); + try { + fs.delete(new Path(targetDir), true); + } catch (IOException e) { + throw new RuntimeException(e); + } } } @@ -100,8 +115,8 @@ public class FileSystemWALTest fsWALWriter.rotate(true); testMeta.fsWAL.beforeCheckpoint(0); testMeta.fsWAL.committed(0); - File walFile = new File(testMeta.fsWAL.getPartFilePath(0)); - Assert.assertEquals("WAL file created ", true, walFile.exists()); + Path walFile = new Path(testMeta.fsWAL.getPartFilePath(0)); + Assert.assertEquals("WAL file created ", true, testMeta.fs.isFile(walFile)); FileSystemWAL.FileSystemWALReader fsWALReader = testMeta.fsWAL.getReader(); assertNumTuplesRead(fsWALReader, numTuples); @@ -134,9 +149,6 @@ public class FileSystemWALTest testMeta.fsWAL.beforeCheckpoint(0); testMeta.fsWAL.committed(0); - File walFile = new 
File(testMeta.fsWAL.getPartFilePath(0)); - Assert.assertEquals("WAL file size ", totalBytes, walFile.length()); - FileSystemWAL.FileSystemWALReader fsWALReader = testMeta.fsWAL.getReader(); fsWALReader.seek(new FileSystemWAL.FileSystemWALPointer(0, offset));
