adoroszlai commented on code in PR #3457:
URL: https://github.com/apache/ozone/pull/3457#discussion_r887923263


##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java:
##########
@@ -154,92 +182,112 @@ public synchronized void 
addFailedDatanodes(List<DatanodeDetails> dns) {
     }
   }
 
+  public synchronized void setRecoveryIndexes(Collection<Integer> indexes) {
+    if (initialized) {
+      throw new IllegalStateException("Cannot set recovery indexes after the " 
+
+          "reader has been initialized");
+    }
+    Preconditions.assertNotNull(indexes, "recovery indexes");
+    recoveryIndexes.clear();
+    recoveryIndexes.addAll(indexes);
+  }
+
   private void init() throws InsufficientLocationsException {
     if (decoder == null) {
       decoder = CodecUtil.createRawDecoderWithFallback(getRepConfig());
     }
-    if (decoderInputBuffers == null) {
-      // The EC decoder needs an array data+parity long, with missing or not
-      // needed indexes set to null.
-      decoderInputBuffers = new ByteBuffer[getRepConfig().getRequiredNodes()];
-    }
     if (!hasSufficientLocations()) {
       throw new InsufficientLocationsException("There are insufficient " +
           "datanodes to read the EC block");
     }
-    dataIndexes.clear();
-    ECReplicationConfig repConfig = getRepConfig();
-    DatanodeDetails[] locations = getDataLocations();
-    setMissingIndexesAndDataLocations(locations);
-    List<Integer> parityIndexes =
-        selectParityIndexes(locations, missingIndexes.length);
-    // We read from the selected parity blocks, so add them to the data 
indexes.
-    dataIndexes.addAll(parityIndexes);
+    allocateInternalBuffers();
+    if (!isOfflineRecovery()) {
+      decoderOutputBuffers = new ByteBuffer[missingIndexes.size()];
+    }
+    initialized = true;
+  }
+
+  private void allocateInternalBuffers() {
     // The decoder inputs originally start as all nulls. Then we populate the
-    // pieces we have data for. The parity buffers are reused for the block
-    // so we can allocated them now. On re-init, we reuse any parity buffers
+    // pieces we have data for. The internal buffers are reused for the block,
+    // so we can allocate them now. On re-init, we reuse any internal buffers
     // already allocated.
-    for (int i = repConfig.getData(); i < repConfig.getRequiredNodes(); i++) {
-      if (parityIndexes.contains(i)) {
-        if (decoderInputBuffers[i] == null) {
-          decoderInputBuffers[i] = allocateBuffer(repConfig);
-        }
-      } else {
-        decoderInputBuffers[i] = null;
+    final int minIndex = isOfflineRecovery() ? 0 : getRepConfig().getData();
+    for (int i = minIndex; i < getRepConfig().getRequiredNodes(); i++) {
+      boolean internalInput = selectedIndexes.contains(i)
+          || paddingIndexes.contains(i);
+      boolean hasBuffer = decoderInputBuffers[i] != null;
+
+      if (internalInput && !hasBuffer) {
+        allocateInternalBuffer(i);
+      } else if (!internalInput && hasBuffer) {
+        releaseInternalBuffer(i);
       }
     }
-    decoderOutputBuffers = new ByteBuffer[missingIndexes.length];
-    initialized = true;
   }
 
-  /**
-   * Determine which indexes are missing, taking into account the length of the
-   * block. For a block shorter than a full EC stripe, it is expected that
-   * some of the data locations will not be present.
-   * Populates the missingIndex and dataIndexes instance variables.
-   * @param locations Available locations for the block group
-   */
-  private void setMissingIndexesAndDataLocations(DatanodeDetails[] locations) {
-    ECReplicationConfig repConfig = getRepConfig();
-    int expectedDataBlocks = calculateExpectedDataBlocks(repConfig);
-    List<Integer> missingInd = new ArrayList<>();
-    for (int i = 0; i < repConfig.getData(); i++) {
-      if ((locations[i] == null || failedDataIndexes.contains(i))
-          && i < expectedDataBlocks) {
-        missingInd.add(i);
-      } else if (locations[i] != null && !failedDataIndexes.contains(i)) {
-        dataIndexes.add(i);
+  private void allocateInternalBuffer(int index) {
+    Preconditions.assertTrue(internalBuffers.add(index),
+        () -> "Buffer " + index + " already tracked as internal input");
+    decoderInputBuffers[index] =
+        byteBufferPool.getBuffer(false, getRepConfig().getEcChunkSize());
+  }
+
+  private void releaseInternalBuffer(int index) {
+    Preconditions.assertTrue(internalBuffers.remove(index),
+        () -> "Buffer " + index + " not tracked as internal input");
+    byteBufferPool.putBuffer(decoderInputBuffers[index]);
+    decoderInputBuffers[index] = null;
+  }
+
+  private void markMissingLocationsAsFailed() {
+    DatanodeDetails[] locations = getDataLocations();
+    for (int i = 0; i < locations.length; i++) {
+      if (locations[i] == null && failedDataIndexes.add(i)) {
+        LOG.debug("Marked index={} as failed", i);
       }
     }
-    missingIndexes = missingInd.stream().mapToInt(Integer::valueOf).toArray();
+  }
+
+  private boolean isOfflineRecovery() {
+    return !recoveryIndexes.isEmpty();
   }
 
   private void assignBuffers(ByteBuffer[] bufs) {
-    ECReplicationConfig repConfig = getRepConfig();
-    Preconditions.assertTrue(bufs.length == repConfig.getData());
-    int recoveryIndex = 0;
-    // Here bufs come from the caller and will be filled with data read from
-    // the blocks or recovered. Therefore, if the index is missing, we assign
-    // the buffer to the decoder outputs, where data is recovered via EC
-    // decoding. Otherwise the buffer is set to the input. Note, it may be a
-    // buffer which needs padded.
-    for (int i = 0; i < repConfig.getData(); i++) {
-      if (isMissingIndex(i)) {
-        decoderOutputBuffers[recoveryIndex++] = bufs[i];
-        decoderInputBuffers[i] = null;
-      } else {
-        decoderInputBuffers[i] = bufs[i];
+    Preconditions.assertTrue(bufs.length == getExpectedBufferCount());
+
+    if (isOfflineRecovery()) {
+      decoderOutputBuffers = bufs;
+    } else {
+      int recoveryIndex = 0;
+      // Here bufs come from the caller and will be filled with data read from
+      // the blocks or recovered. Therefore, if the index is missing, we assign
+      // the buffer to the decoder outputs, where data is recovered via EC
+      // decoding. Otherwise the buffer is set to the input. Note, it may be a
+      // buffer which needs padded.
+      for (int i = 0; i < bufs.length; i++) {
+        if (isMissingIndex(i)) {
+          decoderOutputBuffers[recoveryIndex++] = bufs[i];
+          if (internalBuffers.contains(i)) {
+            releaseInternalBuffer(i);
+          } else {
+            decoderInputBuffers[i] = null;
+          }
+        } else {
+          decoderInputBuffers[i] = bufs[i];
+        }
       }
     }
   }
 
+  private int getExpectedBufferCount() {
+    return isOfflineRecovery()
+        ? recoveryIndexes.size()
+        : getRepConfig().getData();
+  }
+
   private boolean isMissingIndex(int ind) {
-    for (int i : missingIndexes) {
-      if (i == ind) {
-        return true;
-      }
-    }
-    return false;
+    return missingIndexes.contains(ind);
   }
 
   /**

Review Comment:
   > it may be a good idea to have an overloaded API and define its own expectations?
   
   Yep, we can have a separate method with different Javadoc and a different
   precondition (recovery indexes should or should not be set).  Both methods
   would call the same underlying implementation.  In the parameterized test we
   can either call the separate methods or the common helper -- which one would
   you prefer?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to