This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-7593 by this push:
new 54a75ca0a4 HDDS-9387. [hsync] Reduce updating block length times at OM during hsync (#6054)
54a75ca0a4 is described below
commit 54a75ca0a44190f4dc17ecce6301869cca57c3c6
Author: Sammi Chen <[email protected]>
AuthorDate: Thu Jan 25 11:04:47 2024 +0800
HDDS-9387. [hsync] Reduce updating block length times at OM during hsync (#6054)
---
.../hadoop/hdds/scm/storage/BlockInputStream.java | 43 ++++++++++------
.../client/io/BlockInputStreamFactoryImpl.java | 4 +-
.../hdds/scm/storage/DummyBlockInputStream.java | 10 ++--
.../storage/DummyBlockInputStreamWithRetry.java | 7 +--
.../hdds/scm/storage/TestBlockInputStream.java | 14 ++++--
.../hadoop/hdds/scm/storage/BlockLocationInfo.java | 10 ++++
.../client/io/BlockOutputStreamEntryPool.java | 14 +++++-
.../hadoop/ozone/client/io/KeyInputStream.java | 10 +++-
.../java/org/apache/hadoop/fs/ozone/TestHSync.java | 57 ++++++++++++++++++++--
.../java/org/apache/hadoop/ozone/om/OMMetrics.java | 5 ++
.../apache/hadoop/ozone/om/TestChunkStreams.java | 2 +-
11 files changed, 142 insertions(+), 34 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 385ea6d0c3..a12f9067ce 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -32,6 +32,7 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
@@ -66,7 +67,8 @@ public class BlockInputStream extends BlockExtendedInputStream {
LoggerFactory.getLogger(BlockInputStream.class);
private final BlockID blockID;
- private final long length;
+ private long length;
+ private final BlockLocationInfo blockInfo;
private final AtomicReference<Pipeline> pipelineRef =
new AtomicReference<>();
private final AtomicReference<Token<OzoneBlockTokenIdentifier>> tokenRef =
@@ -111,12 +113,13 @@ public class BlockInputStream extends BlockExtendedInputStream {
private final Function<BlockID, BlockLocationInfo> refreshFunction;
- public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
+ public BlockInputStream(BlockLocationInfo blockInfo, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
XceiverClientFactory xceiverClientFactory,
Function<BlockID, BlockLocationInfo> refreshFunction) {
- this.blockID = blockId;
- this.length = blockLen;
+ this.blockInfo = blockInfo;
+ this.blockID = blockInfo.getBlockID();
+ this.length = blockInfo.getLength();
setPipeline(pipeline);
tokenRef.set(token);
this.verifyChecksum = verifyChecksum;
@@ -124,14 +127,16 @@ public class BlockInputStream extends BlockExtendedInputStream {
this.refreshFunction = refreshFunction;
}
+ // Only used by unit tests.
public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token,
boolean verifyChecksum,
- XceiverClientFactory xceiverClientFactory
- ) {
- this(blockId, blockLen, pipeline, token, verifyChecksum,
+ XceiverClientFactory xceiverClientFactory) {
+ this(new BlockLocationInfo(new BlockLocationInfo.Builder().setBlockID(blockId).setLength(blockLen)),
+ pipeline, token, verifyChecksum,
xceiverClientFactory, null);
}
+
/**
* Initialize the BlockInputStream. Get the BlockData (list of chunks) from
* the Container and create the ChunkInputStreams for each Chunk in the Block.
@@ -143,11 +148,17 @@ public class BlockInputStream extends BlockExtendedInputStream {
return;
}
+ BlockData blockData = null;
List<ChunkInfo> chunks = null;
IOException catchEx = null;
do {
try {
- chunks = getChunkInfoList();
+ blockData = getBlockData();
+ chunks = blockData.getChunksList();
+ if (blockInfo != null && blockInfo.isUnderConstruction()) {
+ // use the block length from DN if block is under construction.
+ length = blockData.getSize();
+ }
break;
// If we get a StorageContainerException or an IOException due to
// datanodes are not reachable, refresh to get the latest pipeline
@@ -226,19 +237,22 @@ public class BlockInputStream extends BlockExtendedInputStream {
/**
* Send RPC call to get the block info from the container.
- * @return List of chunks in this block.
+ * @return BlockData.
*/
- protected List<ChunkInfo> getChunkInfoList() throws IOException {
+ protected BlockData getBlockData() throws IOException {
acquireClient();
try {
- return getChunkInfoListUsingClient();
+ return getBlockDataUsingClient();
} finally {
releaseClient();
}
}
- @VisibleForTesting
- protected List<ChunkInfo> getChunkInfoListUsingClient() throws IOException {
+ /**
+ * Send RPC call to get the block info from the container.
+ * @return BlockData.
+ */
+ protected BlockData getBlockDataUsingClient() throws IOException {
final Pipeline pipeline = xceiverClient.getPipeline();
if (LOG.isDebugEnabled()) {
@@ -258,8 +272,7 @@ public class BlockInputStream extends BlockExtendedInputStream {
GetBlockResponseProto response = ContainerProtocolCalls.getBlock(
xceiverClient, VALIDATORS, blkIDBuilder.build(), tokenRef.get());
-
- return response.getBlockData().getChunksList();
+ return response.getBlockData();
}
private void setPipeline(Pipeline pipeline) {
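For context, a minimal sketch of how a caller assembles the stream after this change; blockId, blockLen, pipeline, token, verifyChecksum, clientFactory and refreshFunction are assumed to come from the key's OM metadata, as in BlockInputStreamFactoryImpl below:

// Sketch only: build the BlockLocationInfo the new constructor expects.
BlockLocationInfo info = new BlockLocationInfo(
    new BlockLocationInfo.Builder()
        .setBlockID(blockId)
        .setLength(blockLen));
// Marking the block under construction makes initialize() trust the
// datanode-reported size over the (possibly stale) OM length.
info.setUnderConstruction(true);
BlockInputStream stream = new BlockInputStream(
    info, pipeline, token, verifyChecksum, clientFactory, refreshFunction);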
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
index 40063f9ce4..b9233f42d5 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
@@ -84,8 +84,8 @@ public class BlockInputStreamFactoryImpl implements BlockInputStreamFactory {
blockInfo, verifyChecksum, xceiverFactory, refreshFunction,
ecBlockStreamFactory);
} else {
- return new BlockInputStream(blockInfo.getBlockID(), blockInfo.getLength(),
- pipeline, token, verifyChecksum, xceiverFactory, refreshFunction);
+ return new BlockInputStream(blockInfo, pipeline, token, verifyChecksum, xceiverFactory,
+ refreshFunction);
}
}
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java
index 3e7779f0d1..ca3199d8ac 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.function.Function;
import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -49,7 +50,8 @@ class DummyBlockInputStream extends BlockInputStream {
Function<BlockID, BlockLocationInfo> refreshFunction,
List<ChunkInfo> chunkList,
Map<String, byte[]> chunks) {
- super(blockId, blockLen, pipeline, token, verifyChecksum,
+ super(new BlockLocationInfo(new BlockLocationInfo.Builder().setBlockID(blockId).setLength(blockLen)),
+ pipeline, token, verifyChecksum,
xceiverClientManager, refreshFunction);
this.chunkDataMap = chunks;
this.chunks = chunkList;
@@ -57,8 +59,10 @@ class DummyBlockInputStream extends BlockInputStream {
}
@Override
- protected List<ChunkInfo> getChunkInfoList() throws IOException {
- return chunks;
+ protected ContainerProtos.BlockData getBlockData() throws IOException {
+ BlockID blockID = getBlockID();
+ ContainerProtos.DatanodeBlockID datanodeBlockID = blockID.getDatanodeBlockIDProtobuf();
+ return ContainerProtos.BlockData.newBuilder().addAllChunks(chunks).setBlockID(datanodeBlockID).build();
}
@Override
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
index 24a3574514..d66c76dcdd 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
@@ -73,16 +74,16 @@ final class DummyBlockInputStreamWithRetry
}
@Override
- protected List<ChunkInfo> getChunkInfoList() throws IOException {
+ protected ContainerProtos.BlockData getBlockData() throws IOException {
if (getChunkInfoCount == 0) {
getChunkInfoCount++;
if (ioException != null) {
- throw ioException;
+ throw ioException;
}
throw new StorageContainerException("Exception encountered",
CONTAINER_NOT_FOUND);
} else {
- return super.getChunkInfoList();
+ return super.getBlockData();
}
}
}
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
index 3dc5a82b33..9d1feafb9a 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
@@ -22,6 +22,7 @@ import com.google.common.primitives.Bytes;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
@@ -409,16 +410,19 @@ public class TestBlockInputStream {
.thenReturn(blockLocationInfo);
when(blockLocationInfo.getPipeline()).thenReturn(newPipeline);
- BlockInputStream subject = new BlockInputStream(blockID, blockSize,
+ BlockInputStream subject = new BlockInputStream(
+ new BlockLocationInfo(new BlockLocationInfo.Builder().setBlockID(blockID).setLength(blockSize)),
pipeline, null, false, clientFactory, refreshFunction) {
@Override
- protected List<ChunkInfo> getChunkInfoListUsingClient() {
- return chunks;
+ protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) {
+ return stream;
}
@Override
- protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) {
- return stream;
+ protected ContainerProtos.BlockData getBlockDataUsingClient() throws IOException {
+ BlockID blockID = getBlockID();
+ ContainerProtos.DatanodeBlockID datanodeBlockID = blockID.getDatanodeBlockIDProtobuf();
+ return ContainerProtos.BlockData.newBuilder().addAllChunks(chunks).setBlockID(datanodeBlockID).build();
}
};
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java
index 019e16c2f1..a6b291c3f4 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java
@@ -40,6 +40,8 @@ public class BlockLocationInfo {
// PartNumber is set for Multipart upload Keys.
private int partNumber;
+ // Whether the block is under construction. Applies to the last block of an hsync'ed file.
+ private boolean underConstruction;
protected BlockLocationInfo(Builder builder) {
this.blockID = builder.blockID;
@@ -111,6 +113,14 @@ public class BlockLocationInfo {
return partNumber;
}
+ public void setUnderConstruction(boolean uc) {
+ this.underConstruction = uc;
+ }
+
+ public boolean isUnderConstruction() {
+ return this.underConstruction;
+ }
+
/**
* Builder of BlockLocationInfo.
*/
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index d0f3b5728a..52ef31daf5 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.ListIterator;
import java.util.Map;
+import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.scm.ByteStringConversion;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
@@ -85,6 +86,8 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware {
private final ExcludeList excludeList;
private final ContainerClientMetrics clientMetrics;
private final StreamBufferArgs streamBufferArgs;
+ // Tracks the last block whose length was updated on OM, to skip redundant hsync updates.
+ private ContainerBlockID lastUpdatedBlockId = new ContainerBlockID(-1, -1);
@SuppressWarnings({"parameternumber", "squid:S00107"})
public BlockOutputStreamEntryPool(
@@ -368,7 +371,16 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware {
if (keyArgs.getIsMultipartKey()) {
throw new IOException("Hsync is unsupported for multipart keys.");
} else {
- omClient.hsyncKey(keyArgs, openID);
+ if (keyArgs.getLocationInfoList().size() == 0) {
+ omClient.hsyncKey(keyArgs, openID);
+ } else {
+ ContainerBlockID lastBlockId = keyArgs.getLocationInfoList().get(keyArgs.getLocationInfoList().size() - 1).getBlockID().getContainerBlockID();
+ if (!lastUpdatedBlockId.equals(lastBlockId)) {
+ omClient.hsyncKey(keyArgs, openID);
+ lastUpdatedBlockId = lastBlockId;
+ }
+ }
}
} else {
LOG.warn("Closing KeyOutputStream, but key args is null");
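Condensed, the new hsync path above amounts to the following sketch (names follow the patch; the surrounding method and fields are elided, and lastUpdatedBlockId starts out as ContainerBlockID(-1, -1)):

// Only report the key's block layout to OM when writes have moved to a
// new block since the previous hsync; repeated hsyncs within the same
// block skip the OM call entirely, since readers can fetch the live
// length of an under-construction block from the datanode instead.
List<OmKeyLocationInfo> locations = keyArgs.getLocationInfoList();
if (locations.isEmpty()) {
  omClient.hsyncKey(keyArgs, openID);
} else {
  ContainerBlockID lastBlockId = locations.get(locations.size() - 1)
      .getBlockID().getContainerBlockID();
  if (!lastUpdatedBlockId.equals(lastBlockId)) {
    omClient.hsyncKey(keyArgs, openID);
    lastUpdatedBlockId = lastBlockId;
  }
}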
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index 4843c1c45e..6b6be1abd4 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
import org.apache.hadoop.hdds.scm.storage.MultipartInputStream;
import org.apache.hadoop.hdds.scm.storage.PartInputStream;
+import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -61,8 +62,10 @@ public class KeyInputStream extends MultipartInputStream {
boolean verifyChecksum,
Function<OmKeyInfo, OmKeyInfo> retryFunction,
BlockInputStreamFactory blockStreamFactory) {
+ boolean isHsyncFile = keyInfo.getMetadata().containsKey(OzoneConsts.HSYNC_CLIENT_ID);
List<BlockExtendedInputStream> partStreams = new ArrayList<>();
- for (OmKeyLocationInfo omKeyLocationInfo : blockInfos) {
+ for (int i = 0; i < blockInfos.size(); i++) {
+ OmKeyLocationInfo omKeyLocationInfo = blockInfos.get(i);
if (LOG.isDebugEnabled()) {
LOG.debug("Adding stream for accessing {}. The stream will be " +
"initialized later.", omKeyLocationInfo);
@@ -85,6 +88,11 @@ public class KeyInputStream extends MultipartInputStream {
retry = null;
}
+ if (i == (blockInfos.size() - 1) && isHsyncFile) {
+ // block is under construction
+ omKeyLocationInfo.setUnderConstruction(true);
+ }
+
BlockExtendedInputStream stream =
blockStreamFactory.create(
keyInfo.getReplicationConfig(),
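The read side in brief — a sketch of the marking loop added above, condensed from the patch (stream creation is elided):

// A key still open for write carries HSYNC_CLIENT_ID in its metadata,
// and only its final block can still be growing on the datanodes.
boolean isHsyncFile =
    keyInfo.getMetadata().containsKey(OzoneConsts.HSYNC_CLIENT_ID);
for (int i = 0; i < blockInfos.size(); i++) {
  OmKeyLocationInfo omKeyLocationInfo = blockInfos.get(i);
  if (isHsyncFile && i == blockInfos.size() - 1) {
    // BlockInputStream.initialize() will then take the block length
    // from the datanode's BlockData rather than from OM.
    omKeyLocationInfo.setUnderConstruction(true);
  }
  // ... create the BlockExtendedInputStream for this block as before ...
}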
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index 559b8da498..7a1c055b00 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -23,11 +23,16 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.crypto.CipherSuite;
@@ -35,7 +40,6 @@ import org.apache.hadoop.crypto.CryptoCodec;
import org.apache.hadoop.crypto.CryptoOutputStream;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -57,6 +61,7 @@ import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
@@ -107,7 +112,7 @@ public class TestHSync {
@BeforeAll
public static void init() throws Exception {
- final int chunkSize = 16 << 10;
+ final int chunkSize = 4 << 10;
final int flushSize = 2 * chunkSize;
final int maxFlushSize = 2 * flushSize;
final int blockSize = 2 * maxFlushSize;
@@ -279,6 +284,52 @@ public class TestHSync {
}
}
+ @Test
+ public void testHsyncKeyCallCount() throws Exception {
+ // Set the fs.defaultFS
+ final String rootPath = String.format("%s://%s/",
+ OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY));
+ CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+
+ final String dir = OZONE_ROOT + bucket.getVolumeName()
+ + OZONE_URI_DELIMITER + bucket.getName();
+
+ OMMetrics omMetrics = cluster.getOzoneManager().getMetrics();
+ omMetrics.resetNumKeyHSyncs();
+ final byte[] data = new byte[128];
+ ThreadLocalRandom.current().nextBytes(data);
+
+ final Path file = new Path(dir, "file-hsync-then-close");
+ long blockSize;
+ try (FileSystem fs = FileSystem.get(CONF)) {
+ blockSize = fs.getDefaultBlockSize(file);
+ long fileSize = 0;
+ try (FSDataOutputStream outputStream = fs.create(file, true)) {
+ // make sure at least writing 2 blocks data
+ while (fileSize <= blockSize) {
+ outputStream.write(data, 0, data.length);
+ outputStream.hsync();
+ fileSize += data.length;
+ }
+ }
+ }
+ assertEquals(2, omMetrics.getNumKeyHSyncs());
+
+ // test file with all blocks pre-allocated
+ omMetrics.resetNumKeyHSyncs();
+ long writtenSize = 0;
+ try (OzoneOutputStream outputStream = bucket.createKey("key-" + RandomStringUtils.randomNumeric(5),
+ blockSize * 2, ReplicationType.RATIS, ReplicationFactor.THREE, new HashMap<>())) {
+ // make sure at least writing 2 blocks data
+ while (writtenSize <= blockSize) {
+ outputStream.write(data, 0, data.length);
+ outputStream.hsync();
+ writtenSize += data.length;
+ }
+ }
+ assertEquals(2, omMetrics.getNumKeyHSyncs());
+ }
+
static void runTestHSync(FileSystem fs, Path file, int initialDataSize)
throws Exception {
try (StreamWithLength out = new StreamWithLength(
@@ -409,7 +460,7 @@ public class TestHSync {
+ OZONE_URI_DELIMITER + bucket.getName();
try (FileSystem fs = FileSystem.get(CONF)) {
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 5; i++) {
final Path file = new Path(dir, "file" + i);
try (FSDataOutputStream out =
fs.create(file, true)) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
index 86fa867060..2fbbbe1530 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
@@ -1100,6 +1100,11 @@ public class OMMetrics implements OmMetadataReaderMetrics {
return numKeyHSyncs.value();
}
+ @VisibleForTesting
+ public void resetNumKeyHSyncs() {
+ numKeyHSyncs.incr(-numKeyHSyncs.value());
+ }
+
@VisibleForTesting
public long getNumKeyCommitFails() {
return numKeyCommitFails.value();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
index dbcf9f6ea4..60cfcd1a2c 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
@@ -90,7 +90,7 @@ public class TestChunkStreams {
}
private BlockInputStream createStream(byte[] buf, int offset) {
- return new BlockInputStream(null, 100, null, null, true, null) {
+ return new BlockInputStream(null, 100L, null, null, true, null) {
private long pos;
private final ByteArrayInputStream in =
new ByteArrayInputStream(buf, offset, 100);
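Taken together, the intended end-to-end behavior can be pictured with a small client-side sketch (illustrative only; fs is assumed to be an ofs:// FileSystem obtained as in TestHSync, and the path and sizes are made up):

// Writer hsyncs twice within the same block while a reader tails the
// file. With this patch, OM is updated once per block instead of once
// per hsync, and the reader learns the live length of the last
// (under-construction) block directly from the datanode.
Path file = new Path("/volume/bucket/tail.log");
byte[] data = new byte[128];
ThreadLocalRandom.current().nextBytes(data);
try (FSDataOutputStream out = fs.create(file, true)) {
  out.write(data);
  out.hsync();                      // first hsync on block 0: OM updated
  out.write(data);
  out.hsync();                      // still block 0: OM call skipped
  try (FSDataInputStream in = fs.open(file)) {
    byte[] buf = new byte[4096];
    int n = in.read(buf);           // sees both hsynced writes
  }
}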
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]