anmolanmol1234 commented on code in PR #6699:
URL: https://github.com/apache/hadoop/pull/6699#discussion_r1566774019
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -128,6 +133,15 @@ public class AbfsInputStream extends FSInputStream
implements CanUnbuffer,
/** ABFS instance to be held by the input stream to avoid GC close. */
private final BackReference fsBackRef;
+ private AtomicBoolean fileStatusInformationPresent;
+
+ /**
+ * Defines if the inputStream has been used successfully once. Prefetches
would
+ * start only after the first successful read.
+ */
+ private volatile boolean successfulUsage = false;
+ private final boolean pretechTriggerOnFirstRead;
Review Comment:
Typo: "pretech" should be "prefetch" (pretechTriggerOnFirstRead).
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -309,5 +309,19 @@ public static String accountProperty(String property,
String account) {
* @see FileSystem#openFile(org.apache.hadoop.fs.Path)
*/
public static final String FS_AZURE_BUFFERED_PREAD_DISABLE =
"fs.azure.buffered.pread.disable";
+
+ /**
+ * Disable the call of HEAD call for opening a inputStream. ReadPath API of
server
Review Comment:
"Disable the call of HEAD call" can be reworded to "Remove additional HEAD call".
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -226,7 +268,15 @@ public int read() throws IOException {
}
@Override
- public synchronized int read(final byte[] b, final int off, final int len)
throws IOException {
+ public synchronized int read(final byte[] b, final int off, final int len)
+ throws IOException {
+ int result = synchronizedRead(b, off, len);
+ successfulUsage = true;
+ return result;
+ }
+
+ private int synchronizedRead(final byte[] b, final int off, final int len)
Review Comment:
Why add this new method? Simply add a variable in the existing read method.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -376,32 +439,48 @@ private int readLastBlock(final byte[] b, final int off,
final int len)
// data need to be copied to user buffer from index bCursor,
// AbfsInutStream buffer is going to contain data from last block start. In
// that case bCursor will be set to fCursor - lastBlockStart
- long lastBlockStart = max(0, contentLength - footerReadSize);
+ if (!fileStatusInformationPresent.get()) {
+ long lastBlockStart = max(0, (fCursor + len) - footerReadSize);
+ bCursor = (int) (fCursor - lastBlockStart);
+ return optimisedRead(b, off, len, lastBlockStart, min(fCursor + len,
footerReadSize), true);
+ }
+ long lastBlockStart = max(0, getContentLength() - footerReadSize);
bCursor = (int) (fCursor - lastBlockStart);
// 0 if contentlength is < buffersize
- long actualLenToRead = min(footerReadSize, contentLength);
- return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
+ long actualLenToRead = min(footerReadSize, getContentLength());
+ return optimisedRead(b, off, len, lastBlockStart, actualLenToRead, false);
}
private int optimisedRead(final byte[] b, final int off, final int len,
- final long readFrom, final long actualLen) throws IOException {
+ final long readFrom, final long actualLen,
+ final boolean isReadWithoutContentLengthInformation) throws IOException {
fCursor = readFrom;
int totalBytesRead = 0;
int lastBytesRead = 0;
try {
buffer = new byte[bufferSize];
+ boolean fileStatusInformationPresentBeforeRead =
fileStatusInformationPresent.get();
for (int i = 0;
- i < MAX_OPTIMIZED_READ_ATTEMPTS && fCursor < contentLength; i++) {
+ i < MAX_OPTIMIZED_READ_ATTEMPTS &&
(!fileStatusInformationPresent.get()
+ || fCursor < getContentLength()); i++) {
lastBytesRead = readInternal(fCursor, buffer, limit,
(int) actualLen - limit, true);
if (lastBytesRead > 0) {
totalBytesRead += lastBytesRead;
+ boolean shouldBreak = !fileStatusInformationPresentBeforeRead
+ && totalBytesRead == (int) actualLen;
limit += lastBytesRead;
fCursor += lastBytesRead;
fCursorAfterLastRead = fCursor;
+ if (shouldBreak) {
+ break;
+ }
}
}
} catch (IOException e) {
+ if (isNonRetriableOptimizedReadException(e)) {
+ throw e;
Review Comment:
Yes, it is not clear: why should these exceptions not be retried?
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -1099,7 +1099,9 @@ public AbfsRestOperation read(final String path,
AbfsHttpHeader rangeHeader = new AbfsHttpHeader(RANGE,
String.format("bytes=%d-%d", position, position + bufferLength - 1));
requestHeaders.add(rangeHeader);
- requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
+ if (eTag == null || !eTag.isEmpty()) {
Review Comment:
Shouldn't the condition be `eTag != null`?
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -145,6 +159,10 @@ public AbfsInputStream(
this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
this.eTag = eTag;
+ this.fileStatusInformationPresent = new
AtomicBoolean(StringUtils.isNotEmpty(eTag));
+ this.pretechTriggerOnFirstRead =
Review Comment:
Typo: "pretech" should be "prefetch".
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -306,7 +366,7 @@ private int readOneBlock(final byte[] b, final int off,
final int len) throws IO
//If buffer is empty, then fill the buffer.
if (bCursor == limit) {
//If EOF, then return -1
- if (fCursor >= contentLength) {
+ if (fileStatusInformationPresent.get() && fCursor >= getContentLength())
{
Review Comment:
We can have a case where bCursor is already equal to limit and we have
not made a read yet, so the value of fileStatusInformationPresent is still false.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -192,6 +210,30 @@ public int read(long position, byte[] buffer, int offset,
int length)
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
}
+
+ /*
+ * When the inputStream is started, if the application tries to parallelly
read
+ * ont he inputStream, the first read will be synchronized and the
subsequent
Review Comment:
Typo: "ont he" should be "on the".
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -376,32 +439,48 @@ private int readLastBlock(final byte[] b, final int off,
final int len)
// data need to be copied to user buffer from index bCursor,
// AbfsInutStream buffer is going to contain data from last block start. In
// that case bCursor will be set to fCursor - lastBlockStart
- long lastBlockStart = max(0, contentLength - footerReadSize);
+ if (!fileStatusInformationPresent.get()) {
+ long lastBlockStart = max(0, (fCursor + len) - footerReadSize);
+ bCursor = (int) (fCursor - lastBlockStart);
+ return optimisedRead(b, off, len, lastBlockStart, min(fCursor + len,
footerReadSize), true);
+ }
+ long lastBlockStart = max(0, getContentLength() - footerReadSize);
bCursor = (int) (fCursor - lastBlockStart);
// 0 if contentlength is < buffersize
- long actualLenToRead = min(footerReadSize, contentLength);
- return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
+ long actualLenToRead = min(footerReadSize, getContentLength());
+ return optimisedRead(b, off, len, lastBlockStart, actualLenToRead, false);
}
private int optimisedRead(final byte[] b, final int off, final int len,
- final long readFrom, final long actualLen) throws IOException {
+ final long readFrom, final long actualLen,
+ final boolean isReadWithoutContentLengthInformation) throws IOException {
fCursor = readFrom;
int totalBytesRead = 0;
int lastBytesRead = 0;
try {
buffer = new byte[bufferSize];
+ boolean fileStatusInformationPresentBeforeRead =
fileStatusInformationPresent.get();
for (int i = 0;
- i < MAX_OPTIMIZED_READ_ATTEMPTS && fCursor < contentLength; i++) {
+ i < MAX_OPTIMIZED_READ_ATTEMPTS &&
(!fileStatusInformationPresent.get()
+ || fCursor < getContentLength()); i++) {
lastBytesRead = readInternal(fCursor, buffer, limit,
(int) actualLen - limit, true);
if (lastBytesRead > 0) {
totalBytesRead += lastBytesRead;
+ boolean shouldBreak = !fileStatusInformationPresentBeforeRead
Review Comment:
How is this limited to the case when file status is not available ?
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -817,8 +817,8 @@ public AbfsInputStream openFileForRead(Path path,
FileStatus fileStatus = parameters.map(OpenFileParameters::getStatus)
.orElse(null);
String relativePath = getRelativePath(path);
- String resourceType, eTag;
- long contentLength;
+ String resourceType = null, eTag = null;
+ long contentLength = -1;
ContextEncryptionAdapter contextEncryptionAdapter =
NoContextEncryptionAdapter.getInstance();
/*
* GetPathStatus API has to be called in case of:
Review Comment:
This comment should be updated
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -192,6 +210,30 @@ public int read(long position, byte[] buffer, int offset,
int length)
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
}
+
+ /*
+ * When the inputStream is started, if the application tries to parallelly
read
+ * ont he inputStream, the first read will be synchronized and the
subsequent
+ * reads will be non-synchronized.
+ */
+ if (!successfulUsage) {
Review Comment:
The variable name successfulUsage is not clear; it should be something like
secondRead.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -376,32 +439,48 @@ private int readLastBlock(final byte[] b, final int off,
final int len)
// data need to be copied to user buffer from index bCursor,
// AbfsInutStream buffer is going to contain data from last block start. In
// that case bCursor will be set to fCursor - lastBlockStart
- long lastBlockStart = max(0, contentLength - footerReadSize);
+ if (!fileStatusInformationPresent.get()) {
+ long lastBlockStart = max(0, (fCursor + len) - footerReadSize);
+ bCursor = (int) (fCursor - lastBlockStart);
+ return optimisedRead(b, off, len, lastBlockStart, min(fCursor + len,
footerReadSize), true);
+ }
+ long lastBlockStart = max(0, getContentLength() - footerReadSize);
bCursor = (int) (fCursor - lastBlockStart);
// 0 if contentlength is < buffersize
- long actualLenToRead = min(footerReadSize, contentLength);
- return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
+ long actualLenToRead = min(footerReadSize, getContentLength());
+ return optimisedRead(b, off, len, lastBlockStart, actualLenToRead, false);
}
private int optimisedRead(final byte[] b, final int off, final int len,
- final long readFrom, final long actualLen) throws IOException {
+ final long readFrom, final long actualLen,
+ final boolean isReadWithoutContentLengthInformation) throws IOException {
Review Comment:
The variable should not be named as the opposite of what is expected; it should
be isContentLengthInfoAvailable. The "Without" keyword is causing confusion.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -564,11 +669,25 @@ int readRemote(long position, byte[] b, int offset, int
length, TracingContext t
} catch (AzureBlobFileSystemException ex) {
if (ex instanceof AbfsRestOperationException) {
AbfsRestOperationException ere = (AbfsRestOperationException) ex;
+ abfsHttpOperation = ((AbfsRestOperationException)
ex).getAbfsHttpOperation();
if (ere.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(ere.getMessage());
}
+ /*
+ * Status 416 is sent when read range is out of contentLength range.
+ * This would happen only in the case if contentLength is not known
before
+ * opening the inputStream.
+ */
+ if (ere.getStatusCode() == READ_PATH_REQUEST_NOT_SATISFIABLE
+ && !fileStatusInformationPresent.get()) {
+ return -1;
+ }
}
throw new IOException(ex);
+ } finally {
+ if (!fileStatusInformationPresent.get() && abfsHttpOperation != null) {
+ initPathPropertiesFromReadPathResponseHeader(abfsHttpOperation);
Review Comment:
method name can be shortened
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -447,8 +537,7 @@ private boolean validate(final byte[] b, final int off,
final int len)
Preconditions.checkNotNull(b);
LOG.debug("read one block requested b.length = {} off {} len {}", b.length,
off, len);
-
- if (this.available() == 0) {
+ if (fileStatusInformationPresent.get() && this.available() == 0) {
Review Comment:
Since we are already checking here that file status info is available, the
`if (!fileStatusInformationPresent.get())` check in the available() method will
never get hit.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]