szetszwo commented on a change in pull request #2701:
URL: https://github.com/apache/ozone/pull/2701#discussion_r721208631



##########
File path: 
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
##########
@@ -59,6 +60,7 @@
   private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
   private final long openID;
   private final ExcludeList excludeList;
+  private List<ByteBuffer> bufferPool;

Review comment:
       Unlike BlockOutputStreamEntryPool, BlockDataStreamOutputEntryPool should 
not have a bufferPool since it won't reuse buffers.

##########
File path: 
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
##########
@@ -278,11 +278,14 @@ private void handleException(BlockDataStreamOutputEntry 
streamEntry,
     }
     Pipeline pipeline = streamEntry.getPipeline();
     PipelineID pipelineId = pipeline.getId();
-
+    long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
+    //set the correct length for the current stream
+    streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
     long containerId = streamEntry.getBlockID().getContainerID();
     Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers();
     Preconditions.checkNotNull(failedServers);
     ExcludeList excludeList = blockDataStreamOutputEntryPool.getExcludeList();
+    long bufferedDataLen = blockDataStreamOutputEntryPool.computeBufferData();

Review comment:
       The bufferedDataLen should be obtained from streamEntry (i.e. 
BlockDataStreamOutputEntry).  In BlockDataStreamOutputEntry, compute the 
buffered data length from BlockDataStreamOutput.
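
   The suggested delegation could look roughly like this. This is a minimal sketch with hypothetical stand-in classes and method names (`getBufferedDataLength` is not the actual Ozone API); it only illustrates that the stream owns the buffers, so the entry should ask the stream rather than the pool:
   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical stand-in for BlockDataStreamOutput: it owns the buffers,
   // so it is the natural place to compute the buffered (unacked) length.
   class StreamOutputSketch {
     private final List<ByteBuffer> buffers = new ArrayList<>();

     void write(ByteBuffer buf) {
       buffers.add(buf);
     }

     // Buffered data = bytes written but not yet acknowledged.
     long getBufferedDataLength() {
       return buffers.stream().mapToLong(ByteBuffer::remaining).sum();
     }
   }

   // Hypothetical stand-in for BlockDataStreamOutputEntry: it delegates
   // instead of asking the entry pool for a global buffer count.
   class StreamEntrySketch {
     private final StreamOutputSketch out = new StreamOutputSketch();

     StreamOutputSketch getOutputStream() {
       return out;
     }

     long getBufferedDataLength() {
       return out.getBufferedDataLength();
     }
   }

   public class Demo {
     public static void main(String[] args) {
       StreamEntrySketch entry = new StreamEntrySketch();
       entry.getOutputStream().write(ByteBuffer.allocate(4));
       entry.getOutputStream().write(ByteBuffer.allocate(6));
       System.out.println(entry.getBufferedDataLength()); // 10
     }
   }
   ```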

##########
File path: 
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
##########
@@ -92,6 +92,19 @@
 
   private int chunkIndex;
   private final AtomicLong chunkOffset = new AtomicLong();
+
+  // Similar to 'BufferPool' but this list maintains only references
+  // to the ByteBuffers.
+  private List<ByteBuffer> bufferPool;
+
+  // List containing buffers for which the putBlock call will
+  // update the length in the datanodes. This list will just maintain
+  // references to the buffers in the BufferPool which will be cleared
+  // when the watchForCommit acknowledges a putBlock logIndex has been
+  // committed on all datanodes. This list will be a placeholder for buffers
+  // which got written between successive putBlock calls.
+  private List<ByteBuffer> bufferList;

Review comment:
       I suggest creating a new class for the bufferList which also records the 
ackedIndex.
   ```
     static class BufferList {
       private final List<ByteBuffer> buffers;
       private int ackedIndex = -1;
   
       ...
     }
   ```
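
   Fleshing out the sketch above, a BufferList that records the acked index might look as follows. This is a hypothetical illustration, not the actual Ozone implementation; the method names (`acknowledge`, `unackedLength`) are made up to show how keeping `ackedIndex` next to the buffer references localizes the release bookkeeping:
   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayList;
   import java.util.List;

   // Sketch of the suggested BufferList: buffer references plus the index
   // of the last buffer acknowledged by a committed putBlock.
   class BufferListSketch {
     private final List<ByteBuffer> buffers = new ArrayList<>();
     private int ackedIndex = -1;

     void add(ByteBuffer buf) {
       buffers.add(buf);
     }

     // Record that all buffers up to and including 'index' are committed.
     void acknowledge(int index) {
       if (index < ackedIndex || index >= buffers.size()) {
         throw new IllegalArgumentException("index=" + index);
       }
       ackedIndex = index;
     }

     // Total bytes in buffers not yet acknowledged.
     long unackedLength() {
       long len = 0;
       for (int i = ackedIndex + 1; i < buffers.size(); i++) {
         len += buffers.get(i).remaining();
       }
       return len;
     }

     int getAckedIndex() {
       return ackedIndex;
     }
   }

   public class BufferListDemo {
     public static void main(String[] args) {
       BufferListSketch list = new BufferListSketch();
       list.add(ByteBuffer.allocate(8));
       list.add(ByteBuffer.allocate(8));
       list.acknowledge(0);
       System.out.println(list.unackedLength()); // 8
     }
   }
   ```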

##########
File path: 
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
##########
@@ -140,11 +162,55 @@ public XceiverClientReply streamWatchForCommit(long 
commitIndex)
     }
   }
 
+  void releaseBuffersOnException() {
+    adjustBuffers(xceiverClient.getReplicatedMinCommitIndex());
+  }
+
+  private void adjustBuffers(long commitIndex) {
+    List<Long> keyList = commitIndexSet.keySet().stream()
+        .filter(p -> p <= commitIndex).collect(Collectors.toList());
+    if (!keyList.isEmpty()) {
+      releaseBuffers(keyList);
+    }
+  }
+
+  private long releaseBuffers(List<Long> indexes) {
+    Preconditions.checkArgument(!commitIndexSet.isEmpty());
+    for (long index : indexes) {
+      Preconditions.checkState(commitIndexSet.containsKey(index));
+      final List<ByteBuffer> buffers
+          = commitIndexSet.remove(index);
+      long length =
+          buffers.stream().mapToLong(buf -> (buf.limit() - buf.position()))
+              .sum();
+      totalAckDataLength += length;
+      // clear the future object from the future Map
+      final CompletableFuture<ContainerCommandResponseProto> remove =
+          futureMap.remove(totalAckDataLength);
+      if (remove == null) {
+        LOG.error("Couldn't find required future for " + totalAckDataLength);
+        for (Long key : futureMap.keySet()) {
+          LOG.error("Existing acknowledged data: " + key);
+        }
+      }
+      Preconditions.checkNotNull(remove);
+      for (ByteBuffer byteBuffer : buffers) {
+        bufferPool.remove(byteBuffer);

Review comment:
       This is very expensive!  `List.remove(Object)` goes through 
`ByteBuffer.equals`, which compares every remaining byte inside the buffer.  
In any case, we should not have a bufferPool here.
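
   To illustrate the hazard: `ByteBuffer.equals` is defined on the remaining bytes, so `List.remove(buf)` not only scans contents, it can remove a *different* buffer that happens to hold equal bytes. A standalone demo (not Ozone code) showing both the content-equality trap and an identity-based alternative:
   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayList;
   import java.util.List;

   public class RemoveByIdentity {
     public static void main(String[] args) {
       ByteBuffer a = ByteBuffer.wrap(new byte[]{1, 2, 3});
       // Distinct object with equal content.
       ByteBuffer b = ByteBuffer.wrap(new byte[]{1, 2, 3});

       List<ByteBuffer> pool = new ArrayList<>();
       pool.add(a);
       pool.add(b);

       // Content-based removal: compares bytes and removes the FIRST
       // equal buffer, which here is 'a', not 'b'.
       pool.remove(b);
       System.out.println(pool.get(0) == b); // true: 'a' was removed instead

       // Identity-based removal: pointer comparison only, no byte scan,
       // and it removes exactly the intended buffer.
       pool.removeIf(buf -> buf == b);
       System.out.println(pool.isEmpty()); // true
     }
   }
   ```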




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


