This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-7593 by this push:
new 76a573a104 HDDS-9752. [hsync] Make Putblock performance acceptable - Client side (#5663)
76a573a104 is described below
commit 76a573a104b7f88a117caba1075f10dc33637c86
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Tue Feb 6 20:07:15 2024 -0800
HDDS-9752. [hsync] Make Putblock performance acceptable - Client side (#5663)
---
.../apache/hadoop/hdds/scm/OzoneClientConfig.java | 17 ++
.../hadoop/hdds/scm/storage/BlockInputStream.java | 6 +-
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 184 ++++++++++++++++++++-
.../DatanodeStoreWithIncrementalChunkList.java | 29 +++-
.../keyvalue/impl/TestBlockManagerImpl.java | 2 +
.../hadoop/ozone/client/MockOmTransport.java | 39 ++++-
.../TestBlockOutputStreamIncrementalPutBlock.java | 163 ++++++++++++++++++
.../java/org/apache/hadoop/fs/ozone/TestHSync.java | 179 +++++++++++++++++---
8 files changed, 585 insertions(+), 34 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 44af34cb91..d1dcc654b1 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -223,6 +223,15 @@ public class OzoneClientConfig {
tags = ConfigTag.CLIENT)
private String fsDefaultBucketLayout = "FILE_SYSTEM_OPTIMIZED";
+ @Config(key = "incremental.chunk.list",
+ defaultValue = "false",
+ type = ConfigType.BOOLEAN,
+ description = "Client PutBlock request can choose incremental chunk " +
+ "list rather than full chunk list to optimize performance. " +
+ "Critical to HBase.",
+ tags = ConfigTag.CLIENT)
+ private boolean incrementalChunkList = false;
+
@PostConstruct
private void validate() {
Preconditions.checkState(streamBufferSize > 0);
@@ -404,4 +413,12 @@ public class OzoneClientConfig {
public void setDatastreamPipelineMode(boolean datastreamPipelineMode) {
this.datastreamPipelineMode = datastreamPipelineMode;
}
+
+ public void setIncrementalChunkList(boolean enable) {
+ this.incrementalChunkList = enable;
+ }
+
+ public boolean getIncrementalChunkList() {
+ return this.incrementalChunkList;
+ }
}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index a12f9067ce..78cef5f1b4 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -63,7 +63,7 @@ import static org.apache.hadoop.hdds.client.ReplicationConfig.getLegacyFactor;
*/
public class BlockInputStream extends BlockExtendedInputStream {
- private static final Logger LOG =
+ public static final Logger LOG =
LoggerFactory.getLogger(BlockInputStream.class);
private final BlockID blockID;
@@ -256,8 +256,8 @@ public class BlockInputStream extends BlockExtendedInputStream {
final Pipeline pipeline = xceiverClient.getPipeline();
if (LOG.isDebugEnabled()) {
- LOG.debug("Initializing BlockInputStream for get key to access {}",
- blockID.getContainerID());
+ LOG.debug("Initializing BlockInputStream for get key to access block {}",
+ blockID);
}
DatanodeBlockID.Builder blkIDBuilder =
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index ac21411ea5..1308de4f45 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.hdds.scm.storage;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -80,6 +82,12 @@ public class BlockOutputStream extends OutputStream {
LoggerFactory.getLogger(BlockOutputStream.class);
public static final String EXCEPTION_MSG =
"Unexpected Storage Container Exception: ";
+ public static final String INCREMENTAL_CHUNK_LIST = "incremental";
+ public static final KeyValue INCREMENTAL_CHUNK_LIST_KV =
+ KeyValue.newBuilder().setKey(INCREMENTAL_CHUNK_LIST).build();
+ public static final String FULL_CHUNK = "full";
+ public static final KeyValue FULL_CHUNK_KV =
+ KeyValue.newBuilder().setKey(FULL_CHUNK).build();
private AtomicReference<BlockID> blockID;
private final AtomicReference<ChunkInfo> previousChunkInfo
@@ -123,6 +131,10 @@ public class BlockOutputStream extends OutputStream {
private int currentBufferRemaining;
//current buffer allocated to write
private ChunkBuffer currentBuffer;
+ // last chunk holds the buffer after the last complete chunk, which may be
+ // different from currentBuffer. We need this to calculate checksum.
+ private ByteBuffer lastChunkBuffer;
+ private long lastChunkOffset;
private final Token<? extends TokenIdentifier> token;
private final String tokenString;
private int replicationIndex;
@@ -164,6 +176,13 @@ public class BlockOutputStream extends OutputStream {
}
this.containerBlockData = BlockData.newBuilder().setBlockID(
blkIDBuilder.build()).addMetadata(keyValue);
+ // tell DataNode I will send incremental chunk list
+ if (config.getIncrementalChunkList()) {
+ this.containerBlockData.addMetadata(INCREMENTAL_CHUNK_LIST_KV);
+ this.lastChunkBuffer =
+ ByteBuffer.allocate(config.getStreamBufferSize());
+ this.lastChunkOffset = 0;
+ }
this.xceiverClient = xceiverClientManager.acquireClient(pipeline);
this.bufferPool = bufferPool;
this.token = token;
@@ -468,6 +487,14 @@ public class BlockOutputStream extends OutputStream {
ContainerCommandResponseProto> flushFuture = null;
try {
BlockData blockData = containerBlockData.build();
+ LOG.debug("sending PutBlock {}", blockData);
+
+ if (config.getIncrementalChunkList()) {
+ // remove any chunks in the containerBlockData list.
+ // since they are sent.
+ containerBlockData.clearChunks();
+ }
+
XceiverClientReply asyncReply =
putBlockAsync(xceiverClient, blockData, close, tokenString);
CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
@@ -746,7 +773,12 @@ public class BlockOutputStream extends OutputStream {
setIoException(ce);
throw ce;
});
- containerBlockData.addChunks(chunkInfo);
+ if (config.getIncrementalChunkList()) {
+ updateBlockDataForWriteChunk(chunk);
+ } else {
+ containerBlockData.addChunks(chunkInfo);
+ }
+
clientMetrics.recordWriteChunk(pipeline, chunkInfo.getLen());
return validateFuture;
} catch (IOException | ExecutionException e) {
@@ -758,6 +790,156 @@ public class BlockOutputStream extends OutputStream {
return null;
}
+ /**
+ * Update container block data, which is later sent to DataNodes via PutBlock,
+ * using the new chunks sent out via WriteChunk.
+ *
+ * This method is only used when incremental chunk list is enabled.
+ * @param chunk the chunk buffer to be sent out by WriteChunk.
+ * @throws OzoneChecksumException
+ */
+ private void updateBlockDataForWriteChunk(ChunkBuffer chunk)
+ throws OzoneChecksumException {
+ // Update lastChunkBuffer using the new chunk data.
+ // This is used to calculate checksum for the last partial chunk in
+ // containerBlockData which will used by PutBlock.
+
+ // the last partial chunk in containerBlockData will be replaced.
+ // So remove it.
+ removeLastPartialChunk();
+ chunk.rewind();
+ LOG.debug("Adding chunk pos {} limit {} remaining {}." +
+ "lastChunkBuffer pos {} limit {} remaining {} lastChunkOffset = {}",
+ chunk.position(), chunk.limit(), chunk.remaining(),
+ lastChunkBuffer.position(), lastChunkBuffer.limit(),
+ lastChunkBuffer.remaining(), lastChunkOffset);
+
+ // Append the chunk to the last chunk buffer.
+ // if the resulting size exceeds limit (4MB),
+ // drop the full chunk and keep the rest.
+ if (lastChunkBuffer.position() + chunk.remaining() <=
+ lastChunkBuffer.capacity()) {
+ appendLastChunkBuffer(chunk, 0, chunk.remaining());
+ } else {
+ int remainingBufferSize =
+ lastChunkBuffer.capacity() - lastChunkBuffer.position();
+ appendLastChunkBuffer(chunk, 0, remainingBufferSize);
+ updateBlockDataWithLastChunkBuffer();
+ appendLastChunkBuffer(chunk, remainingBufferSize,
+ chunk.remaining() - remainingBufferSize);
+ }
+ LOG.debug("after append, lastChunkBuffer={} lastChunkOffset={}",
+ lastChunkBuffer, lastChunkOffset);
+
+ updateBlockDataWithLastChunkBuffer();
+ }
+
+ private void updateBlockDataWithLastChunkBuffer()
+ throws OzoneChecksumException {
+ // create chunk info for lastChunkBuffer
+ ChunkInfo lastChunkInfo = createChunkInfo(lastChunkOffset);
+ LOG.debug("lastChunkInfo = {}", lastChunkInfo);
+ long lastChunkSize = lastChunkInfo.getLen();
+ addToBlockData(lastChunkInfo);
+
+ lastChunkBuffer.clear();
+ if (lastChunkSize == config.getStreamBufferSize()) {
+ lastChunkOffset += config.getStreamBufferSize();
+ } else {
+ lastChunkBuffer.position((int) lastChunkSize);
+ }
+ }
+
+ private void appendLastChunkBuffer(ChunkBuffer chunkBuffer, int offset,
+ int length) {
+ LOG.debug("copying to last chunk buffer offset={} length={}",
+ offset, length);
+ int pos = 0;
+ int uncopied = length;
+ for (ByteBuffer bb : chunkBuffer.asByteBufferList()) {
+ if (pos + bb.remaining() >= offset) {
+ int copyStart = offset < pos ? 0 : offset - pos;
+ int copyLen = Math.min(uncopied, bb.remaining());
+ try {
+ LOG.debug("put into last chunk buffer start = {} len = {}",
+ copyStart, copyLen);
+ lastChunkBuffer.put(bb.array(), copyStart, copyLen);
+ } catch (BufferOverflowException e) {
+ LOG.error("appending from " + copyStart + " for len=" + copyLen +
+ ". lastChunkBuffer remaining=" + lastChunkBuffer.remaining() +
+ " pos=" + lastChunkBuffer.position() +
+ " limit=" + lastChunkBuffer.limit() +
+ " capacity=" + lastChunkBuffer.capacity());
+ throw e;
+ }
+
+ uncopied -= copyLen;
+ }
+
+ pos += bb.remaining();
+ if (pos >= offset + length) {
+ return;
+ }
+ if (uncopied == 0) {
+ return;
+ }
+ }
+ }
+
+ private void removeLastPartialChunk() {
+ // remove the last chunk if it's partial.
+ if (containerBlockData.getChunksList().isEmpty()) {
+ return;
+ }
+ int lastChunkIndex = containerBlockData.getChunksCount() - 1;
+ ChunkInfo lastChunkInBlockData = containerBlockData.getChunks(
+ lastChunkIndex);
+ if (!isFullChunk(lastChunkInBlockData)) {
+ containerBlockData.removeChunks(lastChunkIndex);
+ }
+ }
+
+ private ChunkInfo createChunkInfo(long lastPartialChunkOffset)
+ throws OzoneChecksumException {
+ lastChunkBuffer.flip();
+ int revisedChunkSize = lastChunkBuffer.remaining();
+ // create the chunk info to be sent in PutBlock.
+ ChecksumData revisedChecksumData =
+ checksum.computeChecksum(lastChunkBuffer);
+
+ long chunkID = lastPartialChunkOffset / config.getStreamBufferSize();
+ ChunkInfo.Builder revisedChunkInfo = ChunkInfo.newBuilder()
+ .setChunkName(blockID.get().getLocalID() + "_chunk_" + chunkID)
+ .setOffset(lastPartialChunkOffset)
+ .setLen(revisedChunkSize)
+ .setChecksumData(revisedChecksumData.getProtoBufMessage());
+ // if full chunk
+ if (revisedChunkSize == config.getStreamBufferSize()) {
+ revisedChunkInfo.addMetadata(FULL_CHUNK_KV);
+ }
+ return revisedChunkInfo.build();
+ }
+
+ private boolean isFullChunk(ChunkInfo chunkInfo) {
+ Preconditions.checkState(
+ chunkInfo.getLen() <= config.getStreamBufferSize());
+ return chunkInfo.getLen() == config.getStreamBufferSize();
+ }
+
+ private void addToBlockData(ChunkInfo revisedChunkInfo) {
+ LOG.debug("containerBlockData chunk: {}", containerBlockData);
+ if (containerBlockData.getChunksCount() > 0) {
+ ChunkInfo lastChunk = containerBlockData.getChunks(
+ containerBlockData.getChunksCount() - 1);
+ LOG.debug("revisedChunkInfo chunk: {}", revisedChunkInfo);
+ Preconditions.checkState(lastChunk.getOffset() + lastChunk.getLen() ==
+ revisedChunkInfo.getOffset(),
+ "lastChunk.getOffset() + lastChunk.getLen() " +
+ "!= revisedChunkInfo.getOffset()");
+ }
+ containerBlockData.addChunks(revisedChunkInfo);
+ }
+
@VisibleForTesting
public void setXceiverClient(XceiverClientSpi xceiverClient) {
this.xceiverClient = xceiverClient;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreWithIncrementalChunkList.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreWithIncrementalChunkList.java
index 51e4533500..84000ba2fb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreWithIncrementalChunkList.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreWithIncrementalChunkList.java
@@ -94,12 +94,16 @@ public class DatanodeStoreWithIncrementalChunkList extends AbstractDatanodeStore
LOG.debug("blockData={}, lastChunk={}",
blockData.getChunks(), lastChunk.getChunks());
Preconditions.checkState(lastChunk.getChunks().size() == 1);
- ContainerProtos.ChunkInfo lastChunkInBlockData =
- blockData.getChunks().get(blockData.getChunks().size() - 1);
- Preconditions.checkState(
- lastChunkInBlockData.getOffset() + lastChunkInBlockData.getLen()
- == lastChunk.getChunks().get(0).getOffset(),
- "chunk offset does not match");
+ if (!blockData.getChunks().isEmpty()) {
+ ContainerProtos.ChunkInfo lastChunkInBlockData =
+ blockData.getChunks().get(blockData.getChunks().size() - 1);
+ if (lastChunkInBlockData != null) {
+ Preconditions.checkState(
+ lastChunkInBlockData.getOffset() + lastChunkInBlockData.getLen()
+ == lastChunk.getChunks().get(0).getOffset(),
+ "chunk offset does not match");
+ }
+ }
// append last partial chunk to the block data
List<ContainerProtos.ChunkInfo> chunkInfos =
@@ -136,7 +140,7 @@ public class DatanodeStoreWithIncrementalChunkList extends AbstractDatanodeStore
public void putBlockByID(BatchOperation batch, boolean incremental,
long localID, BlockData data, KeyValueContainerData containerData,
boolean endOfBlock) throws IOException {
- if (!incremental && !isPartialChunkList(data)) {
+ if (!incremental || !isPartialChunkList(data)) {
// Case (1) old client: override chunk list.
getBlockDataTable().putWithBatch(
batch, containerData.getBlockKey(localID), data);
@@ -151,14 +155,21 @@ public class DatanodeStoreWithIncrementalChunkList extends AbstractDatanodeStore
private void moveLastChunkToBlockData(BatchOperation batch, long localID,
BlockData data, KeyValueContainerData containerData) throws IOException {
+ // if data has no chunks, fetch the last chunk info from lastChunkInfoTable
+ if (data.getChunks().isEmpty()) {
+ BlockData lastChunk = getLastChunkInfoTable().get(containerData.getBlockKey(localID));
+ if (lastChunk != null) {
+ reconcilePartialChunks(lastChunk, data);
+ }
+ }
// if eob or if the last chunk is full,
// the 'data' is full so append it to the block table's chunk info
// and then remove from lastChunkInfo
BlockData blockData = getBlockDataTable().get(
containerData.getBlockKey(localID));
if (blockData == null) {
- // Case 2.1 if the block did not have full chunks before,
- // the block's chunk is what received from client this time.
+ // Case 2.1 if the block did not have full chunks before
+ // the block's chunk is what received from client this time, plus the chunks in lastChunkInfoTable
blockData = data;
} else {
// case 2.2 the block already has some full chunks
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java
index 38a01e4690..26d959e886 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java
@@ -45,6 +45,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_CHUNK_LIST_INCREMENTAL;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil.isSameSchemaVersion;
@@ -83,6 +84,7 @@ public class TestBlockManagerImpl {
this.schemaVersion = versionInfo.getSchemaVersion();
this.config = new OzoneConfiguration();
ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, config);
+ config.setBoolean(OZONE_CHUNK_LIST_INCREMENTAL, true);
initilaze();
}
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
index e4a8a80a63..31f5e20bc8 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
@@ -56,6 +56,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Service
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
@@ -67,6 +69,8 @@ import java.util.function.Function;
* OM transport for testing with in-memory state.
*/
public class MockOmTransport implements OmTransport {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MockOmTransport.class);
private final MockBlockAllocator blockAllocator;
//volumename -> volumeinfo
@@ -185,11 +189,44 @@ public class MockOmTransport implements OmTransport {
.build();
}
+ private boolean isHSync(CommitKeyRequest commitKeyRequest) {
+ return commitKeyRequest.hasHsync() && commitKeyRequest.getHsync();
+ }
+
+ private boolean isRecovery(CommitKeyRequest commitKeyRequest) {
+ return commitKeyRequest.hasRecovery() && commitKeyRequest.getRecovery();
+ }
+
+ private String toOperationString(CommitKeyRequest commitKeyRequest) {
+ boolean hsync = isHSync(commitKeyRequest);
+ boolean recovery = isRecovery(commitKeyRequest);
+ if (hsync) {
+ return "hsync";
+ }
+ if (recovery) {
+ return "recover";
+ }
+ return "commit";
+ }
+
+
private CommitKeyResponse commitKey(CommitKeyRequest commitKeyRequest) {
final KeyArgs keyArgs = commitKeyRequest.getKeyArgs();
final KeyInfo openKey =
openKeys.get(keyArgs.getVolumeName()).get(keyArgs.getBucketName())
- .remove(keyArgs.getKeyName());
+ .get(keyArgs.getKeyName());
+ LOG.debug("{} open key vol: {} bucket: {} key: {}",
+ toOperationString(commitKeyRequest),
+ keyArgs.getVolumeName(),
+ keyArgs.getBucketName(),
+ keyArgs.getKeyName());
+ boolean hsync = isHSync(commitKeyRequest);
+ if (!hsync) {
+ KeyInfo deleteKey = openKeys.get(keyArgs.getVolumeName())
+ .get(keyArgs.getBucketName())
+ .remove(keyArgs.getKeyName());
+ assert deleteKey != null;
+ }
final KeyInfo.Builder committedKeyInfoWithLocations =
KeyInfo.newBuilder().setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestBlockOutputStreamIncrementalPutBlock.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestBlockOutputStreamIncrementalPutBlock.java
new file mode 100644
index 0000000000..68549e4104
--- /dev/null
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestBlockOutputStreamIncrementalPutBlock.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.UUID;
+
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
+import org.jetbrains.annotations.NotNull;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.InMemoryConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.client.rpc.RpcClient;
+import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_CHUNK_LIST_INCREMENTAL;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+
+
+/**
+ * Verify BlockOutputStream with incremental PutBlock feature.
+ * (ozone.client.incremental.chunk.list = true)
+ */
+public class TestBlockOutputStreamIncrementalPutBlock {
+ private OzoneClient client;
+ private final String keyName = UUID.randomUUID().toString();
+ private final String volumeName = UUID.randomUUID().toString();
+ private final String bucketName = UUID.randomUUID().toString();
+ private OzoneBucket bucket;
+ private final ConfigurationSource config = new InMemoryConfiguration();
+
+ public static Iterable<Boolean> parameters() {
+ return Arrays.asList(true, false);
+ }
+
+ private void init(boolean incrementalChunkList) throws IOException {
+ OzoneClientConfig clientConfig = config.getObject(OzoneClientConfig.class);
+
+ clientConfig.setIncrementalChunkList(incrementalChunkList);
+ clientConfig.setChecksumType(ContainerProtos.ChecksumType.CRC32C);
+
+ ((InMemoryConfiguration)config).setFromObject(clientConfig);
+
+ ((InMemoryConfiguration) config).setBoolean(
+ OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
+ ((InMemoryConfiguration) config).setBoolean(
+ OZONE_CHUNK_LIST_INCREMENTAL, incrementalChunkList);
+
+ RpcClient rpcClient = new RpcClient(config, null) {
+
+ @Override
+ protected OmTransport createOmTransport(
+ String omServiceId)
+ throws IOException {
+ return new MockOmTransport();
+ }
+
+ @NotNull
+ @Override
+ protected XceiverClientFactory createXceiverClientFactory(
+ ServiceInfoEx serviceInfo) throws IOException {
+ return new MockXceiverClientFactory();
+ }
+ };
+
+ client = new OzoneClient(config, rpcClient);
+ ObjectStore store = client.getObjectStore();
+
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ bucket = volume.getBucket(bucketName);
+ }
+
+ @AfterEach
+ public void close() throws IOException {
+ client.close();
+ }
+
+ @ParameterizedTest
+ @MethodSource("parameters")
+ public void writeSmallChunk(boolean incrementalChunkList)
+ throws IOException {
+ init(incrementalChunkList);
+
+ int size = 1024;
+ String s = RandomStringUtils.randomAlphabetic(1024);
+ ByteBuffer byteBuffer = ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
+
+ try (OzoneOutputStream out = bucket.createKey(keyName, size,
+ ReplicationConfig.getDefault(config), new HashMap<>())) {
+ for (int i = 0; i < 4097; i++) {
+ out.write(byteBuffer);
+ out.hsync();
+ }
+ }
+
+ try (OzoneInputStream is = bucket.readKey(keyName)) {
+ ByteBuffer readBuffer = ByteBuffer.allocate(size);
+ for (int i = 0; i < 4097; i++) {
+ is.read(readBuffer);
+ assertArrayEquals(readBuffer.array(), byteBuffer.array());
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("parameters")
+ public void writeLargeChunk(boolean incrementalChunkList)
+ throws IOException {
+ init(incrementalChunkList);
+
+ int size = 1024 * 1024 + 1;
+ ByteBuffer byteBuffer = ByteBuffer.allocate(size);
+
+ try (OzoneOutputStream out = bucket.createKey(keyName, size,
+ ReplicationConfig.getDefault(config), new HashMap<>())) {
+ for (int i = 0; i < 4; i++) {
+ out.write(byteBuffer);
+ out.hsync();
+ }
+ }
+
+ try (OzoneInputStream is = bucket.readKey(keyName)) {
+ ByteBuffer readBuffer = ByteBuffer.allocate(size);
+ for (int i = 0; i < 4; i++) {
+ is.read(readBuffer);
+ assertArrayEquals(readBuffer.array(), byteBuffer.array());
+ }
+ }
+ }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index 8d7439604e..cd318e88ba 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.fs.ozone;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
@@ -28,12 +30,19 @@ import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
+
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.crypto.CipherSuite;
@@ -58,8 +67,10 @@ import org.apache.hadoop.ozone.TestDataUtil;
import org.apache.hadoop.ozone.client.BucketArgs;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OMMetrics;
@@ -82,11 +93,16 @@ import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_CHUNK_LIST_INCREMENTAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT;
@@ -96,11 +112,13 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOU
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -120,12 +138,13 @@ public class TestHSync {
private static OzoneClient client;
private static final BucketLayout BUCKET_LAYOUT =
BucketLayout.FILE_SYSTEM_OPTIMIZED;
+ private static final int CHUNK_SIZE = 4 << 12;
+ private static final int FLUSH_SIZE = 2 * CHUNK_SIZE;
+ private static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE;
+ private static final int BLOCK_SIZE = 2 * MAX_FLUSH_SIZE;
+
@BeforeAll
public static void init() throws Exception {
- final int chunkSize = 4 << 10;
- final int flushSize = 2 * chunkSize;
- final int maxFlushSize = 2 * flushSize;
- final int blockSize = 2 * maxFlushSize;
final BucketLayout layout = BUCKET_LAYOUT;
CONF.setBoolean(OZONE_OM_RATIS_ENABLE_KEY, false);
@@ -133,17 +152,19 @@ public class TestHSync {
CONF.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
// Reduce KeyDeletingService interval
CONF.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100, TimeUnit.MILLISECONDS);
+ CONF.setBoolean("ozone.client.incremental.chunk.list", true);
+ CONF.setBoolean(OZONE_CHUNK_LIST_INCREMENTAL, true);
cluster = MiniOzoneCluster.newBuilder(CONF)
.setNumDatanodes(5)
.setTotalPipelineNumLimit(10)
- .setBlockSize(blockSize)
- .setChunkSize(chunkSize)
- .setStreamBufferFlushSize(flushSize)
- .setStreamBufferMaxSize(maxFlushSize)
- .setDataStreamBufferFlushize(maxFlushSize)
+ .setBlockSize(BLOCK_SIZE)
+ .setChunkSize(CHUNK_SIZE)
+ .setStreamBufferFlushSize(FLUSH_SIZE)
+ .setStreamBufferMaxSize(MAX_FLUSH_SIZE)
+ .setDataStreamBufferFlushize(MAX_FLUSH_SIZE)
.setStreamBufferSizeUnit(StorageUnit.BYTES)
- .setDataStreamMinPacketSize(chunkSize)
- .setDataStreamStreamWindowSize(5 * chunkSize)
+ .setDataStreamMinPacketSize(CHUNK_SIZE)
+ .setDataStreamStreamWindowSize(5 * CHUNK_SIZE)
.build();
cluster.waitForClusterToBeReady();
client = cluster.newClient();
@@ -155,6 +176,8 @@ public class TestHSync {
GenericTestUtils.setLogLevel(OMKeyRequest.LOG, Level.DEBUG);
GenericTestUtils.setLogLevel(OMKeyCommitRequest.LOG, Level.DEBUG);
GenericTestUtils.setLogLevel(OMKeyCommitRequestWithFSO.LOG, Level.DEBUG);
+ GenericTestUtils.setLogLevel(BlockOutputStream.LOG, Level.DEBUG);
+ GenericTestUtils.setLogLevel(BlockInputStream.LOG, Level.DEBUG);
}
@AfterAll
@@ -287,13 +310,15 @@ public class TestHSync {
}
}
- @Test
- public void testO3fsHSync() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testO3fsHSync(boolean incrementalChunkList) throws Exception {
// Set the fs.defaultFS
final String rootPath = String.format("%s://%s.%s/",
OZONE_URI_SCHEME, bucket.getName(), bucket.getVolumeName());
CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+ initClientConfig(incrementalChunkList);
try (FileSystem fs = FileSystem.get(CONF)) {
for (int i = 0; i < 10; i++) {
final Path file = new Path("/file" + i);
@@ -302,8 +327,10 @@ public class TestHSync {
}
}
- @Test
- public void testOfsHSync() throws Exception {
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testOfsHSync(boolean incrementalChunkList) throws Exception {
// Set the fs.defaultFS
final String rootPath = String.format("%s://%s/",
OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY));
@@ -312,6 +339,7 @@ public class TestHSync {
final String dir = OZONE_ROOT + bucket.getVolumeName()
+ OZONE_URI_DELIMITER + bucket.getName();
+ initClientConfig(incrementalChunkList);
try (FileSystem fs = FileSystem.get(CONF)) {
for (int i = 0; i < 10; i++) {
final Path file = new Path(dir, "file" + i);
@@ -429,13 +457,11 @@ public class TestHSync {
ThreadLocalRandom.current().nextBytes(data);
final Path file = new Path(dir, "file-hsync-then-close");
- long blockSize;
try (FileSystem fs = FileSystem.get(CONF)) {
- blockSize = fs.getDefaultBlockSize(file);
long fileSize = 0;
try (FSDataOutputStream outputStream = fs.create(file, true)) {
// make sure at least writing 2 blocks data
- while (fileSize <= blockSize) {
+ while (fileSize <= BLOCK_SIZE) {
outputStream.write(data, 0, data.length);
outputStream.hsync();
fileSize += data.length;
@@ -448,9 +474,9 @@ public class TestHSync {
omMetrics.resetNumKeyHSyncs();
long writtenSize = 0;
try (OzoneOutputStream outputStream = bucket.createKey("key-" + RandomStringUtils.randomNumeric(5),
- blockSize * 2, ReplicationType.RATIS, ReplicationFactor.THREE, new HashMap<>())) {
+ BLOCK_SIZE * 2, ReplicationType.RATIS, ReplicationFactor.THREE, new HashMap<>())) {
// make sure at least writing 2 blocks data
- while (writtenSize <= blockSize) {
+ while (writtenSize <= BLOCK_SIZE) {
outputStream.write(data, 0, data.length);
outputStream.hsync();
writtenSize += data.length;
@@ -733,4 +759,117 @@ public class TestHSync {
assertFalse(cofsos.hasCapability(StreamCapabilities.HFLUSH));
}
}
+
+ public void initClientConfig(boolean incrementalChunkList) {
+ OzoneClientConfig clientConfig = CONF.getObject(OzoneClientConfig.class);
+ clientConfig.setIncrementalChunkList(incrementalChunkList);
+ clientConfig.setChecksumType(ContainerProtos.ChecksumType.CRC32C);
+ CONF.setFromObject(clientConfig);
+ }
+
+ public static Stream<Arguments> parameters1() {
+ return Stream.of(
+ arguments(true, 512),
+ arguments(true, 511),
+ arguments(true, 513),
+ arguments(false, 512),
+ arguments(false, 511),
+ arguments(false, 513)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("parameters1")
+ public void writeWithSmallBuffer(boolean incrementalChunkList, int bufferSize)
+ throws IOException {
+ initClientConfig(incrementalChunkList);
+
+ final String keyName = UUID.randomUUID().toString();
+ int fileSize = 16 << 11;
+ String s = RandomStringUtils.randomAlphabetic(bufferSize);
+ ByteBuffer byteBuffer = ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
+
+ int writtenSize = 0;
+ try (OzoneOutputStream out = bucket.createKey(keyName, fileSize,
+ ReplicationConfig.getDefault(CONF), new HashMap<>())) {
+ while (writtenSize < fileSize) {
+ int len = Math.min(bufferSize, fileSize - writtenSize);
+ out.write(byteBuffer, 0, len);
+ out.hsync();
+ writtenSize += bufferSize;
+ }
+ }
+
+ OzoneKeyDetails keyInfo = bucket.getKey(keyName);
+ assertEquals(fileSize, keyInfo.getDataSize());
+
+ int readSize = 0;
+ try (OzoneInputStream is = bucket.readKey(keyName)) {
+ while (readSize < fileSize) {
+ int len = Math.min(bufferSize, fileSize - readSize);
+ ByteBuffer readBuffer = ByteBuffer.allocate(len);
+ int readLen = is.read(readBuffer);
+ assertEquals(len, readLen);
+ if (len < bufferSize) {
+ for (int i = 0; i < len; i++) {
+ assertEquals(readBuffer.array()[i], byteBuffer.array()[i]);
+ }
+ } else {
+ assertArrayEquals(readBuffer.array(), byteBuffer.array());
+ }
+ readSize += readLen;
+ }
+ }
+ bucket.deleteKey(keyName);
+ }
+
+ public static Stream<Arguments> parameters2() {
+ return Stream.of(
+ arguments(true, 1024 * 1024 + 1),
+ arguments(true, 1024 * 1024 + 1 + CHUNK_SIZE),
+ arguments(true, 1024 * 1024 - 1 + CHUNK_SIZE),
+ arguments(false, 1024 * 1024 + 1),
+ arguments(false, 1024 * 1024 + 1 + CHUNK_SIZE),
+ arguments(false, 1024 * 1024 - 1 + CHUNK_SIZE)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("parameters2")
+ public void writeWithBigBuffer(boolean incrementalChunkList, int bufferSize)
+ throws IOException {
+ initClientConfig(incrementalChunkList);
+
+ final String keyName = UUID.randomUUID().toString();
+ int count = 2;
+ int fileSize = bufferSize * count;
+ ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize);
+
+ try (OzoneOutputStream out = bucket.createKey(keyName, fileSize,
+ ReplicationConfig.getDefault(CONF), new HashMap<>())) {
+ for (int i = 0; i < count; i++) {
+ out.write(byteBuffer);
+ out.hsync();
+ }
+ }
+
+ OzoneKeyDetails keyInfo = bucket.getKey(keyName);
+ assertEquals(fileSize, keyInfo.getDataSize());
+ int totalReadLen = 0;
+ try (OzoneInputStream is = bucket.readKey(keyName)) {
+
+ for (int i = 0; i < count; i++) {
+ ByteBuffer readBuffer = ByteBuffer.allocate(bufferSize);
+ int readLen = is.read(readBuffer);
+ if (bufferSize != readLen) {
+ throw new IOException("failed to read " + bufferSize + " from offset " + totalReadLen +
+ ", actually read " + readLen + ", block " + totalReadLen /
+ BLOCK_SIZE);
+ }
+ assertArrayEquals(byteBuffer.array(), readBuffer.array());
+ totalReadLen += readLen;
+ }
+ }
+ bucket.deleteKey(keyName);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]