Repository: hadoop Updated Branches: refs/heads/HDFS-7240 4ac97b181 -> b3044db40
HDFS-10250. Ozone: Add key Persistence. Contributed by Anu Engineer. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b3044db4 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b3044db4 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b3044db4 Branch: refs/heads/HDFS-7240 Commit: b3044db40731ed4d915c9d2452fbd6175c52e10a Parents: 4ac97b1 Author: Chris Nauroth <[email protected]> Authored: Tue Apr 5 12:59:45 2016 -0700 Committer: Chris Nauroth <[email protected]> Committed: Tue Apr 5 12:59:45 2016 -0700 ---------------------------------------------------------------------- .../apache/hadoop/ozone/OzoneConfigKeys.java | 4 + .../container/common/helpers/ChunkUtils.java | 6 +- .../container/common/helpers/ContainerData.java | 4 +- .../ozone/container/common/helpers/KeyData.java | 160 +++++++++++++++++ .../container/common/helpers/KeyUtils.java | 98 +++++++++++ .../common/impl/ContainerManagerImpl.java | 22 +++ .../ozone/container/common/impl/Dispatcher.java | 126 +++++++++++++- .../container/common/impl/KeyManagerImpl.java | 145 ++++++++++++++++ .../common/interfaces/ContainerManager.java | 26 ++- .../container/common/interfaces/KeyManager.java | 63 +++++++ .../container/common/utils/ContainerCache.java | 111 ++++++++++++ .../container/common/utils/LevelDBStore.java | 5 +- .../container/ozoneimpl/OzoneContainer.java | 6 + .../main/proto/DatanodeContainerProtocol.proto | 119 ++++++------- .../ozone/container/ContainerTestHelper.java | 93 +++++++++- .../common/impl/TestContainerPersistence.java | 170 ++++++++++++++++--- .../container/ozoneimpl/TestOzoneContainer.java | 23 +++ 17 files changed, 1080 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3044db4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 27c79c0..cb2ad22 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -54,6 +54,10 @@ public final class OzoneConfigKeys { public static final String DFS_OZONE_METADATA_DIRS = "dfs.ozone.metadata.dirs"; + public static final String DFS_OZONE_KEY_CACHE = "dfs.ozone.key.cache.size"; + public static final int DFS_OZONE_KEY_CACHE_DEFAULT = 1024; + + /** * There is no need to instantiate this class. http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3044db4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java index 03370ac..15e4524 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java @@ -216,7 +216,7 @@ public final class ChunkUtils { * Reads data from an existing chunk file. * * @param chunkFile - file where data lives. - * @param data - chunk defintion. + * @param data - chunk definition. 
* @return ByteBuffer * @throws IOException * @throws ExecutionException @@ -284,8 +284,8 @@ public final class ChunkUtils { byte[] data, ChunkInfo info) { Preconditions.checkNotNull(msg); - ContainerProtos.ReadChunkReponseProto.Builder response = - ContainerProtos.ReadChunkReponseProto.newBuilder(); + ContainerProtos.ReadChunkResponseProto.Builder response = + ContainerProtos.ReadChunkResponseProto.newBuilder(); response.setChunkData(info.getProtoBufMessage()); response.setData(ByteString.copyFrom(data)); response.setPipeline(msg.getReadChunk().getPipeline()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3044db4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java index 9e3e2a3..da03e00 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java @@ -170,9 +170,9 @@ public class ContainerData { } /** - * This function serves as the generic key for OzoneCache class. Both + * This function serves as the generic key for ContainerCache class. Both * ContainerData and ContainerKeyData overrides this function to appropriately - * return the right name that can be used in OzoneCache. + * return the right name that can be used in ContainerCache. * * @return String Name. 
*/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3044db4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java new file mode 100644 index 0000000..289dbe4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.helpers; + +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +/** + * Helper class to convert Protobuf to Java classes. 
+ */ +public class KeyData { + private final String containerName; + private final String keyName; + private final Map<String, String> metadata; + + /** + * Please note : when we are working with keys, we don't care what they point + * to. So we We don't read chunkinfo nor validate them. It is responsibility + * of higher layer like ozone. We just read and write data from network. + */ + private List<ContainerProtos.ChunkInfo> chunks; + + /** + * Constructs a KeyData Object. + * + * @param containerName + * @param keyName + */ + public KeyData(String containerName, String keyName) { + this.containerName = containerName; + this.keyName = keyName; + this.metadata = new TreeMap<>(); + } + + /** + * Returns a keyData object from the protobuf data. + * + * @param data - Protobuf data. + * @return - KeyData + * @throws IOException + */ + public static KeyData getFromProtoBuf(ContainerProtos.KeyData data) throws + IOException { + KeyData keyData = new KeyData(data.getContainerName(), data.getName()); + for (int x = 0; x < data.getMetadataCount(); x++) { + keyData.addMetadata(data.getMetadata(x).getKey(), + data.getMetadata(x).getValue()); + } + keyData.setChunks(data.getChunksList()); + return keyData; + } + + /** + * Returns a Protobuf message from KeyData. + * @return Proto Buf Message. + */ + public ContainerProtos.KeyData getProtoBufMessage() { + ContainerProtos.KeyData.Builder builder = + ContainerProtos.KeyData.newBuilder(); + builder.setContainerName(this.containerName); + builder.setName(this.getKeyName()); + builder.addAllChunks(this.chunks); + for (Map.Entry<String, String> entry : metadata.entrySet()) { + ContainerProtos.KeyValue.Builder keyValBuilder = + ContainerProtos.KeyValue.newBuilder(); + builder.addMetadata(keyValBuilder.setKey(entry.getKey()) + .setValue(entry.getValue()).build()); + } + return builder.build(); + } + + /** + * Adds metadata. 
+ * + * @param key - Key + * @param value - Value + * @throws IOException + */ + public synchronized void addMetadata(String key, String value) throws + IOException { + if (this.metadata.containsKey(key)) { + throw new IOException("This key already exists. Key " + key); + } + metadata.put(key, value); + } + + public synchronized Map<String, String> getMetadata() { + return Collections.unmodifiableMap(this.metadata); + } + + /** + * Returns value of a key. + */ + public synchronized String getValue(String key) { + return metadata.get(key); + } + + /** + * Deletes a metadata entry from the map. + * + * @param key - Key + */ + public synchronized void deleteKey(String key) { + metadata.remove(key); + } + + /** + * Returns chunks list. + * + * @return list of chunkinfo. + */ + public List<ContainerProtos.ChunkInfo> getChunks() { + return chunks; + } + + /** + * Returns container Name. + * @return String. + */ + public String getContainerName() { + return containerName; + } + + /** + * Returns KeyName. + * @return String. + */ + public String getKeyName() { + return keyName; + } + + /** + * Sets Chunk list. + * + * @param chunks - List of chunks. 
+ */ + public void setChunks(List<ContainerProtos.ChunkInfo> chunks) { + this.chunks = chunks; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3044db4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java new file mode 100644 index 0000000..b0db9a7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.ozone.container.common.helpers; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; +import org.apache.hadoop.ozone.container.common.utils.ContainerCache; +import org.apache.hadoop.ozone.container.common.utils.LevelDBStore; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; + +/** + * Utils functions to help key functions. + */ +public final class KeyUtils { + public static final String ENCODING_NAME = "UTF-8"; + public static final Charset ENCODING = Charset.forName(ENCODING_NAME); + + /** + * Never Constructed. + */ + private KeyUtils() { + } + + /** + * Returns a file handle to LevelDB. + * + * @param dbPath - DbPath. + * @return LevelDB + */ + public static LevelDBStore getDB(String dbPath) throws IOException { + Preconditions.checkNotNull(dbPath); + Preconditions.checkState(!dbPath.isEmpty()); + return new LevelDBStore(new File(dbPath), false); + } + + /** + * This function is called with containerManager ReadLock held. + * + * @param container - container. + * @param cache - cache + * @return LevelDB handle. + * @throws IOException + */ + public static LevelDBStore getDB(ContainerData container, + ContainerCache cache) throws IOException { + Preconditions.checkNotNull(container); + Preconditions.checkNotNull(cache); + LevelDBStore db = cache.getDB(container.getContainerName()); + if (db == null) { + db = getDB(container.getDBPath()); + cache.putDB(container.getContainerName(), db); + } + return db; + } + + /** + * Returns successful keyResponse. + * @param msg - Request. + * @return Response. 
+ */ + public static ContainerProtos.ContainerCommandResponseProto + getKeyResponse(ContainerProtos.ContainerCommandRequestProto msg) { + return ContainerUtils.getContainerResponse(msg); + } + + + public static ContainerProtos.ContainerCommandResponseProto + getKeyDataResponse(ContainerProtos.ContainerCommandRequestProto msg + , KeyData data) { + ContainerProtos.GetKeyResponseProto.Builder getKey = ContainerProtos + .GetKeyResponseProto.newBuilder(); + getKey.setKeyData(data.getProtoBufMessage()); + ContainerProtos.ContainerCommandResponseProto.Builder builder = + ContainerUtils.getContainerResponse(msg, ContainerProtos.Result + .SUCCESS, ""); + builder.setGetKey(getKey); + return builder.build(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3044db4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java index 1d6a695..28bb663 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java @@ -33,6 +33,7 @@ import org.apache.hadoop.ozone.container.common.helpers.Pipeline; import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager; import org.apache.hadoop.ozone.container.common.interfaces.ContainerLocationManager; import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; +import org.apache.hadoop.ozone.container.common.interfaces.KeyManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +73,7 @@ public class ContainerManagerImpl 
implements ContainerManager { private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); private ContainerLocationManager locationManager; private ChunkManager chunkManager; + private KeyManager keyManager; /** * Init call that sets up a container Manager. @@ -465,6 +467,26 @@ public class ContainerManagerImpl implements ContainerManager { } /** + * Sets the Key Manager. + * + * @param keyManager - Key Manager. + */ + @Override + public void setKeyManager(KeyManager keyManager) { + this.keyManager = keyManager; + } + + /** + * Gets the Key Manager. + * + * @return KeyManager. + */ + @Override + public KeyManager getKeyManager() { + return this.keyManager; + } + + /** * Filter out only container files from the container metadata dir. */ private static class ContainerFilter implements FilenameFilter { http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3044db4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java index d39b709..66ff1ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java @@ -20,13 +20,17 @@ package org.apache.hadoop.ozone.container.common.impl; import com.google.common.base.Preconditions; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; +import 
org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos + .ContainerCommandRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos + .ContainerCommandResponseProto; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils; import org.apache.hadoop.ozone.container.common.helpers.ContainerData; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; +import org.apache.hadoop.ozone.container.common.helpers.KeyData; +import org.apache.hadoop.ozone.container.common.helpers.KeyUtils; import org.apache.hadoop.ozone.container.common.helpers.Pipeline; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; @@ -69,6 +73,12 @@ public class Dispatcher implements ContainerDispatcher { return containerProcessHandler(msg); } + if ((cmdType == Type.PutKey) || + (cmdType == Type.GetKey) || + (cmdType == Type.DeleteKey) || + (cmdType == Type.ListKey)) { + return keyProcessHandler(msg); + } if ((cmdType == Type.WriteChunk) || (cmdType == Type.ReadChunk) || @@ -127,6 +137,48 @@ public class Dispatcher implements ContainerDispatcher { } /** + * Handles the all key related functionality. + * + * @param msg - command + * @return - response + * @throws IOException + */ + private ContainerCommandResponseProto keyProcessHandler( + ContainerCommandRequestProto msg) throws IOException { + try { + switch (msg.getCmdType()) { + case PutKey: + return handlePutKey(msg); + + case GetKey: + return handleGetKey(msg); + + case DeleteKey: + return handleDeleteKey(msg); + + case ListKey: + return ContainerUtils.unsupportedRequest(msg); + + default: + return ContainerUtils.unsupportedRequest(msg); + + } + } catch (IOException ex) { + LOG.warn("Container operation failed. 
" + + "Container: {} Operation: {} trace ID: {} Error: {}", + msg.getCreateContainer().getContainerData().getName(), + msg.getCmdType().name(), + msg.getTraceID(), + ex.toString()); + + // TODO : Replace with finer error codes. + return ContainerUtils.getContainerResponse(msg, + ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, + ex.toString()).build(); + } + } + + /** * Handles the all chunk related functionality. * * @param msg - command @@ -136,7 +188,6 @@ public class Dispatcher implements ContainerDispatcher { private ContainerCommandResponseProto chunkProcessHandler( ContainerCommandRequestProto msg) throws IOException { try { - switch (msg.getCmdType()) { case WriteChunk: return handleWriteChunk(msg); @@ -327,4 +378,73 @@ public class Dispatcher implements ContainerDispatcher { return ChunkUtils.getChunkResponse(msg); } + /** + * Put Key handler. + * + * @param msg - Request. + * @return - Response. + * @throws IOException + */ + private ContainerCommandResponseProto handlePutKey( + ContainerCommandRequestProto msg) throws IOException { + if(!msg.hasPutKey()){ + LOG.debug("Malformed put key request. trace ID: {}", + msg.getTraceID()); + return ContainerUtils.malformedRequest(msg); + } + Pipeline pipeline = Pipeline.getFromProtoBuf(msg.getPutKey().getPipeline()); + Preconditions.checkNotNull(pipeline); + KeyData keyData = KeyData.getFromProtoBuf(msg.getPutKey().getKeyData()); + Preconditions.checkNotNull(keyData); + this.containerManager.getKeyManager().putKey(pipeline, keyData); + return KeyUtils.getKeyResponse(msg); + } + + /** + * Handle Get Key. + * + * @param msg - Request. + * @return - Response. + * @throws IOException + */ + private ContainerCommandResponseProto handleGetKey( + ContainerCommandRequestProto msg) throws IOException { + if(!msg.hasGetKey()){ + LOG.debug("Malformed get key request. 
trace ID: {}", + msg.getTraceID()); + return ContainerUtils.malformedRequest(msg); + } + KeyData keyData = KeyData.getFromProtoBuf(msg.getGetKey().getKeyData()); + Preconditions.checkNotNull(keyData); + KeyData responseData = + this.containerManager.getKeyManager().getKey(keyData); + return KeyUtils.getKeyDataResponse(msg, responseData); + } + + /** + * Handle Delete Key. + * + * @param msg - Request. + * @return - Response. + * @throws IOException + */ + private ContainerCommandResponseProto handleDeleteKey( + ContainerCommandRequestProto msg) throws IOException { + if(!msg.hasDeleteKey()){ + LOG.debug("Malformed delete key request. trace ID: {}", + msg.getTraceID()); + return ContainerUtils.malformedRequest(msg); + } + + Pipeline pipeline = + Pipeline.getFromProtoBuf(msg.getDeleteKey().getPipeline()); + Preconditions.checkNotNull(pipeline); + String keyName = msg.getDeleteKey().getName(); + Preconditions.checkNotNull(keyName); + Preconditions.checkState(!keyName.isEmpty()); + + this.containerManager.getKeyManager().deleteKey(pipeline, keyName); + return KeyUtils.getKeyResponse(msg); + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3044db4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java new file mode 100644 index 0000000..2da65ca --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.impl; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.helpers.ContainerData; +import org.apache.hadoop.ozone.container.common.helpers.KeyData; +import org.apache.hadoop.ozone.container.common.helpers.KeyUtils; +import org.apache.hadoop.ozone.container.common.helpers.Pipeline; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; +import org.apache.hadoop.ozone.container.common.interfaces.KeyManager; +import org.apache.hadoop.ozone.container.common.utils.ContainerCache; +import org.apache.hadoop.ozone.container.common.utils.LevelDBStore; + +import java.io.IOException; +import java.util.List; + +/** + * Key Manager impl. + */ +public class KeyManagerImpl implements KeyManager { + private static final float LOAD_FACTOR = 0.75f; + private final ContainerManager containerManager; + private final ContainerCache containerCache; + + /** + * Constructs a key Manager. + * + * @param containerManager - Container Manager. 
+ */ + public KeyManagerImpl(ContainerManager containerManager, Configuration conf) { + Preconditions.checkNotNull(containerManager); + Preconditions.checkNotNull(conf); + int cacheSize = conf.getInt(OzoneConfigKeys.DFS_OZONE_KEY_CACHE, + OzoneConfigKeys.DFS_OZONE_KEY_CACHE_DEFAULT); + this.containerManager = containerManager; + containerCache = new ContainerCache(cacheSize, LOAD_FACTOR, true); + } + + /** + * {@inheritDoc} + */ + @Override + public void putKey(Pipeline pipeline, KeyData data) throws IOException { + containerManager.readLock(); + try { + // We are not locking the key manager since LevelDb serializes all actions + // against a single DB. We rely on DB level locking to avoid conflicts. + Preconditions.checkNotNull(pipeline); + Preconditions.checkNotNull(pipeline.getContainerName()); + ContainerData cData = containerManager.readContainer( + pipeline.getContainerName()); + LevelDBStore db = KeyUtils.getDB(cData, containerCache); + Preconditions.checkNotNull(db); + db.put(data.getKeyName().getBytes(KeyUtils.ENCODING), data + .getProtoBufMessage().toByteArray()); + } finally { + containerManager.readUnlock(); + } + + } + + /** + * {@inheritDoc} + */ + @Override + public KeyData getKey(KeyData data) throws IOException { + containerManager.readLock(); + try { + Preconditions.checkNotNull(data); + Preconditions.checkNotNull(data.getContainerName()); + ContainerData cData = containerManager.readContainer(data + .getContainerName()); + LevelDBStore db = KeyUtils.getDB(cData, containerCache); + Preconditions.checkNotNull(db); + byte[] kData = db.get(data.getKeyName().getBytes(KeyUtils.ENCODING)); + if(kData == null) { + throw new IOException("Unable to find the key."); + } + ContainerProtos.KeyData keyData = + ContainerProtos.KeyData.parseFrom(kData); + return KeyData.getFromProtoBuf(keyData); + } finally { + containerManager.readUnlock(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void deleteKey(Pipeline pipeline, String keyName) throws 
IOException { + containerManager.readLock(); + try { + Preconditions.checkNotNull(pipeline); + Preconditions.checkNotNull(pipeline.getContainerName()); + ContainerData cData = containerManager.readContainer(pipeline + .getContainerName()); + LevelDBStore db = KeyUtils.getDB(cData, containerCache); + Preconditions.checkNotNull(db); + + // Note : There is a race condition here, since get and delete + // are not atomic. Leaving it here since the impact is refusing + // to delete a key which might have just gotten inserted after + // the get check. + + byte[] kData = db.get(keyName.getBytes(KeyUtils.ENCODING)); + if(kData == null) { + throw new IOException("Unable to find the key."); + } + db.delete(keyName.getBytes(KeyUtils.ENCODING)); + } finally { + containerManager.readUnlock(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public List<KeyData> listKey(Pipeline pipeline, String prefix, String + prevKey, int count) { + // TODO : + return null; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3044db4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java index eba5d9a..65a974c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java @@ -30,7 +30,6 @@ import java.io.IOException; import java.nio.file.Path; import java.util.List; - /** * Interface for container operations. 
*/ @@ -41,9 +40,9 @@ public interface ContainerManager extends RwLock { /** * Init call that sets up a container Manager. * - * @param config - Configuration. + * @param config - Configuration. * @param containerDirs - List of Metadata Container locations. - * @param dataset - FSDataset. + * @param dataset - FSDataset. * @throws IOException */ void init(Configuration config, List<Path> containerDirs, @@ -74,8 +73,8 @@ public interface ContainerManager extends RwLock { * As simple interface for container Iterations. * * @param prevKey - Starting KeyValue - * @param count - how many to return - * @param data - Actual containerData + * @param count - how many to return + * @param data - Actual containerData * @throws IOException */ void listContainer(String prevKey, long count, List<ContainerData> data) @@ -99,15 +98,30 @@ public interface ContainerManager extends RwLock { /** * Sets the Chunk Manager. + * * @param chunkManager - ChunkManager. */ void setChunkManager(ChunkManager chunkManager); /** * Gets the Chunk Manager. - * @return ChunkManager. + * + * @return ChunkManager. */ ChunkManager getChunkManager(); + /** + * Sets the Key Manager. + * + * @param keyManager - Key Manager. + */ + void setKeyManager(KeyManager keyManager); + + /** + * Gets the Key Manager. + * + * @return KeyManager. 
+ */ + KeyManager getKeyManager(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3044db4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java new file mode 100644 index 0000000..d33fe9a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.interfaces; + +import org.apache.hadoop.ozone.container.common.helpers.KeyData; +import org.apache.hadoop.ozone.container.common.helpers.Pipeline; + +import java.io.IOException; +import java.util.List; + +/** + * KeyManager deals with Key Operations in the container Level. + */ +public interface KeyManager { + /** + * Puts or overwrites a key. + * @param pipeline - Pipeline. + * @param data - Key Data. 
+ */ + void putKey(Pipeline pipeline, KeyData data) throws IOException; + + /** + * Gets an existing key. + * @param data - Key Data. + * @return Key Data. + */ + KeyData getKey(KeyData data) throws IOException; + + /** + * Deletes an existing Key. + * @param pipeline - Pipeline. + * @param keyName - Name of the key. + */ + void deleteKey(Pipeline pipeline, String keyName) throws IOException; + + /** + * List keys in a container. + * @param pipeline - pipeline. + * @param prefix - Prefix if needed. + * @param prevKey - Key to Start from, EMPTY_STRING to begin. + * @param count - Number of keys to return. + * @return List of Keys that match the criteria. + */ + List<KeyData> listKey(Pipeline pipeline, String prefix, String prevKey, int + count); + + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3044db4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java new file mode 100644 index 0000000..bf03faf --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.utils; + +import com.google.common.base.Preconditions; +import org.apache.commons.collections.map.LRUMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.IOException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * container cache is a LRUMap that maintains the DB handles. + */ +public class ContainerCache extends LRUMap { + static final Log LOG = LogFactory.getLog(ContainerCache.class); + private final Lock lock = new ReentrantLock(); + + /** + * Constructs a cache that holds DBHandle references. + */ + public ContainerCache(int maxSize, float loadFactor, boolean + scanUntilRemovable) { + super(maxSize, loadFactor, scanUntilRemovable); + } + + /** + * {@inheritDoc} + */ + @Override + protected boolean removeLRU(LinkEntry entry) { + lock.lock(); + try { + LevelDBStore db = (LevelDBStore) entry.getValue(); + db.close(); + } catch (IOException e) { + LOG.error("Error closing DB. Container: " + entry.getKey().toString(), e); + } finally { + lock.unlock(); + } + return true; + } + + /** + * Returns a DB handle if available, null otherwise. + * + * @param containerName - Name of the container. + * @return OzoneLevelDBStore. 
+ */ + public LevelDBStore getDB(String containerName) { + Preconditions.checkNotNull(containerName); + Preconditions.checkState(!containerName.isEmpty()); + lock.lock(); + try { + return (LevelDBStore) this.get(containerName); + } finally { + lock.unlock(); + } + } + + /** + * Add a new DB to the cache. + * + * @param containerName - Name of the container + * @param db - DB handle + */ + public void putDB(String containerName, LevelDBStore db) { + Preconditions.checkNotNull(containerName); + Preconditions.checkState(!containerName.isEmpty()); + lock.lock(); + try { + this.put(containerName, db); + } finally { + lock.unlock(); + } + } + + /** + * Remove an entry from the cache. + * + * @param containerName - Name of the container. + */ + public void removeDB(String containerName) { + Preconditions.checkNotNull(containerName); + Preconditions.checkState(!containerName.isEmpty()); + lock.lock(); + try { + this.remove(containerName); + } finally { + lock.unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3044db4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/LevelDBStore.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/LevelDBStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/LevelDBStore.java index 2a2c5cc..5b1a903 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/LevelDBStore.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/LevelDBStore.java @@ -22,6 +22,7 @@ import org.fusesource.leveldbjni.JniDBFactory; import org.iq80.leveldb.DB; import org.iq80.leveldb.DBIterator; import org.iq80.leveldb.Options; +import org.iq80.leveldb.WriteOptions; import java.io.File; import java.io.IOException; @@ -58,7 +59,9 @@ 
public class LevelDBStore { * @param value - value */ public void put(byte[] key, byte[] value) { - db.put(key, value); + WriteOptions options = new WriteOptions(); + options.sync(true); + db.put(key, value, options); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3044db4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index b793f47..34a2968 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -24,9 +24,11 @@ import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl; import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl; import org.apache.hadoop.ozone.container.common.impl.Dispatcher; +import org.apache.hadoop.ozone.container.common.impl.KeyManagerImpl; import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; +import org.apache.hadoop.ozone.container.common.interfaces.KeyManager; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +53,7 @@ public class OzoneContainer { private final ContainerManager manager; private final XceiverServer server; private final ChunkManager chunkManager; + private final KeyManager keyManager; /** * Creates a network endpoint and enables 
Ozone container. @@ -80,6 +83,9 @@ public class OzoneContainer { this.chunkManager = new ChunkManagerImpl(manager); manager.setChunkManager(this.chunkManager); + this.keyManager = new KeyManagerImpl(manager, ozoneConfig); + manager.setKeyManager(this.keyManager); + this.dispatcher = new Dispatcher(manager); server = new XceiverServer(this.ozoneConfig, this.dispatcher); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3044db4/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeContainerProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeContainerProtocol.proto index 4bb53e1..9cd420c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeContainerProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeContainerProtocol.proto @@ -49,25 +49,24 @@ import "hdfs.proto"; * 5. ListContainer - Returns the list of containers on this * datanode. This will be used by tests and tools. * - * 6. CreateKey - Given a valid container, creates a key. + * 6. PutKey - Given a valid container, creates a key. * - * 7. ReadKey - Allows user to read the metadata of a Key. + * 7. GetKey - Allows user to read the metadata of a Key. * - * 8. UpdateKey - Updates the metadata of a Key. + * 8. DeleteKey - Deletes a given key. * - * 9. DeleteKey - Deletes a given key. - * - * 10. ListKey - Returns a list of keys that are present inside + * 9. ListKey - Returns a list of keys that are present inside * a given container. * - * 11. ReadChunk - Allows us to read a chunk. + * 10. ReadChunk - Allows us to read a chunk. * - * 12. DeleteChunk - Delete an unused chunk. + * 11. DeleteChunk - Delete an unused chunk. * - * 13. WriteChunk - Allows us to write a chunk + * 12. WriteChunk - Allows us to write a chunk * - * 14. ListChunk - Given a Container/Key returns the list of Chunks. 
+ * 13. ListChunk - Given a Container/Key returns the list of Chunks. * + * 14. CompactChunk - Re-writes a chunk based on Offsets. */ enum Type { @@ -77,16 +76,16 @@ enum Type { DeleteContainer = 4; ListContainer = 5; - CreateKey = 6; - Readkey = 7; - UpdateKey = 8; - DeleteKey = 9; - ListKey = 10; + PutKey = 6; + GetKey = 7; + DeleteKey = 8; + ListKey = 9; - ReadChunk = 11; - DeleteChunk = 12; - WriteChunk = 13; - ListChunk = 14; + ReadChunk = 10; + DeleteChunk = 11; + WriteChunk = 12; + ListChunk = 13; + CompactChunk = 14; } @@ -95,7 +94,6 @@ enum Result { UNSUPPORTED_REQUEST = 2; MALFORMED_REQUEST = 3; CONTAINER_INTERNAL_ERROR = 4; - } message ContainerCommandRequestProto { @@ -115,16 +113,15 @@ message ContainerCommandRequestProto { optional DeleteContainerRequestProto deleteContainer = 6; optional ListContainerRequestProto listContainer = 7; - optional CreateKeyRequestProto createKey = 8; - optional ReadKeyRequestProto readKey = 9; - optional UpdateKeyRequestProto updateKey = 10; - optional DeleteKeyRequestProto deleteKey = 11; - optional ListKeyRequestProto listKey = 12; + optional PutKeyRequestProto putKey = 8; + optional GetKeyRequestProto getKey = 9; + optional DeleteKeyRequestProto deleteKey = 10; + optional ListKeyRequestProto listKey = 11; - optional ReadChunkRequestProto readChunk = 13; - optional WriteChunkRequestProto writeChunk = 14; - optional DeleteChunkRequestProto deleteChunk = 15; - optional ListChunkRequestProto listChunk = 16; + optional ReadChunkRequestProto readChunk = 12; + optional WriteChunkRequestProto writeChunk = 13; + optional DeleteChunkRequestProto deleteChunk = 14; + optional ListChunkRequestProto listChunk = 15; } message ContainerCommandResponseProto { @@ -137,16 +134,15 @@ message ContainerCommandResponseProto { optional DeleteContainerResponseProto deleteContainer = 6; optional ListContainerResponseProto listContainer = 7; - optional CreateKeyResponseProto createKey = 8; - optional ReadKeyResponeProto readKey = 9; - optional 
UpdateKeyResponseProto updateKey = 10; - optional DeleteKeyResponeProto deleteKey = 11; - optional ListKeyResponeProto listKey = 12; + optional PutKeyResponseProto putKey = 8; + optional GetKeyResponseProto getKey = 9; + optional DeleteKeyResponseProto deleteKey = 10; + optional ListKeyResponseProto listKey = 11; - optional WriteChunkReponseProto writeChunk = 13; - optional ReadChunkReponseProto readChunk = 14; - optional DeleteChunkResponseProto deleteChunk = 15; - optional ListChunkResponseProto listChunk = 16; + optional WriteChunkResponseProto writeChunk = 12; + optional ReadChunkResponseProto readChunk = 13; + optional DeleteChunkResponseProto deleteChunk = 14; + optional ListChunkResponseProto listChunk = 15; required Result result = 17; optional string message = 18; @@ -222,37 +218,30 @@ message ListContainerResponseProto { } -message ContainerKeyData { - optional string containerName = 1; +message KeyData { + required string containerName = 1; required string name = 2; - repeated KeyValue metadata = 3; + optional int64 flags = 3; // for future use. + repeated KeyValue metadata = 4; + repeated ChunkInfo chunks = 5; } // Key Messages. 
-message CreateKeyRequestProto { +message PutKeyRequestProto { required Pipeline pipeline = 1; - required ContainerKeyData containerKeyData = 2; + required KeyData keyData = 2; } -message CreateKeyResponseProto { +message PutKeyResponseProto { } -message ReadKeyRequestProto { +message GetKeyRequestProto { required Pipeline pipeline = 1; - required ContainerKeyData containerKeyData = 2; + required KeyData keyData = 2; } -message ReadKeyResponeProto { - repeated KeyValue metadata = 1; - repeated ChunkInfo chunkData = 2; -} - -message UpdateKeyRequestProto { - required Pipeline pipeline = 1; - required ContainerKeyData containerKeyData = 2; -} - -message UpdateKeyResponseProto { +message GetKeyResponseProto { + required KeyData keyData = 1; } @@ -261,17 +250,19 @@ message DeleteKeyRequestProto { required string name = 2; } -message DeleteKeyResponeProto { +message DeleteKeyResponseProto { } message ListKeyRequestProto { required Pipeline pipeline = 1; - required string prevKey = 2; - required uint32 count = 3; + optional string prefix = 2; // if specified returns keys that match prefix. 
+ required string prevKey = 3; + required uint32 count = 4; + } -message ListKeyResponeProto { - repeated ContainerKeyData containerKeyData = 1; +message ListKeyResponseProto { + repeated KeyData keyData = 1; } // Chunk Operations @@ -291,7 +282,7 @@ message WriteChunkRequestProto { required bytes data = 4; } -message WriteChunkReponseProto { +message WriteChunkResponseProto { } message ReadChunkRequestProto { @@ -300,7 +291,7 @@ message ReadChunkRequestProto { required ChunkInfo chunkData = 3; } -message ReadChunkReponseProto { +message ReadChunkResponseProto { required Pipeline pipeline = 1; required ChunkInfo chunkData = 2; required bytes data = 3; http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3044db4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index bef290c..3396eca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -20,8 +20,6 @@ package org.apache.hadoop.ozone.container; import com.google.protobuf.ByteString; import org.apache.commons.codec.binary.Hex; -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.commons.io.IOExceptionWithCause; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos .ContainerCommandRequestProto; @@ -31,11 +29,15 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import 
org.apache.hadoop.ozone.container.common.helpers.Pipeline; +import org.apache.hadoop.ozone.container.common.helpers.KeyData; +import org.junit.Assert; import java.io.IOException; import java.net.ServerSocket; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import java.util.LinkedList; +import java.util.List; import java.util.Random; import java.util.UUID; @@ -231,7 +233,7 @@ public class ContainerTestHelper { * @return ContainerCommandRequestProto. */ public static ContainerCommandResponseProto - getCreateContainerResponse(ContainerCommandRequestProto request) throws + getCreateContainerResponse(ContainerCommandRequestProto request) throws IOException { ContainerProtos.CreateContainerResponseProto.Builder createResponse = ContainerProtos.CreateContainerResponseProto.newBuilder(); @@ -244,4 +246,89 @@ public class ContainerTestHelper { response.setResult(ContainerProtos.Result.SUCCESS); return response.build(); } + + /** + * Returns the PutKeyRequest for test purpose. + * + * @param writeRequest - Write Chunk Request. + * @return - Request + */ + public static ContainerCommandRequestProto getPutKeyRequest( + ContainerProtos.WriteChunkRequestProto writeRequest) { + ContainerProtos.PutKeyRequestProto.Builder putRequest = + ContainerProtos.PutKeyRequestProto.newBuilder(); + + putRequest.setPipeline(writeRequest.getPipeline()); + KeyData keyData = new KeyData(writeRequest.getPipeline().getContainerName(), + writeRequest.getKeyName()); + List<ContainerProtos.ChunkInfo> newList = new LinkedList<>(); + newList.add(writeRequest.getChunkData()); + keyData.setChunks(newList); + putRequest.setKeyData(keyData.getProtoBufMessage()); + + ContainerCommandRequestProto.Builder request = + ContainerCommandRequestProto.newBuilder(); + request.setCmdType(ContainerProtos.Type.PutKey); + request.setPutKey(putRequest); + return request.build(); + } + + /** + * Gets a GetKeyRequest for test purpose. + * + * @param putKeyRequest - putKeyRequest. 
+ * @return - Request + */ + public static ContainerCommandRequestProto getKeyRequest( + ContainerProtos.PutKeyRequestProto putKeyRequest) { + ContainerProtos.GetKeyRequestProto.Builder getRequest = + ContainerProtos.GetKeyRequestProto.newBuilder(); + ContainerProtos.KeyData.Builder keyData = ContainerProtos.KeyData + .newBuilder(); + keyData.setContainerName(putKeyRequest.getPipeline().getContainerName()); + keyData.setName(putKeyRequest.getKeyData().getName()); + getRequest.setKeyData(keyData); + getRequest.setPipeline(putKeyRequest.getPipeline()); + + ContainerCommandRequestProto.Builder request = + ContainerCommandRequestProto.newBuilder(); + request.setCmdType(ContainerProtos.Type.GetKey); + request.setGetKey(getRequest); + return request.build(); + } + + /** + * Verifies that a GetKey response succeeded and names the requested key. + * @param request - GetKey request that was sent. + * @param response - Response received for that request. + */ + public static void verifyGetKey(ContainerCommandRequestProto request, + ContainerCommandResponseProto response) { + Assert.assertEquals(request.getTraceID(), response.getTraceID()); + Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult()); + // A GetKey request carries only the key's identity, so compare that. + ContainerProtos.KeyData expected = request.getGetKey().getKeyData(); + ContainerProtos.KeyData actual = response.getGetKey().getKeyData(); + Assert.assertEquals(expected.getName(), actual.getName()); + } + + + /** + * + * @param putKeyRequest - putKeyRequest. 
+ * @return - Request + */ + public static ContainerCommandRequestProto getDeleteKeyRequest( + ContainerProtos.PutKeyRequestProto putKeyRequest) { + ContainerProtos.DeleteKeyRequestProto.Builder delRequest = + ContainerProtos.DeleteKeyRequestProto.newBuilder(); + delRequest.setPipeline(putKeyRequest.getPipeline()); + delRequest.setName(putKeyRequest.getKeyData().getName()); + ContainerCommandRequestProto.Builder request = + ContainerCommandRequestProto.newBuilder(); + request.setCmdType(ContainerProtos.Type.DeleteKey); + request.setDeleteKey(delRequest); + return request.build(); + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3044db4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java index 213019e..2bdb99d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java @@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.container.common.impl; import org.apache.commons.codec.binary.Hex; import org.apache.commons.io.FileUtils; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfiguration; @@ -28,6 +29,7 @@ import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import 
org.apache.hadoop.ozone.container.common.helpers.ContainerData; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; +import org.apache.hadoop.ozone.container.common.helpers.KeyData; import org.apache.hadoop.ozone.container.common.helpers.Pipeline; import org.apache.hadoop.ozone.container.common.utils.LevelDBStore; import org.apache.hadoop.ozone.web.utils.OzoneUtils; @@ -60,6 +62,7 @@ import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk; import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData; import static org.apache.hadoop.ozone.container.ContainerTestHelper .setDataChecksum; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.fail; /** @@ -70,6 +73,7 @@ public class TestContainerPersistence { static String path; static ContainerManagerImpl containerManager; static ChunkManagerImpl chunkManager; + static KeyManagerImpl keyManager; static OzoneConfiguration conf; static FsDatasetSpi fsDataSet; static MiniDFSCluster cluster; @@ -103,6 +107,8 @@ public class TestContainerPersistence { containerManager = new ContainerManagerImpl(); chunkManager = new ChunkManagerImpl(containerManager); containerManager.setChunkManager(chunkManager); + keyManager = new KeyManagerImpl(containerManager, conf); + containerManager.setKeyManager(keyManager); } @@ -176,9 +182,11 @@ public class TestContainerPersistence { ContainerData data = new ContainerData(containerName); data.addMetadata("VOLUME", "shire"); data.addMetadata("owner)", "bilbo"); - containerManager.createContainer(createSingleNodePipeline(containerName), data); + containerManager.createContainer(createSingleNodePipeline(containerName), + data); try { - containerManager.createContainer(createSingleNodePipeline(containerName), data); + containerManager.createContainer(createSingleNodePipeline + (containerName), data); fail("Expected Exception not thrown."); } catch (IOException ex) { Assert.assertNotNull(ex); @@ -194,12 +202,14 @@ 
public class TestContainerPersistence { ContainerData data = new ContainerData(containerName1); data.addMetadata("VOLUME", "shire"); data.addMetadata("owner)", "bilbo"); - containerManager.createContainer(createSingleNodePipeline(containerName1), data); + containerManager.createContainer(createSingleNodePipeline(containerName1) + , data); data = new ContainerData(containerName2); data.addMetadata("VOLUME", "shire"); data.addMetadata("owner)", "bilbo"); - containerManager.createContainer(createSingleNodePipeline(containerName2), data); + containerManager.createContainer(createSingleNodePipeline(containerName2) + , data); Assert.assertTrue(containerManager.getContainerMap() @@ -218,7 +228,8 @@ public class TestContainerPersistence { data = new ContainerData(containerName1); data.addMetadata("VOLUME", "shire"); data.addMetadata("owner)", "bilbo"); - containerManager.createContainer(createSingleNodePipeline(containerName1), data); + containerManager.createContainer(createSingleNodePipeline(containerName1) + , data); // Assert we still have both containers. Assert.assertTrue(containerManager.getContainerMap() @@ -246,7 +257,8 @@ public class TestContainerPersistence { ContainerData data = new ContainerData(containerName); data.addMetadata("VOLUME", "shire"); data.addMetadata("owner)", "bilbo"); - containerManager.createContainer(createSingleNodePipeline(containerName), data); + containerManager.createContainer(createSingleNodePipeline + (containerName), data); testMap.put(containerName, data); } @@ -271,19 +283,10 @@ public class TestContainerPersistence { Assert.assertTrue(testMap.isEmpty()); } - /** - * Writes a single chunk. 
- * - * @throws IOException - * @throws NoSuchAlgorithmException - */ - @Test - public void testWriteChunk() throws IOException, NoSuchAlgorithmException { + private ChunkInfo writeChunkHelper(String containerName, String keyName, + Pipeline pipeline) throws IOException, + NoSuchAlgorithmException { final int datalen = 1024; - String containerName = OzoneUtils.getRequestID(); - String keyName = OzoneUtils.getRequestID(); - Pipeline pipeline = createSingleNodePipeline(containerName); - pipeline.setContainerName(containerName); ContainerData cData = new ContainerData(containerName); cData.addMetadata("VOLUME", "shire"); @@ -293,6 +296,23 @@ public class TestContainerPersistence { byte[] data = getData(datalen); setDataChecksum(info, data); chunkManager.writeChunk(pipeline, keyName, info, data); + return info; + + } + + /** + * Writes a single chunk. + * + * @throws IOException + * @throws NoSuchAlgorithmException + */ + @Test + public void testWriteChunk() throws IOException, + NoSuchAlgorithmException { + String containerName = OzoneUtils.getRequestID(); + String keyName = OzoneUtils.getRequestID(); + Pipeline pipeline = createSingleNodePipeline(containerName); + writeChunkHelper(containerName, keyName, pipeline); } /** @@ -389,7 +409,7 @@ public class TestContainerPersistence { chunkManager.writeChunk(pipeline, keyName, info, data); try { chunkManager.writeChunk(pipeline, keyName, info, data); - } catch(IOException ex) { + } catch (IOException ex) { Assert.assertTrue(ex.getMessage().contains( "Rejecting write chunk request. OverWrite flag required.")); } @@ -469,4 +489,116 @@ public class TestContainerPersistence { exception.expectMessage("Unable to find the chunk file."); chunkManager.readChunk(pipeline, keyName, info); } + + /** + * Tests a put key and read key. 
+ * + * @throws IOException + * @throws NoSuchAlgorithmException + */ + @Test + public void testPutKey() throws IOException, NoSuchAlgorithmException { + String containerName = OzoneUtils.getRequestID(); + String keyName = OzoneUtils.getRequestID(); + Pipeline pipeline = createSingleNodePipeline(containerName); + ChunkInfo info = writeChunkHelper(containerName, keyName, pipeline); + KeyData keyData = new KeyData(containerName, keyName); + List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>(); + chunkList.add(info.getProtoBufMessage()); + keyData.setChunks(chunkList); + keyManager.putKey(pipeline, keyData); + KeyData readKeyData = keyManager.getKey(keyData); + ChunkInfo readChunk = + ChunkInfo.getFromProtoBuf(readKeyData.getChunks().get(0)); + Assert.assertEquals(info.getChecksum(), readChunk.getChecksum()); + } + + /** + * Tests a put key and read key. + * + * @throws IOException + * @throws NoSuchAlgorithmException + */ + @Test + public void testPutKeyWithLotsOfChunks() throws IOException, + NoSuchAlgorithmException { + final int chunkCount = 1024; + final int datalen = 1024; + String containerName = OzoneUtils.getRequestID(); + String keyName = OzoneUtils.getRequestID(); + Pipeline pipeline = createSingleNodePipeline(containerName); + List<ChunkInfo> chunkList = new LinkedList<>(); + ChunkInfo info = writeChunkHelper(containerName, keyName, pipeline); + chunkList.add(info); + for (int x = 1; x < chunkCount; x++) { + info = getChunk(keyName, x, x * datalen, datalen); + byte[] data = getData(datalen); + setDataChecksum(info, data); + chunkManager.writeChunk(pipeline, keyName, info, data); + chunkList.add(info); + } + + KeyData keyData = new KeyData(containerName, keyName); + List<ContainerProtos.ChunkInfo> chunkProtoList = new LinkedList<>(); + for (ChunkInfo i : chunkList) { + chunkProtoList.add(i.getProtoBufMessage()); + } + keyData.setChunks(chunkProtoList); + keyManager.putKey(pipeline, keyData); + KeyData readKeyData = keyManager.getKey(keyData); + 
ChunkInfo lastChunk = chunkList.get(chunkList.size() - 1); + ChunkInfo readChunk = + ChunkInfo.getFromProtoBuf(readKeyData.getChunks().get(readKeyData + .getChunks().size() - 1)); + Assert.assertEquals(lastChunk.getChecksum(), readChunk.getChecksum()); + } + + /** + * Deletes a key and tries to read it back. + * + * @throws IOException + * @throws NoSuchAlgorithmException + */ + @Test + public void testDeleteKey() throws IOException, NoSuchAlgorithmException { + String containerName = OzoneUtils.getRequestID(); + String keyName = OzoneUtils.getRequestID(); + Pipeline pipeline = createSingleNodePipeline(containerName); + ChunkInfo info = writeChunkHelper(containerName, keyName, pipeline); + KeyData keyData = new KeyData(containerName, keyName); + List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>(); + chunkList.add(info.getProtoBufMessage()); + keyData.setChunks(chunkList); + keyManager.putKey(pipeline, keyData); + keyManager.deleteKey(pipeline, keyName); + exception.expect(IOException.class); + exception.expectMessage("Unable to find the key."); + keyManager.getKey(keyData); + } + + /** + * Tries to delete a key twice. 
+ * + * @throws IOException + * @throws NoSuchAlgorithmException + */ + @Test + public void testDeleteKeyTwice() throws IOException, + NoSuchAlgorithmException { + String containerName = OzoneUtils.getRequestID(); + String keyName = OzoneUtils.getRequestID(); + Pipeline pipeline = createSingleNodePipeline(containerName); + ChunkInfo info = writeChunkHelper(containerName, keyName, pipeline); + KeyData keyData = new KeyData(containerName, keyName); + List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>(); + chunkList.add(info.getProtoBufMessage()); + keyData.setChunks(chunkList); + keyManager.putKey(pipeline, keyData); + keyManager.deleteKey(pipeline, keyName); + exception.expect(IOException.class); + exception.expectMessage("Unable to find the key."); + keyManager.deleteKey(pipeline, keyName); + } + + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3044db4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java index b3c7a77..4a53f9f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java @@ -135,6 +135,29 @@ public class TestOzoneContainer { Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult()); Assert.assertTrue(request.getTraceID().equals(response.getTraceID())); + // Put Key + ContainerProtos.ContainerCommandRequestProto putKeyRequest = + ContainerTestHelper.getPutKeyRequest(writeChunkRequest.getWriteChunk()); + + response = 
client.sendCommand(putKeyRequest); + Assert.assertNotNull(response); + Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult()); + Assert.assertTrue(request.getTraceID().equals(response.getTraceID())); + + // Get Key + request = ContainerTestHelper.getKeyRequest(putKeyRequest.getPutKey()); + response = client.sendCommand(request); + ContainerTestHelper.verifyGetKey(request, response); + + + // Delete Key + request = + ContainerTestHelper.getDeleteKeyRequest(putKeyRequest.getPutKey()); + response = client.sendCommand(request); + Assert.assertNotNull(response); + Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult()); + Assert.assertTrue(request.getTraceID().equals(response.getTraceID())); + //Delete Chunk request = ContainerTestHelper.getDeleteChunkRequest(writeChunkRequest .getWriteChunk());
