adoroszlai commented on a change in pull request #3034:
URL: https://github.com/apache/ozone/pull/3034#discussion_r800180214



##########
File path: 
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
##########
@@ -89,18 +89,17 @@
   //ID of the datanode where this container is created
   private String originNodeId;
 
-  /** parameters for read/write statistics on the container. **/
-  private final AtomicLong readBytes;
-  private final AtomicLong writeBytes;
-  private final AtomicLong readCount;
-  private final AtomicLong writeCount;
+  // Total number of blocks in the container
+  private final AtomicLong blockCount;
+  // Total number of bytes of all the blocks in the container
   private final AtomicLong bytesUsed;
-  private final AtomicLong keyCount;
 
   private HddsVolume volume;
 
   private String checksum;
 
+  private ContainerDataStats stats;

Review comment:
       Separating it is nice, but is it necessary for the fix? If not, can we 
move it into a follow-up improvement Jira issue?

##########
File path: 
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataStats.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.impl;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+
+public class ContainerDataStats {

Review comment:
       Would `ContainerOpStats` be a better name?

##########
File path: 
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
##########
@@ -143,11 +135,24 @@ public static long persistPutBlock(KeyValueContainer 
container,
             containerBCSId, bcsId);
         return data.getSize();
       }
+
       // update the blockData as well as BlockCommitSequenceId here
       try (BatchOperation batch = db.getStore().getBatchHandler()
           .initBatchOperation()) {
+        String dbKeyForBlock = Long.toString(data.getLocalID());
+
+        // If incrBlockCount is absent, then check the DB if the block exists
+        // or not. If it does not exist, increment the blockCount.
+        if (!incrBlockCount.isPresent()) {
+          if (db.getStore().getBlockDataTable().get(dbKeyForBlock) == null) {
+            incrBlockCount = Optional.of(false);
+          } else {
+            incrBlockCount = Optional.of(true);
+          }
+        }

Review comment:
       Given this fallback logic for old clients, is the client-side change 
only a performance optimization? Would simply ignoring the flag from the 
client cause a performance hit?

##########
File path: 
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
##########
@@ -415,41 +422,54 @@ void updateCommitInfo(XceiverClientReply reply, 
List<ChunkBuffer> buffers) {
         ContainerCommandResponseProto> flushFuture = null;
     try {
       BlockData blockData = containerBlockData.build();
-      XceiverClientReply asyncReply =
-          putBlockAsync(xceiverClient, blockData, close, token);
+      boolean firstBlockPutCall = updateBlockCount.getAndSet(false);
+      XceiverClientReply asyncReply = putBlockAsync(xceiverClient, blockData,
+          firstBlockPutCall, close, token);
       CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
           asyncReply.getResponse();
-      flushFuture = future.thenApplyAsync(e -> {
+      flushFuture = future.thenApplyAsync(response -> {
         try {
-          validateResponse(e);
+          validateResponse(response);
         } catch (IOException sce) {
           throw new CompletionException(sce);
         }
         // if the ioException is not set, putBlock is successful
-        if (getIoException() == null && !force) {
-          BlockID responseBlockID = BlockID.getFromProtobuf(
-              e.getPutBlock().getCommittedBlockLength().getBlockID());
-          Preconditions.checkState(blockID.get().getContainerBlockID()
-              .equals(responseBlockID.getContainerBlockID()));
-          // updates the bcsId of the block
-          blockID.set(responseBlockID);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(
-                "Adding index " + asyncReply.getLogIndex() + " flushLength "
-                    + flushPos + " numBuffers " + byteBufferList.size()
-                    + " blockID " + blockID + " bufferPool size" + bufferPool
-                    .getSize() + " currentBufferIndex " + bufferPool
-                    .getCurrentBufferIndex());
+        if (getIoException() == null) {
+          if (!force) {
+            BlockID responseBlockID = BlockID.getFromProtobuf(
+                response.getPutBlock().getCommittedBlockLength().getBlockID());
+            Preconditions.checkState(blockID.get().getContainerBlockID()
+                .equals(responseBlockID.getContainerBlockID()));
+            // updates the bcsId of the block
+            blockID.set(responseBlockID);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(
+                  "Adding index " + asyncReply.getLogIndex() + " flushLength "
+                      + flushPos + " numBuffers " + byteBufferList.size()
+                      + " blockID " + blockID + " bufferPool size" + bufferPool
+                      .getSize() + " currentBufferIndex " + bufferPool
+                      .getCurrentBufferIndex());
+            }
+            // for standalone protocol, logIndex will always be 0.
+            updateCommitInfo(asyncReply, byteBufferList);
+          }
+        } else {
+          // PutBlock not successful -> reset updateBlockCount if it was set
+          // to true
+          if (firstBlockPutCall) {
+            updateBlockCount.compareAndSet(true, false);
           }
-          // for standalone protocol, logIndex will always be 0.
-          updateCommitInfo(asyncReply, byteBufferList);
         }
-        return e;
+        return response;
       }, responseExecutor).exceptionally(e -> {
         if (LOG.isDebugEnabled()) {
           LOG.debug("putBlock failed for blockID {} with exception {}",
                   blockID, e.getLocalizedMessage());
         }
+        // Reset updateBlockCount if it was set to true
+        if (firstBlockPutCall) {
+          updateBlockCount.compareAndSet(true, false);

Review comment:
       By the same rationale:
   
   ```suggestion
           // Reset updateBlockCount if it was set to false
           if (firstBlockPutCall) {
             updateBlockCount.set(true);
   ```

##########
File path: 
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
##########
@@ -415,41 +422,54 @@ void updateCommitInfo(XceiverClientReply reply, 
List<ChunkBuffer> buffers) {
         ContainerCommandResponseProto> flushFuture = null;
     try {
       BlockData blockData = containerBlockData.build();
-      XceiverClientReply asyncReply =
-          putBlockAsync(xceiverClient, blockData, close, token);
+      boolean firstBlockPutCall = updateBlockCount.getAndSet(false);
+      XceiverClientReply asyncReply = putBlockAsync(xceiverClient, blockData,
+          firstBlockPutCall, close, token);
       CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
           asyncReply.getResponse();
-      flushFuture = future.thenApplyAsync(e -> {
+      flushFuture = future.thenApplyAsync(response -> {
         try {
-          validateResponse(e);
+          validateResponse(response);
         } catch (IOException sce) {
           throw new CompletionException(sce);
         }
         // if the ioException is not set, putBlock is successful
-        if (getIoException() == null && !force) {
-          BlockID responseBlockID = BlockID.getFromProtobuf(
-              e.getPutBlock().getCommittedBlockLength().getBlockID());
-          Preconditions.checkState(blockID.get().getContainerBlockID()
-              .equals(responseBlockID.getContainerBlockID()));
-          // updates the bcsId of the block
-          blockID.set(responseBlockID);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(
-                "Adding index " + asyncReply.getLogIndex() + " flushLength "
-                    + flushPos + " numBuffers " + byteBufferList.size()
-                    + " blockID " + blockID + " bufferPool size" + bufferPool
-                    .getSize() + " currentBufferIndex " + bufferPool
-                    .getCurrentBufferIndex());
+        if (getIoException() == null) {
+          if (!force) {
+            BlockID responseBlockID = BlockID.getFromProtobuf(
+                response.getPutBlock().getCommittedBlockLength().getBlockID());
+            Preconditions.checkState(blockID.get().getContainerBlockID()
+                .equals(responseBlockID.getContainerBlockID()));
+            // updates the bcsId of the block
+            blockID.set(responseBlockID);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(
+                  "Adding index " + asyncReply.getLogIndex() + " flushLength "
+                      + flushPos + " numBuffers " + byteBufferList.size()
+                      + " blockID " + blockID + " bufferPool size" + bufferPool
+                      .getSize() + " currentBufferIndex " + bufferPool
+                      .getCurrentBufferIndex());
+            }
+            // for standalone protocol, logIndex will always be 0.
+            updateCommitInfo(asyncReply, byteBufferList);
+          }
+        } else {
+          // PutBlock not successful -> reset updateBlockCount if it was set
+          // to true
+          if (firstBlockPutCall) {
+            updateBlockCount.compareAndSet(true, false);

Review comment:
       This doesn't seem right.
   
   1. Comparison is unnecessary.  `updateBlockCount` can only have two values, 
`true` or `false`.  The `compareAndSet` call sets it to `false` if currently 
`true`, otherwise (if it's already `false`) it does not change it.  The end 
result is that `updateBlockCount` is always `false` at the end.  Thus we can 
directly `set(false)` without checking current value.  I think `compareAndSet` 
is useful only if we use its return value, which reflects whether it has made 
the update or not.
   2. More importantly the expected and updated values seem to be swapped.  
`updateBlockCount` starts out as `true`, but by the time we got here, it must 
have been set to `false`.  Isn't this intended to reset to the original value, 
which is `true`?
   
   (Hope that makes sense, let me know if I should further clarify.)
   
   ```suggestion
             // PutBlock not successful -> reset updateBlockCount if it was set
             // to true
             if (firstBlockPutCall) {
               updateBlockCount.set(true);
   ```

##########
File path: 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
##########
@@ -201,8 +207,78 @@ public void testBlockWritesWithDnFailures() throws 
Exception {
         .setRefreshPipeline(true)
         .build();
     OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+
     Assert.assertEquals(data.length, keyInfo.getDataSize());
     validateData(keyName, data);
+
+    // Verify that the block information is updated correctly in the DB on
+    // failures
+    testBlockCountOnFailures(keyInfo);
+  }
+
+  /**
+   * Test whether blockData and Container metadata (block count and used
+   * bytes) is updated correctly when there is a write failure.
+   * We can combine this test with {@link #testBlockWritesWithDnFailures()}
+   * as that test also simulates a write failure and client writes failed
+   * chunk writes to a new block.
+   */
+  private void testBlockCountOnFailures(OmKeyInfo omKeyInfo) throws Exception {
+    // testBlockWritesWithDnFailures writes chunkSize*1.5 size of data into
+    // KeyOutputStream. But before closing the outputStream, 2 of the DNs in
+    // the pipeline being written to are closed. This forces the client to
+    // write the uncommitted data in the buffer (first chunkSize of data will
+    // be committed before close as the flush size = chunkSize, the last 0
+    // .5*chunkSize of data will be in buffer) to another block in a
+    // different pipeline.
+
+    // Get information about the first and second block (in different 
pipelines)
+    List<OmKeyLocationInfo> locationList = 
omKeyInfo.getLatestVersionLocations()
+        .getLocationList();
+    long containerId1 = locationList.get(0).getContainerID();
+    long containerId2 = locationList.get(1).getContainerID();

Review comment:
       This is failing with `IndexOutOfBoundsException: Index: 1, Size: 1`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to