[ https://issues.apache.org/jira/browse/HADOOP-19139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837511#comment-17837511 ]
ASF GitHub Bot commented on HADOOP-19139:
-----------------------------------------
anujmodi2021 commented on code in PR #6699:
URL: https://github.com/apache/hadoop/pull/6699#discussion_r1565182630
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -309,5 +309,19 @@ public static String accountProperty(String property, String account) {
* @see FileSystem#openFile(org.apache.hadoop.fs.Path)
*/
public static final String FS_AZURE_BUFFERED_PREAD_DISABLE = "fs.azure.buffered.pread.disable";
+
+ /**
+ * Disables the HEAD call made when opening an inputStream. The ReadPath API of the server
+ * returns the contentLength and eTag, which are then used in subsequent calls
+ * for if-match headers.
+ */
+ public static final String FS_AZURE_HEAD_CALL_OPTIMIZATION_INPUT_STREAM = "fs.azure.head.call.optimization.input.stream";
Review Comment:
I would prefer: `fs.azure.input.stream.lazy.open.optimization.enabled`
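For illustration, a sketch of how the constant would look under that name (the key string and javadoc wording are my assumptions, not taken from the PR):

```java
/**
 * Enables lazy open: skip the HEAD (GetPathStatus) call when opening the
 * stream and derive contentLength/eTag from the first read response instead.
 */
public static final String FS_AZURE_INPUT_STREAM_LAZY_OPEN_OPTIMIZATION_ENABLED =
    "fs.azure.input.stream.lazy.open.optimization.enabled";
```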
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -309,5 +309,19 @@ public static String accountProperty(String property, String account) {
* @see FileSystem#openFile(org.apache.hadoop.fs.Path)
*/
public static final String FS_AZURE_BUFFERED_PREAD_DISABLE = "fs.azure.buffered.pread.disable";
+
+ /**
+ * Disables the HEAD call made when opening an inputStream. The ReadPath API of the server
+ * returns the contentLength and eTag, which are then used in subsequent calls
+ * for if-match headers.
+ */
+ public static final String FS_AZURE_HEAD_CALL_OPTIMIZATION_INPUT_STREAM = "fs.azure.head.call.optimization.input.stream";
+
+ /**
+ * Enable prefetch on the first read to {@link org.apache.hadoop.fs.azurebfs.services.AbfsInputStream}.
+ * If disabled, the first call would not trigger prefetch; prefetch would be switched on
+ * after the first read call.
+ */
+ public static final String FS_AZURE_PREFETCH_FIRST_READ_CALL = "fs.azure.prefetch.first.read.call";
Review Comment:
I would prefer: `fs.azure.prefetch.on.first.read.enabled`
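Sketched under the suggested name (the key string is my assumption, mirroring the naming convention of the other keys in ConfigurationKeys.java):

```java
public static final String FS_AZURE_PREFETCH_ON_FIRST_READ_ENABLED =
    "fs.azure.prefetch.on.first.read.enabled";
```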
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java:
##########
@@ -367,6 +367,14 @@ public class AbfsConfiguration{
FS_AZURE_ENABLE_PAGINATED_DELETE, DefaultValue = DEFAULT_ENABLE_PAGINATED_DELETE)
private boolean isPaginatedDeleteEnabled;
+ @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_HEAD_CALL_OPTIMIZATION_INPUT_STREAM,
+ DefaultValue = DEFAULT_HEAD_OPTIMIZATION_INPUT_STREAM)
+ private boolean headOptimizationForInputStream;
+
+ @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_PREFETCH_FIRST_READ_CALL,
+ DefaultValue = DEFAULT_PREFETCH_READAHEAD_ON_FIRST_READ)
+ private boolean prefetchReadaheadOnFirstRead;
Review Comment:
Prefetch and readahead are synonyms.
I would prefer: `isPrefetchOnFirstReadEnabled`
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -848,32 +848,37 @@ public AbfsInputStream openFileForRead(Path path,
encryptionContext.getBytes(StandardCharsets.UTF_8));
}
} else {
- AbfsHttpOperation op = client.getPathStatus(relativePath, false,
- tracingContext, null).getResult();
- resourceType = op.getResponseHeader(
- HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
- contentLength = Long.parseLong(
- op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
- eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
- /*
- * For file created with ENCRYPTION_CONTEXT, client shall receive
- * encryptionContext from header field: X_MS_ENCRYPTION_CONTEXT.
- */
- if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
- final String fileEncryptionContext = op.getResponseHeader(
- HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT);
- if (fileEncryptionContext == null) {
- LOG.debug("EncryptionContext missing in GetPathStatus response");
- throw new PathIOException(path.toString(),
- "EncryptionContext not present in GetPathStatus response
headers");
+ if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT
+ || !abfsConfiguration.getHeadOptimizationForInputStream()) {
+ final AbfsHttpOperation op = client.getPathStatus(relativePath, false,
+ tracingContext, null).getResult();
+ resourceType = op.getResponseHeader(
+ HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
+ contentLength = Long.parseLong(
+ op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
+ eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
+
+ /*
+ * For file created with ENCRYPTION_CONTEXT, client shall receive
+ * encryptionContext from header field: X_MS_ENCRYPTION_CONTEXT.
+ */
+ if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
+ final String fileEncryptionContext = op.getResponseHeader(
+ HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT);
+ if (fileEncryptionContext == null) {
+ LOG.debug("EncryptionContext missing in GetPathStatus response");
+ throw new PathIOException(path.toString(),
+ "EncryptionContext not present in GetPathStatus response
headers");
+ }
+ contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
+ client.getEncryptionContextProvider(), getRelativePath(path),
+ fileEncryptionContext.getBytes(StandardCharsets.UTF_8));
}
- contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
- client.getEncryptionContextProvider(), getRelativePath(path),
- fileEncryptionContext.getBytes(StandardCharsets.UTF_8));
}
}
- if (parseIsDirectory(resourceType)) {
+ if (!abfsConfiguration.getHeadOptimizationForInputStream()
Review Comment:
Could there be a case where this optimization is enabled and the user passes a
valid versioned FileStatus with resource type directory?
If yes, better to fail for that as well.
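A minimal sketch of such a guard, assuming the optimization path receives the caller-supplied FileStatus in a local `fileStatus` variable (hypothetical name), and matching the FileNotFoundException style used elsewhere in this PR:

```java
// Hypothetical guard: fail fast even when the HEAD-call optimization is on
// and the caller hands us a versioned FileStatus pointing at a directory.
if (fileStatus != null && fileStatus.isDirectory()) {
  throw new FileNotFoundException(
      "openFileForRead must be used with files and not directories. Path: " + path);
}
```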
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -376,32 +439,48 @@ private int readLastBlock(final byte[] b, final int off, final int len)
// data need to be copied to user buffer from index bCursor,
// AbfsInputStream buffer is going to contain data from last block start. In
// that case bCursor will be set to fCursor - lastBlockStart
- long lastBlockStart = max(0, contentLength - footerReadSize);
+ if (!fileStatusInformationPresent.get()) {
+ long lastBlockStart = max(0, (fCursor + len) - footerReadSize);
+ bCursor = (int) (fCursor - lastBlockStart);
+ return optimisedRead(b, off, len, lastBlockStart, min(fCursor + len, footerReadSize), true);
+ }
+ long lastBlockStart = max(0, getContentLength() - footerReadSize);
bCursor = (int) (fCursor - lastBlockStart);
// 0 if contentlength is < buffersize
- long actualLenToRead = min(footerReadSize, contentLength);
- return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
+ long actualLenToRead = min(footerReadSize, getContentLength());
+ return optimisedRead(b, off, len, lastBlockStart, actualLenToRead, false);
}
private int optimisedRead(final byte[] b, final int off, final int len,
- final long readFrom, final long actualLen) throws IOException {
+ final long readFrom, final long actualLen,
+ final boolean isReadWithoutContentLengthInformation) throws IOException {
fCursor = readFrom;
int totalBytesRead = 0;
int lastBytesRead = 0;
try {
buffer = new byte[bufferSize];
+ boolean fileStatusInformationPresentBeforeRead = fileStatusInformationPresent.get();
for (int i = 0;
- i < MAX_OPTIMIZED_READ_ATTEMPTS && fCursor < contentLength; i++) {
+ i < MAX_OPTIMIZED_READ_ATTEMPTS &&
(!fileStatusInformationPresent.get()
+ || fCursor < getContentLength()); i++) {
Review Comment:
Can we add a comment here explaining what will happen if the content length is
not available? The condition `fCursor < getContentLength()` will be false, and
we still want to enter the loop.
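Something along these lines, as a sketch (the wording is mine, not from the PR):

```java
// If the file status is not yet known, getContentLength() cannot be trusted
// (fCursor < getContentLength() would evaluate to false), so enter the loop
// anyway; the first server response populates the content length, after
// which the normal fCursor bound applies.
for (int i = 0;
    i < MAX_OPTIMIZED_READ_ATTEMPTS && (!fileStatusInformationPresent.get()
        || fCursor < getContentLength()); i++) {
```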
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -285,13 +335,23 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
}
- private boolean shouldReadFully() {
+ private boolean shouldReadFully(int lengthToRead) {
Review Comment:
Can we have a comment on the logic used here?
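For example, something like this javadoc (a sketch of the intent as I read it from the diff, not the author's wording):

```java
/**
 * Decides whether the whole file should be read in one request.
 * When the file status (and hence the content length) is not yet known,
 * approximate with the requested range: read fully only if the requested
 * range (fCursor + lengthToRead) fits in one buffer, this is the first read,
 * and small-file full reads are enabled in the configuration.
 */
private boolean shouldReadFully(int lengthToRead) {
```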
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -192,6 +210,30 @@ public int read(long position, byte[] buffer, int offset, int length)
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
}
+
+ /*
+ * When the inputStream is started, if the application tries to parallelly read
Review Comment:
Some typos in the comments
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java:
##########
@@ -367,6 +367,14 @@ public class AbfsConfiguration{
FS_AZURE_ENABLE_PAGINATED_DELETE, DefaultValue = DEFAULT_ENABLE_PAGINATED_DELETE)
private boolean isPaginatedDeleteEnabled;
+ @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_HEAD_CALL_OPTIMIZATION_INPUT_STREAM,
+ DefaultValue = DEFAULT_HEAD_OPTIMIZATION_INPUT_STREAM)
+ private boolean headOptimizationForInputStream;
+ private boolean headOptimizationForInputStream;
Review Comment:
For boolean variables, it reads better to start the variable name with `is`,
like `is<feature>Enabled`.
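i.e. something like this (the field rename is the suggestion; the annotation is unchanged from the PR):

```java
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_HEAD_CALL_OPTIMIZATION_INPUT_STREAM,
    DefaultValue = DEFAULT_HEAD_OPTIMIZATION_INPUT_STREAM)
private boolean isHeadOptimizationForInputStreamEnabled;
```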
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -376,32 +439,48 @@ private int readLastBlock(final byte[] b, final int off, final int len)
// data need to be copied to user buffer from index bCursor,
// AbfsInputStream buffer is going to contain data from last block start. In
// that case bCursor will be set to fCursor - lastBlockStart
- long lastBlockStart = max(0, contentLength - footerReadSize);
+ if (!fileStatusInformationPresent.get()) {
+ long lastBlockStart = max(0, (fCursor + len) - footerReadSize);
+ bCursor = (int) (fCursor - lastBlockStart);
+ return optimisedRead(b, off, len, lastBlockStart, min(fCursor + len, footerReadSize), true);
+ }
+ long lastBlockStart = max(0, getContentLength() - footerReadSize);
bCursor = (int) (fCursor - lastBlockStart);
// 0 if contentlength is < buffersize
- long actualLenToRead = min(footerReadSize, contentLength);
- return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
+ long actualLenToRead = min(footerReadSize, getContentLength());
+ return optimisedRead(b, off, len, lastBlockStart, actualLenToRead, false);
Review Comment:
Same here for the last parameter: the content length is known, so why `false`?
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java:
##########
@@ -165,5 +165,7 @@ public static ApiVersion getCurrentVersion() {
*/
public static final Integer HTTP_STATUS_CATEGORY_QUOTIENT = 100;
+ public static final Integer READ_PATH_REQUEST_NOT_SATISFIABLE = 416;
Review Comment:
Should we name this as per the format followed by the other HTTP status code
constants, i.e. `HTTP_RANGE_NOT_SATISFIABLE`?
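That would match the existing constant style in AbfsHttpConstants, e.g. (a sketch):

```java
public static final Integer HTTP_RANGE_NOT_SATISFIABLE = 416;
```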
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -360,7 +420,10 @@ private int readFileCompletely(final byte[] b, final int off, final int len)
// data need to be copied to user buffer from index bCursor, bCursor has
// to be the current fCursor
bCursor = (int) fCursor;
- return optimisedRead(b, off, len, 0, contentLength);
+ if (!fileStatusInformationPresent.get()) {
+ return optimisedRead(b, off, len, 0, bufferSize, false);
+ }
+ return optimisedRead(b, off, len, 0, getContentLength(), false);
Review Comment:
Shouldn't the last parameter be true here?
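i.e., presumably (a guess at the intended semantics of the flag):

```java
if (!fileStatusInformationPresent.get()) {
  // Content length unknown: the read is done without length information,
  // so the flag should reflect that.
  return optimisedRead(b, off, len, 0, bufferSize, true);
}
```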
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -285,13 +335,23 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
}
- private boolean shouldReadFully() {
+ private boolean shouldReadFully(int lengthToRead) {
+ if (!fileStatusInformationPresent.get()) {
+ return (lengthToRead + fCursor) <= this.bufferSize
+ && this.firstRead && this.context.readSmallFilesCompletely();
+ }
+
return this.firstRead && this.context.readSmallFilesCompletely()
- && this.contentLength <= this.bufferSize;
+ && getContentLength() <= this.bufferSize;
}
- private boolean shouldReadLastBlock() {
- long footerStart = max(0, this.contentLength - FOOTER_SIZE);
+ private boolean shouldReadLastBlock(int lengthToRead) {
+ if (!fileStatusInformationPresent.get()) {
Review Comment:
Please add a comment on the new logic here as well.
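A sketch of the kind of comment that would help (the heuristic wording is my reading of the diff, not the author's):

```java
// Content length is unknown before the first server response, so the real
// footer start (contentLength - FOOTER_SIZE) cannot be computed yet; fall
// back to a heuristic based on the requested range until the file status
// information becomes available.
```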
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -582,6 +701,29 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t
return (int) bytesRead;
}
+ private void initPathPropertiesFromReadPathResponseHeader(final AbfsHttpOperation op) throws IOException {
+ if (DIRECTORY.equals(
+ op.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE))) {
+ throw new FileNotFoundException(
+ "read must be used with files and not directories. Path: " + path);
+ }
+ contentLength = parseFromRange(
+ op.getResponseHeader(HttpHeaderConfigurations.CONTENT_RANGE));
+ eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
+ fileStatusInformationPresent.set(true);
Review Comment:
Should we have a check here that contentLength and eTag are valid before
setting it to true?
Or is it okay for it to be -1?
I see some handling done for that already.
Just a thought.
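A hedged sketch of such a guard (the exception type is my assumption, chosen to match the PathIOException usage in openFileForRead):

```java
contentLength = parseFromRange(
    op.getResponseHeader(HttpHeaderConfigurations.CONTENT_RANGE));
eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
// Hypothetical validation before declaring the file status usable.
if (contentLength < 0 || eTag == null || eTag.isEmpty()) {
  throw new PathIOException(path.toString(),
      "Invalid contentLength or eTag in read response headers");
}
fileStatusInformationPresent.set(true);
```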
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -376,32 +439,48 @@ private int readLastBlock(final byte[] b, final int off, final int len)
// data need to be copied to user buffer from index bCursor,
// AbfsInputStream buffer is going to contain data from last block start. In
// that case bCursor will be set to fCursor - lastBlockStart
- long lastBlockStart = max(0, contentLength - footerReadSize);
+ if (!fileStatusInformationPresent.get()) {
+ long lastBlockStart = max(0, (fCursor + len) - footerReadSize);
+ bCursor = (int) (fCursor - lastBlockStart);
+ return optimisedRead(b, off, len, lastBlockStart, min(fCursor + len, footerReadSize), true);
+ }
+ long lastBlockStart = max(0, getContentLength() - footerReadSize);
bCursor = (int) (fCursor - lastBlockStart);
// 0 if contentlength is < buffersize
- long actualLenToRead = min(footerReadSize, contentLength);
- return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
+ long actualLenToRead = min(footerReadSize, getContentLength());
+ return optimisedRead(b, off, len, lastBlockStart, actualLenToRead, false);
}
private int optimisedRead(final byte[] b, final int off, final int len,
- final long readFrom, final long actualLen) throws IOException {
+ final long readFrom, final long actualLen,
+ final boolean isReadWithoutContentLengthInformation) throws IOException {
fCursor = readFrom;
int totalBytesRead = 0;
int lastBytesRead = 0;
try {
buffer = new byte[bufferSize];
+ boolean fileStatusInformationPresentBeforeRead = fileStatusInformationPresent.get();
for (int i = 0;
- i < MAX_OPTIMIZED_READ_ATTEMPTS && fCursor < contentLength; i++) {
+ i < MAX_OPTIMIZED_READ_ATTEMPTS &&
(!fileStatusInformationPresent.get()
+ || fCursor < getContentLength()); i++) {
lastBytesRead = readInternal(fCursor, buffer, limit,
(int) actualLen - limit, true);
if (lastBytesRead > 0) {
totalBytesRead += lastBytesRead;
+ boolean shouldBreak = !fileStatusInformationPresentBeforeRead
+ && totalBytesRead == (int) actualLen;
limit += lastBytesRead;
fCursor += lastBytesRead;
fCursorAfterLastRead = fCursor;
+ if (shouldBreak) {
+ break;
+ }
}
}
} catch (IOException e) {
+ if (isNonRetriableOptimizedReadException(e)) {
+ throw e;
Review Comment:
Why throw an exception here?
Don't we want to fall back to readOneBlock() for these?
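i.e., fall back unconditionally, roughly like this (a sketch; the restorePointerState()/readOneBlock() calls are assumed from the existing optimisedRead() fallback path):

```java
} catch (IOException e) {
  // Fall back to the non-optimized path for any read failure, including
  // the exceptions currently classified as non-retriable.
  LOG.debug("Optimized read failed. Defaulting to readOneBlock()", e);
  restorePointerState();
  return readOneBlock(b, off, len);
}
```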
> [ABFS]: No GetPathStatus call for opening AbfsInputStream
> ---------------------------------------------------------
>
> Key: HADOOP-19139
> URL: https://issues.apache.org/jira/browse/HADOOP-19139
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Reporter: Pranav Saxena
> Assignee: Pranav Saxena
> Priority: Major
> Labels: pull-request-available
>
> Read API gives the contentLen and eTag of the path. This information would be
> used in future calls on that inputStream. Prior knowledge of the eTag is not
> of much importance.