http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/storage/ErrorMaskingEventCodec.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/storage/ErrorMaskingEventCodec.java b/flume/src/main/java/org/apache/apex/malhar/flume/storage/ErrorMaskingEventCodec.java new file mode 100644 index 0000000..b8d2725 --- /dev/null +++ b/flume/src/main/java/org/apache/apex/malhar/flume/storage/ErrorMaskingEventCodec.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.storage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flume.Event; + +import com.datatorrent.netlet.util.Slice; + +/** + * <p>ErrorMaskingEventCodec class.</p> + * + * @since 1.0.4 + */ +public class ErrorMaskingEventCodec extends EventCodec +{ + + @Override + public Object fromByteArray(Slice fragment) + { + try { + return super.fromByteArray(fragment); + } catch (RuntimeException re) { + logger.warn("Cannot deserialize event {}", fragment, re); + } + + return null; + } + + @Override + public Slice toByteArray(Event event) + { + try { + return super.toByteArray(event); + } catch (RuntimeException re) { + logger.warn("Cannot serialize event {}", event, re); + } + + return null; + } + + + private static final Logger logger = LoggerFactory.getLogger(ErrorMaskingEventCodec.class); +}
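Note on the codec above: ErrorMaskingEventCodec catches any RuntimeException thrown by the parent EventCodec during Kryo (de)serialization and returns null instead of propagating the failure, so callers must treat a null result as a dropped event. A minimal caller-side sketch, assuming the same API (the codec variable and the skip handling are illustrative, not part of this commit):

    EventCodec codec = new ErrorMaskingEventCodec();
    Object event = codec.fromByteArray(slice);   // null if deserialization failed and was masked
    if (event == null) {
      // event could not be decoded; skip it rather than failing the operator
    }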
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/storage/EventCodec.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/storage/EventCodec.java b/flume/src/main/java/org/apache/apex/malhar/flume/storage/EventCodec.java new file mode 100644 index 0000000..463551e --- /dev/null +++ b/flume/src/main/java/org/apache/apex/malhar/flume/storage/EventCodec.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.storage; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flume.Event; +import org.apache.flume.event.EventBuilder; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +import com.datatorrent.api.StreamCodec; +import com.datatorrent.netlet.util.Slice; + +/** + * <p>EventCodec class.</p> + * + * @since 0.9.4 + */ +public class EventCodec implements StreamCodec<Event> +{ + private final transient Kryo kryo; + + public EventCodec() + { + this.kryo = new Kryo(); + this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); + } + + @Override + public Object fromByteArray(Slice fragment) + { + ByteArrayInputStream is = new ByteArrayInputStream(fragment.buffer, fragment.offset, fragment.length); + Input input = new Input(is); + + @SuppressWarnings("unchecked") + HashMap<String, String> headers = kryo.readObjectOrNull(input, HashMap.class); + byte[] body = kryo.readObjectOrNull(input, byte[].class); + return EventBuilder.withBody(body, headers); + } + + @Override + public Slice toByteArray(Event event) + { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + Output output = new Output(os); + + Map<String, String> headers = event.getHeaders(); + if (headers != null && headers.getClass() != HashMap.class) { + HashMap<String, String> tmp = new HashMap<String, String>(headers.size()); + tmp.putAll(headers); + headers = tmp; + } + kryo.writeObjectOrNull(output, headers, HashMap.class); + kryo.writeObjectOrNull(output, event.getBody(), byte[].class); + output.flush(); + final byte[] bytes = os.toByteArray(); + return new Slice(bytes, 0, bytes.length); + } + + @Override + public int getPartition(Event o) + { + return o.hashCode(); + } + + private static final Logger logger = LoggerFactory.getLogger(EventCodec.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/storage/HDFSStorage.java 
---------------------------------------------------------------------- diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/storage/HDFSStorage.java b/flume/src/main/java/org/apache/apex/malhar/flume/storage/HDFSStorage.java new file mode 100644 index 0000000..77aeb68 --- /dev/null +++ b/flume/src/main/java/org/apache/apex/malhar/flume/storage/HDFSStorage.java @@ -0,0 +1,947 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.storage; + +import java.io.DataInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flume.Context; +import org.apache.flume.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; + +import com.datatorrent.api.Component; +import com.datatorrent.common.util.NameableThreadFactory; +import org.apache.apex.malhar.flume.sink.Server; +import com.datatorrent.netlet.util.Slice; + +/** + * HDFSStorage is developed to store and retrieve the data from HDFS + * <p /> + * The properties that can be set on HDFSStorage are: <br /> + * baseDir - The base directory where the data is going to be stored <br /> + * restore - This is used to restore the application from previous failure <br /> + * blockSize - The maximum size of the each file to created. 
<br /> + * + * @since 0.9.3 + */ +public class HDFSStorage implements Storage, Configurable, Component<com.datatorrent.api.Context> +{ + public static final int DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024; + public static final String BASE_DIR_KEY = "baseDir"; + public static final String RESTORE_KEY = "restore"; + public static final String BLOCKSIZE = "blockSize"; + public static final String BLOCK_SIZE_MULTIPLE = "blockSizeMultiple"; + public static final String NUMBER_RETRY = "retryCount"; + + private static final String OFFSET_SUFFIX = "-offsetFile"; + private static final String BOOK_KEEPING_FILE_OFFSET = "-bookKeepingOffsetFile"; + private static final String FLUSHED_IDENTITY_FILE = "flushedCounter"; + private static final String CLEAN_OFFSET_FILE = "cleanoffsetFile"; + private static final String FLUSHED_IDENTITY_FILE_TEMP = "flushedCounter.tmp"; + private static final String CLEAN_OFFSET_FILE_TEMP = "cleanoffsetFile.tmp"; + private static final int IDENTIFIER_SIZE = 8; + private static final int DATA_LENGTH_BYTE_SIZE = 4; + + /** + * Number of times the storage will try to get the filesystem + */ + private int retryCount = 3; + /** + * The multiple of block size + */ + private int blockSizeMultiple = 1; + /** + * Identifier for this storage. + */ + @NotNull + private String id; + /** + * The baseDir where the storage facility is going to create files. + */ + @NotNull + private String baseDir; + /** + * The block size to be used to create the storage files + */ + private long blockSize; + /** + * + */ + private boolean restore; + /** + * This identifies the current file number + */ + private long currentWrittenFile; + /** + * This identifies the file number that has been flushed + */ + private long flushedFileCounter; + /** + * The file that stores the fileCounter information + */ + // private Path fileCounterFile; + /** + * The file that stores the flushed fileCounter information + */ + private Path flushedCounterFile; + private Path flushedCounterFileTemp; + /** + * This identifies the last cleaned file number + */ + private long cleanedFileCounter; + /** + * The file that stores the clean file counter information + */ + // private Path cleanFileCounterFile; + /** + * The file that stores the clean file offset information + */ + private Path cleanFileOffsetFile; + private Path cleanFileOffsetFileTemp; + private FileSystem fs; + private FSDataOutputStream dataStream; + ArrayList<DataBlock> files2Commit = new ArrayList<DataBlock>(); + /** + * The offset in the current opened file + */ + private long fileWriteOffset; + private FSDataInputStream readStream; + private long retrievalOffset; + private long retrievalFile; + private int offset; + private long flushedLong; + private long flushedFileWriteOffset; + private long bookKeepingFileOffset; + private byte[] cleanedOffset = new byte[8]; + private long skipOffset; + private long skipFile; + private transient Path basePath; + private ExecutorService storageExecutor; + private byte[] currentData; + private FSDataInputStream nextReadStream; + private long nextFlushedLong; + private long nextRetrievalFile; + private byte[] nextRetrievalData; + + public HDFSStorage() + { + this.restore = true; + } + + /** + * This stores the Identifier information identified in the last store function call + * + * @param ctx + */ + @Override + public void configure(Context ctx) + { + String tempId = ctx.getString(ID); + if (tempId == null) { + if (id == null) { + throw new IllegalArgumentException("id can't be null."); + } + } else { + id = tempId; + } + + 
String tempBaseDir = ctx.getString(BASE_DIR_KEY); + if (tempBaseDir != null) { + baseDir = tempBaseDir; + } + + restore = ctx.getBoolean(RESTORE_KEY, restore); + Long tempBlockSize = ctx.getLong(BLOCKSIZE); + if (tempBlockSize != null) { + blockSize = tempBlockSize; + } + blockSizeMultiple = ctx.getInteger(BLOCK_SIZE_MULTIPLE, blockSizeMultiple); + retryCount = ctx.getInteger(NUMBER_RETRY,retryCount); + } + + /** + * This function reads the file at a location and return the bytes stored in the file " + * + * @param path - the location of the file + * @return + * @throws IOException + */ + byte[] readData(Path path) throws IOException + { + DataInputStream is = new DataInputStream(fs.open(path)); + byte[] bytes = new byte[is.available()]; + is.readFully(bytes); + is.close(); + return bytes; + } + + /** + * This function writes the bytes to a file specified by the path + * + * @param path the file location + * @param data the data to be written to the file + * @return + * @throws IOException + */ + private FSDataOutputStream writeData(Path path, byte[] data) throws IOException + { + FSDataOutputStream fsOutputStream; + if (fs.getScheme().equals("file")) { + // local FS does not support hflush and does not flush native stream + fsOutputStream = new FSDataOutputStream( + new FileOutputStream(Path.getPathWithoutSchemeAndAuthority(path).toString()), null); + } else { + fsOutputStream = fs.create(path); + } + fsOutputStream.write(data); + return fsOutputStream; + } + + private long calculateOffset(long fileOffset, long fileCounter) + { + return ((fileCounter << 32) | (fileOffset & 0xffffffffL)); + } + + @Override + public byte[] store(Slice slice) + { + // logger.debug("store message "); + int bytesToWrite = slice.length + DATA_LENGTH_BYTE_SIZE; + if (currentWrittenFile < skipFile) { + fileWriteOffset += bytesToWrite; + if (fileWriteOffset >= bookKeepingFileOffset) { + files2Commit.add(new DataBlock(null, bookKeepingFileOffset, + new Path(basePath, currentWrittenFile + OFFSET_SUFFIX), currentWrittenFile)); + currentWrittenFile++; + if (fileWriteOffset > bookKeepingFileOffset) { + fileWriteOffset = bytesToWrite; + } else { + fileWriteOffset = 0; + } + try { + bookKeepingFileOffset = getFlushedFileWriteOffset( + new Path(basePath, currentWrittenFile + BOOK_KEEPING_FILE_OFFSET)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return null; + } + + if (flushedFileCounter == currentWrittenFile && dataStream == null) { + currentWrittenFile++; + fileWriteOffset = 0; + } + + if (flushedFileCounter == skipFile && skipFile != -1) { + skipFile++; + } + + if (fileWriteOffset + bytesToWrite < blockSize) { + try { + /* write length and the actual data to the file */ + if (fileWriteOffset == 0) { + // writeData(flushedCounterFile, String.valueOf(currentWrittenFile).getBytes()).close(); + dataStream = writeData(new Path(basePath, String.valueOf(currentWrittenFile)), + Ints.toByteArray(slice.length)); + dataStream.write(slice.buffer, slice.offset, slice.length); + } else { + dataStream.write(Ints.toByteArray(slice.length)); + dataStream.write(slice.buffer, slice.offset, slice.length); + } + fileWriteOffset += bytesToWrite; + + byte[] fileOffset = null; + if ((currentWrittenFile > skipFile) || (currentWrittenFile == skipFile && fileWriteOffset > skipOffset)) { + skipFile = -1; + fileOffset = new byte[IDENTIFIER_SIZE]; + Server.writeLong(fileOffset, 0, calculateOffset(fileWriteOffset, currentWrittenFile)); + } + return fileOffset; + } catch (IOException ex) { + logger.warn("Error while 
storing the bytes {}", ex.getMessage()); + closeFs(); + throw new RuntimeException(ex); + } + } + DataBlock db = new DataBlock(dataStream, fileWriteOffset, + new Path(basePath, currentWrittenFile + OFFSET_SUFFIX), currentWrittenFile); + db.close(); + files2Commit.add(db); + fileWriteOffset = 0; + ++currentWrittenFile; + return store(slice); + } + + /** + * @param b + * @param startIndex + * @return + */ + long byteArrayToLong(byte[] b, int startIndex) + { + final byte b1 = 0; + return Longs.fromBytes(b1, b1, b1, b1, b[3 + startIndex], b[2 + startIndex], b[1 + startIndex], b[startIndex]); + } + + @Override + public byte[] retrieve(byte[] identifier) + { + skipFile = -1; + skipOffset = 0; + logger.debug("retrieve with address {}", Arrays.toString(identifier)); + // flushing the last incomplete flushed file + closeUnflushedFiles(); + + retrievalOffset = byteArrayToLong(identifier, 0); + retrievalFile = byteArrayToLong(identifier, offset); + + if (retrievalFile == 0 && retrievalOffset == 0 && currentWrittenFile == 0 && fileWriteOffset == 0) { + skipOffset = 0; + return null; + } + + // making sure that the deleted address is not requested again + if (retrievalFile != 0 || retrievalOffset != 0) { + long cleanedFile = byteArrayToLong(cleanedOffset, offset); + if (retrievalFile < cleanedFile || (retrievalFile == cleanedFile && + retrievalOffset < byteArrayToLong(cleanedOffset, 0))) { + logger.warn("The address asked has been deleted retrievalFile={}, cleanedFile={}, retrievalOffset={}, " + + "cleanedOffset={}", retrievalFile, cleanedFile, retrievalOffset, byteArrayToLong(cleanedOffset, 0)); + closeFs(); + throw new IllegalArgumentException(String.format("The data for address %s has already been deleted", + Arrays.toString(identifier))); + } + } + + // we have just started + if (retrievalFile == 0 && retrievalOffset == 0) { + retrievalFile = byteArrayToLong(cleanedOffset, offset); + retrievalOffset = byteArrayToLong(cleanedOffset, 0); + } + + if ((retrievalFile > flushedFileCounter)) { + skipFile = retrievalFile; + skipOffset = retrievalOffset; + retrievalFile = -1; + return null; + } + if ((retrievalFile == flushedFileCounter && retrievalOffset >= flushedFileWriteOffset)) { + skipFile = retrievalFile; + skipOffset = retrievalOffset - flushedFileWriteOffset; + retrievalFile = -1; + return null; + } + + try { + if (readStream != null) { + readStream.close(); + readStream = null; + } + Path path = new Path(basePath, String.valueOf(retrievalFile)); + if (!fs.exists(path)) { + retrievalFile = -1; + closeFs(); + throw new RuntimeException(String.format("File %s does not exist", path.toString())); + } + + byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX)); + flushedLong = Server.readLong(flushedOffset, 0); + while (retrievalOffset >= flushedLong && retrievalFile < flushedFileCounter) { + retrievalOffset -= flushedLong; + retrievalFile++; + flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX)); + flushedLong = Server.readLong(flushedOffset, 0); + } + + if (retrievalOffset >= flushedLong) { + logger.warn("data not flushed for the given identifier"); + retrievalFile = -1; + return null; + } + synchronized (HDFSStorage.this) { + if (nextReadStream != null) { + nextReadStream.close(); + nextReadStream = null; + } + } + currentData = null; + path = new Path(basePath, String.valueOf(retrievalFile)); + //readStream = new FSDataInputStream(fs.open(path)); + currentData = readData(path); + //readStream.seek(retrievalOffset); + 
storageExecutor.submit(getNextStream()); + return retrieveHelper(); + } catch (IOException e) { + closeFs(); + throw new RuntimeException(e); + } + } + + private byte[] retrieveHelper() throws IOException + { + int tempRetrievalOffset = (int)retrievalOffset; + int length = Ints.fromBytes(currentData[tempRetrievalOffset], currentData[tempRetrievalOffset + 1], + currentData[tempRetrievalOffset + 2], currentData[tempRetrievalOffset + 3]); + byte[] data = new byte[length + IDENTIFIER_SIZE]; + System.arraycopy(currentData, tempRetrievalOffset + 4, data, IDENTIFIER_SIZE, length); + retrievalOffset += length + DATA_LENGTH_BYTE_SIZE; + if (retrievalOffset >= flushedLong) { + Server.writeLong(data, 0, calculateOffset(0, retrievalFile + 1)); + } else { + Server.writeLong(data, 0, calculateOffset(retrievalOffset, retrievalFile)); + } + return data; + } + + @Override + public byte[] retrieveNext() + { + if (retrievalFile == -1) { + closeFs(); + throw new RuntimeException("Call retrieve first"); + } + + if (retrievalFile > flushedFileCounter) { + logger.warn("data is not flushed"); + return null; + } + + try { + if (currentData == null) { + synchronized (HDFSStorage.this) { + if (nextRetrievalData != null && (retrievalFile == nextRetrievalFile)) { + currentData = nextRetrievalData; + flushedLong = nextFlushedLong; + nextRetrievalData = null; + } else { + currentData = null; + currentData = readData(new Path(basePath, String.valueOf(retrievalFile))); + byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX)); + flushedLong = Server.readLong(flushedOffset, 0); + } + } + storageExecutor.submit(getNextStream()); + } + + if (retrievalOffset >= flushedLong) { + retrievalFile++; + retrievalOffset = 0; + + if (retrievalFile > flushedFileCounter) { + logger.warn("data is not flushed"); + return null; + } + + //readStream.close(); + // readStream = new FSDataInputStream(fs.open(new Path(basePath, String.valueOf(retrievalFile)))); + // byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX)); + // flushedLong = Server.readLong(flushedOffset, 0); + + synchronized (HDFSStorage.this) { + if (nextRetrievalData != null && (retrievalFile == nextRetrievalFile)) { + currentData = nextRetrievalData; + flushedLong = nextFlushedLong; + nextRetrievalData = null; + } else { + currentData = null; + currentData = readData(new Path(basePath, String.valueOf(retrievalFile))); + byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX)); + flushedLong = Server.readLong(flushedOffset, 0); + } + } + storageExecutor.submit(getNextStream()); + } + //readStream.seek(retrievalOffset); + return retrieveHelper(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + @SuppressWarnings("AssignmentToCollectionOrArrayFieldFromParameter") + public void clean(byte[] identifier) + { + logger.info("clean {}", Arrays.toString(identifier)); + long cleanFileIndex = byteArrayToLong(identifier, offset); + + long cleanFileOffset = byteArrayToLong(identifier, 0); + if (flushedFileCounter == -1) { + identifier = new byte[8]; + } else if (cleanFileIndex > flushedFileCounter || + (cleanFileIndex == flushedFileCounter && cleanFileOffset >= flushedFileWriteOffset)) { + // This is to make sure that we clean only the data that is flushed + cleanFileIndex = flushedFileCounter; + cleanFileOffset = flushedFileWriteOffset; + Server.writeLong(identifier, 0, calculateOffset(cleanFileOffset, cleanFileIndex)); + } + cleanedOffset = identifier; + + try { + 
writeData(cleanFileOffsetFileTemp, identifier).close(); + fs.rename(cleanFileOffsetFileTemp, cleanFileOffsetFile); + if (cleanedFileCounter >= cleanFileIndex) { + return; + } + do { + Path path = new Path(basePath, String.valueOf(cleanedFileCounter)); + if (fs.exists(path) && fs.isFile(path)) { + fs.delete(path, false); + } + path = new Path(basePath, cleanedFileCounter + OFFSET_SUFFIX); + if (fs.exists(path) && fs.isFile(path)) { + fs.delete(path, false); + } + path = new Path(basePath, cleanedFileCounter + BOOK_KEEPING_FILE_OFFSET); + if (fs.exists(path) && fs.isFile(path)) { + fs.delete(path, false); + } + logger.info("deleted file {}", cleanedFileCounter); + ++cleanedFileCounter; + } while (cleanedFileCounter < cleanFileIndex); + // writeData(cleanFileCounterFile, String.valueOf(cleanedFileCounter).getBytes()).close(); + + } catch (IOException e) { + logger.warn("not able to close the streams {}", e.getMessage()); + closeFs(); + throw new RuntimeException(e); + } + } + + /** + * This is used mainly for cleaning up of counter files created + */ + void cleanHelperFiles() + { + try { + fs.delete(basePath, true); + } catch (IOException e) { + logger.warn(e.getMessage()); + } + } + + private void closeUnflushedFiles() + { + try { + files2Commit.clear(); + // closing the stream + if (dataStream != null) { + dataStream.close(); + dataStream = null; + // currentWrittenFile++; + // fileWriteOffset = 0; + } + + if (!fs.exists(new Path(basePath, currentWrittenFile + OFFSET_SUFFIX))) { + fs.delete(new Path(basePath, String.valueOf(currentWrittenFile)), false); + } + + if (fs.exists(new Path(basePath, flushedFileCounter + OFFSET_SUFFIX))) { + // This means that flush was called + flushedFileWriteOffset = getFlushedFileWriteOffset(new Path(basePath, flushedFileCounter + OFFSET_SUFFIX)); + bookKeepingFileOffset = getFlushedFileWriteOffset( + new Path(basePath, flushedFileCounter + BOOK_KEEPING_FILE_OFFSET)); + } + + if (flushedFileCounter != -1) { + currentWrittenFile = flushedFileCounter; + fileWriteOffset = flushedFileWriteOffset; + } else { + currentWrittenFile = 0; + fileWriteOffset = 0; + } + + flushedLong = 0; + + } catch (IOException e) { + closeFs(); + throw new RuntimeException(e); + } + } + + @Override + public void flush() + { + nextReadStream = null; + StringBuilder builder = new StringBuilder(); + Iterator<DataBlock> itr = files2Commit.iterator(); + DataBlock db; + try { + while (itr.hasNext()) { + db = itr.next(); + db.updateOffsets(); + builder.append(db.fileName).append(", "); + } + files2Commit.clear(); + + if (dataStream != null) { + dataStream.hflush(); + writeData(flushedCounterFileTemp, String.valueOf(currentWrittenFile).getBytes()).close(); + fs.rename(flushedCounterFileTemp, flushedCounterFile); + updateFlushedOffset(new Path(basePath, currentWrittenFile + OFFSET_SUFFIX), fileWriteOffset); + flushedFileWriteOffset = fileWriteOffset; + builder.append(currentWrittenFile); + } + logger.debug("flushed files {}", builder.toString()); + } catch (IOException ex) { + logger.warn("not able to close the stream {}", ex.getMessage()); + closeFs(); + throw new RuntimeException(ex); + } + flushedFileCounter = currentWrittenFile; + // logger.debug("flushedFileCounter in flush {}",flushedFileCounter); + } + + /** + * This updates the flushed offset + */ + private void updateFlushedOffset(Path file, long bytesWritten) + { + byte[] lastStoredOffset = new byte[IDENTIFIER_SIZE]; + Server.writeLong(lastStoredOffset, 0, bytesWritten); + try { + writeData(file, lastStoredOffset).close(); + } catch 
(IOException e) { + try { + if (!Arrays.equals(readData(file), lastStoredOffset)) { + closeFs(); + throw new RuntimeException(e); + } + } catch (Exception e1) { + closeFs(); + throw new RuntimeException(e1); + } + } + } + + public int getBlockSizeMultiple() + { + return blockSizeMultiple; + } + + public void setBlockSizeMultiple(int blockSizeMultiple) + { + this.blockSizeMultiple = blockSizeMultiple; + } + + /** + * @return the baseDir + */ + public String getBaseDir() + { + return baseDir; + } + + /** + * @param baseDir the baseDir to set + */ + public void setBaseDir(String baseDir) + { + this.baseDir = baseDir; + } + + /** + * @return the id + */ + public String getId() + { + return id; + } + + /** + * @param id the id to set + */ + public void setId(String id) + { + this.id = id; + } + + /** + * @return the blockSize + */ + public long getBlockSize() + { + return blockSize; + } + + /** + * @param blockSize the blockSize to set + */ + public void setBlockSize(long blockSize) + { + this.blockSize = blockSize; + } + + /** + * @return the restore + */ + public boolean isRestore() + { + return restore; + } + + /** + * @param restore the restore to set + */ + public void setRestore(boolean restore) + { + this.restore = restore; + } + + class DataBlock + { + FSDataOutputStream dataStream; + long dataOffset; + Path path2FlushedData; + long fileName; + private Path bookKeepingPath; + + DataBlock(FSDataOutputStream stream, long bytesWritten, Path path2FlushedData, long fileName) + { + this.dataStream = stream; + this.dataOffset = bytesWritten; + this.path2FlushedData = path2FlushedData; + this.fileName = fileName; + } + + public void close() + { + if (dataStream != null) { + try { + dataStream.close(); + bookKeepingPath = new Path(basePath, fileName + BOOK_KEEPING_FILE_OFFSET); + updateFlushedOffset(bookKeepingPath, dataOffset); + } catch (IOException ex) { + logger.warn("not able to close the stream {}", ex.getMessage()); + closeFs(); + throw new RuntimeException(ex); + } + } + } + + public void updateOffsets() throws IOException + { + updateFlushedOffset(path2FlushedData, dataOffset); + if (bookKeepingPath != null && fs.exists(bookKeepingPath)) { + fs.delete(bookKeepingPath, false); + } + } + + } + + private static final Logger logger = LoggerFactory.getLogger(HDFSStorage.class); + + @Override + public void setup(com.datatorrent.api.Context context) + { + Configuration conf = new Configuration(); + if (baseDir == null) { + baseDir = conf.get("hadoop.tmp.dir"); + if (baseDir == null || baseDir.isEmpty()) { + throw new IllegalArgumentException("baseDir cannot be null."); + } + } + offset = 4; + skipOffset = -1; + skipFile = -1; + int tempRetryCount = 0; + while (tempRetryCount < retryCount && fs == null) { + try { + fs = FileSystem.newInstance(conf); + tempRetryCount++; + } catch (Throwable throwable) { + logger.warn("Not able to get file system ", throwable); + } + } + + try { + Path path = new Path(baseDir); + basePath = new Path(path, id); + if (fs == null) { + fs = FileSystem.newInstance(conf); + } + if (!fs.exists(path)) { + closeFs(); + throw new RuntimeException(String.format("baseDir passed (%s) doesn't exist.", baseDir)); + } + if (!fs.isDirectory(path)) { + closeFs(); + throw new RuntimeException(String.format("baseDir passed (%s) is not a directory.", baseDir)); + } + if (!restore) { + fs.delete(basePath, true); + } + if (!fs.exists(basePath) || !fs.isDirectory(basePath)) { + fs.mkdirs(basePath); + } + + if (blockSize == 0) { + blockSize = fs.getDefaultBlockSize(new Path(basePath, 
"tempData")); + } + if (blockSize == 0) { + blockSize = DEFAULT_BLOCK_SIZE; + } + + blockSize = blockSizeMultiple * blockSize; + + currentWrittenFile = 0; + cleanedFileCounter = -1; + retrievalFile = -1; + // fileCounterFile = new Path(basePath, IDENTITY_FILE); + flushedFileCounter = -1; + // cleanFileCounterFile = new Path(basePath, CLEAN_FILE); + cleanFileOffsetFile = new Path(basePath, CLEAN_OFFSET_FILE); + cleanFileOffsetFileTemp = new Path(basePath, CLEAN_OFFSET_FILE_TEMP); + flushedCounterFile = new Path(basePath, FLUSHED_IDENTITY_FILE); + flushedCounterFileTemp = new Path(basePath, FLUSHED_IDENTITY_FILE_TEMP); + + if (restore) { + // + // if (fs.exists(fileCounterFile) && fs.isFile(fileCounterFile)) { + // //currentWrittenFile = Long.valueOf(new String(readData(fileCounterFile))); + // } + + if (fs.exists(cleanFileOffsetFile) && fs.isFile(cleanFileOffsetFile)) { + cleanedOffset = readData(cleanFileOffsetFile); + } + + if (fs.exists(flushedCounterFile) && fs.isFile(flushedCounterFile)) { + String strFlushedFileCounter = new String(readData(flushedCounterFile)); + if (strFlushedFileCounter.isEmpty()) { + logger.warn("empty flushed file"); + } else { + flushedFileCounter = Long.valueOf(strFlushedFileCounter); + flushedFileWriteOffset = getFlushedFileWriteOffset(new Path(basePath, flushedFileCounter + OFFSET_SUFFIX)); + bookKeepingFileOffset = getFlushedFileWriteOffset( + new Path(basePath, flushedFileCounter + BOOK_KEEPING_FILE_OFFSET)); + } + + } + } + fileWriteOffset = flushedFileWriteOffset; + currentWrittenFile = flushedFileCounter; + cleanedFileCounter = byteArrayToLong(cleanedOffset, offset) - 1; + if (currentWrittenFile == -1) { + ++currentWrittenFile; + fileWriteOffset = 0; + } + + } catch (IOException io) { + + throw new RuntimeException(io); + } + storageExecutor = Executors.newSingleThreadExecutor(new NameableThreadFactory("StorageHelper")); + } + + private void closeFs() + { + if (fs != null) { + try { + fs.close(); + fs = null; + } catch (IOException e) { + logger.debug(e.getMessage()); + } + } + } + + private long getFlushedFileWriteOffset(Path filePath) throws IOException + { + if (flushedFileCounter != -1 && fs.exists(filePath)) { + byte[] flushedFileOffsetByte = readData(filePath); + if (flushedFileOffsetByte != null && flushedFileOffsetByte.length == 8) { + return Server.readLong(flushedFileOffsetByte, 0); + } + } + return 0; + } + + @Override + public void teardown() + { + logger.debug("called teardown"); + try { + if (readStream != null) { + readStream.close(); + } + synchronized (HDFSStorage.this) { + if (nextReadStream != null) { + nextReadStream.close(); + } + } + + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + closeUnflushedFiles(); + storageExecutor.shutdown(); + } + + } + + private Runnable getNextStream() + { + return new Runnable() + { + @Override + public void run() + { + try { + synchronized (HDFSStorage.this) { + nextRetrievalFile = retrievalFile + 1; + if (nextRetrievalFile > flushedFileCounter) { + nextRetrievalData = null; + return; + } + Path path = new Path(basePath, String.valueOf(nextRetrievalFile)); + Path offsetPath = new Path(basePath, nextRetrievalFile + OFFSET_SUFFIX); + nextRetrievalData = null; + nextRetrievalData = readData(path); + byte[] flushedOffset = readData(offsetPath); + nextFlushedLong = Server.readLong(flushedOffset, 0); + } + } catch (Throwable e) { + logger.warn("in storage executor ", e); + + } + } + }; + } + +} + 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/storage/Storage.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/storage/Storage.java b/flume/src/main/java/org/apache/apex/malhar/flume/storage/Storage.java new file mode 100644 index 0000000..add1831 --- /dev/null +++ b/flume/src/main/java/org/apache/apex/malhar/flume/storage/Storage.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.storage; + +import com.datatorrent.netlet.util.Slice; + +/** + * <p>Storage interface.</p> + * + * @since 0.9.2 + */ +public interface Storage +{ + /** + * key in the context for Unique identifier for the storage which may be used to recover from failure. + */ + String ID = "id"; + + /** + * This stores the bytes and returns the unique identifier to retrieve these bytes + * + * @param bytes + * @return + */ + byte[] store(Slice bytes); + + /** + * This returns the data bytes for the current identifier and the identifier for next data bytes. <br/> + * The first eight bytes contain the identifier and the remaining bytes contain the data + * + * @param identifier + * @return + */ + byte[] retrieve(byte[] identifier); + + /** + * This returns data bytes and the identifier for the next data bytes. The identifier for current data bytes is based + * on the retrieve method call and number of retrieveNext method calls after retrieve method call. 
<br/> + * The first eight bytes contain the identifier and the remaining bytes contain the data + * + * @return + */ + byte[] retrieveNext(); + + /** + * This is used to clean up the files identified by identifier + * + * @param identifier + */ + void clean(byte[] identifier); + + /** + * This flushes the data from stream + * + */ + void flush(); + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/resources/flume-conf/flume-conf.sample.properties ---------------------------------------------------------------------- diff --git a/flume/src/main/resources/flume-conf/flume-conf.sample.properties b/flume/src/main/resources/flume-conf/flume-conf.sample.properties index 1782d4a..af59e52 100644 --- a/flume/src/main/resources/flume-conf/flume-conf.sample.properties +++ b/flume/src/main/resources/flume-conf/flume-conf.sample.properties @@ -23,7 +23,7 @@ agent1.sinks = dt # first sink - dt - agent1.sinks.dt.type = com.datatorrent.flume.sink.DTFlumeSink + agent1.sinks.dt.type = org.apache.apex.malhar.flume.sink.DTFlumeSink agent1.sinks.dt.id = sink1 agent1.sinks.dt.hostname = localhost agent1.sinks.dt.port = 8080 @@ -31,7 +31,7 @@ agent1.sinks.dt.throughputAdjustmentFactor = 2 agent1.sinks.dt.maximumEventsPerTransaction = 5000 agent1.sinks.dt.minimumEventsPerTransaction = 1 - agent1.sinks.dt.storage = com.datatorrent.flume.storage.HDFSStorage + agent1.sinks.dt.storage = org.apache.apex.malhar.flume.storage.HDFSStorage agent1.sinks.dt.storage.restore = false agent1.sinks.dt.storage.baseDir = /tmp/flume101 agent1.sinks.dt.channel = ch1 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/discovery/ZKAssistedDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/com/datatorrent/flume/discovery/ZKAssistedDiscoveryTest.java b/flume/src/test/java/com/datatorrent/flume/discovery/ZKAssistedDiscoveryTest.java deleted file mode 100644 index f182edc..0000000 --- a/flume/src/test/java/com/datatorrent/flume/discovery/ZKAssistedDiscoveryTest.java +++ /dev/null @@ -1,143 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.flume.discovery; - -import org.codehaus.jackson.type.TypeReference; -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.curator.x.discovery.ServiceInstance; -import org.apache.curator.x.discovery.details.InstanceSerializer; - -import com.datatorrent.flume.discovery.Discovery.Service; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertNotNull; - -/** - * - */ -@Ignore -public class ZKAssistedDiscoveryTest -{ - public ZKAssistedDiscoveryTest() - { - } - - @Test - public void testSerialization() throws Exception - { - ZKAssistedDiscovery discovery = new ZKAssistedDiscovery(); - discovery.setServiceName("DTFlumeTest"); - discovery.setConnectionString("localhost:2181"); - discovery.setBasePath("/HelloDT"); - discovery.setup(null); - ServiceInstance<byte[]> instance = discovery.getInstance(new Service<byte[]>() - { - @Override - public String getHost() - { - return "localhost"; - } - - @Override - public int getPort() - { - return 8080; - } - - @Override - public byte[] getPayload() - { - return null; - } - - @Override - public String getId() - { - return "localhost8080"; - } - - }); - InstanceSerializer<byte[]> instanceSerializer = - discovery.getInstanceSerializerFactory().getInstanceSerializer(new TypeReference<ServiceInstance<byte[]>>() - { - }); - byte[] serialize = instanceSerializer.serialize(instance); - logger.debug("serialized json = {}", new String(serialize)); - ServiceInstance<byte[]> deserialize = instanceSerializer.deserialize(serialize); - assertArrayEquals("Metadata", instance.getPayload(), deserialize.getPayload()); - } - - @Test - public void testDiscover() - { - ZKAssistedDiscovery discovery = new ZKAssistedDiscovery(); - discovery.setServiceName("DTFlumeTest"); - discovery.setConnectionString("localhost:2181"); - discovery.setBasePath("/HelloDT"); - discovery.setup(null); - assertNotNull("Discovered Sinks", discovery.discover()); - discovery.teardown(); - } - - @Test - public void testAdvertize() - { - ZKAssistedDiscovery discovery = new ZKAssistedDiscovery(); - discovery.setServiceName("DTFlumeTest"); - discovery.setConnectionString("localhost:2181"); - discovery.setBasePath("/HelloDT"); - discovery.setup(null); - - Service<byte[]> service = new Service<byte[]>() - { - @Override - public String getHost() - { - return "chetan"; - } - - @Override - public int getPort() - { - return 5033; - } - - @Override - public byte[] getPayload() - { - return new byte[] {3, 2, 1}; - } - - @Override - public String getId() - { - return "uniqueId"; - } - - }; - discovery.advertise(service); - discovery.teardown(); - } - - private static final Logger logger = LoggerFactory.getLogger(ZKAssistedDiscoveryTest.class); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/integration/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/com/datatorrent/flume/integration/ApplicationTest.java b/flume/src/test/java/com/datatorrent/flume/integration/ApplicationTest.java deleted file mode 100644 index 8256916..0000000 --- a/flume/src/test/java/com/datatorrent/flume/integration/ApplicationTest.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.flume.integration; - -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.flume.Event; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DAG.Locality; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.LocalMode; -import com.datatorrent.api.Operator; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.flume.operator.AbstractFlumeInputOperator; -import com.datatorrent.flume.storage.EventCodec; - -/** - * - */ -@Ignore -public class ApplicationTest implements StreamingApplication -{ - public static class FlumeInputOperator extends AbstractFlumeInputOperator<Event> - { - @Override - public Event convert(Event event) - { - return event; - } - } - - public static class Counter implements Operator - { - private int count; - private transient Event event; - public final transient DefaultInputPort<Event> input = new DefaultInputPort<Event>() - { - @Override - public void process(Event tuple) - { - count++; - event = tuple; - } - - }; - - @Override - public void beginWindow(long windowId) - { - } - - @Override - public void endWindow() - { - logger.debug("total count = {}, tuple = {}", count, event); - } - - @Override - public void setup(OperatorContext context) - { - } - - @Override - public void teardown() - { - } - - private static final Logger logger = LoggerFactory.getLogger(Counter.class); - } - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - dag.setAttribute(com.datatorrent.api.Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS, 1000); - FlumeInputOperator flume = dag.addOperator("FlumeOperator", new FlumeInputOperator()); - flume.setConnectAddresses(new String[]{"test:127.0.0.1:8080"}); - flume.setCodec(new EventCodec()); - Counter counter = dag.addOperator("Counter", new Counter()); - - dag.addStream("Slices", flume.output, counter.input).setLocality(Locality.CONTAINER_LOCAL); - } - - @Test - public void test() - { - try { - LocalMode.runApp(this, Integer.MAX_VALUE); - } catch (Exception ex) { - logger.warn("The dag seems to be not testable yet, if it's - remove this exception handling", ex); - } - - } - - private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java 
b/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java deleted file mode 100644 index aca99c3..0000000 --- a/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.flume.interceptor; - -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.HashMap; - -import org.junit.BeforeClass; -import org.junit.Test; - -import org.apache.flume.Context; -import org.apache.flume.interceptor.Interceptor; - -import static org.junit.Assert.assertArrayEquals; - -/** - * Tests for {@link ColumnFilteringFormattingInterceptor} - */ -public class ColumnFilteringFormattingInterceptorTest -{ - private static InterceptorTestHelper helper; - - @BeforeClass - public static void startUp() - { - HashMap<String, String> contextMap = new HashMap<String, String>(); - contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2)); - contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "{1}\001{2}\001{3}\001"); - - helper = new InterceptorTestHelper(new ColumnFilteringFormattingInterceptor.Builder(), contextMap); - } - - @Test - public void testInterceptEvent() - { - helper.testIntercept_Event(); - } - - @Test - public void testFiles() throws IOException, URISyntaxException - { - helper.testFiles(); - } - - @Test - public void testInterceptEventWithPrefix() - { - HashMap<String, String> contextMap = new HashMap<String, String>(); - contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2)); - contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "\001{1}\001{2}\001{3}\001"); - - ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder(); - builder.configure(new Context(contextMap)); - Interceptor interceptor = builder.build(); - - assertArrayEquals("Six Fields", - "\001\001Second\001\001".getBytes(), - interceptor.intercept( - new InterceptorTestHelper.MyEvent("First\002\002Second\002\002\002".getBytes())).getBody()); - } - - @Test - public void testInterceptEventWithLongSeparator() - { - HashMap<String, String> contextMap = new HashMap<String, String>(); - contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2)); - contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "a{1}bc{2}def{3}ghi"); - - ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder(); - builder.configure(new Context(contextMap)); - Interceptor interceptor = builder.build(); - byte[] body = 
interceptor.intercept( - new InterceptorTestHelper.MyEvent("First\002\002Second\002\002\002".getBytes())).getBody(); - - assertArrayEquals("Six Fields, " + new String(body), "abcSeconddefghi".getBytes(), body); - } - - @Test - public void testInterceptEventWithTerminatingSeparator() - { - HashMap<String, String> contextMap = new HashMap<String, String>(); - contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2)); - contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "a{1}bc{2}def{3}"); - - ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder(); - builder.configure(new Context(contextMap)); - Interceptor interceptor = builder.build(); - byte[] body = interceptor.intercept( - new InterceptorTestHelper.MyEvent("First\002\002Second\002\002\002".getBytes())).getBody(); - - assertArrayEquals("Six Fields, " + new String(body), "abcSeconddef".getBytes(), body); - } - - @Test - public void testInterceptEventWithColumnZero() - { - HashMap<String, String> contextMap = new HashMap<String, String>(); - contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2)); - contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "{0}\001"); - - ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder(); - builder.configure(new Context(contextMap)); - Interceptor interceptor = builder.build(); - - assertArrayEquals("Empty Bytes", - "\001".getBytes(), - interceptor.intercept(new InterceptorTestHelper.MyEvent("".getBytes())).getBody()); - - assertArrayEquals("One Field", - "First\001".getBytes(), - interceptor.intercept(new InterceptorTestHelper.MyEvent("First".getBytes())).getBody()); - - assertArrayEquals("Two Fields", - "\001".getBytes(), - interceptor.intercept(new InterceptorTestHelper.MyEvent("\002First".getBytes())).getBody()); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptorTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptorTest.java b/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptorTest.java deleted file mode 100644 index 11ee23f..0000000 --- a/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptorTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.flume.interceptor; - -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.HashMap; - -import org.junit.BeforeClass; -import org.junit.Test; - -import org.apache.flume.Context; -import org.apache.flume.interceptor.Interceptor; - -import static org.junit.Assert.assertArrayEquals; - -/** - * - */ -public class ColumnFilteringInterceptorTest -{ - private static InterceptorTestHelper helper; - - @BeforeClass - public static void startUp() - { - HashMap<String, String> contextMap = new HashMap<String, String>(); - contextMap.put(ColumnFilteringInterceptor.Constants.DST_SEPARATOR, Byte.toString((byte)1)); - contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2)); - contextMap.put(ColumnFilteringInterceptor.Constants.COLUMNS, "1 2 3"); - - helper = new InterceptorTestHelper(new ColumnFilteringInterceptor.Builder(), contextMap); - } - - @Test - public void testInterceptEvent() - { - helper.testIntercept_Event(); - } - - @Test - public void testFiles() throws IOException, URISyntaxException - { - helper.testFiles(); - } - - @Test - public void testInterceptEventWithColumnZero() - { - HashMap<String, String> contextMap = new HashMap<String, String>(); - contextMap.put(ColumnFilteringInterceptor.Constants.DST_SEPARATOR, Byte.toString((byte)1)); - contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2)); - contextMap.put(ColumnFilteringInterceptor.Constants.COLUMNS, "0"); - - ColumnFilteringInterceptor.Builder builder = new ColumnFilteringInterceptor.Builder(); - builder.configure(new Context(contextMap)); - Interceptor interceptor = builder.build(); - - assertArrayEquals("Empty Bytes", - "\001".getBytes(), - interceptor.intercept(new InterceptorTestHelper.MyEvent("".getBytes())).getBody()); - - assertArrayEquals("One Field", - "First\001".getBytes(), - interceptor.intercept(new InterceptorTestHelper.MyEvent("First".getBytes())).getBody()); - - assertArrayEquals("Two Fields", - "\001".getBytes(), - interceptor.intercept(new InterceptorTestHelper.MyEvent("\002First".getBytes())).getBody()); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/interceptor/InterceptorTestHelper.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/com/datatorrent/flume/interceptor/InterceptorTestHelper.java b/flume/src/test/java/com/datatorrent/flume/interceptor/InterceptorTestHelper.java deleted file mode 100644 index dc95f08..0000000 --- a/flume/src/test/java/com/datatorrent/flume/interceptor/InterceptorTestHelper.java +++ /dev/null @@ -1,216 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.flume.interceptor; - -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.net.URISyntaxException; -import java.net.URL; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; - -import org.junit.Assert; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.interceptor.Interceptor; - -import com.datatorrent.netlet.util.Slice; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertNotNull; - -/** - * - */ -public class InterceptorTestHelper -{ - private static final byte FIELD_SEPARATOR = 1; - - static class MyEvent implements Event - { - byte[] body; - - MyEvent(byte[] bytes) - { - body = bytes; - } - - @Override - public Map<String, String> getHeaders() - { - return null; - } - - @Override - public void setHeaders(Map<String, String> map) - { - } - - @Override - @SuppressWarnings("ReturnOfCollectionOrArrayField") - public byte[] getBody() - { - return body; - } - - @Override - @SuppressWarnings("AssignmentToCollectionOrArrayFieldFromParameter") - public void setBody(byte[] bytes) - { - body = bytes; - } - } - - private final Interceptor.Builder builder; - private final Map<String, String> context; - - InterceptorTestHelper(Interceptor.Builder builder, Map<String, String> context) - { - this.builder = builder; - this.context = context; - } - - public void testIntercept_Event() - { - builder.configure(new Context(context)); - Interceptor interceptor = builder.build(); - - assertArrayEquals("Empty Bytes", - "\001\001\001".getBytes(), - interceptor.intercept(new MyEvent("".getBytes())).getBody()); - - assertArrayEquals("One Separator", - "\001\001\001".getBytes(), - interceptor.intercept(new MyEvent("\002".getBytes())).getBody()); - - assertArrayEquals("Two Separators", - "\001\001\001".getBytes(), - interceptor.intercept(new MyEvent("\002\002".getBytes())).getBody()); - - assertArrayEquals("One Field", - "\001\001\001".getBytes(), - interceptor.intercept(new MyEvent("First".getBytes())).getBody()); - - assertArrayEquals("Two Fields", - "First\001\001\001".getBytes(), - interceptor.intercept(new MyEvent("\002First".getBytes())).getBody()); - - assertArrayEquals("Two Fields", - "\001\001\001".getBytes(), - interceptor.intercept(new MyEvent("First\001".getBytes())).getBody()); - - assertArrayEquals("Two Fields", - "Second\001\001\001".getBytes(), - interceptor.intercept(new MyEvent("First\002Second".getBytes())).getBody()); - - assertArrayEquals("Three Fields", - "Second\001\001\001".getBytes(), - interceptor.intercept(new MyEvent("First\002Second\002".getBytes())).getBody()); - - assertArrayEquals("Three Fields", - "\001Second\001\001".getBytes(), - interceptor.intercept(new MyEvent("First\002\002Second".getBytes())).getBody()); - - assertArrayEquals("Four Fields", - "\001Second\001\001".getBytes(), - interceptor.intercept(new MyEvent("First\002\002Second\002".getBytes())).getBody()); - - assertArrayEquals("Five Fields", - "\001Second\001\001".getBytes(), - interceptor.intercept(new MyEvent("First\002\002Second\002\002".getBytes())).getBody()); - - assertArrayEquals("Six Fields", - "\001Second\001\001".getBytes(), - interceptor.intercept(new MyEvent("First\002\002Second\002\002\002".getBytes())).getBody()); - } - - public void testFiles() throws IOException, 
URISyntaxException - { - Properties properties = new Properties(); - properties.load(getClass().getResourceAsStream("/flume/conf/flume-conf.properties")); - - String interceptor = null; - for (Entry<Object, Object> entry : properties.entrySet()) { - logger.debug("{} => {}", entry.getKey(), entry.getValue()); - - if (builder.getClass().getName().equals(entry.getValue().toString())) { - String key = entry.getKey().toString(); - if (key.endsWith(".type")) { - interceptor = key.substring(0, key.length() - "type".length()); - break; - } - } - } - - assertNotNull(builder.getClass().getName(), interceptor); - @SuppressWarnings({"null", "ConstantConditions"}) - final int interceptorLength = interceptor.length(); - - HashMap<String, String> map = new HashMap<String, String>(); - for (Entry<Object, Object> entry : properties.entrySet()) { - String key = entry.getKey().toString(); - if (key.startsWith(interceptor)) { - map.put(key.substring(interceptorLength), entry.getValue().toString()); - } - } - - builder.configure(new Context(map)); - Interceptor interceptorInstance = builder.build(); - - URL url = getClass().getResource("/test_data/gentxns/"); - assertNotNull("Generated Transactions", url); - - int records = 0; - File dir = new File(url.toURI()); - for (File file : dir.listFiles()) { - records += processFile(file, interceptorInstance); - } - - Assert.assertEquals("Total Records", 2200, records); - } - - private int processFile(File file, Interceptor interceptor) throws IOException - { - InputStream stream = getClass().getResourceAsStream("/test_data/gentxns/" + file.getName()); - BufferedReader br = new BufferedReader(new InputStreamReader(stream)); - - String line; - int i = 0; - while ((line = br.readLine()) != null) { - byte[] body = interceptor.intercept(new MyEvent(line.getBytes())).getBody(); - RawEvent event = RawEvent.from(body, FIELD_SEPARATOR); - Assert.assertEquals("GUID", new Slice(line.getBytes(), 0, 32), event.guid); - logger.debug("guid = {}, time = {}", event.guid, event.time); - i++; - } - - br.close(); - return i; - } - - private static final Logger logger = LoggerFactory.getLogger(InterceptorTestHelper.class); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/interceptor/RawEvent.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/com/datatorrent/flume/interceptor/RawEvent.java b/flume/src/test/java/com/datatorrent/flume/interceptor/RawEvent.java deleted file mode 100644 index c029cd0..0000000 --- a/flume/src/test/java/com/datatorrent/flume/interceptor/RawEvent.java +++ /dev/null @@ -1,120 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.flume.interceptor; - -import java.io.Serializable; - -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.netlet.util.Slice; - -/** - * - */ -public class RawEvent implements Serializable -{ - public Slice guid; - public long time; - public int dimensionsOffset; - - public Slice getGUID() - { - return guid; - } - - public long getTime() - { - return time; - } - - RawEvent() - { - /* needed for Kryo serialization */ - } - - public static RawEvent from(byte[] row, byte separator) - { - final int rowsize = row.length; - - /* - * Lets get the guid out of the current record - */ - int sliceLengh = -1; - while (++sliceLengh < rowsize) { - if (row[sliceLengh] == separator) { - break; - } - } - - int i = sliceLengh + 1; - - /* lets parse the date */ - int dateStart = i; - while (i < rowsize) { - if (row[i++] == separator) { - long time = DATE_PARSER.parseMillis(new String(row, dateStart, i - dateStart - 1)); - RawEvent event = new RawEvent(); - event.guid = new Slice(row, 0, sliceLengh); - event.time = time; - event.dimensionsOffset = i; - return event; - } - } - - return null; - } - - @Override - public int hashCode() - { - int hash = 5; - hash = 61 * hash + (this.guid != null ? this.guid.hashCode() : 0); - hash = 61 * hash + (int)(this.time ^ (this.time >>> 32)); - return hash; - } - - @Override - public String toString() - { - return "RawEvent{" + "guid=" + guid + ", time=" + time + '}'; - } - - @Override - public boolean equals(Object obj) - { - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - final RawEvent other = (RawEvent)obj; - if (this.guid != other.guid && (this.guid == null || !this.guid.equals(other.guid))) { - return false; - } - return this.time == other.time; - } - - private static final DateTimeFormatter DATE_PARSER = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss"); - private static final Logger logger = LoggerFactory.getLogger(RawEvent.class); - private static final long serialVersionUID = 201312191312L; -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/operator/AbstractFlumeInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/com/datatorrent/flume/operator/AbstractFlumeInputOperatorTest.java b/flume/src/test/java/com/datatorrent/flume/operator/AbstractFlumeInputOperatorTest.java deleted file mode 100644 index 2f162a8..0000000 --- a/flume/src/test/java/com/datatorrent/flume/operator/AbstractFlumeInputOperatorTest.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
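A note on RawEvent (deleted above): RawEvent.from(row, separator) treats everything before the first separator byte as the GUID, parses the next separator-terminated field with the "yyyy-MM-dd HH:mm:ss" formatter, records where the remaining dimension data starts, and returns null if the second separator is never found. A small illustration with a made-up row (the 32-byte GUID length is only what InterceptorTestHelper's GUID assertion implies; RawEvent itself accepts any length):

  byte sep = 1;
  String row = "0123456789abcdef0123456789abcdef" + (char)sep + "2013-12-19 13:12:00" + (char)sep + "dims";
  RawEvent event = RawEvent.from(row.getBytes(), sep);
  // event.guid             -> Slice covering the 32 GUID bytes
  // event.time             -> epoch millis parsed from "2013-12-19 13:12:00"
  // event.dimensionsOffset -> index just past the second separator, i.e. the start of "dims"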
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.flume.operator; - -import java.util.HashSet; -import java.util.Set; - -import org.junit.Test; - -import static org.junit.Assert.assertTrue; - -/** - * - */ -public class AbstractFlumeInputOperatorTest -{ - public AbstractFlumeInputOperatorTest() - { - } - - @Test - public void testThreadLocal() - { - ThreadLocal<Set<Integer>> tl = new ThreadLocal<Set<Integer>>() - { - @Override - protected Set<Integer> initialValue() - { - return new HashSet<Integer>(); - } - - }; - Set<Integer> get1 = tl.get(); - get1.add(1); - assertTrue("Just Added Value", get1.contains(1)); - - Set<Integer> get2 = tl.get(); - assertTrue("Previously added value", get2.contains(1)); - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/sink/DTFlumeSinkTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/com/datatorrent/flume/sink/DTFlumeSinkTest.java b/flume/src/test/java/com/datatorrent/flume/sink/DTFlumeSinkTest.java deleted file mode 100644 index 7949e63..0000000 --- a/flume/src/test/java/com/datatorrent/flume/sink/DTFlumeSinkTest.java +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.flume.sink; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; - -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.flume.channel.MemoryChannel; - -import com.datatorrent.flume.discovery.Discovery; -import com.datatorrent.netlet.AbstractLengthPrependerClient; -import com.datatorrent.netlet.DefaultEventLoop; -import com.datatorrent.netlet.util.Slice; - -/** - * - */ -public class DTFlumeSinkTest -{ - static final String hostname = "localhost"; - int port = 0; - - @Test - @SuppressWarnings("SleepWhileInLoop") - public void testServer() throws InterruptedException, IOException - { - Discovery<byte[]> discovery = new Discovery<byte[]>() - { - @Override - public synchronized void unadvertise(Service<byte[]> service) - { - notify(); - } - - @Override - public synchronized void advertise(Service<byte[]> service) - { - port = service.getPort(); - logger.debug("listening at {}", service); - notify(); - } - - @Override - @SuppressWarnings("unchecked") - public synchronized Collection<Service<byte[]>> discover() - { - try { - wait(); - } catch (InterruptedException ie) { - throw new RuntimeException(ie); - } - return Collections.EMPTY_LIST; - } - - }; - DTFlumeSink sink = new DTFlumeSink(); - sink.setName("TeskSink"); - sink.setHostname(hostname); - sink.setPort(0); - sink.setAcceptedTolerance(2000); - sink.setChannel(new MemoryChannel()); - sink.setDiscovery(discovery); - sink.start(); - AbstractLengthPrependerClient client = new AbstractLengthPrependerClient() - { - private byte[] array; - private int offset = 2; - - @Override - public void onMessage(byte[] buffer, int offset, int size) - { - Slice received = new Slice(buffer, offset, size); - logger.debug("Client Received = {}", received); - Assert.assertEquals(received, - new Slice(Arrays.copyOfRange(array, this.offset, array.length), 0, Server.Request.FIXED_SIZE)); - synchronized (DTFlumeSinkTest.this) { - DTFlumeSinkTest.this.notify(); - } - } - - @Override - public void connected() - { - super.connected(); - array = new byte[Server.Request.FIXED_SIZE + offset]; - array[offset] = Server.Command.ECHO.getOrdinal(); - array[offset + 1] = 1; - array[offset + 2] = 2; - array[offset + 3] = 3; - array[offset + 4] = 4; - array[offset + 5] = 5; - array[offset + 6] = 6; - array[offset + 7] = 7; - array[offset + 8] = 8; - Server.writeLong(array, offset + Server.Request.TIME_OFFSET, System.currentTimeMillis()); - write(array, offset, Server.Request.FIXED_SIZE); - } - - }; - - DefaultEventLoop eventloop = new DefaultEventLoop("Eventloop-TestClient"); - eventloop.start(); - discovery.discover(); - try { - eventloop.connect(new InetSocketAddress(hostname, port), client); - try { - synchronized (this) { - this.wait(); - } - } finally { - eventloop.disconnect(client); - } - } finally { - eventloop.stop(); - } - - sink.stop(); - } - - private static final Logger logger = LoggerFactory.getLogger(DTFlumeSinkTest.class); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/sink/ServerTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/com/datatorrent/flume/sink/ServerTest.java b/flume/src/test/java/com/datatorrent/flume/sink/ServerTest.java deleted file mode 100644 index 8c225d1..0000000 --- 
a/flume/src/test/java/com/datatorrent/flume/sink/ServerTest.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.flume.sink; - -import java.util.Random; - -import org.junit.Assert; -import org.junit.Test; - -/** - * - */ -public class ServerTest -{ - byte[] array; - - public ServerTest() - { - array = new byte[1024]; - } - - @Test - public void testInt() - { - Server.writeInt(array, 0, Integer.MAX_VALUE); - Assert.assertEquals("Max Integer", Integer.MAX_VALUE, Server.readInt(array, 0)); - - Server.writeInt(array, 0, Integer.MIN_VALUE); - Assert.assertEquals("Min Integer", Integer.MIN_VALUE, Server.readInt(array, 0)); - - Server.writeInt(array, 0, 0); - Assert.assertEquals("Zero Integer", 0, Server.readInt(array, 0)); - - Random rand = new Random(); - for (int i = 0; i < 128; i++) { - int n = rand.nextInt(); - if (rand.nextBoolean()) { - n = -n; - } - Server.writeInt(array, 0, n); - Assert.assertEquals("Random Integer", n, Server.readInt(array, 0)); - } - } - - @Test - public void testLong() - { - Server.writeLong(array, 0, Integer.MAX_VALUE); - Assert.assertEquals("Max Integer", Integer.MAX_VALUE, Server.readLong(array, 0)); - - Server.writeLong(array, 0, Integer.MIN_VALUE); - Assert.assertEquals("Min Integer", Integer.MIN_VALUE, Server.readLong(array, 0)); - - Server.writeLong(array, 0, 0); - Assert.assertEquals("Zero Integer", 0L, Server.readLong(array, 0)); - - Server.writeLong(array, 0, Long.MAX_VALUE); - Assert.assertEquals("Max Long", Long.MAX_VALUE, Server.readLong(array, 0)); - - Server.writeLong(array, 0, Long.MIN_VALUE); - Assert.assertEquals("Min Long", Long.MIN_VALUE, Server.readLong(array, 0)); - - Server.writeLong(array, 0, 0L); - Assert.assertEquals("Zero Long", 0L, Server.readLong(array, 0)); - - Random rand = new Random(); - for (int i = 0; i < 128; i++) { - long n = rand.nextLong(); - if (rand.nextBoolean()) { - n = -n; - } - Server.writeLong(array, 0, n); - Assert.assertEquals("Random Long", n, Server.readLong(array, 0)); - } - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageMatching.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageMatching.java b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageMatching.java deleted file mode 100644 index 6b6adcb..0000000 --- a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageMatching.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.flume.storage; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.primitives.Ints; - -import com.datatorrent.netlet.util.Slice; - -/** - * - */ -public class HDFSStorageMatching -{ - - public static void main(String[] args) - { - HDFSStorage storage = new HDFSStorage(); - storage.setBaseDir(args[0]); - storage.setId(args[1]); - storage.setRestore(true); - storage.setup(null); - int count = 100000000; - - logger.debug(" start time {}", System.currentTimeMillis()); - int index = 10000; - byte[] b = Ints.toByteArray(index); - for (int i = 0; i < count; i++) { - storage.store(new Slice(b, 0, b.length)); - index++; - b = Ints.toByteArray(index); - } - storage.flush(); - for (int i = 0; i < count; i++) { - storage.store(new Slice(b, 0, b.length)); - index++; - b = Ints.toByteArray(index); - } - storage.flush(); - for (int i = 0; i < count; i++) { - storage.store(new Slice(b, 0, b.length)); - index++; - b = Ints.toByteArray(index); - } - storage.flush(); - for (int i = 0; i < count; i++) { - storage.store(new Slice(b, 0, b.length)); - index++; - b = Ints.toByteArray(index); - } - storage.flush(); - for (int i = 0; i < count; i++) { - storage.store(new Slice(b, 0, b.length)); - index++; - b = Ints.toByteArray(index); - } - storage.flush(); - logger.debug(" end time {}", System.currentTimeMillis()); - logger.debug(" start time for retrieve {}", System.currentTimeMillis()); - b = storage.retrieve(new byte[8]); - int org_index = index; - index = 10000; - match(b, index); - while (true) { - index++; - b = storage.retrieveNext(); - if (b == null) { - logger.debug(" end time for retrieve {}/{}/{}", System.currentTimeMillis(), index, org_index); - return; - } else { - if (!match(b, index)) { - throw new RuntimeException("failed : " + index); - } - } - } - - } - - public static boolean match(byte[] data, int match) - { - byte[] tempData = new byte[data.length - 8]; - System.arraycopy(data, 8, tempData, 0, tempData.length); - int dataR = Ints.fromByteArray(tempData); - //logger.debug("input: {}, output: {}",match,dataR); - if (match == dataR) { - return true; - } - return false; - } - - private static final Logger logger = LoggerFactory.getLogger(HDFSStorageMatching.class); -} - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformance.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformance.java b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformance.java deleted file mode 100644 index 098f3f7..0000000 --- a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformance.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache 
Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.flume.storage; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.netlet.util.Slice; - -/** - * - */ -public class HDFSStoragePerformance -{ - - public static void main(String[] args) - { - HDFSStorage storage = new HDFSStorage(); - storage.setBaseDir("."); - storage.setId("gaurav_flume_1"); - storage.setRestore(true); - storage.setup(null); - int count = 1000000; - - logger.debug(" start time {}", System.currentTimeMillis()); - int index = 10000; - byte[] b = new byte[1024]; - for (int i = 0; i < count; i++) { - storage.store(new Slice(b, 0, b.length)); - } - storage.flush(); - for (int i = 0; i < count; i++) { - storage.store(new Slice(b, 0, b.length)); - } - storage.flush(); - for (int i = 0; i < count; i++) { - storage.store(new Slice(b, 0, b.length)); - } - storage.flush(); - logger.debug(" end time {}", System.currentTimeMillis()); - logger.debug(" start time for retrieve {}", System.currentTimeMillis()); - storage.retrieve(new byte[8]); - String inputData = new String(b); - index = 1; - while (true) { - b = storage.retrieveNext(); - if (b == null) { - logger.debug(" end time for retrieve {}", System.currentTimeMillis()); - return; - } else { - if (!match(b, inputData)) { - throw new RuntimeException("failed : " + index); - } - } - - index++; - } - - } - - public static boolean match(byte[] data, String match) - { - byte[] tempData = new byte[data.length - 8]; - System.arraycopy(data, 8, tempData, 0, tempData.length); -// logger.debug("input: {}, output: {}",match,new String(tempData)); - return (match.equals(new String(tempData))); - } - - private static final Logger logger = LoggerFactory.getLogger(HDFSStoragePerformance.class); -} -
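A note on ServerTest (deleted above): the assertions only check round-tripping, so it is easy to miss that Server.writeInt/writeLong serialize fixed-width integers into a caller-supplied byte array at a given offset; DTFlumeSinkTest relies on the same helpers to stamp the current time into its ECHO request. A minimal round trip using only the calls those tests make (the byte order is whatever Server defines and is not visible here):

  byte[] buf = new byte[12];
  Server.writeInt(buf, 0, 42);
  Server.writeLong(buf, 4, System.currentTimeMillis());
  int n = Server.readInt(buf, 0);    // 42 again
  long ts = Server.readLong(buf, 4); // the timestamp written above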
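A note on HDFSStorageMatching and HDFSStoragePerformance (deleted above): both are main() harnesses rather than JUnit tests, and between them they spell out the expected HDFSStorage call pattern: configure baseDir, id and restore, call setup(null), store() a batch of Slices, flush() the batch, then scan with retrieve(new byte[8]) followed by retrieveNext() until it returns null. Both match() helpers also skip the first 8 bytes of every retrieved record before comparing payloads, so retrieved data evidently carries an 8-byte bookkeeping prefix whose meaning is defined inside HDFSStorage and not shown here. A condensed sketch of the same pattern (the id is a placeholder):

  HDFSStorage storage = new HDFSStorage();
  storage.setBaseDir(".");
  storage.setId("example_storage");   // hypothetical id
  storage.setRestore(true);
  storage.setup(null);

  byte[] payload = "hello".getBytes();
  storage.store(new Slice(payload, 0, payload.length));
  storage.flush();                    // the harnesses flush each batch before reading it back

  byte[] rec = storage.retrieve(new byte[8]);  // all-zero address, as used by the harnesses
  while (rec != null) {
    // rec = 8-byte prefix + original payload, per the match() helpers above
    rec = storage.retrieveNext();
  }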
