Storage
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/44326514
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/44326514
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/44326514

Branch: refs/heads/master
Commit: 4432651437e8aac8b442c0f86a6775adc47e962c
Parents: bbdab0e
Author: gaurav <[email protected]>
Authored: Sun Feb 19 21:29:56 2017 +0530
Committer: Pramod Immaneni <[email protected]>
Committed: Mon May 22 16:47:34 2017 -0700

----------------------------------------------------------------------
 .../datatorrent/flume/storage/HDFSStorage.java | 946 +++++++++++++++++++
 .../com/datatorrent/flume/storage/Storage.java |  73 ++
 .../flume/storage/HDFSStorageMatching.java     | 109 +++
 .../flume/storage/HDFSStoragePerformance.java  |  85 ++
 .../storage/HDFSStoragePerformanceTest.java    | 112 +++
 .../flume/storage/HDFSStorageTest.java         | 693 ++++++++++++++
 6 files changed, 2018 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/44326514/flume/src/main/java/com/datatorrent/flume/storage/HDFSStorage.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/storage/HDFSStorage.java b/flume/src/main/java/com/datatorrent/flume/storage/HDFSStorage.java
new file mode 100644
index 0000000..74849e9
--- /dev/null
+++ b/flume/src/main/java/com/datatorrent/flume/storage/HDFSStorage.java
@@ -0,0 +1,946 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package com.datatorrent.flume.storage; + +import java.io.DataInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flume.Context; +import org.apache.flume.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; + +import com.datatorrent.api.Component; +import com.datatorrent.common.util.NameableThreadFactory; +import com.datatorrent.flume.sink.Server; +import com.datatorrent.netlet.util.Slice; + +/** + * HDFSStorage is developed to store and retrieve the data from HDFS + * <p /> + * The properties that can be set on HDFSStorage are: <br /> + * baseDir - The base directory where the data is going to be stored <br /> + * restore - This is used to restore the application from previous failure <br /> + * blockSize - The maximum size of the each file to created. <br /> + * + * @author Gaurav Gupta <[email protected]> + * @since 0.9.3 + */ +public class HDFSStorage implements Storage, Configurable, Component<com.datatorrent.api.Context> +{ + public static final int DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024; + public static final String BASE_DIR_KEY = "baseDir"; + public static final String RESTORE_KEY = "restore"; + public static final String BLOCKSIZE = "blockSize"; + public static final String BLOCK_SIZE_MULTIPLE = "blockSizeMultiple"; + public static final String NUMBER_RETRY = "retryCount"; + + private static final String OFFSET_SUFFIX = "-offsetFile"; + private static final String BOOK_KEEPING_FILE_OFFSET = "-bookKeepingOffsetFile"; + private static final String FLUSHED_IDENTITY_FILE = "flushedCounter"; + private static final String CLEAN_OFFSET_FILE = "cleanoffsetFile"; + private static final String FLUSHED_IDENTITY_FILE_TEMP = "flushedCounter.tmp"; + private static final String CLEAN_OFFSET_FILE_TEMP = "cleanoffsetFile.tmp"; + private static final int IDENTIFIER_SIZE = 8; + private static final int DATA_LENGTH_BYTE_SIZE = 4; + + /** + * Number of times the storage will try to get the filesystem + */ + private int retryCount = 3; + /** + * The multiple of block size + */ + private int blockSizeMultiple = 1; + /** + * Identifier for this storage. + */ + @NotNull + private String id; + /** + * The baseDir where the storage facility is going to create files. 
+ */ + @NotNull + private String baseDir; + /** + * The block size to be used to create the storage files + */ + private long blockSize; + /** + * + */ + private boolean restore; + /** + * This identifies the current file number + */ + private long currentWrittenFile; + /** + * This identifies the file number that has been flushed + */ + private long flushedFileCounter; + /** + * The file that stores the fileCounter information + */ + // private Path fileCounterFile; + /** + * The file that stores the flushed fileCounter information + */ + private Path flushedCounterFile; + private Path flushedCounterFileTemp; + /** + * This identifies the last cleaned file number + */ + private long cleanedFileCounter; + /** + * The file that stores the clean file counter information + */ + // private Path cleanFileCounterFile; + /** + * The file that stores the clean file offset information + */ + private Path cleanFileOffsetFile; + private Path cleanFileOffsetFileTemp; + private FileSystem fs; + private FSDataOutputStream dataStream; + ArrayList<DataBlock> files2Commit = new ArrayList<DataBlock>(); + /** + * The offset in the current opened file + */ + private long fileWriteOffset; + private FSDataInputStream readStream; + private long retrievalOffset; + private long retrievalFile; + private int offset; + private long flushedLong; + private long flushedFileWriteOffset; + private long bookKeepingFileOffset; + private byte[] cleanedOffset = new byte[8]; + private long skipOffset; + private long skipFile; + private transient Path basePath; + private ExecutorService storageExecutor; + private byte[] currentData; + private FSDataInputStream nextReadStream; + private long nextFlushedLong; + private long nextRetrievalFile; + private byte[] nextRetrievalData; + + public HDFSStorage() + { + this.restore = true; + } + + /** + * This stores the Identifier information identified in the last store function call + * + * @param ctx + */ + @Override + public void configure(Context ctx) + { + String tempId = ctx.getString(ID); + if (tempId == null) { + if (id == null) { + throw new IllegalArgumentException("id can't be null."); + } + } else { + id = tempId; + } + + String tempBaseDir = ctx.getString(BASE_DIR_KEY); + if (tempBaseDir != null) { + baseDir = tempBaseDir; + } + + restore = ctx.getBoolean(RESTORE_KEY, restore); + Long tempBlockSize = ctx.getLong(BLOCKSIZE); + if (tempBlockSize != null) { + blockSize = tempBlockSize; + } + blockSizeMultiple = ctx.getInteger(BLOCK_SIZE_MULTIPLE, blockSizeMultiple); + retryCount = ctx.getInteger(NUMBER_RETRY,retryCount); + } + + /** + * This function reads the file at a location and return the bytes stored in the file " + * + * @param path - the location of the file + * @return + * @throws IOException + */ + byte[] readData(Path path) throws IOException + { + DataInputStream is = new DataInputStream(fs.open(path)); + byte[] bytes = new byte[is.available()]; + is.readFully(bytes); + is.close(); + return bytes; + } + + /** + * This function writes the bytes to a file specified by the path + * + * @param path the file location + * @param data the data to be written to the file + * @return + * @throws IOException + */ + private FSDataOutputStream writeData(Path path, byte[] data) throws IOException + { + FSDataOutputStream fsOutputStream; + if (fs.getScheme().equals("file")) { + // local FS does not support hflush and does not flush native stream + fsOutputStream = new FSDataOutputStream( + new FileOutputStream(Path.getPathWithoutSchemeAndAuthority(path).toString()), null); + } else 
{ + fsOutputStream = fs.create(path); + } + fsOutputStream.write(data); + return fsOutputStream; + } + + private long calculateOffset(long fileOffset, long fileCounter) + { + return ((fileCounter << 32) | (fileOffset & 0xffffffffL)); + } + + @Override + public byte[] store(Slice slice) + { + // logger.debug("store message "); + int bytesToWrite = slice.length + DATA_LENGTH_BYTE_SIZE; + if (currentWrittenFile < skipFile) { + fileWriteOffset += bytesToWrite; + if (fileWriteOffset >= bookKeepingFileOffset) { + files2Commit.add(new DataBlock(null, bookKeepingFileOffset, + new Path(basePath, currentWrittenFile + OFFSET_SUFFIX), currentWrittenFile)); + currentWrittenFile++; + if (fileWriteOffset > bookKeepingFileOffset) { + fileWriteOffset = bytesToWrite; + } else { + fileWriteOffset = 0; + } + try { + bookKeepingFileOffset = getFlushedFileWriteOffset( + new Path(basePath, currentWrittenFile + BOOK_KEEPING_FILE_OFFSET)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return null; + } + + if (flushedFileCounter == currentWrittenFile && dataStream == null) { + currentWrittenFile++; + fileWriteOffset = 0; + } + + if (flushedFileCounter == skipFile && skipFile != -1) { + skipFile++; + } + + if (fileWriteOffset + bytesToWrite < blockSize) { + try { + /* write length and the actual data to the file */ + if (fileWriteOffset == 0) { + // writeData(flushedCounterFile, String.valueOf(currentWrittenFile).getBytes()).close(); + dataStream = writeData(new Path(basePath, String.valueOf(currentWrittenFile)), + Ints.toByteArray(slice.length)); + dataStream.write(slice.buffer, slice.offset, slice.length); + } else { + dataStream.write(Ints.toByteArray(slice.length)); + dataStream.write(slice.buffer, slice.offset, slice.length); + } + fileWriteOffset += bytesToWrite; + + byte[] fileOffset = null; + if ((currentWrittenFile > skipFile) || (currentWrittenFile == skipFile && fileWriteOffset > skipOffset)) { + skipFile = -1; + fileOffset = new byte[IDENTIFIER_SIZE]; + Server.writeLong(fileOffset, 0, calculateOffset(fileWriteOffset, currentWrittenFile)); + } + return fileOffset; + } catch (IOException ex) { + logger.warn("Error while storing the bytes {}", ex.getMessage()); + closeFs(); + throw new RuntimeException(ex); + } + } + DataBlock db = new DataBlock(dataStream, fileWriteOffset, + new Path(basePath, currentWrittenFile + OFFSET_SUFFIX), currentWrittenFile); + db.close(); + files2Commit.add(db); + fileWriteOffset = 0; + ++currentWrittenFile; + return store(slice); + } + + /** + * @param b + * @param startIndex + * @return + */ + long byteArrayToLong(byte[] b, int startIndex) + { + final byte b1 = 0; + return Longs.fromBytes(b1, b1, b1, b1, b[3 + startIndex], b[2 + startIndex], b[1 + startIndex], b[startIndex]); + } + + @Override + public byte[] retrieve(byte[] identifier) + { + skipFile = -1; + skipOffset = 0; + logger.debug("retrieve with address {}", Arrays.toString(identifier)); + // flushing the last incomplete flushed file + closeUnflushedFiles(); + + retrievalOffset = byteArrayToLong(identifier, 0); + retrievalFile = byteArrayToLong(identifier, offset); + + if (retrievalFile == 0 && retrievalOffset == 0 && currentWrittenFile == 0 && fileWriteOffset == 0) { + skipOffset = 0; + return null; + } + + // making sure that the deleted address is not requested again + if (retrievalFile != 0 || retrievalOffset != 0) { + long cleanedFile = byteArrayToLong(cleanedOffset, offset); + if (retrievalFile < cleanedFile || (retrievalFile == cleanedFile && + retrievalOffset < 
byteArrayToLong(cleanedOffset, 0))) { + logger.warn("The address asked has been deleted retrievalFile={}, cleanedFile={}, retrievalOffset={}, " + + "cleanedOffset={}", retrievalFile, cleanedFile, retrievalOffset, byteArrayToLong(cleanedOffset, 0)); + closeFs(); + throw new IllegalArgumentException(String.format("The data for address %s has already been deleted", + Arrays.toString(identifier))); + } + } + + // we have just started + if (retrievalFile == 0 && retrievalOffset == 0) { + retrievalFile = byteArrayToLong(cleanedOffset, offset); + retrievalOffset = byteArrayToLong(cleanedOffset, 0); + } + + if ((retrievalFile > flushedFileCounter)) { + skipFile = retrievalFile; + skipOffset = retrievalOffset; + retrievalFile = -1; + return null; + } + if ((retrievalFile == flushedFileCounter && retrievalOffset >= flushedFileWriteOffset)) { + skipFile = retrievalFile; + skipOffset = retrievalOffset - flushedFileWriteOffset; + retrievalFile = -1; + return null; + } + + try { + if (readStream != null) { + readStream.close(); + readStream = null; + } + Path path = new Path(basePath, String.valueOf(retrievalFile)); + if (!fs.exists(path)) { + retrievalFile = -1; + closeFs(); + throw new RuntimeException(String.format("File %s does not exist", path.toString())); + } + + byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX)); + flushedLong = Server.readLong(flushedOffset, 0); + while (retrievalOffset >= flushedLong && retrievalFile < flushedFileCounter) { + retrievalOffset -= flushedLong; + retrievalFile++; + flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX)); + flushedLong = Server.readLong(flushedOffset, 0); + } + + if (retrievalOffset >= flushedLong) { + logger.warn("data not flushed for the given identifier"); + retrievalFile = -1; + return null; + } + synchronized (HDFSStorage.this) { + if (nextReadStream != null) { + nextReadStream.close(); + nextReadStream = null; + } + } + currentData = null; + path = new Path(basePath, String.valueOf(retrievalFile)); + //readStream = new FSDataInputStream(fs.open(path)); + currentData = readData(path); + //readStream.seek(retrievalOffset); + storageExecutor.submit(getNextStream()); + return retrieveHelper(); + } catch (IOException e) { + closeFs(); + throw new RuntimeException(e); + } + } + + private byte[] retrieveHelper() throws IOException + { + int tempRetrievalOffset = (int)retrievalOffset; + int length = Ints.fromBytes(currentData[tempRetrievalOffset], currentData[tempRetrievalOffset + 1], + currentData[tempRetrievalOffset + 2], currentData[tempRetrievalOffset + 3]); + byte[] data = new byte[length + IDENTIFIER_SIZE]; + System.arraycopy(currentData, tempRetrievalOffset + 4, data, IDENTIFIER_SIZE, length); + retrievalOffset += length + DATA_LENGTH_BYTE_SIZE; + if (retrievalOffset >= flushedLong) { + Server.writeLong(data, 0, calculateOffset(0, retrievalFile + 1)); + } else { + Server.writeLong(data, 0, calculateOffset(retrievalOffset, retrievalFile)); + } + return data; + } + + @Override + public byte[] retrieveNext() + { + if (retrievalFile == -1) { + closeFs(); + throw new RuntimeException("Call retrieve first"); + } + + if (retrievalFile > flushedFileCounter) { + logger.warn("data is not flushed"); + return null; + } + + try { + if (currentData == null) { + synchronized (HDFSStorage.this) { + if (nextRetrievalData != null && (retrievalFile == nextRetrievalFile)) { + currentData = nextRetrievalData; + flushedLong = nextFlushedLong; + nextRetrievalData = null; + } else { + currentData = null; + 
currentData = readData(new Path(basePath, String.valueOf(retrievalFile))); + byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX)); + flushedLong = Server.readLong(flushedOffset, 0); + } + } + storageExecutor.submit(getNextStream()); + } + + if (retrievalOffset >= flushedLong) { + retrievalFile++; + retrievalOffset = 0; + + if (retrievalFile > flushedFileCounter) { + logger.warn("data is not flushed"); + return null; + } + + //readStream.close(); + // readStream = new FSDataInputStream(fs.open(new Path(basePath, String.valueOf(retrievalFile)))); + // byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX)); + // flushedLong = Server.readLong(flushedOffset, 0); + + synchronized (HDFSStorage.this) { + if (nextRetrievalData != null && (retrievalFile == nextRetrievalFile)) { + currentData = nextRetrievalData; + flushedLong = nextFlushedLong; + nextRetrievalData = null; + } else { + currentData = null; + currentData = readData(new Path(basePath, String.valueOf(retrievalFile))); + byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX)); + flushedLong = Server.readLong(flushedOffset, 0); + } + } + storageExecutor.submit(getNextStream()); + } + //readStream.seek(retrievalOffset); + return retrieveHelper(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + @SuppressWarnings("AssignmentToCollectionOrArrayFieldFromParameter") + public void clean(byte[] identifier) + { + logger.info("clean {}", Arrays.toString(identifier)); + long cleanFileIndex = byteArrayToLong(identifier, offset); + + long cleanFileOffset = byteArrayToLong(identifier, 0); + if (flushedFileCounter == -1) { + identifier = new byte[8]; + } else if (cleanFileIndex > flushedFileCounter || + (cleanFileIndex == flushedFileCounter && cleanFileOffset >= flushedFileWriteOffset)) { + // This is to make sure that we clean only the data that is flushed + cleanFileIndex = flushedFileCounter; + cleanFileOffset = flushedFileWriteOffset; + Server.writeLong(identifier, 0, calculateOffset(cleanFileOffset, cleanFileIndex)); + } + cleanedOffset = identifier; + + try { + writeData(cleanFileOffsetFileTemp, identifier).close(); + fs.rename(cleanFileOffsetFileTemp, cleanFileOffsetFile); + if (cleanedFileCounter >= cleanFileIndex) { + return; + } + do { + Path path = new Path(basePath, String.valueOf(cleanedFileCounter)); + if (fs.exists(path) && fs.isFile(path)) { + fs.delete(path, false); + } + path = new Path(basePath, cleanedFileCounter + OFFSET_SUFFIX); + if (fs.exists(path) && fs.isFile(path)) { + fs.delete(path, false); + } + path = new Path(basePath, cleanedFileCounter + BOOK_KEEPING_FILE_OFFSET); + if (fs.exists(path) && fs.isFile(path)) { + fs.delete(path, false); + } + logger.info("deleted file {}", cleanedFileCounter); + ++cleanedFileCounter; + } while (cleanedFileCounter < cleanFileIndex); + // writeData(cleanFileCounterFile, String.valueOf(cleanedFileCounter).getBytes()).close(); + + } catch (IOException e) { + logger.warn("not able to close the streams {}", e.getMessage()); + closeFs(); + throw new RuntimeException(e); + } + } + + /** + * This is used mainly for cleaning up of counter files created + */ + void cleanHelperFiles() + { + try { + fs.delete(basePath, true); + } catch (IOException e) { + logger.warn(e.getMessage()); + } + } + + private void closeUnflushedFiles() + { + try { + files2Commit.clear(); + // closing the stream + if (dataStream != null) { + dataStream.close(); + dataStream = null; + // currentWrittenFile++; 
+ // fileWriteOffset = 0; + } + + if (!fs.exists(new Path(basePath, currentWrittenFile + OFFSET_SUFFIX))) { + fs.delete(new Path(basePath, String.valueOf(currentWrittenFile)), false); + } + + if (fs.exists(new Path(basePath, flushedFileCounter + OFFSET_SUFFIX))) { + // This means that flush was called + flushedFileWriteOffset = getFlushedFileWriteOffset(new Path(basePath, flushedFileCounter + OFFSET_SUFFIX)); + bookKeepingFileOffset = getFlushedFileWriteOffset( + new Path(basePath, flushedFileCounter + BOOK_KEEPING_FILE_OFFSET)); + } + + if (flushedFileCounter != -1) { + currentWrittenFile = flushedFileCounter; + fileWriteOffset = flushedFileWriteOffset; + } else { + currentWrittenFile = 0; + fileWriteOffset = 0; + } + + flushedLong = 0; + + } catch (IOException e) { + closeFs(); + throw new RuntimeException(e); + } + } + + @Override + public void flush() + { + nextReadStream = null; + StringBuilder builder = new StringBuilder(); + Iterator<DataBlock> itr = files2Commit.iterator(); + DataBlock db; + try { + while (itr.hasNext()) { + db = itr.next(); + db.updateOffsets(); + builder.append(db.fileName).append(", "); + } + files2Commit.clear(); + + if (dataStream != null) { + dataStream.hflush(); + writeData(flushedCounterFileTemp, String.valueOf(currentWrittenFile).getBytes()).close(); + fs.rename(flushedCounterFileTemp, flushedCounterFile); + updateFlushedOffset(new Path(basePath, currentWrittenFile + OFFSET_SUFFIX), fileWriteOffset); + flushedFileWriteOffset = fileWriteOffset; + builder.append(currentWrittenFile); + } + logger.debug("flushed files {}", builder.toString()); + } catch (IOException ex) { + logger.warn("not able to close the stream {}", ex.getMessage()); + closeFs(); + throw new RuntimeException(ex); + } + flushedFileCounter = currentWrittenFile; + // logger.debug("flushedFileCounter in flush {}",flushedFileCounter); + } + + /** + * This updates the flushed offset + */ + private void updateFlushedOffset(Path file, long bytesWritten) + { + byte[] lastStoredOffset = new byte[IDENTIFIER_SIZE]; + Server.writeLong(lastStoredOffset, 0, bytesWritten); + try { + writeData(file, lastStoredOffset).close(); + } catch (IOException e) { + try { + if (!Arrays.equals(readData(file), lastStoredOffset)) { + closeFs(); + throw new RuntimeException(e); + } + } catch (Exception e1) { + closeFs(); + throw new RuntimeException(e1); + } + } + } + + public int getBlockSizeMultiple() + { + return blockSizeMultiple; + } + + public void setBlockSizeMultiple(int blockSizeMultiple) + { + this.blockSizeMultiple = blockSizeMultiple; + } + + /** + * @return the baseDir + */ + public String getBaseDir() + { + return baseDir; + } + + /** + * @param baseDir the baseDir to set + */ + public void setBaseDir(String baseDir) + { + this.baseDir = baseDir; + } + + /** + * @return the id + */ + public String getId() + { + return id; + } + + /** + * @param id the id to set + */ + public void setId(String id) + { + this.id = id; + } + + /** + * @return the blockSize + */ + public long getBlockSize() + { + return blockSize; + } + + /** + * @param blockSize the blockSize to set + */ + public void setBlockSize(long blockSize) + { + this.blockSize = blockSize; + } + + /** + * @return the restore + */ + public boolean isRestore() + { + return restore; + } + + /** + * @param restore the restore to set + */ + public void setRestore(boolean restore) + { + this.restore = restore; + } + + class DataBlock + { + FSDataOutputStream dataStream; + long dataOffset; + Path path2FlushedData; + long fileName; + private Path 
bookKeepingPath; + + DataBlock(FSDataOutputStream stream, long bytesWritten, Path path2FlushedData, long fileName) + { + this.dataStream = stream; + this.dataOffset = bytesWritten; + this.path2FlushedData = path2FlushedData; + this.fileName = fileName; + } + + public void close() + { + if (dataStream != null) { + try { + dataStream.close(); + bookKeepingPath = new Path(basePath, fileName + BOOK_KEEPING_FILE_OFFSET); + updateFlushedOffset(bookKeepingPath, dataOffset); + } catch (IOException ex) { + logger.warn("not able to close the stream {}", ex.getMessage()); + closeFs(); + throw new RuntimeException(ex); + } + } + } + + public void updateOffsets() throws IOException + { + updateFlushedOffset(path2FlushedData, dataOffset); + if (bookKeepingPath != null && fs.exists(bookKeepingPath)) { + fs.delete(bookKeepingPath, false); + } + } + + } + + private static final Logger logger = LoggerFactory.getLogger(HDFSStorage.class); + + @Override + public void setup(com.datatorrent.api.Context context) + { + Configuration conf = new Configuration(); + if (baseDir == null) { + baseDir = conf.get("hadoop.tmp.dir"); + if (baseDir == null || baseDir.isEmpty()) { + throw new IllegalArgumentException("baseDir cannot be null."); + } + } + offset = 4; + skipOffset = -1; + skipFile = -1; + int tempRetryCount = 0; + while (tempRetryCount < retryCount && fs == null) { + try { + fs = FileSystem.newInstance(conf); + tempRetryCount++; + } catch (Throwable throwable) { + logger.warn("Not able to get file system ", throwable); + } + } + + try { + Path path = new Path(baseDir); + basePath = new Path(path, id); + if (fs == null) { + fs = FileSystem.newInstance(conf); + } + if (!fs.exists(path)) { + closeFs(); + throw new RuntimeException(String.format("baseDir passed (%s) doesn't exist.", baseDir)); + } + if (!fs.isDirectory(path)) { + closeFs(); + throw new RuntimeException(String.format("baseDir passed (%s) is not a directory.", baseDir)); + } + if (!restore) { + fs.delete(basePath, true); + } + if (!fs.exists(basePath) || !fs.isDirectory(basePath)) { + fs.mkdirs(basePath); + } + + if (blockSize == 0) { + blockSize = fs.getDefaultBlockSize(new Path(basePath, "tempData")); + } + if (blockSize == 0) { + blockSize = DEFAULT_BLOCK_SIZE; + } + + blockSize = blockSizeMultiple * blockSize; + + currentWrittenFile = 0; + cleanedFileCounter = -1; + retrievalFile = -1; + // fileCounterFile = new Path(basePath, IDENTITY_FILE); + flushedFileCounter = -1; + // cleanFileCounterFile = new Path(basePath, CLEAN_FILE); + cleanFileOffsetFile = new Path(basePath, CLEAN_OFFSET_FILE); + cleanFileOffsetFileTemp = new Path(basePath, CLEAN_OFFSET_FILE_TEMP); + flushedCounterFile = new Path(basePath, FLUSHED_IDENTITY_FILE); + flushedCounterFileTemp = new Path(basePath, FLUSHED_IDENTITY_FILE_TEMP); + + if (restore) { + // + // if (fs.exists(fileCounterFile) && fs.isFile(fileCounterFile)) { + // //currentWrittenFile = Long.valueOf(new String(readData(fileCounterFile))); + // } + + if (fs.exists(cleanFileOffsetFile) && fs.isFile(cleanFileOffsetFile)) { + cleanedOffset = readData(cleanFileOffsetFile); + } + + if (fs.exists(flushedCounterFile) && fs.isFile(flushedCounterFile)) { + String strFlushedFileCounter = new String(readData(flushedCounterFile)); + if (strFlushedFileCounter.isEmpty()) { + logger.warn("empty flushed file"); + } else { + flushedFileCounter = Long.valueOf(strFlushedFileCounter); + flushedFileWriteOffset = getFlushedFileWriteOffset(new Path(basePath, flushedFileCounter + OFFSET_SUFFIX)); + bookKeepingFileOffset = 
getFlushedFileWriteOffset( + new Path(basePath, flushedFileCounter + BOOK_KEEPING_FILE_OFFSET)); + } + + } + } + fileWriteOffset = flushedFileWriteOffset; + currentWrittenFile = flushedFileCounter; + cleanedFileCounter = byteArrayToLong(cleanedOffset, offset) - 1; + if (currentWrittenFile == -1) { + ++currentWrittenFile; + fileWriteOffset = 0; + } + + } catch (IOException io) { + + throw new RuntimeException(io); + } + storageExecutor = Executors.newSingleThreadExecutor(new NameableThreadFactory("StorageHelper")); + } + + private void closeFs() + { + if (fs != null) { + try { + fs.close(); + fs = null; + } catch (IOException e) { + logger.debug(e.getMessage()); + } + } + } + + private long getFlushedFileWriteOffset(Path filePath) throws IOException + { + if (flushedFileCounter != -1 && fs.exists(filePath)) { + byte[] flushedFileOffsetByte = readData(filePath); + if (flushedFileOffsetByte != null && flushedFileOffsetByte.length == 8) { + return Server.readLong(flushedFileOffsetByte, 0); + } + } + return 0; + } + + @Override + public void teardown() + { + logger.debug("called teardown"); + try { + if (readStream != null) { + readStream.close(); + } + synchronized (HDFSStorage.this) { + if (nextReadStream != null) { + nextReadStream.close(); + } + } + + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + closeUnflushedFiles(); + storageExecutor.shutdown(); + } + + } + + private Runnable getNextStream() + { + return new Runnable() + { + @Override + public void run() + { + try { + synchronized (HDFSStorage.this) { + nextRetrievalFile = retrievalFile + 1; + if (nextRetrievalFile > flushedFileCounter) { + nextRetrievalData = null; + return; + } + Path path = new Path(basePath, String.valueOf(nextRetrievalFile)); + Path offsetPath = new Path(basePath, nextRetrievalFile + OFFSET_SUFFIX); + nextRetrievalData = null; + nextRetrievalData = readData(path); + byte[] flushedOffset = readData(offsetPath); + nextFlushedLong = Server.readLong(flushedOffset, 0); + } + } catch (Throwable e) { + logger.warn("in storage executor ", e); + + } + } + }; + } + +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/44326514/flume/src/main/java/com/datatorrent/flume/storage/Storage.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/com/datatorrent/flume/storage/Storage.java b/flume/src/main/java/com/datatorrent/flume/storage/Storage.java new file mode 100644 index 0000000..9f3a010 --- /dev/null +++ b/flume/src/main/java/com/datatorrent/flume/storage/Storage.java @@ -0,0 +1,73 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.flume.storage; + +import com.datatorrent.netlet.util.Slice; + +/** + * <p>Storage interface.</p> + * + * @author Gaurav Gupta <[email protected]> + * @since 0.9.2 + */ +public interface Storage +{ + /** + * key in the context for Unique identifier for the storage which may be used to recover from failure. + */ + String ID = "id"; + + /** + * This stores the bytes and returns the unique identifier to retrieve these bytes + * + * @param bytes + * @return + */ + byte[] store(Slice bytes); + + /** + * This returns the data bytes for the current identifier and the identifier for next data bytes. <br/> + * The first eight bytes contain the identifier and the remaining bytes contain the data + * + * @param identifier + * @return + */ + byte[] retrieve(byte[] identifier); + + /** + * This returns data bytes and the identifier for the next data bytes. The identifier for current data bytes is based + * on the retrieve method call and number of retrieveNext method calls after retrieve method call. <br/> + * The first eight bytes contain the identifier and the remaining bytes contain the data + * + * @return + */ + byte[] retrieveNext(); + + /** + * This is used to clean up the files identified by identifier + * + * @param identifier + */ + void clean(byte[] identifier); + + /** + * This flushes the data from stream + * + */ + void flush(); + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/44326514/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageMatching.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageMatching.java b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageMatching.java new file mode 100644 index 0000000..05eeb4e --- /dev/null +++ b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageMatching.java @@ -0,0 +1,109 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.flume.storage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.primitives.Ints; + +import com.datatorrent.netlet.util.Slice; + +/** + * @author Gaurav Gupta <[email protected]> + */ +public class HDFSStorageMatching +{ + + public static void main(String[] args) + { + HDFSStorage storage = new HDFSStorage(); + storage.setBaseDir(args[0]); + storage.setId(args[1]); + storage.setRestore(true); + storage.setup(null); + int count = 100000000; + + logger.debug(" start time {}", System.currentTimeMillis()); + int index = 10000; + byte[] b = Ints.toByteArray(index); + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + index++; + b = Ints.toByteArray(index); + } + storage.flush(); + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + index++; + b = Ints.toByteArray(index); + } + storage.flush(); + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + index++; + b = Ints.toByteArray(index); + } + storage.flush(); + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + index++; + b = Ints.toByteArray(index); + } + storage.flush(); + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + index++; + b = Ints.toByteArray(index); + } + storage.flush(); + logger.debug(" end time {}", System.currentTimeMillis()); + logger.debug(" start time for retrieve {}", System.currentTimeMillis()); + b = storage.retrieve(new byte[8]); + int org_index = index; + index = 10000; + match(b, index); + while (true) { + index++; + b = storage.retrieveNext(); + if (b == null) { + logger.debug(" end time for retrieve {}/{}/{}", System.currentTimeMillis(), index, org_index); + return; + } else { + if (!match(b, index)) { + throw new RuntimeException("failed : " + index); + } + } + } + + } + + public static boolean match(byte[] data, int match) + { + byte[] tempData = new byte[data.length - 8]; + System.arraycopy(data, 8, tempData, 0, tempData.length); + int dataR = Ints.fromByteArray(tempData); + //logger.debug("input: {}, output: {}",match,dataR); + if (match == dataR) { + return true; + } + return false; + } + + private static final Logger logger = LoggerFactory.getLogger(HDFSStorageMatching.class); +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/44326514/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformance.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformance.java b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformance.java new file mode 100644 index 0000000..394ce0e --- /dev/null +++ b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformance.java @@ -0,0 +1,85 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.flume.storage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.netlet.util.Slice; + +/** + * @author Gaurav Gupta <[email protected]> + */ +public class HDFSStoragePerformance +{ + + public static void main(String[] args) + { + HDFSStorage storage = new HDFSStorage(); + storage.setBaseDir("."); + storage.setId("gaurav_flume_1"); + storage.setRestore(true); + storage.setup(null); + int count = 1000000; + + logger.debug(" start time {}", System.currentTimeMillis()); + int index = 10000; + byte[] b = new byte[1024]; + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + } + storage.flush(); + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + } + storage.flush(); + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + } + storage.flush(); + logger.debug(" end time {}", System.currentTimeMillis()); + logger.debug(" start time for retrieve {}", System.currentTimeMillis()); + storage.retrieve(new byte[8]); + String inputData = new String(b); + index = 1; + while (true) { + b = storage.retrieveNext(); + if (b == null) { + logger.debug(" end time for retrieve {}", System.currentTimeMillis()); + return; + } else { + if (!match(b, inputData)) { + throw new RuntimeException("failed : " + index); + } + } + + index++; + } + + } + + public static boolean match(byte[] data, String match) + { + byte[] tempData = new byte[data.length - 8]; + System.arraycopy(data, 8, tempData, 0, tempData.length); +// logger.debug("input: {}, output: {}",match,new String(tempData)); + return (match.equals(new String(tempData))); + } + + private static final Logger logger = LoggerFactory.getLogger(HDFSStoragePerformance.class); +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/44326514/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformanceTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformanceTest.java b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformanceTest.java new file mode 100644 index 0000000..08476c2 --- /dev/null +++ b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformanceTest.java @@ -0,0 +1,112 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.flume.storage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.primitives.Ints; + +import com.datatorrent.netlet.util.Slice; + +/** + * <p>HDFSStoragePerformanceTest class.</p> + * + * @author Gaurav Gupta <[email protected]> + * @since 1.0.1 + */ +public class HDFSStoragePerformanceTest +{ + + public static void main(String[] args) + { + HDFSStorage storage = new HDFSStorage(); + storage.setBaseDir(args[0]); + storage.setId(args[1]); + storage.setRestore(true); + storage.setup(null); + int count = 100000000; + + logger.debug(" start time {}", System.currentTimeMillis()); + int index = 10000; + byte[] b = Ints.toByteArray(index); + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + index++; + b = Ints.toByteArray(index); + } + storage.flush(); + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + index++; + b = Ints.toByteArray(index); + } + storage.flush(); + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + index++; + b = Ints.toByteArray(index); + } + storage.flush(); + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + index++; + b = Ints.toByteArray(index); + } + storage.flush(); + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + index++; + b = Ints.toByteArray(index); + } + storage.flush(); + logger.debug(" end time {}", System.currentTimeMillis()); + logger.debug(" start time for retrieve {}", System.currentTimeMillis()); + b = storage.retrieve(new byte[8]); + int org_index = index; + index = 10000; + match(b, index); + while (true) { + index++; + b = storage.retrieveNext(); + if (b == null) { + logger.debug(" end time for retrieve {}/{}/{}", System.currentTimeMillis(), index, org_index); + return; + } else { + if (!match(b, index)) { + throw new RuntimeException("failed : " + index); + } + } + } + + } + + public static boolean match(byte[] data, int match) + { + byte[] tempData = new byte[data.length - 8]; + System.arraycopy(data, 8, tempData, 0, tempData.length); + int dataR = Ints.fromByteArray(tempData); + //logger.debug("input: {}, output: {}",match,dataR); + if (match == dataR) { + return true; + } + return false; + } + + private static final Logger logger = LoggerFactory.getLogger(HDFSStoragePerformanceTest.class); +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/44326514/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageTest.java b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageTest.java new file mode 100644 index 0000000..b348c8f --- /dev/null +++ b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageTest.java @@ -0,0 +1,693 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.flume.storage; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.RandomAccessFile; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.io.FileUtils; +import org.apache.flume.Context; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.datatorrent.netlet.util.Slice; + +/** + * @author Gaurav Gupta <[email protected]> + */ +public class HDFSStorageTest +{ + public static class TestMeta extends TestWatcher + { + public String baseDir; + public String testFile; + private String testData = "No and yes. There is also IdleTimeHandler that allows the operator to emit tuples. " + + "There is overlap, why not have a single interface. \n" + + "Also consider the possibility of an operator that does other processing and not consume nor emit tuples,"; + + @Override + protected void starting(org.junit.runner.Description description) + { + String className = description.getClassName(); + baseDir = "target/" + className; + try { + baseDir = (new File(baseDir)).getAbsolutePath(); + FileUtils.forceMkdir(new File(baseDir)); + testFile = baseDir + "/testInput.txt"; + FileOutputStream outputStream = FileUtils.openOutputStream(new File(testFile)); + outputStream.write(testData.getBytes()); + outputStream.close(); + + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + + @Override + protected void finished(Description description) + { + try { + FileUtils.deleteDirectory(new File(baseDir)); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + private String STORAGE_DIRECTORY; + + private HDFSStorage getStorage(String id, boolean restore) + { + Context ctx = new Context(); + STORAGE_DIRECTORY = testMeta.baseDir; + ctx.put(HDFSStorage.BASE_DIR_KEY, testMeta.baseDir); + ctx.put(HDFSStorage.RESTORE_KEY, Boolean.toString(restore)); + ctx.put(HDFSStorage.ID, id); + ctx.put(HDFSStorage.BLOCKSIZE, "256"); + HDFSStorage lstorage = new HDFSStorage(); + lstorage.configure(ctx); + lstorage.setup(null); + return lstorage; + } + + private HDFSStorage storage; + + @Before + public void setup() + { + storage = getStorage("1", false); + } + + @After + public void teardown() + { + storage.teardown(); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + storage.cleanHelperFiles(); + } + + /** + * This test covers following use case 1. Some data is stored 2. File is flush but the file is not close 3. Some more + * data is stored but the file doesn't roll-overs 4. Retrieve is called for the last returned address and it return + * nulls 5. 
Some more data is stored again but the address is returned null because of previous retrieve call + * + * @throws Exception + */ + @Test + public void testPartialFlush() throws Exception + { + Assert.assertNull(storage.retrieve(new byte[8])); + byte[] b = "ab".getBytes(); + byte[] address = storage.store(new Slice(b, 0, b.length)); + Assert.assertNotNull(address); + storage.flush(); + b = "cb".getBytes(); + byte[] addr = storage.store(new Slice(b, 0, b.length)); + match(storage.retrieve(new byte[8]), "ab"); + Assert.assertNull(storage.retrieve(addr)); + Assert.assertNull(storage.store(new Slice(b, 0, b.length))); + storage.flush(); + match(storage.retrieve(address), "cb"); + Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); + } + + /** + * This test covers following use case 1. Some data is stored to make sure that there is no roll over 2. File is + * flushed but the file is not closed 3. Some more data is stored. The data stored is enough to make the file roll + * over 4. Retrieve is called for the last returned address and it return nulls as the data is not flushed 5. Some + * more data is stored again but the address is returned null because of previous retrieve call 6. The data is flushed + * to make sure that the data is committed. 7. Now the data is retrieved from the starting and data returned matches + * the data stored + * + * @throws Exception + */ + @Test + public void testPartialFlushRollOver() throws Exception + { + Assert.assertNull(storage.retrieve(new byte[8])); + byte[] b = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, 52, + 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, 49, + 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, 54, + 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, 53, + 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, 1, + 48, 46, 48, 1, 48, 46, 48}; + byte[] b_org = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, + 52, 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, + 49, 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, + 54, 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, + 53, 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, + 1, 48, 46, 48, 1, 48, 46, 48}; + byte[] address = storage.store(new Slice(b, 0, b.length)); + Assert.assertNotNull(address); + storage.flush(); + byte[] addr = null; + for (int i = 0; i < 5; i++) { + b[0] = (byte)(b[0] + 1); + addr = storage.store(new Slice(b, 0, b.length)); + } + Assert.assertNull(storage.retrieve(addr)); + for (int i = 0; i < 5; i++) { + b[0] = (byte)(b[0] + 1); + Assert.assertNull(storage.store(new Slice(b, 0, b.length))); + } + storage.flush(); + match(storage.retrieve(new byte[8]), new String(b_org)); + b_org[0] = (byte)(b_org[0] + 1); + match(storage.retrieve(address), new String(b_org)); + b_org[0] = (byte)(b_org[0] + 1); + match(storage.retrieveNext(), new String(b_org)); + b_org[0] = (byte)(b_org[0] + 1); + match(storage.retrieveNext(), new String(b_org)); + + } + + /** + * This test covers following use case 1. 
Some data is stored to make sure that there is no roll over 2. File is + * flushed but the file is not closed 3. Some more data is stored. The data stored is enough to make the file roll + * over 4. The storage crashes and new storage is instiated. 5. Retrieve is called for the last returned address and + * it return nulls as the data is not flushed 6. Some more data is stored again but the address is returned null + * because of previous retrieve call 7. The data is flushed to make sure that the data is committed. 8. Now the data + * is retrieved from the starting and data returned matches the data stored + * + * @throws Exception + */ + @Test + public void testPartialFlushRollOverWithFailure() throws Exception + { + Assert.assertNull(storage.retrieve(new byte[8])); + byte[] b = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, 52, + 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, 49, + 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, 54, + 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, 53, + 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, 1, + 48, 46, 48, 1, 48, 46, 48}; + byte[] b_org = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, + 52, 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, + 49, 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, + 54, 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, + 53, 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, + 1, 48, 46, 48, 1, 48, 46, 48}; + byte[] address = storage.store(new Slice(b, 0, b.length)); + Assert.assertNotNull(address); + storage.flush(); + byte[] addr = null; + for (int i = 0; i < 5; i++) { + b[0] = (byte)(b[0] + 1); + addr = storage.store(new Slice(b, 0, b.length)); + } + storage = getStorage("1", true); + Assert.assertNull(storage.retrieve(addr)); + for (int i = 0; i < 5; i++) { + b[0] = (byte)(b[0] + 1); + Assert.assertNull(storage.store(new Slice(b, 0, b.length))); + } + storage.flush(); + match(storage.retrieve(new byte[8]), new String(b_org)); + b_org[0] = (byte)(b_org[0] + 1); + match(storage.retrieve(address), new String(b_org)); + b_org[0] = (byte)(b_org[0] + 1); + match(storage.retrieveNext(), new String(b_org)); + b_org[0] = (byte)(b_org[0] + 1); + match(storage.retrieveNext(), new String(b_org)); + + } + + /** + * This tests clean when the file doesn't roll over + * + * @throws Exception + */ + @Test + public void testPartialFlushWithClean() throws Exception + { + Assert.assertNull(storage.retrieve(new byte[8])); + byte[] b = "ab".getBytes(); + byte[] address = storage.store(new Slice(b, 0, b.length)); + Assert.assertNotNull(address); + storage.flush(); + storage.clean(address); + b = "cb".getBytes(); + byte[] addr = storage.store(new Slice(b, 0, b.length)); + Assert.assertNull(storage.retrieve(addr)); + Assert.assertNull(storage.store(new Slice(b, 0, b.length))); + storage.flush(); + match(storage.retrieve(new byte[8]), "cb"); + match(storage.retrieve(address), "cb"); + Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); + } + + /** + * This tests clean 
when the file doesn't roll over + * + * @throws Exception + */ + @Test + public void testPartialFlushWithCleanAndFailure() throws Exception + { + Assert.assertNull(storage.retrieve(new byte[8])); + byte[] b = "ab".getBytes(); + byte[] address = storage.store(new Slice(b, 0, b.length)); + Assert.assertNotNull(address); + storage.flush(); + storage.clean(address); + b = "cb".getBytes(); + byte[] addr = storage.store(new Slice(b, 0, b.length)); + storage = getStorage("1", true); + Assert.assertNull(storage.retrieve(addr)); + Assert.assertNull(storage.store(new Slice(b, 0, b.length))); + storage.flush(); + match(storage.retrieve(new byte[8]), "cb"); + match(storage.retrieve(address), "cb"); + Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); + } + + /** + * This test covers following use case 1. Some data is stored to make sure that there is no roll over 2. File is + * flushed but the file is not closed 3. The data is cleaned till the last returned address 4. Some more data is + * stored. The data stored is enough to make the file roll over 5. Retrieve is called for the last returned address + * and it return nulls as the data is not flushed 6. Some more data is stored again but the address is returned null + * because of previous retrieve call 7. The data is flushed to make sure that the data is committed. 8. Now the data + * is retrieved from the starting and data returned matches the data stored + * + * @throws Exception + */ + @Test + public void testPartialFlushWithCleanAndRollOver() throws Exception + { + Assert.assertNull(storage.retrieve(new byte[8])); + byte[] b = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, 52, + 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, 49, + 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, 54, + 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, 53, + 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, 1, + 48, 46, 48, 1, 48, 46, 48}; + byte[] b_org = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, + 52, 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, + 49, 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, + 54, 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, + 53, 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, + 1, 48, 46, 48, 1, 48, 46, 48}; + byte[] address = storage.store(new Slice(b, 0, b.length)); + Assert.assertNotNull(address); + storage.flush(); + storage.clean(address); + + byte[] addr = null; + for (int i = 0; i < 5; i++) { + b[0] = (byte)(b[0] + 1); + addr = storage.store(new Slice(b, 0, b.length)); + } + Assert.assertNull(storage.retrieve(addr)); + for (int i = 0; i < 5; i++) { + b[0] = (byte)(b[0] + 1); + Assert.assertNull(storage.store(new Slice(b, 0, b.length))); + } + storage.flush(); + b_org[0] = (byte)(b_org[0] + 1); + match(storage.retrieve(new byte[8]), new String(b_org)); + match(storage.retrieve(address), new String(b_org)); + b_org[0] = (byte)(b_org[0] + 1); + match(storage.retrieveNext(), new String(b_org)); + b_org[0] = (byte)(b_org[0] + 1); + 
match(storage.retrieveNext(), new String(b_org)); + + } + + /** + * This tests the clean when the files are roll-over and the storage fails + * + * @throws Exception + */ + @Test + public void testPartialFlushWithCleanAndRollOverAndFailure() throws Exception + { + Assert.assertNull(storage.retrieve(new byte[8])); + byte[] b = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, 52, + 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, 49, + 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, 54, + 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, 53, + 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, 1, + 48, 46, 48, 1, 48, 46, 48}; + byte[] b_org = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, + 52, 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, + 49, 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, + 54, 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, + 53, 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, + 1, 48, 46, 48, 1, 48, 46, 48}; + byte[] address = storage.store(new Slice(b, 0, b.length)); + Assert.assertNotNull(address); + storage.flush(); + storage.clean(address); + byte[] addr = null; + for (int i = 0; i < 5; i++) { + b[0] = (byte)(b[0] + 1); + addr = storage.store(new Slice(b, 0, b.length)); + } + storage = getStorage("1", true); + Assert.assertNull(storage.retrieve(addr)); + for (int i = 0; i < 5; i++) { + b[0] = (byte)(b[0] + 1); + Assert.assertNull(storage.store(new Slice(b, 0, b.length))); + } + storage.flush(); + b_org[0] = (byte)(b_org[0] + 1); + match(storage.retrieve(address), new String(b_org)); + b_org[0] = (byte)(b_org[0] + 1); + match(storage.retrieveNext(), new String(b_org)); + b_org[0] = (byte)(b_org[0] + 1); + match(storage.retrieveNext(), new String(b_org)); + + } + + /** + * This test covers following use case The file is flushed and then more data is written to the same file, but the new + * data is not flushed and file is not roll over and storage fails The new storage comes up and client asks for data + * at the last returned address from earlier storage instance. The new storage returns null. 
Client stores the data + * again but the address returned this time is null and the retrieval of the earlier address now returns data + * + * @throws Exception + */ + @Test + public void testPartialFlushWithFailure() throws Exception + { + Assert.assertNull(storage.retrieve(new byte[8])); + byte[] b = "ab".getBytes(); + byte[] address = storage.store(new Slice(b, 0, b.length)); + Assert.assertNotNull(address); + storage.flush(); + b = "cb".getBytes(); + byte[] addr = storage.store(new Slice(b, 0, b.length)); + storage = getStorage("1", true); + Assert.assertNull(storage.retrieve(addr)); + Assert.assertNull(storage.store(new Slice(b, 0, b.length))); + storage.flush(); + match(storage.retrieve(address), "cb"); + } + + private void match(byte[] data, String match) + { + byte[] tempData = new byte[data.length - 8]; + System.arraycopy(data, 8, tempData, 0, tempData.length); + Assert.assertEquals("matched the stored value with retrieved value", match, new String(tempData)); + } + + @Test + public void testStorage() throws IOException + { + Assert.assertNull(storage.retrieve(new byte[8])); + byte[] b = new byte[200]; + byte[] identifier; + Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); + Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); + Assert.assertNull(storage.retrieve(new byte[8])); + Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); + Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); + storage.flush(); + byte[] data = storage.retrieve(new byte[8]); + Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); + identifier = storage.store(new Slice(b, 0, b.length)); + byte[] tempData = new byte[data.length - 8]; + System.arraycopy(data, 8, tempData, 0, tempData.length); + Assert.assertEquals("matched the stored value with retrieved value", new String(b), new String(tempData)); + Assert.assertNull(storage.retrieve(identifier)); + } + + @Test + public void testStorageWithRestore() throws IOException + { + Assert.assertNull(storage.retrieve(new byte[8])); + byte[] b = new byte[200]; + Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); + storage.flush(); + storage.teardown(); + + storage = getStorage("1", true); + storage.store(new Slice(b, 0, b.length)); + storage.flush(); + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(conf); + boolean exists = fs.exists(new Path(STORAGE_DIRECTORY + "/1/" + "1")); + Assert.assertEquals("file should exist", true, exists); + } + + @Test + public void testCleanup() throws IOException + { + RandomAccessFile r = new RandomAccessFile(testMeta.testFile, "r"); + r.seek(0); + byte[] b = r.readLine().getBytes(); + storage.store(new Slice(b, 0, b.length)); + byte[] val = storage.store(new Slice(b, 0, b.length)); + storage.flush(); + storage.clean(val); + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(conf); + boolean exists = fs.exists(new Path(STORAGE_DIRECTORY + "/" + "0")); + Assert.assertEquals("file should not exist", false, exists); + r.close(); + } + + @Test + public void testNext() throws IOException + { + RandomAccessFile r = new RandomAccessFile(testMeta.testFile, "r"); + r.seek(0); + Assert.assertNull(storage.retrieve(new byte[8])); + byte[] b = r.readLine().getBytes(); + storage.store(new Slice(b, 0, b.length)); + byte[] b1 = r.readLine().getBytes(); + storage.store(new Slice(b1, 0, b1.length)); + storage.store(new Slice(b, 0, b.length)); + storage.flush(); + storage.store(new Slice(b1, 0, b1.length)); + storage.store(new 
Slice(b, 0, b.length)); + storage.flush(); + byte[] data = storage.retrieve(new byte[8]); + byte[] tempData = new byte[data.length - 8]; + System.arraycopy(data, 8, tempData, 0, tempData.length); + Assert.assertEquals("matched the stored value with retrieved value", new String(b), new String(tempData)); + data = storage.retrieveNext(); + tempData = new byte[data.length - 8]; + System.arraycopy(data, 8, tempData, 0, tempData.length); + Assert.assertEquals("matched the stored value with retrieved value", new String(b1), new String(tempData)); + data = storage.retrieveNext(); + tempData = new byte[data.length - 8]; + System.arraycopy(data, 8, tempData, 0, tempData.length); + Assert.assertEquals("matched the stored value with retrieved value", new String(b), new String(tempData)); + r.close(); + } + + @Test + public void testFailure() throws IOException + { + byte[] address; + byte[] b = new byte[200]; + storage.retrieve(new byte[8]); + for (int i = 0; i < 5; i++) { + storage.store(new Slice(b, 0, b.length)); + address = storage.store(new Slice(b, 0, b.length)); + storage.flush(); + storage.clean(address); + } + storage.teardown(); + + byte[] identifier = new byte[8]; + storage = getStorage("1", true); + + storage.retrieve(identifier); + + storage.store(new Slice(b, 0, b.length)); + storage.store(new Slice(b, 0, b.length)); + storage.store(new Slice(b, 0, b.length)); + storage.flush(); + byte[] data = storage.retrieve(identifier); + byte[] tempData = new byte[data.length - 8]; + System.arraycopy(data, 8, tempData, 0, tempData.length); + Assert.assertEquals("matched the stored value with retrieved value", new String(b), new String(tempData)); + } + + /** + * This test case tests the clean call before any flush is called. + * + * @throws IOException + */ + @Test + public void testCleanUnflushedData() throws IOException + { + for (int i = 0; i < 5; i++) { + final byte[] bytes = (i + "").getBytes(); + storage.store(new Slice(bytes, 0, bytes.length)); + } + storage.clean(new byte[8]); + storage.flush(); + match(storage.retrieve(new byte[8]), "0"); + match(storage.retrieveNext(), "1"); + } + + @Test + public void testCleanForUnflushedData() throws IOException + { + byte[] address = null; + byte[] b = new byte[200]; + storage.retrieve(new byte[8]); + for (int i = 0; i < 5; i++) { + storage.store(new Slice(b, 0, b.length)); + address = storage.store(new Slice(b, 0, b.length)); + storage.flush(); + // storage.clean(address); + } + byte[] lastWrittenAddress = null; + for (int i = 0; i < 5; i++) { + storage.store(new Slice(b, 0, b.length)); + lastWrittenAddress = storage.store(new Slice(b, 0, b.length)); + } + storage.clean(lastWrittenAddress); + byte[] cleanedOffset = storage.readData(new Path(STORAGE_DIRECTORY + "/1/cleanoffsetFile")); + Assert.assertArrayEquals(address, cleanedOffset); + + } + + @Test + public void testCleanForFlushedData() throws IOException + { + byte[] b = new byte[200]; + storage.retrieve(new byte[8]); + for (int i = 0; i < 5; i++) { + storage.store(new Slice(b, 0, b.length)); + storage.store(new Slice(b, 0, b.length)); + storage.flush(); + // storage.clean(address); + } + byte[] lastWrittenAddress = null; + for (int i = 0; i < 5; i++) { + storage.store(new Slice(b, 0, b.length)); + lastWrittenAddress = storage.store(new Slice(b, 0, b.length)); + } + storage.flush(); + storage.clean(lastWrittenAddress); + byte[] cleanedOffset = storage.readData(new Path(STORAGE_DIRECTORY + "/1/cleanoffsetFile")); + Assert.assertArrayEquals(lastWrittenAddress, cleanedOffset); + + } + + @Test + 
public void testCleanForPartialFlushedData() throws IOException + { + byte[] b = new byte[8]; + storage.retrieve(new byte[8]); + + storage.store(new Slice(b, 0, b.length)); + byte[] bytes = "1a".getBytes(); + byte[] address = storage.store(new Slice(bytes, 0, bytes.length)); + storage.flush(); + storage.clean(address); + + byte[] lastWrittenAddress = null; + for (int i = 0; i < 5; i++) { + final byte[] bytes1 = (i + "").getBytes(); + storage.store(new Slice(bytes1, 0, bytes1.length)); + lastWrittenAddress = storage.store(new Slice(b, 0, b.length)); + } + Assert.assertNull(storage.retrieve(new byte[8])); + Assert.assertNull(storage.retrieve(lastWrittenAddress)); + storage.store(new Slice(b, 0, b.length)); + storage.flush(); + Assert.assertNull(storage.retrieve(lastWrittenAddress)); + } + + @Test + public void testRandomSequence() throws IOException + { + storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}); + byte[] bytes = new byte[]{48, 48, 48, 51, 101, 100, 55, 56, 55, 49, 53, 99, 52, 101, 55, 50, 97, 52, 48, 49, 51, + 99, 97, 54, 102, 57, 55, 53, 57, 100, 49, 99, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, + 45, 49, 49, 45, 48, 55, 32, 48, 48, 58, 48, 48, 58, 52, 54, 1, 52, 50, 49, 50, 51, 1, 50, 1, 49, 53, 49, 49, + 52, 50, 54, 53, 1, 49, 53, 49, 49, 57, 51, 53, 49, 1, 49, 53, 49, 50, 57, 56, 50, 52, 1, 49, 53, 49, 50, 49, + 55, 48, 55, 1, 49, 48, 48, 55, 55, 51, 57, 51, 1, 49, 57, 49, 52, 55, 50, 53, 52, 54, 49, 1, 49, 1, 48, 1, 48, + 46, 48, 1, 48, 46, 48, 1, 48, 46, 48}; + storage.store(new Slice(bytes, 0, bytes.length)); + storage.flush(); + storage.clean(new byte[]{-109, 0, 0, 0, 0, 0, 0, 0}); + storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}); + for (int i = 0; i < 2555; i++) { + byte[] bytes1 = new byte[]{48, 48, 48, 55, 56, 51, 98, 101, 50, 54, 50, 98, 52, 102, 50, 54, 56, 97, 55, 56, 102, + 48, 54, 54, 50, 49, 49, 54, 99, 98, 101, 99, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, + 45, 49, 49, 45, 48, 55, 32, 48, 48, 58, 48, 48, 58, 53, 49, 1, 49, 49, 49, 49, 54, 51, 57, 1, 50, 1, 49, 53, + 49, 48, 57, 57, 56, 51, 1, 49, 53, 49, 49, 49, 55, 48, 52, 1, 49, 53, 49, 50, 49, 51, 55, 49, 1, 49, 53, 49, + 49, 52, 56, 51, 49, 1, 49, 48, 48, 55, 49, 57, 56, 49, 1, 49, 50, 48, 50, 55, 54, 49, 54, 56, 53, 1, 49, 1, + 48, 1, 48, 46, 48, 1, 48, 46, 48, 1, 48, 46, 48}; + storage.store(new Slice(bytes1, 0, bytes1.length)); + storage.flush(); + } + storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}); + for (int i = 0; i < 1297; i++) { + storage.retrieveNext(); + } + storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}); + for (int i = 0; i < 1302; i++) { + storage.retrieveNext(); + } + storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}); + for (int i = 0; i < 1317; i++) { + storage.retrieveNext(); + } + storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}); + for (int i = 0; i < 2007; i++) { + storage.retrieveNext(); + } + storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}); + for (int i = 0; i < 2556; i++) { + storage.retrieveNext(); + } + byte[] bytes1 = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, + 52, 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, + 49, 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, + 54, 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, + 53, 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 
53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, + 1, 48, 46, 48, 1, 48, 46, 48}; + storage.store(new Slice(bytes1, 0, bytes1.length)); + storage.flush(); + storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}); + for (int i = 0; i < 2062; i++) { + storage.retrieveNext(); + + } + } + + @SuppressWarnings("unused") + private static final Logger logger = LoggerFactory.getLogger(HDFSStorageTest.class); +}
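
The partial-flush tests above (testPartialFlushWithCleanAndRollOver and its failure variants) all exercise the same client-side protocol: store() returns an 8-byte address, nothing is retrievable until flush(), each retrieved buffer carries an 8-byte header in front of the payload, and clean() releases everything up to a given address. A minimal sketch of that protocol, assuming an already configured HDFSStorage instance (the tests build theirs with a getStorage helper defined earlier in HDFSStorageTest, outside this hunk):

import java.util.Arrays;

import com.datatorrent.flume.storage.HDFSStorage;
import com.datatorrent.netlet.util.Slice;

public class StorageContractSketch
{
  /**
   * Drives a configured HDFSStorage through the store/flush/retrieve/clean
   * cycle that the tests above assert on.
   */
  public static void walkThrough(HDFSStorage storage)
  {
    byte[] first = "ab".getBytes();
    byte[] second = "cd".getBytes();

    // store() returns an 8-byte address for each record; the records are not
    // durable, and not visible to retrieve(), until flush() is called.
    byte[] firstAddress = storage.store(new Slice(first, 0, first.length));
    storage.store(new Slice(second, 0, second.length));
    storage.flush();

    // retrieve(new byte[8]) starts reading from the beginning; each returned
    // buffer carries 8 bytes of address bookkeeping followed by the payload,
    // which is why the tests' match() helper copies from offset 8.
    byte[] head = storage.retrieve(new byte[8]);
    byte[] headData = Arrays.copyOfRange(head, 8, head.length);

    // retrieveNext() continues from the last retrieved record.
    byte[] next = storage.retrieveNext();
    byte[] nextData = Arrays.copyOfRange(next, 8, next.length);

    if (!Arrays.equals(first, headData) || !Arrays.equals(second, nextData)) {
      throw new IllegalStateException("retrieved payloads do not match what was stored");
    }

    // clean() releases everything up to and including the given address, so
    // fully consumed files can be deleted from HDFS.
    storage.clean(firstAddress);
  }
}

The offset-8 copy mirrors the tests' match() helper; the exact meaning of the header bytes is internal to HDFSStorage and not asserted on here.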
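The failure-recovery tests (testPartialFlushWithFailure, testFailure, testStorageWithRestore) simulate a crash by constructing a second storage instance against the same directory with restore enabled; only flushed data survives, which is why a retrieve of a pre-failure, unflushed address returns null until new data is stored and flushed. A rough sketch of that construction, assuming the configure-then-setup order and a null setup context; the property keys baseDir, restore, and blockSize are the ones HDFSStorage reads, while the storage identifier is set by the test helper and is not shown in this hunk:

import org.apache.flume.Context;

import com.datatorrent.flume.storage.HDFSStorage;

public class StorageRestoreSketch
{
  /**
   * Builds a storage instance against an existing base directory with restore
   * enabled, roughly what the tests' getStorage(id, restore) helper is expected
   * to do after a simulated failure.
   */
  public static HDFSStorage newRestoredStorage(String baseDir)
  {
    Context context = new Context();
    context.put("baseDir", baseDir);   // same directory the failed instance wrote to
    context.put("restore", "true");    // recover state from the flushed files
    context.put("blockSize", String.valueOf(1024 * 1024)); // optional; defaults to 64 MB

    HDFSStorage storage = new HDFSStorage();
    // The storage identifier ("1" in these tests) must also be configured;
    // the test helper handles that and is defined outside this hunk.
    storage.configure(context);
    storage.setup(null);               // assumption: no operator context is needed here
    return storage;
  }
}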
