This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-7593 by this push:
     new 54a75ca0a4 HDDS-9387. [hsync] Reduce updating block length times at OM during hsync (#6054)
54a75ca0a4 is described below

commit 54a75ca0a44190f4dc17ecce6301869cca57c3c6
Author: Sammi Chen <[email protected]>
AuthorDate: Thu Jan 25 11:04:47 2024 +0800

    HDDS-9387. [hsync] Reduce updating block length times at OM during hsync (#6054)
---
 .../hadoop/hdds/scm/storage/BlockInputStream.java  | 43 ++++++++++------
 .../client/io/BlockInputStreamFactoryImpl.java     |  4 +-
 .../hdds/scm/storage/DummyBlockInputStream.java    | 10 ++--
 .../storage/DummyBlockInputStreamWithRetry.java    |  7 +--
 .../hdds/scm/storage/TestBlockInputStream.java     | 14 ++++--
 .../hadoop/hdds/scm/storage/BlockLocationInfo.java | 10 ++++
 .../client/io/BlockOutputStreamEntryPool.java      | 14 +++++-
 .../hadoop/ozone/client/io/KeyInputStream.java     | 10 +++-
 .../java/org/apache/hadoop/fs/ozone/TestHSync.java | 57 ++++++++++++++++++++--
 .../java/org/apache/hadoop/ozone/om/OMMetrics.java |  5 ++
 .../apache/hadoop/ozone/om/TestChunkStreams.java   |  2 +-
 11 files changed, 142 insertions(+), 34 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 385ea6d0c3..a12f9067ce 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -32,6 +32,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
@@ -66,7 +67,8 @@ public class BlockInputStream extends 
BlockExtendedInputStream {
       LoggerFactory.getLogger(BlockInputStream.class);
 
   private final BlockID blockID;
-  private final long length;
+  private long length;
+  private final BlockLocationInfo blockInfo;
   private final AtomicReference<Pipeline> pipelineRef =
       new AtomicReference<>();
   private final AtomicReference<Token<OzoneBlockTokenIdentifier>> tokenRef =
@@ -111,12 +113,13 @@ public class BlockInputStream extends 
BlockExtendedInputStream {
 
   private final Function<BlockID, BlockLocationInfo> refreshFunction;
 
-  public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
+  public BlockInputStream(BlockLocationInfo blockInfo, Pipeline pipeline,
       Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
       XceiverClientFactory xceiverClientFactory,
       Function<BlockID, BlockLocationInfo> refreshFunction) {
-    this.blockID = blockId;
-    this.length = blockLen;
+    this.blockInfo = blockInfo;
+    this.blockID = blockInfo.getBlockID();
+    this.length = blockInfo.getLength();
     setPipeline(pipeline);
     tokenRef.set(token);
     this.verifyChecksum = verifyChecksum;
@@ -124,14 +127,16 @@ public class BlockInputStream extends 
BlockExtendedInputStream {
     this.refreshFunction = refreshFunction;
   }
 
+  // only for unit tests
   public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
                           Token<OzoneBlockTokenIdentifier> token,
                           boolean verifyChecksum,
-                          XceiverClientFactory xceiverClientFactory
-  ) {
-    this(blockId, blockLen, pipeline, token, verifyChecksum,
+                          XceiverClientFactory xceiverClientFactory) {
+    this(new BlockLocationInfo(new 
BlockLocationInfo.Builder().setBlockID(blockId).setLength(blockLen)),
+        pipeline, token, verifyChecksum,
         xceiverClientFactory, null);
   }
+
   /**
    * Initialize the BlockInputStream. Get the BlockData (list of chunks) from
    * the Container and create the ChunkInputStreams for each Chunk in the 
Block.
@@ -143,11 +148,17 @@ public class BlockInputStream extends 
BlockExtendedInputStream {
       return;
     }
 
+    BlockData blockData = null;
     List<ChunkInfo> chunks = null;
     IOException catchEx = null;
     do {
       try {
-        chunks = getChunkInfoList();
+        blockData = getBlockData();
+        chunks = blockData.getChunksList();
+        if (blockInfo != null && blockInfo.isUnderConstruction()) {
+          // use the block length from DN if block is under construction.
+          length = blockData.getSize();
+        }
         break;
         // If we get a StorageContainerException or an IOException due to
         // datanodes are not reachable, refresh to get the latest pipeline
@@ -226,19 +237,22 @@ public class BlockInputStream extends 
BlockExtendedInputStream {
 
   /**
    * Send RPC call to get the block info from the container.
-   * @return List of chunks in this block.
+   * @return BlockData.
    */
-  protected List<ChunkInfo> getChunkInfoList() throws IOException {
+  protected BlockData getBlockData() throws IOException {
     acquireClient();
     try {
-      return getChunkInfoListUsingClient();
+      return getBlockDataUsingClient();
     } finally {
       releaseClient();
     }
   }
 
-  @VisibleForTesting
-  protected List<ChunkInfo> getChunkInfoListUsingClient() throws IOException {
+  /**
+   * Send RPC call to get the block info from the container.
+   * @return BlockData.
+   */
+  protected BlockData getBlockDataUsingClient() throws IOException {
     final Pipeline pipeline = xceiverClient.getPipeline();
 
     if (LOG.isDebugEnabled()) {
@@ -258,8 +272,7 @@ public class BlockInputStream extends 
BlockExtendedInputStream {
 
     GetBlockResponseProto response = ContainerProtocolCalls.getBlock(
         xceiverClient, VALIDATORS, blkIDBuilder.build(), tokenRef.get());
-
-    return response.getBlockData().getChunksList();
+    return response.getBlockData();
   }
 
   private void setPipeline(Pipeline pipeline) {
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
index 40063f9ce4..b9233f42d5 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
@@ -84,8 +84,8 @@ public class BlockInputStreamFactoryImpl implements 
BlockInputStreamFactory {
           blockInfo, verifyChecksum, xceiverFactory, refreshFunction,
           ecBlockStreamFactory);
     } else {
-      return new BlockInputStream(blockInfo.getBlockID(), 
blockInfo.getLength(),
-          pipeline, token, verifyChecksum, xceiverFactory, refreshFunction);
+      return new BlockInputStream(blockInfo, pipeline, token, verifyChecksum, 
xceiverFactory,
+          refreshFunction);
     }
   }
 
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java
index 3e7779f0d1..ca3199d8ac 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.function.Function;
 
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -49,7 +50,8 @@ class DummyBlockInputStream extends BlockInputStream {
       Function<BlockID, BlockLocationInfo> refreshFunction,
       List<ChunkInfo> chunkList,
       Map<String, byte[]> chunks) {
-    super(blockId, blockLen, pipeline, token, verifyChecksum,
+    super(new BlockLocationInfo(new 
BlockLocationInfo.Builder().setBlockID(blockId).setLength(blockLen)),
+        pipeline, token, verifyChecksum,
         xceiverClientManager, refreshFunction);
     this.chunkDataMap = chunks;
     this.chunks = chunkList;
@@ -57,8 +59,10 @@ class DummyBlockInputStream extends BlockInputStream {
   }
 
   @Override
-  protected List<ChunkInfo> getChunkInfoList() throws IOException {
-    return chunks;
+  protected ContainerProtos.BlockData getBlockData() throws IOException {
+    BlockID blockID = getBlockID();
+    ContainerProtos.DatanodeBlockID datanodeBlockID = 
blockID.getDatanodeBlockIDProtobuf();
+    return 
ContainerProtos.BlockData.newBuilder().addAllChunks(chunks).setBlockID(datanodeBlockID).build();
   }
 
   @Override
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
index 24a3574514..d66c76dcdd 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
@@ -73,16 +74,16 @@ final class DummyBlockInputStreamWithRetry
   }
 
   @Override
-  protected List<ChunkInfo> getChunkInfoList() throws IOException {
+  protected ContainerProtos.BlockData getBlockData() throws IOException {
     if (getChunkInfoCount == 0) {
       getChunkInfoCount++;
       if (ioException != null) {
-        throw  ioException;
+        throw ioException;
       }
       throw new StorageContainerException("Exception encountered",
           CONTAINER_NOT_FOUND);
     } else {
-      return super.getChunkInfoList();
+      return super.getBlockData();
     }
   }
 }
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
index 3dc5a82b33..9d1feafb9a 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
@@ -22,6 +22,7 @@ import com.google.common.primitives.Bytes;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ContainerBlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
@@ -409,16 +410,19 @@ public class TestBlockInputStream {
         .thenReturn(blockLocationInfo);
     when(blockLocationInfo.getPipeline()).thenReturn(newPipeline);
 
-    BlockInputStream subject = new BlockInputStream(blockID, blockSize,
+    BlockInputStream subject = new BlockInputStream(
+        new BlockLocationInfo(new 
BlockLocationInfo.Builder().setBlockID(blockID).setLength(blockSize)),
         pipeline, null, false, clientFactory, refreshFunction) {
       @Override
-      protected List<ChunkInfo> getChunkInfoListUsingClient() {
-        return chunks;
+      protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) {
+        return stream;
       }
 
       @Override
-      protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) {
-        return stream;
+      protected ContainerProtos.BlockData getBlockDataUsingClient() throws 
IOException {
+        BlockID blockID = getBlockID();
+        ContainerProtos.DatanodeBlockID datanodeBlockID = 
blockID.getDatanodeBlockIDProtobuf();
+        return 
ContainerProtos.BlockData.newBuilder().addAllChunks(chunks).setBlockID(datanodeBlockID).build();
       }
     };
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java
index 019e16c2f1..a6b291c3f4 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java
@@ -40,6 +40,8 @@ public class BlockLocationInfo {
 
   // PartNumber is set for Multipart upload Keys.
   private int partNumber;
+  // The block is under construction. Apply to hsynced file last block.
+  private boolean underConstruction;
 
   protected BlockLocationInfo(Builder builder) {
     this.blockID = builder.blockID;
@@ -111,6 +113,14 @@ public class BlockLocationInfo {
     return partNumber;
   }
 
+  public void setUnderConstruction(boolean uc) {
+    this.underConstruction = uc;
+  }
+
+  public boolean isUnderConstruction() {
+    return this.underConstruction;
+  }
+
   /**
    * Builder of BlockLocationInfo.
    */
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index d0f3b5728a..52ef31daf5 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
 
+import org.apache.hadoop.hdds.client.ContainerBlockID;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.scm.ByteStringConversion;
 import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
@@ -85,6 +86,8 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
   private final ExcludeList excludeList;
   private final ContainerClientMetrics clientMetrics;
   private final StreamBufferArgs streamBufferArgs;
+  // update blocks on OM
+  private ContainerBlockID lastUpdatedBlockId = new ContainerBlockID(-1, -1);
 
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   public BlockOutputStreamEntryPool(
@@ -368,7 +371,16 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
       if (keyArgs.getIsMultipartKey()) {
         throw new IOException("Hsync is unsupported for multipart keys.");
       } else {
-        omClient.hsyncKey(keyArgs, openID);
+        if (keyArgs.getLocationInfoList().size() == 0) {
+          omClient.hsyncKey(keyArgs, openID);
+        } else {
+          ContainerBlockID lastBLockId = 
keyArgs.getLocationInfoList().get(keyArgs.getLocationInfoList().size() - 1)
+              .getBlockID().getContainerBlockID();
+          if (!lastUpdatedBlockId.equals(lastBLockId)) {
+            omClient.hsyncKey(keyArgs, openID);
+            lastUpdatedBlockId = lastBLockId;
+          }
+        }
       }
     } else {
       LOG.warn("Closing KeyOutputStream, but key args is null");
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index 4843c1c45e..6b6be1abd4 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
 import org.apache.hadoop.hdds.scm.storage.MultipartInputStream;
 import org.apache.hadoop.hdds.scm.storage.PartInputStream;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 
@@ -61,8 +62,10 @@ public class KeyInputStream extends MultipartInputStream {
       boolean verifyChecksum,
       Function<OmKeyInfo, OmKeyInfo> retryFunction,
       BlockInputStreamFactory blockStreamFactory) {
+    boolean isHsyncFile = 
keyInfo.getMetadata().containsKey(OzoneConsts.HSYNC_CLIENT_ID);
     List<BlockExtendedInputStream> partStreams = new ArrayList<>();
-    for (OmKeyLocationInfo omKeyLocationInfo : blockInfos) {
+    for (int i = 0; i < blockInfos.size(); i++) {
+      OmKeyLocationInfo omKeyLocationInfo = blockInfos.get(i);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Adding stream for accessing {}. The stream will be " +
             "initialized later.", omKeyLocationInfo);
@@ -85,6 +88,11 @@ public class KeyInputStream extends MultipartInputStream {
         retry = null;
       }
 
+      if (i == (blockInfos.size() - 1) && isHsyncFile) {
+        // block is under construction
+        omKeyLocationInfo.setUnderConstruction(true);
+      }
+
       BlockExtendedInputStream stream =
           blockStreamFactory.create(
               keyInfo.getReplicationConfig(),
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index 559b8da498..7a1c055b00 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -23,11 +23,16 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.security.GeneralSecurityException;
 import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.crypto.CipherSuite;
@@ -35,7 +40,6 @@ import org.apache.hadoop.crypto.CryptoCodec;
 import org.apache.hadoop.crypto.CryptoOutputStream;
 import org.apache.hadoop.crypto.Encryptor;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -57,6 +61,7 @@ import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 
@@ -107,7 +112,7 @@ public class TestHSync {
 
   @BeforeAll
   public static void init() throws Exception {
-    final int chunkSize = 16 << 10;
+    final int chunkSize = 4 << 10;
     final int flushSize = 2 * chunkSize;
     final int maxFlushSize = 2 * flushSize;
     final int blockSize = 2 * maxFlushSize;
@@ -279,6 +284,52 @@ public class TestHSync {
     }
   }
 
+  @Test
+  public void testHsyncKeyCallCount() throws Exception {
+    // Set the fs.defaultFS
+    final String rootPath = String.format("%s://%s/",
+        OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY));
+    CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+
+    final String dir = OZONE_ROOT + bucket.getVolumeName()
+        + OZONE_URI_DELIMITER + bucket.getName();
+
+    OMMetrics omMetrics = cluster.getOzoneManager().getMetrics();
+    omMetrics.resetNumKeyHSyncs();
+    final byte[] data = new byte[128];
+    ThreadLocalRandom.current().nextBytes(data);
+
+    final Path file = new Path(dir, "file-hsync-then-close");
+    long blockSize;
+    try (FileSystem fs = FileSystem.get(CONF)) {
+      blockSize = fs.getDefaultBlockSize(file);
+      long fileSize = 0;
+      try (FSDataOutputStream outputStream = fs.create(file, true)) {
+        // make sure at least writing 2 blocks data
+        while (fileSize <= blockSize) {
+          outputStream.write(data, 0, data.length);
+          outputStream.hsync();
+          fileSize += data.length;
+        }
+      }
+    }
+    assertEquals(2, omMetrics.getNumKeyHSyncs());
+
+    // test file with all blocks pre-allocated
+    omMetrics.resetNumKeyHSyncs();
+    long writtenSize = 0;
+    try (OzoneOutputStream outputStream = bucket.createKey("key-" + 
RandomStringUtils.randomNumeric(5),
+        blockSize * 2, ReplicationType.RATIS, ReplicationFactor.THREE, new 
HashMap<>())) {
+      // make sure at least writing 2 blocks data
+      while (writtenSize <= blockSize) {
+        outputStream.write(data, 0, data.length);
+        outputStream.hsync();
+        writtenSize += data.length;
+      }
+    }
+    assertEquals(2, omMetrics.getNumKeyHSyncs());
+  }
+
   static void runTestHSync(FileSystem fs, Path file, int initialDataSize)
       throws Exception {
     try (StreamWithLength out = new StreamWithLength(
@@ -409,7 +460,7 @@ public class TestHSync {
         + OZONE_URI_DELIMITER + bucket.getName();
 
     try (FileSystem fs = FileSystem.get(CONF)) {
-      for (int i = 0; i < 10; i++) {
+      for (int i = 0; i < 5; i++) {
         final Path file = new Path(dir, "file" + i);
         try (FSDataOutputStream out =
             fs.create(file, true)) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
index 86fa867060..2fbbbe1530 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
@@ -1100,6 +1100,11 @@ public class OMMetrics implements 
OmMetadataReaderMetrics {
     return numKeyHSyncs.value();
   }
 
+  @VisibleForTesting
+  public void resetNumKeyHSyncs() {
+    numKeyHSyncs.incr(-numKeyHSyncs.value());
+  }
+
   @VisibleForTesting
   public long getNumKeyCommitFails() {
     return numKeyCommitFails.value();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
index dbcf9f6ea4..60cfcd1a2c 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
@@ -90,7 +90,7 @@ public class TestChunkStreams {
   }
 
   private BlockInputStream createStream(byte[] buf, int offset) {
-    return new BlockInputStream(null, 100, null, null, true, null) {
+    return new BlockInputStream(null, 100L, null, null, true, null) {
       private long pos;
       private final ByteArrayInputStream in =
           new ByteArrayInputStream(buf, offset, 100);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to