bilaharith commented on a change in pull request #2464:
URL: https://github.com/apache/hadoop/pull/2464#discussion_r540354018
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -224,6 +240,123 @@ private int readOneBlock(final byte[] b, final int off,
final int len) throws IO
return bytesToRead;
}
+ private boolean shouldReadFully() {
+ return this.firstRead && this.context.readSmallFilesCompletely()
+ && this.contentLength <= this.bufferSize;
+ }
+
+ private boolean shouldReadLastBlock(int len) {
+ return this.firstRead && this.context.optimizeFooterRead()
+ && len == FOOTER_SIZE
+ && this.fCursor == this.contentLength - FOOTER_SIZE;
+ }
+
+ private int readFileCompletely(final byte[] b, final int off, final int len)
+ throws IOException {
+ if (closed) {
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+
+ Preconditions.checkNotNull(b);
+ LOG.debug("read one block requested b.length = {} off {} len {}", b.length,
+ off, len);
+
+ if (len == 0) {
+ return 0;
+ }
+
+ if (this.available() == 0) {
+ return -1;
+ }
+
+ if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ buffer = new byte[bufferSize];
+ // data need to be copied to user buffer from index bCursor, bCursor has
+ // to be the current fCusor
+ bCursor = (int) fCursor;
+ fCursorAfterLastRead = fCursor;
+ int totalBytesRead = 0;
+ int loopCount = 0;
+ // Read from begining
+ fCursor = 0;
+ while (fCursor < contentLength) {
+ int bytesRead = readInternal(fCursor, buffer, limit,
+ (int) contentLength - limit, true);
+ if (bytesRead > 0) {
+ totalBytesRead += bytesRead;
+ limit += bytesRead;
+ fCursor += bytesRead;
+ }
+ if (loopCount++ >= 10) {
Review comment:
Made changes in the following way:
1. The available data is returned
2. Done
3. Done
4. Done
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -224,6 +240,123 @@ private int readOneBlock(final byte[] b, final int off,
final int len) throws IO
return bytesToRead;
}
+ private boolean shouldReadFully() {
+ return this.firstRead && this.context.readSmallFilesCompletely()
+ && this.contentLength <= this.bufferSize;
+ }
+
+ private boolean shouldReadLastBlock(int len) {
+ return this.firstRead && this.context.optimizeFooterRead()
+ && len == FOOTER_SIZE
+ && this.fCursor == this.contentLength - FOOTER_SIZE;
+ }
+
+ private int readFileCompletely(final byte[] b, final int off, final int len)
+ throws IOException {
+ if (closed) {
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+
+ Preconditions.checkNotNull(b);
+ LOG.debug("read one block requested b.length = {} off {} len {}", b.length,
+ off, len);
+
+ if (len == 0) {
+ return 0;
+ }
+
+ if (this.available() == 0) {
+ return -1;
+ }
+
+ if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ buffer = new byte[bufferSize];
+ // data need to be copied to user buffer from index bCursor, bCursor has
+ // to be the current fCusor
+ bCursor = (int) fCursor;
+ fCursorAfterLastRead = fCursor;
+ int totalBytesRead = 0;
+ int loopCount = 0;
+ // Read from begining
+ fCursor = 0;
+ while (fCursor < contentLength) {
+ int bytesRead = readInternal(fCursor, buffer, limit,
+ (int) contentLength - limit, true);
+ if (bytesRead > 0) {
+ totalBytesRead += bytesRead;
+ limit += bytesRead;
+ fCursor += bytesRead;
+ }
+ if (loopCount++ >= 10) {
+ throw new IOException(
+ "Too many attempts in reading whole file " + path);
+ }
+ }
+ firstRead = false;
+ if (totalBytesRead == -1) {
+ return -1;
+ }
+ return copyToUserBuffer(b, off, len);
+ }
+
+ private int readLastBlock(final byte[] b, final int off, final int len)
+ throws IOException {
+ if (closed) {
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+
+ Preconditions.checkNotNull(b);
+ LOG.debug("read one block requested b.length = {} off {} len {}", b.length,
+ off, len);
+
+ if (len == 0) {
+ return 0;
+ }
+
+ if (this.available() == 0) {
+ return -1;
+ }
+
+ if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ buffer = new byte[bufferSize];
+ // data need to be copied to user buffer from index bCursor, for small
+ // files the bCursor will be contentlength - footer size,
+ // otherwise buffersize - footer size
+ bCursor = (int) (Math.min(contentLength, bufferSize) - FOOTER_SIZE);
+ // read API call is considered 1 single operation in reality server could
+ // return partial data and client has to retry untill the last full block
+ // is read. So setting the fCursorAfterLastRead before the possible
+ // multiple server calls
+ fCursorAfterLastRead = fCursor;
+ // 0 if contentlength is < buffersize
+ fCursor = Math.max(0, contentLength - bufferSize);
+ int totalBytesRead = 0;
+ int loopCount = 0;
+ while (fCursor < contentLength) {
+ int bytesRead = readInternal(fCursor, buffer, limit, bufferSize - limit,
+ true);
+ if (bytesRead > 0) {
+ totalBytesRead += bytesRead;
+ limit += bytesRead;
+ fCursor += bytesRead;
+ }
+ if (loopCount++ >= 10) {
Review comment:
Done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]