szetszwo commented on a change in pull request #2701:
URL: https://github.com/apache/ozone/pull/2701#discussion_r729876056



##########
File path: 
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
##########
@@ -242,6 +248,8 @@ public IOException getIoException() {
     return ioException.get();
   }
 
+
+

Review comment:
       Please remove these added empty lines.
   
   Also, please revert other similar whitespace changes.

##########
File path: 
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
##########
@@ -261,15 +272,34 @@ private void updateFlushLength() {
     totalDataFlushedLength = writtenDataLength;
   }
 
+  @VisibleForTesting
+  public long getTotalDataFlushedLength() {
+    return totalDataFlushedLength;
+  }
   /**
    * Will be called on the retryPath in case closedContainerException/
    * TimeoutException.
    * @param len length of data to write
    * @throws IOException if error occurred
    */
 
-  // TODO: We need add new retry policy without depend on bufferPool.
   public void writeOnRetry(long len) throws IOException {
+    if (len == 0) {
+      return;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Retrying write length {} for blockID {}", len, blockID);
+    }
+    int count = 0;
+    while (len > 0) {
+      ByteBuffer buf = bufferList.get(count);
+      long writeLen = Math.min(buf.limit() - buf.position(), len);
+      writeChunkToContainer(buf.duplicate());

Review comment:
       When `len` is the minimum (i.e. `len < buf.remaining()`), the limit of the duplicated buf needs to be updated as well; otherwise the whole remaining buffer is written instead of only `len` bytes.
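
   A minimal sketch of the fix being suggested (standalone illustration; `writeChunkToContainer` and the surrounding fields are from the PR and are not reproduced here):

   ```java
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;

   public class DuplicateLimitExample {
     public static void main(String[] args) {
       ByteBuffer buf = ByteBuffer.wrap(
           "hello world".getBytes(StandardCharsets.US_ASCII));
       long len = 5; // fewer bytes left to retry than the buffer holds

       // duplicate() shares the content but copies position/limit, so the
       // duplicate can be truncated without disturbing the original buffer.
       ByteBuffer dup = buf.duplicate();
       if (len < dup.remaining()) {
         dup.limit(dup.position() + (int) len); // cap the slice at len bytes
       }

       System.out.println(dup.remaining()); // 5 -- only len bytes visible
       System.out.println(buf.remaining()); // 11 -- original untouched
     }
   }
   ```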

##########
File path: 
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
##########
@@ -48,7 +53,13 @@
   private static final Logger LOG =
       LoggerFactory.getLogger(StreamCommitWatcher.class);
 
-  private Set<Long> commitIndexSet;
+  private Map<Long, List<ByteBuffer>> commitIndexMap;
+
+  private List<ByteBuffer> bufferPool;

Review comment:
       Please rename it to bufferList.
   
   Please also rename other use of "bufferPool" to "bufferList".

##########
File path: 
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
##########
@@ -140,11 +162,55 @@ public XceiverClientReply streamWatchForCommit(long 
commitIndex)
     }
   }
 
+  void releaseBuffersOnException() {
+    adjustBuffers(xceiverClient.getReplicatedMinCommitIndex());
+  }
+
+  private void adjustBuffers(long commitIndex) {
+    List<Long> keyList = commitIndexSet.keySet().stream()
+        .filter(p -> p <= commitIndex).collect(Collectors.toList());
+    if (!keyList.isEmpty()) {
+      releaseBuffers(keyList);
+    }
+  }
+
+  private long releaseBuffers(List<Long> indexes) {
+    Preconditions.checkArgument(!commitIndexSet.isEmpty());
+    for (long index : indexes) {
+      Preconditions.checkState(commitIndexSet.containsKey(index));
+      final List<ByteBuffer> buffers
+          = commitIndexSet.remove(index);
+      long length =
+          buffers.stream().mapToLong(buf -> (buf.limit() - buf.position()))
+              .sum();
+      totalAckDataLength += length;
+      // clear the future object from the future Map
+      final CompletableFuture<ContainerCommandResponseProto> remove =
+          futureMap.remove(totalAckDataLength);
+      if (remove == null) {
+        LOG.error("Couldn't find required future for " + totalAckDataLength);
+        for (Long key : futureMap.keySet()) {
+          LOG.error("Existing acknowledged data: " + key);
+        }
+      }
+      Preconditions.checkNotNull(remove);
+      for (ByteBuffer byteBuffer : buffers) {
+        bufferPool.remove(byteBuffer);

Review comment:
       We must avoid `List.remove(Object)` here: it scans the list and calls `ByteBuffer.equals` on each element, which compares the remaining bytes one by one, so each call costs O(list size × buffer size). Remove by index or by reference identity instead.
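
   A small demonstration of why content-based removal is both expensive and wrong for buffers (illustrative sketch only; `removeIf` with an identity check is one possible alternative, not necessarily the one the PR should use):

   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayList;
   import java.util.List;

   public class ByteBufferRemoveExample {
     public static void main(String[] args) {
       ByteBuffer a = ByteBuffer.allocate(4);
       ByteBuffer b = ByteBuffer.allocate(4); // distinct buffer, equal contents

       List<ByteBuffer> bufferList = new ArrayList<>();
       bufferList.add(a);
       bufferList.add(b);

       // List.remove(Object) walks the list calling ByteBuffer.equals, which
       // compares the remaining bytes element by element. Here it also removes
       // the wrong buffer, because a and b are equal content-wise.
       bufferList.remove(b);
       System.out.println(bufferList.get(0) == b); // true: a was removed, not b

       // Cheaper and correct: remove by identity (reference comparison only).
       bufferList.removeIf(buf -> buf == b);
       System.out.println(bufferList.isEmpty()); // true
     }
   }
   ```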




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


