rakeshadr commented on code in PR #6699: URL: https://github.com/apache/hadoop/pull/6699#discussion_r1671604923
########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java: ########## @@ -34,6 +34,7 @@ public final class HttpHeaderConfigurations { public static final String IF_MATCH = "If-Match"; public static final String IF_NONE_MATCH = "If-None-Match"; public static final String CONTENT_LENGTH = "Content-Length"; + public static final String CONTENT_RANGE = "Content-Range"; Review Comment: I couldn't find any usage of this constant. Please remove it if it is not required. ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java: ########## @@ -321,5 +321,21 @@ public static String accountProperty(String property, String account) { * @see FileSystem#openFile(org.apache.hadoop.fs.Path) */ public static final String FS_AZURE_BUFFERED_PREAD_DISABLE = "fs.azure.buffered.pread.disable"; + + /** + * Enable lazy opening of an inputStream. Lazy opening would not call HEAD call + * to get file metadata before creating inputStream. ReadPath API of server + * would give the contentLength and eTag which would be used in subsequent calls + * for if-match headers. + */ + public static final String + FS_AZURE_INPUT_STREAM_LAZY_OPEN_OPTIMIZATION_ENABLED = "fs.azure.input.stream.lazy.open.optimization.enabled"; + + /** + * Enable prefetch on the first read to {@link org.apache.hadoop.fs.azurebfs.services.AbfsInputStream}. + * If disabled, first call would not trigger prefetch. Prefetch would be switched on Review Comment: Since these two configurations interact with each other, can you document the behavior of each configuration and their combined behavior in https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-azure/src/site/markdown/index.md or in https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md Does the Azure documentation cover prefetching behavior and configurations? If so, could you please point me to it? 
If not, I think it would be good to document it later in a follow-up JIRA task. FYI, there is a good doc for the AWS part: https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/prefetching.md ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java: ########## @@ -890,32 +891,37 @@ public AbfsInputStream openFileForRead(Path path, encryptionContext.getBytes(StandardCharsets.UTF_8)); } } else { - AbfsHttpOperation op = client.getPathStatus(relativePath, false, - tracingContext, null).getResult(); - resourceType = op.getResponseHeader( - HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); - contentLength = Long.parseLong( - op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); - eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG); - /* - * For file created with ENCRYPTION_CONTEXT, client shall receive - * encryptionContext from header field: X_MS_ENCRYPTION_CONTEXT. - */ - if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) { - final String fileEncryptionContext = op.getResponseHeader( - HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT); - if (fileEncryptionContext == null) { - LOG.debug("EncryptionContext missing in GetPathStatus response"); - throw new PathIOException(path.toString(), - "EncryptionContext not present in GetPathStatus response headers"); + if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT Review Comment: Here, why do we need EncryptionType.ENCRYPTION_CONTEXT in the condition? 
Should the condition be like the one below? ``` if(!abfsConfiguration.isInputStreamLazyOptimizationEnabled()){ final AbfsHttpOperation op = client.getPathStatus(relativePath, false, tracingContext, null).getResult(); resourceType = op.getResponseHeader( HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); contentLength = Long.parseLong( op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG); } ``` ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java: ########## @@ -1148,7 +1149,9 @@ public AbfsRestOperation read(final String path, AbfsHttpHeader rangeHeader = new AbfsHttpHeader(RANGE, String.format("bytes=%d-%d", position, position + bufferLength - 1)); requestHeaders.add(rangeHeader); - requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag)); + if (!StringUtils.isEmpty(eTag)) { Review Comment: Use `StringUtils.isNotEmpty` for better readability. ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java: ########## @@ -488,7 +601,7 @@ private int copyToUserBuffer(byte[] b, int off, int len){ private int readInternal(final long position, final byte[] b, final int offset, final int length, final boolean bypassReadAhead) throws IOException { - if (readAheadEnabled && !bypassReadAhead) { + if (readAheadEnabled && !bypassReadAhead && (prefetchTriggerOnFirstRead || sequentialReadStarted)) { Review Comment: Here too, you could extract the condition into a method named `effectiveReadAhead` (or a better name) and call it as `if(effectiveReadAhead())` ``` /** * <add_description> */ private boolean effectiveReadAhead() { } ``` ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java: ########## @@ -424,11 +515,20 @@ private int optimisedRead(final byte[] b, final int off, final int len, // If the read was partial and the user requested part of data has // not read then fallback to readoneblock. 
When limit is smaller than // bCursor that means the user requested data has not been read. - if (fCursor < contentLength && bCursor > limit) { + if (fCursor < getContentLength() && bCursor > limit) { restorePointerState(); return readOneBlock(b, off, len); } - return copyToUserBuffer(b, off, len); + return copyToUserBuffer(b, off, len, isOptimizedReadWithoutContentLengthInformation); + } + + @VisibleForTesting + long getContentLength() { + return contentLength; + } + + boolean getFileStatusInformationPresent() { Review Comment: 1) Please give it `private boolean` visibility. 2) How about renaming the method to `hasFileStatusInfo()`? ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java: ########## @@ -467,7 +566,21 @@ private boolean validate(final byte[] b, final int off, final int len) return true; } - private int copyToUserBuffer(byte[] b, int off, int len){ + private int copyToUserBuffer(byte[] b, int off, int len, + final boolean isOptimizedReadWithoutContentLengthInformation){ Review Comment: This is subjective, but we could shorten the variable name from `isOptimizedReadWithoutContentLengthInformation` to `isOptimizedReadWithoutContentLengthInfo`. ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java: ########## @@ -573,11 +688,25 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t } catch (AzureBlobFileSystemException ex) { if (ex instanceof AbfsRestOperationException) { AbfsRestOperationException ere = (AbfsRestOperationException) ex; + abfsHttpOperation = ((AbfsRestOperationException) ex).getAbfsHttpOperation(); if (ere.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { throw new FileNotFoundException(ere.getMessage()); } + /* + * Status 416 is sent when read range is out of contentLength range. + * This would happen only in the case if contentLength is not known before + * opening the inputStream. 
+ */ + if (ere.getStatusCode() == READ_PATH_REQUEST_NOT_SATISFIABLE + && !getFileStatusInformationPresent()) { + return -1; Review Comment: Can you add a log message for this exception case? ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java: ########## @@ -591,6 +720,31 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t return (int) bytesRead; } + private void initPropertiesFromReadResponseHeader(final AbfsHttpOperation op) throws IOException { + if (DIRECTORY.equals( Review Comment: This logic is duplicated in two places. You could extract it into a method and use that in both: ``` if (DIRECTORY.equals( op.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE))) { throw new FileNotFoundException( "read must be used with files and not directories. Path: " + path); } contentLength = parseFromRange( op.getResponseHeader(HttpHeaderConfigurations.CONTENT_RANGE)); eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG); if (eTag != null && contentLength >= 0) { fileStatusInformationPresent = true; } } ``` ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java: ########## @@ -385,32 +434,74 @@ private int readLastBlock(final byte[] b, final int off, final int len) // data need to be copied to user buffer from index bCursor, // AbfsInutStream buffer is going to contain data from last block start. 
In // that case bCursor will be set to fCursor - lastBlockStart - long lastBlockStart = max(0, contentLength - footerReadSize); + if (!getFileStatusInformationPresent()) { + long lastBlockStart = max(0, (fCursor + len) - footerReadSize); + bCursor = (int) (fCursor - lastBlockStart); + return optimisedRead(b, off, len, lastBlockStart, min(fCursor + len, footerReadSize), true); + } + long lastBlockStart = max(0, getContentLength() - footerReadSize); bCursor = (int) (fCursor - lastBlockStart); // 0 if contentlength is < buffersize - long actualLenToRead = min(footerReadSize, contentLength); - return optimisedRead(b, off, len, lastBlockStart, actualLenToRead); + long actualLenToRead = min(footerReadSize, getContentLength()); + return optimisedRead(b, off, len, lastBlockStart, actualLenToRead, false); } private int optimisedRead(final byte[] b, final int off, final int len, - final long readFrom, final long actualLen) throws IOException { + final long readFrom, final long actualLen, + final boolean isOptimizedReadWithoutContentLengthInformation) throws IOException { fCursor = readFrom; int totalBytesRead = 0; int lastBytesRead = 0; try { buffer = new byte[bufferSize]; + boolean fileStatusInformationPresentBeforeRead = getFileStatusInformationPresent(); + /* + * Content length would not be available for the first optimized read in case + * of lazy head optimization in inputStream. In such case, read of the first optimized read + * would be done without the contentLength constraint. Post first call, the contentLength + * would be present and should be used for further reads. 
+ */ for (int i = 0; - i < MAX_OPTIMIZED_READ_ATTEMPTS && fCursor < contentLength; i++) { + i < MAX_OPTIMIZED_READ_ATTEMPTS && (!getFileStatusInformationPresent() + || fCursor < getContentLength()); i++) { lastBytesRead = readInternal(fCursor, buffer, limit, (int) actualLen - limit, true); if (lastBytesRead > 0) { totalBytesRead += lastBytesRead; limit += lastBytesRead; fCursor += lastBytesRead; fCursorAfterLastRead = fCursor; + + /* + * In non-lazily opened inputStream, the contentLength would be available before + * opening the inputStream. In such case, optimized read would always be done + * on the last part of the file. + * + * In lazily opened inputStream, the contentLength would not be available before + * opening the inputStream. In such case, contentLength conditioning would not be + * applied to execute optimizedRead. Hence, the optimized read may not be done on the + * last part of the file. If the optimized read is done on the non-last part of the + * file, inputStream should read only the amount of data requested by optimizedRead, + * as the buffer supplied would be only of the size of the data requested by optimizedRead. 
+ */ + boolean shouldBreak = !fileStatusInformationPresentBeforeRead + && totalBytesRead == (int) actualLen; + if (shouldBreak) { + break; + } } } } catch (IOException e) { + if (e instanceof FileNotFoundException) { Review Comment: can't we add explicit catch block like, ``` } catch (FileNotFoundException fnfe) { // <add_required_logic> } catch (IOException e) { ``` ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java: ########## @@ -145,6 +158,10 @@ public AbfsInputStream( this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth(); this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends(); this.eTag = eTag; + this.fileStatusInformationPresent = StringUtils.isNotEmpty(eTag); + this.prefetchTriggerOnFirstRead = + abfsInputStreamContext.isPrefetchTriggerOnFirstRead() + && getFileStatusInformationPresent(); Review Comment: Move this to a method with a meaningful name and add javadoc about the case. Something like below or a better method name. 
``` /** * <describe the combinational behavior> */ public boolean getEffectivePrefetchTriggerOnFirstRead(....<add_required_args>) { return abfsInputStreamContext.isPrefetchTriggerOnFirstRead() && getFileStatusInformationPresent(); } ``` ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java: ########## @@ -890,32 +891,37 @@ public AbfsInputStream openFileForRead(Path path, encryptionContext.getBytes(StandardCharsets.UTF_8)); } } else { - AbfsHttpOperation op = client.getPathStatus(relativePath, false, - tracingContext, null).getResult(); - resourceType = op.getResponseHeader( - HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); - contentLength = Long.parseLong( - op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); - eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG); - /* - * For file created with ENCRYPTION_CONTEXT, client shall receive - * encryptionContext from header field: X_MS_ENCRYPTION_CONTEXT. - */ - if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) { - final String fileEncryptionContext = op.getResponseHeader( - HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT); - if (fileEncryptionContext == null) { - LOG.debug("EncryptionContext missing in GetPathStatus response"); - throw new PathIOException(path.toString(), - "EncryptionContext not present in GetPathStatus response headers"); + if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT + || !abfsConfiguration.isInputStreamLazyOptimizationEnabled()) { + final AbfsHttpOperation op = client.getPathStatus(relativePath, false, + tracingContext, null).getResult(); + resourceType = op.getResponseHeader( + HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); + contentLength = Long.parseLong( + op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); + eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG); + + /* + * For file created with ENCRYPTION_CONTEXT, client shall receive + * encryptionContext from 
header field: X_MS_ENCRYPTION_CONTEXT. + */ + if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) { + final String fileEncryptionContext = op.getResponseHeader( + HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT); + if (fileEncryptionContext == null) { + LOG.debug("EncryptionContext missing in GetPathStatus response"); + throw new PathIOException(path.toString(), + "EncryptionContext not present in GetPathStatus response headers"); + } + contextEncryptionAdapter = new ContextProviderEncryptionAdapter( + client.getEncryptionContextProvider(), getRelativePath(path), + fileEncryptionContext.getBytes(StandardCharsets.UTF_8)); } - contextEncryptionAdapter = new ContextProviderEncryptionAdapter( - client.getEncryptionContextProvider(), getRelativePath(path), - fileEncryptionContext.getBytes(StandardCharsets.UTF_8)); } } - if (parseIsDirectory(resourceType)) { + if ((fileStatus != null || !abfsConfiguration.isInputStreamLazyOptimizationEnabled()) Review Comment: Its good to keep the conditions simple for the code maintenance. Have you added `filestatus != null. ` for the defensive coding? If `filestatus != null` check is really required, then please add java comments about the case where it becomes null. ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java: ########## @@ -385,32 +434,74 @@ private int readLastBlock(final byte[] b, final int off, final int len) // data need to be copied to user buffer from index bCursor, // AbfsInutStream buffer is going to contain data from last block start. 
In // that case bCursor will be set to fCursor - lastBlockStart - long lastBlockStart = max(0, contentLength - footerReadSize); + if (!getFileStatusInformationPresent()) { + long lastBlockStart = max(0, (fCursor + len) - footerReadSize); + bCursor = (int) (fCursor - lastBlockStart); + return optimisedRead(b, off, len, lastBlockStart, min(fCursor + len, footerReadSize), true); + } + long lastBlockStart = max(0, getContentLength() - footerReadSize); bCursor = (int) (fCursor - lastBlockStart); // 0 if contentlength is < buffersize - long actualLenToRead = min(footerReadSize, contentLength); - return optimisedRead(b, off, len, lastBlockStart, actualLenToRead); + long actualLenToRead = min(footerReadSize, getContentLength()); + return optimisedRead(b, off, len, lastBlockStart, actualLenToRead, false); } private int optimisedRead(final byte[] b, final int off, final int len, - final long readFrom, final long actualLen) throws IOException { + final long readFrom, final long actualLen, + final boolean isOptimizedReadWithoutContentLengthInformation) throws IOException { fCursor = readFrom; int totalBytesRead = 0; int lastBytesRead = 0; try { buffer = new byte[bufferSize]; + boolean fileStatusInformationPresentBeforeRead = getFileStatusInformationPresent(); + /* + * Content length would not be available for the first optimized read in case + * of lazy head optimization in inputStream. In such case, read of the first optimized read + * would be done without the contentLength constraint. Post first call, the contentLength + * would be present and should be used for further reads. 
+ */ for (int i = 0; - i < MAX_OPTIMIZED_READ_ATTEMPTS && fCursor < contentLength; i++) { + i < MAX_OPTIMIZED_READ_ATTEMPTS && (!getFileStatusInformationPresent() + || fCursor < getContentLength()); i++) { lastBytesRead = readInternal(fCursor, buffer, limit, (int) actualLen - limit, true); if (lastBytesRead > 0) { totalBytesRead += lastBytesRead; limit += lastBytesRead; fCursor += lastBytesRead; fCursorAfterLastRead = fCursor; + + /* + * In non-lazily opened inputStream, the contentLength would be available before + * opening the inputStream. In such case, optimized read would always be done + * on the last part of the file. + * + * In lazily opened inputStream, the contentLength would not be available before + * opening the inputStream. In such case, contentLength conditioning would not be + * applied to execute optimizedRead. Hence, the optimized read may not be done on the + * last part of the file. If the optimized read is done on the non-last part of the + * file, inputStream should read only the amount of data requested by optimizedRead, + * as the buffer supplied would be only of the size of the data requested by optimizedRead. + */ + boolean shouldBreak = !fileStatusInformationPresentBeforeRead + && totalBytesRead == (int) actualLen; Review Comment: Here also, move this condition a method and use it in the if condition. ``` /* * In non-lazily opened inputStream, the contentLength would be available before * opening the inputStream. In such case, optimized read would always be done * on the last part of the file. * * In lazily opened inputStream, the contentLength would not be available before * opening the inputStream. In such case, contentLength conditioning would not be * applied to execute optimizedRead. Hence, the optimized read may not be done on the * last part of the file. 
If the optimized read is done on the non-last part of the * file, inputStream should read only the amount of data requested by optimizedRead, * as the buffer supplied would be only of the size of the data requested by optimizedRead. */ public boolean shouldBreakxxxxx(....<add_required_args>){ boolean shouldBreak = !fileStatusInformationPresentBeforeRead && totalBytesRead == (int) actualLen; ``` ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java: ########## @@ -573,11 +688,25 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t } catch (AzureBlobFileSystemException ex) { if (ex instanceof AbfsRestOperationException) { AbfsRestOperationException ere = (AbfsRestOperationException) ex; + abfsHttpOperation = ((AbfsRestOperationException) ex).getAbfsHttpOperation(); if (ere.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { throw new FileNotFoundException(ere.getMessage()); } + /* + * Status 416 is sent when read range is out of contentLength range. + * This would happen only in the case if contentLength is not known before + * opening the inputStream. + */ + if (ere.getStatusCode() == READ_PATH_REQUEST_NOT_SATISFIABLE + && !getFileStatusInformationPresent()) { + return -1; + } } throw new IOException(ex); + } finally { + if (!getFileStatusInformationPresent() && abfsHttpOperation != null) { Review Comment: Can you move the condition `!getFileStatusInformationPresent()` inside `initPropertiesFromReadResponseHeader`? ``` void initPropertiesFromReadResponseHeader(abfsHttpOperation) { if(!getFileStatusInformationPresent()) { return; } //... } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org