Repository: hadoop Updated Branches: refs/heads/HDFS-7240 070ad8438 -> 43a133486
HDFS-12000. Ozone: Container : Add key versioning support-1. Contributed by Chen Liang. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/43a13348 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/43a13348 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/43a13348 Branch: refs/heads/HDFS-7240 Commit: 43a133486d4e7fa7f0ce907d523a062ba8e3a694 Parents: 070ad84 Author: Xiaoyu Yao <[email protected]> Authored: Wed Dec 13 16:07:53 2017 -0800 Committer: Xiaoyu Yao <[email protected]> Committed: Wed Dec 13 16:07:53 2017 -0800 ---------------------------------------------------------------------- .../ozone/client/io/ChunkGroupInputStream.java | 7 +- .../ozone/client/io/ChunkGroupOutputStream.java | 37 ++- .../hadoop/ozone/client/rpc/RpcClient.java | 3 + .../hadoop/ozone/ksm/helpers/KsmKeyInfo.java | 95 +++++-- .../ozone/ksm/helpers/KsmKeyLocationInfo.java | 39 ++- .../ksm/helpers/KsmKeyLocationInfoGroup.java | 118 +++++++++ .../ozone/ksm/helpers/OpenKeySession.java | 11 +- ...ceManagerProtocolClientSideTranslatorPB.java | 2 +- .../main/proto/KeySpaceManagerProtocol.proto | 13 +- .../ozone/ksm/KSMMetadataManagerImpl.java | 11 +- .../apache/hadoop/ozone/ksm/KeyManagerImpl.java | 49 ++-- ...ceManagerProtocolServerSideTranslatorPB.java | 1 + .../web/storage/DistributedStorageHandler.java | 3 + .../ozone/TestStorageContainerManager.java | 7 +- .../ozone/client/rpc/TestOzoneRpcClient.java | 3 +- .../ozone/ksm/TestKsmBlockVersioning.java | 254 +++++++++++++++++++ .../hadoop/ozone/web/client/TestKeys.java | 3 +- 17 files changed, 582 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a13348/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java index 8e4ce92..fa17b7f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java @@ -18,7 +18,6 @@ package org.apache.hadoop.ozone.client.io; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo; @@ -166,13 +165,11 @@ public class ChunkGroupInputStream extends InputStream { StorageContainerLocationProtocolClientSideTranslatorPB storageContainerLocationClient, String requestId) throws IOException { - int index = 0; long length = 0; String containerKey; ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream(); - for (KsmKeyLocationInfo ksmKeyLocationInfo : keyInfo.getKeyLocationList()) { - // check index as sanity check - Preconditions.checkArgument(index++ == ksmKeyLocationInfo.getIndex()); + for (KsmKeyLocationInfo ksmKeyLocationInfo : + keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly()) { String containerName = ksmKeyLocationInfo.getContainerName(); Pipeline pipeline = storageContainerLocationClient.getContainer(containerName); http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a13348/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java index d9f271d..fe248e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java @@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.client.io; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result; +import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs; @@ -43,6 +44,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; +import java.util.List; /** * Maintaining a list of ChunkInputStream. Write based on offset. @@ -98,6 +100,11 @@ public class ChunkGroupOutputStream extends OutputStream { streamEntries.add(new ChunkOutputStreamEntry(outputStream, length)); } + @VisibleForTesting + public List<ChunkOutputStreamEntry> getStreamEntries() { + return streamEntries; + } + public ChunkGroupOutputStream( OpenKeySession handler, XceiverClientManager xceiverClientManager, StorageContainerLocationProtocolClientSideTranslatorPB scmClient, @@ -122,12 +129,31 @@ public class ChunkGroupOutputStream extends OutputStream { this.chunkSize = chunkSize; this.requestID = requestId; LOG.debug("Expecting open key with one block, but got" + - info.getKeyLocationList().size()); + info.getKeyLocationVersions().size()); + } + + /** + * When a key is opened, it is possible that there are some blocks already + * allocated to it for this open session. In this case, to make use of these + * blocks, we need to add these blocks to stream entries. But, a key's version + * also includes blocks from previous versions, we need to avoid adding these + * old blocks to stream entries, because these old blocks should not be picked + * for write. To do this, the following method checks that, only those + * blocks created in this particular open version are added to stream entries. + * + * @param version the set of blocks that are pre-allocated. + * @param openVersion the version corresponding to the pre-allocation. + * @throws IOException + */ + public void addPreallocateBlocks(KsmKeyLocationInfoGroup version, + long openVersion) throws IOException { // server may return any number of blocks, (0 to any) - int idx = 0; - for (KsmKeyLocationInfo subKeyInfo : info.getKeyLocationList()) { - subKeyInfo.setIndex(idx++); - checkKeyLocationInfo(subKeyInfo); + // only the blocks allocated in this open session (block createVersion + // equals to open session version) + for (KsmKeyLocationInfo subKeyInfo : version.getLocationList()) { + if (subKeyInfo.getCreateVersion() == openVersion) { + checkKeyLocationInfo(subKeyInfo); + } } } @@ -255,7 +281,6 @@ public class ChunkGroupOutputStream extends OutputStream { */ private void allocateNewBlock(int index) throws IOException { KsmKeyLocationInfo subKeyInfo = ksmClient.allocateBlock(keyArgs, openID); - subKeyInfo.setIndex(index); checkKeyLocationInfo(subKeyInfo); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a13348/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 0dda10b..94038e2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -464,6 +464,9 @@ public class RpcClient implements ClientProtocol { .setType(OzoneProtos.ReplicationType.valueOf(type.toString())) .setFactor(OzoneProtos.ReplicationFactor.valueOf(factor.getValue())) .build(); + groupOutputStream.addPreallocateBlocks( + openKey.getKeyInfo().getLatestVersionLocations(), + openKey.getOpenVersion()); return new OzoneOutputStream(groupOutputStream); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a13348/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyInfo.java index b6054eb..41d523c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyInfo.java @@ -17,9 +17,11 @@ */ package org.apache.hadoop.ozone.ksm.helpers; +import com.google.common.base.Preconditions; import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo; import org.apache.hadoop.util.Time; +import java.io.IOException; import java.util.List; import java.util.stream.Collectors; @@ -34,18 +36,29 @@ public final class KsmKeyInfo { // name of key client specified private final String keyName; private long dataSize; - private List<KsmKeyLocationInfo> keyLocationList; + private List<KsmKeyLocationInfoGroup> keyLocationVersions; private final long creationTime; private long modificationTime; private KsmKeyInfo(String volumeName, String bucketName, String keyName, - List<KsmKeyLocationInfo> locationInfos, long dataSize, long creationTime, - long modificationTime) { + List<KsmKeyLocationInfoGroup> versions, long dataSize, + long creationTime, long modificationTime) { this.volumeName = volumeName; this.bucketName = bucketName; this.keyName = keyName; this.dataSize = dataSize; - this.keyLocationList = locationInfos; + // it is important that the versions are ordered from old to new. + // Do this sanity check when versions got loaded on creating KsmKeyInfo. + // TODO : this is not necessary, here only because versioning is still a + // work in-progress, remove this following check when versioning is + // complete and prove correctly functioning + long currentVersion = -1; + for (KsmKeyLocationInfoGroup version : versions) { + Preconditions.checkArgument( + currentVersion + 1 == version.getVersion()); + currentVersion = version.getVersion(); + } + this.keyLocationVersions = versions; this.creationTime = creationTime; this.modificationTime = modificationTime; } @@ -70,16 +83,64 @@ public final class KsmKeyInfo { this.dataSize = size; } - public List<KsmKeyLocationInfo> getKeyLocationList() { - return keyLocationList; + public synchronized KsmKeyLocationInfoGroup getLatestVersionLocations() + throws IOException { + return keyLocationVersions.size() == 0? null : + keyLocationVersions.get(keyLocationVersions.size() - 1); + } + + public List<KsmKeyLocationInfoGroup> getKeyLocationVersions() { + return keyLocationVersions; } public void updateModifcationTime() { this.modificationTime = Time.monotonicNow(); } - public void appendKeyLocation(KsmKeyLocationInfo newLocation) { - keyLocationList.add(newLocation); + /** + * Append a set of blocks to the latest version. Note that these blocks are + * part of the latest version, not a new version. + * + * @param newLocationList the list of new blocks to be added. + * @throws IOException + */ + public synchronized void appendNewBlocks( + List<KsmKeyLocationInfo> newLocationList) throws IOException { + if (keyLocationVersions.size() == 0) { + throw new IOException("Appending new block, but no version exist"); + } + KsmKeyLocationInfoGroup currentLatestVersion = + keyLocationVersions.get(keyLocationVersions.size() - 1); + currentLatestVersion.appendNewBlocks(newLocationList); + setModificationTime(Time.now()); + } + + /** + * Add a new set of blocks. The new blocks will be added as appending a new + * version to the all version list. + * + * @param newLocationList the list of new blocks to be added. + * @throws IOException + */ + public synchronized long addNewVersion( + List<KsmKeyLocationInfo> newLocationList) throws IOException { + long latestVersionNum; + if (keyLocationVersions.size() == 0) { + // no version exist, these blocks are the very first version. + keyLocationVersions.add(new KsmKeyLocationInfoGroup(0, newLocationList)); + latestVersionNum = 0; + } else { + // it is important that the new version are always at the tail of the list + KsmKeyLocationInfoGroup currentLatestVersion = + keyLocationVersions.get(keyLocationVersions.size() - 1); + // the new version is created based on the current latest version + KsmKeyLocationInfoGroup newVersion = + currentLatestVersion.generateNextVersion(newLocationList); + keyLocationVersions.add(newVersion); + latestVersionNum = newVersion.getVersion(); + } + setModificationTime(Time.now()); + return latestVersionNum; } public long getCreationTime() { @@ -102,7 +163,7 @@ public final class KsmKeyInfo { private String bucketName; private String keyName; private long dataSize; - private List<KsmKeyLocationInfo> ksmKeyLocationInfos; + private List<KsmKeyLocationInfoGroup> ksmKeyLocationInfoGroups; private long creationTime; private long modificationTime; @@ -122,8 +183,8 @@ public final class KsmKeyInfo { } public Builder setKsmKeyLocationInfos( - List<KsmKeyLocationInfo> ksmKeyLocationInfoList) { - this.ksmKeyLocationInfos = ksmKeyLocationInfoList; + List<KsmKeyLocationInfoGroup> ksmKeyLocationInfoList) { + this.ksmKeyLocationInfoGroups = ksmKeyLocationInfoList; return this; } @@ -144,19 +205,23 @@ public final class KsmKeyInfo { public KsmKeyInfo build() { return new KsmKeyInfo( - volumeName, bucketName, keyName, ksmKeyLocationInfos, + volumeName, bucketName, keyName, ksmKeyLocationInfoGroups, dataSize, creationTime, modificationTime); } } public KeyInfo getProtobuf() { + long latestVersion = keyLocationVersions.size() == 0 ? -1 : + keyLocationVersions.get(keyLocationVersions.size() - 1).getVersion(); return KeyInfo.newBuilder() .setVolumeName(volumeName) .setBucketName(bucketName) .setKeyName(keyName) .setDataSize(dataSize) - .addAllKeyLocationList(keyLocationList.stream() - .map(KsmKeyLocationInfo::getProtobuf).collect(Collectors.toList())) + .addAllKeyLocationList(keyLocationVersions.stream() + .map(KsmKeyLocationInfoGroup::getProtobuf) + .collect(Collectors.toList())) + .setLatestVersion(latestVersion) .setCreationTime(creationTime) .setModificationTime(modificationTime) .build(); @@ -168,7 +233,7 @@ public final class KsmKeyInfo { keyInfo.getBucketName(), keyInfo.getKeyName(), keyInfo.getKeyLocationListList().stream() - .map(KsmKeyLocationInfo::getFromProtobuf) + .map(KsmKeyLocationInfoGroup::getFromProtobuf) .collect(Collectors.toList()), keyInfo.getDataSize(), keyInfo.getCreationTime(), http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a13348/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyLocationInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyLocationInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyLocationInfo.java index 36a2654..9d24b30 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyLocationInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyLocationInfo.java @@ -28,21 +28,29 @@ public final class KsmKeyLocationInfo { private final String blockID; private final boolean shouldCreateContainer; // the id of this subkey in all the subkeys. - private int index; private final long length; private final long offset; + // the version number indicating when this block was added + private long createVersion; private KsmKeyLocationInfo(String containerName, - String blockID, boolean shouldCreateContainer, int index, + String blockID, boolean shouldCreateContainer, long length, long offset) { this.containerName = containerName; this.blockID = blockID; this.shouldCreateContainer = shouldCreateContainer; - this.index = index; this.length = length; this.offset = offset; } + public void setCreateVersion(long version) { + createVersion = version; + } + + public long getCreateVersion() { + return createVersion; + } + public String getContainerName() { return containerName; } @@ -55,14 +63,6 @@ public final class KsmKeyLocationInfo { return shouldCreateContainer; } - public int getIndex() { - return index; - } - - public void setIndex(int idx) { - index = idx; - } - public long getLength() { return length; } @@ -78,10 +78,9 @@ public final class KsmKeyLocationInfo { private String containerName; private String blockID; private boolean shouldCreateContainer; - // the id of this subkey in all the subkeys. - private int index; private long length; private long offset; + public Builder setContainerName(String container) { this.containerName = container; return this; @@ -97,11 +96,6 @@ public final class KsmKeyLocationInfo { return this; } - public Builder setIndex(int id) { - this.index = id; - return this; - } - public Builder setLength(long len) { this.length = len; return this; @@ -114,7 +108,7 @@ public final class KsmKeyLocationInfo { public KsmKeyLocationInfo build() { return new KsmKeyLocationInfo(containerName, blockID, - shouldCreateContainer, index, length, offset); + shouldCreateContainer, length, offset); } } @@ -123,19 +117,20 @@ public final class KsmKeyLocationInfo { .setContainerName(containerName) .setBlockID(blockID) .setShouldCreateContainer(shouldCreateContainer) - .setIndex(index) .setLength(length) .setOffset(offset) + .setCreateVersion(createVersion) .build(); } public static KsmKeyLocationInfo getFromProtobuf(KeyLocation keyLocation) { - return new KsmKeyLocationInfo( + KsmKeyLocationInfo info = new KsmKeyLocationInfo( keyLocation.getContainerName(), keyLocation.getBlockID(), keyLocation.getShouldCreateContainer(), - keyLocation.getIndex(), keyLocation.getLength(), keyLocation.getOffset()); + info.setCreateVersion(keyLocation.getCreateVersion()); + return info; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a13348/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyLocationInfoGroup.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyLocationInfoGroup.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyLocationInfoGroup.java new file mode 100644 index 0000000..bef65ec --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyLocationInfoGroup.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.ksm.helpers; + +import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyLocationList; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * A list of key locations. This class represents one single version of the + * blocks of a key. + */ +public class KsmKeyLocationInfoGroup { + private final long version; + private final List<KsmKeyLocationInfo> locationList; + + public KsmKeyLocationInfoGroup(long version, + List<KsmKeyLocationInfo> locations) { + this.version = version; + this.locationList = locations; + } + + /** + * Return only the blocks that are created in the most recent version. + * + * @return the list of blocks that are created in the latest version. + */ + public List<KsmKeyLocationInfo> getBlocksLatestVersionOnly() { + List<KsmKeyLocationInfo> list = new ArrayList<>(); + locationList.stream().filter(x -> x.getCreateVersion() == version) + .forEach(list::add); + return list; + } + + public long getVersion() { + return version; + } + + public List<KsmKeyLocationInfo> getLocationList() { + return locationList; + } + + public KeyLocationList getProtobuf() { + return KeyLocationList.newBuilder() + .setVersion(version) + .addAllKeyLocations( + locationList.stream().map(KsmKeyLocationInfo::getProtobuf) + .collect(Collectors.toList())) + .build(); + } + + public static KsmKeyLocationInfoGroup getFromProtobuf( + KeyLocationList keyLocationList) { + return new KsmKeyLocationInfoGroup( + keyLocationList.getVersion(), + keyLocationList.getKeyLocationsList().stream() + .map(KsmKeyLocationInfo::getFromProtobuf) + .collect(Collectors.toList())); + } + + /** + * Given a new block location, generate a new version list based upon this + * one. + * + * @param newLocationList a list of new location to be added. + * @return + */ + KsmKeyLocationInfoGroup generateNextVersion( + List<KsmKeyLocationInfo> newLocationList) throws IOException { + // TODO : revisit if we can do this method more efficiently + // one potential inefficiency here is that later version always include + // older ones. e.g. v1 has B1, then v2, v3...will all have B1 and only add + // more + List<KsmKeyLocationInfo> newList = new ArrayList<>(); + newList.addAll(locationList); + for (KsmKeyLocationInfo newInfo : newLocationList) { + // all these new blocks will have addVersion of current version + 1 + newInfo.setCreateVersion(version + 1); + newList.add(newInfo); + } + return new KsmKeyLocationInfoGroup(version + 1, newList); + } + + void appendNewBlocks(List<KsmKeyLocationInfo> newLocationList) + throws IOException { + for (KsmKeyLocationInfo info : newLocationList) { + info.setCreateVersion(version); + locationList.add(info); + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("version:").append(version).append(" "); + for (KsmKeyLocationInfo kli : locationList) { + sb.append(kli.getBlockID()).append(" || "); + } + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a13348/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/OpenKeySession.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/OpenKeySession.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/OpenKeySession.java index 9d27ed0..c19c04b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/OpenKeySession.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/OpenKeySession.java @@ -25,10 +25,19 @@ package org.apache.hadoop.ozone.ksm.helpers; public class OpenKeySession { private final int id; private final KsmKeyInfo keyInfo; + // the version of the key when it is being opened in this session. + // a block that has a create version equals to open version means it will + // be committed only when this open session is closed. + private long openVersion; - public OpenKeySession(int id, KsmKeyInfo info) { + public OpenKeySession(int id, KsmKeyInfo info, long version) { this.id = id; this.keyInfo = info; + this.openVersion = version; + } + + public long getOpenVersion() { + return this.openVersion; } public KsmKeyInfo getKeyInfo() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a13348/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java index bee3da2..b3b5951 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java @@ -538,7 +538,7 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB throw new IOException("Create key failed, error:" + resp.getStatus()); } return new OpenKeySession(resp.getID(), - KsmKeyInfo.getFromProtobuf(resp.getKeyInfo())); + KsmKeyInfo.getFromProtobuf(resp.getKeyInfo()), resp.getOpenVersion()); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a13348/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto index 2cf3123..480b701 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto @@ -235,7 +235,13 @@ message KeyLocation { required bool shouldCreateContainer = 3; required uint64 offset = 4; required uint64 length = 5; - required uint32 index = 6; + // indicated at which version this block gets created. + optional uint64 createVersion = 6; +} + +message KeyLocationList { + optional uint64 version = 1; + repeated KeyLocation keyLocations = 2; } message KeyInfo { @@ -243,9 +249,10 @@ message KeyInfo { required string bucketName = 2; required string keyName = 3; required uint64 dataSize = 4; - repeated KeyLocation keyLocationList = 5; + repeated KeyLocationList keyLocationList = 5; required uint64 creationTime = 6; required uint64 modificationTime = 7; + optional uint64 latestVersion = 8; } message LocateKeyRequest { @@ -258,6 +265,8 @@ message LocateKeyResponse { // clients' followup request may carry this ID for stateful operations (similar // to a cookie). optional uint32 ID = 3; + // TODO : allow specifiying a particular version to read. + optional uint64 openVersion = 4; } message SetBucketPropertyRequest { http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a13348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java index fbc1131..e7da79d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java @@ -23,6 +23,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo; import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo; +import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup; import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo; import org.apache.hadoop.ozone.common.BlockGroup; @@ -45,6 +46,7 @@ import org.apache.hadoop.utils.MetadataStoreBuilder; import java.io.File; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.ArrayList; import java.util.Map; @@ -473,7 +475,11 @@ public class KSMMetadataManagerImpl implements KSMMetadataManager { KsmKeyInfo info = KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue())); // Get block keys as a list. - List<String> item = info.getKeyLocationList().stream() + KsmKeyLocationInfoGroup latest = info.getLatestVersionLocations(); + if (latest == null) { + return Collections.emptyList(); + } + List<String> item = latest.getLocationList().stream() .map(KsmKeyLocationInfo::getBlockID) .collect(Collectors.toList()); BlockGroup keyBlocks = BlockGroup.newBuilder() @@ -503,7 +509,8 @@ public class KSMMetadataManagerImpl implements KSMMetadataManager { continue; } // Get block keys as a list. - List<String> item = info.getKeyLocationList().stream() + List<String> item = info.getLatestVersionLocations() + .getBlocksLatestVersionOnly().stream() .map(KsmKeyLocationInfo::getBlockID) .collect(Collectors.toList()); BlockGroup keyBlocks = BlockGroup.newBuilder() http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a13348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java index 620816a..5e274e8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java @@ -26,6 +26,7 @@ import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo; import org.apache.hadoop.conf.OzoneConfiguration; import org.apache.hadoop.ozone.ksm.exceptions.KSMException; import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes; +import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup; import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession; import org.apache.hadoop.ozone.protocol.proto .KeySpaceManagerProtocolProtos.KeyInfo; @@ -40,6 +41,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; @@ -195,9 +197,10 @@ public class KeyManagerImpl implements KeyManager { .setShouldCreateContainer(allocatedBlock.getCreateContainer()) .setLength(scmBlockSize) .setOffset(0) - .setIndex(keyInfo.getKeyLocationList().size()) .build(); - keyInfo.appendKeyLocation(info); + // current version not committed, so new blocks coming now are added to + // the same version + keyInfo.appendNewBlocks(Collections.singletonList(info)); keyInfo.updateModifcationTime(); metadataManager.put(openKey, keyInfo.getProtobuf().toByteArray()); return info; @@ -237,7 +240,6 @@ public class KeyManagerImpl implements KeyManager { // the point, if client needs more blocks, client can always call // allocateBlock. But if requested size is not 0, KSM will preallocate // some blocks and piggyback to client, to save RPC calls. - int idx = 0; while (requestedSize > 0) { long allocateSize = Math.min(scmBlockSize, requestedSize); AllocatedBlock allocatedBlock = @@ -246,28 +248,45 @@ public class KeyManagerImpl implements KeyManager { .setContainerName(allocatedBlock.getPipeline().getContainerName()) .setBlockID(allocatedBlock.getKey()) .setShouldCreateContainer(allocatedBlock.getCreateContainer()) - .setIndex(idx++) .setLength(allocateSize) .setOffset(0) .build(); locations.add(subKeyInfo); requestedSize -= allocateSize; } - long currentTime = Time.now(); // NOTE size of a key is not a hard limit on anything, it is a value that // client should expect, in terms of current size of key. If client sets a // value, then this value is used, otherwise, we allocate a single block // which is the current size, if read by the client. long size = args.getDataSize() >= 0 ? args.getDataSize() : scmBlockSize; - KsmKeyInfo keyInfo = new KsmKeyInfo.Builder() - .setVolumeName(args.getVolumeName()) - .setBucketName(args.getBucketName()) - .setKeyName(args.getKeyName()) - .setKsmKeyLocationInfos(locations) - .setCreationTime(currentTime) - .setModificationTime(currentTime) - .setDataSize(size) - .build(); + byte[] keyKey = metadataManager.getDBKeyBytes( + volumeName, bucketName, keyName); + byte[] value = metadataManager.get(keyKey); + KsmKeyInfo keyInfo; + long openVersion; + if (value != null) { + // the key already exist, the new blocks will be added as new version + keyInfo = KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(value)); + // when locations.size = 0, the new version will have identical blocks + // as its previous version + openVersion = keyInfo.addNewVersion(locations); + keyInfo.setDataSize(size + keyInfo.getDataSize()); + } else { + // the key does not exist, create a new object, the new blocks are the + // version 0 + long currentTime = Time.now(); + keyInfo = new KsmKeyInfo.Builder() + .setVolumeName(args.getVolumeName()) + .setBucketName(args.getBucketName()) + .setKeyName(args.getKeyName()) + .setKsmKeyLocationInfos(Collections.singletonList( + new KsmKeyLocationInfoGroup(0, locations))) + .setCreationTime(currentTime) + .setModificationTime(currentTime) + .setDataSize(size) + .build(); + openVersion = 0; + } // Generate a random ID which is not already in meta db. int id = -1; // in general this should finish in a couple times at most. putting some @@ -285,7 +304,7 @@ public class KeyManagerImpl implements KeyManager { } LOG.debug("Key {} allocated in volume {} bucket {}", keyName, volumeName, bucketName); - return new OpenKeySession(id, keyInfo); + return new OpenKeySession(id, keyInfo, openVersion); } catch (KSMException e) { throw e; } catch (IOException ex) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a13348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java index e4ba28d..16157c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java @@ -334,6 +334,7 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements OpenKeySession openKey = impl.openKey(ksmKeyArgs); resp.setKeyInfo(openKey.getKeyInfo().getProtobuf()); resp.setID(openKey.getId()); + resp.setOpenVersion(openKey.getOpenVersion()); resp.setStatus(Status.OK); } catch (IOException e) { resp.setStatus(exceptionToResponseStatus(e)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a13348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java index 63f2cce..1830c71 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java @@ -419,6 +419,9 @@ public final class DistributedStorageHandler implements StorageHandler { .setType(xceiverClientManager.getType()) .setFactor(xceiverClientManager.getFactor()) .build(); + groupOutputStream.addPreallocateBlocks( + openKey.getKeyInfo().getLatestVersionLocations(), + openKey.getOpenVersion()); return new OzoneOutputStream(groupOutputStream); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a13348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java index 2dd9fbb..5ab9e3a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java @@ -323,7 +323,7 @@ public class TestStorageContainerManager { // on datanodes. Set<String> containerNames = new HashSet<>(); for (Map.Entry<String, KsmKeyInfo> entry : keyLocations.entrySet()) { - entry.getValue().getKeyLocationList() + entry.getValue().getLatestVersionLocations().getLocationList() .forEach(loc -> containerNames.add(loc.getContainerName())); } @@ -331,7 +331,7 @@ public class TestStorageContainerManager { // total number of containerBlocks via creation call. int totalCreatedBlocks = 0; for (KsmKeyInfo info : keyLocations.values()) { - totalCreatedBlocks += info.getKeyLocationList().size(); + totalCreatedBlocks += info.getKeyLocationVersions().size(); } Assert.assertTrue(totalCreatedBlocks > 0); Assert.assertEquals(totalCreatedBlocks, @@ -340,7 +340,8 @@ public class TestStorageContainerManager { // Create a deletion TX for each key. Map<String, List<String>> containerBlocks = Maps.newHashMap(); for (KsmKeyInfo info : keyLocations.values()) { - List<KsmKeyLocationInfo> list = info.getKeyLocationList(); + List<KsmKeyLocationInfo> list = + info.getLatestVersionLocations().getLocationList(); list.forEach(location -> { if (containerBlocks.containsKey(location.getContainerName())) { containerBlocks.get(location.getContainerName()) http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a13348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java index 383f32c..f7cb7cf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java @@ -387,7 +387,8 @@ public class TestOzoneRpcClient { OzoneProtos.ReplicationFactor replicationFactor = OzoneProtos.ReplicationFactor.valueOf(factor.getValue()); KsmKeyInfo keyInfo = keySpaceManager.lookupKey(keyArgs); - for (KsmKeyLocationInfo info: keyInfo.getKeyLocationList()) { + for (KsmKeyLocationInfo info: + keyInfo.getLatestVersionLocations().getLocationList()) { Pipeline pipeline = storageContainerLocationClient.getContainer(info.getContainerName()); if ((pipeline.getFactor() != replicationFactor) || http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a13348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKsmBlockVersioning.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKsmBlockVersioning.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKsmBlockVersioning.java new file mode 100644 index 0000000..d974416 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKsmBlockVersioning.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.ksm; + +import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.conf.OzoneConfiguration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler; +import org.apache.hadoop.ozone.MiniOzoneClassicCluster; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs; +import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo; +import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo; +import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup; +import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession; +import org.apache.hadoop.ozone.web.handlers.BucketArgs; +import org.apache.hadoop.ozone.web.handlers.KeyArgs; +import org.apache.hadoop.ozone.web.handlers.UserArgs; +import org.apache.hadoop.ozone.web.handlers.VolumeArgs; +import org.apache.hadoop.ozone.web.interfaces.StorageHandler; +import org.apache.hadoop.ozone.web.utils.OzoneUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.LinkedList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * This class tests the versioning of blocks from KSM side. + */ +public class TestKsmBlockVersioning { + private static MiniOzoneCluster cluster = null; + private static UserArgs userArgs; + private static OzoneConfiguration conf; + private static KeySpaceManager keySpaceManager; + private static StorageHandler storageHandler; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + /** + * Create a MiniDFSCluster for testing. + * <p> + * Ozone is made active by setting OZONE_ENABLED = true and + * OZONE_HANDLER_TYPE_KEY = "distributed" + * + * @throws IOException + */ + @BeforeClass + public static void init() throws Exception { + conf = new OzoneConfiguration(); + conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY, + OzoneConsts.OZONE_HANDLER_DISTRIBUTED); + cluster = new MiniOzoneClassicCluster.Builder(conf) + .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build(); + storageHandler = new ObjectStoreHandler(conf).getStorageHandler(); + userArgs = new UserArgs(null, OzoneUtils.getRequestID(), + null, null, null, null); + keySpaceManager = cluster.getKeySpaceManager(); + } + + /** + * Shutdown MiniDFSCluster. + */ + @AfterClass + public static void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testAllocateCommit() throws Exception { + String userName = "user" + RandomStringUtils.randomNumeric(5); + String adminName = "admin" + RandomStringUtils.randomNumeric(5); + String volumeName = "volume" + RandomStringUtils.randomNumeric(5); + String bucketName = "bucket" + RandomStringUtils.randomNumeric(5); + String keyName = "key" + RandomStringUtils.randomNumeric(5); + + VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs); + createVolumeArgs.setUserName(userName); + createVolumeArgs.setAdminName(adminName); + storageHandler.createVolume(createVolumeArgs); + + BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs); + bucketArgs.setAddAcls(new LinkedList<>()); + bucketArgs.setRemoveAcls(new LinkedList<>()); + bucketArgs.setStorageType(StorageType.DISK); + storageHandler.createBucket(bucketArgs); + + KsmKeyArgs keyArgs = new KsmKeyArgs.Builder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .setDataSize(1000) + .build(); + + // 1st update, version 0 + OpenKeySession openKey = keySpaceManager.openKey(keyArgs); + keySpaceManager.commitKey(keyArgs, openKey.getId()); + + KsmKeyInfo keyInfo = keySpaceManager.lookupKey(keyArgs); + KsmKeyLocationInfoGroup highestVersion = + checkVersions(keyInfo.getKeyLocationVersions()); + assertEquals(0, highestVersion.getVersion()); + assertEquals(1, highestVersion.getLocationList().size()); + + // 2nd update, version 1 + openKey = keySpaceManager.openKey(keyArgs); + //KsmKeyLocationInfo locationInfo = + // keySpaceManager.allocateBlock(keyArgs, openKey.getId()); + keySpaceManager.commitKey(keyArgs, openKey.getId()); + + keyInfo = keySpaceManager.lookupKey(keyArgs); + highestVersion = checkVersions(keyInfo.getKeyLocationVersions()); + assertEquals(1, highestVersion.getVersion()); + assertEquals(2, highestVersion.getLocationList().size()); + + // 3rd update, version 2 + openKey = keySpaceManager.openKey(keyArgs); + // this block will be appended to the latest version of version 2. + keySpaceManager.allocateBlock(keyArgs, openKey.getId()); + keySpaceManager.commitKey(keyArgs, openKey.getId()); + + keyInfo = keySpaceManager.lookupKey(keyArgs); + highestVersion = checkVersions(keyInfo.getKeyLocationVersions()); + assertEquals(2, highestVersion.getVersion()); + assertEquals(4, highestVersion.getLocationList().size()); + } + + private KsmKeyLocationInfoGroup checkVersions( + List<KsmKeyLocationInfoGroup> versions) { + KsmKeyLocationInfoGroup currentVersion = null; + for (KsmKeyLocationInfoGroup version : versions) { + if (currentVersion != null) { + assertEquals(currentVersion.getVersion() + 1, version.getVersion()); + for (KsmKeyLocationInfo info : currentVersion.getLocationList()) { + boolean found = false; + // all the blocks from the previous version must present in the next + // version + for (KsmKeyLocationInfo info2 : version.getLocationList()) { + if (info.getBlockID().equals(info2.getBlockID())) { + found = true; + break; + } + } + assertTrue(found); + } + } + currentVersion = version; + } + return currentVersion; + } + + @Test + public void testReadLatestVersion() throws Exception { + + String userName = "user" + RandomStringUtils.randomNumeric(5); + String adminName = "admin" + RandomStringUtils.randomNumeric(5); + String volumeName = "volume" + RandomStringUtils.randomNumeric(5); + String bucketName = "bucket" + RandomStringUtils.randomNumeric(5); + String keyName = "key" + RandomStringUtils.randomNumeric(5); + + VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs); + createVolumeArgs.setUserName(userName); + createVolumeArgs.setAdminName(adminName); + storageHandler.createVolume(createVolumeArgs); + + BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs); + bucketArgs.setAddAcls(new LinkedList<>()); + bucketArgs.setRemoveAcls(new LinkedList<>()); + bucketArgs.setStorageType(StorageType.DISK); + storageHandler.createBucket(bucketArgs); + + KsmKeyArgs ksmKeyArgs = new KsmKeyArgs.Builder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .setDataSize(1000) + .build(); + + String dataString = RandomStringUtils.randomAlphabetic(100); + KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs); + // this write will create 1st version with one block + try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) { + stream.write(dataString.getBytes()); + } + byte[] data = new byte[dataString.length()]; + try (InputStream in = storageHandler.newKeyReader(keyArgs)) { + in.read(data); + } + KsmKeyInfo keyInfo = keySpaceManager.lookupKey(ksmKeyArgs); + assertEquals(dataString, DFSUtil.bytes2String(data)); + assertEquals(0, keyInfo.getLatestVersionLocations().getVersion()); + assertEquals(1, + keyInfo.getLatestVersionLocations().getLocationList().size()); + + // this write will create 2nd version, 2nd version will contain block from + // version 1, and add a new block + dataString = RandomStringUtils.randomAlphabetic(10); + data = new byte[dataString.length()]; + try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) { + stream.write(dataString.getBytes()); + } + try (InputStream in = storageHandler.newKeyReader(keyArgs)) { + in.read(data); + } + keyInfo = keySpaceManager.lookupKey(ksmKeyArgs); + assertEquals(dataString, DFSUtil.bytes2String(data)); + assertEquals(1, keyInfo.getLatestVersionLocations().getVersion()); + assertEquals(2, + keyInfo.getLatestVersionLocations().getLocationList().size()); + + dataString = RandomStringUtils.randomAlphabetic(200); + data = new byte[dataString.length()]; + try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) { + stream.write(dataString.getBytes()); + } + try (InputStream in = storageHandler.newKeyReader(keyArgs)) { + in.read(data); + } + keyInfo = keySpaceManager.lookupKey(ksmKeyArgs); + assertEquals(dataString, DFSUtil.bytes2String(data)); + assertEquals(2, keyInfo.getLatestVersionLocations().getVersion()); + assertEquals(3, + keyInfo.getLatestVersionLocations().getLocationList().size()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a13348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java index f2b0272..f5736ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java @@ -618,7 +618,8 @@ public class TestKeys { // Memorize chunks that has been created, // so we can verify actual deletions at DN side later. for (KsmKeyInfo keyInfo : createdKeys) { - List<KsmKeyLocationInfo> locations = keyInfo.getKeyLocationList(); + List<KsmKeyLocationInfo> locations = + keyInfo.getLatestVersionLocations().getLocationList(); for (KsmKeyLocationInfo location : locations) { String containerName = location.getContainerName(); KeyData keyData = new KeyData(containerName, location.getBlockID()); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
