sodonnel commented on a change in pull request #2899:
URL: https://github.com/apache/ozone/pull/2899#discussion_r768579094
##########
File path:
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
##########
@@ -439,32 +454,61 @@ private void clearParityBuffers() {
}
protected void loadDataBuffersFromStream() throws IOException {
+ Queue<ImmutablePair<Integer, Future<Void>>> pendingReads
+ = new ArrayDeque<>();
for (int i : dataIndexes) {
+ pendingReads.add(new ImmutablePair<>(i, executor.submit(() -> {
+ readIntoBuffer(i, decoderInputBuffers[i]);
+ return null;
+ })));
+ }
+ boolean exceptionOccurred = false;
+ while(!pendingReads.isEmpty()) {
+ int index = -1;
try {
- BlockExtendedInputStream stream = getOrOpenStream(i);
- seekStreamIfNecessary(stream, 0);
- ByteBuffer b = decoderInputBuffers[i];
- while (b.hasRemaining()) {
- int read = stream.read(b);
- if (read == EOF) {
- // We should not reach EOF, as the block should have enough data to
- // fill the buffer. If the block does not, then it indicates the
- // block is not as long as it should be, based on the block length
- // stored in OM. Therefore if there is any remaining space in the
- // buffer, we should throw an exception.
- if (b.hasRemaining()) {
- throw new IOException("Expected to read " + b.remaining() +
- " bytes from block " + getBlockID() + " EC index " + (i + 1) +
- " but reached EOF");
- }
- break;
- }
- }
- } catch (IOException e) {
+ ImmutablePair<Integer, Future<Void>> pair = pendingReads.poll();
+ index = pair.getKey();
+ // Should this future.get() have a timeout? At the end of the call chain
+ // we eventually call a grpc or ratis client to read the block data. It's
+ // the call to the DNs which could potentially block. There is a timeout
+ // on that call controlled by:
+ // OZONE_CLIENT_READ_TIMEOUT = "ozone.client.read.timeout";
+ // Which defaults to 30s. So if there is a DN communication problem, it
+ // should timeout in the client which should propagate up the stack as
+ // an IOException.
+ pair.getValue().get();
+ } catch (ExecutionException ee) {
LOG.warn("Failed to read from block {} EC index {}. Excluding the " +
- "block", getBlockID(), i + 1, e);
- failedDataIndexes.add(i);
- throw e;
+ "block", getBlockID(), index + 1, ee.getCause());
+ failedDataIndexes.add(index);
+ exceptionOccurred = true;
+ } catch (InterruptedException ie) {
+ LOG.error("Interrupted waiting for reads to complete", ie);
Review comment:
Yea, I forgot the IOE would be retried by the higher level. I think we
should let the interrupted exception propagate up from this method untouched
and then handle it at the higher level.
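The pattern being discussed can be sketched as below: submit the per-index reads to an executor, then drain the pending futures, wrapping `ExecutionException` causes into an `IOException` while letting `InterruptedException` propagate untouched to the caller, as the comment suggests. This is only an illustrative standalone sketch, not the patch itself: `readBlock` and `waitForReads` are hypothetical stand-ins for `readIntoBuffer` and the drain loop, and `Map.Entry` replaces `ImmutablePair` to avoid the commons-lang dependency.

```java
import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelReadSketch {

  // Hypothetical stand-in for readIntoBuffer(index, buffer).
  static void readBlock(int index) {
    // pretend to read block data for this EC index
  }

  // Drains the pending reads. ExecutionException causes are rethrown as
  // IOException (retryable at a higher level); InterruptedException is
  // deliberately NOT caught here, so it propagates up untouched and the
  // caller decides how to react.
  static void waitForReads(Queue<Map.Entry<Integer, Future<Void>>> pending)
      throws IOException, InterruptedException {
    while (!pending.isEmpty()) {
      Map.Entry<Integer, Future<Void>> entry = pending.poll();
      try {
        // Could also be get(timeout, unit) if a client-side bound is wanted
        // on top of the DN read timeout.
        entry.getValue().get();
      } catch (ExecutionException ee) {
        throw new IOException(
            "Read failed for index " + entry.getKey(), ee.getCause());
      }
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(3);
    Queue<Map.Entry<Integer, Future<Void>>> pending = new ArrayDeque<>();
    for (int i = 0; i < 3; i++) {
      final int idx = i;
      pending.add(new AbstractMap.SimpleEntry<>(idx,
          executor.submit(() -> {
            readBlock(idx);
            return null;
          })));
    }
    waitForReads(pending);
    System.out.println("all reads completed");
    executor.shutdown();
  }
}
```

The key design point is that the method's `throws InterruptedException` clause makes the interruption visible in the signature, instead of swallowing it or wrapping it in an `IOException` that a retry loop would silently re-attempt.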
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]