steveloughran commented on a change in pull request #3499: URL: https://github.com/apache/hadoop/pull/3499#discussion_r718620177
########## File path: hadoop-common-project/benchmark/pom.xml ########## @@ -0,0 +1,91 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 + https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-project</artifactId> + <version>3.4.0-SNAPSHOT</version> + <relativePath>../../hadoop-project</relativePath> + </parent> + <artifactId>hadoop-benchmark</artifactId> Review comment: think this should go into hadoop-tools, as it allows for more things to go in later ########## File path: hadoop-common-project/benchmark/pom.xml ########## @@ -0,0 +1,91 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 + https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-project</artifactId> + <version>3.4.0-SNAPSHOT</version> + <relativePath>../../hadoop-project</relativePath> + </parent> + <artifactId>hadoop-benchmark</artifactId> + <version>3.4.0-SNAPSHOT</version> + <packaging>jar</packaging> + + <name>Apache Hadoop Common Benchmark</name> + <description>Apache Hadoop Common Benchmark</description> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <maven.compiler.useIncrementalCompilation>false</maven.compiler.useIncrementalCompilation> + <jmh.version>1.20</jmh.version> Review comment: declaration to go into hadoop project, ########## File path: hadoop-common-project/benchmark/pom.xml ########## @@ -0,0 +1,91 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 + https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-project</artifactId> + <version>3.4.0-SNAPSHOT</version> + <relativePath>../../hadoop-project</relativePath> + </parent> + <artifactId>hadoop-benchmark</artifactId> + <version>3.4.0-SNAPSHOT</version> + <packaging>jar</packaging> + + <name>Apache Hadoop Common Benchmark</name> + <description>Apache Hadoop Common Benchmark</description> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <maven.compiler.useIncrementalCompilation>false</maven.compiler.useIncrementalCompilation> + <jmh.version>1.20</jmh.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + <dependency> + <groupId>org.openjdk.jmh</groupId> Review comment: declaration to go into hadoop project, with reference here ########## File path: hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestAsyncReaderUtils.java ########## @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import org.apache.hadoop.fs.ByteBufferPositionedReadable; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.FileRangeImpl; +import org.apache.hadoop.fs.PositionedReadable; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.IntBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.IntFunction; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +/** + * Test behavior of {@link AsyncReaderUtils}. + */ +public class TestAsyncReaderUtils { + + @Test + public void testSliceTo() { + final int SIZE = 64 * 1024; + ByteBuffer buffer = ByteBuffer.allocate(SIZE); + // fill the buffer with data + IntBuffer intBuffer = buffer.asIntBuffer(); + for(int i=0; i < SIZE / Integer.BYTES; ++i) { + intBuffer.put(i); + } + // ensure we don't make unnecessary slices + ByteBuffer slice = AsyncReaderUtils.sliceTo(buffer, 100, + new FileRangeImpl(100, SIZE)); + assertSame(buffer, slice); + + // try slicing a range + final int OFFSET = 100; + final int SLICE_START = 1024; + final int SLICE_LENGTH = 16 * 1024; + slice = AsyncReaderUtils.sliceTo(buffer, OFFSET, + new FileRangeImpl(OFFSET + SLICE_START, SLICE_LENGTH)); + // make sure they aren't the same, but use the same backing data + assertNotSame(buffer, slice); + assertSame(buffer.array(), slice.array()); + // test the contents of the slice + intBuffer = slice.asIntBuffer(); + for(int i=0; i < SLICE_LENGTH / Integer.BYTES; ++i) { + assertEquals("i = " + i, i + SLICE_START / Integer.BYTES, intBuffer.get()); + } + } + + @Test + public void testRounding() { + for(int i=5; i < 10; ++i) { + assertEquals("i = "+ i, 5, AsyncReaderUtils.roundDown(i, 5)); + assertEquals("i = "+ i, 10, AsyncReaderUtils.roundUp(i+1, 5)); + } + assertEquals(13, AsyncReaderUtils.roundDown(13, 1)); + assertEquals(13, AsyncReaderUtils.roundUp(13, 1)); + } + + @Test + public void testMerge() { + FileRange base = new FileRangeImpl(2000, 1000); + CombinedFileRange mergeBase = new CombinedFileRange(2000, 3000, base); + + // test when the gap between is too big + assertFalse(mergeBase.merge(5000, 6000, + new FileRangeImpl(5000, 1000), 2000, 4000)); + assertEquals(1, mergeBase.getUnderlying().size()); + assertEquals(2000, mergeBase.getOffset()); + assertEquals(1000, mergeBase.getLength()); + + // test when the total size gets exceeded + assertFalse(mergeBase.merge(5000, 6000, + new FileRangeImpl(5000, 1000), 2001, 3999)); + assertEquals(1, mergeBase.getUnderlying().size()); + assertEquals(2000, mergeBase.getOffset()); + assertEquals(1000, mergeBase.getLength()); + + // test when the merge works + assertTrue(mergeBase.merge(5000, 6000, + new FileRangeImpl(5000, 1000), 2001, 4000)); + assertEquals(2, mergeBase.getUnderlying().size()); + assertEquals(2000, mergeBase.getOffset()); + assertEquals(4000, mergeBase.getLength()); + + // reset the mergeBase and test with a 10:1 reduction + mergeBase = new CombinedFileRange(200, 300, base); + assertEquals(200, mergeBase.getOffset()); + assertEquals(100, mergeBase.getLength()); + assertTrue(mergeBase.merge(500, 600, + new FileRangeImpl(5000, 1000), 201, 400)); + assertEquals(2, mergeBase.getUnderlying().size()); + assertEquals(200, mergeBase.getOffset()); + assertEquals(400, mergeBase.getLength()); + } + + @Test + public void testSortAndMerge() { + List<FileRange> input = Arrays.asList( + new FileRangeImpl(3000, 100), + new FileRangeImpl(2100, 100), + new FileRangeImpl(1000, 100) + ); + assertFalse(AsyncReaderUtils.isOrderedDisjoint(input, 100, 800)); + List<CombinedFileRange> outputList = AsyncReaderUtils.sortAndMergeRanges( + input, 100, 1001, 2500); + assertEquals(1, outputList.size()); + CombinedFileRange output = outputList.get(0); + assertEquals(3, output.getUnderlying().size()); + assertEquals("range[1000,3100)", output.toString()); + assertTrue(AsyncReaderUtils.isOrderedDisjoint(outputList, 100, 800)); + + // the minSeek doesn't allow the first two to merge + assertFalse(AsyncReaderUtils.isOrderedDisjoint(input, 100, 1000)); + outputList = AsyncReaderUtils.sortAndMergeRanges(input, 100, 1000, 2100); + assertEquals(2, outputList.size()); + assertEquals("range[1000,1100)", outputList.get(0).toString()); + assertEquals("range[2100,3100)", outputList.get(1).toString()); + assertTrue(AsyncReaderUtils.isOrderedDisjoint(outputList, 100, 1000)); + + // the maxSize doesn't allow the third range to merge + assertFalse(AsyncReaderUtils.isOrderedDisjoint(input, 100, 800)); + outputList = AsyncReaderUtils.sortAndMergeRanges(input, 100, 1001, 2099); + assertEquals(2, outputList.size()); + assertEquals("range[1000,2200)", outputList.get(0).toString()); + assertEquals("range[3000,3100)", outputList.get(1).toString()); + assertTrue(AsyncReaderUtils.isOrderedDisjoint(outputList, 100, 800)); + + // test the round up and round down (the maxSize doesn't allow any merges) + assertFalse(AsyncReaderUtils.isOrderedDisjoint(input, 16, 700)); + outputList = AsyncReaderUtils.sortAndMergeRanges(input, 16, 1001, 100); + assertEquals(3, outputList.size()); + assertEquals("range[992,1104)", outputList.get(0).toString()); + assertEquals("range[2096,2208)", outputList.get(1).toString()); + assertEquals("range[2992,3104)", outputList.get(2).toString()); + assertTrue(AsyncReaderUtils.isOrderedDisjoint(outputList, 16, 700)); + } + + interface Stream extends PositionedReadable, ByteBufferPositionedReadable { + // nothing + } + + static void fillBuffer(ByteBuffer buffer) { + byte b = 0; + while (buffer.remaining() > 0) { + buffer.put(b++); + } + } + + @Test + public void testReadRangeFromByteBufferPositionedReadable() throws Exception { + Stream stream = Mockito.mock(Stream.class); + Mockito.doAnswer(invocation -> { + fillBuffer(invocation.getArgument(1)); + return null; + }).when(stream).readFully(ArgumentMatchers.anyLong(), + ArgumentMatchers.any(ByteBuffer.class)); + CompletableFuture<ByteBuffer> result = + AsyncReaderUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), + ByteBuffer::allocate); + assertTrue(result.isDone()); + ByteBuffer buffer = result.get(); + assertEquals(100, buffer.remaining()); + byte b = 0; + while (buffer.remaining() > 0) { + assertEquals("remain = " + buffer.remaining(), b++, buffer.get()); + } + + // test an IOException + Mockito.reset(stream); + Mockito.doThrow(new IOException("foo")) + .when(stream).readFully(ArgumentMatchers.anyLong(), + ArgumentMatchers.any(ByteBuffer.class)); + result = + AsyncReaderUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), + ByteBuffer::allocate); + assertTrue(result.isCompletedExceptionally()); + } + + static void runReadRangeFromPositionedReadable(IntFunction<ByteBuffer> allocate) throws Exception { Review comment: be good to make sure this backports ~OK, even before committing ########## File path: hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestAsyncReaderUtils.java ########## @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import org.apache.hadoop.fs.ByteBufferPositionedReadable; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.FileRangeImpl; +import org.apache.hadoop.fs.PositionedReadable; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.IntBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.IntFunction; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +/** + * Test behavior of {@link AsyncReaderUtils}. + */ +public class TestAsyncReaderUtils { + + @Test + public void testSliceTo() { + final int SIZE = 64 * 1024; + ByteBuffer buffer = ByteBuffer.allocate(SIZE); + // fill the buffer with data + IntBuffer intBuffer = buffer.asIntBuffer(); + for(int i=0; i < SIZE / Integer.BYTES; ++i) { + intBuffer.put(i); + } + // ensure we don't make unnecessary slices + ByteBuffer slice = AsyncReaderUtils.sliceTo(buffer, 100, + new FileRangeImpl(100, SIZE)); + assertSame(buffer, slice); + + // try slicing a range + final int OFFSET = 100; + final int SLICE_START = 1024; + final int SLICE_LENGTH = 16 * 1024; + slice = AsyncReaderUtils.sliceTo(buffer, OFFSET, + new FileRangeImpl(OFFSET + SLICE_START, SLICE_LENGTH)); + // make sure they aren't the same, but use the same backing data + assertNotSame(buffer, slice); + assertSame(buffer.array(), slice.array()); + // test the contents of the slice + intBuffer = slice.asIntBuffer(); + for(int i=0; i < SLICE_LENGTH / Integer.BYTES; ++i) { + assertEquals("i = " + i, i + SLICE_START / Integer.BYTES, intBuffer.get()); + } + } + + @Test + public void testRounding() { + for(int i=5; i < 10; ++i) { + assertEquals("i = "+ i, 5, AsyncReaderUtils.roundDown(i, 5)); + assertEquals("i = "+ i, 10, AsyncReaderUtils.roundUp(i+1, 5)); + } + assertEquals(13, AsyncReaderUtils.roundDown(13, 1)); + assertEquals(13, AsyncReaderUtils.roundUp(13, 1)); + } + + @Test + public void testMerge() { + FileRange base = new FileRangeImpl(2000, 1000); + CombinedFileRange mergeBase = new CombinedFileRange(2000, 3000, base); + + // test when the gap between is too big + assertFalse(mergeBase.merge(5000, 6000, + new FileRangeImpl(5000, 1000), 2000, 4000)); + assertEquals(1, mergeBase.getUnderlying().size()); + assertEquals(2000, mergeBase.getOffset()); + assertEquals(1000, mergeBase.getLength()); + + // test when the total size gets exceeded + assertFalse(mergeBase.merge(5000, 6000, + new FileRangeImpl(5000, 1000), 2001, 3999)); + assertEquals(1, mergeBase.getUnderlying().size()); + assertEquals(2000, mergeBase.getOffset()); + assertEquals(1000, mergeBase.getLength()); + + // test when the merge works + assertTrue(mergeBase.merge(5000, 6000, + new FileRangeImpl(5000, 1000), 2001, 4000)); + assertEquals(2, mergeBase.getUnderlying().size()); + assertEquals(2000, mergeBase.getOffset()); + assertEquals(4000, mergeBase.getLength()); + + // reset the mergeBase and test with a 10:1 reduction + mergeBase = new CombinedFileRange(200, 300, base); + assertEquals(200, mergeBase.getOffset()); + assertEquals(100, mergeBase.getLength()); + assertTrue(mergeBase.merge(500, 600, + new FileRangeImpl(5000, 1000), 201, 400)); + assertEquals(2, mergeBase.getUnderlying().size()); + assertEquals(200, mergeBase.getOffset()); + assertEquals(400, mergeBase.getLength()); + } + + @Test + public void testSortAndMerge() { + List<FileRange> input = Arrays.asList( + new FileRangeImpl(3000, 100), + new FileRangeImpl(2100, 100), + new FileRangeImpl(1000, 100) + ); + assertFalse(AsyncReaderUtils.isOrderedDisjoint(input, 100, 800)); + List<CombinedFileRange> outputList = AsyncReaderUtils.sortAndMergeRanges( + input, 100, 1001, 2500); + assertEquals(1, outputList.size()); + CombinedFileRange output = outputList.get(0); + assertEquals(3, output.getUnderlying().size()); + assertEquals("range[1000,3100)", output.toString()); + assertTrue(AsyncReaderUtils.isOrderedDisjoint(outputList, 100, 800)); + + // the minSeek doesn't allow the first two to merge + assertFalse(AsyncReaderUtils.isOrderedDisjoint(input, 100, 1000)); + outputList = AsyncReaderUtils.sortAndMergeRanges(input, 100, 1000, 2100); + assertEquals(2, outputList.size()); + assertEquals("range[1000,1100)", outputList.get(0).toString()); + assertEquals("range[2100,3100)", outputList.get(1).toString()); + assertTrue(AsyncReaderUtils.isOrderedDisjoint(outputList, 100, 1000)); + + // the maxSize doesn't allow the third range to merge + assertFalse(AsyncReaderUtils.isOrderedDisjoint(input, 100, 800)); + outputList = AsyncReaderUtils.sortAndMergeRanges(input, 100, 1001, 2099); + assertEquals(2, outputList.size()); + assertEquals("range[1000,2200)", outputList.get(0).toString()); + assertEquals("range[3000,3100)", outputList.get(1).toString()); + assertTrue(AsyncReaderUtils.isOrderedDisjoint(outputList, 100, 800)); + + // test the round up and round down (the maxSize doesn't allow any merges) + assertFalse(AsyncReaderUtils.isOrderedDisjoint(input, 16, 700)); + outputList = AsyncReaderUtils.sortAndMergeRanges(input, 16, 1001, 100); + assertEquals(3, outputList.size()); + assertEquals("range[992,1104)", outputList.get(0).toString()); + assertEquals("range[2096,2208)", outputList.get(1).toString()); + assertEquals("range[2992,3104)", outputList.get(2).toString()); + assertTrue(AsyncReaderUtils.isOrderedDisjoint(outputList, 16, 700)); + } + + interface Stream extends PositionedReadable, ByteBufferPositionedReadable { + // nothing + } + + static void fillBuffer(ByteBuffer buffer) { + byte b = 0; + while (buffer.remaining() > 0) { + buffer.put(b++); + } + } + + @Test + public void testReadRangeFromByteBufferPositionedReadable() throws Exception { + Stream stream = Mockito.mock(Stream.class); + Mockito.doAnswer(invocation -> { + fillBuffer(invocation.getArgument(1)); + return null; + }).when(stream).readFully(ArgumentMatchers.anyLong(), + ArgumentMatchers.any(ByteBuffer.class)); + CompletableFuture<ByteBuffer> result = + AsyncReaderUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), + ByteBuffer::allocate); + assertTrue(result.isDone()); + ByteBuffer buffer = result.get(); + assertEquals(100, buffer.remaining()); + byte b = 0; + while (buffer.remaining() > 0) { + assertEquals("remain = " + buffer.remaining(), b++, buffer.get()); + } + + // test an IOException + Mockito.reset(stream); + Mockito.doThrow(new IOException("foo")) + .when(stream).readFully(ArgumentMatchers.anyLong(), + ArgumentMatchers.any(ByteBuffer.class)); + result = + AsyncReaderUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), + ByteBuffer::allocate); + assertTrue(result.isCompletedExceptionally()); + } + + static void runReadRangeFromPositionedReadable(IntFunction<ByteBuffer> allocate) throws Exception { + PositionedReadable stream = Mockito.mock(PositionedReadable.class); + Mockito.doAnswer(invocation -> { + byte b=0; + byte[] buffer = invocation.getArgument(1); + for(int i=0; i < buffer.length; ++i) { + buffer[i] = b++; + } + return null; + }).when(stream).readFully(ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.anyInt(), + ArgumentMatchers.anyInt()); + CompletableFuture<ByteBuffer> result = + AsyncReaderUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), + allocate); + assertTrue(result.isDone()); Review comment: pull out to separate asserts for reuse. Maybe we should add to MoreAsserts as some ``` public static <T> void assertCompletedSuccesfully(CompletableFuture<T> future) { + both asserts with a useful .toString value, and on an exceptional failure throw the exception } ``` ########## File path: hadoop-common-project/hadoop-common/pom.xml ########## @@ -651,6 +651,14 @@ </filesets> </configuration> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>8</source> Review comment: versions to be set in hadoop project pom ########## File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java ########## @@ -267,8 +282,84 @@ public boolean hasCapability(String capability) { public IOStatistics getIOStatistics() { return ioStatistics; } + + // QQ: Should we make this synchronized? + AsynchronousFileChannel getAsyncChannel() throws IOException { + if (asyncChannel == null) { + asyncChannel = AsynchronousFileChannel.open(name.toPath(), + StandardOpenOption.READ); + } + return asyncChannel; + } + + @Override + public void readAsync(List<? extends FileRange> ranges, + IntFunction<ByteBuffer> allocate) { + // Set up all of the futures, so that we can use them if things fail + for(FileRange range: ranges) { + range.setData(new CompletableFuture<>()); + } + try { + AsynchronousFileChannel channel = getAsyncChannel(); + ByteBuffer[] buffers = new ByteBuffer[ranges.size()]; + AsyncHandler asyncHandler = new AsyncHandler(channel, ranges, buffers); + for(int i = 0; i < ranges.size(); ++i) { + FileRange range = ranges.get(i); + buffers[i] = allocate.apply(range.getLength()); + channel.read(buffers[i], range.getOffset(), i, asyncHandler); + } + } catch (IOException ioe) { + LOG.info("Can't get async channel", ioe); Review comment: propose log full stack at debug ########## File path: hadoop-common-project/benchmark/src/main/java/org/apache/hadoop/benchmark/AsyncBenchmark.java ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.benchmark; + +import org.apache.hadoop.conf.Configuration; Review comment: usual ordering nits ########## File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java ########## @@ -85,4 +89,37 @@ void readFully(long position, byte[] buffer, int offset, int length) * the read operation completed */ void readFully(long position, byte[] buffer) throws IOException; + + /** + * What is the smallest reasonable seek? + * @return the minimum number of bytes + */ + default int minimumReasonableSeek() { Review comment: maybre add `forVectorReads` on these, to show what they are for ########## File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java ########## @@ -267,8 +282,84 @@ public boolean hasCapability(String capability) { public IOStatistics getIOStatistics() { return ioStatistics; } + + // QQ: Should we make this synchronized? Review comment: yes. Also needs to reject if file is closed. ########## File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java ########## @@ -18,9 +18,13 @@ package org.apache.hadoop.fs; import java.io.*; Review comment: might be time to expand the .*, given we are editing the imports ########## File path: hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestAsyncReaderUtils.java ########## @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import org.apache.hadoop.fs.ByteBufferPositionedReadable; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.FileRangeImpl; +import org.apache.hadoop.fs.PositionedReadable; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.IntBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.IntFunction; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +/** + * Test behavior of {@link AsyncReaderUtils}. + */ +public class TestAsyncReaderUtils { Review comment: 1. `extends HadoopTestBase` 2. should go for AssertJ ########## File path: hadoop-common-project/benchmark/src/main/java/org/apache/hadoop/benchmark/AsyncBenchmark.java ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.benchmark; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.FileRangeImpl; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.io.EOFException; +import java.io.IOException; + +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.channels.CompletionHandler; +import java.nio.file.FileSystems; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.IntFunction; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +public class AsyncBenchmark { Review comment: wonder what it'd take to make this a contract test -or at least deployable on the command line? ########## File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AsyncReaderUtils.java ########## @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.IntFunction; + +import org.apache.hadoop.fs.ByteBufferPositionedReadable; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.PositionedReadable; + +public class AsyncReaderUtils { + /** + * Read fully a list of file ranges asynchronously from this file. + * The default iterates through the ranges to read each synchronously, but + * the intent is that subclasses can make more efficient readers. + * The data or exceptions are pushed into {@link FileRange#getData()}. + * @param stream the stream to read the data from + * @param ranges the byte ranges to read + * @param allocate the byte buffer allocation + * @param minimumSeek the minimum number of bytes to seek over + * @param maximumRead the largest number of bytes to combine into a single read + */ + public static void readAsync(PositionedReadable stream, + List<? extends FileRange> ranges, + IntFunction<ByteBuffer> allocate, + int minimumSeek, + int maximumRead) { + if (isOrderedDisjoint(ranges, 1, minimumSeek)) { + for(FileRange range: ranges) { + range.setData(readRangeFrom(stream, range, allocate)); + } + } else { + for(CombinedFileRange range: sortAndMergeRanges(ranges, 1, minimumSeek, + maximumRead)) { + CompletableFuture<ByteBuffer> read = + readRangeFrom(stream, range, allocate); + for(FileRange child: range.getUnderlying()) { + child.setData(read.thenApply( + (b) -> sliceTo(b, range.getOffset(), child))); + } + } + } + } + + /** + * Synchronously reads a range from the stream dealing with the combinations + * of ByteBuffers buffers and PositionedReadable streams. + * @param stream the stream to read from + * @param range the range to read + * @param allocate the function to allocate ByteBuffers + * @return the CompletableFuture that contains the read data + */ + public static CompletableFuture<ByteBuffer> readRangeFrom(PositionedReadable stream, + FileRange range, + IntFunction<ByteBuffer> allocate) { + CompletableFuture<ByteBuffer> result = new CompletableFuture<>(); + try { + ByteBuffer buffer = allocate.apply(range.getLength()); + if (stream instanceof ByteBufferPositionedReadable) { + ((ByteBufferPositionedReadable) stream).readFully(range.getOffset(), + buffer); + buffer.flip(); + } else { + if (buffer.isDirect()) { Review comment: propose pulling this out into its own static method which can be tested in isolation, maybe used in implementations ########## File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java ########## @@ -286,8 +295,123 @@ protected int readChunk(long pos, byte[] buf, int offset, int len, public IOStatistics getIOStatistics() { return IOStatisticsSupport.retrieveIOStatistics(datas); } + + public static long findChecksumOffset(long dataOffset, Review comment: does bytesPerSum ever come in as 0? ########## File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java ########## @@ -85,4 +89,37 @@ void readFully(long position, byte[] buffer, int offset, int length) * the read operation completed */ void readFully(long position, byte[] buffer) throws IOException; + + /** + * What is the smallest reasonable seek? + * @return the minimum number of bytes + */ + default int minimumReasonableSeek() { + return 4 * 1024; + } + + /** + * What is the largest size that we should group ranges together as? + * @return the number of bytes to read at once + */ + default int maximumReadSize() { + return 1024 * 1024; + } + + /** + * Read fully a list of file ranges asynchronously from this file. + * The default iterates through the ranges to read each synchronously, but + * the intent is that FSDataInputStream subclasses can make more efficient + * readers. + * As a result of the call, each range will have FileRange.setData(CompletableFuture) + * called with a future that when complete will have a ByteBuffer with the + * data from the file's range. + * @param ranges the byte ranges to read + * @param allocate the function to allocate ByteBuffer + */ + default void readAsync(List<? extends FileRange> ranges, Review comment: * `readVectored()`; the async is part of the implementation * add `throws IOException` in case implementations do this. ########## File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.FileRangeImpl; + +import java.util.ArrayList; +import java.util.List; + +/** + * A file range that represents a set of underlying file ranges. + * This is used when we combine the user's FileRange objects + * together into a single read for efficiency. + */ +public class CombinedFileRange extends FileRangeImpl { Review comment: might be good for the toString() to provide more info than the superclass does -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
