Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 8f671f2dd -> d081d0cee


MLHR-1886: move the restoring of a file into its own method and add a test that
covers re-writing a file that was not closed before the failure


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/8f0c27b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/8f0c27b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/8f0c27b6

Branch: refs/heads/devel-3
Commit: 8f0c27b6df5511c5048774baa78f7a2e25dd200c
Parents: 8f671f2
Author: Chandni Singh <[email protected]>
Authored: Tue Nov 3 13:06:25 2015 -0800
Committer: Chandni Singh <[email protected]>
Committed: Tue Nov 3 13:14:48 2015 -0800

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileOutputOperator.java   | 113 +++++++++++--------
 .../io/fs/AbstractFileOutputOperatorTest.java   |  52 +++++++++
 2 files changed, 116 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/8f0c27b6/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index 09294a2..744f024 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -351,50 +351,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
           }
 
           if (fs.exists(activeFilePath)) {
-            LOG.debug("path exists {}", activeFilePath);
-            long offset = endOffsets.get(seenFileName).longValue();
-            FSDataInputStream inputStream = fs.open(activeFilePath);
-            FileStatus status = fs.getFileStatus(activeFilePath);
-
-            if (status.getLen() != offset) {
-              LOG.info("path corrupted {} {} {}", activeFilePath, offset, 
status.getLen());
-              byte[] buffer = new byte[COPY_BUFFER_SIZE];
-              String recoveryFileName = seenFileNamePart + '.' + 
System.currentTimeMillis() + TMP_EXTENSION;
-              Path recoveryFilePath = new Path(filePath + Path.SEPARATOR + 
recoveryFileName);
-              FSDataOutputStream fsOutput = openStream(recoveryFilePath, 
false);
-
-              while (inputStream.getPos() < offset) {
-                long remainingBytes = offset - inputStream.getPos();
-                int bytesToWrite = remainingBytes < COPY_BUFFER_SIZE ? (int) 
remainingBytes : COPY_BUFFER_SIZE;
-                inputStream.read(buffer);
-                fsOutput.write(buffer, 0, bytesToWrite);
-              }
-
-              flush(fsOutput);
-              fsOutput.close();
-              inputStream.close();
-
-              FileContext fileContext = 
FileContext.getFileContext(fs.getUri());
-              LOG.debug("active {} recovery {} ", activeFilePath, 
recoveryFilePath);
-
-              if (alwaysWriteToTmp) {
-                //recovery file is used as the new tmp file and we cannot 
delete the old tmp file because when the operator
-                //is restored to an earlier check-pointed window, it will look 
for an older tmp.
-                fileNameToTmpName.put(seenFileNamePart, recoveryFileName);
-              } else {
-                LOG.debug("recovery path {} actual path {} ", 
recoveryFilePath, status.getPath());
-                fileContext.rename(recoveryFilePath, status.getPath(), 
Options.Rename.OVERWRITE);
-              }
-            } else {
-              if (alwaysWriteToTmp && 
filesWithOpenStreams.contains(seenFileName)) {
-                String currentTmp = seenFileNamePart + '.' + 
System.currentTimeMillis() + TMP_EXTENSION;
-                FSDataOutputStream outputStream = openStream(new Path(filePath 
+ Path.SEPARATOR + currentTmp), false);
-                IOUtils.copy(inputStream, outputStream);
-                outputStream.close();
-                fileNameToTmpName.put(seenFileNamePart, currentTmp);
-              }
-              inputStream.close();
-            }
+            recoverFile(seenFileName, seenFileNamePart, activeFilePath);
           }
         }
       }
@@ -448,10 +405,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
           }
         }
       }
-
       LOG.debug("setup completed");
