[
https://issues.apache.org/jira/browse/HADOOP-19139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837561#comment-17837561
]
ASF GitHub Bot commented on HADOOP-19139:
-----------------------------------------
anmolanmol1234 commented on code in PR #6699:
URL: https://github.com/apache/hadoop/pull/6699#discussion_r1566774019
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -128,6 +133,15 @@ public class AbfsInputStream extends FSInputStream
implements CanUnbuffer,
/** ABFS instance to be held by the input stream to avoid GC close. */
private final BackReference fsBackRef;
+ private AtomicBoolean fileStatusInformationPresent;
+
+ /**
+ * Defines if the inputStream has been used successfully once. Prefetches
would
+ * start only after the first successful read.
+ */
+ private volatile boolean successfulUsage = false;
+ private final boolean pretechTriggerOnFirstRead;
Review Comment:
typo: prefetch
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -309,5 +309,19 @@ public static String accountProperty(String property,
String account) {
* @see FileSystem#openFile(org.apache.hadoop.fs.Path)
*/
public static final String FS_AZURE_BUFFERED_PREAD_DISABLE =
"fs.azure.buffered.pread.disable";
+
+ /**
+ * Disable the call of HEAD call for opening a inputStream. ReadPath API of
server
Review Comment:
"Disable the call of HEAD call" can be reworded to "Remove additional HEAD call"
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -226,7 +268,15 @@ public int read() throws IOException {
}
@Override
- public synchronized int read(final byte[] b, final int off, final int len)
throws IOException {
+ public synchronized int read(final byte[] b, final int off, final int len)
+ throws IOException {
+ int result = synchronizedRead(b, off, len);
+ successfulUsage = true;
+ return result;
+ }
+
+ private int synchronizedRead(final byte[] b, final int off, final int len)
Review Comment:
Why add this new method? Simply add a variable in the existing read method.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -376,32 +439,48 @@ private int readLastBlock(final byte[] b, final int off,
final int len)
// data need to be copied to user buffer from index bCursor,
// AbfsInutStream buffer is going to contain data from last block start. In
// that case bCursor will be set to fCursor - lastBlockStart
- long lastBlockStart = max(0, contentLength - footerReadSize);
+ if (!fileStatusInformationPresent.get()) {
+ long lastBlockStart = max(0, (fCursor + len) - footerReadSize);
+ bCursor = (int) (fCursor - lastBlockStart);
+ return optimisedRead(b, off, len, lastBlockStart, min(fCursor + len,
footerReadSize), true);
+ }
+ long lastBlockStart = max(0, getContentLength() - footerReadSize);
bCursor = (int) (fCursor - lastBlockStart);
// 0 if contentlength is < buffersize
- long actualLenToRead = min(footerReadSize, contentLength);
- return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
+ long actualLenToRead = min(footerReadSize, getContentLength());
+ return optimisedRead(b, off, len, lastBlockStart, actualLenToRead, false);
}
private int optimisedRead(final byte[] b, final int off, final int len,
- final long readFrom, final long actualLen) throws IOException {
+ final long readFrom, final long actualLen,
+ final boolean isReadWithoutContentLengthInformation) throws IOException {
fCursor = readFrom;
int totalBytesRead = 0;
int lastBytesRead = 0;
try {
buffer = new byte[bufferSize];
+ boolean fileStatusInformationPresentBeforeRead =
fileStatusInformationPresent.get();
for (int i = 0;
- i < MAX_OPTIMIZED_READ_ATTEMPTS && fCursor < contentLength; i++) {
+ i < MAX_OPTIMIZED_READ_ATTEMPTS &&
(!fileStatusInformationPresent.get()
+ || fCursor < getContentLength()); i++) {
lastBytesRead = readInternal(fCursor, buffer, limit,
(int) actualLen - limit, true);
if (lastBytesRead > 0) {
totalBytesRead += lastBytesRead;
+ boolean shouldBreak = !fileStatusInformationPresentBeforeRead
+ && totalBytesRead == (int) actualLen;
limit += lastBytesRead;
fCursor += lastBytesRead;
fCursorAfterLastRead = fCursor;
+ if (shouldBreak) {
+ break;
+ }
}
}
} catch (IOException e) {
+ if (isNonRetriableOptimizedReadException(e)) {
+ throw e;
Review Comment:
Yes, this is not clear — why should these exceptions not be retried?
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -1099,7 +1099,9 @@ public AbfsRestOperation read(final String path,
AbfsHttpHeader rangeHeader = new AbfsHttpHeader(RANGE,
String.format("bytes=%d-%d", position, position + bufferLength - 1));
requestHeaders.add(rangeHeader);
- requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
+ if (eTag == null || !eTag.isEmpty()) {
Review Comment:
Shouldn't the condition be eTag != null?
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -145,6 +159,10 @@ public AbfsInputStream(
this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
this.eTag = eTag;
+ this.fileStatusInformationPresent = new
AtomicBoolean(StringUtils.isNotEmpty(eTag));
+ this.pretechTriggerOnFirstRead =
Review Comment:
typo : prefetch
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -306,7 +366,7 @@ private int readOneBlock(final byte[] b, final int off,
final int len) throws IO
//If buffer is empty, then fill the buffer.
if (bCursor == limit) {
//If EOF, then return -1
- if (fCursor >= contentLength) {
+ if (fileStatusInformationPresent.get() && fCursor >= getContentLength())
{
Review Comment:
We can have a case where bCursor is already equal to the limit and we have
not made a read yet, so the value of fileStatusInformationPresent is false.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -192,6 +210,30 @@ public int read(long position, byte[] buffer, int offset,
int length)
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
}
+
+ /*
+ * When the inputStream is started, if the application tries to parallelly
read
+ * ont he inputStream, the first read will be synchronized and the
subsequent
Review Comment:
typo: on the
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -376,32 +439,48 @@ private int readLastBlock(final byte[] b, final int off,
final int len)
// data need to be copied to user buffer from index bCursor,
// AbfsInutStream buffer is going to contain data from last block start. In
// that case bCursor will be set to fCursor - lastBlockStart
- long lastBlockStart = max(0, contentLength - footerReadSize);
+ if (!fileStatusInformationPresent.get()) {
+ long lastBlockStart = max(0, (fCursor + len) - footerReadSize);
+ bCursor = (int) (fCursor - lastBlockStart);
+ return optimisedRead(b, off, len, lastBlockStart, min(fCursor + len,
footerReadSize), true);
+ }
+ long lastBlockStart = max(0, getContentLength() - footerReadSize);
bCursor = (int) (fCursor - lastBlockStart);
// 0 if contentlength is < buffersize
- long actualLenToRead = min(footerReadSize, contentLength);
- return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
+ long actualLenToRead = min(footerReadSize, getContentLength());
+ return optimisedRead(b, off, len, lastBlockStart, actualLenToRead, false);
}
private int optimisedRead(final byte[] b, final int off, final int len,
- final long readFrom, final long actualLen) throws IOException {
+ final long readFrom, final long actualLen,
+ final boolean isReadWithoutContentLengthInformation) throws IOException {
fCursor = readFrom;
int totalBytesRead = 0;
int lastBytesRead = 0;
try {
buffer = new byte[bufferSize];
+ boolean fileStatusInformationPresentBeforeRead =
fileStatusInformationPresent.get();
for (int i = 0;
- i < MAX_OPTIMIZED_READ_ATTEMPTS && fCursor < contentLength; i++) {
+ i < MAX_OPTIMIZED_READ_ATTEMPTS &&
(!fileStatusInformationPresent.get()
+ || fCursor < getContentLength()); i++) {
lastBytesRead = readInternal(fCursor, buffer, limit,
(int) actualLen - limit, true);
if (lastBytesRead > 0) {
totalBytesRead += lastBytesRead;
+ boolean shouldBreak = !fileStatusInformationPresentBeforeRead
Review Comment:
How is this limited to the case when file status is not available?
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -817,8 +817,8 @@ public AbfsInputStream openFileForRead(Path path,
FileStatus fileStatus = parameters.map(OpenFileParameters::getStatus)
.orElse(null);
String relativePath = getRelativePath(path);
- String resourceType, eTag;
- long contentLength;
+ String resourceType = null, eTag = null;
+ long contentLength = -1;
ContextEncryptionAdapter contextEncryptionAdapter =
NoContextEncryptionAdapter.getInstance();
/*
* GetPathStatus API has to be called in case of:
Review Comment:
This comment should be updated
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -192,6 +210,30 @@ public int read(long position, byte[] buffer, int offset,
int length)
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
}
+
+ /*
+ * When the inputStream is started, if the application tries to parallelly
read
+ * ont he inputStream, the first read will be synchronized and the
subsequent
+ * reads will be non-synchronized.
+ */
+ if (!successfulUsage) {
Review Comment:
The variable name successfulUsage is not clear; it should be something like
secondRead.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -376,32 +439,48 @@ private int readLastBlock(final byte[] b, final int off,
final int len)
// data need to be copied to user buffer from index bCursor,
// AbfsInutStream buffer is going to contain data from last block start. In
// that case bCursor will be set to fCursor - lastBlockStart
- long lastBlockStart = max(0, contentLength - footerReadSize);
+ if (!fileStatusInformationPresent.get()) {
+ long lastBlockStart = max(0, (fCursor + len) - footerReadSize);
+ bCursor = (int) (fCursor - lastBlockStart);
+ return optimisedRead(b, off, len, lastBlockStart, min(fCursor + len,
footerReadSize), true);
+ }
+ long lastBlockStart = max(0, getContentLength() - footerReadSize);
bCursor = (int) (fCursor - lastBlockStart);
// 0 if contentlength is < buffersize
- long actualLenToRead = min(footerReadSize, contentLength);
- return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
+ long actualLenToRead = min(footerReadSize, getContentLength());
+ return optimisedRead(b, off, len, lastBlockStart, actualLenToRead, false);
}
private int optimisedRead(final byte[] b, final int off, final int len,
- final long readFrom, final long actualLen) throws IOException {
+ final long readFrom, final long actualLen,
+ final boolean isReadWithoutContentLengthInformation) throws IOException {
Review Comment:
The variable should not be the opposite of what is expected — I mean, it should
be isContentLengthInfoAvailable. The "Without" keyword is causing confusion.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -564,11 +669,25 @@ int readRemote(long position, byte[] b, int offset, int
length, TracingContext t
} catch (AzureBlobFileSystemException ex) {
if (ex instanceof AbfsRestOperationException) {
AbfsRestOperationException ere = (AbfsRestOperationException) ex;
+ abfsHttpOperation = ((AbfsRestOperationException)
ex).getAbfsHttpOperation();
if (ere.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(ere.getMessage());
}
+ /*
+ * Status 416 is sent when read range is out of contentLength range.
+ * This would happen only in the case if contentLength is not known
before
+ * opening the inputStream.
+ */
+ if (ere.getStatusCode() == READ_PATH_REQUEST_NOT_SATISFIABLE
+ && !fileStatusInformationPresent.get()) {
+ return -1;
+ }
}
throw new IOException(ex);
+ } finally {
+ if (!fileStatusInformationPresent.get() && abfsHttpOperation != null) {
+ initPathPropertiesFromReadPathResponseHeader(abfsHttpOperation);
Review Comment:
method name can be shortened
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -447,8 +537,7 @@ private boolean validate(final byte[] b, final int off,
final int len)
Preconditions.checkNotNull(b);
LOG.debug("read one block requested b.length = {} off {} len {}", b.length,
off, len);
-
- if (this.available() == 0) {
+ if (fileStatusInformationPresent.get() && this.available() == 0) {
Review Comment:
Since we are already checking here that file status info is available, the
check (!fileStatusInformationPresent.get()) in the available() method will
never get hit.
> [ABFS]: No GetPathStatus call for opening AbfsInputStream
> ---------------------------------------------------------
>
> Key: HADOOP-19139
> URL: https://issues.apache.org/jira/browse/HADOOP-19139
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Reporter: Pranav Saxena
> Assignee: Pranav Saxena
> Priority: Major
> Labels: pull-request-available
>
> Read API gives contentLen and etag of the path. This information would be
> used in future calls on that inputStream. Prior information of eTag is of not
> much importance.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]