Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 ffc25e57c -> d710af9b1
MLHR-1804 MLHR-1805 MLHR-1806 #resolve #comment Refactoring of file splitter and fix for kryo exception Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/ea202dae Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/ea202dae Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/ea202dae Branch: refs/heads/devel-3 Commit: ea202daed756d8a0ca633ec14c0539451ddc581a Parents: ffc25e5 Author: Chandni Singh <[email protected]> Authored: Tue Aug 25 12:08:21 2015 -0700 Committer: Chandni Singh <[email protected]> Committed: Tue Sep 22 15:24:51 2015 -0700 ---------------------------------------------------------------------- .../datatorrent/lib/io/block/BlockMetadata.java | 78 ++- .../lib/io/fs/AbstractFileSplitter.java | 539 +++++++++++++++++ .../com/datatorrent/lib/io/fs/FileSplitter.java | 6 +- .../datatorrent/lib/io/fs/FileSplitterBase.java | 142 +++++ .../lib/io/fs/FileSplitterInput.java | 594 +++++++++++++++++++ .../lib/io/fs/FileSplitterBaseTest.java | 264 +++++++++ .../lib/io/fs/FileSplitterInputTest.java | 486 +++++++++++++++ 7 files changed, 2096 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ea202dae/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java index 255a67f..1f47f8f 100644 --- a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java +++ b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,6 +15,10 @@ */ package com.datatorrent.lib.io.block; +import javax.validation.constraints.NotNull; + +import com.google.common.base.Preconditions; + /** * Represents the metadata of a block. 
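 * Equality of two block metadata instances is based solely on the block id; {@code equals}
 * and {@code hashCode} of {@code AbstractBlockMetadata} below use only {@code getBlockId()}.
 * With this change the id lives on {@code AbstractBlockMetadata} itself and the fields gain
 * setters, so a block can be populated incrementally rather than only via the constructor.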
* @@ -51,8 +55,9 @@ public interface BlockMetadata { private long offset; private long length; - private final boolean isLastBlock; - private final long previousBlockId; + private boolean isLastBlock; + private long previousBlockId; + private long blockId; @SuppressWarnings("unused") protected AbstractBlockMetadata() @@ -61,6 +66,7 @@ public interface BlockMetadata length = -1; isLastBlock = false; previousBlockId = -1; + blockId = -1; } /** @@ -71,12 +77,32 @@ public interface BlockMetadata * @param isLastBlock true if this is the last block of file * @param previousBlockId id of the previous block */ + @Deprecated public AbstractBlockMetadata(long offset, long length, boolean isLastBlock, long previousBlockId) { this.offset = offset; this.length = length; this.isLastBlock = isLastBlock; this.previousBlockId = previousBlockId; + this.blockId = -1; + } + + /** + * Constructs Block metadata + * + * @param blockId block id + * @param offset offset of the file in the block + * @param length length of the file in the block + * @param isLastBlock true if this is the last block of file + * @param previousBlockId id of the previous block + */ + public AbstractBlockMetadata(long blockId, long offset, long length, boolean isLastBlock, long previousBlockId) + { + this.blockId = blockId; + this.offset = offset; + this.length = length; + this.isLastBlock = isLastBlock; + this.previousBlockId = previousBlockId; } @Override @@ -89,14 +115,14 @@ public interface BlockMetadata return false; } - AbstractBlockMetadata that = (AbstractBlockMetadata) o; + AbstractBlockMetadata that = (AbstractBlockMetadata)o; return getBlockId() == that.getBlockId(); } @Override public int hashCode() { - return (int) getBlockId(); + return (int)getBlockId(); } @Override @@ -133,11 +159,37 @@ public interface BlockMetadata return isLastBlock; } + public void setLastBlock(boolean lastBlock) + { + this.isLastBlock = lastBlock; + } + @Override public long getPreviousBlockId() { return previousBlockId; } + + /** + * Sets the previous block id. + * + * @param previousBlockId previous block id. 
+ */ + public void setPreviousBlockId(long previousBlockId) + { + this.previousBlockId = previousBlockId; + } + + @Override + public long getBlockId() + { + return blockId; + } + + public void setBlockId(long blockId) + { + this.blockId = blockId; + } } /** @@ -146,31 +198,33 @@ public interface BlockMetadata public static class FileBlockMetadata extends AbstractBlockMetadata { private final String filePath; - private final long blockId; protected FileBlockMetadata() { super(); filePath = null; - blockId = -1; } public FileBlockMetadata(String filePath, long blockId, long offset, long length, boolean isLastBlock, long previousBlockId) { - super(offset, length, isLastBlock, previousBlockId); + super(blockId, offset, length, isLastBlock, previousBlockId); this.filePath = filePath; - this.blockId = blockId; } - @Override - public long getBlockId() + public FileBlockMetadata(String filePath) { - return blockId; + this.filePath = filePath; } public String getFilePath() { return filePath; } + + public FileBlockMetadata newInstance(@NotNull String filePath) + { + Preconditions.checkNotNull(filePath); + return new FileBlockMetadata(filePath); + } } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ea202dae/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java new file mode 100644 index 0000000..b8513ee --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java @@ -0,0 +1,539 @@ +/** + * Copyright (C) 2015 DataTorrent, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datatorrent.lib.io.fs; + +import java.io.IOException; +import java.util.Iterator; + +import javax.annotation.Nullable; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultOutputPort; + +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.block.BlockMetadata; + +/** + * An abstract File Splitter. + */ +public abstract class AbstractFileSplitter extends BaseOperator +{ + protected Long blockSize; + private int sequenceNo; + + /** + * This is a threshold on the no. of blocks emitted per window. A lot of blocks emitted + * per window can overwhelm the downstream operators. This setting helps to control that. 
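+ * The default is {@code Integer.MAX_VALUE}, i.e. effectively no throttling.
+ * <p>
+ * A minimal tuning sketch (the value is illustrative, not a recommendation):
+ * <pre>
+ * splitter.setBlocksThreshold(100); //emit at most 100 block-metadata tuples per streaming window
+ * </pre>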
+ */ + @Min(1) + protected int blocksThreshold; + + protected transient long blockCount; + + protected BlockMetadataIterator blockMetadataIterator; + + protected transient int operatorId; + protected transient Context.OperatorContext context; + protected transient long currentWindowId; + + @AutoMetric + protected int filesProcessed; + + public final transient DefaultOutputPort<FileMetadata> filesMetadataOutput = new DefaultOutputPort<>(); + public final transient DefaultOutputPort<BlockMetadata.FileBlockMetadata> blocksMetadataOutput = new DefaultOutputPort<>(); + + public AbstractFileSplitter() + { + blocksThreshold = Integer.MAX_VALUE; + } + + @Override + public void setup(Context.OperatorContext context) + { + Preconditions.checkArgument(blockSize == null || blockSize > 0, "invalid block size"); + + operatorId = context.getId(); + this.context = context; + currentWindowId = context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID); + if (blockSize == null) { + blockSize = getDefaultBlockSize(); + } + } + + @Override + public void beginWindow(long windowId) + { + filesProcessed = 0; + blockCount = 0; + currentWindowId = windowId; + } + + protected void process() + { + if (blockMetadataIterator != null && blockCount < blocksThreshold) { + emitBlockMetadata(); + } + + FileInfo fileInfo; + while (blockCount < blocksThreshold && (fileInfo = getFileInfo()) != null) { + if (!processFileInfo(fileInfo)) { + break; + } + } + } + + /** + * @return {@link FileInfo} + */ + protected abstract FileInfo getFileInfo(); + + /** + * @param fileInfo file info + * @return true if blocks threshold is reached; false otherwise + */ + protected boolean processFileInfo(FileInfo fileInfo) + { + try { + FileMetadata fileMetadata = buildFileMetadata(fileInfo); + filesMetadataOutput.emit(fileMetadata); + filesProcessed++; + if (!fileMetadata.isDirectory()) { + blockMetadataIterator = new BlockMetadataIterator(this, fileMetadata, blockSize); + if (!emitBlockMetadata()) { + //block threshold reached + return false; + } + } + return true; + } catch (IOException e) { + throw new RuntimeException("creating metadata", e); + } + } + + /** + * @return true if all the blocks were emitted; false otherwise + */ + protected boolean emitBlockMetadata() + { + while (blockMetadataIterator.hasNext()) { + if (blockCount++ < blocksThreshold) { + this.blocksMetadataOutput.emit(blockMetadataIterator.next()); + } else { + return false; + } + } + blockMetadataIterator = null; + return true; + } + + /** + * Builds block metadata + * + * @param pos offset of the block + * @param lengthOfFileInBlock length of the block in file + * @param blockNumber block number + * @param fileMetadata file metadata + * @param isLast last block of the file + * @return + */ + protected BlockMetadata.FileBlockMetadata buildBlockMetadata(long pos, long lengthOfFileInBlock, int blockNumber, + FileMetadata fileMetadata, boolean isLast) + { + BlockMetadata.FileBlockMetadata fileBlockMetadata = createBlockMetadata(fileMetadata); + fileBlockMetadata.setBlockId(fileMetadata.getBlockIds()[blockNumber - 1]); + fileBlockMetadata.setOffset(pos); + fileBlockMetadata.setLength(lengthOfFileInBlock); + fileBlockMetadata.setLastBlock(isLast); + fileBlockMetadata.setPreviousBlockId(blockNumber == 1 ? 
-1 : fileMetadata.getBlockIds()[blockNumber - 2]); + + return fileBlockMetadata; + } + + /** + * Can be overridden for creating block metadata of a type that extends {@link BlockMetadata.FileBlockMetadata} + */ + protected BlockMetadata.FileBlockMetadata createBlockMetadata(FileMetadata fileMetadata) + { + return new BlockMetadata.FileBlockMetadata(fileMetadata.getFilePath()); + } + + /** + * Creates file-metadata and populates no. of blocks in the metadata. + * + * @param fileInfo file information + * @return file-metadata + * @throws IOException + */ + protected FileMetadata buildFileMetadata(FileInfo fileInfo) throws IOException + { + LOG.debug("file {}", fileInfo.getFilePath()); + FileMetadata fileMetadata = createFileMetadata(fileInfo); + Path path = new Path(fileInfo.getFilePath()); + + fileMetadata.setFileName(path.getName()); + + FileStatus status = getFileStatus(path); + fileMetadata.setDirectory(status.isDirectory()); + fileMetadata.setFileLength(status.getLen()); + + if (!status.isDirectory()) { + int noOfBlocks = (int)((status.getLen() / blockSize) + (((status.getLen() % blockSize) == 0) ? 0 : 1)); + if (fileMetadata.getDataOffset() >= status.getLen()) { + noOfBlocks = 0; + } + fileMetadata.setNumberOfBlocks(noOfBlocks); + populateBlockIds(fileMetadata); + } + return fileMetadata; + } + + /** + * This can be over-ridden to create file metadata of type that extends {@link FileSplitterInput.FileMetadata} + * + * @param fileInfo file information + * @return file-metadata + */ + protected FileMetadata createFileMetadata(FileInfo fileInfo) + { + return new FileMetadata(fileInfo.getFilePath()); + } + + protected void populateBlockIds(FileMetadata fileMetadata) + { + // block ids are 32 bits of operatorId | 32 bits of sequence number + long[] blockIds = new long[fileMetadata.getNumberOfBlocks()]; + long longLeftSide = ((long)operatorId) << 32; + for (int i = 0; i < fileMetadata.getNumberOfBlocks(); i++) { + blockIds[i] = longLeftSide | sequenceNo++ & 0xFFFFFFFFL; + } + fileMetadata.setBlockIds(blockIds); + } + + /** + * Get default block size which is used when the user hasn't specified block size. + * + * @return default block size. + */ + protected abstract long getDefaultBlockSize(); + + /** + * Get status of a file. + * + * @param path path of a file + * @return file status + */ + protected abstract FileStatus getFileStatus(Path path) throws IOException; + + public void setBlockSize(Long blockSize) + { + this.blockSize = blockSize; + } + + public Long getBlockSize() + { + return blockSize; + } + + public void setBlocksThreshold(int threshold) + { + this.blocksThreshold = threshold; + } + + public int getBlocksThreshold() + { + return blocksThreshold; + } + + /** + * An {@link Iterator} for Block-Metadatas of a file. 
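+ * <p>
+ * Iteration is lazy: each call to {@code next()} derives one block boundary from the running
+ * offset, so the metadata of all blocks of a large file is never materialized at once. For
+ * example, a file of length 10 with block size 4 yields blocks covering offsets [0, 4),
+ * [4, 8) and [8, 10), the last one flagged as the last block of the file.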
+ */ + protected static class BlockMetadataIterator implements Iterator<BlockMetadata.FileBlockMetadata> + { + private final FileMetadata fileMetadata; + private final long blockSize; + + private long pos; + private int blockNumber; + + private final transient AbstractFileSplitter splitter; + + protected BlockMetadataIterator() + { + //for kryo + fileMetadata = null; + blockSize = -1; + splitter = null; + } + + protected BlockMetadataIterator(AbstractFileSplitter splitter, FileMetadata fileMetadata, long blockSize) + { + this.splitter = splitter; + this.fileMetadata = fileMetadata; + this.blockSize = blockSize; + this.pos = fileMetadata.getDataOffset(); + this.blockNumber = 0; + } + + @Override + public boolean hasNext() + { + return pos < fileMetadata.getFileLength(); + } + + @SuppressWarnings("StatementWithEmptyBody") + @Override + public BlockMetadata.FileBlockMetadata next() + { + long length; + while ((length = blockSize * ++blockNumber) <= pos) { + } + boolean isLast = length >= fileMetadata.getFileLength(); + long lengthOfFileInBlock = isLast ? fileMetadata.getFileLength() : length; + BlockMetadata.FileBlockMetadata fileBlock = splitter.buildBlockMetadata(pos, lengthOfFileInBlock, blockNumber, fileMetadata, isLast); + pos = lengthOfFileInBlock; + return fileBlock; + } + + @Override + public void remove() + { + throw new UnsupportedOperationException("remove not supported"); + } + } + + /** + * Represents the file metadata - file path, name, no. of blocks, etc. + */ + public static class FileMetadata + { + @NotNull + private String filePath; + private String fileName; + private int numberOfBlocks; + private long dataOffset; + private long fileLength; + private long discoverTime; + private long[] blockIds; + private boolean isDirectory; + + @SuppressWarnings("unused") + protected FileMetadata() + { + //for kryo + filePath = null; + discoverTime = System.currentTimeMillis(); + } + + /** + * Constructs file metadata + * + * @param filePath file path + */ + public FileMetadata(@NotNull String filePath) + { + this.filePath = filePath; + discoverTime = System.currentTimeMillis(); + } + + /** + * Returns the total number of blocks. + */ + public int getNumberOfBlocks() + { + return numberOfBlocks; + } + + /** + * Sets the total number of blocks. + */ + public void setNumberOfBlocks(int numberOfBlocks) + { + this.numberOfBlocks = numberOfBlocks; + } + + /** + * Returns the file name. + */ + public String getFileName() + { + return fileName; + } + + /** + * Sets the file name. + */ + public void setFileName(String fileName) + { + this.fileName = fileName; + } + + /** + * Sets the file path. + */ + public void setFilePath(String filePath) + { + this.filePath = filePath; + } + + /** + * Returns the file path. + */ + public String getFilePath() + { + return filePath; + } + + /** + * Returns the data offset. + */ + public long getDataOffset() + { + return dataOffset; + } + + /** + * Sets the data offset. + */ + public void setDataOffset(long offset) + { + this.dataOffset = offset; + } + + /** + * Returns the file length. + */ + public long getFileLength() + { + return fileLength; + } + + /** + * Sets the file length. + */ + public void setFileLength(long fileLength) + { + this.fileLength = fileLength; + } + + /** + * Returns the file discover time. + */ + public long getDiscoverTime() + { + return discoverTime; + } + + /** + * Sets the discover time. 
+ */ + public void setDiscoverTime(long discoverTime) + { + this.discoverTime = discoverTime; + } + + /** + * Returns the block ids associated with the file. + */ + public long[] getBlockIds() + { + return blockIds; + } + + /** + * Sets the blocks ids of the file. + */ + public void setBlockIds(long[] blockIds) + { + this.blockIds = blockIds; + } + + /** + * Sets whether the file metadata is a directory. + */ + public void setDirectory(boolean isDirectory) + { + this.isDirectory = isDirectory; + } + + /** + * @return true if it is a directory; false otherwise. + */ + public boolean isDirectory() + { + return isDirectory; + } + } + + /** + * A class that encapsulates file path. + */ + public static class FileInfo + { + protected final String directoryPath; + protected final String relativeFilePath; + + protected FileInfo() + { + directoryPath = null; + relativeFilePath = null; + } + + public FileInfo(@Nullable String directoryPath, @NotNull String relativeFilePath) + { + this.directoryPath = directoryPath; + this.relativeFilePath = relativeFilePath; + } + + /** + * @return directory path + */ + public String getDirectoryPath() + { + return directoryPath; + } + + /** + * @return path relative to directory + */ + public String getRelativeFilePath() + { + return relativeFilePath; + } + + /** + * @return full path of the file + */ + public String getFilePath() + { + if (directoryPath == null) { + return relativeFilePath; + } + return new Path(directoryPath, relativeFilePath).toUri().getPath(); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(AbstractFileSplitter.class); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ea202dae/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java index 58554c3..874d486 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java @@ -59,14 +59,15 @@ import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata; * The operator emits block metadata and file metadata.<br/> * * The file system/directory space should be different for different partitions of file splitter. - * The scanning of * + * @deprecated use {@link FileSplitterInput}. This splitter has issues with recovery and fixing that breaks backward compatibility. * @displayName File Splitter * @category Input * @tags file * @since 2.0.0 */ @OperatorAnnotation(checkpointableWithinAppWindow = false) +@Deprecated public class FileSplitter implements InputOperator, Operator.CheckpointListener { protected Long blockSize; @@ -454,6 +455,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener /** * Represents the file metadata - file path, name, no. of blocks, etc. */ + @Deprecated public static class FileMetadata { @NotNull @@ -614,6 +616,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener } } + @Deprecated public static class TimeBasedDirectoryScanner implements Component<Context.OperatorContext>, Runnable { private static long DEF_SCAN_INTERVAL_MILLIS = 5000; @@ -939,6 +942,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener /** * A class that represents the file discovered by time-based scanner. 
*/ + @Deprecated protected static class FileInfo { protected final String directoryPath; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ea202dae/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterBase.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterBase.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterBase.java new file mode 100644 index 0000000..005377c --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterBase.java @@ -0,0 +1,142 @@ +/** + * Copyright (C) 2015 DataTorrent, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datatorrent.lib.io.fs; + +import java.io.IOException; +import java.util.LinkedList; + +import javax.validation.constraints.NotNull; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.google.common.base.Preconditions; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.Operator; + +/** + * A file splitter that receives its input from an upstream operator. 
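+ * <p>
+ * A minimal wiring sketch, mirroring the unit test shipped with this change (operator, stream
+ * and path names are illustrative):
+ * <pre>
+ * FileSplitterBase splitter = dag.addOperator("Splitter", new FileSplitterBase());
+ * splitter.setFile("/user/input"); //any path on the target file system; used to infer the
+ *                                  //file system and the default block size
+ * dag.addStream("file-info", upstream.output, splitter.input);
+ * dag.addStream("block-metadata", splitter.blocksMetadataOutput, downstream.input);
+ * </pre>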
+ */ +public class FileSplitterBase extends AbstractFileSplitter implements Operator.IdleTimeHandler +{ + @NotNull + protected String file; + protected transient FileSystem fs; + + protected final LinkedList<FileInfo> fileInfos; + protected transient int sleepTimeMillis; + + public FileSplitterBase() + { + fileInfos = new LinkedList<>(); + } + + public final transient DefaultInputPort<FileInfo> input = new DefaultInputPort<FileInfo>() + { + @Override + public void process(FileInfo fileInfo) + { + fileInfos.add(fileInfo); + FileSplitterBase.this.process(); + } + }; + + @Override + public void setup(Context.OperatorContext context) + { + sleepTimeMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS); + try { + fs = getFSInstance(); + } catch (IOException e) { + throw new RuntimeException(e); + } + super.setup(context); + } + + protected FileSystem getFSInstance() throws IOException + { + return FileSystem.newInstance(new Path(file).toUri(), new Configuration()); + } + + @Override + protected FileInfo getFileInfo() + { + if (fileInfos.size() > 0) { + return fileInfos.remove(); + } + return null; + } + + @Override + public void handleIdleTime() + { + if (blockCount < blocksThreshold && (blockMetadataIterator != null || fileInfos.size() > 0)) { + process(); + } else { + /* nothing to do here, so sleep for a while to avoid busy loop */ + try { + Thread.sleep(sleepTimeMillis); + } catch (InterruptedException ie) { + throw new RuntimeException(ie); + } + } + } + + @Override + public void teardown() + { + super.teardown(); + try { + fs.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + protected long getDefaultBlockSize() + { + return fs.getDefaultBlockSize(new Path(file)); + } + + @Override + protected FileStatus getFileStatus(Path path) throws IOException + { + return fs.getFileStatus(path); + } + + /** + * File path from which the File System is inferred. + * + * @param file files + */ + public void setFile(@NotNull String file) + { + this.file = Preconditions.checkNotNull(file, "file path"); + } + + /** + * @return file path from which the File System is inferred. + */ + public String getFile() + { + return file; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ea202dae/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java new file mode 100644 index 0000000..a381be5 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java @@ -0,0 +1,594 @@ +/** + * Copyright (C) 2015 DataTorrent, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datatorrent.lib.io.fs; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import javax.annotation.Nullable; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; +import javax.validation.constraints.Size; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import com.datatorrent.api.Component; +import com.datatorrent.api.Context; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.Operator; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.api.annotation.Stateless; + +import com.datatorrent.lib.io.IdempotentStorageManager; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * Input operator that scans a directory for files and splits a file into blocks.<br/> + * The operator emits block metadata and file metadata.<br/> + * + * The file system/directory space should be different for different partitions of file splitter. + * The scanning of directories is done asynchronously by a {@link TimeBasedDirectoryScanner}. + * + * @displayName File Splitter + * @category Input + * @tags file + * @since 2.0.0 + */ +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public class FileSplitterInput extends AbstractFileSplitter implements InputOperator, Operator.CheckpointListener +{ + @NotNull + private IdempotentStorageManager idempotentStorageManager; + @NotNull + protected final transient LinkedList<ScannedFileInfo> currentWindowRecoveryState; + + @NotNull + private TimeBasedDirectoryScanner scanner; + @NotNull + private Map<String, Long> referenceTimes; + + private transient long sleepMillis; + + public FileSplitterInput() + { + super(); + currentWindowRecoveryState = Lists.newLinkedList(); + idempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager(); + referenceTimes = Maps.newHashMap(); + scanner = new TimeBasedDirectoryScanner(); + } + + @Override + public void setup(Context.OperatorContext context) + { + sleepMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS); + scanner.setup(context); + idempotentStorageManager.setup(context); + super.setup(context); + + long largestRecoveryWindow = idempotentStorageManager.getLargestRecoveryWindow(); + if (largestRecoveryWindow == Stateless.WINDOW_ID || context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow) { + scanner.startScanning(Collections.unmodifiableMap(referenceTimes)); + } + } + + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + replay(windowId); + } + } + + protected void replay(long windowId) + { + try { + @SuppressWarnings("unchecked") + LinkedList<ScannedFileInfo> recoveredData = (LinkedList<ScannedFileInfo>)idempotentStorageManager.load(operatorId,
windowId); + if (recoveredData == null) { + //This could happen when there are multiple physical instances and one of them is ahead in processing windows. + return; + } + if (blockMetadataIterator != null) { + emitBlockMetadata(); + } + for (ScannedFileInfo info : recoveredData) { + updateReferenceTimes(info); + FileMetadata fileMetadata = buildFileMetadata(info); + filesMetadataOutput.emit(fileMetadata); + + blockMetadataIterator = new BlockMetadataIterator(this, fileMetadata, blockSize); + if (!emitBlockMetadata()) { + break; + } + } + } catch (IOException e) { + throw new RuntimeException("replay", e); + } + if (windowId == idempotentStorageManager.getLargestRecoveryWindow()) { + scanner.startScanning(Collections.unmodifiableMap(referenceTimes)); + } + } + + @Override + public void emitTuples() + { + if (currentWindowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + return; + } + + Throwable throwable; + if ((throwable = scanner.atomicThrowable.get()) != null) { + DTThrowable.rethrow(throwable); + } + if (blockMetadataIterator == null && scanner.discoveredFiles.isEmpty()) { + try { + Thread.sleep(sleepMillis); + } catch (InterruptedException e) { + throw new RuntimeException("waiting for work", e); + } + } + process(); + } + + @Override + protected FileInfo getFileInfo() + { + return scanner.pollFile(); + } + + @Override + protected boolean processFileInfo(FileInfo fileInfo) + { + ScannedFileInfo scannedFileInfo = (ScannedFileInfo)fileInfo; + currentWindowRecoveryState.add(scannedFileInfo); + updateReferenceTimes(scannedFileInfo); + return super.processFileInfo(fileInfo) && !scannedFileInfo.lastFileOfScan; + } + + protected void updateReferenceTimes(ScannedFileInfo fileInfo) + { + referenceTimes.put(fileInfo.getFilePath(), fileInfo.modifiedTime); + } + + @Override + public void endWindow() + { + if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) { + try { + idempotentStorageManager.save(currentWindowRecoveryState, operatorId, currentWindowId); + } catch (IOException e) { + throw new RuntimeException("saving recovery", e); + } + } + currentWindowRecoveryState.clear(); + } + + @Override + protected long getDefaultBlockSize() + { + return scanner.fs.getDefaultBlockSize(new Path(scanner.files.iterator().next())); + } + + @Override + protected FileStatus getFileStatus(Path path) throws IOException + { + return scanner.fs.getFileStatus(path); + } + + @Override + public void checkpointed(long l) + { + } + + @Override + public void committed(long l) + { + try { + idempotentStorageManager.deleteUpTo(operatorId, l); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void teardown() + { + scanner.teardown(); + } + + public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) + { + this.idempotentStorageManager = idempotentStorageManager; + } + + public IdempotentStorageManager getIdempotentStorageManager() + { + return this.idempotentStorageManager; + } + + public void setScanner(TimeBasedDirectoryScanner scanner) + { + this.scanner = scanner; + } + + public TimeBasedDirectoryScanner getScanner() + { + return this.scanner; + } + + public static class TimeBasedDirectoryScanner implements Runnable, Component<Context.OperatorContext> + { + private static long DEF_SCAN_INTERVAL_MILLIS = 5000; + + private boolean recursive; + + private transient volatile boolean trigger; + + @NotNull + @Size(min = 1) + private final Set<String> files; + + @Min(0) + private long scanIntervalMillis; + private 
String filePatternRegularExp; + + protected transient long lastScanMillis; + protected transient FileSystem fs; + protected final transient LinkedBlockingDeque<ScannedFileInfo> discoveredFiles; + protected final transient ExecutorService scanService; + protected final transient AtomicReference<Throwable> atomicThrowable; + + private transient volatile boolean running; + protected final transient HashSet<String> ignoredFiles; + protected transient Pattern regex; + protected transient long sleepMillis; + protected transient Map<String, Long> referenceTimes; + + private transient ScannedFileInfo lastScannedInfo; + + public TimeBasedDirectoryScanner() + { + recursive = true; + scanIntervalMillis = DEF_SCAN_INTERVAL_MILLIS; + files = Sets.newLinkedHashSet(); + scanService = Executors.newSingleThreadExecutor(); + discoveredFiles = new LinkedBlockingDeque<>(); + atomicThrowable = new AtomicReference<>(); + ignoredFiles = Sets.newHashSet(); + } + + @Override + public void setup(Context.OperatorContext context) + { + sleepMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS); + if (filePatternRegularExp != null) { + regex = Pattern.compile(filePatternRegularExp); + } + try { + fs = getFSInstance(); + } catch (IOException e) { + throw new RuntimeException("opening fs", e); + } + } + + protected void startScanning(Map<String, Long> referenceTimes) + { + this.referenceTimes = Preconditions.checkNotNull(referenceTimes); + scanService.submit(this); + } + + @Override + public void teardown() + { + running = false; + scanService.shutdownNow(); + try { + fs.close(); + } catch (IOException e) { + throw new RuntimeException("closing fs", e); + } + } + + protected FileSystem getFSInstance() throws IOException + { + return FileSystem.newInstance(new Path(files.iterator().next()).toUri(), new Configuration()); + } + + @Override + public void run() + { + running = true; + try { + while (running) { + if ((trigger || (System.currentTimeMillis() - scanIntervalMillis >= lastScanMillis)) && + (lastScannedInfo == null || referenceTimes.get(lastScannedInfo.getFilePath()) != null)) { + trigger = false; + for (String afile : files) { + scan(new Path(afile), null); + } + scanComplete(); + } else { + Thread.sleep(sleepMillis); + } + } + } catch (Throwable throwable) { + LOG.error("service", throwable); + running = false; + atomicThrowable.set(throwable); + DTThrowable.rethrow(throwable); + } + } + + /** + * Operations that need to be done once a scan is complete. + */ + protected void scanComplete() + { + LOG.debug("scan complete {}", lastScanMillis); + ScannedFileInfo fileInfo = discoveredFiles.peekLast(); + if (fileInfo != null) { + fileInfo.lastFileOfScan = true; + lastScannedInfo = fileInfo; + } + lastScanMillis = System.currentTimeMillis(); + } + + protected void scan(@NotNull Path filePath, Path rootPath) + { + try { + FileStatus parentStatus = fs.getFileStatus(filePath); + String parentPathStr = filePath.toUri().getPath(); + + LOG.debug("scan {}", parentPathStr); + + FileStatus[] childStatuses = fs.listStatus(filePath); + for (FileStatus status : childStatuses) { + Path childPath = status.getPath(); + ScannedFileInfo info = createScannedFileInfo(filePath, parentStatus, childPath, status, rootPath); + + if (skipFile(childPath, status.getModificationTime(), referenceTimes.get(info.getFilePath()))) { + continue; + } + + if (status.isDirectory()) { + if (recursive) { + scan(childPath, rootPath == null ? 
parentStatus.getPath() : rootPath); + } + } + + String childPathStr = childPath.toUri().getPath(); + if (ignoredFiles.contains(childPathStr)) { + continue; + } + if (acceptFile(childPathStr)) { + LOG.debug("found {}", childPathStr); + discoveredFiles.add(info); + } else { + // don't look at it again + ignoredFiles.add(childPathStr); + } + } + } catch (FileNotFoundException fnf) { + LOG.warn("Failed to list directory {}", filePath, fnf); + } catch (IOException e) { + throw new RuntimeException("listing files", e); + } + } + + protected ScannedFileInfo createScannedFileInfo(Path parentPath, FileStatus parentStatus, Path childPath, @SuppressWarnings("UnusedParameters") FileStatus childStatus, Path rootPath) + { + ScannedFileInfo info; + if (rootPath == null) { + info = parentStatus.isDirectory() ? + new ScannedFileInfo(parentPath.toUri().getPath(), childPath.getName(), parentStatus.getModificationTime()) : + new ScannedFileInfo(null, childPath.toUri().getPath(), parentStatus.getModificationTime()); + } else { + URI relativeChildURI = rootPath.toUri().relativize(childPath.toUri()); + info = new ScannedFileInfo(rootPath.toUri().getPath(), relativeChildURI.getPath(), parentStatus.getModificationTime()); + } + return info; + } + + /** + * Skips file/directory based on their modification time.<br/> + * + * @param path file path + * @param modificationTime modification time + * @param lastModificationTime last cached directory modification time + * @return true to skip; false otherwise. + * @throws IOException + */ + protected static boolean skipFile(@SuppressWarnings("unused") @NotNull Path path, @NotNull Long modificationTime, + Long lastModificationTime) throws IOException + { + return (!(lastModificationTime == null || modificationTime > lastModificationTime)); + } + + /** + * Accepts file which match a regular pattern. + * + * @param filePathStr file path + * @return true if the path matches the pattern; false otherwise; + */ + protected boolean acceptFile(String filePathStr) + { + if (regex != null) { + Matcher matcher = regex.matcher(filePathStr); + if (!matcher.matches()) { + return false; + } + } + return true; + } + + public FileInfo pollFile() + { + return discoveredFiles.poll(); + } + + /** + * Gets the regular expression for file names to split. + * + * @return regular expression + */ + public String getFilePatternRegularExp() + { + return filePatternRegularExp; + } + + /** + * Only files with names matching the given java regular expression are split. + * + * @param filePatternRegexp regular expression + */ + public void setFilePatternRegularExp(String filePatternRegexp) + { + this.filePatternRegularExp = filePatternRegexp; + } + + /** + * A comma separated list of directories to scan. If the path is not fully qualified the default + * file system is used. A fully qualified path can be provided to scan directories in other filesystems. + * + * @param files files + */ + public void setFiles(String files) + { + Iterables.addAll(this.files, Splitter.on(",").omitEmptyStrings().split(files)); + } + + /** + * Gets the files to be scanned. + * + * @return files to be scanned. + */ + public String getFiles() + { + return Joiner.on(",").join(this.files); + } + + /** + * True if recursive; false otherwise. + * + * @param recursive true if recursive; false otherwise. + */ + public void setRecursive(boolean recursive) + { + this.recursive = recursive; + } + + /** + * Sets whether scan will be recursive. + * + * @return true if recursive; false otherwise. 
+ */ + public boolean isRecursive() + { + return this.recursive; + } + + /** + * Sets the trigger which will initiate scan. + * + * @param trigger + */ + public void setTrigger(boolean trigger) + { + this.trigger = trigger; + } + + /** + * The trigger which will initiate scan. + * + * @return trigger + */ + public boolean isTrigger() + { + return this.trigger; + } + + /** + * Returns the frequency with which new files are scanned for in milliseconds. + * + * @return The scan interval in milliseconds. + */ + public long getScanIntervalMillis() + { + return scanIntervalMillis; + } + + /** + * Sets the frequency with which new files are scanned for in milliseconds. + * + * @param scanIntervalMillis The scan interval in milliseconds. + */ + public void setScanIntervalMillis(long scanIntervalMillis) + { + this.scanIntervalMillis = scanIntervalMillis; + } + } + + /** + * File info created for files discovered by scanner + */ + public static class ScannedFileInfo extends AbstractFileSplitter.FileInfo + { + protected final long modifiedTime; + private transient boolean lastFileOfScan; + + private ScannedFileInfo() + { + super(); + modifiedTime = -1; + } + + public ScannedFileInfo(@Nullable String directoryPath, @NotNull String relativeFilePath, long modifiedTime) + { + super(directoryPath, relativeFilePath); + this.modifiedTime = modifiedTime; + } + + protected boolean isLastFileOfScan() + { + return lastFileOfScan; + } + } + + private static final Logger LOG = LoggerFactory.getLogger(FileSplitterInput.class); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ea202dae/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java new file mode 100644 index 0000000..61ccfad --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java @@ -0,0 +1,264 @@ +/** + * Copyright (C) 2015 DataTorrent, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datatorrent.lib.io.fs; + +import java.io.File; +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.CountDownLatch; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.junit.*; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.*; +import com.datatorrent.api.annotation.ApplicationAnnotation; + +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.io.block.BlockMetadata; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + +/** + * Tests for {@link FileSplitterBase} + */ +public class FileSplitterBaseTest +{ + @ClassRule + public static FileSplitterInputTest.TestClassMeta classTestMeta = new FileSplitterInputTest.TestClassMeta(); + + static class BastTestMeta extends TestWatcher + { + public String dataDirectory; + + FileSplitterBase fileSplitter; + CollectorTestSink<FileSplitterInput.FileMetadata> fileMetadataSink; + CollectorTestSink<BlockMetadata.FileBlockMetadata> blockMetadataSink; + Set<String> filePaths; + Context.OperatorContext context; + + @Override + protected void starting(org.junit.runner.Description description) + { + + String methodName = description.getMethodName(); + String className = description.getClassName(); + this.dataDirectory = "target/" + className + "/" + methodName; + try { + filePaths = FileSplitterInputTest.createData(this.dataDirectory); + } catch (IOException e) { + throw new RuntimeException(e); + } + + fileSplitter = new FileSplitterBase(); + fileSplitter.setFile(this.dataDirectory); + + Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + attributes.put(Context.OperatorContext.SPIN_MILLIS, 500); + + context = new OperatorContextTestHelper.TestIdOperatorContext(0, attributes); + fileSplitter.setup(context); + + fileMetadataSink = new CollectorTestSink<>(); + TestUtils.setSink(fileSplitter.filesMetadataOutput, fileMetadataSink); + + blockMetadataSink = new CollectorTestSink<>(); + TestUtils.setSink(fileSplitter.blocksMetadataOutput, blockMetadataSink); + } + + @Override + protected void finished(Description description) + { + this.fileSplitter.teardown(); + } + } + + @Rule + public BastTestMeta baseTestMeta = new BastTestMeta(); + + @Test + public void testFileMetadata() throws InterruptedException + { + baseTestMeta.fileSplitter.beginWindow(1); + for (String filePath : baseTestMeta.filePaths) { + baseTestMeta.fileSplitter.input.process(new FileSplitterInput.FileInfo(null, filePath)); + } + baseTestMeta.fileSplitter.endWindow(); + Assert.assertEquals("File metadata", 12, baseTestMeta.fileMetadataSink.collectedTuples.size()); + for (Object fileMetadata : baseTestMeta.fileMetadataSink.collectedTuples) { + FileSplitterInput.FileMetadata metadata = (FileSplitterInput.FileMetadata)fileMetadata; + Assert.assertTrue("path: " + metadata.getFilePath(), baseTestMeta.filePaths.contains(metadata.getFilePath())); + Assert.assertNotNull("name: ", metadata.getFileName()); + } + + baseTestMeta.fileMetadataSink.collectedTuples.clear(); + } + + @Test + public void testBlockMetadataNoSplit() throws InterruptedException + { + baseTestMeta.fileSplitter.beginWindow(1); + for (String filePath : baseTestMeta.filePaths) { + baseTestMeta.fileSplitter.input.process(new 
FileSplitterInput.FileInfo(null, filePath)); + } + Assert.assertEquals("Blocks", 12, baseTestMeta.blockMetadataSink.collectedTuples.size()); + for (Object blockMetadata : baseTestMeta.blockMetadataSink.collectedTuples) { + BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata)blockMetadata; + Assert.assertTrue("path: " + metadata.getFilePath(), baseTestMeta.filePaths.contains(metadata.getFilePath())); + } + } + + @Test + public void testBlockMetadataWithSplit() throws InterruptedException + { + baseTestMeta.fileSplitter.setBlockSize(2L); + baseTestMeta.fileSplitter.beginWindow(1); + for (String filePath : baseTestMeta.filePaths) { + baseTestMeta.fileSplitter.input.process(new FileSplitterInput.FileInfo(null, filePath)); + } + Assert.assertEquals("Files", 12, baseTestMeta.fileMetadataSink.collectedTuples.size()); + + int noOfBlocks = 0; + for (int i = 0; i < 12; i++) { + FileSplitterInput.FileMetadata fm = baseTestMeta.fileMetadataSink.collectedTuples.get(i); + File testFile = new File(baseTestMeta.dataDirectory, fm.getFileName()); + noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0)); + } + Assert.assertEquals("Blocks", noOfBlocks, baseTestMeta.blockMetadataSink.collectedTuples.size()); + } + + @Test + public void testBlocksThreshold() throws InterruptedException + { + int noOfBlocks = 0; + for (int i = 0; i < 12; i++) { + File testFile = new File(baseTestMeta.dataDirectory, "file" + i + ".txt"); + noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0)); + } + + baseTestMeta.fileSplitter.setBlockSize(2L); + baseTestMeta.fileSplitter.setBlocksThreshold(10); + baseTestMeta.fileSplitter.beginWindow(1); + + for (String filePath : baseTestMeta.filePaths) { + baseTestMeta.fileSplitter.input.process(new FileSplitterInput.FileInfo(null, filePath)); + } + baseTestMeta.fileSplitter.endWindow(); + + Assert.assertEquals("Blocks", 10, baseTestMeta.blockMetadataSink.collectedTuples.size()); + + for (int window = 2; window < 8; window++) { + baseTestMeta.fileSplitter.beginWindow(window); + baseTestMeta.fileSplitter.handleIdleTime(); + baseTestMeta.fileSplitter.endWindow(); + } + + Assert.assertEquals("Files", 12, baseTestMeta.fileMetadataSink.collectedTuples.size()); + Assert.assertEquals("Blocks", noOfBlocks, baseTestMeta.blockMetadataSink.collectedTuples.size()); + } + + @Test + public void testSplitterInApp() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + SplitterApp app = new SplitterApp(); + lma.prepareDAG(app, new Configuration()); + lma.cloneDAG(); // check serialization + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + app.receiver.latch.await(); + Assert.assertEquals("no. 
of metadata", 12, app.receiver.count); + FileUtils.deleteQuietly(new File("target/SplitterInApp")); + } + + @ApplicationAnnotation(name = "TestApp") + class SplitterApp implements StreamingApplication + { + MockReceiver receiver; + + @Override + public void populateDAG(DAG dag, Configuration configuration) + { + dag.setAttribute(DAG.APPLICATION_PATH, "target/SplitterInApp"); + MockFileInput fileInput = dag.addOperator("Input", new MockFileInput()); + fileInput.filePaths = baseTestMeta.filePaths; + + FileSplitterBase splitter = dag.addOperator("Splitter", new FileSplitterBase()); + splitter.setFile(baseTestMeta.dataDirectory); + + receiver = dag.addOperator("Receiver", new MockReceiver()); + + dag.addStream("files", fileInput.files, splitter.input); + dag.addStream("file-metadata", splitter.filesMetadataOutput, receiver.fileMetadata); + } + } + + static class MockReceiver extends BaseOperator implements StatsListener + { + @AutoMetric + int count; + + transient CountDownLatch latch = new CountDownLatch(1); + public final transient DefaultInputPort<FileSplitterInput.FileMetadata> fileMetadata = new DefaultInputPort<FileSplitterInput.FileMetadata>() + { + @Override + public void process(FileSplitterInput.FileMetadata fileMetadata) + { + count++; + LOG.debug("count {}", count); + } + }; + + @Override + public Response processStats(BatchedOperatorStats stats) + { + Stats.OperatorStats operatorStats = stats.getLastWindowedStats().get(stats.getLastWindowedStats().size() - 1); + count = (Integer)operatorStats.metrics.get("count"); + if (count == 12) { + latch.countDown(); + } + return null; + } + } + + static class MockFileInput extends BaseOperator implements InputOperator + { + + public final transient DefaultOutputPort<FileSplitterInput.FileInfo> files = new DefaultOutputPort<>(); + + protected Set<String> filePaths; + + protected boolean done; + + @Override + public void emitTuples() + { + if (!done) { + done = true; + for (String file : filePaths) { + files.emit(new FileSplitterInput.FileInfo(null, file)); + } + } + } + } + + private static final transient Logger LOG = LoggerFactory.getLogger(FileSplitterBaseTest.class); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ea202dae/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java new file mode 100644 index 0000000..8dfea7a --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java @@ -0,0 +1,486 @@ +/** + * Copyright (C) 2015 DataTorrent, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datatorrent.lib.io.fs; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeoutException; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.junit.*; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.collect.Sets; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; + +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.io.IdempotentStorageManager; +import com.datatorrent.lib.io.block.BlockMetadata; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + +/** + * Tests for {@link FileSplitterInput} + */ +public class FileSplitterInputTest +{ + + public static class TestClassMeta extends TestWatcher + { + @Override + protected void finished(Description description) + { + try { + FileContext.getLocalFSFileContext().delete(new Path(new File("target/" + description.getClassName()).getAbsolutePath()), true); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + static Set<String> createData(String dataDirectory) throws IOException + { + Set<String> filePaths = Sets.newHashSet(); + FileContext.getLocalFSFileContext().delete(new Path(new File(dataDirectory).getAbsolutePath()), true); + HashSet<String> allLines = Sets.newHashSet(); + for (int file = 0; file < 12; file++) { + HashSet<String> lines = Sets.newHashSet(); + for (int line = 0; line < 2; line++) { + lines.add("f" + file + "l" + line); + } + allLines.addAll(lines); + File created = new File(dataDirectory, "file" + file + ".txt"); + filePaths.add(new Path(dataDirectory, created.getName()).toUri().toString()); + FileUtils.write(created, StringUtils.join(lines, '\n')); + } + return filePaths; + } + + public static class TestMeta extends TestWatcher + { + public String dataDirectory; + + FileSplitterInput fileSplitterInput; + CollectorTestSink<FileSplitterInput.FileMetadata> fileMetadataSink; + CollectorTestSink<BlockMetadata.FileBlockMetadata> blockMetadataSink; + Set<String> filePaths; + Context.OperatorContext context; + MockScanner scanner; + + @Override + protected void starting(org.junit.runner.Description description) + { + + String methodName = description.getMethodName(); + String className = description.getClassName(); + this.dataDirectory = "target/" + className + "/" + methodName + "/data"; + try { + filePaths = createData(this.dataDirectory); + } catch (IOException e) { + throw new RuntimeException(e); + } + + fileSplitterInput = new FileSplitterInput(); + scanner = new MockScanner(); + fileSplitterInput.setScanner(scanner); + fileSplitterInput.getScanner().setScanIntervalMillis(500); + fileSplitterInput.getScanner().setFilePatternRegularExp(".*[.]txt"); + fileSplitterInput.getScanner().setFiles(dataDirectory); + fileSplitterInput.setIdempotentStorageManager(new IdempotentStorageManager.NoopIdempotentStorageManager()); + + Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + 
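//a unique application path per test method isolates the recovery state written by the idempotency tests +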
attributes.put(Context.DAGContext.APPLICATION_PATH, "target/" + className + "/" + methodName + "/" + Long.toHexString(System.currentTimeMillis())); + + context = new OperatorContextTestHelper.TestIdOperatorContext(0, attributes); + fileSplitterInput.setup(context); + + fileMetadataSink = new CollectorTestSink<>(); + TestUtils.setSink(fileSplitterInput.filesMetadataOutput, fileMetadataSink); + + blockMetadataSink = new CollectorTestSink<>(); + TestUtils.setSink(fileSplitterInput.blocksMetadataOutput, blockMetadataSink); + } + + @Override + protected void finished(Description description) + { + this.fileSplitterInput.teardown(); + } + } + + @ClassRule + public static TestClassMeta classTestMeta = new TestClassMeta(); + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Test + public void testFileMetadata() throws InterruptedException + { + testMeta.fileSplitterInput.beginWindow(1); + testMeta.scanner.semaphore.acquire(); + + testMeta.fileSplitterInput.emitTuples(); + testMeta.fileSplitterInput.endWindow(); + Assert.assertEquals("File metadata", 12, testMeta.fileMetadataSink.collectedTuples.size()); + for (Object fileMetadata : testMeta.fileMetadataSink.collectedTuples) { + FileSplitterInput.FileMetadata metadata = (FileSplitterInput.FileMetadata)fileMetadata; + Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath())); + Assert.assertNotNull("name: ", metadata.getFileName()); + } + + testMeta.fileMetadataSink.collectedTuples.clear(); + } + + @Test + public void testBlockMetadataNoSplit() throws InterruptedException + { + testMeta.fileSplitterInput.beginWindow(1); + testMeta.scanner.semaphore.acquire(); + + testMeta.fileSplitterInput.emitTuples(); + Assert.assertEquals("Blocks", 12, testMeta.blockMetadataSink.collectedTuples.size()); + for (Object blockMetadata : testMeta.blockMetadataSink.collectedTuples) { + BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata)blockMetadata; + Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath())); + } + } + + @Test + public void testBlockMetadataWithSplit() throws InterruptedException + { + testMeta.fileSplitterInput.setBlockSize(2L); + testMeta.fileSplitterInput.beginWindow(1); + testMeta.scanner.semaphore.acquire(); + + testMeta.fileSplitterInput.emitTuples(); + Assert.assertEquals("Files", 12, testMeta.fileMetadataSink.collectedTuples.size()); + + int noOfBlocks = 0; + for (int i = 0; i < 12; i++) { + FileSplitterInput.FileMetadata fm = testMeta.fileMetadataSink.collectedTuples.get(i); + File testFile = new File(testMeta.dataDirectory, fm.getFileName()); + noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0)); + } + Assert.assertEquals("Blocks", noOfBlocks, testMeta.blockMetadataSink.collectedTuples.size()); + } + + @Test + public void testIdempotency() throws InterruptedException + { + IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = + new IdempotentStorageManager.FSIdempotentStorageManager(); + testMeta.fileSplitterInput.setIdempotentStorageManager(fsIdempotentStorageManager); + + testMeta.fileSplitterInput.setup(testMeta.context); + //will emit window 1 from data directory + testFileMetadata(); + testMeta.fileMetadataSink.clear(); + testMeta.blockMetadataSink.clear(); + + testMeta.fileSplitterInput.setup(testMeta.context); + testMeta.fileSplitterInput.beginWindow(1); + Assert.assertEquals("Blocks", 12, testMeta.blockMetadataSink.collectedTuples.size()); + for (Object blockMetadata : 
+
+  @Test
+  public void testBlockMetadataNoSplit() throws InterruptedException
+  {
+    testMeta.fileSplitterInput.beginWindow(1);
+    testMeta.scanner.semaphore.acquire();
+
+    testMeta.fileSplitterInput.emitTuples();
+    Assert.assertEquals("Blocks", 12, testMeta.blockMetadataSink.collectedTuples.size());
+    for (Object blockMetadata : testMeta.blockMetadataSink.collectedTuples) {
+      BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata)blockMetadata;
+      Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath()));
+    }
+  }
+
+  @Test
+  public void testBlockMetadataWithSplit() throws InterruptedException
+  {
+    testMeta.fileSplitterInput.setBlockSize(2L);
+    testMeta.fileSplitterInput.beginWindow(1);
+    testMeta.scanner.semaphore.acquire();
+
+    testMeta.fileSplitterInput.emitTuples();
+    Assert.assertEquals("Files", 12, testMeta.fileMetadataSink.collectedTuples.size());
+
+    int noOfBlocks = 0;
+    for (int i = 0; i < 12; i++) {
+      FileSplitterInput.FileMetadata fm = testMeta.fileMetadataSink.collectedTuples.get(i);
+      File testFile = new File(testMeta.dataDirectory, fm.getFileName());
+      noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0));
+    }
+    Assert.assertEquals("Blocks", noOfBlocks, testMeta.blockMetadataSink.collectedTuples.size());
+  }
+
+  @Test
+  public void testIdempotency() throws InterruptedException
+  {
+    IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager =
+        new IdempotentStorageManager.FSIdempotentStorageManager();
+    testMeta.fileSplitterInput.setIdempotentStorageManager(fsIdempotentStorageManager);
+
+    testMeta.fileSplitterInput.setup(testMeta.context);
+    //emits window 1 from the data directory
+    testFileMetadata();
+    testMeta.fileMetadataSink.clear();
+    testMeta.blockMetadataSink.clear();
+
+    testMeta.fileSplitterInput.setup(testMeta.context);
+    testMeta.fileSplitterInput.beginWindow(1);
+    Assert.assertEquals("Blocks", 12, testMeta.blockMetadataSink.collectedTuples.size());
+    for (Object blockMetadata : testMeta.blockMetadataSink.collectedTuples) {
+      BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata)blockMetadata;
+      Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath()));
+    }
+  }
+
+  @Test
+  public void testTimeScan() throws InterruptedException, IOException, TimeoutException
+  {
+    testFileMetadata();
+    testMeta.fileMetadataSink.clear();
+    testMeta.blockMetadataSink.clear();
+
+    Thread.sleep(1000);
+    //add a new file; the next periodic scan should pick up only this one
+    File f13 = new File(testMeta.dataDirectory, "file13.txt");
+    HashSet<String> lines = Sets.newHashSet();
+    for (int line = 0; line < 2; line++) {
+      lines.add("f13l" + line);
+    }
+    FileUtils.write(f13, StringUtils.join(lines, '\n'));
+
+    //window 2
+    testMeta.fileSplitterInput.beginWindow(2);
+    testMeta.scanner.semaphore.acquire();
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+
+    Assert.assertEquals("window 2: files", 1, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size());
+  }
+
+  @Test
+  public void testTrigger() throws InterruptedException, IOException, TimeoutException
+  {
+    testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(60 * 1000);
+    testFileMetadata();
+    testMeta.fileMetadataSink.clear();
+    testMeta.blockMetadataSink.clear();
+
+    Thread.sleep(1000);
+    //add a new file; with a 60s scan interval, only the explicit trigger makes it visible
+    File f13 = new File(testMeta.dataDirectory, "file13.txt");
+    HashSet<String> lines = Sets.newHashSet();
+    for (int line = 0; line < 2; line++) {
+      lines.add("f13l" + line);
+    }
+    FileUtils.write(f13, StringUtils.join(lines, '\n'));
+    testMeta.fileSplitterInput.getScanner().setTrigger(true);
+
+    //window 2
+    testMeta.fileSplitterInput.beginWindow(2);
+    testMeta.scanner.semaphore.acquire();
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+
+    Assert.assertEquals("window 2: files", 1, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size());
+  }
+
+  @Test
+  public void testBlocksThreshold() throws InterruptedException
+  {
+    int noOfBlocks = 0;
+    for (int i = 0; i < 12; i++) {
+      File testFile = new File(testMeta.dataDirectory, "file" + i + ".txt");
+      noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0));
+    }
+
+    testMeta.fileSplitterInput.setBlockSize(2L);
+    testMeta.fileSplitterInput.setBlocksThreshold(10);
+    testMeta.fileSplitterInput.beginWindow(1);
+
+    testMeta.scanner.semaphore.acquire();
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+
+    Assert.assertEquals("Blocks", 10, testMeta.blockMetadataSink.collectedTuples.size());
+
+    for (int window = 2; window < 8; window++) {
+      testMeta.fileSplitterInput.beginWindow(window);
+      testMeta.fileSplitterInput.emitTuples();
+      testMeta.fileSplitterInput.endWindow();
+    }
+
+    Assert.assertEquals("Files", 12, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("Blocks", noOfBlocks, testMeta.blockMetadataSink.collectedTuples.size());
+  }
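
The loop over windows 2 through 7 above is sized by the threshold: 62 blocks (see the arithmetic sketch after createData()) metered out at no more than 10 per window need ceil(62 / 10) = 7 windows in total. A hypothetical check of that window math:

    public class WindowCountCheck
    {
      public static void main(String[] args)
      {
        int totalBlocks = 62;     // 12 test files at a 2-byte block size
        int blocksThreshold = 10; // max blocks emitted per window
        int windows = (totalBlocks + blocksThreshold - 1) / blocksThreshold;
        System.out.println(windows); // 7: window 1 plus windows 2..7
      }
    }
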
+
+  @Test
+  public void testIdempotencyWithBlocksThreshold() throws InterruptedException
+  {
+    IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager =
+        new IdempotentStorageManager.FSIdempotentStorageManager();
+    testMeta.fileSplitterInput.setIdempotentStorageManager(fsIdempotentStorageManager);
+    testMeta.fileSplitterInput.setBlocksThreshold(10);
+    testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500);
+    testMeta.fileSplitterInput.setup(testMeta.context);
+
+    testBlocksThreshold();
+    testMeta.fileMetadataSink.clear();
+    testMeta.blockMetadataSink.clear();
+
+    testMeta.fileSplitterInput.setup(testMeta.context);
+    for (int i = 1; i < 8; i++) {
+      testMeta.fileSplitterInput.beginWindow(i);
+    }
+    Assert.assertEquals("Files", 12, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("Blocks", 62, testMeta.blockMetadataSink.collectedTuples.size());
+  }
+
+  @Test
+  public void testFirstWindowAfterRecovery() throws IOException, InterruptedException
+  {
+    testIdempotencyWithBlocksThreshold();
+    Thread.sleep(1000);
+    HashSet<String> lines = Sets.newHashSet();
+    for (int line = 2; line < 4; line++) {
+      lines.add("f13l" + line);
+    }
+    File f13 = new File(testMeta.dataDirectory, "file13.txt");
+    FileUtils.writeLines(f13, lines, true);
+
+    testMeta.fileMetadataSink.clear();
+    testMeta.blockMetadataSink.clear();
+
+    testMeta.fileSplitterInput.beginWindow(8);
+    testMeta.scanner.semaphore.acquire();
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+
+    Assert.assertEquals("Files", 1, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("Blocks", 6, testMeta.blockMetadataSink.collectedTuples.size());
+  }
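
testRecoveryOfPartialFile below stands in for an operator failure by serializing the splitter with Kryo before setup() and later restoring it from those bytes, mimicking the write/read cycle the platform performs around a checkpoint; a kryo exception of the kind this commit addresses would surface in exactly such a cycle. A self-contained sketch of the round-trip, using a hypothetical state class rather than the operator:

    import java.io.ByteArrayOutputStream;

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;

    public class KryoRoundTrip
    {
      public static class State
      {
        int blocksEmitted; // checkpointed progress to restore after a failure
      }

      public static void main(String[] args)
      {
        Kryo kryo = new Kryo();
        State before = new State();
        before.blocksEmitted = 2;

        // snapshot, as the engine would at a checkpoint
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        Output output = new Output(bos);
        kryo.writeObject(output, before);
        output.close();

        // restore, as on re-deployment after a failure
        Input input = new Input(bos.toByteArray());
        State after = kryo.readObject(input, State.class);
        input.close();

        System.out.println(after.blocksEmitted); // 2
      }
    }
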
+
+  @Test
+  public void testRecoveryOfPartialFile() throws InterruptedException
+  {
+    IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager =
+        new IdempotentStorageManager.FSIdempotentStorageManager();
+    testMeta.fileSplitterInput.setIdempotentStorageManager(fsIdempotentStorageManager);
+    testMeta.fileSplitterInput.setBlockSize(2L);
+    testMeta.fileSplitterInput.setBlocksThreshold(2);
+    testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500);
+
+    Kryo kryo = new Kryo();
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    Output loutput = new Output(bos);
+    kryo.writeObject(loutput, testMeta.fileSplitterInput);
+    loutput.close();
+
+    testMeta.fileSplitterInput.setup(testMeta.context);
+    testMeta.fileSplitterInput.beginWindow(1);
+
+    testMeta.scanner.semaphore.acquire();
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+
+    //file0.txt has just 5 blocks. Since the blocks threshold is 2, only 2 are emitted in window 1.
+    Assert.assertEquals("Files", 1, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size());
+
+    testMeta.fileMetadataSink.clear();
+    testMeta.blockMetadataSink.clear();
+
+    testMeta.fileSplitterInput.teardown();
+
+    //there was a failure and the operator was re-deployed
+    Input lInput = new Input(bos.toByteArray());
+    testMeta.fileSplitterInput = kryo.readObject(lInput, testMeta.fileSplitterInput.getClass());
+    lInput.close();
+    TestUtils.setSink(testMeta.fileSplitterInput.blocksMetadataOutput, testMeta.blockMetadataSink);
+    TestUtils.setSink(testMeta.fileSplitterInput.filesMetadataOutput, testMeta.fileMetadataSink);
+
+    testMeta.fileSplitterInput.setup(testMeta.context);
+    testMeta.fileSplitterInput.beginWindow(1);
+
+    Assert.assertEquals("Recovered Files", 1, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("Recovered Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size());
+
+    testMeta.fileSplitterInput.beginWindow(2);
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+
+    Assert.assertEquals("Blocks", 4, testMeta.blockMetadataSink.collectedTuples.size());
+
+    String file1 = testMeta.fileMetadataSink.collectedTuples.get(0).getFileName();
+
+    testMeta.fileMetadataSink.clear();
+    testMeta.blockMetadataSink.clear();
+
+    testMeta.fileSplitterInput.beginWindow(3);
+    ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire();
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+
+    Assert.assertEquals("New file", 1, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size());
+
+    String file2 = testMeta.fileMetadataSink.collectedTuples.get(0).getFileName();
+
+    Assert.assertTrue("Block file name 0", testMeta.blockMetadataSink.collectedTuples.get(0).getFilePath().endsWith(file1));
+    Assert.assertTrue("Block file name 1", testMeta.blockMetadataSink.collectedTuples.get(1).getFilePath().endsWith(file2));
+  }
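
The block counts asserted above all come from metering file0.txt's 5 blocks at 2 per window: window 1 emits blocks 1 and 2 (and the recovered operator replays exactly those two), window 2 emits blocks 3 and 4, and window 3 emits file0's last block plus the first block of the next discovered file, which is why the two block tuples in window 3 end with different file names. A hypothetical trace of that schedule:

    public class MeteringTrace
    {
      public static void main(String[] args)
      {
        int remaining = 5; // blocks in file0.txt at a 2-byte block size
        int threshold = 2; // blocksThreshold
        for (int window = 1; remaining > 0; window++) {
          int emitted = Math.min(threshold, remaining);
          remaining -= emitted;
          System.out.println("window " + window + ": " + emitted + " block(s) of file0");
        }
        // window 3 uses only 1 of its 2 permitted blocks on file0, so the
        // splitter also emits the first block of the next file in that window
      }
    }
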
+
+  @Test
+  public void testRecursive() throws InterruptedException, IOException
+  {
+    testMeta.fileSplitterInput.getScanner().regex = null;
+    testFileMetadata();
+    testMeta.fileMetadataSink.clear();
+    testMeta.blockMetadataSink.clear();
+
+    Thread.sleep(1000);
+    //add a new file under a child directory
+    File f13 = new File(testMeta.dataDirectory + "/child", "file13.txt");
+    HashSet<String> lines = Sets.newHashSet();
+    for (int line = 0; line < 2; line++) {
+      lines.add("f13l" + line);
+    }
+    FileUtils.write(f13, StringUtils.join(lines, '\n'));
+
+    //window 2
+    testMeta.fileSplitterInput.beginWindow(2);
+    testMeta.scanner.semaphore.acquire();
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+
+    //2 file-metadata tuples: the new child directory and the file inside it; only the file yields a block
+    Assert.assertEquals("window 2: files", 2, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size());
+  }
+
+  @Test
+  public void testSingleFile() throws InterruptedException, IOException
+  {
+    testMeta.fileSplitterInput.teardown();
+    testMeta.fileSplitterInput.setScanner(new MockScanner());
+    testMeta.fileSplitterInput.getScanner().regex = null;
+    testMeta.fileSplitterInput.getScanner().setFiles(testMeta.dataDirectory + "/file1.txt");
+
+    testMeta.fileSplitterInput.setup(testMeta.context);
+    testMeta.fileSplitterInput.beginWindow(1);
+    ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire();
+
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+    Assert.assertEquals("File metadata count", 1, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("File metadata", new File(testMeta.dataDirectory + "/file1.txt").getAbsolutePath(),
+        testMeta.fileMetadataSink.collectedTuples.get(0).getFilePath());
+  }
+
+  private static class MockScanner extends FileSplitterInput.TimeBasedDirectoryScanner
+  {
+    transient Semaphore semaphore;
+
+    private MockScanner()
+    {
+      super();
+      this.semaphore = new Semaphore(0);
+    }
+
+    @Override
+    protected void scanComplete()
+    {
+      super.scanComplete();
+      if (discoveredFiles.size() > 0 && discoveredFiles.getLast().isLastFileOfScan()) {
+        semaphore.release();
+        LOG.debug("discovered {}", discoveredFiles.size());
+      }
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(FileSplitterInputTest.class);
+}