[
https://issues.apache.org/jira/browse/HADOOP-19389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17913547#comment-17913547
]
ASF GitHub Bot commented on HADOOP-19389:
-----------------------------------------
ashutoshcipher commented on code in PR #7291:
URL: https://github.com/apache/hadoop/pull/7291#discussion_r1917769760
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Display.java:
##########
@@ -286,31 +326,98 @@ public AvroFileInputStream(FileStatus status) throws IOException {
writer = new GenericDatumWriter<Object>(schema);
output = new ByteArrayOutputStream();
encoder = EncoderFactory.get().jsonEncoder(schema, output);
+ finalSeparator = System.getProperty("line.separator").getBytes(StandardCharsets.UTF_8);
}
/**
* Read a single byte from the stream.
*/
@Override
public int read() throws IOException {
+ if (buffer == null) {
+ return -1;
+ }
+
if (pos < buffer.length) {
return buffer[pos++];
}
+
if (!fileReader.hasNext()) {
+ // Unset buffer to signal EOF on future calls.
+ buffer = null;
return -1;
}
+
writer.write(fileReader.next(), encoder);
encoder.flush();
+
if (!fileReader.hasNext()) {
- // Write a new line after the last Avro record.
- output.write(System.getProperty("line.separator")
- .getBytes(StandardCharsets.UTF_8));
- output.flush();
+ if (buffer.length > 0) {
+ // Write a new line after the last Avro record.
+ output.write(finalSeparator);
+ output.flush();
+ }
+ }
+
+ swapBuffer();
+ return read();
+ }
+
+ @Override
+ public int read(byte[] dest, int destPos, int destLen) throws IOException {
+ if (dest == null) {
+ throw new NullPointerException();
+ } else if (destPos < 0 || destLen < 0 || destLen > dest.length - destPos) {
+ throw new IndexOutOfBoundsException();
+ } else if (destLen == 0) {
+ return 0;
+ }
+
+ if (buffer == null) {
+ return -1;
}
+
+ int bytesRead = 0;
+ while (destLen > 0 && buffer != null) {
+ if (pos < buffer.length) {
+ // We have buffered data available, either from the Avro file or the final separator.
+ int copyLen = Math.min(buffer.length - pos, destLen);
+ System.arraycopy(buffer, pos, dest, destPos, copyLen);
+ pos += copyLen;
+ bytesRead += copyLen;
+ destPos += copyLen;
+ destLen -= copyLen;
+ } else if (buffer == finalSeparator) {
+ // There is no buffered data, and the last buffer processed was the final separator.
+ // Unset buffer to signal EOF on future calls.
+ buffer = null;
+ } else if (!fileReader.hasNext()) {
+ if (buffer.length > 0) {
+ // There is no data remaining in the file. Get ready to write the final separator on
+ // the next iteration.
+ buffer = finalSeparator;
+ pos = 0;
+ } else {
+ // We never read data into the buffer. This must be an empty file.
+ // Immediate EOF, no separator needed.
+ buffer = null;
+ return -1;
+ }
+ } else {
+ // Read the next data from the file into the buffer.
+ writer.write(fileReader.next(), encoder);
+ encoder.flush();
+ swapBuffer();
+ }
+ }
+
+ return bytesRead;
+ }
+
+ private void swapBuffer() {
pos = 0;
buffer = output.toByteArray();
Review Comment:
Right now, swapBuffer() sets buffer = output.toByteArray() unconditionally.
That is presumably always valid, but it would be good to add a defensive check
if there is any chance output might be null or closed.
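For reference on the "closed" case: the JDK Javadoc for ByteArrayOutputStream states that closing it has no effect and that its methods can still be called afterwards, so toByteArray() should remain valid after close(). The snippet below is only a sketch of what a defensive null check could look like. `BufferHolder` and `snapshotAfterClose` are hypothetical names used for illustration, not part of Display.java, and the `output.reset()` call is an assumption about the rest of swapBuffer(), which the diff above does not show.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

/** Hypothetical stand-in for the buffering part of AvroFileInputStream. */
class BufferHolder {
  private final ByteArrayOutputStream output;
  private byte[] buffer = new byte[0];
  private int pos;

  BufferHolder(ByteArrayOutputStream output) {
    // Fail fast at construction so swapBuffer() never sees a null stream.
    this.output = Objects.requireNonNull(output, "output");
  }

  void swapBuffer() {
    pos = 0;
    buffer = output.toByteArray();
    // Assumption: the real method also clears the stream for the next record.
    output.reset();
  }

  byte[] buffer() {
    return buffer;
  }

  /** Writes data, closes the stream, then swaps; close() is a no-op for this class. */
  static byte[] snapshotAfterClose(byte[] data) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(data, 0, data.length);
    try {
      out.close();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    BufferHolder holder = new BufferHolder(out);
    holder.swapBuffer();
    return holder.buffer();
  }
}
```

If the field is final and assigned in the constructor, as in this sketch, a per-call null check inside swapBuffer() would be redundant; whether that holds for the actual class depends on code outside this hunk.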
> Optimize shell -text command I/O with multi-byte read.
> ------------------------------------------------------
>
> Key: HADOOP-19389
> URL: https://issues.apache.org/jira/browse/HADOOP-19389
> Project: Hadoop Common
> Issue Type: Improvement
> Components: fs
> Reporter: Chris Nauroth
> Assignee: Chris Nauroth
> Priority: Minor
> Labels: pull-request-available
>
> {{hadoop fs -text}} reads Avro files and sequence files by internally
> wrapping the stream in
> [{{AvroFileInputStream}}|https://github.com/apache/hadoop/blob/rel/release-3.4.1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Display.java#L270]
> or
> [{{TextRecordInputStream}}|https://github.com/apache/hadoop/blob/rel/release-3.4.1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Display.java#L217].
> These classes implement the required single-byte
> [{{read()}}|https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/io/InputStream.html#read()],
> but not the optional multi-byte buffered [{{read(byte[], int,
> int)}}|https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/io/InputStream.html#read(byte%5B%5D,int,int)].
> The default implementation in the JDK is a [loop over single-byte
> read|https://github.com/openjdk/jdk11u-dev/blob/a47c72fad455bfdf9053cb8e94c99e73965ab50d/src/java.base/share/classes/java/io/InputStream.java#L280],
> which causes sub-optimal I/O and method call overhead. We can optimize this
> by overriding the multi-byte read method.
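The idea in the description can be sketched in isolation. The class below is a simplified illustration, not the code from the PR: `RecordStream` is a hypothetical stream that serves records from an in-memory list instead of an Avro file, and it omits the trailing line separator that Display.java adds. It shows the single-byte read() next to a bulk read(byte[], int, int) that copies whole chunks with System.arraycopy rather than looping one byte at a time.

```java
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stream that serves records one at a time from an iterator. */
class RecordStream extends InputStream {
  private final Iterator<String> records;
  private byte[] buffer = new byte[0];
  private int pos;

  RecordStream(List<String> records) {
    this.records = records.iterator();
  }

  /** Ensures the buffer has unread bytes; returns false at end of input. */
  private boolean fill() {
    while (pos >= buffer.length) {
      if (!records.hasNext()) {
        return false;
      }
      buffer = records.next().getBytes(StandardCharsets.UTF_8);
      pos = 0;
    }
    return true;
  }

  @Override
  public int read() {
    // Mask to 0..255 so a byte >= 0x80 is not mistaken for EOF.
    return fill() ? (buffer[pos++] & 0xff) : -1;
  }

  @Override
  public int read(byte[] dest, int off, int len) {
    if (off < 0 || len < 0 || len > dest.length - off) {
      throw new IndexOutOfBoundsException();
    }
    if (len == 0) {
      return 0;
    }
    int total = 0;
    // Copy as much as the caller asked for, refilling across record boundaries.
    while (len > 0 && fill()) {
      int n = Math.min(buffer.length - pos, len);
      System.arraycopy(buffer, pos, dest, off, n);
      pos += n;
      off += n;
      len -= n;
      total += n;
    }
    return total == 0 ? -1 : total;
  }

  public static void main(String[] args) {
    RecordStream in = new RecordStream(Arrays.asList("{\"a\":1}", "{\"a\":2}"));
    byte[] dest = new byte[64];
    int n = in.read(dest, 0, dest.length);
    System.out.println(new String(dest, 0, n, StandardCharsets.UTF_8));
  }
}
```

With the override in place, a caller that reads into a buffer makes one method call per chunk instead of one per byte, which is the overhead the description refers to.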