http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java new file mode 100644 index 0000000..b9ec462 --- /dev/null +++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.cblock.storage; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.cblock.CBlockConfigKeys; +import org.apache.hadoop.cblock.exception.CBlockException; +import org.apache.hadoop.cblock.meta.ContainerDescriptor; +import org.apache.hadoop.cblock.meta.VolumeDescriptor; +import org.apache.hadoop.cblock.meta.VolumeInfo; +import org.apache.hadoop.cblock.proto.MountVolumeResponse; +import org.apache.hadoop.cblock.util.KeyUtil; +import org.apache.hadoop.hdsl.conf.OzoneConfiguration; +import org.apache.hadoop.hdsl.protocol.proto.HdslProtos; +import org.apache.hadoop.scm.client.ScmClient; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This class maintains the key space of CBlock, more specifically, the + * volume to container mapping. The core data structure + * is a map from users to their volumes info, where volume info is a handler + * to a volume, containing information for IO on that volume and a storage + * client responsible for talking to the SCM. 
+ */ +public class StorageManager { + private static final Logger LOGGER = + LoggerFactory.getLogger(StorageManager.class); + private final ScmClient storageClient; + private final int numThreads; + private static final int MAX_THREADS = + Runtime.getRuntime().availableProcessors() * 2; + private static final int MAX_QUEUE_CAPACITY = 1024; + private final String cblockId; + + /** + * We will NOT have the situation where same kv pair getting + * processed, but it is possible to have multiple kv pair being + * processed at same time. + * + * So using just ConcurrentHashMap should be sufficient + * + * Again since currently same user accessing from multiple places + * is not allowed, no need to consider concurrency of volume map + * within one user + */ + private ConcurrentHashMap<String, HashMap<String, VolumeDescriptor>> + user2VolumeMap; + // size of an underlying container. + // TODO : assuming all containers are of the same size + private long containerSizeB; + + public StorageManager(ScmClient storageClient, + OzoneConfiguration ozoneConfig, String cblockId) throws IOException { + this.storageClient = storageClient; + this.user2VolumeMap = new ConcurrentHashMap<>(); + this.containerSizeB = storageClient.getContainerSize(null); + this.numThreads = + ozoneConfig.getInt(CBlockConfigKeys.DFS_CBLOCK_MANAGER_POOL_SIZE, + CBlockConfigKeys.DFS_CBLOCK_MANAGER_POOL_SIZE_DEFAULT); + this.cblockId = cblockId; + } + + /** + * This call will put the volume into in-memory map. + * + * more specifically, make the volume discoverable on jSCSI server + * and keep it's reference in-memory for look up. + * @param userName the user name of the volume. + * @param volumeName the name of the volume, + * @param volume a {@link VolumeDescriptor} object encapsulating the + * information about the volume. 
+ */ + private void makeVolumeReady(String userName, String volumeName, + VolumeDescriptor volume) { + HashMap<String, VolumeDescriptor> userVolumes; + if (user2VolumeMap.containsKey(userName)) { + userVolumes = user2VolumeMap.get(userName); + } else { + userVolumes = new HashMap<>(); + user2VolumeMap.put(userName, userVolumes); + } + userVolumes.put(volumeName, volume); + } + + /** + * Called by CBlockManager to add volumes read from persistent store into + * memory, need to contact SCM to setup the reference to the containers given + * their id. + * + * Only for failover process where container meta info is read from + * persistent store, and containers themselves are alive. + * + * TODO : Currently, this method is not being called as failover process + * is not implemented yet. + * + * @param volumeDescriptor a {@link VolumeDescriptor} object encapsulating + * the information about a volume. + * @throws IOException when adding the volume failed. e.g. volume already + * exist, or no more container available. + */ + public synchronized void addVolume(VolumeDescriptor volumeDescriptor) + throws IOException{ + String userName = volumeDescriptor.getUserName(); + String volumeName = volumeDescriptor.getVolumeName(); + LOGGER.info("addVolume:" + userName + ":" + volumeName); + if (user2VolumeMap.containsKey(userName) + && user2VolumeMap.get(userName).containsKey(volumeName)) { + throw new CBlockException("Volume already exist for " + + userName + ":" + volumeName); + } + // the container ids are read from levelDB, setting up the + // container handlers here. 
+ String[] containerIds = volumeDescriptor.getContainerIDs(); + + for (String containerId : containerIds) { + try { + Pipeline pipeline = storageClient.getContainer(containerId); + ContainerDescriptor containerDescriptor = + new ContainerDescriptor(containerId); + containerDescriptor.setPipeline(pipeline); + volumeDescriptor.addContainer(containerDescriptor); + } catch (IOException e) { + LOGGER.error("Getting container failed! Container:{} error:{}", + containerId, e); + throw e; + } + } + // now ready to put into in-memory map. + makeVolumeReady(userName, volumeName, volumeDescriptor); + } + + private class CreateContainerTask implements Runnable { + private final VolumeDescriptor volume; + private final int containerIdx; + private final ArrayList<String> containerIds; + private final AtomicInteger numFailed; + + CreateContainerTask(VolumeDescriptor volume, int containerIdx, + ArrayList<String> containerIds, + AtomicInteger numFailed) { + this.volume = volume; + this.containerIdx = containerIdx; + this.containerIds = containerIds; + this.numFailed = numFailed; + } + + /** + * When an object implementing interface <code>Runnable</code> is used + * to create a thread, starting the thread causes the object's + * <code>run</code> method to be called in that separately executing + * thread. + * <p> + * The general contract of the method <code>run</code> is that it may + * take any action whatsoever. 
+ * + * @see Thread#run() + */ + public void run() { + ContainerDescriptor container = null; + try { + Pipeline pipeline = storageClient.createContainer( + HdslProtos.ReplicationType.STAND_ALONE, + HdslProtos.ReplicationFactor.ONE, + KeyUtil.getContainerName(volume.getUserName(), + volume.getVolumeName(), containerIdx), cblockId); + + container = new ContainerDescriptor(pipeline.getContainerName()); + + container.setPipeline(pipeline); + container.setContainerIndex(containerIdx); + volume.addContainer(container); + containerIds.set(containerIdx, container.getContainerID()); + } catch (Exception e) { + numFailed.incrementAndGet(); + if (container != null) { + LOGGER.error("Error creating container Container:{}:" + + " index:{} error:{}", container.getContainerID(), + containerIdx, e); + } else { + LOGGER.error("Error creating container.", e); + } + } + } + } + + private boolean createVolumeContainers(VolumeDescriptor volume) { + ArrayList<String> containerIds = new ArrayList<>(); + ThreadPoolExecutor executor = new ThreadPoolExecutor( + Math.min(numThreads, MAX_THREADS), + MAX_THREADS, 1, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(MAX_QUEUE_CAPACITY), + new ThreadPoolExecutor.CallerRunsPolicy()); + + AtomicInteger numFailedCreates = new AtomicInteger(0); + long allocatedSize = 0; + int containerIdx = 0; + while (allocatedSize < volume.getVolumeSize()) { + // adding null to allocate space in ArrayList + containerIds.add(containerIdx, null); + Runnable task = new CreateContainerTask(volume, containerIdx, + containerIds, numFailedCreates); + executor.submit(task); + allocatedSize += containerSizeB; + containerIdx += 1; + } + + // issue the command and then wait for it to finish + executor.shutdown(); + try { + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES); + } catch (InterruptedException e) { + LOGGER.error("Error creating volume:{} error:{}", + volume.getVolumeName(), e); + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + + 
volume.setContainerIDs(containerIds); + return numFailedCreates.get() == 0; + } + + private void deleteContainer(String containerID, boolean force) { + try { + Pipeline pipeline = storageClient.getContainer(containerID); + storageClient.deleteContainer(pipeline, force); + } catch (Exception e) { + LOGGER.error("Error deleting container Container:{} error:{}", + containerID, e); + } + } + + private void deleteVolumeContainers(List<String> containers, boolean force) + throws CBlockException { + ThreadPoolExecutor executor = new ThreadPoolExecutor( + Math.min(numThreads, MAX_THREADS), + MAX_THREADS, 1, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(MAX_QUEUE_CAPACITY), + new ThreadPoolExecutor.CallerRunsPolicy()); + + for (String deleteContainer : containers) { + if (deleteContainer != null) { + Runnable task = () -> deleteContainer(deleteContainer, force); + executor.submit(task); + } + } + + // issue the command and then wait for it to finish + executor.shutdown(); + try { + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES); + } catch (InterruptedException e) { + LOGGER.error("Error deleting containers error:{}", e); + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + /** + * Called by CBlock server when creating a fresh volume. The core + * logic is adding needed information into in-memory meta data. + * + * @param userName the user name of the volume. + * @param volumeName the name of the volume. + * @param volumeSize the size of the volume. + * @param blockSize the block size of the volume. + * @throws CBlockException when the volume can not be created. 
+ */ + public synchronized void createVolume(String userName, String volumeName, + long volumeSize, int blockSize) throws CBlockException { + LOGGER.debug("createVolume:" + userName + ":" + volumeName); + if (user2VolumeMap.containsKey(userName) + && user2VolumeMap.get(userName).containsKey(volumeName)) { + throw new CBlockException("Volume already exist for " + + userName + ":" + volumeName); + } + if (volumeSize < blockSize) { + throw new CBlockException("Volume size smaller than block size? " + + "volume size:" + volumeSize + " block size:" + blockSize); + } + VolumeDescriptor volume + = new VolumeDescriptor(userName, volumeName, volumeSize, blockSize); + boolean success = createVolumeContainers(volume); + if (!success) { + // cleanup the containers and throw the exception + deleteVolumeContainers(volume.getContainerIDsList(), true); + throw new CBlockException("Error when creating volume:" + volumeName); + } + makeVolumeReady(userName, volumeName, volume); + } + + /** + * Called by CBlock server to delete a specific volume. Mainly + * to check whether it can be deleted, and remove it from in-memory meta + * data. + * + * @param userName the user name of the volume. + * @param volumeName the name of the volume. + * @param force if set to false, only delete volume it is empty, otherwise + * throw exception. if set to true, delete regardless. + * @throws CBlockException when the volume can not be deleted. 
+ */ + public synchronized void deleteVolume(String userName, String volumeName, + boolean force) throws CBlockException { + if (!user2VolumeMap.containsKey(userName) + || !user2VolumeMap.get(userName).containsKey(volumeName)) { + throw new CBlockException("Deleting non-exist volume " + + userName + ":" + volumeName); + } + if (!force && !user2VolumeMap.get(userName).get(volumeName).isEmpty()) { + throw new CBlockException("Deleting a non-empty volume without force!"); + } + VolumeDescriptor volume = user2VolumeMap.get(userName).remove(volumeName); + deleteVolumeContainers(volume.getContainerIDsList(), force); + if (user2VolumeMap.get(userName).size() == 0) { + user2VolumeMap.remove(userName); + } + } + + /** + * Called by CBlock server to get information of a specific volume. + * + * @param userName the user name of the volume. + * @param volumeName the name of the volume. + * @return a {@link VolumeInfo} object encapsulating the information of the + * volume. + * @throws CBlockException when the information can not be retrieved. + */ + public synchronized VolumeInfo infoVolume(String userName, String volumeName) + throws CBlockException { + if (!user2VolumeMap.containsKey(userName) + || !user2VolumeMap.get(userName).containsKey(volumeName)) { + throw new CBlockException("Getting info for non-exist volume " + + userName + ":" + volumeName); + } + return user2VolumeMap.get(userName).get(volumeName).getInfo(); + } + + /** + * Called by CBlock server to check whether the given volume can be + * mounted, i.e. whether it can be found in the meta data. + * + * return a {@link MountVolumeResponse} with isValid flag to indicate + * whether the volume can be mounted or not. + * + * @param userName the user name of the volume. + * @param volumeName the name of the volume + * @return a {@link MountVolumeResponse} object encapsulating whether the + * volume is valid, and if yes, the requried information for client to + * read/write the volume. 
+ */ + public synchronized MountVolumeResponse isVolumeValid( + String userName, String volumeName) { + if (!user2VolumeMap.containsKey(userName) + || !user2VolumeMap.get(userName).containsKey(volumeName)) { + // in the case of invalid volume, no need to set any value other than + // isValid flag. + return new MountVolumeResponse(false, null, null, 0, 0, null, null); + } + VolumeDescriptor volume = user2VolumeMap.get(userName).get(volumeName); + return new MountVolumeResponse(true, userName, + volumeName, volume.getVolumeSize(), volume.getBlockSize(), + volume.getContainerPipelines(), volume.getPipelines()); + } + + /** + * Called by CBlock manager to list all volumes. + * + * @param userName the userName whose volume to be listed, if set to null, + * all volumes will be listed. + * @return a list of {@link VolumeDescriptor} representing all volumes + * requested. + */ + public synchronized List<VolumeDescriptor> getAllVolume(String userName) { + ArrayList<VolumeDescriptor> allVolumes = new ArrayList<>(); + if (userName == null) { + for (Map.Entry<String, HashMap<String, VolumeDescriptor>> entry + : user2VolumeMap.entrySet()) { + allVolumes.addAll(entry.getValue().values()); + } + } else { + if (user2VolumeMap.containsKey(userName)) { + allVolumes.addAll(user2VolumeMap.get(userName).values()); + } + } + return allVolumes; + } + + /** + * Only for testing the behavior of create/delete volumes. + */ + @VisibleForTesting + public VolumeDescriptor getVolume(String userName, String volumeName) { + if (!user2VolumeMap.containsKey(userName) + || !user2VolumeMap.get(userName).containsKey(volumeName)) { + return null; + } + return user2VolumeMap.get(userName).get(volumeName); + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/storage/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/storage/package-info.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/storage/package-info.java new file mode 100644 index 0000000..4426e6d --- /dev/null +++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/storage/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.cblock.storage; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/util/KeyUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/util/KeyUtil.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/util/KeyUtil.java new file mode 100644 index 0000000..beb9e32 --- /dev/null +++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/util/KeyUtil.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.util; + +/** + * A simply class that generates key mappings. (e.g. from (userName, volumeName) + * pair to a single string volumeKey. 
+ */ +public final class KeyUtil { + public static String getVolumeKey(String userName, String volumeName) { + return userName + ":" + volumeName; + } + + public static String getContainerName(String userName, String volumeName, + int containerID) { + return getVolumeKey(userName, volumeName) + "#" + containerID; + } + + public static String getUserNameFromVolumeKey(String key) { + return key.split(":")[0]; + } + + public static String getVolumeFromVolumeKey(String key) { + return key.split(":")[1]; + } + + public static boolean isValidVolumeKey(String key) { + return key.contains(":"); + } + + private KeyUtil() { + + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/util/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/util/package-info.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/util/package-info.java new file mode 100644 index 0000000..5b9aa0c --- /dev/null +++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/util/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.util; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/proto/CBlockClientServerProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/proto/CBlockClientServerProtocol.proto b/hadoop-cblock/server/src/main/proto/CBlockClientServerProtocol.proto new file mode 100644 index 0000000..160b254 --- /dev/null +++ b/hadoop-cblock/server/src/main/proto/CBlockClientServerProtocol.proto @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * These .proto interfaces are private and unstable. + * Please see http://wiki.apache.org/hadoop/Compatibility + * for what changes are allowed for a *unstable* .proto interface. 
+ */ +option java_package = "org.apache.hadoop.cblock.protocol.proto"; +option java_outer_classname = "CBlockClientServerProtocolProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.cblock; + +import "hdsl.proto"; +import "CBlockServiceProtocol.proto"; +/** +* This message is sent from CBlock client side to CBlock server to +* mount a volume specified by owner name and volume name. +* +* Right now, this is the only communication between client and server. +* After the volume is mounted, CBlock client will talk to containers +* by itself, nothing to do with CBlock server. +*/ +message MountVolumeRequestProto { + required string userName = 1; + required string volumeName = 2; +} + +/** +* This message is sent from CBlock server to CBlock client as response +* of mount a volume. It checks the whether the volume is valid to access +* at all.(e.g. volume exist) +* +* And include enough information (volume size, block size, list of +* containers for this volume) for client side to perform read/write on +* the volume. +*/ +message MountVolumeResponseProto { + required bool isValid = 1; + optional string userName = 2; + optional string volumeName = 3; + optional uint64 volumeSize = 4; + optional uint32 blockSize = 5; + repeated ContainerIDProto allContainerIDs = 6; +} + +/** +* This message include ID of container which can be used to locate the +* container. Since the order of containers needs to be maintained, also +* includes a index field to verify the correctness of the order. +*/ +message ContainerIDProto { + required string containerID = 1; + required uint64 index = 2; + // making pipeline optional to be compatible with exisiting tests + optional hadoop.hdsl.Pipeline pipeline = 3; +} + + +message ListVolumesRequestProto { + +} + +message ListVolumesResponseProto { + repeated VolumeInfoProto volumeEntry = 1; +} + + +service CBlockClientServerProtocolService { + /** + * mount the volume. 
+ */ + rpc mountVolume(MountVolumeRequestProto) returns (MountVolumeResponseProto); + + rpc listVolumes(ListVolumesRequestProto) returns(ListVolumesResponseProto); + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/proto/CBlockServiceProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/proto/CBlockServiceProtocol.proto b/hadoop-cblock/server/src/main/proto/CBlockServiceProtocol.proto new file mode 100644 index 0000000..36e4b59 --- /dev/null +++ b/hadoop-cblock/server/src/main/proto/CBlockServiceProtocol.proto @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * These .proto interfaces are private and unstable. + * Please see http://wiki.apache.org/hadoop/Compatibility + * for what changes are allowed for a *unstable* .proto interface. + */ + +option java_package = "org.apache.hadoop.cblock.protocol.proto"; +option java_outer_classname = "CBlockServiceProtocolProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.cblock; + +/** +* This message is sent to CBlock server to create a volume. 
Creating +* volume requries four parameters: owner of the volume, name of the volume +* size of volume and block size of the volume. +*/ +message CreateVolumeRequestProto { + required string userName = 1; + required string volumeName = 2; + required uint64 volumeSize = 3; + optional uint32 blockSize = 4 [default = 4096]; +} + +/** +* Empty response message. +*/ +message CreateVolumeResponseProto { + +} + +/** +* This message is sent to CBlock server to delete a volume. The volume +* is specified by owner name and volume name. If force is set to +* false, volume will be deleted only if it is empty. Otherwise delete it +* regardless. +*/ +message DeleteVolumeRequestProto { + required string userName = 1; + required string volumeName = 2; + optional bool force = 3; +} + +/** +* Empty response message. +*/ +message DeleteVolumeResponseProto { + +} + +/** +* This message is sent to CBlock server to request info of a volume. The +* volume is specified by owner name and volume name. +*/ +message InfoVolumeRequestProto { + required string userName = 1; + required string volumeName = 2; +} + +/** +* This message describes the information of a volume. +* Currently, the info includes the volume creation parameters and a number +* as the usage of the volume, in terms of number of bytes. +*/ +message VolumeInfoProto { + required string userName = 1; + required string volumeName = 2; + required uint64 volumeSize = 3; + required uint64 blockSize = 4; + optional uint64 usage = 5; + // TODO : potentially volume ACL +} + +/** +* This message is sent from CBlock server as response of info volume request. +*/ +message InfoVolumeResponseProto { + optional VolumeInfoProto volumeInfo = 1; +} + +/** +* This message is sent to CBlock server to list all available volume. +*/ +message ListVolumeRequestProto { + optional string userName = 1; +} + +/** +* This message is sent from CBlock server as response of volume listing. 
+*/ +message ListVolumeResponseProto { + repeated VolumeInfoProto volumeEntry = 1; +} + +service CBlockServiceProtocolService { + /** + * Create a volume. + */ + rpc createVolume(CreateVolumeRequestProto) returns(CreateVolumeResponseProto); + + /** + * Delete a volume. + */ + rpc deleteVolume(DeleteVolumeRequestProto) returns(DeleteVolumeResponseProto); + + /** + * Get info of a volume. + */ + rpc infoVolume(InfoVolumeRequestProto) returns(InfoVolumeResponseProto); + + /** + * List all available volumes. + */ + rpc listVolume(ListVolumeRequestProto) returns(ListVolumeResponseProto); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/resources/cblock-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/resources/cblock-default.xml b/hadoop-cblock/server/src/main/resources/cblock-default.xml new file mode 100644 index 0000000..ebf36cd --- /dev/null +++ b/hadoop-cblock/server/src/main/resources/cblock-default.xml @@ -0,0 +1,347 @@ +<?xml version="1.0" encoding="UTF-8"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<!-- Do not modify this file directly. 
Instead, copy entries that you --> +<!-- wish to modify from this file into ozone-site.xml and change them --> +<!-- there. If ozone-site.xml does not already exist, create it. --> + +<!--Tags supported are OZONE, CBLOCK, MANAGEMENT, SECURITY, PERFORMANCE, --> +<!--DEBUG, CLIENT, SERVER, KSM, SCM, CRITICAL, RATIS, CONTAINER, REQUIRED, --> +<!--REST, STORAGE, PIPELINE, STANDALONE --> + +<configuration> + + <!--CBlock Settings--> + <property> + <name>dfs.cblock.block.buffer.flush.interval</name> + <value>60s</value> + <tag>CBLOCK, PERFORMANCE</tag> + <description> + Controls the frequency at this the local cache flushes the + blocks to the remote containers. + </description> + </property> + <property> + <name>dfs.cblock.cache.block.buffer.size</name> + <value>512</value> + <tag>CBLOCK, PERFORMANCE</tag> + <description> + Size of the local cache for blocks. So cache size will be block + size multiplied by this number. + </description> + </property> + <property> + <name>dfs.cblock.cache.core.min.pool.size</name> + <value>16</value> + <tag>CBLOCK, PERFORMANCE</tag> + <description> + A minimum number of threads in the pool that cBlock cache will + use for the background I/O to remote containers. + </description> + </property> + <property> + <name>dfs.cblock.cache.max.pool.size</name> + <value>256</value> + <tag>CBLOCK, PERFORMANCE</tag> + <description> + Maximum number of threads in the pool that cBlock cache will + use for background I/O to remote containers. + </description> + </property> + <property> + <name>dfs.cblock.cache.keep.alive</name> + <value>60s</value> + <tag>CBLOCK, PERFORMANCE</tag> + <description> + If the cblock cache has no I/O, then the threads in the cache + pool are kept idle for this amount of time before shutting down. + </description> + </property> + <property> + <name>dfs.cblock.cache.leveldb.cache.size.mb</name> + <value>256</value> + <tag>CBLOCK, PERFORMANCE</tag> + <description> + The amount of physical memory allocated to the local cache. 
The + SCSI driver will allocate this much RAM for cache instances. + </description> + </property> + <property> + <name>dfs.cblock.cache.max.retry</name> + <value>65536</value> + <tag>CBLOCK, PERFORMANCE</tag> + <description> + If the local cache is enabled, then CBlock writes to the local + cache when I/O happens. Then the background I/O threads write this + block to the remote containers. This value controls how many times the + background thread should attempt to do I/O to the remote containers + before giving up. + </description> + </property> + <property> + <name>dfs.cblock.cache.queue.size.in.kb</name> + <value>256</value> + <tag>CBLOCK, PERFORMANCE</tag> + <description> + Size of the in memory cache queue that is flushed to local + disk. + </description> + </property> + <property> + <name>dfs.cblock.cache.thread.priority</name> + <value>5</value> + <tag>CBLOCK, PERFORMANCE</tag> + <description> + Priority of cache flusher thread, affecting the relative performance of + write and read. Supported values are 1, 5, 10. + Use 10 for high priority and 1 for low priority. + </description> + </property> + <property> + <name>dfs.cblock.container.size.gb</name> + <value>5</value> + <tag>CBLOCK, MANAGEMENT</tag> + <description> + The size of ozone container in the number of GBs. Note that + this is not setting container size for ozone. This setting is + instructing CBlock to manage containers at a standard size. + </description> + </property> + <property> + <name>dfs.cblock.disk.cache.path</name> + <value>${hadoop.tmp.dir}/cblockCacheDB</value> + <tag>CBLOCK, REQUIRED</tag> + <description> + The default path for the cblock local cache. If the cblock + local cache is enabled, then it must be set to a valid path. This cache + *should* be mapped to the fastest disk on a given machine. For example, + an SSD drive would be a good idea. Currently, all mounted disks on a + data node are mapped to a single path, so having a large number of IOPS + is essential.
+ </description> + </property> + <property> + <name>dfs.cblock.jscsi-address</name> + <value/> + <tag>CBLOCK, MANAGEMENT</tag> + <description> + The address that cblock will bind to, should be a host:port + format. This setting is required for cblock server to start. + This address is used by jscsi to mount volume. + </description> + </property> + <property> + <name>dfs.cblock.jscsi.cblock.server.address</name> + <value>127.0.0.1</value> + <tag>CBLOCK, MANAGEMENT</tag> + <description> + The address local jscsi server will use to talk to cblock manager. + </description> + </property> + <property> + <name>dfs.cblock.jscsi.port</name> + <value>9811</value> + <tag>CBLOCK, MANAGEMENT</tag> + <description> + The port on CBlockManager node for jSCSI to talk to. + </description> + </property> + <property> + <name>dfs.cblock.jscsi.rpc-bind-host</name> + <value>0.0.0.0</value> + <tag>CBLOCK, MANAGEMENT</tag> + <description> + The actual address the cblock jscsi rpc server will bind to. If + this optional address is set, it overrides only the hostname portion of + dfs.cblock.jscsi-address. + </description> + </property> + <property> + <name>dfs.cblock.jscsi.server.address</name> + <value>0.0.0.0</value> + <tag>CBLOCK, MANAGEMENT</tag> + <description> + The address that jscsi server will be running at; it is nice to have one + local jscsi server for each client (Linux JSCSI client) that tries to + mount cblock. + </description> + </property> + <property> + <name>dfs.cblock.manager.pool.size</name> + <value>16</value> + <tag>CBLOCK, PERFORMANCE</tag> + <description> + Number of active threads that cblock manager will use for container + operations. The maximum number of threads is limited to the + processor count * 2. + </description> + </property> + <property> + <name>dfs.cblock.rpc.timeout</name> + <value>300s</value> + <tag>CBLOCK, MANAGEMENT</tag> + <description> + RPC timeout used for cblock CLI operations. When you + create very large disks, like 5TB, etc.
The number of containers + allocated in the system is huge. It will be 5TB/5GB, which is 1000 + containers. The client CLI might time out even though the cblock manager + creates the specified disk. This value allows the user to wait for a + longer period. + </description> + </property> + <property> + <name>dfs.cblock.scm.ipaddress</name> + <value>127.0.0.1</value> + <tag>CBLOCK, MANAGEMENT</tag> + <description> + IP address used by cblock to connect to SCM. + </description> + </property> + <property> + <name>dfs.cblock.scm.port</name> + <value>9860</value> + <tag>CBLOCK, MANAGEMENT</tag> + <description> + Port used by cblock to connect to SCM. + </description> + </property> + <property> + <name>dfs.cblock.service.handler.count</name> + <value>10</value> + <tag>CBLOCK, MANAGEMENT</tag> + <description> + Default number of handlers for CBlock service rpc. + </description> + </property> + <property> + <name>dfs.cblock.service.leveldb.path</name> + <value>${hadoop.tmp.dir}/cblock_server.dat</value> + <tag>CBLOCK, REQUIRED</tag> + <description> + Default path for the cblock meta data store. + </description> + </property> + <property> + <name>dfs.cblock.service.rpc-bind-host</name> + <value>0.0.0.0</value> + <tag>CBLOCK, MANAGEMENT</tag> + <description> + The actual address the cblock service RPC server will bind to. + If the optional address is set, it overrides only the hostname portion of + dfs.cblock.servicerpc-address. + </description> + </property> + <property> + <name>dfs.cblock.servicerpc-address</name> + <value/> + <tag>CBLOCK, MANAGEMENT, REQUIRED</tag> + <description> + The address that cblock will bind to, should be a host:port + format. This setting is required for cblock server to start.
+ This address is used for cblock management operations like create, delete, + info and list volumes. + </description> + </property> + <property> + <name>dfs.cblock.short.circuit.io</name> + <value>false</value> + <tag>CBLOCK, PERFORMANCE</tag> + <description> + Enables use of the local cache in cblock. Enabling this allows + I/O against the local cache and background threads do actual I/O against + the + containers. + </description> + </property> + <property> + <name>dfs.cblock.trace.io</name> + <value>false</value> + <tag>CBLOCK, DEBUG</tag> + <description>Default flag for enabling trace io. Trace I/O logs all I/O with + hashes of + data. This is useful for detecting things like data corruption. + </description> + </property> + + <property> + <name>dfs.cblock.iscsi.advertised.ip</name> + <value>0.0.0.0</value> + <tag>CBLOCK</tag> + <description> + IP address returned during the iscsi discovery. + </description> + </property> + + <property> + <name>dfs.cblock.iscsi.advertised.port</name> + <value>3260</value> + <tag>CBLOCK</tag> + <description> + TCP port returned during the iscsi discovery. + </description> + </property> + + <property> + <name>dfs.cblock.kubernetes.dynamic-provisioner.enabled</name> + <value>false</value> + <tag>CBLOCK, KUBERNETES</tag> + <description>Flag to enable automatic creation of cblocks and + kubernetes PersistentVolumes in a kubernetes environment. + </description> + </property> + + <property> + <name>dfs.cblock.kubernetes.cblock-user</name> + <value>iqn.2001-04.org.apache.hadoop</value> + <tag>CBLOCK, KUBERNETES</tag> + <description>CBlock user to use for the dynamic provisioner. + This user will own all of the auto-created cblocks. + </description> + </property> + + <property> + <name>dfs.cblock.kubernetes.configfile</name> + <value></value> + <tag>CBLOCK, KUBERNETES</tag> + <description>Location of the kubernetes configuration file + to access the kubernetes cluster.
Not required inside a pod + as the default service account will be used if this value is + empty. + </description> + </property> + + <property> + <name>dfs.cblock.iscsi.advertised.ip</name> + <value></value> + <tag>CBLOCK, KUBERNETES</tag> + <description>IP where the cblock target server is available + from the kubernetes nodes. Usually it's a cluster ip address + which is defined by a deployed Service. + </description> + </property> + + <property> + <name>dfs.cblock.iscsi.advertised.port</name> + <value>3260</value> + <tag>CBLOCK, KUBERNETES</tag> + <description>Port where the cblock target server is available + from the kubernetes nodes. Could be different from the + listening port if jscsi is behind a Service. + </description> + </property> +</configuration> http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java new file mode 100644 index 0000000..e1eb36f --- /dev/null +++ b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java @@ -0,0 +1,456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock; + +import com.google.common.primitives.Longs; +import static java.util.concurrent.TimeUnit.SECONDS; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics; +import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher; +import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache; +import org.apache.hadoop.hdsl.conf.OzoneConfiguration; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.scm.XceiverClientManager; +import org.apache.hadoop.scm.XceiverClientSpi; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.scm.storage.ContainerProtocolCalls; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.MiniOzoneClassicCluster; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.concurrent.TimeoutException; + + +import static org.apache.hadoop.cblock.CBlockConfigKeys. + DFS_CBLOCK_DISK_CACHE_PATH_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys. + DFS_CBLOCK_TRACE_IO; +import static org.apache.hadoop.cblock.CBlockConfigKeys. + DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO; +import static org.apache.hadoop.cblock.CBlockConfigKeys. 
+ DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL; + +/** + * Tests for Local Cache Buffer Manager. + */ +public class TestBufferManager { + private final static long GB = 1024 * 1024 * 1024; + private final static int KB = 1024; + private static MiniOzoneCluster cluster; + private static OzoneConfiguration config; + private static StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocationClient; + private static XceiverClientManager xceiverClientManager; + + @BeforeClass + public static void init() throws IOException { + config = new OzoneConfiguration(); + String path = GenericTestUtils.getTempPath( + TestBufferManager.class.getSimpleName()); + config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); + config.setBoolean(DFS_CBLOCK_TRACE_IO, true); + config.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); + cluster = new MiniOzoneClassicCluster.Builder(config) + .numDataNodes(1).setHandlerType("distributed").build(); + storageContainerLocationClient = cluster + .createStorageContainerLocationClient(); + xceiverClientManager = new XceiverClientManager(config); + } + + @AfterClass + public static void shutdown() throws InterruptedException { + if (cluster != null) { + cluster.shutdown(); + } + IOUtils.cleanupWithLogger(null, storageContainerLocationClient, cluster); + } + + /** + * createContainerAndGetPipeline creates a set of containers and returns the + * Pipelines that define those containers. + * + * @param count - Number of containers to create. + * @return - List of Pipelines. 
+ * @throws IOException + */ + private List<Pipeline> createContainerAndGetPipeline(int count) + throws IOException { + List<Pipeline> containerPipelines = new LinkedList<>(); + for (int x = 0; x < count; x++) { + String traceID = "trace" + RandomStringUtils.randomNumeric(4); + String containerName = "container" + RandomStringUtils.randomNumeric(10); + Pipeline pipeline = + storageContainerLocationClient.allocateContainer( + xceiverClientManager.getType(), + xceiverClientManager.getFactor(), containerName, "CBLOCK"); + XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline); + ContainerProtocolCalls.createContainer(client, traceID); + // This step is needed since we set private data on pipelines, when we + // read the list from CBlockServer. So we mimic that action here. + pipeline.setData(Longs.toByteArray(x)); + containerPipelines.add(pipeline); + xceiverClientManager.releaseClient(client); + } + return containerPipelines; + } + + /** + * This test writes some block to the cache and then shuts down the cache. 
+ * The cache is then restarted to check that the + * correct number of blocks are read from Dirty Log + * + * @throws IOException + */ + @Test + public void testEmptyBlockBufferHandling() throws IOException, + InterruptedException, TimeoutException { + // Create a new config so that this tests write metafile to new location + OzoneConfiguration flushTestConfig = new OzoneConfiguration(); + String path = GenericTestUtils + .getTempPath(TestBufferManager.class.getSimpleName() + + RandomStringUtils.randomNumeric(4)); + flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); + flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); + flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); + + String volumeName = "volume" + RandomStringUtils.randomNumeric(4); + String userName = "user" + RandomStringUtils.randomNumeric(4); + String data = RandomStringUtils.random(4 * KB); + List<Pipeline> pipelines = createContainerAndGetPipeline(10); + CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); + ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, + xceiverClientManager, metrics); + CBlockLocalCache cache = CBlockLocalCache.newBuilder() + .setConfiguration(flushTestConfig) + .setVolumeName(volumeName) + .setUserName(userName) + .setPipelines(pipelines) + .setClientManager(xceiverClientManager) + .setBlockSize(4 * KB) + .setVolumeSize(50 * GB) + .setFlusher(flusher) + .setCBlockTargetMetrics(metrics) + .build(); + cache.start(); + // Write data to the cache + cache.put(1, data.getBytes(StandardCharsets.UTF_8)); + Assert.assertEquals(0, metrics.getNumDirectBlockWrites()); + Assert.assertEquals(1, metrics.getNumWriteOps()); + cache.put(2, data.getBytes(StandardCharsets.UTF_8)); + Assert.assertEquals(0, metrics.getNumDirectBlockWrites()); + Assert.assertEquals(2, metrics.getNumWriteOps()); + + // Store the previous block buffer position + Assert.assertEquals(2, metrics.getNumBlockBufferUpdates()); + // Simulate a shutdown by closing 
the cache + cache.close(); + Thread.sleep(1000); + Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered()); + Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted()); + Assert.assertEquals(2 * (Long.SIZE/ Byte.SIZE), + metrics.getNumBytesDirtyLogWritten()); + Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes()); + Assert.assertEquals(0, metrics.getNumInterruptedBufferWaits()); + + // Restart cache and check that right number of entries are read + CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create(); + ContainerCacheFlusher newFlusher = + new ContainerCacheFlusher(flushTestConfig, + xceiverClientManager, newMetrics); + CBlockLocalCache newCache = CBlockLocalCache.newBuilder() + .setConfiguration(flushTestConfig) + .setVolumeName(volumeName) + .setUserName(userName) + .setPipelines(pipelines) + .setClientManager(xceiverClientManager) + .setBlockSize(4 * KB) + .setVolumeSize(50 * GB) + .setFlusher(newFlusher) + .setCBlockTargetMetrics(newMetrics) + .build(); + newCache.start(); + Thread fllushListenerThread = new Thread(newFlusher); + fllushListenerThread.setDaemon(true); + fllushListenerThread.start(); + + Thread.sleep(5000); + Assert.assertEquals(metrics.getNumBlockBufferUpdates(), + newMetrics.getNumDirtyLogBlockRead()); + Assert.assertEquals(newMetrics.getNumDirtyLogBlockRead() + * (Long.SIZE/ Byte.SIZE), newMetrics.getNumBytesDirtyLogReads()); + // Now shutdown again, nothing should be flushed + newFlusher.shutdown(); + Assert.assertEquals(0, newMetrics.getNumBlockBufferUpdates()); + Assert.assertEquals(0, newMetrics.getNumBytesDirtyLogWritten()); + } + + @Test + public void testPeriodicFlush() throws IOException, + InterruptedException, TimeoutException{ + // Create a new config so that this tests write metafile to new location + OzoneConfiguration flushTestConfig = new OzoneConfiguration(); + String path = GenericTestUtils + .getTempPath(TestBufferManager.class.getSimpleName() + + RandomStringUtils.randomNumeric(4)); + 
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); + flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); + flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); + flushTestConfig + .setTimeDuration(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL, 5, SECONDS); + + String volumeName = "volume" + RandomStringUtils.randomNumeric(4); + String userName = "user" + RandomStringUtils.randomNumeric(4); + CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); + ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, + xceiverClientManager, metrics); + CBlockLocalCache cache = CBlockLocalCache.newBuilder() + .setConfiguration(flushTestConfig) + .setVolumeName(volumeName) + .setUserName(userName) + .setPipelines(createContainerAndGetPipeline(10)) + .setClientManager(xceiverClientManager) + .setBlockSize(4 * KB) + .setVolumeSize(50 * GB) + .setFlusher(flusher) + .setCBlockTargetMetrics(metrics) + .build(); + cache.start(); + Thread.sleep(8000); + // Ticks will be at 5s, 10s and so on, so this count should be 1 + Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered()); + // Nothing pushed to cache, so nothing should be written + Assert.assertEquals(0, metrics.getNumBytesDirtyLogWritten()); + Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted()); + cache.close(); + // After close, another trigger should happen but still no data written + Assert.assertEquals(2, metrics.getNumBlockBufferFlushTriggered()); + Assert.assertEquals(0, metrics.getNumBytesDirtyLogWritten()); + Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted()); + Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes()); + } + + @Test + public void testSingleBufferFlush() throws IOException, + InterruptedException, TimeoutException { + // Create a new config so that this tests write metafile to new location + OzoneConfiguration flushTestConfig = new OzoneConfiguration(); + String path = GenericTestUtils + 
.getTempPath(TestBufferManager.class.getSimpleName() + + RandomStringUtils.randomNumeric(4)); + flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); + flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); + flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); + + String volumeName = "volume" + RandomStringUtils.randomNumeric(4); + String userName = "user" + RandomStringUtils.randomNumeric(4); + String data = RandomStringUtils.random(4 * KB); + CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); + ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, + xceiverClientManager, metrics); + CBlockLocalCache cache = CBlockLocalCache.newBuilder() + .setConfiguration(flushTestConfig) + .setVolumeName(volumeName) + .setUserName(userName) + .setPipelines(createContainerAndGetPipeline(10)) + .setClientManager(xceiverClientManager) + .setBlockSize(4 * KB) + .setVolumeSize(50 * GB) + .setFlusher(flusher) + .setCBlockTargetMetrics(metrics) + .build(); + cache.start(); + + for (int i = 0; i < 511; i++) { + cache.put(i, data.getBytes(StandardCharsets.UTF_8)); + } + // After writing 511 block no flush should happen + Assert.assertEquals(0, metrics.getNumBlockBufferFlushTriggered()); + Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted()); + + + // After one more block it should + cache.put(512, data.getBytes(StandardCharsets.UTF_8)); + Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered()); + Thread.sleep(1000); + Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted()); + cache.close(); + Assert.assertEquals(512 * (Long.SIZE / Byte.SIZE), + metrics.getNumBytesDirtyLogWritten()); + } + + @Test + public void testMultipleBuffersFlush() throws IOException, + InterruptedException, TimeoutException { + // Create a new config so that this tests write metafile to new location + OzoneConfiguration flushTestConfig = new OzoneConfiguration(); + String path = GenericTestUtils + 
.getTempPath(TestBufferManager.class.getSimpleName() + + RandomStringUtils.randomNumeric(4)); + flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); + flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); + flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); + flushTestConfig + .setTimeDuration(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL, 120, SECONDS); + + String volumeName = "volume" + RandomStringUtils.randomNumeric(4); + String userName = "user" + RandomStringUtils.randomNumeric(4); + String data = RandomStringUtils.random(4 * KB); + CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); + ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, + xceiverClientManager, metrics); + CBlockLocalCache cache = CBlockLocalCache.newBuilder() + .setConfiguration(flushTestConfig) + .setVolumeName(volumeName) + .setUserName(userName) + .setPipelines(createContainerAndGetPipeline(10)) + .setClientManager(xceiverClientManager) + .setBlockSize(4 * KB) + .setVolumeSize(50 * GB) + .setFlusher(flusher) + .setCBlockTargetMetrics(metrics) + .build(); + cache.start(); + + for (int i = 0; i < 4; i++) { + for (int j = 0; j < 512; j++) { + cache.put(i * 512 + j, data.getBytes(StandardCharsets.UTF_8)); + } + // Flush should be triggered after every 512 block write + Assert.assertEquals(i + 1, metrics.getNumBlockBufferFlushTriggered()); + } + Assert.assertEquals(0, metrics.getNumIllegalDirtyLogFiles()); + Assert.assertEquals(0, metrics.getNumFailedDirtyLogFileDeletes()); + cache.close(); + Assert.assertEquals(4 * 512 * (Long.SIZE / Byte.SIZE), + metrics.getNumBytesDirtyLogWritten()); + Assert.assertEquals(5, metrics.getNumBlockBufferFlushTriggered()); + Assert.assertEquals(4, metrics.getNumBlockBufferFlushCompleted()); + } + + @Test + public void testSingleBlockFlush() throws IOException, + InterruptedException, TimeoutException{ + // Create a new config so that this tests write metafile to new location + OzoneConfiguration flushTestConfig = 
new OzoneConfiguration(); + String path = GenericTestUtils + .getTempPath(TestBufferManager.class.getSimpleName() + + RandomStringUtils.randomNumeric(4)); + flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); + flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); + flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); + flushTestConfig + .setTimeDuration(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL, + 5, SECONDS); + + String volumeName = "volume" + RandomStringUtils.randomNumeric(4); + String userName = "user" + RandomStringUtils.randomNumeric(4); + String data = RandomStringUtils.random(4 * KB); + CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); + ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, + xceiverClientManager, metrics); + CBlockLocalCache cache = CBlockLocalCache.newBuilder() + .setConfiguration(flushTestConfig) + .setVolumeName(volumeName) + .setUserName(userName) + .setPipelines(createContainerAndGetPipeline(10)) + .setClientManager(xceiverClientManager) + .setBlockSize(4 * KB) + .setVolumeSize(50 * GB) + .setFlusher(flusher) + .setCBlockTargetMetrics(metrics) + .build(); + cache.start(); + cache.put(0, data.getBytes(StandardCharsets.UTF_8)); + Thread.sleep(8000); + // Ticks will be at 5s, 10s and so on, so this count should be 1 + Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered()); + // 1 block written to cache, which should be flushed + Assert.assertEquals(8, metrics.getNumBytesDirtyLogWritten()); + Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted()); + cache.close(); + // After close, another trigger should happen but no data should be written + Assert.assertEquals(2, metrics.getNumBlockBufferFlushTriggered()); + Assert.assertEquals(8, metrics.getNumBytesDirtyLogWritten()); + Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted()); + Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes()); + } + + @Test + public void testRepeatedBlockWrites() 
throws IOException, + InterruptedException, TimeoutException{ + // Create a new config so that this tests write metafile to new location + OzoneConfiguration flushTestConfig = new OzoneConfiguration(); + String path = GenericTestUtils + .getTempPath(TestBufferManager.class.getSimpleName() + + RandomStringUtils.randomNumeric(4)); + flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); + flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); + flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); + + String volumeName = "volume" + RandomStringUtils.randomNumeric(4); + String userName = "user" + RandomStringUtils.randomNumeric(4); + String data = RandomStringUtils.random(4 * KB); + CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); + ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, + xceiverClientManager, metrics); + CBlockLocalCache cache = CBlockLocalCache.newBuilder() + .setConfiguration(flushTestConfig) + .setVolumeName(volumeName) + .setUserName(userName) + .setPipelines(createContainerAndGetPipeline(10)) + .setClientManager(xceiverClientManager) + .setBlockSize(4 * KB) + .setVolumeSize(50 * GB) + .setFlusher(flusher) + .setCBlockTargetMetrics(metrics) + .build(); + Thread fllushListenerThread = new Thread(flusher); + fllushListenerThread.setDaemon(true); + fllushListenerThread.start(); + cache.start(); + for (int i = 0; i < 512; i++) { + cache.put(i, data.getBytes(StandardCharsets.UTF_8)); + } + Assert.assertEquals(512, metrics.getNumWriteOps()); + Assert.assertEquals(512, metrics.getNumBlockBufferUpdates()); + Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered()); + Thread.sleep(5000); + Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted()); + + + for (int i = 0; i < 512; i++) { + cache.put(i, data.getBytes(StandardCharsets.UTF_8)); + } + Assert.assertEquals(1024, metrics.getNumWriteOps()); + Assert.assertEquals(1024, metrics.getNumBlockBufferUpdates()); + Assert.assertEquals(2, 
metrics.getNumBlockBufferFlushTriggered()); + + Thread.sleep(5000); + Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks()); + Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks()); + Assert.assertEquals(2, metrics.getNumBlockBufferFlushCompleted()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockConfigurationFields.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockConfigurationFields.java b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockConfigurationFields.java new file mode 100644 index 0000000..4139ac6 --- /dev/null +++ b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockConfigurationFields.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock; + +import org.apache.hadoop.conf.TestConfigurationFieldsBase; + +/** + * Tests if configuration constants documented in ozone-defaults.xml. 
+ */ +public class TestCBlockConfigurationFields extends TestConfigurationFieldsBase { + + @Override + public void initializeMemberVariables() { + xmlFilename = new String("cblock-default.xml"); + configurationClasses = + new Class[] {CBlockConfigKeys.class}; + errorIfMissingConfigProps = true; + errorIfMissingXmlProps = true; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java new file mode 100644 index 0000000..d995ba6 --- /dev/null +++ b/hadoop-cblock/server/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */
package org.apache.hadoop.cblock;

import com.google.common.primitives.Longs;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics;
import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher;
import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache;
import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.LifeCycleState;
import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.ReplicationFactor;
import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.ReplicationType;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.PipelineChannel;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.protocolPB
    .StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.apache.hadoop.cblock.CBlockConfigKeys
    .DFS_CBLOCK_DISK_CACHE_PATH_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
    .DFS_CBLOCK_TRACE_IO;
import static org.apache.hadoop.cblock.CBlockConfigKeys
    .DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
import static org.apache.hadoop.cblock.CBlockConfigKeys
    .DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL;
import static org.apache.hadoop.cblock.CBlockConfigKeys
    .DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;

/**
 * Tests for CBlock read/write functionality against a single-datanode
 * MiniOzone cluster. Covers three paths: direct (cache-bypassing) I/O,
 * cached writes flushed to containers, and retry-log replay when the
 * initial flush target is unreachable.
 */
public class TestCBlockReadWrite {
  private final static long GB = 1024 * 1024 * 1024;
  private final static int KB = 1024;
  // Shared single-datanode cluster and clients, created once for the whole
  // test class in init() and torn down in shutdown().
  private static MiniOzoneCluster cluster;
  private static OzoneConfiguration config;
  private static StorageContainerLocationProtocolClientSideTranslatorPB
      storageContainerLocationClient;
  private static XceiverClientManager xceiverClientManager;

  /**
   * Starts a one-datanode MiniOzone cluster with I/O tracing and
   * short-circuit I/O enabled, and creates the SCM location client and
   * container client manager used by the tests.
   *
   * @throws IOException if the cluster cannot be started.
   */
  @BeforeClass
  public static void init() throws IOException {
    config = new OzoneConfiguration();
    // Per-test-class temp directory for the local disk cache.
    String path = GenericTestUtils
        .getTempPath(TestCBlockReadWrite.class.getSimpleName());
    config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
    config.setBoolean(DFS_CBLOCK_TRACE_IO, true);
    config.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
    cluster = new MiniOzoneClassicCluster.Builder(config)
        .numDataNodes(1)
        .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
    storageContainerLocationClient = cluster
        .createStorageContainerLocationClient();
    xceiverClientManager = new XceiverClientManager(config);
  }

  /**
   * Stops the cluster and closes the shared clients. cleanupWithLogger
   * closes each Closeable and logs (rather than throws) any failure.
   */
  @AfterClass
  public static void shutdown() throws InterruptedException {
    if (cluster != null) {
      cluster.shutdown();
    }
    IOUtils.cleanupWithLogger(null, storageContainerLocationClient, cluster);
  }

  /**
   * getContainerPipelines creates a set of containers and returns the
   * Pipelines that define those containers.
   *
   * @param count - Number of containers to create.
   * @return - List of Pipelines.
   * @throws IOException throws Exception
   */
  private List<Pipeline> getContainerPipeline(int count) throws IOException {
    List<Pipeline> containerPipelines = new LinkedList<>();
    for (int x = 0; x < count; x++) {
      String traceID = "trace" + RandomStringUtils.randomNumeric(4);
      String containerName = "container" + RandomStringUtils.randomNumeric(10);
      Pipeline pipeline =
          storageContainerLocationClient.allocateContainer(
              xceiverClientManager.getType(),
              xceiverClientManager.getFactor(), containerName, "CBLOCK");
      // NOTE(review): the acquired client is never released in this method —
      // presumably the client manager's internal cache reclaims it; verify.
      XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
      ContainerProtocolCalls.createContainer(client, traceID);
      // This step is needed since we set private data on pipelines, when we
      // read the list from CBlockServer. So we mimic that action here.
      pipeline.setData(Longs.toByteArray(x));
      containerPipelines.add(pipeline);
    }
    return containerPipelines;
  }

  /**
   * This test creates a cache and performs a simple write / read.
   * The operations are done by bypassing the cache (short-circuit I/O is
   * disabled), so every put/get goes straight to the remote containers,
   * and the direct-write / cache-miss metrics are asserted accordingly.
   *
   * @throws IOException on container I/O failure.
   */
  @Test
  public void testDirectIO() throws IOException,
      InterruptedException, TimeoutException {
    OzoneConfiguration cConfig = new OzoneConfiguration();
    // Disabling short-circuit I/O forces the cache to write/read remotely.
    cConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
    cConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
    final long blockID = 0;
    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
    String userName = "user" + RandomStringUtils.randomNumeric(4);
    // NOTE(review): random() emits arbitrary code points, so the UTF-8
    // encoding of this string may exceed 4 KB — confirm the cache accepts
    // payloads larger than the configured block size.
    String data = RandomStringUtils.random(4 * KB);
    String dataHash = DigestUtils.sha256Hex(data);
    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
    ContainerCacheFlusher flusher = new ContainerCacheFlusher(cConfig,
        xceiverClientManager, metrics);
    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
        .setConfiguration(cConfig)
        .setVolumeName(volumeName)
        .setUserName(userName)
        .setPipelines(getContainerPipeline(10))
        .setClientManager(xceiverClientManager)
        .setBlockSize(4 * KB)
        .setVolumeSize(50 * GB)
        .setFlusher(flusher)
        .setCBlockTargetMetrics(metrics)
        .build();
    cache.start();
    Assert.assertFalse(cache.isShortCircuitIOEnabled());
    cache.put(blockID, data.getBytes(StandardCharsets.UTF_8));
    // With short-circuit off, each put counts as one direct block write.
    Assert.assertEquals(1, metrics.getNumDirectBlockWrites());
    Assert.assertEquals(1, metrics.getNumWriteOps());
    // Please note that this read is directly from remote container
    LogicalBlock block = cache.get(blockID);
    Assert.assertEquals(1, metrics.getNumReadOps());
    Assert.assertEquals(0, metrics.getNumReadCacheHits());
    Assert.assertEquals(1, metrics.getNumReadCacheMiss());
    Assert.assertEquals(0, metrics.getNumReadLostBlocks());
    Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites());

    cache.put(blockID + 1, data.getBytes(StandardCharsets.UTF_8));
    Assert.assertEquals(2, metrics.getNumDirectBlockWrites());
    Assert.assertEquals(2, metrics.getNumWriteOps());
    Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites());
    // Please note that this read is directly from remote container
    block = cache.get(blockID + 1);
    Assert.assertEquals(2, metrics.getNumReadOps());
    Assert.assertEquals(0, metrics.getNumReadCacheHits());
    Assert.assertEquals(2, metrics.getNumReadCacheMiss());
    Assert.assertEquals(0, metrics.getNumReadLostBlocks());
    // Round-trip integrity check: the remote read must hash to the same
    // value as the data that was written.
    String readHash = DigestUtils.sha256Hex(block.getData().array());
    Assert.assertEquals("File content does not match.", dataHash, readHash);
    // Wait (up to 20s, polling every 100ms) until the cache reports clean.
    GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
    cache.close();
  }

  /**
   * This test writes some block to the cache and then shuts down the cache
   * The cache is then restarted with "short.circuit.io" disable to check
   * that the blocks are read correctly from the container.
   *
   * @throws IOException
   */
  @Test
  public void testContainerWrites() throws IOException,
      InterruptedException, TimeoutException {
    // Create a new config so that this tests write metafile to new location
    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
    String path = GenericTestUtils
        .getTempPath(TestCBlockReadWrite.class.getSimpleName());
    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
    // 3-second flush interval; the Thread.sleep(3000) below is sized to it.
    flushTestConfig.setTimeDuration(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL, 3,
        TimeUnit.SECONDS);
    XceiverClientManager xcm = new XceiverClientManager(flushTestConfig);
    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
    String userName = "user" + RandomStringUtils.randomNumeric(4);

    // 512 writes cycle over 4 distinct payloads; hashes are precomputed so
    // reads can be verified after the cache restart.
    int numUniqueBlocks = 4;
    String[] data = new String[numUniqueBlocks];
    String[] dataHash = new String[numUniqueBlocks];
    for (int i = 0; i < numUniqueBlocks; i++) {
      data[i] = RandomStringUtils.random(4 * KB);
      dataHash[i] = DigestUtils.sha256Hex(data[i]);
    }

    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
        xcm, metrics);
    List<Pipeline> pipelines = getContainerPipeline(10);
    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
        .setConfiguration(flushTestConfig)
        .setVolumeName(volumeName)
        .setUserName(userName)
        .setPipelines(pipelines)
        .setClientManager(xcm)
        .setBlockSize(4 * KB)
        .setVolumeSize(50 * GB)
        .setFlusher(flusher)
        .setCBlockTargetMetrics(metrics)
        .build();
    cache.start();
    // The flusher runs on its own daemon thread, draining dirty blocks to
    // the containers in the background.
    Thread flushListenerThread = new Thread(flusher);
    flushListenerThread.setDaemon(true);
    flushListenerThread.start();
    Assert.assertTrue(cache.isShortCircuitIOEnabled());
    // Write data to the cache
    for (int i = 0; i < 512; i++) {
      cache.put(i, data[i % numUniqueBlocks].getBytes(StandardCharsets.UTF_8));
    }
    // Close the cache and flush the data to the containers
    cache.close();
    // Short-circuit writes land in the local cache, not directly on
    // containers, so there must be zero direct block writes.
    Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
    Assert.assertEquals(512, metrics.getNumWriteOps());
    // NOTE(review): fixed sleep equal to the flush interval — flaky if the
    // flusher is slow; a waitFor on the flush metrics would be sturdier.
    Thread.sleep(3000);
    flusher.shutdown();
    Assert.assertTrue(metrics.getNumBlockBufferFlushTriggered() > 1);
    Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
    Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
    Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks());
    Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB());
    // Now disable DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO and restart cache
    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
    CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
    ContainerCacheFlusher newFlusher =
        new ContainerCacheFlusher(flushTestConfig, xcm, newMetrics);
    // Same volume/user/pipelines so the new cache addresses the same blocks.
    CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
        .setConfiguration(flushTestConfig)
        .setVolumeName(volumeName)
        .setUserName(userName)
        .setPipelines(pipelines)
        .setClientManager(xcm)
        .setBlockSize(4 * KB)
        .setVolumeSize(50 * GB)
        .setFlusher(newFlusher)
        .setCBlockTargetMetrics(newMetrics)
        .build();
    newCache.start();
    Assert.assertFalse(newCache.isShortCircuitIOEnabled());
    // this read will be from the container, also match the hash
    for (int i = 0; i < 512; i++) {
      LogicalBlock block = newCache.get(i);
      String readHash = DigestUtils.sha256Hex(block.getData().array());
      Assert.assertEquals("File content does not match, for index:"
          + i, dataHash[i % numUniqueBlocks], readHash);
    }
    Assert.assertEquals(0, newMetrics.getNumReadLostBlocks());
    Assert.assertEquals(0, newMetrics.getNumFailedReadBlocks());
    newCache.close();
    newFlusher.shutdown();
  }

  /**
   * Exercises the retry-log path: the cache is first started with a fake
   * (unreachable) pipeline so every background flush fails and the blocks
   * land in the retry log; the cache is then restarted with real container
   * pipelines and the retry log is expected to replay the blocks
   * successfully.
   *
   * @throws IOException on container I/O failure.
   */
  @Test
  public void testRetryLog() throws IOException,
      InterruptedException, TimeoutException {
    // Create a new config so that this tests write metafile to new location
    OzoneConfiguration flushTestConfig = new OzoneConfiguration();
    String path = GenericTestUtils
        .getTempPath(TestCBlockReadWrite.class.getSimpleName());
    flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
    flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
    flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
    flushTestConfig.setTimeDuration(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL,
        3,
        TimeUnit.SECONDS);

    // Shrink the block buffer so 10 writes fill it and trigger a flush.
    int numblocks = 10;
    flushTestConfig.setInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE, numblocks);

    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
    String userName = "user" + RandomStringUtils.randomNumeric(4);
    String data = RandomStringUtils.random(4 * KB);

    // A pipeline pointing at a non-existent datanode ("fake") so that all
    // container writes fail and are routed into the retry log.
    List<Pipeline> fakeContainerPipelines = new LinkedList<>();
    PipelineChannel pipelineChannel = new PipelineChannel("fake",
        LifeCycleState.OPEN, ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
        "fake");
    Pipeline fakePipeline = new Pipeline("fake", pipelineChannel);
    fakePipeline.setData(Longs.toByteArray(1));
    fakeContainerPipelines.add(fakePipeline);

    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
    ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
        xceiverClientManager, metrics);
    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
        .setConfiguration(flushTestConfig)
        .setVolumeName(volumeName)
        .setUserName(userName)
        .setPipelines(fakeContainerPipelines)
        .setClientManager(xceiverClientManager)
        .setBlockSize(4 * KB)
        .setVolumeSize(50 * GB)
        .setFlusher(flusher)
        .setCBlockTargetMetrics(metrics)
        .build();
    cache.start();
    Thread flushListenerThread = new Thread(flusher);
    flushListenerThread.setDaemon(true);
    flushListenerThread.start();

    for (int i = 0; i < numblocks; i++) {
      cache.put(i, data.getBytes(StandardCharsets.UTF_8));
    }
    Assert.assertEquals(numblocks, metrics.getNumWriteOps());
    // Give the background flusher one flush interval to attempt (and fail)
    // the container writes. NOTE(review): fixed sleep — potentially flaky.
    Thread.sleep(3000);

    // all the writes to the container will fail because of fake pipelines
    Assert.assertEquals(numblocks, metrics.getNumDirtyLogBlockRead());
    Assert.assertTrue(
        metrics.getNumWriteGenericExceptionRetryBlocks() >= numblocks);
    Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
    Assert.assertEquals(0, metrics.getNumFailedRetryLogFileWrites());
    Assert.assertEquals(0, metrics.getNumFailedReleaseLevelDB());
    cache.close();
    flusher.shutdown();

    // restart cache with correct pipelines, now blocks should be uploaded
    // correctly
    CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
    ContainerCacheFlusher newFlusher =
        new ContainerCacheFlusher(flushTestConfig,
            xceiverClientManager, newMetrics);
    CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
        .setConfiguration(flushTestConfig)
        .setVolumeName(volumeName)
        .setUserName(userName)
        .setPipelines(getContainerPipeline(10))
        .setClientManager(xceiverClientManager)
        .setBlockSize(4 * KB)
        .setVolumeSize(50 * GB)
        .setFlusher(newFlusher)
        .setCBlockTargetMetrics(newMetrics)
        .build();
    newCache.start();
    Thread newFlushListenerThread = new Thread(newFlusher);
    newFlushListenerThread.setDaemon(true);
    newFlushListenerThread.start();
    Thread.sleep(3000);
    // The retry log must have been replayed against the real pipelines,
    // this time without any retry-provoking exceptions.
    Assert.assertTrue(newMetrics.getNumRetryLogBlockRead() >= numblocks);
    Assert.assertEquals(0, newMetrics.getNumWriteGenericExceptionRetryBlocks());
    Assert.assertEquals(0, newMetrics.getNumWriteIOExceptionRetryBlocks());
    Assert.assertEquals(0, newMetrics.getNumFailedReleaseLevelDB());
  }
}
