This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-7593 by this push:
new 305a176b7d HDDS-9751. [hsync] Make Putblock performance acceptable - DataNode side (#5662)
305a176b7d is described below
commit 305a176b7d479a3ef40171a21ffc7361454f682e
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Tue Jan 23 10:20:48 2024 -0800
HDDS-9751. [hsync] Make Putblock performance acceptable - DataNode side (#5662)
---
.../container/keyvalue/impl/BlockManagerImpl.java | 3 +-
.../metadata/AbstractDatanodeDBDefinition.java | 1 +
.../ozone/container/metadata/DatanodeStore.java | 3 +-
.../metadata/DatanodeStoreSchemaThreeImpl.java | 2 +-
.../metadata/DatanodeStoreSchemaTwoImpl.java | 2 +-
.../DatanodeStoreWithIncrementalChunkList.java | 226 +++++++++++++++++++++
.../keyvalue/impl/TestBlockManagerImpl.java | 182 +++++++++++++++++
.../hadoop/ozone/client/MockDatanodeStorage.java | 132 ++++++++++--
.../hadoop/ozone/client/MockXceiverClientSpi.java | 3 +-
9 files changed, 525 insertions(+), 29 deletions(-)
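
Before the per-file diffs, a standalone sketch of the idea this change implements; all names here are illustrative, not the actual Ozone API. Full chunks are committed to the block data table once; the trailing partial chunk produced by each hsync goes to a separate last-chunk-info table and is merged back in on read, so a PutBlock rewrites one small entry instead of an ever-growing chunk list:

  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;

  // Illustrative stand-ins for the two RocksDB column families; chunk
  // metadata is reduced to a String for brevity.
  final class IncrementalChunkListSketch {
    private final Map<String, List<String>> blockDataTable = new HashMap<>();
    private final Map<String, String> lastChunkInfoTable = new HashMap<>();

    // hsync path: only the small trailing-chunk entry is rewritten
    void putPartialChunk(String blockKey, String partialChunk) {
      lastChunkInfoTable.put(blockKey, partialChunk);
    }

    // end-of-block (or full-chunk) path: fold the chunk into block data
    void commitChunk(String blockKey, String fullChunk) {
      blockDataTable.computeIfAbsent(blockKey, k -> new ArrayList<>())
          .add(fullChunk);
      lastChunkInfoTable.remove(blockKey);
    }

    // read path: reconcile committed chunks with the trailing partial chunk
    List<String> getChunks(String blockKey) {
      List<String> chunks = new ArrayList<>(
          blockDataTable.getOrDefault(blockKey, new ArrayList<>()));
      String partial = lastChunkInfoTable.get(blockKey);
      if (partial != null) {
        chunks.add(partial);
      }
      return chunks;
    }
  }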
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
index 1977b11531..e40434f508 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
@@ -364,6 +364,7 @@ public class BlockManagerImpl implements BlockManager {
private BlockData getBlockByID(DBHandle db, BlockID blockID,
KeyValueContainerData containerData) throws IOException {
- return db.getStore().getBlockByID(blockID, containerData);
+ String blockKey = containerData.getBlockKey(blockID.getLocalID());
+ return db.getStore().getBlockByID(blockID, blockKey);
}
}
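
The block key is now derived once by the caller from KeyValueContainerData.getBlockKey(localID), so the store-side lookup (next hunk) operates on a plain String key and no longer depends on container metadata.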
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java
index fec2a3f7d2..b2c62dfcbd 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java
@@ -77,6 +77,7 @@ public abstract class AbstractDatanodeDBDefinition implements DBDefinition {
public DBColumnFamilyDefinition<String, Long> getFinalizeBlocksColumnFamily() {
return null;
}
+
public abstract DBColumnFamilyDefinition<String, BlockData>
getLastChunkInfoColumnFamily();
}
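
For orientation: the last-chunk-info column family declared above pairs the same String block key used by the block data table with a BlockData value that holds only the trailing (partial) chunk. Conceptually (the wiring of table name and codecs lives in the concrete schema DB definitions, not in this hunk):

  Table<String, BlockData> lastChunkInfoTable;  // blockKey -> partial-chunk BlockData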
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java
index 4abfb60c4f..35f8bf8322 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java
@@ -130,8 +130,7 @@ public interface DatanodeStore extends Closeable {
}
default BlockData getBlockByID(BlockID blockID,
- KeyValueContainerData containerData) throws IOException {
- String blockKey = containerData.getBlockKey(blockID.getLocalID());
+ String blockKey) throws IOException {
// check block data table
BlockData blockData = getBlockDataTable().get(blockKey);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java
index 7f37c9ae51..25479a7a9c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java
@@ -53,7 +53,7 @@ import static org.apache.hadoop.ozone.container.metadata.DatanodeSchemaThreeDBDe
* - All keys have containerID as prefix.
* - The table 3 has String as key instead of Long since we want to use prefix.
*/
-public class DatanodeStoreSchemaThreeImpl extends AbstractDatanodeStore
+public class DatanodeStoreSchemaThreeImpl extends DatanodeStoreWithIncrementalChunkList
implements DeleteTransactionStore<String> {
public static final String DUMP_FILE_SUFFIX = ".data";
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java
index f09d30e45a..c9ea52b47c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java
@@ -31,7 +31,7 @@ import java.io.IOException;
* 2. A metadata table.
* 3. A Delete Transaction Table.
*/
-public class DatanodeStoreSchemaTwoImpl extends AbstractDatanodeStore
+public class DatanodeStoreSchemaTwoImpl extends DatanodeStoreWithIncrementalChunkList
implements DeleteTransactionStore<Long> {
private final Table<Long, DeletedBlocksTransaction>
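
The two one-line changes above reshape the store hierarchy: the shared incremental-chunk-list logic is hoisted into a new intermediate class (added below), while the schema one store keeps extending AbstractDatanodeStore directly (the tests below skip SCHEMA_V1 accordingly):

  AbstractDatanodeStore
      `-- DatanodeStoreWithIncrementalChunkList   (new in this commit)
            |-- DatanodeStoreSchemaTwoImpl
            `-- DatanodeStoreSchemaThreeImpl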
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreWithIncrementalChunkList.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreWithIncrementalChunkList.java
new file mode 100644
index 0000000000..51e4533500
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreWithIncrementalChunkList.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.metadata;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK;
+import static org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.FULL_CHUNK;
+import static org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.INCREMENTAL_CHUNK_LIST;
+
+/**
+ * A datanode store that keeps the trailing partial chunk of an open block in
+ * a separate last-chunk-info column family, so that a PutBlock issued for
+ * hsync only rewrites that single small entry instead of the whole chunk
+ * list. The merged view is reconstructed on read. Shared by the schema
+ * version two and three stores.
+ */
+public class DatanodeStoreWithIncrementalChunkList extends AbstractDatanodeStore {
+ /**
+ * Constructs the metadata store and starts the DB services.
+ *
+ * @param config - Ozone Configuration.
+ * @throws IOException - on Failure.
+ */
+ public DatanodeStoreWithIncrementalChunkList(ConfigurationSource config,
+ AbstractDatanodeDBDefinition dbDef, boolean openReadOnly) throws IOException {
+ super(config, dbDef, openReadOnly);
+ }
+
+
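+ // Read path: load committed chunks from the block data table and, if the
+ // entry is missing or marked incremental, merge in the trailing partial
+ // chunk from the last-chunk-info table.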
+ @Override
+ public BlockData getBlockByID(BlockID blockID,
+ String blockKey) throws IOException {
+ BlockData lastChunk = null;
+ // check block data table
+ BlockData blockData = getBlockDataTable().get(blockKey);
+ if (blockData == null || isPartialChunkList(blockData)) {
+ // check last chunk table
+ lastChunk = getLastChunkInfoTable().get(blockKey);
+ }
+
+ if (blockData == null) {
+ if (lastChunk == null) {
+ throw new StorageContainerException(
+ NO_SUCH_BLOCK_ERR_MSG + " BlockID : " + blockID, NO_SUCH_BLOCK);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("blockData=(null), lastChunk={}", lastChunk.getChunks());
+ }
+ return lastChunk;
+ }
+ } else {
+ if (lastChunk != null) {
+ reconcilePartialChunks(lastChunk, blockData);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("blockData={}, lastChunk=(null)", blockData.getChunks());
+ }
+ }
+ }
+
+ return blockData;
+ }
+
+ private void reconcilePartialChunks(
+ BlockData lastChunk, BlockData blockData) {
+ LOG.debug("blockData={}, lastChunk={}",
+ blockData.getChunks(), lastChunk.getChunks());
+ Preconditions.checkState(lastChunk.getChunks().size() == 1);
+ ContainerProtos.ChunkInfo lastChunkInBlockData =
+ blockData.getChunks().get(blockData.getChunks().size() - 1);
+ Preconditions.checkState(
+ lastChunkInBlockData.getOffset() + lastChunkInBlockData.getLen()
+ == lastChunk.getChunks().get(0).getOffset(),
+ "chunk offset does not match");
+
+ // append last partial chunk to the block data
+ List<ContainerProtos.ChunkInfo> chunkInfos =
+ new ArrayList<>(blockData.getChunks());
+ chunkInfos.add(lastChunk.getChunks().get(0));
+ blockData.setChunks(chunkInfos);
+
+ blockData.setBlockCommitSequenceId(
+ lastChunk.getBlockCommitSequenceId());
+ }
+
+ private static boolean isPartialChunkList(BlockData data) {
+ return data.getMetadata().containsKey(INCREMENTAL_CHUNK_LIST);
+ }
+
+ private static boolean isFullChunk(ContainerProtos.ChunkInfo chunkInfo) {
+ for (ContainerProtos.KeyValue kv: chunkInfo.getMetadataList()) {
+ if (kv.getKey().equals(FULL_CHUNK)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ // Append if end-of-block, no chunks were received, or the last
+ // received chunk is full.
+ private static boolean shouldAppendLastChunk(boolean endOfBlock,
+ BlockData data) {
+ if (endOfBlock || data.getChunks().isEmpty()) {
+ return true;
+ }
+ return isFullChunk(data.getChunks().get(data.getChunks().size() - 1));
+ }
+
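+ // Write-path dispatch for the three cases handled below:
+ // (1) old client, non-incremental: overwrite the whole chunk list;
+ // (2) end-of-block, no chunks received, or the received last chunk is
+ // full: fold the partial chunk back into the block data table;
+ // (3) otherwise: full chunks go to the block data table, the trailing
+ // partial chunk goes to the last-chunk-info table.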
+ public void putBlockByID(BatchOperation batch, boolean incremental,
+ long localID, BlockData data, KeyValueContainerData containerData,
+ boolean endOfBlock) throws IOException {
+ if (!incremental && !isPartialChunkList(data)) {
+ // Case (1) old client: override chunk list.
+ getBlockDataTable().putWithBatch(
+ batch, containerData.getBlockKey(localID), data);
+ } else if (shouldAppendLastChunk(endOfBlock, data)) {
+ moveLastChunkToBlockData(batch, localID, data, containerData);
+ } else {
+ // incremental chunk list,
+ // not end of block, has partial chunks
+ putBlockWithPartialChunks(batch, localID, data, containerData);
+ }
+ }
+
+ private void moveLastChunkToBlockData(BatchOperation batch, long localID,
+ BlockData data, KeyValueContainerData containerData) throws IOException {
+ // End of block, or the last chunk is full: 'data' holds complete
+ // chunks, so append them to the block table's chunk list and then
+ // remove the entry from lastChunkInfo.
+ BlockData blockData = getBlockDataTable().get(
+ containerData.getBlockKey(localID));
+ if (blockData == null) {
+ // Case 2.1: the block had no full chunks before; its chunk list
+ // is exactly what was received from the client this time.
+ blockData = data;
+ } else {
+ // case 2.2 the block already has some full chunks
+ List<ContainerProtos.ChunkInfo> chunkInfoList = blockData.getChunks();
+ blockData.setChunks(new ArrayList<>(chunkInfoList));
+ for (ContainerProtos.ChunkInfo chunk : data.getChunks()) {
+ blockData.addChunk(chunk);
+ }
+ blockData.setBlockCommitSequenceId(data.getBlockCommitSequenceId());
+ }
+ // delete the entry from last chunk info table
+ getLastChunkInfoTable().deleteWithBatch(
+ batch, containerData.getBlockKey(localID));
+ // update block data table
+ getBlockDataTable().putWithBatch(batch,
+ containerData.getBlockKey(localID), blockData);
+ }
+
+ private void putBlockWithPartialChunks(BatchOperation batch, long localID,
+ BlockData data, KeyValueContainerData containerData) throws IOException {
+ if (data.getChunks().size() == 1) {
+ // Case (3.1) replace/update the last chunk info table
+ getLastChunkInfoTable().putWithBatch(
+ batch, containerData.getBlockKey(localID), data);
+ } else {
+ int lastChunkIndex = data.getChunks().size() - 1;
+ // received more than one chunk this time
+ List<ContainerProtos.ChunkInfo> lastChunkInfo =
+ Collections.singletonList(
+ data.getChunks().get(lastChunkIndex));
+ BlockData blockData = getBlockDataTable().get(
+ containerData.getBlockKey(localID));
+ if (blockData == null) {
+ // Case 3.2: if the block does not exist in the block data table
+ List<ContainerProtos.ChunkInfo> chunkInfos =
+ new ArrayList<>(data.getChunks());
+ chunkInfos.remove(lastChunkIndex);
+ data.setChunks(chunkInfos);
+ blockData = data;
+ LOG.debug("block {} does not have full chunks yet. Adding the " +
+ "chunks to it {}", localID, blockData);
+ } else {
+ // Case 3.3: the block exists in the block data table;
+ // append all chunks except the last one (presumably partial)
+ List<ContainerProtos.ChunkInfo> chunkInfos =
+ new ArrayList<>(blockData.getChunks());
+
+ LOG.debug("blockData.getChunks()={}", chunkInfos);
+ LOG.debug("data.getChunks()={}", data.getChunks());
+
+ for (int i = 0; i < lastChunkIndex; i++) {
+ chunkInfos.add(data.getChunks().get(i));
+ }
+ blockData.setChunks(chunkInfos);
+ blockData.setBlockCommitSequenceId(data.getBlockCommitSequenceId());
+ }
+ getBlockDataTable().putWithBatch(batch,
+ containerData.getBlockKey(localID), blockData);
+ // update the last partial chunk
+ data.setChunks(lastChunkInfo);
+ getLastChunkInfoTable().putWithBatch(
+ batch, containerData.getBlockKey(localID), data);
+ }
+ }
+}
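
To make the case analysis above concrete, an informal trace (hand-derived, not program output) of the three-hsync sequence that testFlush1 below exercises, showing the state of both tables after each putBlockByID:

  putBlock(chunk [0,1024), bcsId=1, eob=false)  -> case 3.1
      blockDataTable: (no entry)        lastChunkInfo: chunk [0,1024)
  putBlock(chunk [0,2048), bcsId=2, eob=false)  -> case 3.1 (entry replaced)
      blockDataTable: (no entry)        lastChunkInfo: chunk [0,2048)
  putBlock(chunk [0,3072), bcsId=3, eob=true)   -> case 2.1 (folded back)
      blockDataTable: chunk [0,3072)    lastChunkInfo: (deleted)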
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java
index 57ea7234e9..38a01e4690 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java
@@ -36,8 +36,10 @@ import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.io.TempDir;
+import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
@@ -45,6 +47,9 @@ import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil.isSameSchemaVersion;
+import static org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.FULL_CHUNK;
+import static org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl.INCREMENTAL_CHUNK_LIST;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
@@ -222,4 +227,181 @@ public class TestBlockManagerImpl {
assertNotNull(listBlockData);
assertEquals(10, listBlockData.size());
}
+
+ private BlockData createBlockData(long containerID, long blockNo,
+ int chunkID, long offset, long len, long bcsID)
+ throws IOException {
+ blockID1 = new BlockID(containerID, blockNo);
+ blockData = new BlockData(blockID1);
+ List<ContainerProtos.ChunkInfo> chunkList1 = new ArrayList<>();
+ ChunkInfo info1 = new ChunkInfo(String.format("%d_chunk_%d", blockID1
+ .getLocalID(), chunkID), offset, len);
+ chunkList1.add(info1.getProtoBufMessage());
+ blockData.setChunks(chunkList1);
+ blockData.setBlockCommitSequenceId(bcsID);
+ blockData.addMetadata(INCREMENTAL_CHUNK_LIST, "");
+
+ return blockData;
+ }
+
+ private BlockData createBlockDataWithOneFullChunk(long containerID,
+ long blockNo, int chunkID, long offset, long len, long bcsID)
+ throws IOException {
+ blockID1 = new BlockID(containerID, blockNo);
+ blockData = new BlockData(blockID1);
+ List<ContainerProtos.ChunkInfo> chunkList1 = new ArrayList<>();
+ ChunkInfo info1 = new ChunkInfo(String.format("%d_chunk_%d", blockID1
+ .getLocalID(), 1), 0, 4 * 1024 * 1024);
+ info1.addMetadata(FULL_CHUNK, "");
+
+ ChunkInfo info2 = new ChunkInfo(String.format("%d_chunk_%d", blockID1
+ .getLocalID(), chunkID), offset, len);
+ chunkList1.add(info1.getProtoBufMessage());
+ chunkList1.add(info2.getProtoBufMessage());
+ blockData.setChunks(chunkList1);
+ blockData.setBlockCommitSequenceId(bcsID);
+ blockData.addMetadata(INCREMENTAL_CHUNK_LIST, "");
+
+ return blockData;
+ }
+
+ private BlockData createBlockDataWithThreeFullChunks(long containerID,
+ long blockNo, long bcsID) throws IOException {
+ blockID1 = new BlockID(containerID, blockNo);
+ blockData = new BlockData(blockID1);
+ List<ContainerProtos.ChunkInfo> chunkList1 = new ArrayList<>();
+ long chunkLimit = 4 * 1024 * 1024;
+ for (int i = 1; i < 4; i++) {
+ ChunkInfo info1 = new ChunkInfo(
+ String.format("%d_chunk_%d", blockID1.getLocalID(), i),
+ chunkLimit * i, chunkLimit);
+ info1.addMetadata(FULL_CHUNK, "");
+ chunkList1.add(info1.getProtoBufMessage());
+ }
+ blockData.setChunks(chunkList1);
+ blockData.setBlockCommitSequenceId(bcsID);
+ blockData.addMetadata(INCREMENTAL_CHUNK_LIST, "");
+
+ return blockData;
+ }
+
+ @ContainerTestVersionInfo.ContainerTest
+ public void testFlush1(ContainerTestVersionInfo versionInfo)
+ throws Exception {
+ initTest(versionInfo);
+ Assumptions.assumeFalse(
+ isSameSchemaVersion(schemaVersion, OzoneConsts.SCHEMA_V1));
+ // simulates writing 1024 bytes, hsync,
+ // write another 1024 bytes, hsync
+ // write another 1024 bytes, hsync
+ long containerID = 1;
+ long blockNo = 2;
+ // put 1st chunk
+ blockData1 = createBlockData(containerID, blockNo, 1, 0, 1024,
+ 1);
+ blockManager.putBlock(keyValueContainer, blockData1, false);
+ // put 2nd chunk
+ BlockData blockData2 = createBlockData(containerID, blockNo, 1, 0, 2048,
+ 2);
+ blockManager.putBlock(keyValueContainer, blockData2, false);
+ assertEquals(1, keyValueContainer.getContainerData().getBlockCount());
+
+ BlockData getBlockData = blockManager.getBlock(keyValueContainer,
+ new BlockID(containerID, blockNo));
+ assertEquals(2048, getBlockData.getSize());
+ assertEquals(2, getBlockData.getBlockCommitSequenceId());
+ List<ContainerProtos.ChunkInfo> chunkInfos = getBlockData.getChunks();
+ assertEquals(1, chunkInfos.size());
+ assertEquals(2048, chunkInfos.get(0).getLen());
+ assertEquals(0, chunkInfos.get(0).getOffset());
+
+ // put 3rd chunk, end-of-block
+ BlockData blockData3 = createBlockData(containerID, blockNo, 1, 0, 3072,
+ 3);
+ blockManager.putBlock(keyValueContainer, blockData3, true);
+ assertEquals(1, keyValueContainer.getContainerData().getBlockCount());
+
+ getBlockData = blockManager.getBlock(keyValueContainer,
+ new BlockID(containerID, blockNo));
+ assertEquals(3072, getBlockData.getSize());
+ assertEquals(3, getBlockData.getBlockCommitSequenceId());
+ chunkInfos = getBlockData.getChunks();
+ assertEquals(1, chunkInfos.size());
+ assertEquals(3072, chunkInfos.get(0).getLen());
+ assertEquals(0, chunkInfos.get(0).getOffset());
+ }
+
+ @ContainerTestVersionInfo.ContainerTest
+ public void testFlush2(ContainerTestVersionInfo versionInfo)
+ throws Exception {
+ initTest(versionInfo);
+ Assumptions.assumeFalse(
+ isSameSchemaVersion(schemaVersion, OzoneConsts.SCHEMA_V1));
+ // simulates writing a full chunk + 1024 bytes, hsync,
+ // write another 1024 bytes, hsync
+ // write another 1024 bytes, hsync
+ long containerID = 1;
+ long blockNo = 2;
+ long chunkLimit = 4 * 1024 * 1024;
+ // first hsync (a full chunk + 1024 bytes)
+ blockData1 = createBlockDataWithOneFullChunk(containerID,
+ blockNo, 2, chunkLimit, 1024, 1);
+ blockManager.putBlock(keyValueContainer, blockData1, false);
+ // second hsync (1024 bytes)
+ BlockData blockData2 = createBlockData(containerID, blockNo, 2,
+ chunkLimit, 2048, 2);
+ blockManager.putBlock(keyValueContainer, blockData2, false);
+ assertEquals(1, keyValueContainer.getContainerData().getBlockCount());
+ // third hsync (1024 bytes)
+ BlockData blockData3 = createBlockData(containerID, blockNo, 2,
+ chunkLimit, 3072, 3);
+ blockManager.putBlock(keyValueContainer, blockData3, false);
+ assertEquals(1, keyValueContainer.getContainerData().getBlockCount());
+
+ // verify that first chunk is full, second chunk is 3072 bytes
+ BlockData getBlockData = blockManager.getBlock(keyValueContainer,
+ new BlockID(containerID, blockNo));
+ assertEquals(3072 + chunkLimit, getBlockData.getSize());
+ assertEquals(3, getBlockData.getBlockCommitSequenceId());
+ List<ContainerProtos.ChunkInfo> chunkInfos = getBlockData.getChunks();
+ assertEquals(2, chunkInfos.size());
+ assertEquals(chunkLimit, chunkInfos.get(0).getLen());
+ assertEquals(0, chunkInfos.get(0).getOffset());
+ assertEquals(3072, chunkInfos.get(1).getLen());
+ assertEquals(chunkLimit, chunkInfos.get(1).getOffset());
+ }
+
+ @ContainerTestVersionInfo.ContainerTest
+ public void testFlush3(ContainerTestVersionInfo versionInfo)
+ throws Exception {
+ initTest(versionInfo);
+ Assumptions.assumeFalse(
+ isSameSchemaVersion(schemaVersion, OzoneConsts.SCHEMA_V1));
+ // simulates writing a full chunk plus 1024 bytes, hsync,
+ // then writing until all 4 chunks are full
+ long containerID = 1;
+ long blockNo = 2;
+ long chunkLimit = 4 * 1024 * 1024;
+ // first hsync (a full chunk + 1024 bytes)
+ blockData1 = createBlockDataWithOneFullChunk(containerID, blockNo, 2,
+ chunkLimit, 1024, 1);
+ blockManager.putBlock(keyValueContainer, blockData1, false);
+ // second flush (three more full chunks)
+ BlockData blockData2 = createBlockDataWithThreeFullChunks(
+ containerID, blockNo, 2);
+ blockManager.putBlock(keyValueContainer, blockData2, false);
+ assertEquals(1, keyValueContainer.getContainerData().getBlockCount());
+
+ // verify that the four chunks are full
+ BlockData getBlockData = blockManager.getBlock(keyValueContainer,
+ new BlockID(containerID, blockNo));
+ assertEquals(chunkLimit * 4, getBlockData.getSize());
+ assertEquals(2, getBlockData.getBlockCommitSequenceId());
+ List<ContainerProtos.ChunkInfo> chunkInfos = getBlockData.getChunks();
+ assertEquals(4, chunkInfos.size());
+ for (int i = 0; i < 4; i++) {
+ assertEquals(chunkLimit, chunkInfos.get(i).getLen());
+ assertEquals(chunkLimit * i, chunkInfos.get(i).getOffset());
+ }
+ }
}
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
index ef2f1fa118..393e8cdb31 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.client;
import org.apache.commons.collections.map.HashedMap;
import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
@@ -30,18 +31,25 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* State represents persisted data of one specific datanode.
*/
public class MockDatanodeStorage {
-
- private final Map<DatanodeBlockID, BlockData> blocks = new HashedMap();
+ public static final Logger LOG =
+ LoggerFactory.getLogger(MockDatanodeStorage.class);
+ public static final String INCREMENTAL_CHUNK_LIST = "incremental";
+ public static final String FULL_CHUNK = "full";
+ public static final ContainerProtos.KeyValue FULL_CHUNK_KV =
+ ContainerProtos.KeyValue.newBuilder().setKey(FULL_CHUNK).build();
+
+ private final Map<BlockID, BlockData> blocks = new HashedMap();
private final Map<Long, List<DatanodeBlockID>>
containerBlocks = new HashedMap();
private final Map<BlockID, String> fullBlockData = new HashMap<>();
- private final Map<String, ChunkInfo> chunks = new HashMap<>();
-
private final Map<String, ByteString> data = new HashMap<>();
private IOException exception = null;
@@ -50,8 +58,70 @@ public class MockDatanodeStorage {
this.exception = reason;
}
+ private boolean isIncrementalChunkList(BlockData blockData) {
+ for (ContainerProtos.KeyValue kv : blockData.getMetadataList()) {
+ if (kv.getKey().equals(INCREMENTAL_CHUNK_LIST)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private BlockID toBlockID(DatanodeBlockID datanodeBlockID) {
+ return new BlockID(datanodeBlockID.getContainerID(),
+ datanodeBlockID.getLocalID());
+ }
+
public void putBlock(DatanodeBlockID blockID, BlockData blockData) {
- blocks.put(blockID, blockData);
+ if (isIncrementalChunkList(blockData)) {
+ LOG.debug("incremental chunk list");
+ putBlockIncremental(blockID, blockData);
+ } else {
+ LOG.debug("full chunk list");
+ putBlockFull(blockID, blockData);
+ }
+ }
+
+ private boolean isFullChunk(ChunkInfo chunkInfo) {
+ return (chunkInfo.getMetadataList().contains(FULL_CHUNK_KV));
+ }
+
+ public void putBlockIncremental(
+ DatanodeBlockID blockID, BlockData blockData) {
+ BlockID id = toBlockID(blockID);
+ if (blocks.containsKey(id)) {
+ // block already exists. let's append the chunk list to it.
+ BlockData existing = blocks.get(id);
+ if (existing.getChunksCount() == 0) {
+ // empty chunk list. override it.
+ putBlockFull(blockID, blockData);
+ } else {
+ BlockData.Builder blockDataBuilder = pruneLastPartialChunks(existing);
+ blockDataBuilder.addAllChunks(blockData.getChunksList());
+ blocks.put(id, blockDataBuilder.build());
+ }
+ // TODO: verify the chunk list beginning/offset/len is sane
+ } else {
+ // the block does not exist yet, simply add it
+ putBlockFull(blockID, blockData);
+ }
+ }
+
+ private BlockData.Builder pruneLastPartialChunks(BlockData existing) {
+ BlockData.Builder blockDataBuilder = BlockData.newBuilder(existing);
+ int lastChunkIndex = existing.getChunksCount() - 1;
+ // If the last chunk in the existing block is full, append after it;
+ // otherwise drop the partial last chunk so the incoming chunks
+ // replace it.
+ ChunkInfo chunkInfo = existing.getChunks(lastChunkIndex);
+ if (!isFullChunk(chunkInfo)) {
+ blockDataBuilder.removeChunks(lastChunkIndex);
+ }
+ return blockDataBuilder;
+ }
+
+ public void putBlockFull(DatanodeBlockID blockID, BlockData blockData) {
+ BlockID id = toBlockID(blockID);
+ blocks.put(id, blockData);
List<DatanodeBlockID> dnBlocks = containerBlocks
.getOrDefault(blockID.getContainerID(), new ArrayList<>());
dnBlocks.add(blockID);
@@ -59,14 +129,24 @@ public class MockDatanodeStorage {
}
public BlockData getBlock(DatanodeBlockID blockID) {
- return blocks.get(blockID);
+ BlockID id = toBlockID(blockID);
+ if (!blocks.containsKey(id)) {
+ StringBuilder sb = new StringBuilder();
+ for (BlockID bid : blocks.keySet()) {
+ sb.append(bid).append("\n");
+ }
+ throw new AssertionError("blockID " + id +
+ " not found in blocks. Available block IDs:\n" + sb);
+ }
+ return blocks.get(id);
}
public List<BlockData> listBlock(long containerID) {
List<DatanodeBlockID> datanodeBlockIDS = containerBlocks.get(containerID);
List<BlockData> listBlocksData = new ArrayList<>();
for (DatanodeBlockID dBlock : datanodeBlockIDS) {
- listBlocksData.add(blocks.get(dBlock));
+ listBlocksData.add(blocks.get(toBlockID(dBlock)));
}
return listBlocksData;
}
@@ -77,31 +157,39 @@ public class MockDatanodeStorage {
if (exception != null) {
throw exception;
}
- data.put(createKey(blockID, chunkInfo),
- ByteString.copyFrom(bytes.toByteArray()));
- chunks.put(createKey(blockID, chunkInfo), chunkInfo);
+ String blockKey = createKey(blockID);
+ ByteString block;
+ if (data.containsKey(blockKey)) {
+ block = data.get(blockKey);
+ assert block.size() == chunkInfo.getOffset();
+ data.put(blockKey, block.concat(bytes));
+ } else {
+ assert chunkInfo.getOffset() == 0;
+ data.put(blockKey, bytes);
+ }
+
fullBlockData
.put(new BlockID(blockID.getContainerID(), blockID.getLocalID()),
- fullBlockData.getOrDefault(blockID, "")
+ fullBlockData.getOrDefault(toBlockID(blockID), "")
.concat(bytes.toStringUtf8()));
}
- public ChunkInfo readChunkInfo(
- DatanodeBlockID blockID,
- ChunkInfo chunkInfo) {
- return chunks.get(createKey(blockID, chunkInfo));
- }
-
public ByteString readChunkData(
DatanodeBlockID blockID,
ChunkInfo chunkInfo) {
- return data.get(createKey(blockID, chunkInfo));
-
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "readChunkData: blockID={}, offset={}, len={}",
+ createKey(blockID), chunkInfo.getOffset(), chunkInfo.getLen());
+ }
+ ByteString str = data.get(createKey(blockID)).substring(
+ (int)chunkInfo.getOffset(),
+ (int)chunkInfo.getOffset() + (int)chunkInfo.getLen());
+ return str;
}
- private String createKey(DatanodeBlockID blockId, ChunkInfo chunkInfo) {
- return blockId.getContainerID() + "_" + blockId.getLocalID() + "_"
- + chunkInfo.getChunkName() + "_" + chunkInfo.getOffset();
+ private String createKey(DatanodeBlockID blockId) {
+ return blockId.getContainerID() + "_" + blockId.getLocalID();
}
public Map<String, ByteString> getAllBlockData() {
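
The rewritten mock keeps one contiguous buffer per block: writeChunkToContainer appends at the chunk offset and readChunkData slices by offset and length. A JDK-only sketch of that model (class and method names hypothetical; the real class uses protobuf ByteString.concat()/substring() instead of byte arrays):

  import java.util.Arrays;
  import java.util.HashMap;
  import java.util.Map;

  final class ContiguousBlockBuffer {
    private final Map<String, byte[]> data = new HashMap<>();

    // append-only: a chunk's offset must equal the current block length
    void writeChunk(String blockKey, long offset, byte[] bytes) {
      byte[] block = data.getOrDefault(blockKey, new byte[0]);
      if (block.length != offset) {
        throw new AssertionError("non-contiguous write at offset " + offset);
      }
      byte[] merged = Arrays.copyOf(block, block.length + bytes.length);
      System.arraycopy(bytes, 0, merged, block.length, bytes.length);
      data.put(blockKey, merged);
    }

    // reads slice the per-block buffer by the requested offset and length
    byte[] readChunk(String blockKey, long offset, long len) {
      return Arrays.copyOfRange(
          data.get(blockKey), (int) offset, (int) (offset + len));
    }
  }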
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
index 59eb49e555..7e5de329d1 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
@@ -110,8 +110,7 @@ public class MockXceiverClientSpi extends XceiverClientSpi {
private ReadChunkResponseProto readChunk(ReadChunkRequestProto readChunk) {
return ReadChunkResponseProto.newBuilder()
- .setChunkData(datanodeStorage
- .readChunkInfo(readChunk.getBlockID(), readChunk.getChunkData()))
+ .setChunkData(readChunk.getChunkData())
.setData(datanodeStorage
.readChunkData(readChunk.getBlockID(), readChunk.getChunkData()))
.setBlockID(readChunk.getBlockID())
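
With per-chunk metadata gone from the mock, readChunk now simply echoes the ChunkInfo from the request, while the response data is sliced from the per-block buffer introduced in MockDatanodeStorage above.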
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]