Storage

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/44326514
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/44326514
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/44326514

Branch: refs/heads/master
Commit: 4432651437e8aac8b442c0f86a6775adc47e962c
Parents: bbdab0e
Author: gaurav <[email protected]>
Authored: Sun Feb 19 21:29:56 2017 +0530
Committer: Pramod Immaneni <[email protected]>
Committed: Mon May 22 16:47:34 2017 -0700

----------------------------------------------------------------------
 .../datatorrent/flume/storage/HDFSStorage.java  | 946 +++++++++++++++++++
 .../com/datatorrent/flume/storage/Storage.java  |  73 ++
 .../flume/storage/HDFSStorageMatching.java      | 109 +++
 .../flume/storage/HDFSStoragePerformance.java   |  85 ++
 .../storage/HDFSStoragePerformanceTest.java     | 112 +++
 .../flume/storage/HDFSStorageTest.java          | 693 ++++++++++++++
 6 files changed, 2018 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/44326514/flume/src/main/java/com/datatorrent/flume/storage/HDFSStorage.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/storage/HDFSStorage.java b/flume/src/main/java/com/datatorrent/flume/storage/HDFSStorage.java
new file mode 100644
index 0000000..74849e9
--- /dev/null
+++ b/flume/src/main/java/com/datatorrent/flume/storage/HDFSStorage.java
@@ -0,0 +1,946 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.storage;
+
+import java.io.DataInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.common.util.NameableThreadFactory;
+import com.datatorrent.flume.sink.Server;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * HDFSStorage stores and retrieves data from HDFS.
+ * <p />
+ * The properties that can be set on HDFSStorage are: <br />
+ * baseDir - The base directory where the data is going to be stored <br />
+ * restore - Whether to restore the application state after a previous failure <br />
+ * blockSize - The maximum size of each file to be created <br />
+ *
+ * @author Gaurav Gupta <[email protected]>
+ * @since 0.9.3
+ */
+public class HDFSStorage implements Storage, Configurable, Component<com.datatorrent.api.Context>
+{
+  public static final int DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
+  public static final String BASE_DIR_KEY = "baseDir";
+  public static final String RESTORE_KEY = "restore";
+  public static final String BLOCKSIZE = "blockSize";
+  public static final String BLOCK_SIZE_MULTIPLE = "blockSizeMultiple";
+  public static final String NUMBER_RETRY = "retryCount";
+
+  private static final String OFFSET_SUFFIX = "-offsetFile";
+  private static final String BOOK_KEEPING_FILE_OFFSET = "-bookKeepingOffsetFile";
+  private static final String FLUSHED_IDENTITY_FILE = "flushedCounter";
+  private static final String CLEAN_OFFSET_FILE = "cleanoffsetFile";
+  private static final String FLUSHED_IDENTITY_FILE_TEMP = "flushedCounter.tmp";
+  private static final String CLEAN_OFFSET_FILE_TEMP = "cleanoffsetFile.tmp";
+  private static final int IDENTIFIER_SIZE = 8;
+  private static final int DATA_LENGTH_BYTE_SIZE = 4;
+
+  /**
+   * Number of times the storage will try to get the filesystem
+   */
+  private int retryCount = 3;
+  /**
+   * The multiple of block size
+   */
+  private int blockSizeMultiple = 1;
+  /**
+   * Identifier for this storage.
+   */
+  @NotNull
+  private String id;
+  /**
+   * The baseDir where the storage facility is going to create files.
+   */
+  @NotNull
+  private String baseDir;
+  /**
+   * The block size to be used to create the storage files
+   */
+  private long blockSize;
+  /**
+   * Whether to restore the storage state from a previous run
+   */
+  private boolean restore;
+  /**
+   * This identifies the current file number
+   */
+  private long currentWrittenFile;
+  /**
+   * This identifies the file number that has been flushed
+   */
+  private long flushedFileCounter;
+  /**
+   * The file that stores the fileCounter information
+   */
+  // private Path fileCounterFile;
+  /**
+   * The file that stores the flushed fileCounter information
+   */
+  private Path flushedCounterFile;
+  private Path flushedCounterFileTemp;
+  /**
+   * This identifies the last cleaned file number
+   */
+  private long cleanedFileCounter;
+  /**
+   * The file that stores the clean file counter information
+   */
+  // private Path cleanFileCounterFile;
+  /**
+   * The file that stores the clean file offset information
+   */
+  private Path cleanFileOffsetFile;
+  private Path cleanFileOffsetFileTemp;
+  private FileSystem fs;
+  private FSDataOutputStream dataStream;
+  ArrayList<DataBlock> files2Commit = new ArrayList<DataBlock>();
+  /**
+   * The offset in the current opened file
+   */
+  private long fileWriteOffset;
+  private FSDataInputStream readStream;
+  private long retrievalOffset;
+  private long retrievalFile;
+  private int offset;
+  private long flushedLong;
+  private long flushedFileWriteOffset;
+  private long bookKeepingFileOffset;
+  private byte[] cleanedOffset = new byte[8];
+  private long skipOffset;
+  private long skipFile;
+  private transient Path basePath;
+  private ExecutorService storageExecutor;
+  private byte[] currentData;
+  private FSDataInputStream nextReadStream;
+  private long nextFlushedLong;
+  private long nextRetrievalFile;
+  private byte[] nextRetrievalData;
+
+  public HDFSStorage()
+  {
+    this.restore = true;
+  }
+
+  /**
+   * This reads the storage configuration (id, baseDir, restore, blockSize, blockSizeMultiple, retryCount) from
+   * the context
+   *
+   * @param ctx the context carrying the configuration
+   */
+  @Override
+  public void configure(Context ctx)
+  {
+    String tempId = ctx.getString(ID);
+    if (tempId == null) {
+      if (id == null) {
+        throw new IllegalArgumentException("id can't be null.");
+      }
+    } else {
+      id = tempId;
+    }
+
+    String tempBaseDir = ctx.getString(BASE_DIR_KEY);
+    if (tempBaseDir != null) {
+      baseDir = tempBaseDir;
+    }
+
+    restore = ctx.getBoolean(RESTORE_KEY, restore);
+    Long tempBlockSize = ctx.getLong(BLOCKSIZE);
+    if (tempBlockSize != null) {
+      blockSize = tempBlockSize;
+    }
+    blockSizeMultiple = ctx.getInteger(BLOCK_SIZE_MULTIPLE, blockSizeMultiple);
+    retryCount = ctx.getInteger(NUMBER_RETRY, retryCount);
+  }
+
+  /**
+   * This function reads the file at the given location and returns the bytes stored in the file
+   *
+   * @param path - the location of the file
+   * @return the bytes stored in the file
+   * @throws IOException
+   */
+  byte[] readData(Path path) throws IOException
+  {
+    DataInputStream is = new DataInputStream(fs.open(path));
+    byte[] bytes = new byte[is.available()];
+    is.readFully(bytes);
+    is.close();
+    return bytes;
+  }
+
+  /**
+   * This function writes the bytes to a file specified by the path
+   *
+   * @param path the file location
+   * @param data the data to be written to the file
+   * @return the output stream used to write the data
+   * @throws IOException
+   */
+  private FSDataOutputStream writeData(Path path, byte[] data) throws IOException
+  {
+    FSDataOutputStream fsOutputStream;
+    if (fs.getScheme().equals("file")) {
+      // local FS does not support hflush and does not flush native stream
+      fsOutputStream = new FSDataOutputStream(
+          new FileOutputStream(Path.getPathWithoutSchemeAndAuthority(path).toString()), null);
+    } else {
+      fsOutputStream = fs.create(path);
+    }
+    fsOutputStream.write(data);
+    return fsOutputStream;
+  }
+
+  private long calculateOffset(long fileOffset, long fileCounter)
+  {
+    return ((fileCounter << 32) | (fileOffset & 0xffffffffL));
+  }
+
+  @Override
+  public byte[] store(Slice slice)
+  {
+    // logger.debug("store message ");
+    int bytesToWrite = slice.length + DATA_LENGTH_BYTE_SIZE;
+    if (currentWrittenFile < skipFile) {
+      fileWriteOffset += bytesToWrite;
+      if (fileWriteOffset >= bookKeepingFileOffset) {
+        files2Commit.add(new DataBlock(null, bookKeepingFileOffset,
+            new Path(basePath, currentWrittenFile + OFFSET_SUFFIX), currentWrittenFile));
+        currentWrittenFile++;
+        if (fileWriteOffset > bookKeepingFileOffset) {
+          fileWriteOffset = bytesToWrite;
+        } else {
+          fileWriteOffset = 0;
+        }
+        try {
+          bookKeepingFileOffset = getFlushedFileWriteOffset(
+              new Path(basePath, currentWrittenFile + BOOK_KEEPING_FILE_OFFSET));
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+      return null;
+    }
+
+    if (flushedFileCounter == currentWrittenFile && dataStream == null) {
+      currentWrittenFile++;
+      fileWriteOffset = 0;
+    }
+
+    if (flushedFileCounter == skipFile && skipFile != -1) {
+      skipFile++;
+    }
+
+    if (fileWriteOffset + bytesToWrite < blockSize) {
+      try {
+        /* write length and the actual data to the file */
+        if (fileWriteOffset == 0) {
+          // writeData(flushedCounterFile, String.valueOf(currentWrittenFile).getBytes()).close();
+          dataStream = writeData(new Path(basePath, String.valueOf(currentWrittenFile)),
+              Ints.toByteArray(slice.length));
+          dataStream.write(slice.buffer, slice.offset, slice.length);
+        } else {
+          dataStream.write(Ints.toByteArray(slice.length));
+          dataStream.write(slice.buffer, slice.offset, slice.length);
+        }
+        fileWriteOffset += bytesToWrite;
+
+        byte[] fileOffset = null;
+        if ((currentWrittenFile > skipFile) || (currentWrittenFile == skipFile && fileWriteOffset > skipOffset)) {
+          skipFile = -1;
+          fileOffset = new byte[IDENTIFIER_SIZE];
+          Server.writeLong(fileOffset, 0, calculateOffset(fileWriteOffset, currentWrittenFile));
+        }
+        return fileOffset;
+      } catch (IOException ex) {
+        logger.warn("Error while storing the bytes {}", ex.getMessage());
+        closeFs();
+        throw new RuntimeException(ex);
+      }
+    }
+    DataBlock db = new DataBlock(dataStream, fileWriteOffset,
+        new Path(basePath, currentWrittenFile + OFFSET_SUFFIX), currentWrittenFile);
+    db.close();
+    files2Commit.add(db);
+    fileWriteOffset = 0;
+    ++currentWrittenFile;
+    return store(slice);
+  }
+
+  /**
+   * Reads four bytes starting at startIndex as a little-endian value
+   *
+   * @param b the byte array to read from
+   * @param startIndex the index of the first byte to read
+   * @return the decoded value
+   */
+  long byteArrayToLong(byte[] b, int startIndex)
+  {
+    final byte b1 = 0;
+    return Longs.fromBytes(b1, b1, b1, b1, b[3 + startIndex], b[2 + startIndex], b[1 + startIndex], b[startIndex]);
+  }
+
+  @Override
+  public byte[] retrieve(byte[] identifier)
+  {
+    skipFile = -1;
+    skipOffset = 0;
+    logger.debug("retrieve with address {}", Arrays.toString(identifier));
+    // flushing the last incomplete flushed file
+    closeUnflushedFiles();
+
+    retrievalOffset = byteArrayToLong(identifier, 0);
+    retrievalFile = byteArrayToLong(identifier, offset);
+
+    if (retrievalFile == 0 && retrievalOffset == 0 && currentWrittenFile == 0 && fileWriteOffset == 0) {
+      skipOffset = 0;
+      return null;
+    }
+
+    // making sure that the deleted address is not requested again
+    if (retrievalFile != 0 || retrievalOffset != 0) {
+      long cleanedFile = byteArrayToLong(cleanedOffset, offset);
+      if (retrievalFile < cleanedFile || (retrievalFile == cleanedFile &&
+          retrievalOffset < byteArrayToLong(cleanedOffset, 0))) {
+        logger.warn("The address asked has been deleted retrievalFile={}, 
cleanedFile={}, retrievalOffset={}, " +
+            "cleanedOffset={}", retrievalFile, cleanedFile, retrievalOffset, 
byteArrayToLong(cleanedOffset, 0));
+        closeFs();
+        throw new IllegalArgumentException(String.format("The data for address %s has already been deleted",
+            Arrays.toString(identifier)));
+      }
+    }
+
+    // we have just started
+    if (retrievalFile == 0 && retrievalOffset == 0) {
+      retrievalFile = byteArrayToLong(cleanedOffset, offset);
+      retrievalOffset = byteArrayToLong(cleanedOffset, 0);
+    }
+
+    if ((retrievalFile > flushedFileCounter)) {
+      skipFile = retrievalFile;
+      skipOffset = retrievalOffset;
+      retrievalFile = -1;
+      return null;
+    }
+    if ((retrievalFile == flushedFileCounter && retrievalOffset >= flushedFileWriteOffset)) {
+      skipFile = retrievalFile;
+      skipOffset = retrievalOffset - flushedFileWriteOffset;
+      retrievalFile = -1;
+      return null;
+    }
+
+    try {
+      if (readStream != null) {
+        readStream.close();
+        readStream = null;
+      }
+      Path path = new Path(basePath, String.valueOf(retrievalFile));
+      if (!fs.exists(path)) {
+        retrievalFile = -1;
+        closeFs();
+        throw new RuntimeException(String.format("File %s does not exist", path.toString()));
+      }
+
+      byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX));
+      flushedLong = Server.readLong(flushedOffset, 0);
+      while (retrievalOffset >= flushedLong && retrievalFile < flushedFileCounter) {
+        retrievalOffset -= flushedLong;
+        retrievalFile++;
+        flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX));
+        flushedLong = Server.readLong(flushedOffset, 0);
+      }
+
+      if (retrievalOffset >= flushedLong) {
+        logger.warn("data not flushed for the given identifier");
+        retrievalFile = -1;
+        return null;
+      }
+      synchronized (HDFSStorage.this) {
+        if (nextReadStream != null) {
+          nextReadStream.close();
+          nextReadStream = null;
+        }
+      }
+      currentData = null;
+      path = new Path(basePath, String.valueOf(retrievalFile));
+      //readStream = new FSDataInputStream(fs.open(path));
+      currentData = readData(path);
+      //readStream.seek(retrievalOffset);
+      storageExecutor.submit(getNextStream());
+      return retrieveHelper();
+    } catch (IOException e) {
+      closeFs();
+      throw new RuntimeException(e);
+    }
+  }
+
+  private byte[] retrieveHelper() throws IOException
+  {
+    int tempRetrievalOffset = (int)retrievalOffset;
+    int length = Ints.fromBytes(currentData[tempRetrievalOffset], currentData[tempRetrievalOffset + 1],
+        currentData[tempRetrievalOffset + 2], currentData[tempRetrievalOffset + 3]);
+    byte[] data = new byte[length + IDENTIFIER_SIZE];
+    System.arraycopy(currentData, tempRetrievalOffset + 4, data, IDENTIFIER_SIZE, length);
+    retrievalOffset += length + DATA_LENGTH_BYTE_SIZE;
+    if (retrievalOffset >= flushedLong) {
+      Server.writeLong(data, 0, calculateOffset(0, retrievalFile + 1));
+    } else {
+      Server.writeLong(data, 0, calculateOffset(retrievalOffset, retrievalFile));
+    }
+    return data;
+  }
+
+  @Override
+  public byte[] retrieveNext()
+  {
+    if (retrievalFile == -1) {
+      closeFs();
+      throw new RuntimeException("Call retrieve first");
+    }
+
+    if (retrievalFile > flushedFileCounter) {
+      logger.warn("data is not flushed");
+      return null;
+    }
+
+    try {
+      if (currentData == null) {
+        synchronized (HDFSStorage.this) {
+          if (nextRetrievalData != null && (retrievalFile == nextRetrievalFile)) {
+            currentData = nextRetrievalData;
+            flushedLong = nextFlushedLong;
+            nextRetrievalData = null;
+          } else {
+            currentData = null;
+            currentData = readData(new Path(basePath, String.valueOf(retrievalFile)));
+            byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX));
+            flushedLong = Server.readLong(flushedOffset, 0);
+          }
+        }
+        storageExecutor.submit(getNextStream());
+      }
+
+      if (retrievalOffset >= flushedLong) {
+        retrievalFile++;
+        retrievalOffset = 0;
+
+        if (retrievalFile > flushedFileCounter) {
+          logger.warn("data is not flushed");
+          return null;
+        }
+
+        //readStream.close();
+        // readStream = new FSDataInputStream(fs.open(new Path(basePath, String.valueOf(retrievalFile))));
+        // byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX));
+        // flushedLong = Server.readLong(flushedOffset, 0);
+
+        synchronized (HDFSStorage.this) {
+          if (nextRetrievalData != null && (retrievalFile == nextRetrievalFile)) {
+            currentData = nextRetrievalData;
+            flushedLong = nextFlushedLong;
+            nextRetrievalData = null;
+          } else {
+            currentData = null;
+            currentData = readData(new Path(basePath, String.valueOf(retrievalFile)));
+            byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX));
+            flushedLong = Server.readLong(flushedOffset, 0);
+          }
+        }
+        storageExecutor.submit(getNextStream());
+      }
+      //readStream.seek(retrievalOffset);
+      return retrieveHelper();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  @SuppressWarnings("AssignmentToCollectionOrArrayFieldFromParameter")
+  public void clean(byte[] identifier)
+  {
+    logger.info("clean {}", Arrays.toString(identifier));
+    long cleanFileIndex = byteArrayToLong(identifier, offset);
+
+    long cleanFileOffset = byteArrayToLong(identifier, 0);
+    if (flushedFileCounter == -1) {
+      identifier = new byte[8];
+    } else if (cleanFileIndex > flushedFileCounter ||
+        (cleanFileIndex == flushedFileCounter && cleanFileOffset >= flushedFileWriteOffset)) {
+      // This is to make sure that we clean only the data that is flushed
+      cleanFileIndex = flushedFileCounter;
+      cleanFileOffset = flushedFileWriteOffset;
+      Server.writeLong(identifier, 0, calculateOffset(cleanFileOffset, cleanFileIndex));
+    }
+    cleanedOffset = identifier;
+
+    try {
+      writeData(cleanFileOffsetFileTemp, identifier).close();
+      fs.rename(cleanFileOffsetFileTemp, cleanFileOffsetFile);
+      if (cleanedFileCounter >= cleanFileIndex) {
+        return;
+      }
+      do {
+        Path path = new Path(basePath, String.valueOf(cleanedFileCounter));
+        if (fs.exists(path) && fs.isFile(path)) {
+          fs.delete(path, false);
+        }
+        path = new Path(basePath, cleanedFileCounter + OFFSET_SUFFIX);
+        if (fs.exists(path) && fs.isFile(path)) {
+          fs.delete(path, false);
+        }
+        path = new Path(basePath, cleanedFileCounter + BOOK_KEEPING_FILE_OFFSET);
+        if (fs.exists(path) && fs.isFile(path)) {
+          fs.delete(path, false);
+        }
+        logger.info("deleted file {}", cleanedFileCounter);
+        ++cleanedFileCounter;
+      } while (cleanedFileCounter < cleanFileIndex);
+      // writeData(cleanFileCounterFile, String.valueOf(cleanedFileCounter).getBytes()).close();
+
+    } catch (IOException e) {
+      logger.warn("not able to close the streams {}", e.getMessage());
+      closeFs();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * This is used mainly for cleaning up of counter files created
+   */
+  void cleanHelperFiles()
+  {
+    try {
+      fs.delete(basePath, true);
+    } catch (IOException e) {
+      logger.warn(e.getMessage());
+    }
+  }
+
+  private void closeUnflushedFiles()
+  {
+    try {
+      files2Commit.clear();
+      // closing the stream
+      if (dataStream != null) {
+        dataStream.close();
+        dataStream = null;
+        // currentWrittenFile++;
+        // fileWriteOffset = 0;
+      }
+
+      if (!fs.exists(new Path(basePath, currentWrittenFile + OFFSET_SUFFIX))) {
+        fs.delete(new Path(basePath, String.valueOf(currentWrittenFile)), false);
+      }
+
+      if (fs.exists(new Path(basePath, flushedFileCounter + OFFSET_SUFFIX))) {
+        // This means that flush was called
+        flushedFileWriteOffset = getFlushedFileWriteOffset(new Path(basePath, flushedFileCounter + OFFSET_SUFFIX));
+        bookKeepingFileOffset = getFlushedFileWriteOffset(
+            new Path(basePath, flushedFileCounter + BOOK_KEEPING_FILE_OFFSET));
+      }
+
+      if (flushedFileCounter != -1) {
+        currentWrittenFile = flushedFileCounter;
+        fileWriteOffset = flushedFileWriteOffset;
+      } else {
+        currentWrittenFile = 0;
+        fileWriteOffset = 0;
+      }
+
+      flushedLong = 0;
+
+    } catch (IOException e) {
+      closeFs();
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void flush()
+  {
+    nextReadStream = null;
+    StringBuilder builder = new StringBuilder();
+    Iterator<DataBlock> itr = files2Commit.iterator();
+    DataBlock db;
+    try {
+      while (itr.hasNext()) {
+        db = itr.next();
+        db.updateOffsets();
+        builder.append(db.fileName).append(", ");
+      }
+      files2Commit.clear();
+
+      if (dataStream != null) {
+        dataStream.hflush();
+        writeData(flushedCounterFileTemp, String.valueOf(currentWrittenFile).getBytes()).close();
+        fs.rename(flushedCounterFileTemp, flushedCounterFile);
+        updateFlushedOffset(new Path(basePath, currentWrittenFile + OFFSET_SUFFIX), fileWriteOffset);
+        flushedFileWriteOffset = fileWriteOffset;
+        builder.append(currentWrittenFile);
+      }
+      logger.debug("flushed files {}", builder.toString());
+    } catch (IOException ex) {
+      logger.warn("not able to close the stream {}", ex.getMessage());
+      closeFs();
+      throw new RuntimeException(ex);
+    }
+    flushedFileCounter = currentWrittenFile;
+    // logger.debug("flushedFileCounter in flush {}",flushedFileCounter);
+  }
+
+  /**
+   * This updates the flushed offset
+   */
+  private void updateFlushedOffset(Path file, long bytesWritten)
+  {
+    byte[] lastStoredOffset = new byte[IDENTIFIER_SIZE];
+    Server.writeLong(lastStoredOffset, 0, bytesWritten);
+    try {
+      writeData(file, lastStoredOffset).close();
+    } catch (IOException e) {
+      try {
+        if (!Arrays.equals(readData(file), lastStoredOffset)) {
+          closeFs();
+          throw new RuntimeException(e);
+        }
+      } catch (Exception e1) {
+        closeFs();
+        throw new RuntimeException(e1);
+      }
+    }
+  }
+
+  public int getBlockSizeMultiple()
+  {
+    return blockSizeMultiple;
+  }
+
+  public void setBlockSizeMultiple(int blockSizeMultiple)
+  {
+    this.blockSizeMultiple = blockSizeMultiple;
+  }
+
+  /**
+   * @return the baseDir
+   */
+  public String getBaseDir()
+  {
+    return baseDir;
+  }
+
+  /**
+   * @param baseDir the baseDir to set
+   */
+  public void setBaseDir(String baseDir)
+  {
+    this.baseDir = baseDir;
+  }
+
+  /**
+   * @return the id
+   */
+  public String getId()
+  {
+    return id;
+  }
+
+  /**
+   * @param id the id to set
+   */
+  public void setId(String id)
+  {
+    this.id = id;
+  }
+
+  /**
+   * @return the blockSize
+   */
+  public long getBlockSize()
+  {
+    return blockSize;
+  }
+
+  /**
+   * @param blockSize the blockSize to set
+   */
+  public void setBlockSize(long blockSize)
+  {
+    this.blockSize = blockSize;
+  }
+
+  /**
+   * @return the restore
+   */
+  public boolean isRestore()
+  {
+    return restore;
+  }
+
+  /**
+   * @param restore the restore to set
+   */
+  public void setRestore(boolean restore)
+  {
+    this.restore = restore;
+  }
+
+  class DataBlock
+  {
+    FSDataOutputStream dataStream;
+    long dataOffset;
+    Path path2FlushedData;
+    long fileName;
+    private Path bookKeepingPath;
+
+    DataBlock(FSDataOutputStream stream, long bytesWritten, Path path2FlushedData, long fileName)
+    {
+      this.dataStream = stream;
+      this.dataOffset = bytesWritten;
+      this.path2FlushedData = path2FlushedData;
+      this.fileName = fileName;
+    }
+
+    public void close()
+    {
+      if (dataStream != null) {
+        try {
+          dataStream.close();
+          bookKeepingPath = new Path(basePath, fileName + BOOK_KEEPING_FILE_OFFSET);
+          updateFlushedOffset(bookKeepingPath, dataOffset);
+        } catch (IOException ex) {
+          logger.warn("not able to close the stream {}", ex.getMessage());
+          closeFs();
+          throw new RuntimeException(ex);
+        }
+      }
+    }
+
+    public void updateOffsets() throws IOException
+    {
+      updateFlushedOffset(path2FlushedData, dataOffset);
+      if (bookKeepingPath != null && fs.exists(bookKeepingPath)) {
+        fs.delete(bookKeepingPath, false);
+      }
+    }
+
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(HDFSStorage.class);
+
+  @Override
+  public void setup(com.datatorrent.api.Context context)
+  {
+    Configuration conf = new Configuration();
+    if (baseDir == null) {
+      baseDir = conf.get("hadoop.tmp.dir");
+      if (baseDir == null || baseDir.isEmpty()) {
+        throw new IllegalArgumentException("baseDir cannot be null.");
+      }
+    }
+    offset = 4;
+    skipOffset = -1;
+    skipFile = -1;
+    int tempRetryCount = 0;
+    while (tempRetryCount < retryCount && fs == null) {
+      try {
+        fs = FileSystem.newInstance(conf);
+      } catch (Throwable throwable) {
+        logger.warn("Not able to get file system ", throwable);
+      }
+      tempRetryCount++;
+    }
+
+    try {
+      Path path = new Path(baseDir);
+      basePath = new Path(path, id);
+      if (fs == null) {
+        fs = FileSystem.newInstance(conf);
+      }
+      if (!fs.exists(path)) {
+        closeFs();
+        throw new RuntimeException(String.format("baseDir passed (%s) doesn't exist.", baseDir));
+      }
+      if (!fs.isDirectory(path)) {
+        closeFs();
+        throw new RuntimeException(String.format("baseDir passed (%s) is not a directory.", baseDir));
+      }
+      if (!restore) {
+        fs.delete(basePath, true);
+      }
+      if (!fs.exists(basePath) || !fs.isDirectory(basePath)) {
+        fs.mkdirs(basePath);
+      }
+
+      if (blockSize == 0) {
+        blockSize = fs.getDefaultBlockSize(new Path(basePath, "tempData"));
+      }
+      if (blockSize == 0) {
+        blockSize = DEFAULT_BLOCK_SIZE;
+      }
+
+      blockSize = blockSizeMultiple * blockSize;
+
+      currentWrittenFile = 0;
+      cleanedFileCounter = -1;
+      retrievalFile = -1;
+      // fileCounterFile = new Path(basePath, IDENTITY_FILE);
+      flushedFileCounter = -1;
+      // cleanFileCounterFile = new Path(basePath, CLEAN_FILE);
+      cleanFileOffsetFile = new Path(basePath, CLEAN_OFFSET_FILE);
+      cleanFileOffsetFileTemp = new Path(basePath, CLEAN_OFFSET_FILE_TEMP);
+      flushedCounterFile = new Path(basePath, FLUSHED_IDENTITY_FILE);
+      flushedCounterFileTemp = new Path(basePath, FLUSHED_IDENTITY_FILE_TEMP);
+
+      if (restore) {
+        //
+        // if (fs.exists(fileCounterFile) && fs.isFile(fileCounterFile)) {
+        // //currentWrittenFile = Long.valueOf(new String(readData(fileCounterFile)));
+        // }
+
+        if (fs.exists(cleanFileOffsetFile) && fs.isFile(cleanFileOffsetFile)) {
+          cleanedOffset = readData(cleanFileOffsetFile);
+        }
+
+        if (fs.exists(flushedCounterFile) && fs.isFile(flushedCounterFile)) {
+          String strFlushedFileCounter = new String(readData(flushedCounterFile));
+          if (strFlushedFileCounter.isEmpty()) {
+            logger.warn("empty flushed file");
+          } else {
+            flushedFileCounter = Long.valueOf(strFlushedFileCounter);
+            flushedFileWriteOffset = getFlushedFileWriteOffset(new Path(basePath, flushedFileCounter + OFFSET_SUFFIX));
+            bookKeepingFileOffset = getFlushedFileWriteOffset(
+                new Path(basePath, flushedFileCounter + BOOK_KEEPING_FILE_OFFSET));
+          }
+
+        }
+      }
+      fileWriteOffset = flushedFileWriteOffset;
+      currentWrittenFile = flushedFileCounter;
+      cleanedFileCounter = byteArrayToLong(cleanedOffset, offset) - 1;
+      if (currentWrittenFile == -1) {
+        ++currentWrittenFile;
+        fileWriteOffset = 0;
+      }
+
+    } catch (IOException io) {
+      throw new RuntimeException(io);
+    }
+    storageExecutor = Executors.newSingleThreadExecutor(new NameableThreadFactory("StorageHelper"));
+  }
+
+  private void closeFs()
+  {
+    if (fs != null) {
+      try {
+        fs.close();
+        fs = null;
+      } catch (IOException e) {
+        logger.debug(e.getMessage());
+      }
+    }
+  }
+
+  private long getFlushedFileWriteOffset(Path filePath) throws IOException
+  {
+    if (flushedFileCounter != -1 && fs.exists(filePath)) {
+      byte[] flushedFileOffsetByte = readData(filePath);
+      if (flushedFileOffsetByte != null && flushedFileOffsetByte.length == 8) {
+        return Server.readLong(flushedFileOffsetByte, 0);
+      }
+    }
+    return 0;
+  }
+
+  @Override
+  public void teardown()
+  {
+    logger.debug("called teardown");
+    try {
+      if (readStream != null) {
+        readStream.close();
+      }
+      synchronized (HDFSStorage.this) {
+        if (nextReadStream != null) {
+          nextReadStream.close();
+        }
+      }
+
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      closeUnflushedFiles();
+      storageExecutor.shutdown();
+    }
+
+  }
+
+  private Runnable getNextStream()
+  {
+    return new Runnable()
+    {
+      @Override
+      public void run()
+      {
+        try {
+          synchronized (HDFSStorage.this) {
+            nextRetrievalFile = retrievalFile + 1;
+            if (nextRetrievalFile > flushedFileCounter) {
+              nextRetrievalData = null;
+              return;
+            }
+            Path path = new Path(basePath, String.valueOf(nextRetrievalFile));
+            Path offsetPath = new Path(basePath, nextRetrievalFile + OFFSET_SUFFIX);
+            nextRetrievalData = null;
+            nextRetrievalData = readData(path);
+            byte[] flushedOffset = readData(offsetPath);
+            nextFlushedLong = Server.readLong(flushedOffset, 0);
+          }
+        } catch (Throwable e) {
+          logger.warn("in storage executor ", e);
+        }
+      }
+    };
+  }
+
+}
+
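A note on the addressing used above: calculateOffset() packs the file counter into the high 32 bits of the 8-byte address and the byte offset within that file into the low 32 bits, and byteArrayToLong() decodes each 4-byte half little-endian (offset at bytes 0-3, file counter at bytes 4-7, matching offset = 4 set in setup()). A minimal round-trip sketch, assuming Server.writeLong() produces the same little-endian layout that byteArrayToLong() reads:

    // Round-trip sketch of the 8-byte address handed out by store()/retrieve().
    public class AddressSketch
    {
      static long calculateOffset(long fileOffset, long fileCounter)
      {
        return (fileCounter << 32) | (fileOffset & 0xffffffffL);
      }

      public static void main(String[] args)
      {
        long address = calculateOffset(128L, 3L); // offset 128 within file "3"
        byte[] id = new byte[8];
        for (int i = 0; i < 8; i++) {
          id[i] = (byte)(address >>> (8 * i));    // little-endian: offset in bytes 0-3, file in bytes 4-7
        }
        long offset = address & 0xffffffffL;      // 128
        long file = address >>> 32;               // 3
        System.out.println(file + "@" + offset + " -> " + java.util.Arrays.toString(id));
      }
    }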

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/44326514/flume/src/main/java/com/datatorrent/flume/storage/Storage.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/storage/Storage.java b/flume/src/main/java/com/datatorrent/flume/storage/Storage.java
new file mode 100644
index 0000000..9f3a010
--- /dev/null
+++ b/flume/src/main/java/com/datatorrent/flume/storage/Storage.java
@@ -0,0 +1,73 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.storage;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * <p>Storage interface.</p>
+ *
+ * @author Gaurav Gupta  <[email protected]>
+ * @since 0.9.2
+ */
+public interface Storage
+{
+  /**
+   * Key in the context for the unique identifier for the storage, which may be used to recover from failure.
+   */
+  String ID = "id";
+
+  /**
+   * This stores the bytes and returns the unique identifier to retrieve these bytes
+   *
+   * @param bytes the bytes to store
+   * @return the unique identifier that can later be used to retrieve these bytes
+   */
+  byte[] store(Slice bytes);
+
+  /**
+   * This returns the data bytes for the current identifier and the identifier for the next data bytes. <br/>
+   * The first eight bytes contain the identifier and the remaining bytes contain the data
+   *
+   * @param identifier the identifier of the data to retrieve
+   * @return the identifier for the next data bytes followed by the data bytes
+   */
+  byte[] retrieve(byte[] identifier);
+
+  /**
+   * This returns data bytes and the identifier for the next data bytes. The identifier for the current data bytes
+   * is based on the retrieve method call and the number of retrieveNext method calls made after it. <br/>
+   * The first eight bytes contain the identifier and the remaining bytes contain the data
+   *
+   * @return the identifier for the next data bytes followed by the data bytes
+   */
+  byte[] retrieveNext();
+
+  /**
+   * This is used to clean up the files identified by identifier
+   *
+   * @param identifier the identifier up to which the stored data can be deleted
+   */
+  void clean(byte[] identifier);
+
+  /**
+   * This flushes the data from the stream so that it becomes retrievable
+   *
+   */
+  void flush();
+
+}
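A minimal usage sketch of this contract against HDFSStorage, mirroring the configuration keys read in configure(); the directory and id below are placeholders, and the base directory must already exist (setup() rejects a missing baseDir). Data becomes retrievable only after flush(), and the first eight bytes of every retrieved array carry the address of the next record:

    import java.util.Arrays;
    import org.apache.flume.Context;
    import com.datatorrent.netlet.util.Slice;

    public class StorageUsageSketch
    {
      public static void main(String[] args)
      {
        Context ctx = new Context();
        ctx.put(HDFSStorage.BASE_DIR_KEY, "/tmp/flume-storage"); // placeholder, must already exist
        ctx.put(HDFSStorage.ID, "example");                      // placeholder id
        ctx.put(HDFSStorage.RESTORE_KEY, "false");

        HDFSStorage storage = new HDFSStorage();
        storage.configure(ctx);
        storage.setup(null);

        byte[] payload = "hello".getBytes();
        storage.store(new Slice(payload, 0, payload.length));
        storage.flush(); // data is retrievable only after flush()

        byte[] record = storage.retrieve(new byte[8]); // all-zero address = read from the beginning
        byte[] data = Arrays.copyOfRange(record, 8, record.length); // strip the 8-byte address
        System.out.println(new String(data)); // "hello"
        storage.teardown();
      }
    }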

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/44326514/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageMatching.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageMatching.java b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageMatching.java
new file mode 100644
index 0000000..05eeb4e
--- /dev/null
+++ b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageMatching.java
@@ -0,0 +1,109 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.primitives.Ints;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * @author Gaurav Gupta  <[email protected]>
+ */
+public class HDFSStorageMatching
+{
+
+  public static void main(String[] args)
+  {
+    HDFSStorage storage = new HDFSStorage();
+    storage.setBaseDir(args[0]);
+    storage.setId(args[1]);
+    storage.setRestore(true);
+    storage.setup(null);
+    int count = 100000000;
+
+    logger.debug(" start time {}", System.currentTimeMillis());
+    int index = 10000;
+    byte[] b = Ints.toByteArray(index);
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+      index++;
+      b = Ints.toByteArray(index);
+    }
+    storage.flush();
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+      index++;
+      b = Ints.toByteArray(index);
+    }
+    storage.flush();
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+      index++;
+      b = Ints.toByteArray(index);
+    }
+    storage.flush();
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+      index++;
+      b = Ints.toByteArray(index);
+    }
+    storage.flush();
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+      index++;
+      b = Ints.toByteArray(index);
+    }
+    storage.flush();
+    logger.debug(" end time {}", System.currentTimeMillis());
+    logger.debug(" start time for retrieve {}", System.currentTimeMillis());
+    b = storage.retrieve(new byte[8]);
+    int org_index = index;
+    index = 10000;
+    match(b, index);
+    while (true) {
+      index++;
+      b = storage.retrieveNext();
+      if (b == null) {
+        logger.debug(" end time for retrieve {}/{}/{}", 
System.currentTimeMillis(), index, org_index);
+        return;
+      } else {
+        if (!match(b, index)) {
+          throw new RuntimeException("failed : " + index);
+        }
+      }
+    }
+
+  }
+
+  public static boolean match(byte[] data, int match)
+  {
+    byte[] tempData = new byte[data.length - 8];
+    System.arraycopy(data, 8, tempData, 0, tempData.length);
+    int dataR = Ints.fromByteArray(tempData);
+    //logger.debug("input: {}, output: {}",match,dataR);
+    if (match == dataR) {
+      return true;
+    }
+    return false;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(HDFSStorageMatching.class);
+}
+
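The harness above always scans from the all-zero address; since any address returned by store(), retrieve(), or retrieveNext() can be handed back to retrieve() later, a reader can also resume mid-stream. A small sketch under that assumption (storage set up as in the harness, with restore enabled):

    // Resume sketch: keep the 8-byte prefix of the last record as a cursor and
    // hand it back to retrieve() after a restart to continue from that point.
    static byte[] resumeFrom(HDFSStorage storage, byte[] lastAddress)
    {
      byte[] record = storage.retrieve(lastAddress); // null until data past this address is flushed
      if (record != null) {
        System.arraycopy(record, 0, lastAddress, 0, 8); // first 8 bytes = address of the next record
      }
      return record;
    }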

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/44326514/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformance.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformance.java b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformance.java
new file mode 100644
index 0000000..394ce0e
--- /dev/null
+++ b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformance.java
@@ -0,0 +1,85 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * @author Gaurav Gupta  <[email protected]>
+ */
+public class HDFSStoragePerformance
+{
+
+  public static void main(String[] args)
+  {
+    HDFSStorage storage = new HDFSStorage();
+    storage.setBaseDir(".");
+    storage.setId("gaurav_flume_1");
+    storage.setRestore(true);
+    storage.setup(null);
+    int count = 1000000;
+
+    logger.debug(" start time {}", System.currentTimeMillis());
+    int index = 10000;
+    byte[] b = new byte[1024];
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+    }
+    storage.flush();
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+    }
+    storage.flush();
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+    }
+    storage.flush();
+    logger.debug(" end time {}", System.currentTimeMillis());
+    logger.debug(" start time for retrieve {}", System.currentTimeMillis());
+    storage.retrieve(new byte[8]);
+    String inputData = new String(b);
+    index = 1;
+    while (true) {
+      b = storage.retrieveNext();
+      if (b == null) {
+        logger.debug(" end time for retrieve {}", System.currentTimeMillis());
+        return;
+      } else {
+        if (!match(b, inputData)) {
+          throw new RuntimeException("failed : " + index);
+        }
+      }
+
+      index++;
+    }
+
+  }
+
+  public static boolean match(byte[] data, String match)
+  {
+    byte[] tempData = new byte[data.length - 8];
+    System.arraycopy(data, 8, tempData, 0, tempData.length);
+//    logger.debug("input: {}, output: {}",match,new String(tempData));
+    return (match.equals(new String(tempData)));
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(HDFSStoragePerformance.class);
+}
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/44326514/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformanceTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformanceTest.java b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformanceTest.java
new file mode 100644
index 0000000..08476c2
--- /dev/null
+++ b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformanceTest.java
@@ -0,0 +1,112 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.primitives.Ints;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * <p>HDFSStoragePerformanceTest class.</p>
+ *
+ * @author Gaurav Gupta  <[email protected]>
+ * @since 1.0.1
+ */
+public class HDFSStoragePerformanceTest
+{
+
+  public static void main(String[] args)
+  {
+    HDFSStorage storage = new HDFSStorage();
+    storage.setBaseDir(args[0]);
+    storage.setId(args[1]);
+    storage.setRestore(true);
+    storage.setup(null);
+    int count = 100000000;
+
+    logger.debug(" start time {}", System.currentTimeMillis());
+    int index = 10000;
+    byte[] b = Ints.toByteArray(index);
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+      index++;
+      b = Ints.toByteArray(index);
+    }
+    storage.flush();
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+      index++;
+      b = Ints.toByteArray(index);
+    }
+    storage.flush();
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+      index++;
+      b = Ints.toByteArray(index);
+    }
+    storage.flush();
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+      index++;
+      b = Ints.toByteArray(index);
+    }
+    storage.flush();
+    for (int i = 0; i < count; i++) {
+      storage.store(new Slice(b, 0, b.length));
+      index++;
+      b = Ints.toByteArray(index);
+    }
+    storage.flush();
+    logger.debug(" end time {}", System.currentTimeMillis());
+    logger.debug(" start time for retrieve {}", System.currentTimeMillis());
+    b = storage.retrieve(new byte[8]);
+    int org_index = index;
+    index = 10000;
+    match(b, index);
+    while (true) {
+      index++;
+      b = storage.retrieveNext();
+      if (b == null) {
+        logger.debug(" end time for retrieve {}/{}/{}", 
System.currentTimeMillis(), index, org_index);
+        return;
+      } else {
+        if (!match(b, index)) {
+          throw new RuntimeException("failed : " + index);
+        }
+      }
+    }
+
+  }
+
+  public static boolean match(byte[] data, int match)
+  {
+    byte[] tempData = new byte[data.length - 8];
+    System.arraycopy(data, 8, tempData, 0, tempData.length);
+    int dataR = Ints.fromByteArray(tempData);
+    //logger.debug("input: {}, output: {}",match,dataR);
+    if (match == dataR) {
+      return true;
+    }
+    return false;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(HDFSStoragePerformanceTest.class);
+}
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/44326514/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageTest.java b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageTest.java
new file mode 100644
index 0000000..b348c8f
--- /dev/null
+++ b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageTest.java
@@ -0,0 +1,693 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.storage;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * @author Gaurav Gupta <[email protected]>
+ */
+public class HDFSStorageTest
+{
+  public static class TestMeta extends TestWatcher
+  {
+    public String baseDir;
+    public String testFile;
+    private String testData = "No and yes. There is also IdleTimeHandler that allows the operator to emit tuples. " +
+        "There is overlap, why not have a single interface. \n" +
+        "Also consider the possibility of an operator that does other processing and not consume nor emit tuples,";
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      String className = description.getClassName();
+      baseDir = "target/" + className;
+      try {
+        baseDir = (new File(baseDir)).getAbsolutePath();
+        FileUtils.forceMkdir(new File(baseDir));
+        testFile = baseDir + "/testInput.txt";
+        FileOutputStream outputStream = FileUtils.openOutputStream(new File(testFile));
+        outputStream.write(testData.getBytes());
+        outputStream.close();
+
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      try {
+        FileUtils.deleteDirectory(new File(baseDir));
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  private String STORAGE_DIRECTORY;
+
+  private HDFSStorage getStorage(String id, boolean restore)
+  {
+    Context ctx = new Context();
+    STORAGE_DIRECTORY = testMeta.baseDir;
+    ctx.put(HDFSStorage.BASE_DIR_KEY, testMeta.baseDir);
+    ctx.put(HDFSStorage.RESTORE_KEY, Boolean.toString(restore));
+    ctx.put(HDFSStorage.ID, id);
+    ctx.put(HDFSStorage.BLOCKSIZE, "256");
+    HDFSStorage lstorage = new HDFSStorage();
+    lstorage.configure(ctx);
+    lstorage.setup(null);
+    return lstorage;
+  }
+
+  private HDFSStorage storage;
+
+  @Before
+  public void setup()
+  {
+    storage = getStorage("1", false);
+  }
+
+  @After
+  public void teardown()
+  {
+    storage.teardown();
+    try {
+      Thread.sleep(100);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+    storage.cleanHelperFiles();
+  }
+
+  /**
+   * This test covers the following use case: 1. Some data is stored 2. The file is flushed but not closed 3. Some
+   * more data is stored but the file doesn't roll over 4. Retrieve is called for the last returned address and it
+   * returns null 5. Some more data is stored again but the address returned is null because of the previous
+   * retrieve call
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testPartialFlush() throws Exception
+  {
+    Assert.assertNull(storage.retrieve(new byte[8]));
+    byte[] b = "ab".getBytes();
+    byte[] address = storage.store(new Slice(b, 0, b.length));
+    Assert.assertNotNull(address);
+    storage.flush();
+    b = "cb".getBytes();
+    byte[] addr = storage.store(new Slice(b, 0, b.length));
+    match(storage.retrieve(new byte[8]), "ab");
+    Assert.assertNull(storage.retrieve(addr));
+    Assert.assertNull(storage.store(new Slice(b, 0, b.length)));
+    storage.flush();
+    match(storage.retrieve(address), "cb");
+    Assert.assertNotNull(storage.store(new Slice(b, 0, b.length)));
+  }
+
+  /**
+   * This test covers the following use case: 1. Some data is stored to make sure that there is no roll over 2. The
+   * file is flushed but not closed 3. Some more data is stored; enough to make the file roll over 4. Retrieve is
+   * called for the last returned address and it returns null as the data is not flushed 5. Some more data is stored
+   * again but the address returned is null because of the previous retrieve call 6. The data is flushed to make
+   * sure that it is committed 7. Now the data is retrieved from the start and the data returned matches the data
+   * stored
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testPartialFlushRollOver() throws Exception
+  {
+    Assert.assertNull(storage.retrieve(new byte[8]));
+    byte[] b = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, 52,
+        51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, 49,
+        45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, 54,
+        49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, 53,
+        52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, 1,
+        48, 46, 48, 1, 48, 46, 48};
+    byte[] b_org = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53,
+        52, 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49,
+        49, 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49,
+        54, 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49,
+        53, 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48,
+        1, 48, 46, 48, 1, 48, 46, 48};
+    byte[] address = storage.store(new Slice(b, 0, b.length));
+    Assert.assertNotNull(address);
+    storage.flush();
+    byte[] addr = null;
+    for (int i = 0; i < 5; i++) {
+      b[0] = (byte)(b[0] + 1);
+      addr = storage.store(new Slice(b, 0, b.length));
+    }
+    Assert.assertNull(storage.retrieve(addr));
+    for (int i = 0; i < 5; i++) {
+      b[0] = (byte)(b[0] + 1);
+      Assert.assertNull(storage.store(new Slice(b, 0, b.length)));
+    }
+    storage.flush();
+    match(storage.retrieve(new byte[8]), new String(b_org));
+    b_org[0] = (byte)(b_org[0] + 1);
+    match(storage.retrieve(address), new String(b_org));
+    b_org[0] = (byte)(b_org[0] + 1);
+    match(storage.retrieveNext(), new String(b_org));
+    b_org[0] = (byte)(b_org[0] + 1);
+    match(storage.retrieveNext(), new String(b_org));
+
+  }
+
+  /**
+   * This test covers following use case 1. Some data is stored to make sure 
that there is no roll over 2. File is
+   * flushed but the file is not closed 3. Some more data is stored. The data 
stored is enough to make the file roll
+   * over 4. The storage crashes and new storage is instiated. 5. Retrieve is 
called for the last returned address and
+   * it return nulls as the data is not flushed 6. Some more data is stored 
again but the address is returned null
+   * because of previous retrieve call 7. The data is flushed to make sure 
that the data is committed. 8. Now the data
+   * is retrieved from the starting and data returned matches the data stored
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testPartialFlushRollOverWithFailure() throws Exception
+  {
+    Assert.assertNull(storage.retrieve(new byte[8]));
+    byte[] b = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, 52,
+        51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, 49,
+        45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, 54,
+        49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, 53,
+        52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, 1,
+        48, 46, 48, 1, 48, 46, 48};
+    byte[] b_org = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53,
+        52, 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49,
+        49, 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49,
+        54, 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49,
+        53, 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48,
+        1, 48, 46, 48, 1, 48, 46, 48};
+    byte[] address = storage.store(new Slice(b, 0, b.length));
+    Assert.assertNotNull(address);
+    storage.flush();
+    byte[] addr = null;
+    for (int i = 0; i < 5; i++) {
+      b[0] = (byte)(b[0] + 1);
+      addr = storage.store(new Slice(b, 0, b.length));
+    }
+    storage = getStorage("1", true);
+    Assert.assertNull(storage.retrieve(addr));
+    for (int i = 0; i < 5; i++) {
+      b[0] = (byte)(b[0] + 1);
+      Assert.assertNull(storage.store(new Slice(b, 0, b.length)));
+    }
+    storage.flush();
+    match(storage.retrieve(new byte[8]), new String(b_org));
+    b_org[0] = (byte)(b_org[0] + 1);
+    match(storage.retrieve(address), new String(b_org));
+    b_org[0] = (byte)(b_org[0] + 1);
+    match(storage.retrieveNext(), new String(b_org));
+    b_org[0] = (byte)(b_org[0] + 1);
+    match(storage.retrieveNext(), new String(b_org));
+
+  }
+
+  /**
+   * This tests clean when the file doesn't roll over
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testPartialFlushWithClean() throws Exception
+  {
+    Assert.assertNull(storage.retrieve(new byte[8]));
+    byte[] b = "ab".getBytes();
+    byte[] address = storage.store(new Slice(b, 0, b.length));
+    Assert.assertNotNull(address);
+    storage.flush();
+    storage.clean(address);
+    b = "cb".getBytes();
+    byte[] addr = storage.store(new Slice(b, 0, b.length));
+    Assert.assertNull(storage.retrieve(addr));
+    Assert.assertNull(storage.store(new Slice(b, 0, b.length)));
+    storage.flush();
+    match(storage.retrieve(new byte[8]), "cb");
+    match(storage.retrieve(address), "cb");
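+    // once flush has committed the pending writes, store starts returning addresses again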
+    Assert.assertNotNull(storage.store(new Slice(b, 0, b.length)));
+  }
+
+  /**
+   * This tests clean, combined with a storage failure and restart, when the file doesn't roll over
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testPartialFlushWithCleanAndFailure() throws Exception
+  {
+    Assert.assertNull(storage.retrieve(new byte[8]));
+    byte[] b = "ab".getBytes();
+    byte[] address = storage.store(new Slice(b, 0, b.length));
+    Assert.assertNotNull(address);
+    storage.flush();
+    storage.clean(address);
+    b = "cb".getBytes();
+    byte[] addr = storage.store(new Slice(b, 0, b.length));
+    storage = getStorage("1", true);
+    Assert.assertNull(storage.retrieve(addr));
+    Assert.assertNull(storage.store(new Slice(b, 0, b.length)));
+    storage.flush();
+    match(storage.retrieve(new byte[8]), "cb");
+    match(storage.retrieve(address), "cb");
+    Assert.assertNotNull(storage.store(new Slice(b, 0, b.length)));
+  }
+
+  /**
+   * This test covers the following use case:
+   * 1. Some data is stored, not enough to cause a roll over.
+   * 2. The file is flushed but not closed.
+   * 3. The data is cleaned up to the last returned address.
+   * 4. More data is stored, enough to make the file roll over.
+   * 5. Retrieve is called for the last returned address; it returns null because that data was never flushed.
+   * 6. More data is stored, but the returned address is null because of the previous retrieve call.
+   * 7. The data is flushed to make sure it is committed.
+   * 8. The data is retrieved from the beginning and matches the data stored.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testPartialFlushWithCleanAndRollOver() throws Exception
+  {
+    Assert.assertNull(storage.retrieve(new byte[8]));
+    byte[] b = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, 52,
+        51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, 49,
+        45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, 54,
+        49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, 53,
+        52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, 1,
+        48, 46, 48, 1, 48, 46, 48};
+    byte[] b_org = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53,
+        52, 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49,
+        49, 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49,
+        54, 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49,
+        53, 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48,
+        1, 48, 46, 48, 1, 48, 46, 48};
+    byte[] address = storage.store(new Slice(b, 0, b.length));
+    Assert.assertNotNull(address);
+    storage.flush();
+    storage.clean(address);
+
+    byte[] addr = null;
+    for (int i = 0; i < 5; i++) {
+      b[0] = (byte)(b[0] + 1);
+      addr = storage.store(new Slice(b, 0, b.length));
+    }
+    Assert.assertNull(storage.retrieve(addr));
+    for (int i = 0; i < 5; i++) {
+      b[0] = (byte)(b[0] + 1);
+      Assert.assertNull(storage.store(new Slice(b, 0, b.length)));
+    }
+    storage.flush();
+    b_org[0] = (byte)(b_org[0] + 1);
+    match(storage.retrieve(new byte[8]), new String(b_org));
+    match(storage.retrieve(address), new String(b_org));
+    b_org[0] = (byte)(b_org[0] + 1);
+    match(storage.retrieveNext(), new String(b_org));
+    b_org[0] = (byte)(b_org[0] + 1);
+    match(storage.retrieveNext(), new String(b_org));
+
+  }
+
+  /**
+   * This tests clean when the files roll over and the storage fails
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testPartialFlushWithCleanAndRollOverAndFailure() throws Exception
+  {
+    Assert.assertNull(storage.retrieve(new byte[8]));
+    byte[] b = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, 52,
+        51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, 49,
+        45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, 54,
+        49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, 53,
+        52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, 1,
+        48, 46, 48, 1, 48, 46, 48};
+    byte[] b_org = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53,
+        52, 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49,
+        49, 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49,
+        54, 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49,
+        53, 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48,
+        1, 48, 46, 48, 1, 48, 46, 48};
+    byte[] address = storage.store(new Slice(b, 0, b.length));
+    Assert.assertNotNull(address);
+    storage.flush();
+    storage.clean(address);
+    byte[] addr = null;
+    for (int i = 0; i < 5; i++) {
+      b[0] = (byte)(b[0] + 1);
+      addr = storage.store(new Slice(b, 0, b.length));
+    }
+    storage = getStorage("1", true);
+    Assert.assertNull(storage.retrieve(addr));
+    for (int i = 0; i < 5; i++) {
+      b[0] = (byte)(b[0] + 1);
+      Assert.assertNull(storage.store(new Slice(b, 0, b.length)));
+    }
+    storage.flush();
+    b_org[0] = (byte)(b_org[0] + 1);
+    match(storage.retrieve(address), new String(b_org));
+    b_org[0] = (byte)(b_org[0] + 1);
+    match(storage.retrieveNext(), new String(b_org));
+    b_org[0] = (byte)(b_org[0] + 1);
+    match(storage.retrieveNext(), new String(b_org));
+
+  }
+
+  /**
+   * This test covers the following use case: the file is flushed and then more data is written to the same file,
+   * but the new data is not flushed, the file does not roll over, and the storage fails. The new storage comes up
+   * and the client asks for data at the last address returned by the earlier storage instance; the new storage
+   * returns null. The client stores the data again, but the address returned this time is null, and after a flush
+   * the retrieval of the earlier address returns the data.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testPartialFlushWithFailure() throws Exception
+  {
+    Assert.assertNull(storage.retrieve(new byte[8]));
+    byte[] b = "ab".getBytes();
+    byte[] address = storage.store(new Slice(b, 0, b.length));
+    Assert.assertNotNull(address);
+    storage.flush();
+    b = "cb".getBytes();
+    byte[] addr = storage.store(new Slice(b, 0, b.length));
+    storage = getStorage("1", true);
+    Assert.assertNull(storage.retrieve(addr));
+    Assert.assertNull(storage.store(new Slice(b, 0, b.length)));
+    storage.flush();
+    match(storage.retrieve(address), "cb");
+  }
+
+  private void match(byte[] data, String match)
+  {
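+    // every block handed back by retrieve/retrieveNext carries an 8-byte address header; skip it before comparing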
+    byte[] tempData = new byte[data.length - 8];
+    System.arraycopy(data, 8, tempData, 0, tempData.length);
+    Assert.assertEquals("matched the stored value with retrieved value", 
match, new String(tempData));
+  }
+
+  @Test
+  public void testStorage() throws IOException
+  {
+    Assert.assertNull(storage.retrieve(new byte[8]));
+    byte[] b = new byte[200];
+    byte[] identifier;
+    Assert.assertNotNull(storage.store(new Slice(b, 0, b.length)));
+    Assert.assertNotNull(storage.store(new Slice(b, 0, b.length)));
+    Assert.assertNull(storage.retrieve(new byte[8]));
+    Assert.assertNotNull(storage.store(new Slice(b, 0, b.length)));
+    Assert.assertNotNull(storage.store(new Slice(b, 0, b.length)));
+    storage.flush();
+    byte[] data = storage.retrieve(new byte[8]);
+    Assert.assertNotNull(storage.store(new Slice(b, 0, b.length)));
+    identifier = storage.store(new Slice(b, 0, b.length));
+    byte[] tempData = new byte[data.length - 8];
+    System.arraycopy(data, 8, tempData, 0, tempData.length);
+    Assert.assertEquals("matched the stored value with retrieved value", new 
String(b), new String(tempData));
+    Assert.assertNull(storage.retrieve(identifier));
+  }
+
+  @Test
+  public void testStorageWithRestore() throws IOException
+  {
+    Assert.assertNull(storage.retrieve(new byte[8]));
+    byte[] b = new byte[200];
+    Assert.assertNotNull(storage.store(new Slice(b, 0, b.length)));
+    storage.flush();
+    storage.teardown();
+
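+    // the second argument to getStorage presumably maps to the restore property, so this
+    // new instance resumes from the state persisted before teardown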
+    storage = getStorage("1", true);
+    storage.store(new Slice(b, 0, b.length));
+    storage.flush();
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    boolean exists = fs.exists(new Path(STORAGE_DIRECTORY + "/1/" + "1"));
+    Assert.assertEquals("file should exist", true, exists);
+  }
+
+  @Test
+  public void testCleanup() throws IOException
+  {
+    RandomAccessFile r = new RandomAccessFile(testMeta.testFile, "r");
+    r.seek(0);
+    byte[] b = r.readLine().getBytes();
+    storage.store(new Slice(b, 0, b.length));
+    byte[] val = storage.store(new Slice(b, 0, b.length));
+    storage.flush();
+    storage.clean(val);
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    boolean exists = fs.exists(new Path(STORAGE_DIRECTORY + "/" + "0"));
+    Assert.assertEquals("file should not exist", false, exists);
+    r.close();
+  }
+
+  @Test
+  public void testNext() throws IOException
+  {
+    RandomAccessFile r = new RandomAccessFile(testMeta.testFile, "r");
+    r.seek(0);
+    Assert.assertNull(storage.retrieve(new byte[8]));
+    byte[] b = r.readLine().getBytes();
+    storage.store(new Slice(b, 0, b.length));
+    byte[] b1 = r.readLine().getBytes();
+    storage.store(new Slice(b1, 0, b1.length));
+    storage.store(new Slice(b, 0, b.length));
+    storage.flush();
+    storage.store(new Slice(b1, 0, b1.length));
+    storage.store(new Slice(b, 0, b.length));
+    storage.flush();
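+    // retrieval should walk the entries in store order, across both flushed batches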
+    byte[] data = storage.retrieve(new byte[8]);
+    byte[] tempData = new byte[data.length - 8];
+    System.arraycopy(data, 8, tempData, 0, tempData.length);
+    Assert.assertEquals("matched the stored value with retrieved value", new 
String(b), new String(tempData));
+    data = storage.retrieveNext();
+    tempData = new byte[data.length - 8];
+    System.arraycopy(data, 8, tempData, 0, tempData.length);
+    Assert.assertEquals("matched the stored value with retrieved value", new 
String(b1), new String(tempData));
+    data = storage.retrieveNext();
+    tempData = new byte[data.length - 8];
+    System.arraycopy(data, 8, tempData, 0, tempData.length);
+    Assert.assertEquals("matched the stored value with retrieved value", new 
String(b), new String(tempData));
+    r.close();
+  }
+
+  @Test
+  public void testFailure() throws IOException
+  {
+    byte[] address;
+    byte[] b = new byte[200];
+    storage.retrieve(new byte[8]);
+    for (int i = 0; i < 5; i++) {
+      storage.store(new Slice(b, 0, b.length));
+      address = storage.store(new Slice(b, 0, b.length));
+      storage.flush();
+      storage.clean(address);
+    }
+    storage.teardown();
+
+    byte[] identifier = new byte[8];
+    storage = getStorage("1", true);
+
+    storage.retrieve(identifier);
+
+    storage.store(new Slice(b, 0, b.length));
+    storage.store(new Slice(b, 0, b.length));
+    storage.store(new Slice(b, 0, b.length));
+    storage.flush();
+    byte[] data = storage.retrieve(identifier);
+    byte[] tempData = new byte[data.length - 8];
+    System.arraycopy(data, 8, tempData, 0, tempData.length);
+    Assert.assertEquals("matched the stored value with retrieved value", new 
String(b), new String(tempData));
+  }
+
+  /**
+   * This test case tests the clean call before any flush is called.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testCleanUnflushedData() throws IOException
+  {
+    for (int i = 0; i < 5; i++) {
+      final byte[] bytes = (i + "").getBytes();
+      storage.store(new Slice(bytes, 0, bytes.length));
+    }
+    storage.clean(new byte[8]);
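+    // cleaning at address zero before any flush must not discard the unflushed entries stored above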
+    storage.flush();
+    match(storage.retrieve(new byte[8]), "0");
+    match(storage.retrieveNext(), "1");
+  }
+
+  @Test
+  public void testCleanForUnflushedData() throws IOException
+  {
+    byte[] address = null;
+    byte[] b = new byte[200];
+    storage.retrieve(new byte[8]);
+    for (int i = 0; i < 5; i++) {
+      storage.store(new Slice(b, 0, b.length));
+      address = storage.store(new Slice(b, 0, b.length));
+      storage.flush();
+      // storage.clean(address);
+    }
+    byte[] lastWrittenAddress = null;
+    for (int i = 0; i < 5; i++) {
+      storage.store(new Slice(b, 0, b.length));
+      lastWrittenAddress = storage.store(new Slice(b, 0, b.length));
+    }
+    storage.clean(lastWrittenAddress);
+    byte[] cleanedOffset = storage.readData(new Path(STORAGE_DIRECTORY + "/1/cleanoffsetFile"));
+    Assert.assertArrayEquals(address, cleanedOffset);
+
+  }
+
+  @Test
+  public void testCleanForFlushedData() throws IOException
+  {
+    byte[] b = new byte[200];
+    storage.retrieve(new byte[8]);
+    for (int i = 0; i < 5; i++) {
+      storage.store(new Slice(b, 0, b.length));
+      storage.store(new Slice(b, 0, b.length));
+      storage.flush();
+      // storage.clean(address);
+    }
+    byte[] lastWrittenAddress = null;
+    for (int i = 0; i < 5; i++) {
+      storage.store(new Slice(b, 0, b.length));
+      lastWrittenAddress = storage.store(new Slice(b, 0, b.length));
+    }
+    storage.flush();
+    storage.clean(lastWrittenAddress);
+    byte[] cleanedOffset = storage.readData(new Path(STORAGE_DIRECTORY + "/1/cleanoffsetFile"));
+    Assert.assertArrayEquals(lastWrittenAddress, cleanedOffset);
+
+  }
+
+  @Test
+  public void testCleanForPartialFlushedData() throws IOException
+  {
+    byte[] b = new byte[8];
+    storage.retrieve(new byte[8]);
+
+    storage.store(new Slice(b, 0, b.length));
+    byte[] bytes = "1a".getBytes();
+    byte[] address = storage.store(new Slice(bytes, 0, bytes.length));
+    storage.flush();
+    storage.clean(address);
+
+    byte[] lastWrittenAddress = null;
+    for (int i = 0; i < 5; i++) {
+      final byte[] bytes1 = (i + "").getBytes();
+      storage.store(new Slice(bytes1, 0, bytes1.length));
+      lastWrittenAddress = storage.store(new Slice(b, 0, b.length));
+    }
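+    // everything flushed so far was cleaned and the rest is unflushed, so both retrieves return null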
+    Assert.assertNull(storage.retrieve(new byte[8]));
+    Assert.assertNull(storage.retrieve(lastWrittenAddress));
+    storage.store(new Slice(b, 0, b.length));
+    storage.flush();
+    Assert.assertNull(storage.retrieve(lastWrittenAddress));
+  }
+
+  @Test
+  public void testRandomSequence() throws IOException
+  {
+    storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0});
+    byte[] bytes = new byte[]{48, 48, 48, 51, 101, 100, 55, 56, 55, 49, 53, 99, 52, 101, 55, 50, 97, 52, 48, 49, 51,
+        99, 97, 54, 102, 57, 55, 53, 57, 100, 49, 99, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51,
+        45, 49, 49, 45, 48, 55, 32, 48, 48, 58, 48, 48, 58, 52, 54, 1, 52, 50, 49, 50, 51, 1, 50, 1, 49, 53, 49, 49,
+        52, 50, 54, 53, 1, 49, 53, 49, 49, 57, 51, 53, 49, 1, 49, 53, 49, 50, 57, 56, 50, 52, 1, 49, 53, 49, 50, 49,
+        55, 48, 55, 1, 49, 48, 48, 55, 55, 51, 57, 51, 1, 49, 57, 49, 52, 55, 50, 53, 52, 54, 49, 1, 49, 1, 48, 1, 48,
+        46, 48, 1, 48, 46, 48, 1, 48, 46, 48};
+    storage.store(new Slice(bytes, 0, bytes.length));
+    storage.flush();
+    storage.clean(new byte[]{-109, 0, 0, 0, 0, 0, 0, 0});
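+    // clean takes a raw 8-byte address; this literal apparently replays an address captured from a failing run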
+    storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0});
+    for (int i = 0; i < 2555; i++) {
+      byte[] bytes1 = new byte[]{48, 48, 48, 55, 56, 51, 98, 101, 50, 54, 50, 98, 52, 102, 50, 54, 56, 97, 55, 56, 102,
+          48, 54, 54, 50, 49, 49, 54, 99, 98, 101, 99, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51,
+          45, 49, 49, 45, 48, 55, 32, 48, 48, 58, 48, 48, 58, 53, 49, 1, 49, 49, 49, 49, 54, 51, 57, 1, 50, 1, 49, 53,
+          49, 48, 57, 57, 56, 51, 1, 49, 53, 49, 49, 49, 55, 48, 52, 1, 49, 53, 49, 50, 49, 51, 55, 49, 1, 49, 53, 49,
+          49, 52, 56, 51, 49, 1, 49, 48, 48, 55, 49, 57, 56, 49, 1, 49, 50, 48, 50, 55, 54, 49, 54, 56, 53, 1, 49, 1,
+          48, 1, 48, 46, 48, 1, 48, 46, 48, 1, 48, 46, 48};
+      storage.store(new Slice(bytes1, 0, bytes1.length));
+      storage.flush();
+    }
+    storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0});
+    for (int i = 0; i < 1297; i++) {
+      storage.retrieveNext();
+    }
+    storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0});
+    for (int i = 0; i < 1302; i++) {
+      storage.retrieveNext();
+    }
+    storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0});
+    for (int i = 0; i < 1317; i++) {
+      storage.retrieveNext();
+    }
+    storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0});
+    for (int i = 0; i < 2007; i++) {
+      storage.retrieveNext();
+    }
+    storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0});
+    for (int i = 0; i < 2556; i++) {
+      storage.retrieveNext();
+    }
+    byte[] bytes1 = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53,
+        52, 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49,
+        49, 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49,
+        54, 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49,
+        53, 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48,
+        1, 48, 46, 48, 1, 48, 46, 48};
+    storage.store(new Slice(bytes1, 0, bytes1.length));
+    storage.flush();
+    storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0});
+    for (int i = 0; i < 2062; i++) {
+      storage.retrieveNext();
+    }
+  }
+
+  @SuppressWarnings("unused")
+  private static final Logger logger = LoggerFactory.getLogger(HDFSStorageTest.class);
+}
