ashutoshcipher commented on code in PR #7291:
URL: https://github.com/apache/hadoop/pull/7291#discussion_r1917769760
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Display.java:
##########
@@ -286,31 +326,98 @@ public AvroFileInputStream(FileStatus status) throws
IOException {
writer = new GenericDatumWriter<Object>(schema);
output = new ByteArrayOutputStream();
encoder = EncoderFactory.get().jsonEncoder(schema, output);
+ finalSeparator =
System.getProperty("line.separator").getBytes(StandardCharsets.UTF_8);
}
/**
* Read a single byte from the stream.
*/
@Override
public int read() throws IOException {
+ if (buffer == null) {
+ return -1;
+ }
+
if (pos < buffer.length) {
return buffer[pos++];
}
+
if (!fileReader.hasNext()) {
+ // Unset buffer to signal EOF on future calls.
+ buffer = null;
return -1;
}
+
writer.write(fileReader.next(), encoder);
encoder.flush();
+
if (!fileReader.hasNext()) {
- // Write a new line after the last Avro record.
- output.write(System.getProperty("line.separator")
- .getBytes(StandardCharsets.UTF_8));
- output.flush();
+ if (buffer.length > 0) {
+ // Write a new line after the last Avro record.
+ output.write(finalSeparator);
+ output.flush();
+ }
+ }
+
+ swapBuffer();
+ return read();
+ }
+
+ @Override
+ public int read(byte[] dest, int destPos, int destLen) throws IOException {
+ if (dest == null) {
+ throw new NullPointerException();
+ } else if (destPos < 0 || destLen < 0 || destLen > dest.length -
destPos) {
+ throw new IndexOutOfBoundsException();
+ } else if (destLen == 0) {
+ return 0;
+ }
+
+ if (buffer == null) {
+ return -1;
}
+
+ int bytesRead = 0;
+ while (destLen > 0 && buffer != null) {
+ if (pos < buffer.length) {
+ // We have buffered data available, either from the Avro file or the
final separator.
+ int copyLen = Math.min(buffer.length - pos, destLen);
+ System.arraycopy(buffer, pos, dest, destPos, copyLen);
+ pos += copyLen;
+ bytesRead += copyLen;
+ destPos += copyLen;
+ destLen -= copyLen;
+ } else if (buffer == finalSeparator) {
+ // There is no buffered data, and the last buffer processed was the
final separator.
+ // Unset buffer to signal EOF on future calls.
+ buffer = null;
+ } else if (!fileReader.hasNext()) {
+ if (buffer.length > 0) {
+ // There is no data remaining in the file. Get ready to write the
final separator on
+ // the next iteration.
+ buffer = finalSeparator;
+ pos = 0;
+ } else {
+ // We never read data into the buffer. This must be an empty file.
+ // Immediate EOF, no separator needed.
+ buffer = null;
+ return -1;
+ }
+ } else {
+ // Read the next data from the file into the buffer.
+ writer.write(fileReader.next(), encoder);
+ encoder.flush();
+ swapBuffer();
+ }
+ }
+
+ return bytesRead;
+ }
+
+ private void swapBuffer() {
pos = 0;
buffer = output.toByteArray();
Review Comment:
Right now, swapBuffer() sets buffer = output.toByteArray() unconditionally.
It’s presumably always valid, but it would be good to add a defensive check if
there's any chance `output` might be null or closed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]