-      LOG.debug("end-offsets {}", endOffsets);
-
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -463,6 +417,68 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
   }
 
   /**
+   * Recovers a file which exists on the disk. If the length of the file is 
not same as the
+   * length which the operator remembers then the file is truncated. <br/>
+   * When always writing to a temporary file, then a file is restored even 
when the length is same as what the
+   * operator remembers however this is done only for files which had open 
streams that weren't closed before
+   * failure.
+   *
+   * @param filename     name of the actual file.
+   * @param partFileName name of the part file. When not rolling this is same 
as filename; otherwise this is the
+   *                     latest open part file name.
+   * @param filepath     path of the file. When always writing to temp file, 
this is the path of the temp file; otherwise
+   *                     path of the actual file.
+   * @throws IOException
+   */
+  private void recoverFile(String filename, String partFileName, Path 
filepath) throws IOException
+  {
+    LOG.debug("path exists {}", filepath);
+    long offset = endOffsets.get(filename).longValue();
+    FSDataInputStream inputStream = fs.open(filepath);
+    FileStatus status = fs.getFileStatus(filepath);
+
+    if (status.getLen() != offset) {
+      LOG.info("path corrupted {} {} {}", filepath, offset, status.getLen());
+      byte[] buffer = new byte[COPY_BUFFER_SIZE];
+      String recoveryFileName = partFileName + '.' + 
System.currentTimeMillis() + TMP_EXTENSION;
+      Path recoveryFilePath = new Path(filePath + Path.SEPARATOR + 
recoveryFileName);
+      FSDataOutputStream fsOutput = openStream(recoveryFilePath, false);
+
+      while (inputStream.getPos() < offset) {
+        long remainingBytes = offset - inputStream.getPos();
+        int bytesToWrite = remainingBytes < COPY_BUFFER_SIZE ? 
(int)remainingBytes : COPY_BUFFER_SIZE;
+        inputStream.read(buffer);
+        fsOutput.write(buffer, 0, bytesToWrite);
+      }
+
+      flush(fsOutput);
+      fsOutput.close();
+      inputStream.close();
+
+      FileContext fileContext = FileContext.getFileContext(fs.getUri());
+      LOG.debug("active {} recovery {} ", filepath, recoveryFilePath);
+
+      if (alwaysWriteToTmp) {
+        //recovery file is used as the new tmp file and we cannot delete the 
old tmp file because when the operator
+        //is restored to an earlier check-pointed window, it will look for an 
older tmp.
+        fileNameToTmpName.put(partFileName, recoveryFileName);
+      } else {
+        LOG.debug("recovery path {} actual path {} ", recoveryFilePath, 
status.getPath());
+        fileContext.rename(recoveryFilePath, status.getPath(), 
Options.Rename.OVERWRITE);
+      }
+    } else {
+      if (alwaysWriteToTmp && filesWithOpenStreams.contains(filename)) {
+        String currentTmp = partFileName + '.' + System.currentTimeMillis() + 
TMP_EXTENSION;
+        FSDataOutputStream outputStream = openStream(new Path(filePath + 
Path.SEPARATOR + currentTmp), false);
+        IOUtils.copy(inputStream, outputStream);
+        streamsCache.put(filename, new FSFilterStreamContext(outputStream));
+        fileNameToTmpName.put(partFileName, currentTmp);
+      }
+      inputStream.close();
+    }
+  }
+
+  /**
    * Creates the {@link CacheLoader} for loading an output stream when it is 
not present in the cache.
    * @return cache loader
    */
@@ -656,7 +672,6 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
       fileName = getPartFileNamePri(fileName);
       part.setValue(currentOpenPart.getValue());
     }
-    LOG.debug("request finalize {}", fileName);
     filesPerWindow.add(fileName);
   }
 
@@ -1215,7 +1230,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
         //a tmp file has tmp extension always preceded by timestamp
         String actualFileName = statusName.substring(0, 
statusName.lastIndexOf('.', statusName.lastIndexOf('.') - 1));
         if (fileName.equals(actualFileName)) {
-          LOG.debug("deleting vagrant file {}", statusName);
+          LOG.debug("deleting stray file {}", statusName);
           fs.delete(status.getPath(), true);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/8f0c27b6/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
index f7d1731..cbcc8b4 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
@@ -1883,6 +1883,58 @@ public class AbstractFileOutputOperatorTest
     checkCompressedFile(oddFile, oddOffsets, 1, 5, 1000, null, null);
   }
 
+  @Test
+  public void testRecoveryOfOpenFiles()
+  {
+    EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
+    writer.setMaxLength(4);
+    File meta = new File(testMeta.getDir());
+    writer.setFilePath(meta.getAbsolutePath());
+    writer.setAlwaysWriteToTmp(true);
+    writer.setup(testMeta.testOperatorContext);
+
+    writer.beginWindow(0);
+    writer.input.put(0);
+    writer.input.put(1);
+    writer.input.put(2);
+    writer.input.put(3);
+    writer.endWindow();
+
+    //failure and restored
+    writer.setup(testMeta.testOperatorContext);
+    writer.input.put(4);
+    writer.input.put(5);
+    writer.endWindow();
+
+    writer.beginWindow(1);
+    writer.input.put(6);
+    writer.input.put(7);
+    writer.input.put(8);
+    writer.input.put(9);
+    writer.input.put(6);
+    writer.input.put(7);
+    writer.endWindow();
+
+    writer.committed(1);
+
+    //Part 0 checks
+    String evenFileName = testMeta.getDir() + File.separator + EVEN_FILE;
+    String correctContents = "0\n" + "2\n" + "4\n";
+    checkOutput(0, evenFileName, correctContents);
+
+    String oddFileName = testMeta.getDir() + File.separator + ODD_FILE;
+    correctContents = "1\n" + "3\n" + "5\n";
+    checkOutput(0, oddFileName, correctContents);
+
+
+    //Part 1 checks
+    correctContents = "6\n" + "8\n" + "6\n";
+    checkOutput(1, evenFileName, correctContents);
+
+    correctContents = "7\n" + "9\n" + "7\n";
+    checkOutput(1, oddFileName, correctContents);
+  }
+
   private void checkCompressedFile(File file, List<Long> offsets, int 
startVal, int totalWindows, int totalRecords, SecretKey secretKey, byte[] iv) 
throws IOException
   {
     FileInputStream fis;

Reply via email to