Repository: apex-malhar
Updated Branches:
  refs/heads/master a46ec1dcf -> 157fb8230


APEXMALHAR-2422 WAL should directly relay on the file and not on the size of 
the file returned by NameNode. As HSYNC on HDFS without the 
SyncFlag.UPDATE_LENGTH flag will not update the length of the file with 
NameNode.

Also made the tests compatible with Hadoop filesystem API.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/157fb823
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/157fb823
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/157fb823

Branch: refs/heads/master
Commit: 157fb82308c737dca1fca803c33f7f7b496cf95c
Parents: a46ec1d
Author: Sandesh Hegde <[email protected]>
Authored: Mon Feb 27 16:33:29 2017 -0800
Committer: Sandesh Hegde <[email protected]>
Committed: Tue Mar 7 19:08:01 2017 -0800

----------------------------------------------------------------------
 .../apex/malhar/lib/wal/FileSystemWAL.java      | 37 +++++++++++++++++---
 .../apex/malhar/lib/wal/FileSystemWALTest.java  | 30 +++++++++++-----
 2 files changed, 54 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/157fb823/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java 
b/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java
index 1ae039b..72a1564 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java
@@ -20,6 +20,7 @@ package org.apache.apex.malhar.lib.wal;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.EnumSet;
 import java.util.HashSet;
@@ -426,6 +427,36 @@ public class FileSystemWAL implements 
WAL<FileSystemWAL.FileSystemWALReader, Fil
       readOrSkip(true);
     }
 
+    private static int readLen(final DataInputStream inputStream) throws 
IOException
+    {
+      if (inputStream == null) {
+        return -1;
+      }
+
+      int len = inputStream.read();
+
+      if (len < 0) {
+        return len;
+      }
+
+      len = len << 24;
+
+      for (int i = 2;i >= 0;--i) {
+        int ch = inputStream.read();
+        if (ch < 0) {
+          throw new EOFException();
+        }
+
+        len += (ch << 8 * i);
+      }
+
+      if (len < 0) {
+        throw new IOException("Negative length");
+      }
+
+      return len;
+    }
+
     private Slice readOrSkip(boolean skip) throws IOException
     {
       if (currentPointer == null) {
@@ -442,11 +473,9 @@ public class FileSystemWAL implements 
WAL<FileSystemWAL.FileSystemWALReader, Fil
           inputStream = getInputStream(currentPointer);
         }
 
-        if (inputStream != null && currentPointer.offset <
-            fileSystemWAL.fileContext.getFileStatus(currentOpenPath).getLen()) 
{
-          int len = inputStream.readInt();
-          Preconditions.checkState(len >= 0, "negative length");
+        int len = readLen(inputStream);
 
+        if (len != -1) {
           if (!skip) {
             byte[] data = new byte[len];
             inputStream.readFully(data);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/157fb823/library/src/test/java/org/apache/apex/malhar/lib/wal/FileSystemWALTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/wal/FileSystemWALTest.java 
b/library/src/test/java/org/apache/apex/malhar/lib/wal/FileSystemWALTest.java
index aefaac9..697d5fd 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/wal/FileSystemWALTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/wal/FileSystemWALTest.java
@@ -18,8 +18,9 @@
  */
 package org.apache.apex.malhar.lib.wal;
 
-import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Random;
 
 import org.junit.Assert;
@@ -31,11 +32,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.apex.malhar.lib.utils.FileContextUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import com.datatorrent.lib.util.KryoCloneUtils;
-import com.datatorrent.lib.util.TestUtils;
 import com.datatorrent.netlet.util.Slice;
 
 public class FileSystemWALTest
@@ -53,12 +55,21 @@ public class FileSystemWALTest
   {
     private String targetDir;
     FileSystemWAL fsWAL = new FileSystemWAL();
+    Configuration conf = new Configuration();
+    FileSystem fs;
 
     @Override
     protected void starting(Description description)
     {
-      TestUtils.deleteTargetTestClassFolder(description);
       targetDir = "target/" + description.getClassName() + "/" + 
description.getMethodName();
+
+      try {
+        fs = FileSystem.get(new URI(targetDir + "/WAL"), conf);
+        fs.delete(new Path(targetDir), true);
+      } catch (IOException | URISyntaxException e) {
+        throw new RuntimeException(e);
+      }
+
       fsWAL = new FileSystemWAL();
       fsWAL.setFilePath(targetDir + "/WAL");
     }
@@ -66,7 +77,11 @@ public class FileSystemWALTest
     @Override
     protected void finished(Description description)
     {
-      TestUtils.deleteTargetTestClassFolder(description);
+      try {
+        fs.delete(new Path(targetDir), true);
+      } catch (IOException e) {
+        throw  new RuntimeException(e);
+      }
     }
   }
 
@@ -100,8 +115,8 @@ public class FileSystemWALTest
     fsWALWriter.rotate(true);
     testMeta.fsWAL.beforeCheckpoint(0);
     testMeta.fsWAL.committed(0);
-    File walFile = new File(testMeta.fsWAL.getPartFilePath(0));
-    Assert.assertEquals("WAL file created ", true, walFile.exists());
+    Path walFile = new Path(testMeta.fsWAL.getPartFilePath(0));
+    Assert.assertEquals("WAL file created ", true, 
testMeta.fs.isFile(walFile));
 
     FileSystemWAL.FileSystemWALReader fsWALReader = testMeta.fsWAL.getReader();
     assertNumTuplesRead(fsWALReader, numTuples);
@@ -134,9 +149,6 @@ public class FileSystemWALTest
     testMeta.fsWAL.beforeCheckpoint(0);
     testMeta.fsWAL.committed(0);
 
-    File walFile = new File(testMeta.fsWAL.getPartFilePath(0));
-    Assert.assertEquals("WAL file size ", totalBytes, walFile.length());
-
     FileSystemWAL.FileSystemWALReader fsWALReader = testMeta.fsWAL.getReader();
 
     fsWALReader.seek(new FileSystemWAL.FileSystemWALPointer(0, offset));

Reply via email to