snvijaya commented on a change in pull request #2464:
URL: https://github.com/apache/hadoop/pull/2464#discussion_r531364075
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
##########
@@ -1242,7 +1242,7 @@ boolean getIsNamespaceEnabled() throws AzureBlobFileSystemException {
}
@VisibleForTesting
- Map<String, Long> getInstrumentationMap() {
+ public Map<String, Long> getInstrumentationMap() {
Review comment:
Same as above.
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -47,6 +47,10 @@ StreamCapabilities {
private static final Logger LOG =
LoggerFactory.getLogger(AbfsInputStream.class);
+ public static final int FOOTER_SIZE = 8;
+
Review comment:
Keep the static finals together and the non-static fields together; there is
no need for blank lines above and below every new field.
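A minimal sketch of the grouping being asked for, using only the declarations
already visible in this hunk:

    // Static finals grouped together, no blank line around each one.
    private static final Logger LOG =
        LoggerFactory.getLogger(AbfsInputStream.class);
    public static final int FOOTER_SIZE = 8;
    // ...non-static instance fields then follow as their own contiguous group.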
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -211,6 +235,59 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
return bytesToRead;
}
+ private boolean shouldReadFully() {
+ return this.firstRead && this.context.readSmallFilesCompletely()
+ && this.contentLength <= this.bufferSize;
+ }
+
+ private boolean shouldReadLastBlock(int len) {
+ return this.firstRead && this.context.optimizeFooterRead()
+ && len == FOOTER_SIZE
+ && this.fCursor == this.contentLength - FOOTER_SIZE;
+ }
+
+ private int readFileCompletely(final byte[] b, final int off, final int len) throws IOException {
+ int validation = validate(b, off, len);
+ if (validation < 1) {
Review comment:
Refer to the validate method comments.
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -211,6 +235,59 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
return bytesToRead;
}
+ private boolean shouldReadFully() {
+ return this.firstRead && this.context.readSmallFilesCompletely()
+ && this.contentLength <= this.bufferSize;
+ }
+
+ private boolean shouldReadLastBlock(int len) {
+ return this.firstRead && this.context.optimizeFooterRead()
+ && len == FOOTER_SIZE
+ && this.fCursor == this.contentLength - FOOTER_SIZE;
+ }
+
+ private int readFileCompletely(final byte[] b, final int off, final int len) throws IOException {
+ int validation = validate(b, off, len);
+ if (validation < 1) {
+ return validation;
+ }
+
+ buffer = new byte[bufferSize];
+ bCursor = (int) fCursor;
+ int bytesRead = readInternal(0, buffer, 0, (int) contentLength, true);
+ firstRead = false;
+ if (bytesRead == -1) {
+ return -1;
+ }
+ fCursorAfterLastRead = fCursor;
+ limit = bytesRead;
+ fCursor = bytesRead;
+ return copyToUserBuffer(b, off, len);
+ }
+
+ private int readLastBlock(final byte[] b, final int off, final int len) throws IOException {
+ int validation = validate(b, off, len);
+ if (validation < 1) {
+ return validation;
+ }
+
+ buffer = new byte[bufferSize];
+ bCursor = (int) (((contentLength < bufferSize) ? contentLength : bufferSize)
+ - FOOTER_SIZE);
+ long lastBlockStartPos = (contentLength < bufferSize)
+ ? 0
+ : contentLength - bufferSize;
+ int bytesRead = readInternal(lastBlockStartPos, buffer, 0, bufferSize,
Review comment:
So readInternal will go directly to the server to fetch the data, and the
server returning partial data is a valid case. The expectation is that the
client will loop and keep calling read until it has fetched all the data it
needs.
Take the case of a client requesting the last 8 bytes of a file, which here
translates into a read of the last 4 MB. Say readInternal returned 3 MB of
data: we haven't yet got the last 8 bytes that need to be returned to the
client app, but bCursor has already been set to the 4 MB - 8 bytes position.
This will end up returning whatever happens to be present in the allocated
buffer, which is corrupt data.
bytesRead has to equal bufferSize or the whole logic falls apart. Since the
server can return partial data, the respective handling is needed.
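A rough sketch of one way to handle the partial-read case, reusing the fields
and the readInternal call from the quoted hunk (the loop shape is a
suggestion, not this PR's implementation):

    // The last block spans [lastBlockStartPos, contentLength); the server
    // may return it in pieces, so keep reading until it is fully buffered.
    int toRead = (int) (contentLength - lastBlockStartPos);
    int totalBytesRead = 0;
    while (totalBytesRead < toRead) {
      int bytesRead = readInternal(lastBlockStartPos + totalBytesRead,
          buffer, totalBytesRead, toRead - totalBytesRead, true);
      if (bytesRead == -1) {
        return -1; // EOF before the block was fully read
      }
      totalBytesRead += bytesRead;
    }
    // Only now is it safe to point bCursor at the footer bytes.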
##########
File path:
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java
##########
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+
+public class ITestAbfsInputStreamReadFooter
+ extends AbstractAbfsIntegrationTest {
+
+ private static final int TEN = 10;
+ private static final int TWENTY = 20;
+
+ public ITestAbfsInputStreamReadFooter() throws Exception {
+ }
+
+ @Test
+ public void testOnlyOneServerCallIsMadeWhenTheConfIsTrue() throws Exception {
+ testNumBackendCalls(true);
+ }
+
+ @Test
+ public void testMultipleServerCallsAreMadeWhenTheConfIsFalse()
+ throws Exception {
+ testNumBackendCalls(false);
+ }
+
+ private void testNumBackendCalls(boolean optimizeFooterRead)
+ throws Exception {
+ final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead);
+ for (int i = 1; i <= 4; i++) {
+ String fileName = methodName.getMethodName() + i;
+ int fileSize = i * ONE_MB;
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+ int length = AbfsInputStream.FOOTER_SIZE;
+ try (FSDataInputStream iStream = fs.open(testFilePath)) {
+ byte[] buffer = new byte[length];
+
+ Map<String, Long> metricMap = fs.getInstrumentationMap();
+ long requestsMadeBeforeTest = metricMap
+ .get(CONNECTIONS_MADE.getStatName());
+
+ iStream.seek(fileSize - 8);
+ iStream.read(buffer, 0, length);
+
+ iStream.seek(fileSize - (TEN * ONE_KB));
+ iStream.read(buffer, 0, length);
+
+ iStream.seek(fileSize - (TWENTY * ONE_KB));
+ iStream.read(buffer, 0, length);
+
+ metricMap = fs.getInstrumentationMap();
+ long requestsMadeAfterTest = metricMap
+ .get(CONNECTIONS_MADE.getStatName());
+
+ if (optimizeFooterRead) {
+ assertEquals(1, requestsMadeAfterTest - requestsMadeBeforeTest);
+ } else {
+ assertEquals(3, requestsMadeAfterTest - requestsMadeBeforeTest);
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testSeekToEndAndReadWithConfTrue() throws Exception {
+ testSeekToEndAndReadWithConf(true);
+ }
+
+ @Test
+ public void testSeekToEndAndReadWithConfFalse() throws Exception {
+ testSeekToEndAndReadWithConf(false);
+ }
+
+ private void testSeekToEndAndReadWithConf(boolean optimizeFooterRead) throws Exception {
+ final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead);
+ for (int i = 5; i <= 10; i++) {
+ String fileName = methodName.getMethodName() + i;
+ int fileSize = i * ONE_MB;
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+ seekReadAndTest(fs, testFilePath, fileSize - AbfsInputStream.FOOTER_SIZE,
+ AbfsInputStream.FOOTER_SIZE, fileContent);
+ }
+ }
+
+ private AzureBlobFileSystem getFileSystem(boolean optimizeFooterRead)
+ throws IOException {
+ final AzureBlobFileSystem fs = getFileSystem();
+ fs.getAbfsStore().getAbfsConfiguration()
+ .setOptimizeFooterRead(optimizeFooterRead);
+ fs.getAbfsStore().getAbfsConfiguration()
+ .setReadSmallFilesCompletely(false);
Review comment:
This shouldn't be enforced by config. As mentioned in earlier comments, align
the buffer size and file size so the read becomes eligible for the footer
read optimization.
We need the test to validate that it takes the footer path even when the
other config is on.
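One possible shape for the test helper, using only the setters already
present in this test class (keeping the small-file config on and relying on
test files larger than the buffer is the suggestion, not existing code):

    private AzureBlobFileSystem getFileSystem(boolean optimizeFooterRead)
        throws IOException {
      final AzureBlobFileSystem fs = getFileSystem();
      AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration();
      conf.setOptimizeFooterRead(optimizeFooterRead);
      // Deliberately leave the small-file read enabled: the test files are
      // created larger than the read buffer, so only the footer path is
      // eligible, proving the footer check wins while the other
      // optimization is on.
      conf.setReadSmallFilesCompletely(true);
      return fs;
    }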
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
##########
@@ -1218,7 +1218,7 @@ public boolean failed() {
}
@VisibleForTesting
- AzureBlobFileSystemStore getAbfsStore() {
+ public AzureBlobFileSystemStore getAbfsStore() {
Review comment:
Making methods public for test purposes is not a good idea, especially for
the AzureBlobFileSystem class. Find an alternative.
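One alternative, sketched here as a suggestion only: keep the accessor
package-private and reach it through a small test-source-only helper in the
same package (the class name is hypothetical):

    package org.apache.hadoop.fs.azurebfs;

    // Lives under src/test in the azurebfs package, so it can call the
    // package-private accessor without widening production visibility.
    public final class AbfsStoreTestAccess {
      private AbfsStoreTestAccess() {
      }

      public static AzureBlobFileSystemStore getStore(
          final AzureBlobFileSystem fs) {
        return fs.getAbfsStore();
      }
    }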
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -161,6 +174,14 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
if (off < 0 || len < 0 || len > b.length - off) {
Review comment:
Validation for a read beyond EOF (fCursor > contentLength) can also be added
here.
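A minimal sketch of that extra check appended to the bounds check quoted
above, shown with the boolean return suggested further down (how the failure
is signalled is left to whatever contract validate settles on):

    if (off < 0 || len < 0 || len > b.length - off) {
      throw new IndexOutOfBoundsException();
    }
    // Additionally reject reads that start beyond the end of the file.
    if (fCursor > contentLength) {
      return false; // caller maps this to -1 (EOF)
    }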
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -161,6 +174,14 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
}
+ return 1; // 1 indicate success
+ }
+
+ private int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
+ int validation = validate(b, off, len);
+ if (validation < 1) {
+ return validation;
Review comment:
validate should return a boolean: true if validation succeeds, false if not.
The cases where it needs to throw an exception are already handled there, so
this if check won't be hit. Change to:
if (!validate(b, off, len)) { return -1; }
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -141,7 +154,7 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
}
- private int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
+ private int validate(byte[] b, int off, int len) throws IOException {
Review comment:
Why does validate return an int? The method below returns whatever validate
produced, and even though that is only the error case, it is still wrong:
readOneBlock is supposed to return the size of the data read, and validate
does no data read.
Change it to return boolean true for success here.
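A sketch of the suggested signature; the bounds check is the one from the
quoted hunk, everything else is illustrative:

    private boolean validate(final byte[] b, final int off, final int len)
        throws IOException {
      // Programmer errors still throw, exactly as today.
      if (off < 0 || len < 0 || len > b.length - off) {
        throw new IndexOutOfBoundsException();
      }
      // Any remaining non-throwing failure case would return false here.
      return true;
    }

    // Callers then read naturally:
    // if (!validate(b, off, len)) { return -1; }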
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -211,6 +235,59 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
return bytesToRead;
}
+ private boolean shouldReadFully() {
+ return this.firstRead && this.context.readSmallFilesCompletely()
+ && this.contentLength <= this.bufferSize;
+ }
+
+ private boolean shouldReadLastBlock(int len) {
+ return this.firstRead && this.context.optimizeFooterRead()
+ && len == FOOTER_SIZE
+ && this.fCursor == this.contentLength - FOOTER_SIZE;
+ }
+
+ private int readFileCompletely(final byte[] b, final int off, final int len) throws IOException {
+ int validation = validate(b, off, len);
+ if (validation < 1) {
+ return validation;
+ }
+
+ buffer = new byte[bufferSize];
+ bCursor = (int) fCursor;
+ int bytesRead = readInternal(0, buffer, 0, (int) contentLength, true);
+ firstRead = false;
Review comment:
If the read failed for some reason, say throttling, will this line still be
hit? Is there a scenario to handle if it doesn't?
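If the answer is that a thrown exception (for example, throttling retries
being exhausted) skips this line, one way to make the behaviour explicit is a
try/finally, sketched here as an assumption about the intended semantics:

    int bytesRead;
    try {
      bytesRead = readInternal(0, buffer, 0, (int) contentLength, true);
    } finally {
      // The first-read decision is consumed by the attempt itself, so a
      // failed/throttled attempt does not retake the optimized path.
      firstRead = false;
    }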
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -211,6 +235,59 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
return bytesToRead;
}
+ private boolean shouldReadFully() {
+ return this.firstRead && this.context.readSmallFilesCompletely()
+ && this.contentLength <= this.bufferSize;
+ }
+
+ private boolean shouldReadLastBlock(int len) {
+ return this.firstRead && this.context.optimizeFooterRead()
+ && len == FOOTER_SIZE
+ && this.fCursor == this.contentLength - FOOTER_SIZE;
+ }
+
+ private int readFileCompletely(final byte[] b, final int off, final int len) throws IOException {
+ int validation = validate(b, off, len);
+ if (validation < 1) {
+ return validation;
+ }
+
+ buffer = new byte[bufferSize];
+ bCursor = (int) fCursor;
+ int bytesRead = readInternal(0, buffer, 0, (int) contentLength, true);
+ firstRead = false;
+ if (bytesRead == -1) {
+ return -1;
+ }
+ fCursorAfterLastRead = fCursor;
+ limit = bytesRead;
+ fCursor = bytesRead;
+ return copyToUserBuffer(b, off, len);
+ }
+
+ private int readLastBlock(final byte[] b, final int off, final int len) throws IOException {
+ int validation = validate(b, off, len);
+ if (validation < 1) {
Review comment:
Refer to comments in validate.
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -211,6 +235,59 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
return bytesToRead;
}
+ private boolean shouldReadFully() {
+ return this.firstRead && this.context.readSmallFilesCompletely()
+ && this.contentLength <= this.bufferSize;
+ }
+
+ private boolean shouldReadLastBlock(int len) {
+ return this.firstRead && this.context.optimizeFooterRead()
+ && len == FOOTER_SIZE
+ && this.fCursor == this.contentLength - FOOTER_SIZE;
+ }
+
+ private int readFileCompletely(final byte[] b, final int off, final int len) throws IOException {
+ int validation = validate(b, off, len);
+ if (validation < 1) {
+ return validation;
+ }
+
+ buffer = new byte[bufferSize];
+ bCursor = (int) fCursor;
+ int bytesRead = readInternal(0, buffer, 0, (int) contentLength, true);
+ firstRead = false;
+ if (bytesRead == -1) {
+ return -1;
+ }
+ fCursorAfterLastRead = fCursor;
+ limit = bytesRead;
+ fCursor = bytesRead;
+ return copyToUserBuffer(b, off, len);
+ }
+
+ private int readLastBlock(final byte[] b, final int off, final int len) throws IOException {
+ int validation = validate(b, off, len);
+ if (validation < 1) {
+ return validation;
+ }
+
+ buffer = new byte[bufferSize];
+ bCursor = (int) (((contentLength < bufferSize) ? contentLength : bufferSize)
+ - FOOTER_SIZE);
+ long lastBlockStartPos = (contentLength < bufferSize)
+ ? 0
+ : contentLength - bufferSize;
+ int bytesRead = readInternal(lastBlockStartPos, buffer, 0, bufferSize,
+ true);
Review comment:
In both of the new code paths we are bypassing readAheads.
Unless readAhead is disabled, even if read.request.size is configured to
100 MB, data read from the store would still arrive in 4 MB chunks, as that
is the readAhead buffer size.
When either of these optimizations decides that more data from an earlier
offset needs to be read, it triggers the read bypassing readAheads. So won't
it end up as a 100 MB direct read to the store?
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -211,6 +235,59 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
return bytesToRead;
}
+ private boolean shouldReadFully() {
+ return this.firstRead && this.context.readSmallFilesCompletely()
+ && this.contentLength <= this.bufferSize;
+ }
+
+ private boolean shouldReadLastBlock(int len) {
+ return this.firstRead && this.context.optimizeFooterRead()
+ && len == FOOTER_SIZE
+ && this.fCursor == this.contentLength - FOOTER_SIZE;
+ }
+
+ private int readFileCompletely(final byte[] b, final int off, final int len) throws IOException {
+ int validation = validate(b, off, len);
+ if (validation < 1) {
+ return validation;
+ }
+
+ buffer = new byte[bufferSize];
+ bCursor = (int) fCursor;
+ int bytesRead = readInternal(0, buffer, 0, (int) contentLength, true);
+ firstRead = false;
+ if (bytesRead == -1) {
+ return -1;
+ }
+ fCursorAfterLastRead = fCursor;
+ limit = bytesRead;
+ fCursor = bytesRead;
+ return copyToUserBuffer(b, off, len);
+ }
+
+ private int readLastBlock(final byte[] b, final int off, final int len) throws IOException {
+ int validation = validate(b, off, len);
+ if (validation < 1) {
+ return validation;
+ }
+
+ buffer = new byte[bufferSize];
+ bCursor = (int) (((contentLength < bufferSize) ? contentLength : bufferSize)
+ - FOOTER_SIZE);
+ long lastBlockStartPos = (contentLength < bufferSize)
+ ? 0
+ : contentLength - bufferSize;
+ int bytesRead = readInternal(lastBlockStartPos, buffer, 0, bufferSize,
+ true);
+ firstRead = false;
+ if (bytesRead == -1) {
+ return -1;
+ }
+ fCursorAfterLastRead = fCursor;
+ limit = bytesRead;
+ fCursor = lastBlockStartPos + bytesRead;
Review comment:
Shouldn't fCursorAfterLastRead be set after this line?
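That is, something along these lines, with the fields from the quoted hunk:

    fCursor = lastBlockStartPos + bytesRead;
    // Record the cursor only after advancing it, so fCursorAfterLastRead
    // reflects where this read actually ended.
    fCursorAfterLastRead = fCursor;
    limit = bytesRead;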
##########
File path:
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java
##########
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+
+public class ITestAbfsInputStreamReadFooter
+ extends AbstractAbfsIntegrationTest {
+
+ private static final int TEN = 10;
+ private static final int TWENTY = 20;
+
+ public ITestAbfsInputStreamReadFooter() throws Exception {
+ }
+
+ @Test
+ public void testOnlyOneServerCallIsMadeWhenTheConfIsTrue() throws Exception {
+ testNumBackendCalls(true);
+ }
+
+ @Test
+ public void testMultipleServerCallsAreMadeWhenTheConfIsFalse()
+ throws Exception {
+ testNumBackendCalls(false);
+ }
+
+ private void testNumBackendCalls(boolean optimizeFooterRead)
+ throws Exception {
+ final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead);
+ for (int i = 1; i <= 4; i++) {
+ String fileName = methodName.getMethodName() + i;
+ int fileSize = i * ONE_MB;
Review comment:
For testing the footer read optimization, it is better to have a file that is
at least 3 or 4 times the buffer size. For testing purposes a 1 MB file size
is good; just set the buffer to a much smaller size over config.
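For example (the fs.azure.read.request.size key is the standard ABFS read
buffer setting; the 256 KB value and the newInstance approach are
illustrative only):

    // Shrink the read buffer so a 1 MB test file is ~4x the buffer size.
    Configuration rawConfig = new Configuration(getRawConfiguration());
    rawConfig.setInt("fs.azure.read.request.size", 256 * ONE_KB);
    FileSystem fs = FileSystem.newInstance(rawConfig);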
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
##########
@@ -517,6 +527,14 @@ public int getWriteBufferSize() {
return this.writeBufferSize;
}
+ public boolean readSmallFilesCompletely() {
Review comment:
I recall it was discussed that the small-file criterion would be capped at a
4 MB file size. I don't see that in this revision. Was that scrapped in favor
of a small file simply being one that fits within the buffer size?
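If the cap is still intended, the eligibility check could read something like
the following (the 4 MB cap is the assumption under discussion, not code from
this PR; ONE_MB as in FileSystemConfigurations):

    private boolean shouldReadFully() {
      return this.firstRead && this.context.readSmallFilesCompletely()
          && this.contentLength <= Math.min(this.bufferSize, 4 * ONE_MB);
    }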
##########
File path:
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java
##########
@@ -0,0 +1,199 @@
+/**
Review comment:
Run the read tests in particular, and the contract tests as a whole, multiple
times to check for any random failures.
##########
File path:
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java
##########
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+
+public class ITestAbfsInputStreamReadFooter
+ extends AbstractAbfsIntegrationTest {
+
+ private static final int TEN = 10;
+ private static final int TWENTY = 20;
+
+ public ITestAbfsInputStreamReadFooter() throws Exception {
+ }
+
+ @Test
+ public void testOnlyOneServerCallIsMadeWhenTheConfIsTrue() throws Exception {
+ testNumBackendCalls(true);
+ }
+
+ @Test
+ public void testMultipleServerCallsAreMadeWhenTheConfIsFalse()
+ throws Exception {
+ testNumBackendCalls(false);
+ }
+
+ private void testNumBackendCalls(boolean optimizeFooterRead)
+ throws Exception {
+ final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead);
+ for (int i = 1; i <= 4; i++) {
+ String fileName = methodName.getMethodName() + i;
+ int fileSize = i * ONE_MB;
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+ int length = AbfsInputStream.FOOTER_SIZE;
+ try (FSDataInputStream iStream = fs.open(testFilePath)) {
+ byte[] buffer = new byte[length];
+
+ Map<String, Long> metricMap = fs.getInstrumentationMap();
+ long requestsMadeBeforeTest = metricMap
+ .get(CONNECTIONS_MADE.getStatName());
+
+ iStream.seek(fileSize - 8);
+ iStream.read(buffer, 0, length);
+
+ iStream.seek(fileSize - (TEN * ONE_KB));
+ iStream.read(buffer, 0, length);
+
+ iStream.seek(fileSize - (TWENTY * ONE_KB));
+ iStream.read(buffer, 0, length);
+
+ metricMap = fs.getInstrumentationMap();
+ long requestsMadeAfterTest = metricMap
+ .get(CONNECTIONS_MADE.getStatName());
+
+ if (optimizeFooterRead) {
+ assertEquals(1, requestsMadeAfterTest - requestsMadeBeforeTest);
+ } else {
+ assertEquals(3, requestsMadeAfterTest - requestsMadeBeforeTest);
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testSeekToEndAndReadWithConfTrue() throws Exception {
+ testSeekToEndAndReadWithConf(true);
+ }
+
+ @Test
+ public void testSeekToEndAndReadWithConfFalse() throws Exception {
+ testSeekToEndAndReadWithConf(false);
+ }
+
+ private void testSeekToEndAndReadWithConf(boolean optimizeFooterRead) throws Exception {
+ final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead);
+ for (int i = 5; i <= 10; i++) {
+ String fileName = methodName.getMethodName() + i;
+ int fileSize = i * ONE_MB;
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+ seekReadAndTest(fs, testFilePath, fileSize - AbfsInputStream.FOOTER_SIZE,
+ AbfsInputStream.FOOTER_SIZE, fileContent);
+ }
+ }
+
+ private AzureBlobFileSystem getFileSystem(boolean optimizeFooterRead)
+ throws IOException {
+ final AzureBlobFileSystem fs = getFileSystem();
+ fs.getAbfsStore().getAbfsConfiguration()
+ .setOptimizeFooterRead(optimizeFooterRead);
+ fs.getAbfsStore().getAbfsConfiguration()
+ .setReadSmallFilesCompletely(false);
+ return fs;
+ }
+
+ private Path createFileWithContent(FileSystem fs, String fileName,
+ byte[] fileContent) throws IOException {
+ Path testFilePath = path(fileName);
+ try (FSDataOutputStream oStream = fs.create(testFilePath)) {
+ oStream.write(fileContent);
+ oStream.flush();
+ }
+ return testFilePath;
+ }
+
+ private void seekReadAndTest(final FileSystem fs, final Path testFilePath,
+ final int seekPos, final int length, final byte[] fileContent) throws IOException {
+ try (FSDataInputStream iStream = fs.open(testFilePath)) {
+ iStream.seek(seekPos);
+ byte[] buffer = new byte[length];
+ iStream.read(buffer, 0, length);
+ assertSuccessfulRead(fileContent, seekPos, length, buffer);
+ AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
+ .getWrappedStream();
+
+ AzureBlobFileSystem abfs = (AzureBlobFileSystem) fs;
+ AbfsConfiguration conf = abfs.getAbfsStore().getAbfsConfiguration();
+
+ int expectedFCursor = fileContent.length;
+ int expectedLimit;
+ int expectedBCursor;
+ if (conf.optimizeFooterRead()) {
+ expectedBCursor = ((conf.getReadBufferSize() < fileContent.length)
+ ? conf.getReadBufferSize()
+ : fileContent.length);
+ expectedLimit = (conf.getReadBufferSize() < fileContent.length)
+ ? conf.getReadBufferSize()
+ : fileContent.length;
+ } else {
+ expectedBCursor = length;
+ expectedLimit = length;
+ }
+ assertSuccessfulRead(fileContent, abfsInputStream.getBuffer(),
+ conf, length);
+ assertEquals(expectedFCursor, abfsInputStream.getFCursor());
+ assertEquals(expectedBCursor, abfsInputStream.getBCursor());
+ assertEquals(expectedLimit, abfsInputStream.getLimit());
+ }
+ }
+
+ private void assertSuccessfulRead(byte[] actualFileContent,
+ byte[] contentRead, AbfsConfiguration conf, int len) {
+ int buffersize = conf.getReadBufferSize();
+ int actualContentSize = actualFileContent.length;
+ if (conf.optimizeFooterRead()) {
+ len = (actualContentSize < buffersize)
+ ? actualContentSize
+ : buffersize;
+ }
+ assertSuccessfulRead(actualFileContent, actualContentSize - len, len,
Review comment:
File content asserts are needed on the data that is returned through the
client-provided buffer as well.
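For instance, after each read in the test, something like the following,
using the helper and names already present earlier in this class:

    iStream.seek(seekPos);
    iStream.read(buffer, 0, length);
    // Assert the client-visible bytes against the expected file slice,
    // not just the stream's internal buffer.
    assertSuccessfulRead(fileContent, seekPos, length, buffer);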
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]