umamaheswararao commented on a change in pull request #2899:
URL: https://github.com/apache/ozone/pull/2899#discussion_r768300478



##########
File path: 
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
##########
@@ -439,32 +454,61 @@ private void clearParityBuffers() {
   }
 
   protected void loadDataBuffersFromStream() throws IOException {
+    Queue<ImmutablePair<Integer, Future<Void>>> pendingReads
+        = new ArrayDeque<>();
     for (int i : dataIndexes) {
+      pendingReads.add(new ImmutablePair<>(i, executor.submit(() -> {
+        readIntoBuffer(i, decoderInputBuffers[i]);
+        return null;
+      })));
+    }
+    boolean exceptionOccurred = false;
+    while(!pendingReads.isEmpty()) {
+      int index = -1;
       try {
-        BlockExtendedInputStream stream = getOrOpenStream(i);
-        seekStreamIfNecessary(stream, 0);
-        ByteBuffer b = decoderInputBuffers[i];
-        while (b.hasRemaining()) {
-          int read = stream.read(b);
-          if (read == EOF) {
-            // We should not reach EOF, as the block should have enough data to
-            // fill the buffer. If the block does not, then it indicates the
-            // block is not as long as it should be, based on the block length
-            // stored in OM. Therefore if there is any remaining space in the
-            // buffer, we should throw an exception.
-            if (b.hasRemaining()) {
-              throw new IOException("Expected to read " + b.remaining() +
-                  " bytes from block " + getBlockID() + " EC index " + (i + 1) 
+
-                  " but reached EOF");
-            }
-            break;
-          }
-        }
-      } catch (IOException e) {
+        ImmutablePair<Integer, Future<Void>> pair = pendingReads.poll();
+        index = pair.getKey();
+        // Should this future.get() have a timeout? At the end of the call 
chain
+        // we eventually call a grpc or ratis client to read the block data. 
Its
+        // the call to the DNs which could potentially block. There is a 
timeout
+        // on that call controlled by:
+        //     OZONE_CLIENT_READ_TIMEOUT = "ozone.client.read.timeout";
+        // Which defaults to 30s. So if there is a DN communication problem, it
+        // should timeout in the client which should propagate up the stack as
+        // an IOException.
+        pair.getValue().get();
+      } catch (ExecutionException ee) {
         LOG.warn("Failed to read from block {} EC index {}. Excluding the " +
-                "block", getBlockID(), i + 1, e);
-        failedDataIndexes.add(i);
-        throw e;
+            "block", getBlockID(), index + 1, ee.getCause());
+        failedDataIndexes.add(index);
+        exceptionOccurred = true;
+      } catch (InterruptedException ie) {

Review comment:
       May want to reset this interrupted status, so that we do not 
suppress the InterruptedException?

##########
File path: 
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
##########
@@ -439,32 +454,61 @@ private void clearParityBuffers() {
   }
 
   protected void loadDataBuffersFromStream() throws IOException {
+    Queue<ImmutablePair<Integer, Future<Void>>> pendingReads
+        = new ArrayDeque<>();
     for (int i : dataIndexes) {
+      pendingReads.add(new ImmutablePair<>(i, executor.submit(() -> {
+        readIntoBuffer(i, decoderInputBuffers[i]);
+        return null;
+      })));
+    }
+    boolean exceptionOccurred = false;
+    while(!pendingReads.isEmpty()) {
+      int index = -1;
       try {
-        BlockExtendedInputStream stream = getOrOpenStream(i);
-        seekStreamIfNecessary(stream, 0);
-        ByteBuffer b = decoderInputBuffers[i];
-        while (b.hasRemaining()) {
-          int read = stream.read(b);
-          if (read == EOF) {
-            // We should not reach EOF, as the block should have enough data to
-            // fill the buffer. If the block does not, then it indicates the
-            // block is not as long as it should be, based on the block length
-            // stored in OM. Therefore if there is any remaining space in the
-            // buffer, we should throw an exception.
-            if (b.hasRemaining()) {
-              throw new IOException("Expected to read " + b.remaining() +
-                  " bytes from block " + getBlockID() + " EC index " + (i + 1) 
+
-                  " but reached EOF");
-            }
-            break;
-          }
-        }
-      } catch (IOException e) {
+        ImmutablePair<Integer, Future<Void>> pair = pendingReads.poll();
+        index = pair.getKey();
+        // Should this future.get() have a timeout? At the end of the call 
chain
+        // we eventually call a grpc or ratis client to read the block data. 
Its
+        // the call to the DNs which could potentially block. There is a 
timeout
+        // on that call controlled by:
+        //     OZONE_CLIENT_READ_TIMEOUT = "ozone.client.read.timeout";
+        // Which defaults to 30s. So if there is a DN communication problem, it
+        // should timeout in the client which should propagate up the stack as
+        // an IOException.
+        pair.getValue().get();
+      } catch (ExecutionException ee) {
         LOG.warn("Failed to read from block {} EC index {}. Excluding the " +
-                "block", getBlockID(), i + 1, e);
-        failedDataIndexes.add(i);
-        throw e;
+            "block", getBlockID(), index + 1, ee.getCause());
+        failedDataIndexes.add(index);
+        exceptionOccurred = true;
+      } catch (InterruptedException ie) {
+        LOG.error("Interrupted waiting for reads to complete", ie);
+        throw new IOException("Interrupted waiting for reads to complete", ie);

Review comment:
       We may need to shut down the pool?

##########
File path: 
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
##########
@@ -439,32 +454,61 @@ private void clearParityBuffers() {
   }
 
   protected void loadDataBuffersFromStream() throws IOException {
+    Queue<ImmutablePair<Integer, Future<Void>>> pendingReads
+        = new ArrayDeque<>();
     for (int i : dataIndexes) {
+      pendingReads.add(new ImmutablePair<>(i, executor.submit(() -> {
+        readIntoBuffer(i, decoderInputBuffers[i]);
+        return null;
+      })));
+    }
+    boolean exceptionOccurred = false;
+    while(!pendingReads.isEmpty()) {
+      int index = -1;
       try {
-        BlockExtendedInputStream stream = getOrOpenStream(i);
-        seekStreamIfNecessary(stream, 0);
-        ByteBuffer b = decoderInputBuffers[i];
-        while (b.hasRemaining()) {
-          int read = stream.read(b);
-          if (read == EOF) {
-            // We should not reach EOF, as the block should have enough data to
-            // fill the buffer. If the block does not, then it indicates the
-            // block is not as long as it should be, based on the block length
-            // stored in OM. Therefore if there is any remaining space in the
-            // buffer, we should throw an exception.
-            if (b.hasRemaining()) {
-              throw new IOException("Expected to read " + b.remaining() +
-                  " bytes from block " + getBlockID() + " EC index " + (i + 1) 
+
-                  " but reached EOF");
-            }
-            break;
-          }
-        }
-      } catch (IOException e) {
+        ImmutablePair<Integer, Future<Void>> pair = pendingReads.poll();
+        index = pair.getKey();
+        // Should this future.get() have a timeout? At the end of the call 
chain
+        // we eventually call a grpc or ratis client to read the block data. 
Its
+        // the call to the DNs which could potentially block. There is a 
timeout
+        // on that call controlled by:
+        //     OZONE_CLIENT_READ_TIMEOUT = "ozone.client.read.timeout";
+        // Which defaults to 30s. So if there is a DN communication problem, it
+        // should timeout in the client which should propagate up the stack as
+        // an IOException.
+        pair.getValue().get();
+      } catch (ExecutionException ee) {
         LOG.warn("Failed to read from block {} EC index {}. Excluding the " +
-                "block", getBlockID(), i + 1, e);
-        failedDataIndexes.add(i);
-        throw e;
+            "block", getBlockID(), index + 1, ee.getCause());
+        failedDataIndexes.add(index);
+        exceptionOccurred = true;
+      } catch (InterruptedException ie) {
+        LOG.error("Interrupted waiting for reads to complete", ie);

Review comment:
       Looks like we are throwing an IOE on InterruptedException, but we catch 
the IOE in readStripe and retry. 
   So, this InterruptedException would be masked. Probably, we should reset the 
interrupted status here and handle InterruptedException in readStripe as 
well, so that it is propagated out?

##########
File path: 
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
##########
@@ -118,6 +129,10 @@ public 
ECBlockReconstructedStripeInputStream(ECReplicationConfig repConfig,
     // The EC decoder needs an array data+parity long, with missing or not
     // needed indexes set to null.
     decoderInputBuffers = new ByteBuffer[getRepConfig().getRequiredNodes()];
+
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(
+        "ec-reader-for-" + blockInfo.getBlockID() + "-%d").build();

Review comment:
       Can we explicitly say -TID-%d, so that it is not confused with block indexes?

##########
File path: 
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
##########
@@ -439,32 +454,61 @@ private void clearParityBuffers() {
   }
 
   protected void loadDataBuffersFromStream() throws IOException {
+    Queue<ImmutablePair<Integer, Future<Void>>> pendingReads
+        = new ArrayDeque<>();
     for (int i : dataIndexes) {
+      pendingReads.add(new ImmutablePair<>(i, executor.submit(() -> {
+        readIntoBuffer(i, decoderInputBuffers[i]);
+        return null;
+      })));
+    }
+    boolean exceptionOccurred = false;
+    while(!pendingReads.isEmpty()) {
+      int index = -1;
       try {
-        BlockExtendedInputStream stream = getOrOpenStream(i);
-        seekStreamIfNecessary(stream, 0);
-        ByteBuffer b = decoderInputBuffers[i];
-        while (b.hasRemaining()) {
-          int read = stream.read(b);
-          if (read == EOF) {
-            // We should not reach EOF, as the block should have enough data to
-            // fill the buffer. If the block does not, then it indicates the
-            // block is not as long as it should be, based on the block length
-            // stored in OM. Therefore if there is any remaining space in the
-            // buffer, we should throw an exception.
-            if (b.hasRemaining()) {
-              throw new IOException("Expected to read " + b.remaining() +
-                  " bytes from block " + getBlockID() + " EC index " + (i + 1) 
+
-                  " but reached EOF");
-            }
-            break;
-          }
-        }
-      } catch (IOException e) {
+        ImmutablePair<Integer, Future<Void>> pair = pendingReads.poll();
+        index = pair.getKey();
+        // Should this future.get() have a timeout? At the end of the call 
chain

Review comment:
       I agree. Actual timeouts should be handled by the underlying endpoints. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to