Repository: apex-malhar Updated Branches: refs/heads/master 9db044741 -> 6b0e931ce
APEXMALHAR-2325 1) Set the file system default block size to the reader context. 2) Set the block size to the reader in FSInputModule Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/6b0e931c Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/6b0e931c Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/6b0e931c Branch: refs/heads/master Commit: 6b0e931ce6f676273ff1978c03be5f2e237c050d Parents: 9db0447 Author: chaitanya <[email protected]> Authored: Thu Nov 10 15:09:27 2016 +0530 Committer: chaitanya <[email protected]> Committed: Wed Nov 16 15:09:26 2016 +0530 ---------------------------------------------------------------------- .../datatorrent/lib/io/block/FSSliceReader.java | 12 ++++++ .../datatorrent/lib/io/fs/FSInputModule.java | 4 ++ .../lib/io/block/FSSliceReaderTest.java | 40 ++++++++++++++++++++ 3 files changed, 56 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6b0e931c/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java b/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java index 60fd93c..9ac2a61 100644 --- a/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java +++ b/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java @@ -18,6 +18,9 @@ */ package com.datatorrent.lib.io.block; +import org.apache.hadoop.fs.Path; + +import com.datatorrent.api.Context; import com.datatorrent.api.StatsListener; import com.datatorrent.netlet.util.Slice; @@ -39,6 +42,15 @@ public class FSSliceReader extends AbstractFSBlockReader<Slice> } @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + if (basePath != null && this.readerContext instanceof ReaderContext.FixedBytesReaderContext) { + ((ReaderContext.FixedBytesReaderContext)this.readerContext).setLength((int)fs.getDefaultBlockSize(new Path(basePath))); + } + } + + @Override protected Slice convertToRecord(byte[] bytes) { return new Slice(bytes); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6b0e931c/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java index 6d54756..d4edb40 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java @@ -29,6 +29,7 @@ import com.datatorrent.lib.codec.KryoSerializableStreamCodec; import com.datatorrent.lib.io.block.AbstractBlockReader; import com.datatorrent.lib.io.block.BlockMetadata; import com.datatorrent.lib.io.block.FSSliceReader; +import com.datatorrent.lib.io.block.ReaderContext; import com.datatorrent.netlet.util.Slice; /** @@ -98,6 +99,9 @@ public class FSInputModule implements Module } if (blockSize != 0) { fileSplitter.setBlockSize(blockSize); + if (blockReader.getReaderContext() instanceof ReaderContext.FixedBytesReaderContext) { + ((ReaderContext.FixedBytesReaderContext)blockReader.getReaderContext()).setLength((int)blockSize); + } } FileSplitterInput.TimeBasedDirectoryScanner fileScanner = fileSplitter.getScanner(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6b0e931c/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java b/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java index 37222a7..374617a 100644 --- a/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java @@ -24,12 +24,17 @@ import java.io.IOException; import java.util.List; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestWatcher; import org.junit.runner.Description; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import com.datatorrent.api.Attribute; import com.datatorrent.api.Context; @@ -38,6 +43,8 @@ import com.datatorrent.lib.helper.OperatorContextTestHelper; import com.datatorrent.lib.testbench.CollectorTestSink; import com.datatorrent.netlet.util.Slice; +import static org.mockito.Mockito.when; + /** * Tests for {@link FSSliceReader}. */ @@ -135,4 +142,37 @@ public class FSSliceReaderTest FileUtils.contentEquals(testMeta.dataFile, outputFile); } + @Mock + FileSystem fileSystem; + + public class FSTestReader extends FSSliceReader + { + @Override + protected FileSystem getFSInstance() throws IOException + { + return fileSystem; + } + } + + @Before + public void beforeTest() + { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testBlockSize() throws IOException + { + long blockSize = 1000; + Path path = new Path(testMeta.output); + when(fileSystem.getDefaultBlockSize(path)).thenReturn(blockSize); + Attribute.AttributeMap.DefaultAttributeMap readerAttr = new Attribute.AttributeMap.DefaultAttributeMap(); + readerAttr.put(DAG.APPLICATION_ID, Long.toHexString(System.currentTimeMillis())); + readerAttr.put(Context.OperatorContext.SPIN_MILLIS, 10); + + FSTestReader reader = new FSTestReader(); + reader.setBasePath(testMeta.output); + reader.setup(new OperatorContextTestHelper.TestIdOperatorContext(1, readerAttr)); + Assert.assertEquals("Block Size", blockSize, (long)((ReaderContext.FixedBytesReaderContext)reader.getReaderContext()).getLength()); + } }
