http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/pom.xml b/hadoop-cblock/server/pom.xml new file mode 100644 index 0000000..17af85d --- /dev/null +++ b/hadoop-cblock/server/pom.xml @@ -0,0 +1,169 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 +http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-cblock</artifactId> + <version>3.2.0-SNAPSHOT</version> + </parent> + <artifactId>hadoop-cblock-server</artifactId> + <version>3.2.0-SNAPSHOT</version> + <description>Apache Hadoop CBlock Server</description> + <name>Apache Hadoop CBlock Server</name> + <packaging>jar</packaging> + + <properties> + <hadoop.component>cblock</hadoop.component> + <is.hadoop.component>true</is.hadoop.component> + </properties> + + <dependencies> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdsl-server-framework</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-ozone-common</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdsl-common</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdsl-client</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-ozone-integration-test</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.jscsi</groupId> + <artifactId>target</artifactId> + <version>2.6.0</version> + <optional>true</optional> + <exclusions> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>io.kubernetes</groupId> + <artifactId>client-java</artifactId> + <version>1.0.0-beta1</version> + <exclusions> + <exclusion> + <groupId>io.swagger</groupId> + <artifactId>swagger-annotations</artifactId> + </exclusion> + <exclusion> + <groupId>com.github.stefanbirkner</groupId> + <artifactId>system-rules</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + </dependencies> + + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <exclude>src/test/resources/dynamicprovisioner/expected1-pv.json</exclude> + <exclude>src/test/resources/dynamicprovisioner/input1-pvc.json</exclude> + </excludes> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-maven-plugins</artifactId> + <executions> + <execution> + <id>compile-protoc</id> + <goals> + <goal>protoc</goal> + </goals> + <configuration> + <protocVersion>${protobuf.version}</protocVersion> + <protocCommand>${protoc.path}</protocCommand> + <imports> + <param> + ${basedir}/../../hadoop-common-project/hadoop-common/src/main/proto + </param> + <param> + ${basedir}/../../hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ + </param> + <param> + ${basedir}/../../hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ + </param> + <param> + ${basedir}/../../hadoop-hdsl/common/src/main/proto/ + </param> + <param>${basedir}/src/main/proto</param> + </imports> + <source> + <directory>${basedir}/src/main/proto</directory> + <includes> + <include>CBlockClientServerProtocol.proto</include> + <include>CBlockServiceProtocol.proto</include> + </includes> + </source> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + <configuration> + <excludeFilterFile>${basedir}/dev-support/findbugsExcludeFile.xml</excludeFilterFile> + </configuration> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java new file mode 100644 index 0000000..ec5d7d2 --- /dev/null +++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock; + +import static java.lang.Thread.NORM_PRIORITY; + +/** + * This class contains constants for configuration keys used in CBlock. + */ +public final class CBlockConfigKeys { + + public static final String DFS_CBLOCK_SERVICERPC_ADDRESS_KEY = + "dfs.cblock.servicerpc-address"; + public static final int DFS_CBLOCK_SERVICERPC_PORT_DEFAULT = + 9810; + public static final String DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT = + "0.0.0.0"; + + public static final String DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY = + "dfs.cblock.jscsi-address"; + + //The port on CBlockManager node for jSCSI to ask + public static final String DFS_CBLOCK_JSCSI_PORT_KEY = + "dfs.cblock.jscsi.port"; + public static final int DFS_CBLOCK_JSCSI_PORT_DEFAULT = + 9811; + + public static final String DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY = + "dfs.cblock.service.rpc-bind-host"; + public static final String DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY = + "dfs.cblock.jscsi.rpc-bind-host"; + + // default block size is 4KB + public static final int DFS_CBLOCK_SERVICE_BLOCK_SIZE_DEFAULT = + 4096; + + public static final String DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY = + "dfs.cblock.service.handler.count"; + public static final int DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT = 10; + + public static final String DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY = + "dfs.cblock.service.leveldb.path"; + //TODO : find a better place + public static final String DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT = + "/tmp/cblock_levelDB.dat"; + + + public static final String DFS_CBLOCK_DISK_CACHE_PATH_KEY = + "dfs.cblock.disk.cache.path"; + public static final String DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT = + "/tmp/cblockCacheDB"; + /** + * Setting this flag to true makes the block layer compute a sha256 hash of + * the data and log that information along with block ID. This is very + * useful for doing trace based simulation of various workloads. Since it is + * computing a hash for each block this could be expensive, hence default + * is false. + */ + public static final String DFS_CBLOCK_TRACE_IO = "dfs.cblock.trace.io"; + public static final boolean DFS_CBLOCK_TRACE_IO_DEFAULT = false; + + public static final String DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO = + "dfs.cblock.short.circuit.io"; + public static final boolean DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO_DEFAULT = + false; + + /** + * Cache size in 1000s of entries. 256 indicates 256 * 1024. + */ + public static final String DFS_CBLOCK_CACHE_QUEUE_SIZE_KB = + "dfs.cblock.cache.queue.size.in.kb"; + public static final int DFS_CBLOCK_CACHE_QUEUE_SIZE_KB_DEFAULT = 256; + + /** + * Minimum Number of threads that cache pool will use for background I/O. + */ + public static final String DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE = + "dfs.cblock.cache.core.min.pool.size"; + public static final int DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE_DEFAULT = 16; + + /** + * Maximum Number of threads that cache pool will use for background I/O. + */ + + public static final String DFS_CBLOCK_CACHE_MAX_POOL_SIZE = + "dfs.cblock.cache.max.pool.size"; + public static final int DFS_CBLOCK_CACHE_MAX_POOL_SIZE_DEFAULT = 256; + + /** + * Number of seconds to keep the Thread alive when it is idle. + */ + public static final String DFS_CBLOCK_CACHE_KEEP_ALIVE = + "dfs.cblock.cache.keep.alive"; + public static final String DFS_CBLOCK_CACHE_KEEP_ALIVE_DEFAULT = "60s"; + + /** + * Priority of cache flusher thread, affecting the relative performance of + * write and read. + */ + public static final String DFS_CBLOCK_CACHE_THREAD_PRIORITY = + "dfs.cblock.cache.thread.priority"; + public static final int DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT = + NORM_PRIORITY; + + /** + * Block Buffer size in terms of blockID entries, 512 means 512 blockIDs. + */ + public static final String DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE = + "dfs.cblock.cache.block.buffer.size"; + public static final int DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT = 512; + + public static final String DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL = + "dfs.cblock.block.buffer.flush.interval"; + public static final String DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_DEFAULT = + "60s"; + + // jscsi server settings + public static final String DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY = + "dfs.cblock.jscsi.server.address"; + public static final String DFS_CBLOCK_JSCSI_SERVER_ADDRESS_DEFAULT = + "0.0.0.0"; + public static final String DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_KEY = + "dfs.cblock.jscsi.cblock.server.address"; + public static final String DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_DEFAULT = + "127.0.0.1"; + + // to what address cblock server should talk to scm? + public static final String DFS_CBLOCK_SCM_IPADDRESS_KEY = + "dfs.cblock.scm.ipaddress"; + public static final String DFS_CBLOCK_SCM_IPADDRESS_DEFAULT = + "127.0.0.1"; + public static final String DFS_CBLOCK_SCM_PORT_KEY = + "dfs.cblock.scm.port"; + public static final int DFS_CBLOCK_SCM_PORT_DEFAULT = 9860; + + public static final String DFS_CBLOCK_CONTAINER_SIZE_GB_KEY = + "dfs.cblock.container.size.gb"; + public static final int DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT = + 5; + + // LevelDB cache file uses an off-heap cache in LevelDB of 256 MB. + public static final String DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_KEY = + "dfs.cblock.cache.leveldb.cache.size.mb"; + public static final int DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT = 256; + + /** + * Cache does an best case attempt to write a block to a container. + * At some point of time, we will need to handle the case where we did try + * 64K times and is till not able to write to the container. + * + * TODO: We will need cBlock Server to allow us to do a remapping of the + * block location in case of failures, at that point we should reduce the + * retry count to a more normal number. This is approximately 18 hours of + * retry. + */ + public static final String DFS_CBLOCK_CACHE_MAX_RETRY_KEY = + "dfs.cblock.cache.max.retry"; + public static final int DFS_CBLOCK_CACHE_MAX_RETRY_DEFAULT = + 64 * 1024; + + /** + * Cblock CLI configs. + */ + public static final String DFS_CBLOCK_MANAGER_POOL_SIZE = + "dfs.cblock.manager.pool.size"; + public static final int DFS_CBLOCK_MANAGER_POOL_SIZE_DEFAULT = 16; + + /** + * currently the largest supported volume is about 8TB, which might take + * > 20 seconds to finish creating containers. thus set timeout to 30 sec. + */ + public static final String DFS_CBLOCK_RPC_TIMEOUT = + "dfs.cblock.rpc.timeout"; + public static final String DFS_CBLOCK_RPC_TIMEOUT_DEFAULT = "300s"; + + public static final String DFS_CBLOCK_ISCSI_ADVERTISED_IP = + "dfs.cblock.iscsi.advertised.ip"; + + public static final String DFS_CBLOCK_ISCSI_ADVERTISED_PORT = + "dfs.cblock.iscsi.advertised.port"; + + public static final int DFS_CBLOCK_ISCSI_ADVERTISED_PORT_DEFAULT = 3260; + + + public static final String + DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED + = "dfs.cblock.kubernetes.dynamic-provisioner.enabled"; + + public static final boolean + DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED_DEFAULT = false; + + public static final String + DFS_CBLOCK_KUBERNETES_CBLOCK_USER = + "dfs.cblock.kubernetes.cblock-user"; + + public static final String + DFS_CBLOCK_KUBERNETES_CBLOCK_USER_DEFAULT = + "iqn.2001-04.org.apache.hadoop"; + + public static final String + DFS_CBLOCK_KUBERNETES_CONFIG_FILE_KEY = + "dfs.cblock.kubernetes.configfile"; + + private CBlockConfigKeys() { + + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CBlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CBlockManager.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CBlockManager.java new file mode 100644 index 0000000..788d01e --- /dev/null +++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CBlockManager.java @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock; + +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.BlockingService; +import org.apache.hadoop.cblock.kubernetes.DynamicProvisioner; +import org.apache.hadoop.cblock.meta.VolumeDescriptor; +import org.apache.hadoop.cblock.meta.VolumeInfo; +import org.apache.hadoop.cblock.proto.CBlockClientProtocol; +import org.apache.hadoop.cblock.proto.CBlockServiceProtocol; +import org.apache.hadoop.cblock.proto.MountVolumeResponse; +import org.apache.hadoop.cblock.protocol.proto + .CBlockClientServerProtocolProtos; +import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos; +import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB; +import org.apache.hadoop.cblock.protocolPB + .CBlockClientServerProtocolServerSideTranslatorPB; +import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB; +import org.apache.hadoop.cblock.protocolPB + .CBlockServiceProtocolServerSideTranslatorPB; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.scm.XceiverClientManager; +import org.apache.hadoop.scm.client.ContainerOperationClient; +import org.apache.hadoop.scm.client.ScmClient; +import org.apache.hadoop.cblock.storage.StorageManager; +import org.apache.hadoop.cblock.util.KeyUtil; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.hdsl.conf.OzoneConfiguration; +import org.apache.hadoop.scm.protocolPB + .StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.utils.LevelDBStore; + +import static org.apache.hadoop.cblock.CblockUtils.getCblockServerRpcAddr; +import static org.apache.hadoop.cblock.CblockUtils.getCblockServiceRpcAddr; +import static org.apache.hadoop.ozone.web.util.ServerUtils + .updateRPCListenAddress; +import org.iq80.leveldb.DBIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CONTAINER_SIZE_GB_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SCM_IPADDRESS_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SCM_IPADDRESS_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SCM_PORT_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SCM_PORT_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SERVICERPC_ADDRESS_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED_DEFAULT; + + +/** + * The main entry point of CBlock operations, ALL the CBlock operations + * will go through this class. But NOTE that: + * + * volume operations (create/ + * delete/info) are: + * client -> CBlockManager -> StorageManager -> CBlock client + * + * IO operations (put/get block) are; + * client -> CBlock client -> container + * + */ +public class CBlockManager implements CBlockServiceProtocol, + CBlockClientProtocol { + + private static final Logger LOG = + LoggerFactory.getLogger(CBlockManager.class); + + private final RPC.Server cblockService; + private final RPC.Server cblockServer; + + private final StorageManager storageManager; + + private final LevelDBStore levelDBStore; + private final String dbPath; + + private final DynamicProvisioner kubernetesDynamicProvisioner; + + private Charset encoding = Charset.forName("UTF-8"); + + public CBlockManager(OzoneConfiguration conf, + ScmClient storageClient) throws IOException { + // Fix the cBlockManagerId generattion code here. Should support + // cBlockManager --init command which will generate a cBlockManagerId and + // persist it locally. + storageManager = + new StorageManager(storageClient, conf, "CBLOCK"); + + dbPath = conf.getTrimmed(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, + DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT); + levelDBStore = new LevelDBStore(new File(dbPath), true); + LOG.info("Try to load exising volume information"); + readFromPersistentStore(); + + RPC.setProtocolEngine(conf, CBlockServiceProtocolPB.class, + ProtobufRpcEngine.class); + RPC.setProtocolEngine(conf, CBlockClientServerProtocolPB.class, + ProtobufRpcEngine.class); + // start service for client command-to-cblock server service + InetSocketAddress serviceRpcAddr = + getCblockServiceRpcAddr(conf); + BlockingService cblockProto = + CBlockServiceProtocolProtos + .CBlockServiceProtocolService + .newReflectiveBlockingService( + new CBlockServiceProtocolServerSideTranslatorPB(this) + ); + cblockService = startRpcServer(conf, CBlockServiceProtocolPB.class, + cblockProto, serviceRpcAddr, + DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY, + DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY, + DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT); + InetSocketAddress cblockServiceRpcAddress = + updateRPCListenAddress(conf, + DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, serviceRpcAddr, cblockService); + LOG.info("CBlock manager listening for client commands on: {}", + cblockServiceRpcAddress); + // now start service for cblock client-to-cblock server communication + + InetSocketAddress serverRpcAddr = + getCblockServerRpcAddr(conf); + BlockingService serverProto = + CBlockClientServerProtocolProtos + .CBlockClientServerProtocolService + .newReflectiveBlockingService( + new CBlockClientServerProtocolServerSideTranslatorPB(this) + ); + cblockServer = startRpcServer( + conf, CBlockClientServerProtocolPB.class, + serverProto, serverRpcAddr, + DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY, + DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY, + DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT); + InetSocketAddress cblockServerRpcAddress = + updateRPCListenAddress(conf, + DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, serverRpcAddr, cblockServer); + LOG.info("CBlock server listening for client commands on: {}", + cblockServerRpcAddress); + + if (conf.getBoolean(DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED, + DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED_DEFAULT)) { + + kubernetesDynamicProvisioner = + new DynamicProvisioner(conf, storageManager); + kubernetesDynamicProvisioner.init(); + + } else { + kubernetesDynamicProvisioner = null; + } + } + + public void start() { + cblockService.start(); + cblockServer.start(); + if (kubernetesDynamicProvisioner != null) { + kubernetesDynamicProvisioner.start(); + } + LOG.info("CBlock manager started!"); + } + + public void stop() { + cblockService.stop(); + cblockServer.stop(); + if (kubernetesDynamicProvisioner != null) { + kubernetesDynamicProvisioner.stop(); + } + } + + public void join() { + try { + cblockService.join(); + cblockServer.join(); + } catch (InterruptedException e) { + LOG.error("Interrupted during join"); + Thread.currentThread().interrupt(); + } + } + + /** + * Starts an RPC server, if configured. + * + * @param conf configuration + * @param protocol RPC protocol provided by RPC server + * @param instance RPC protocol implementation instance + * @param addr configured address of RPC server + * @param bindHostKey configuration key for setting explicit bind host. If + * the property is not configured, then the bind host is taken from addr. + * @param handlerCountKey configuration key for RPC server handler count + * @param handlerCountDefault default RPC server handler count if unconfigured + * @return RPC server, or null if addr is null + * @throws IOException if there is an I/O error while creating RPC server + */ + private static RPC.Server startRpcServer(OzoneConfiguration conf, + Class<?> protocol, BlockingService instance, + InetSocketAddress addr, String bindHostKey, + String handlerCountKey, int handlerCountDefault) throws IOException { + if (addr == null) { + return null; + } + String bindHost = conf.getTrimmed(bindHostKey); + if (bindHost == null || bindHost.isEmpty()) { + bindHost = addr.getHostName(); + } + int numHandlers = conf.getInt(handlerCountKey, handlerCountDefault); + RPC.Server rpcServer = new RPC.Builder(conf) + .setProtocol(protocol) + .setInstance(instance) + .setBindAddress(bindHost) + .setPort(addr.getPort()) + .setNumHandlers(numHandlers) + .setVerbose(false) + .setSecretManager(null) + .build(); + return rpcServer; + } + + @Override + public synchronized MountVolumeResponse mountVolume( + String userName, String volumeName) throws IOException { + return storageManager.isVolumeValid(userName, volumeName); + } + + @Override + public List<VolumeInfo> listVolumes() throws IOException { + return listVolume(null); + } + + @Override + public synchronized void createVolume(String userName, String volumeName, + long volumeSize, int blockSize) throws IOException { + LOG.info("Create volume received: userName: {} volumeName: {} " + + "volumeSize: {} blockSize: {}", userName, volumeName, + volumeSize, blockSize); + // It is important to create in-memory representation of the + // volume first, then writes to persistent storage (levelDB) + // such that it is guaranteed that when there is an entry in + // levelDB, the volume is allocated. (more like a UNDO log fashion) + // TODO: what if creation failed? we allocated containers but lost + // the reference to the volume and all it's containers. How to release + // the containers? + storageManager.createVolume(userName, volumeName, volumeSize, blockSize); + VolumeDescriptor volume = storageManager.getVolume(userName, volumeName); + if (volume == null) { + throw new IOException("Volume creation failed!"); + } + String volumeKey = KeyUtil.getVolumeKey(userName, volumeName); + writeToPersistentStore(volumeKey.getBytes(encoding), + volume.toProtobuf().toByteArray()); + } + + @Override + public synchronized void deleteVolume(String userName, + String volumeName, boolean force) throws IOException { + LOG.info("Delete volume received: volume: {} {} ", volumeName, force); + storageManager.deleteVolume(userName, volumeName, force); + // being here means volume is successfully deleted now + String volumeKey = KeyUtil.getVolumeKey(userName, volumeName); + removeFromPersistentStore(volumeKey.getBytes(encoding)); + } + + // No need to synchronize on the following three methods, since write and + // remove's caller are synchronized. read's caller is the constructor and + // no other method call can happen at that time. + @VisibleForTesting + public void writeToPersistentStore(byte[] key, byte[] value) { + levelDBStore.put(key, value); + } + + @VisibleForTesting + public void removeFromPersistentStore(byte[] key) { + levelDBStore.delete(key); + } + + public void readFromPersistentStore() throws IOException { + try (DBIterator iter = levelDBStore.getIterator()) { + iter.seekToFirst(); + while (iter.hasNext()) { + Map.Entry<byte[], byte[]> entry = iter.next(); + String volumeKey = new String(entry.getKey(), encoding); + try { + VolumeDescriptor volumeDescriptor = + VolumeDescriptor.fromProtobuf(entry.getValue()); + storageManager.addVolume(volumeDescriptor); + } catch (IOException e) { + LOG.error("Loading volume " + volumeKey + " error " + e); + } + } + } + } + + @Override + public synchronized VolumeInfo infoVolume(String userName, String volumeName + ) throws IOException { + LOG.info("Info volume received: volume: {}", volumeName); + return storageManager.infoVolume(userName, volumeName); + } + + @VisibleForTesting + public synchronized List<VolumeDescriptor> getAllVolumes() { + return storageManager.getAllVolume(null); + } + + public synchronized List<VolumeDescriptor> getAllVolumes(String userName) { + return storageManager.getAllVolume(userName); + } + + public synchronized void close() { + try { + levelDBStore.close(); + } catch (IOException e) { + LOG.error("Error when closing levelDB " + e); + } + } + + public synchronized void clean() { + try { + levelDBStore.close(); + levelDBStore.destroy(); + } catch (IOException e) { + LOG.error("Error when deleting levelDB " + e); + } + } + + @Override + public synchronized List<VolumeInfo> listVolume(String userName) + throws IOException { + ArrayList<VolumeInfo> response = new ArrayList<>(); + List<VolumeDescriptor> allVolumes = + storageManager.getAllVolume(userName); + for (VolumeDescriptor volume : allVolumes) { + VolumeInfo info = + new VolumeInfo(volume.getUserName(), volume.getVolumeName(), + volume.getVolumeSize(), volume.getBlockSize()); + response.add(info); + } + return response; + } + + public static void main(String[] args) throws Exception { + long version = RPC.getProtocolVersion( + StorageContainerLocationProtocolPB.class); + CblockUtils.activateConfigs(); + OzoneConfiguration ozoneConf = new OzoneConfiguration(); + String scmAddress = ozoneConf.get(DFS_CBLOCK_SCM_IPADDRESS_KEY, + DFS_CBLOCK_SCM_IPADDRESS_DEFAULT); + int scmPort = ozoneConf.getInt(DFS_CBLOCK_SCM_PORT_KEY, + DFS_CBLOCK_SCM_PORT_DEFAULT); + int containerSizeGB = ozoneConf.getInt(DFS_CBLOCK_CONTAINER_SIZE_GB_KEY, + DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT); + ContainerOperationClient.setContainerSizeB(containerSizeGB* OzoneConsts.GB); + InetSocketAddress address = new InetSocketAddress(scmAddress, scmPort); + + ozoneConf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, + OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT); + LOG.info( + "Creating StorageContainerLocationProtocol RPC client with address {}", + address); + RPC.setProtocolEngine(ozoneConf, StorageContainerLocationProtocolPB.class, + ProtobufRpcEngine.class); + StorageContainerLocationProtocolClientSideTranslatorPB client = + new StorageContainerLocationProtocolClientSideTranslatorPB( + RPC.getProxy(StorageContainerLocationProtocolPB.class, version, + address, UserGroupInformation.getCurrentUser(), ozoneConf, + NetUtils.getDefaultSocketFactory(ozoneConf), + Client.getRpcTimeout(ozoneConf))); + ScmClient storageClient = new ContainerOperationClient( + client, new XceiverClientManager(ozoneConf)); + CBlockManager cbm = new CBlockManager(ozoneConf, storageClient); + cbm.start(); + cbm.join(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CblockUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CblockUtils.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CblockUtils.java new file mode 100644 index 0000000..99ffde0 --- /dev/null +++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CblockUtils.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.cblock; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; + +import com.google.common.base.Optional; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_JSCSI_PORT_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SERVICERPC_ADDRESS_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_SERVICERPC_PORT_DEFAULT; +import static org.apache.hadoop.hdsl.HdslUtils.getHostNameFromConfigKeys; +import static org.apache.hadoop.hdsl.HdslUtils.getPortNumberFromConfigKeys; + +/** + * Generic stateless utility functions for CBlock components. + */ +public class CblockUtils { + + private CblockUtils() { + } + + /** + * Retrieve the socket address that is used by CBlock Service. + * + * @param conf + * @return Target InetSocketAddress for the CBlock Service endpoint. + */ + public static InetSocketAddress getCblockServiceRpcAddr(Configuration conf) { + final Optional<String> host = + getHostNameFromConfigKeys(conf, DFS_CBLOCK_SERVICERPC_ADDRESS_KEY); + + // If no port number is specified then we'll just try the defaultBindPort. + final Optional<Integer> port = + getPortNumberFromConfigKeys(conf, DFS_CBLOCK_SERVICERPC_ADDRESS_KEY); + + return NetUtils.createSocketAddr( + host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" + port + .or(DFS_CBLOCK_SERVICERPC_PORT_DEFAULT)); + } + + /** + * Retrieve the socket address that is used by CBlock Server. + * + * @param conf + * @return Target InetSocketAddress for the CBlock Server endpoint. + */ + public static InetSocketAddress getCblockServerRpcAddr(Configuration conf) { + final Optional<String> host = + getHostNameFromConfigKeys(conf, DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY); + + // If no port number is specified then we'll just try the defaultBindPort. + final Optional<Integer> port = + getPortNumberFromConfigKeys(conf, DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY); + + return NetUtils.createSocketAddr( + host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" + port + .or(DFS_CBLOCK_JSCSI_PORT_DEFAULT)); + } + + /** + * Parse size with size prefix string and return in bytes. + * + */ + public static long parseSize(String volumeSizeArgs) throws IOException { + long multiplier = 1; + + Pattern p = Pattern.compile("([0-9]+)([a-zA-Z]+)"); + Matcher m = p.matcher(volumeSizeArgs); + + if (!m.find()) { + throw new IOException("Invalid volume size args " + volumeSizeArgs); + } + + int size = Integer.parseInt(m.group(1)); + String s = m.group(2); + + if (s.equalsIgnoreCase("MB") || + s.equalsIgnoreCase("Mi")) { + multiplier = 1024L * 1024; + } else if (s.equalsIgnoreCase("GB") || + s.equalsIgnoreCase("Gi")) { + multiplier = 1024L * 1024 * 1024; + } else if (s.equalsIgnoreCase("TB") || + s.equalsIgnoreCase("Ti")) { + multiplier = 1024L * 1024 * 1024 * 1024; + } else { + throw new IOException("Invalid volume size args " + volumeSizeArgs); + } + return size * multiplier; + } + + public static void activateConfigs(){ + Configuration.addDefaultResource("hdfs-default.xml"); + Configuration.addDefaultResource("hdfs-site.xml"); + Configuration.addDefaultResource("ozone-default.xml"); + Configuration.addDefaultResource("ozone-site.xml"); + Configuration.addDefaultResource("cblock-default.xml"); + Configuration.addDefaultResource("cblock-site.xml"); + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/CBlockServiceProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/CBlockServiceProtocolClientSideTranslatorPB.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/CBlockServiceProtocolClientSideTranslatorPB.java new file mode 100644 index 0000000..ec38c9c --- /dev/null +++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/CBlockServiceProtocolClientSideTranslatorPB.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.client; + +import com.google.protobuf.ServiceException; +import org.apache.hadoop.cblock.meta.VolumeInfo; +import org.apache.hadoop.cblock.proto.CBlockServiceProtocol; +import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos; +import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.ipc.ProtobufHelper; +import org.apache.hadoop.ipc.ProtocolTranslator; +import org.apache.hadoop.ipc.RPC; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * The client side implement of CBlockServiceProtocol. + */ +@InterfaceAudience.Private +public final class CBlockServiceProtocolClientSideTranslatorPB + implements CBlockServiceProtocol, ProtocolTranslator, Closeable { + + private final CBlockServiceProtocolPB rpcProxy; + + public CBlockServiceProtocolClientSideTranslatorPB( + CBlockServiceProtocolPB rpcProxy) { + this.rpcProxy = rpcProxy; + } + + @Override + public void close() throws IOException { + RPC.stopProxy(rpcProxy); + } + + @Override + public void createVolume(String userName, String volumeName, + long volumeSize, int blockSize) throws IOException { + CBlockServiceProtocolProtos.CreateVolumeRequestProto.Builder req = + CBlockServiceProtocolProtos.CreateVolumeRequestProto.newBuilder(); + req.setUserName(userName); + req.setVolumeName(volumeName); + req.setVolumeSize(volumeSize); + req.setBlockSize(blockSize); + try { + rpcProxy.createVolume(null, req.build()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void deleteVolume(String userName, String volumeName, boolean force) + throws IOException { + CBlockServiceProtocolProtos.DeleteVolumeRequestProto.Builder req = + CBlockServiceProtocolProtos.DeleteVolumeRequestProto.newBuilder(); + req.setUserName(userName); + req.setVolumeName(volumeName); + req.setForce(force); + try { + rpcProxy.deleteVolume(null, req.build()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public Object getUnderlyingProxyObject() { + return rpcProxy; + } + + @Override + public VolumeInfo infoVolume(String userName, String volumeName) + throws IOException { + CBlockServiceProtocolProtos.InfoVolumeRequestProto.Builder req = + CBlockServiceProtocolProtos.InfoVolumeRequestProto.newBuilder(); + req.setUserName(userName); + req.setVolumeName(volumeName); + try { + CBlockServiceProtocolProtos.InfoVolumeResponseProto resp = + rpcProxy.infoVolume(null, req.build()); + return new VolumeInfo(resp.getVolumeInfo().getUserName(), + resp.getVolumeInfo().getVolumeName(), + resp.getVolumeInfo().getVolumeSize(), + resp.getVolumeInfo().getBlockSize(), + resp.getVolumeInfo().getUsage()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public List<VolumeInfo> listVolume(String userName) throws IOException { + CBlockServiceProtocolProtos.ListVolumeRequestProto.Builder req = + CBlockServiceProtocolProtos.ListVolumeRequestProto.newBuilder(); + if (userName != null) { + req.setUserName(userName); + } + try { + CBlockServiceProtocolProtos.ListVolumeResponseProto resp = + rpcProxy.listVolume(null, req.build()); + List<VolumeInfo> respList = new ArrayList<>(); + for (CBlockServiceProtocolProtos.VolumeInfoProto entry : + resp.getVolumeEntryList()) { + VolumeInfo volumeInfo = new VolumeInfo( + entry.getUserName(), entry.getVolumeName(), entry.getVolumeSize(), + entry.getBlockSize()); + respList.add(volumeInfo); + } + return respList; + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } catch (Exception e) { + throw new IOException("got" + e.getCause() + " " + e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java new file mode 100644 index 0000000..4a3878a --- /dev/null +++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.client; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.cblock.CBlockConfigKeys; +import org.apache.hadoop.cblock.meta.VolumeInfo; +import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB; +import org.apache.hadoop.hdsl.conf.OzoneConfiguration; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; + +import static org.apache.hadoop.cblock.CblockUtils.getCblockServiceRpcAddr; + +/** + * Implementation of client used by CBlock command line tool. + */ +public class CBlockVolumeClient { + private final CBlockServiceProtocolClientSideTranslatorPB cblockClient; + + public CBlockVolumeClient(OzoneConfiguration conf) throws IOException { + this(conf, null); + } + + public CBlockVolumeClient(OzoneConfiguration conf, + InetSocketAddress serverAddress) throws IOException { + InetSocketAddress address = serverAddress != null ? serverAddress : + getCblockServiceRpcAddr(conf); + long version = RPC.getProtocolVersion(CBlockServiceProtocolPB.class); + int rpcTimeout = Math.toIntExact( + conf.getTimeDuration(CBlockConfigKeys.DFS_CBLOCK_RPC_TIMEOUT, + CBlockConfigKeys.DFS_CBLOCK_RPC_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS)); + cblockClient = new CBlockServiceProtocolClientSideTranslatorPB( + RPC.getProtocolProxy(CBlockServiceProtocolPB.class, version, + address, UserGroupInformation.getCurrentUser(), conf, + NetUtils.getDefaultSocketFactory(conf), rpcTimeout, RetryPolicies + .retryUpToMaximumCountWithFixedSleep( + 300, 1, TimeUnit.SECONDS)).getProxy()); + } + + public void createVolume(String userName, String volumeName, + long volumeSize, int blockSize) throws IOException { + cblockClient.createVolume(userName, volumeName, + volumeSize, blockSize); + } + + public void deleteVolume(String userName, String volumeName, boolean force) + throws IOException { + cblockClient.deleteVolume(userName, volumeName, force); + } + + public VolumeInfo infoVolume(String userName, String volumeName) + throws IOException { + return cblockClient.infoVolume(userName, volumeName); + } + + public List<VolumeInfo> listVolume(String userName) + throws IOException { + return cblockClient.listVolume(userName); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/package-info.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/package-info.java new file mode 100644 index 0000000..761b71e --- /dev/null +++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.client; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/exception/CBlockException.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/exception/CBlockException.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/exception/CBlockException.java new file mode 100644 index 0000000..8f6b82b --- /dev/null +++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/exception/CBlockException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.exception; + +import java.io.IOException; + +/** + * The exception class used in CBlock. + */ +public class CBlockException extends IOException { + public CBlockException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/exception/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/exception/package-info.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/exception/package-info.java new file mode 100644 index 0000000..268b8cb --- /dev/null +++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/exception/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.exception; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java new file mode 100644 index 0000000..04fe3a4 --- /dev/null +++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.jscsiHelper; + +import com.google.common.base.Preconditions; +import com.google.common.primitives.Longs; +import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; +import org.apache.hadoop.cblock.jscsiHelper.cache.impl.AsyncBlockWriter; +import org.apache.hadoop.scm.XceiverClientSpi; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.scm.storage.ContainerProtocolCalls; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.utils.LevelDBStore; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Paths; + +/** + * The blockWriter task. + */ +public class BlockWriterTask implements Runnable { + private final LogicalBlock block; + private int tryCount; + private final ContainerCacheFlusher flusher; + private final String dbPath; + private final String fileName; + private final int maxRetryCount; + + /** + * Constructs a BlockWriterTask. + * + * @param block - Block Information. + * @param flusher - ContainerCacheFlusher. + */ + public BlockWriterTask(LogicalBlock block, ContainerCacheFlusher flusher, + String dbPath, int tryCount, String fileName, int maxRetryCount) { + this.block = block; + this.flusher = flusher; + this.dbPath = dbPath; + this.tryCount = tryCount; + this.fileName = fileName; + this.maxRetryCount = maxRetryCount; + } + + /** + * When an object implementing interface <code>Runnable</code> is used + * to create a thread, starting the thread causes the object's + * <code>run</code> method to be called in that separately executing + * thread. + * <p> + * The general contract of the method <code>run</code> is that it may + * take any action whatsoever. + * + * @see Thread#run() + */ + @Override + public void run() { + String containerName = null; + XceiverClientSpi client = null; + LevelDBStore levelDBStore = null; + String traceID = flusher.getTraceID(new File(dbPath), block.getBlockID()); + flusher.getLOG().debug( + "Writing block to remote. block ID: {}", block.getBlockID()); + try { + incTryCount(); + Pipeline pipeline = flusher.getPipeline(this.dbPath, block.getBlockID()); + client = flusher.getXceiverClientManager().acquireClient(pipeline); + containerName = pipeline.getContainerName(); + byte[] keybuf = Longs.toByteArray(block.getBlockID()); + byte[] data; + long startTime = Time.monotonicNow(); + levelDBStore = flusher.getCacheDB(this.dbPath); + data = levelDBStore.get(keybuf); + Preconditions.checkNotNull(data); + long endTime = Time.monotonicNow(); + Preconditions.checkState(data.length > 0, "Block data is zero length"); + startTime = Time.monotonicNow(); + ContainerProtocolCalls.writeSmallFile(client, containerName, + Long.toString(block.getBlockID()), data, traceID); + endTime = Time.monotonicNow(); + flusher.getTargetMetrics().updateContainerWriteLatency( + endTime - startTime); + flusher.getLOG().debug("Time taken for Write Small File : {} ms", + endTime - startTime); + + flusher.incrementRemoteIO(); + + } catch (Exception ex) { + flusher.getLOG().error("Writing of block:{} failed, We have attempted " + + "to write this block {} times to the container {}.Trace ID:{}", + block.getBlockID(), this.getTryCount(), containerName, traceID, ex); + writeRetryBlock(block); + if (ex instanceof IOException) { + flusher.getTargetMetrics().incNumWriteIOExceptionRetryBlocks(); + } else { + flusher.getTargetMetrics().incNumWriteGenericExceptionRetryBlocks(); + } + if (this.getTryCount() >= maxRetryCount) { + flusher.getTargetMetrics().incNumWriteMaxRetryBlocks(); + } + } finally { + flusher.incFinishCount(fileName); + if (levelDBStore != null) { + flusher.releaseCacheDB(dbPath); + } + if(client != null) { + flusher.getXceiverClientManager().releaseClient(client); + } + } + } + + + private void writeRetryBlock(LogicalBlock currentBlock) { + boolean append = false; + String retryFileName = + String.format("%s.%d.%s.%s", AsyncBlockWriter.RETRY_LOG_PREFIX, + currentBlock.getBlockID(), Time.monotonicNow(), tryCount); + File logDir = new File(this.dbPath); + if (!logDir.exists() && !logDir.mkdirs()) { + flusher.getLOG().error( + "Unable to create the log directory, Critical error cannot continue"); + return; + } + String log = Paths.get(this.dbPath, retryFileName).toString(); + ByteBuffer buffer = ByteBuffer.allocate(Long.SIZE / Byte.SIZE); + buffer.putLong(currentBlock.getBlockID()); + buffer.flip(); + try { + FileChannel channel = new FileOutputStream(log, append).getChannel(); + channel.write(buffer); + channel.close(); + flusher.processDirtyBlocks(this.dbPath, retryFileName); + } catch (IOException e) { + flusher.getTargetMetrics().incNumFailedRetryLogFileWrites(); + flusher.getLOG().error("Unable to write the retry block. Block ID: {}", + currentBlock.getBlockID(), e); + } + } + + /** + * Increments the try count. This is done each time we try this block + * write to the container. + */ + private void incTryCount() { + tryCount++; + } + + /** + * Get the retry count. + * + * @return int + */ + public int getTryCount() { + return tryCount; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockClientProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockClientProtocolClientSideTranslatorPB.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockClientProtocolClientSideTranslatorPB.java new file mode 100644 index 0000000..84b68e3 --- /dev/null +++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockClientProtocolClientSideTranslatorPB.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.jscsiHelper; + +import com.google.common.primitives.Longs; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.cblock.exception.CBlockException; +import org.apache.hadoop.cblock.meta.VolumeInfo; +import org.apache.hadoop.cblock.proto.CBlockClientProtocol; +import org.apache.hadoop.cblock.proto.MountVolumeResponse; +import org.apache.hadoop.cblock.protocol.proto + .CBlockClientServerProtocolProtos.ContainerIDProto; +import org.apache.hadoop.cblock.protocol.proto + .CBlockClientServerProtocolProtos.ListVolumesRequestProto; +import org.apache.hadoop.cblock.protocol.proto + .CBlockClientServerProtocolProtos.ListVolumesResponseProto; +import org.apache.hadoop.cblock.protocol.proto + .CBlockClientServerProtocolProtos.MountVolumeRequestProto; +import org.apache.hadoop.cblock.protocol.proto + .CBlockClientServerProtocolProtos.MountVolumeResponseProto; +import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos + .VolumeInfoProto; +import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB; +import org.apache.hadoop.ipc.ProtobufHelper; +import org.apache.hadoop.ipc.ProtocolTranslator; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +/** + * The client side of CBlockClientProtocol. + * + * CBlockClientProtocol is the protocol used between cblock client side + * and cblock manager (cblock client side is just the node where jscsi daemon + * process runs. a machines talks to jscsi daemon for mounting a volume). + * + * Right now, the only communication carried by this protocol is for client side + * to request mounting a volume. + */ +public class CBlockClientProtocolClientSideTranslatorPB + implements CBlockClientProtocol, ProtocolTranslator, Closeable { + + private final CBlockClientServerProtocolPB rpcProxy; + + public CBlockClientProtocolClientSideTranslatorPB( + CBlockClientServerProtocolPB rpcProxy) { + this.rpcProxy = rpcProxy; + } + + + @Override + public void close() throws IOException { + RPC.stopProxy(rpcProxy); + } + + @Override + public Object getUnderlyingProxyObject() { + return rpcProxy; + } + + @Override + public MountVolumeResponse mountVolume( + String userName, String volumeName) throws IOException { + MountVolumeRequestProto.Builder + request + = MountVolumeRequestProto + .newBuilder(); + request.setUserName(userName); + request.setVolumeName(volumeName); + try { + MountVolumeResponseProto resp + = rpcProxy.mountVolume(null, request.build()); + if (!resp.getIsValid()) { + throw new CBlockException( + "Not a valid volume:" + userName + ":" + volumeName); + } + List<Pipeline> containerIDs = new ArrayList<>(); + HashMap<String, Pipeline> containerPipelines = new HashMap<>(); + if (resp.getAllContainerIDsList().size() == 0) { + throw new CBlockException("Mount volume request returned no container"); + } + for (ContainerIDProto containerID : + resp.getAllContainerIDsList()) { + if (containerID.hasPipeline()) { + // it should always have a pipeline only except for tests. + Pipeline p = Pipeline.getFromProtoBuf(containerID.getPipeline()); + p.setData(Longs.toByteArray(containerID.getIndex())); + containerIDs.add(p); + containerPipelines.put(containerID.getContainerID(), p); + } else { + throw new CBlockException("ContainerID does not have pipeline!"); + } + } + return new MountVolumeResponse( + resp.getIsValid(), + resp.getUserName(), + resp.getVolumeName(), + resp.getVolumeSize(), + resp.getBlockSize(), + containerIDs, + containerPipelines); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public List<VolumeInfo> listVolumes() throws IOException { + try { + List<VolumeInfo> result = new ArrayList<>(); + ListVolumesResponseProto + listVolumesResponseProto = this.rpcProxy.listVolumes(null, + ListVolumesRequestProto.newBuilder() + .build()); + for (VolumeInfoProto volumeInfoProto : + listVolumesResponseProto + .getVolumeEntryList()) { + result.add(new VolumeInfo(volumeInfoProto.getUserName(), + volumeInfoProto.getVolumeName(), volumeInfoProto.getVolumeSize(), + volumeInfoProto.getBlockSize())); + } + return result; + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockIStorageImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockIStorageImpl.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockIStorageImpl.java new file mode 100644 index 0000000..2f35668 --- /dev/null +++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockIStorageImpl.java @@ -0,0 +1,440 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.jscsiHelper; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.cblock.jscsiHelper.cache.CacheModule; +import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; +import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.scm.XceiverClientManager; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.jscsi.target.storage.IStorageModule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_TRACE_IO; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_TRACE_IO_DEFAULT; + +/** + * The SCSI Target class for CBlockSCSIServer. + */ +final public class CBlockIStorageImpl implements IStorageModule { + private static final Logger LOGGER = + LoggerFactory.getLogger(CBlockIStorageImpl.class); + private static final Logger TRACER = + LoggerFactory.getLogger("TraceIO"); + + private CacheModule cache; + private final long volumeSize; + private final int blockSize; + private final String userName; + private final String volumeName; + private final boolean traceEnabled; + private final Configuration conf; + private final ContainerCacheFlusher flusher; + private List<Pipeline> fullContainerList; + + /** + * private: constructs a SCSI Target. + * + * @param config - config + * @param userName - Username + * @param volumeName - Name of the volume + * @param volumeSize - Size of the volume + * @param blockSize - Size of the block + * @param fullContainerList - Ordered list of containers that make up this + * volume. + * @param flusher - flusher which is used to flush data from + * level db cache to containers + * @throws IOException - Throws IOException. + */ + private CBlockIStorageImpl(Configuration config, String userName, + String volumeName, long volumeSize, int blockSize, + List<Pipeline> fullContainerList, ContainerCacheFlusher flusher) { + this.conf = config; + this.userName = userName; + this.volumeName = volumeName; + this.volumeSize = volumeSize; + this.blockSize = blockSize; + this.fullContainerList = new ArrayList<>(fullContainerList); + this.flusher = flusher; + this.traceEnabled = conf.getBoolean(DFS_CBLOCK_TRACE_IO, + DFS_CBLOCK_TRACE_IO_DEFAULT); + } + + /** + * private: initialize the cache. + * + * @param xceiverClientManager - client manager that is used for creating new + * connections to containers. + * @param metrics - target metrics to maintain metrics for target server + * @throws IOException - Throws IOException. + */ + private void initCache(XceiverClientManager xceiverClientManager, + CBlockTargetMetrics metrics) throws IOException { + this.cache = CBlockLocalCache.newBuilder() + .setConfiguration(conf) + .setVolumeName(this.volumeName) + .setUserName(this.userName) + .setPipelines(this.fullContainerList) + .setClientManager(xceiverClientManager) + .setBlockSize(blockSize) + .setVolumeSize(volumeSize) + .setFlusher(flusher) + .setCBlockTargetMetrics(metrics) + .build(); + this.cache.start(); + } + + /** + * Gets a new builder for CBlockStorageImpl. + * + * @return builder + */ + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Get Cache. + * + * @return - Cache + */ + public CacheModule getCache() { + return cache; + } + + /** + * Returns block size of this volume. + * + * @return int size of block for this volume. + */ + @Override + public int getBlockSize() { + return blockSize; + } + + /** + * Checks the index boundary of a block address. + * + * @param logicalBlockAddress the index of the first block of data to be read + * or written + * @param transferLengthInBlocks the total number of consecutive blocks about + * to be read or written + * @return 0 == Success, 1 indicates the LBA address is out of bounds and 2 + * indicates that LBA + transfer size is out of bounds. + */ + @Override + public int checkBounds(long logicalBlockAddress, int transferLengthInBlocks) { + long sizeInBlocks = volumeSize / blockSize; + int res = 0; + if (logicalBlockAddress < 0 || logicalBlockAddress >= sizeInBlocks) { + res = 1; + } + + if (transferLengthInBlocks < 0 || + logicalBlockAddress + transferLengthInBlocks > sizeInBlocks) { + if (res == 0) { + res = 2; + } + } + return res; + } + + /** + * Number of blocks that make up this volume. + * + * @return long - count of blocks. + */ + @Override + public long getSizeInBlocks() { + return volumeSize / blockSize; + } + + /** + * Reads the number of bytes that can be read into the bytes buffer from the + * location indicated. + * + * @param bytes the array into which the data will be copied will be filled + * with data from storage + * @param storageIndex the position of the first byte to be copied + * @throws IOException + */ + @Override + public void read(byte[] bytes, long storageIndex) throws IOException { + int startingIdxInBlock = (int) storageIndex % blockSize; + int idxInBytes = 0; + if (this.traceEnabled) { + TRACER.info("Task=ReadStart,length={},location={}", + bytes.length, storageIndex); + } + while (idxInBytes < bytes.length - 1) { + long blockId = (storageIndex + idxInBytes) / blockSize; + byte[] dataBytes; + + try { + LogicalBlock block = this.cache.get(blockId); + dataBytes = block.getData().array(); + + if (this.traceEnabled) { + TRACER.info("Task=ReadBlock,BlockID={},length={},SHA={}", + blockId, + dataBytes.length, + dataBytes.length > 0 ? DigestUtils.sha256Hex(dataBytes) : null); + } + } catch (IOException e) { + // For an non-existing block cache.get will return a block with zero + // bytes filled. So any error here is a real error. + LOGGER.error("getting errors when reading data:" + e); + throw e; + } + + int length = blockSize - startingIdxInBlock; + if (length > bytes.length - idxInBytes) { + length = bytes.length - idxInBytes; + } + if (dataBytes.length >= length) { + System.arraycopy(dataBytes, startingIdxInBlock, bytes, idxInBytes, + length); + } + startingIdxInBlock = 0; + idxInBytes += length; + } + if (this.traceEnabled) { + TRACER.info("Task=ReadEnd,length={},location={},SHA={}", + bytes.length, storageIndex, DigestUtils.sha256Hex(bytes)); + } + } + + @Override + public void write(byte[] bytes, long storageIndex) throws IOException { + int startingIdxInBlock = (int) storageIndex % blockSize; + int idxInBytes = 0; + if (this.traceEnabled) { + TRACER.info("Task=WriteStart,length={},location={},SHA={}", + bytes.length, storageIndex, + bytes.length > 0 ? DigestUtils.sha256Hex(bytes) : null); + } + + ByteBuffer dataByte = ByteBuffer.allocate(blockSize); + while (idxInBytes < bytes.length - 1) { + long blockId = (storageIndex + idxInBytes) / blockSize; + int length = blockSize - startingIdxInBlock; + if (length > bytes.length - idxInBytes) { + length = bytes.length - idxInBytes; + } + System.arraycopy(bytes, idxInBytes, dataByte.array(), startingIdxInBlock, + length); + this.cache.put(blockId, dataByte.array()); + + if (this.traceEnabled) { + TRACER.info("Task=WriteBlock,BlockID={},length={},SHA={}", + blockId, dataByte.array().length, + dataByte.array().length > 0 ? + DigestUtils.sha256Hex(dataByte.array()) : null); + } + dataByte.clear(); + startingIdxInBlock = 0; + idxInBytes += length; + } + + if (this.traceEnabled) { + TRACER.info("Task=WriteEnd,length={},location={} ", + bytes.length, storageIndex); + } + } + + @Override + public void close() throws IOException { + try { + cache.close(); + } catch (IllegalStateException ise) { + LOGGER.error("Can not close the storage {}", ise); + throw ise; + } + } + + /** + * Builder class for CBlocklocalCache. + */ + public static class Builder { + private String userName; + private String volumeName; + private long volumeSize; + private int blockSize; + private List<Pipeline> containerList; + private Configuration conf; + private XceiverClientManager clientManager; + private ContainerCacheFlusher flusher; + private CBlockTargetMetrics metrics; + + /** + * Constructs a builder. + */ + Builder() { + + } + + public Builder setFlusher(ContainerCacheFlusher cacheFlusher) { + this.flusher = cacheFlusher; + return this; + } + + /** + * set config. + * + * @param config - config + * @return Builder + */ + public Builder setConf(Configuration config) { + this.conf = config; + return this; + } + + /** + * set user name. + * + * @param cblockUserName - user name + * @return Builder + */ + public Builder setUserName(String cblockUserName) { + this.userName = cblockUserName; + return this; + } + + /** + * set volume name. + * + * @param cblockVolumeName -- volume name + * @return Builder + */ + public Builder setVolumeName(String cblockVolumeName) { + this.volumeName = cblockVolumeName; + return this; + } + + /** + * set volume size. + * + * @param cblockVolumeSize -- set volume size. + * @return Builder + */ + public Builder setVolumeSize(long cblockVolumeSize) { + this.volumeSize = cblockVolumeSize; + return this; + } + + /** + * set block size. + * + * @param cblockBlockSize -- block size + * @return Builder + */ + public Builder setBlockSize(int cblockBlockSize) { + this.blockSize = cblockBlockSize; + return this; + } + + /** + * Set contianer list. + * + * @param cblockContainerList - set the pipeline list + * @return Builder + */ + public Builder setContainerList(List<Pipeline> cblockContainerList) { + this.containerList = cblockContainerList; + return this; + } + + /** + * Set client manager. + * + * @param xceiverClientManager -- sets the client manager. + * @return Builder + */ + public Builder setClientManager(XceiverClientManager xceiverClientManager) { + this.clientManager = xceiverClientManager; + return this; + } + + /** + * Set Cblock Target Metrics. + * + * @param targetMetrics -- sets the cblock target metrics + * @return Builder + */ + public Builder setCBlockTargetMetrics(CBlockTargetMetrics targetMetrics) { + this.metrics = targetMetrics; + return this; + } + + /** + * Builds the CBlockStorageImpl. + * + * @return builds the CBlock Scsi Target. + */ + public CBlockIStorageImpl build() throws IOException { + if (StringUtils.isBlank(userName)) { + throw new IllegalArgumentException("User name cannot be null or empty" + + "."); + } + if (StringUtils.isBlank(volumeName)) { + throw new IllegalArgumentException("Volume name cannot be null or " + + "empty"); + } + + if (volumeSize < 1) { + throw new IllegalArgumentException("Volume size cannot be negative or" + + " zero."); + } + + if (blockSize < 1) { + throw new IllegalArgumentException("Block size cannot be negative or " + + "zero."); + } + + if (containerList == null || containerList.size() == 0) { + throw new IllegalArgumentException("Container list cannot be null or " + + "empty"); + } + if (clientManager == null) { + throw new IllegalArgumentException("Client manager cannot be null"); + } + if (conf == null) { + throw new IllegalArgumentException("Configuration cannot be null"); + } + + if (flusher == null) { + throw new IllegalArgumentException("Flusher Cannot be null."); + } + CBlockIStorageImpl impl = new CBlockIStorageImpl(this.conf, this.userName, + this.volumeName, this.volumeSize, this.blockSize, this.containerList, + this.flusher); + impl.initCache(this.clientManager, this.metrics); + return impl; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockManagerHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockManagerHandler.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockManagerHandler.java new file mode 100644 index 0000000..6367c61 --- /dev/null +++ b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockManagerHandler.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.jscsiHelper; + +import org.apache.hadoop.cblock.meta.VolumeInfo; +import org.apache.hadoop.cblock.proto.MountVolumeResponse; + +import java.io.IOException; +import java.util.List; + +/** + * This class is the handler of CBlockManager used by target server + * to communicate with CBlockManager. + * + * More specifically, this class will expose local methods to target + * server, and make RPC calls to CBlockManager accordingly + */ +public class CBlockManagerHandler { + + private final CBlockClientProtocolClientSideTranslatorPB handler; + + public CBlockManagerHandler( + CBlockClientProtocolClientSideTranslatorPB handler) { + this.handler = handler; + } + + public MountVolumeResponse mountVolume( + String userName, String volumeName) throws IOException { + return handler.mountVolume(userName, volumeName); + } + + public List<VolumeInfo> listVolumes() throws IOException { + return handler.listVolumes(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org