HDFS-13415. Ozone: Remove cblock code from HDFS-7240. Contributed by Elek, Marton.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ea85801c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ea85801c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ea85801c Branch: refs/heads/HDFS-7240 Commit: ea85801ce32eeccaac2f6c0726024c37ee1fe192 Parents: 4a8aa0e Author: Mukul Kumar Singh <msi...@apache.org> Authored: Wed Apr 11 18:42:16 2018 +0530 Committer: Mukul Kumar Singh <msi...@apache.org> Committed: Wed Apr 11 18:42:16 2018 +0530 ---------------------------------------------------------------------- dev-support/bin/dist-layout-stitching | 4 - .../main/resources/assemblies/hadoop-src.xml | 1 - hadoop-cblock/pom.xml | 61 -- .../server/dev-support/findbugsExcludeFile.xml | 21 - hadoop-cblock/server/pom.xml | 159 ----- .../apache/hadoop/cblock/CBlockConfigKeys.java | 222 ------- .../org/apache/hadoop/cblock/CBlockManager.java | 426 ------------- .../org/apache/hadoop/cblock/CblockUtils.java | 129 ---- ...ckServiceProtocolClientSideTranslatorPB.java | 135 ----- .../cblock/client/CBlockVolumeClient.java | 83 --- .../hadoop/cblock/client/package-info.java | 18 - .../cblock/exception/CBlockException.java | 29 - .../hadoop/cblock/exception/package-info.java | 18 - .../cblock/jscsiHelper/BlockWriterTask.java | 175 ------ ...ockClientProtocolClientSideTranslatorPB.java | 147 ----- .../cblock/jscsiHelper/CBlockIStorageImpl.java | 440 -------------- .../jscsiHelper/CBlockManagerHandler.java | 50 -- .../cblock/jscsiHelper/CBlockTargetMetrics.java | 334 ----------- .../cblock/jscsiHelper/CBlockTargetServer.java | 128 ---- .../jscsiHelper/ContainerCacheFlusher.java | 599 ------------------- .../cblock/jscsiHelper/SCSITargetDaemon.java | 132 ---- .../cblock/jscsiHelper/cache/CacheModule.java | 52 -- .../cblock/jscsiHelper/cache/LogicalBlock.java | 50 -- .../cache/impl/AsyncBlockWriter.java | 221 ------- .../cache/impl/BlockBufferFlushTask.java | 118 ---- .../cache/impl/BlockBufferManager.java | 184 ------ .../cache/impl/CBlockLocalCache.java | 577 ------------------ .../jscsiHelper/cache/impl/DiskBlock.java | 77 --- .../jscsiHelper/cache/impl/SyncBlockReader.java | 263 -------- .../jscsiHelper/cache/impl/package-info.java | 18 - .../cblock/jscsiHelper/cache/package-info.java | 18 - .../hadoop/cblock/jscsiHelper/package-info.java | 18 - .../cblock/kubernetes/DynamicProvisioner.java | 331 ---------- .../hadoop/cblock/kubernetes/package-info.java | 23 - .../hadoop/cblock/meta/ContainerDescriptor.java | 107 ---- .../hadoop/cblock/meta/VolumeDescriptor.java | 269 --------- .../apache/hadoop/cblock/meta/VolumeInfo.java | 79 --- .../apache/hadoop/cblock/meta/package-info.java | 18 - .../org/apache/hadoop/cblock/package-info.java | 18 - .../cblock/proto/CBlockClientProtocol.java | 38 -- .../cblock/proto/CBlockServiceProtocol.java | 45 -- .../cblock/proto/MountVolumeResponse.java | 79 --- .../hadoop/cblock/proto/package-info.java | 18 - .../CBlockClientServerProtocolPB.java | 37 -- ...entServerProtocolServerSideTranslatorPB.java | 116 ---- .../protocolPB/CBlockServiceProtocolPB.java | 35 -- ...ckServiceProtocolServerSideTranslatorPB.java | 159 ----- .../hadoop/cblock/protocolPB/package-info.java | 18 - .../hadoop/cblock/storage/StorageManager.java | 427 ------------- .../hadoop/cblock/storage/package-info.java | 18 - .../org/apache/hadoop/cblock/util/KeyUtil.java | 49 -- .../apache/hadoop/cblock/util/package-info.java | 18 - .../main/proto/CBlockClientServerProtocol.proto | 93 --- .../src/main/proto/CBlockServiceProtocol.proto | 133 ---- .../src/main/resources/cblock-default.xml | 347 ----------- .../apache/hadoop/cblock/TestBufferManager.java | 456 -------------- .../cblock/TestCBlockConfigurationFields.java | 35 -- .../hadoop/cblock/TestCBlockReadWrite.java | 377 ------------ .../apache/hadoop/cblock/TestCBlockServer.java | 212 ------- .../cblock/TestCBlockServerPersistence.java | 132 ---- .../hadoop/cblock/TestLocalBlockCache.java | 444 -------------- .../kubernetes/TestDynamicProvisioner.java | 74 --- .../cblock/util/ContainerLookUpService.java | 73 --- .../hadoop/cblock/util/MockStorageClient.java | 176 ------ .../dynamicprovisioner/expected1-pv.json | 54 -- .../dynamicprovisioner/input1-pvc.json | 55 -- hadoop-cblock/tools/pom.xml | 42 -- .../org/apache/hadoop/cblock/cli/CBlockCli.java | 265 -------- .../apache/hadoop/cblock/cli/package-info.java | 18 - .../org/apache/hadoop/cblock/TestCBlockCLI.java | 242 -------- .../src/main/bin/hadoop-functions.sh | 2 - .../hadoop-common/src/main/conf/hadoop-env.sh | 19 - hadoop-dist/pom.xml | 8 - hadoop-dist/src/main/compose/cblock/.env | 17 - hadoop-dist/src/main/compose/cblock/README.md | 42 -- .../src/main/compose/cblock/docker-compose.yaml | 66 -- .../src/main/compose/cblock/docker-config | 39 -- hadoop-ozone/common/src/main/bin/ozone | 14 - .../src/main/shellprofile.d/hadoop-ozone.sh | 4 +- hadoop-project/pom.xml | 13 - pom.xml | 1 - 81 files changed, 1 insertion(+), 10261 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/dev-support/bin/dist-layout-stitching ---------------------------------------------------------------------- diff --git a/dev-support/bin/dist-layout-stitching b/dev-support/bin/dist-layout-stitching index 8c57287..df043dd 100755 --- a/dev-support/bin/dist-layout-stitching +++ b/dev-support/bin/dist-layout-stitching @@ -158,10 +158,6 @@ run copy "${ROOT}/hadoop-ozone/objectstore-service/target/hadoop-ozone-objectsto run copy "${ROOT}/hadoop-ozone/client/target/hadoop-ozone-client-${VERSION}" . run copy "${ROOT}/hadoop-ozone/tools/target/hadoop-ozone-tools-${VERSION}" . -# CBlock -run copy "${ROOT}/hadoop-cblock/server/target/hadoop-cblock-server-${VERSION}" . -run copy "${ROOT}/hadoop-cblock/tools/target/hadoop-cblock-tools-${VERSION}" . - run copy "${ROOT}/hadoop-tools/hadoop-tools-dist/target/hadoop-tools-dist-${VERSION}" . http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-assemblies/src/main/resources/assemblies/hadoop-src.xml ---------------------------------------------------------------------- diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-src.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-src.xml index 634c526..f0a8d44 100644 --- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-src.xml +++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-src.xml @@ -52,7 +52,6 @@ <exclude>**/SecurityAuth.audit*</exclude> <exclude>hadoop-ozone/**</exclude> <exclude>hadoop-hdds/**</exclude> - <exclude>hadoop-cblock/**</exclude> </excludes> </fileSet> </fileSets> http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-cblock/pom.xml b/hadoop-cblock/pom.xml deleted file mode 100644 index a713654..0000000 --- a/hadoop-cblock/pom.xml +++ /dev/null @@ -1,61 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. See accompanying LICENSE file. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" -xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" -xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 -http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-project-dist</artifactId> - <version>3.2.0-SNAPSHOT</version> - <relativePath>../hadoop-project-dist</relativePath> - </parent> - <artifactId>hadoop-cblock</artifactId> - <version>3.2.0-SNAPSHOT</version> - <description>Apache Hadoop Cblock parent project</description> - <name>Apache Hadoop Cblock</name> - <packaging>pom</packaging> - - <modules> - <module>server</module> - <module>tools</module> - </modules> - - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>findbugs-maven-plugin</artifactId> - <configuration> - <excludeFilterFile combine.self="override"></excludeFilterFile> - </configuration> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/dev-support/findbugsExcludeFile.xml ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/dev-support/findbugsExcludeFile.xml b/hadoop-cblock/server/dev-support/findbugsExcludeFile.xml deleted file mode 100644 index 54d602c..0000000 --- a/hadoop-cblock/server/dev-support/findbugsExcludeFile.xml +++ /dev/null @@ -1,21 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<FindBugsFilter> - <Match> - <Package name="org.apache.hadoop.cblock.protocol.proto"/> - </Match> -</FindBugsFilter> http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/pom.xml b/hadoop-cblock/server/pom.xml deleted file mode 100644 index 8039dad..0000000 --- a/hadoop-cblock/server/pom.xml +++ /dev/null @@ -1,159 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. See accompanying LICENSE file. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 -http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-cblock</artifactId> - <version>3.2.0-SNAPSHOT</version> - </parent> - <artifactId>hadoop-cblock-server</artifactId> - <version>3.2.0-SNAPSHOT</version> - <description>Apache Hadoop CBlock Server</description> - <name>Apache Hadoop CBlock Server</name> - <packaging>jar</packaging> - - <properties> - <hadoop.component>cblock</hadoop.component> - <is.hadoop.component>true</is.hadoop.component> - </properties> - - <dependencies> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdds-server-framework</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-ozone-common</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdds-common</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdds-client</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-ozone-integration-test</artifactId> - <scope>test</scope> - <type>test-jar</type> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <scope>test</scope> - <type>test-jar</type> - </dependency> - - <dependency> - <groupId>org.jscsi</groupId> - <artifactId>target</artifactId> - <version>2.6.0</version> - <optional>true</optional> - <exclusions> - <exclusion> - <groupId>ch.qos.logback</groupId> - <artifactId>logback-classic</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>io.kubernetes</groupId> - <artifactId>client-java</artifactId> - <version>1.0.0-beta1</version> - <exclusions> - <exclusion> - <groupId>io.swagger</groupId> - <artifactId>swagger-annotations</artifactId> - </exclusion> - <exclusion> - <groupId>com.github.stefanbirkner</groupId> - <artifactId>system-rules</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <scope>test</scope> - <type>test-jar</type> - </dependency> - - </dependencies> - - - <build> - <plugins> - <plugin> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-maven-plugins</artifactId> - <executions> - <execution> - <id>compile-protoc</id> - <goals> - <goal>protoc</goal> - </goals> - <configuration> - <protocVersion>${protobuf.version}</protocVersion> - <protocCommand>${protoc.path}</protocCommand> - <imports> - <param> - ${basedir}/../../hadoop-common-project/hadoop-common/src/main/proto - </param> - <param> - ${basedir}/../../hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ - </param> - <param> - ${basedir}/../../hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ - </param> - <param> - ${basedir}/../../hadoop-hdds/common/src/main/proto/ - </param> - <param>${basedir}/src/main/proto</param> - </imports> - <source> - <directory>${basedir}/src/main/proto</directory> - <includes> - <include>CBlockClientServerProtocol.proto</include> - <include>CBlockServiceProtocol.proto</include> - </includes> - </source> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>findbugs-maven-plugin</artifactId> - <configuration> - <excludeFilterFile>${basedir}/dev-support/findbugsExcludeFile.xml</excludeFilterFile> - </configuration> - </plugin> - </plugins> - </build> -</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java deleted file mode 100644 index ec5d7d2..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock; - -import static java.lang.Thread.NORM_PRIORITY; - -/** - * This class contains constants for configuration keys used in CBlock. - */ -public final class CBlockConfigKeys { - - public static final String DFS_CBLOCK_SERVICERPC_ADDRESS_KEY = - "dfs.cblock.servicerpc-address"; - public static final int DFS_CBLOCK_SERVICERPC_PORT_DEFAULT = - 9810; - public static final String DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT = - "0.0.0.0"; - - public static final String DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY = - "dfs.cblock.jscsi-address"; - - //The port on CBlockManager node for jSCSI to ask - public static final String DFS_CBLOCK_JSCSI_PORT_KEY = - "dfs.cblock.jscsi.port"; - public static final int DFS_CBLOCK_JSCSI_PORT_DEFAULT = - 9811; - - public static final String DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY = - "dfs.cblock.service.rpc-bind-host"; - public static final String DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY = - "dfs.cblock.jscsi.rpc-bind-host"; - - // default block size is 4KB - public static final int DFS_CBLOCK_SERVICE_BLOCK_SIZE_DEFAULT = - 4096; - - public static final String DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY = - "dfs.cblock.service.handler.count"; - public static final int DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT = 10; - - public static final String DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY = - "dfs.cblock.service.leveldb.path"; - //TODO : find a better place - public static final String DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT = - "/tmp/cblock_levelDB.dat"; - - - public static final String DFS_CBLOCK_DISK_CACHE_PATH_KEY = - "dfs.cblock.disk.cache.path"; - public static final String DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT = - "/tmp/cblockCacheDB"; - /** - * Setting this flag to true makes the block layer compute a sha256 hash of - * the data and log that information along with block ID. This is very - * useful for doing trace based simulation of various workloads. Since it is - * computing a hash for each block this could be expensive, hence default - * is false. - */ - public static final String DFS_CBLOCK_TRACE_IO = "dfs.cblock.trace.io"; - public static final boolean DFS_CBLOCK_TRACE_IO_DEFAULT = false; - - public static final String DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO = - "dfs.cblock.short.circuit.io"; - public static final boolean DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO_DEFAULT = - false; - - /** - * Cache size in 1000s of entries. 256 indicates 256 * 1024. - */ - public static final String DFS_CBLOCK_CACHE_QUEUE_SIZE_KB = - "dfs.cblock.cache.queue.size.in.kb"; - public static final int DFS_CBLOCK_CACHE_QUEUE_SIZE_KB_DEFAULT = 256; - - /** - * Minimum Number of threads that cache pool will use for background I/O. - */ - public static final String DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE = - "dfs.cblock.cache.core.min.pool.size"; - public static final int DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE_DEFAULT = 16; - - /** - * Maximum Number of threads that cache pool will use for background I/O. - */ - - public static final String DFS_CBLOCK_CACHE_MAX_POOL_SIZE = - "dfs.cblock.cache.max.pool.size"; - public static final int DFS_CBLOCK_CACHE_MAX_POOL_SIZE_DEFAULT = 256; - - /** - * Number of seconds to keep the Thread alive when it is idle. - */ - public static final String DFS_CBLOCK_CACHE_KEEP_ALIVE = - "dfs.cblock.cache.keep.alive"; - public static final String DFS_CBLOCK_CACHE_KEEP_ALIVE_DEFAULT = "60s"; - - /** - * Priority of cache flusher thread, affecting the relative performance of - * write and read. - */ - public static final String DFS_CBLOCK_CACHE_THREAD_PRIORITY = - "dfs.cblock.cache.thread.priority"; - public static final int DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT = - NORM_PRIORITY; - - /** - * Block Buffer size in terms of blockID entries, 512 means 512 blockIDs. - */ - public static final String DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE = - "dfs.cblock.cache.block.buffer.size"; - public static final int DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT = 512; - - public static final String DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL = - "dfs.cblock.block.buffer.flush.interval"; - public static final String DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_DEFAULT = - "60s"; - - // jscsi server settings - public static final String DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY = - "dfs.cblock.jscsi.server.address"; - public static final String DFS_CBLOCK_JSCSI_SERVER_ADDRESS_DEFAULT = - "0.0.0.0"; - public static final String DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_KEY = - "dfs.cblock.jscsi.cblock.server.address"; - public static final String DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_DEFAULT = - "127.0.0.1"; - - // to what address cblock server should talk to scm? - public static final String DFS_CBLOCK_SCM_IPADDRESS_KEY = - "dfs.cblock.scm.ipaddress"; - public static final String DFS_CBLOCK_SCM_IPADDRESS_DEFAULT = - "127.0.0.1"; - public static final String DFS_CBLOCK_SCM_PORT_KEY = - "dfs.cblock.scm.port"; - public static final int DFS_CBLOCK_SCM_PORT_DEFAULT = 9860; - - public static final String DFS_CBLOCK_CONTAINER_SIZE_GB_KEY = - "dfs.cblock.container.size.gb"; - public static final int DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT = - 5; - - // LevelDB cache file uses an off-heap cache in LevelDB of 256 MB. - public static final String DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_KEY = - "dfs.cblock.cache.leveldb.cache.size.mb"; - public static final int DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT = 256; - - /** - * Cache does an best case attempt to write a block to a container. - * At some point of time, we will need to handle the case where we did try - * 64K times and is till not able to write to the container. - * - * TODO: We will need cBlock Server to allow us to do a remapping of the - * block location in case of failures, at that point we should reduce the - * retry count to a more normal number. This is approximately 18 hours of - * retry. - */ - public static final String DFS_CBLOCK_CACHE_MAX_RETRY_KEY = - "dfs.cblock.cache.max.retry"; - public static final int DFS_CBLOCK_CACHE_MAX_RETRY_DEFAULT = - 64 * 1024; - - /** - * Cblock CLI configs. - */ - public static final String DFS_CBLOCK_MANAGER_POOL_SIZE = - "dfs.cblock.manager.pool.size"; - public static final int DFS_CBLOCK_MANAGER_POOL_SIZE_DEFAULT = 16; - - /** - * currently the largest supported volume is about 8TB, which might take - * > 20 seconds to finish creating containers. thus set timeout to 30 sec. - */ - public static final String DFS_CBLOCK_RPC_TIMEOUT = - "dfs.cblock.rpc.timeout"; - public static final String DFS_CBLOCK_RPC_TIMEOUT_DEFAULT = "300s"; - - public static final String DFS_CBLOCK_ISCSI_ADVERTISED_IP = - "dfs.cblock.iscsi.advertised.ip"; - - public static final String DFS_CBLOCK_ISCSI_ADVERTISED_PORT = - "dfs.cblock.iscsi.advertised.port"; - - public static final int DFS_CBLOCK_ISCSI_ADVERTISED_PORT_DEFAULT = 3260; - - - public static final String - DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED - = "dfs.cblock.kubernetes.dynamic-provisioner.enabled"; - - public static final boolean - DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED_DEFAULT = false; - - public static final String - DFS_CBLOCK_KUBERNETES_CBLOCK_USER = - "dfs.cblock.kubernetes.cblock-user"; - - public static final String - DFS_CBLOCK_KUBERNETES_CBLOCK_USER_DEFAULT = - "iqn.2001-04.org.apache.hadoop"; - - public static final String - DFS_CBLOCK_KUBERNETES_CONFIG_FILE_KEY = - "dfs.cblock.kubernetes.configfile"; - - private CBlockConfigKeys() { - - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CBlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CBlockManager.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CBlockManager.java deleted file mode 100644 index 9318b6c..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CBlockManager.java +++ /dev/null @@ -1,426 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock; - -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.BlockingService; -import org.apache.hadoop.cblock.kubernetes.DynamicProvisioner; -import org.apache.hadoop.cblock.meta.VolumeDescriptor; -import org.apache.hadoop.cblock.meta.VolumeInfo; -import org.apache.hadoop.cblock.proto.CBlockClientProtocol; -import org.apache.hadoop.cblock.proto.CBlockServiceProtocol; -import org.apache.hadoop.cblock.proto.MountVolumeResponse; -import org.apache.hadoop.cblock.protocol.proto - .CBlockClientServerProtocolProtos; -import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos; -import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB; -import org.apache.hadoop.cblock.protocolPB - .CBlockClientServerProtocolServerSideTranslatorPB; -import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB; -import org.apache.hadoop.cblock.protocolPB - .CBlockServiceProtocolServerSideTranslatorPB; -import org.apache.hadoop.ipc.Client; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.client.ContainerOperationClient; -import org.apache.hadoop.hdds.scm.client.ScmClient; -import org.apache.hadoop.cblock.storage.StorageManager; -import org.apache.hadoop.cblock.util.KeyUtil; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.scm.protocolPB - .StorageContainerLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.utils.LevelDBStore; - -import static org.apache.hadoop.cblock.CblockUtils.getCblockServerRpcAddr; -import static org.apache.hadoop.cblock.CblockUtils.getCblockServiceRpcAddr; -import static org.apache.hadoop.hdds.server.ServerUtils - .updateRPCListenAddress; -import org.iq80.leveldb.DBIterator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_CONTAINER_SIZE_GB_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SCM_IPADDRESS_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SCM_IPADDRESS_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SCM_PORT_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SCM_PORT_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SERVICERPC_ADDRESS_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED_DEFAULT; - - -/** - * The main entry point of CBlock operations, ALL the CBlock operations - * will go through this class. But NOTE that: - * - * volume operations (create/ - * delete/info) are: - * client -> CBlockManager -> StorageManager -> CBlock client - * - * IO operations (put/get block) are; - * client -> CBlock client -> container - * - */ -public class CBlockManager implements CBlockServiceProtocol, - CBlockClientProtocol { - - private static final Logger LOG = - LoggerFactory.getLogger(CBlockManager.class); - - private final RPC.Server cblockService; - private final RPC.Server cblockServer; - - private final StorageManager storageManager; - - private final LevelDBStore levelDBStore; - private final String dbPath; - - private final DynamicProvisioner kubernetesDynamicProvisioner; - - private Charset encoding = Charset.forName("UTF-8"); - - public CBlockManager(OzoneConfiguration conf, - ScmClient storageClient) throws IOException { - // Fix the cBlockManagerId generattion code here. Should support - // cBlockManager --init command which will generate a cBlockManagerId and - // persist it locally. - storageManager = - new StorageManager(storageClient, conf, "CBLOCK"); - - dbPath = conf.getTrimmed(DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY, - DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT); - levelDBStore = new LevelDBStore(new File(dbPath), true); - LOG.info("Try to load exising volume information"); - readFromPersistentStore(); - - RPC.setProtocolEngine(conf, CBlockServiceProtocolPB.class, - ProtobufRpcEngine.class); - RPC.setProtocolEngine(conf, CBlockClientServerProtocolPB.class, - ProtobufRpcEngine.class); - // start service for client command-to-cblock server service - InetSocketAddress serviceRpcAddr = - getCblockServiceRpcAddr(conf); - BlockingService cblockProto = - CBlockServiceProtocolProtos - .CBlockServiceProtocolService - .newReflectiveBlockingService( - new CBlockServiceProtocolServerSideTranslatorPB(this) - ); - cblockService = startRpcServer(conf, CBlockServiceProtocolPB.class, - cblockProto, serviceRpcAddr, - DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY, - DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY, - DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT); - InetSocketAddress cblockServiceRpcAddress = - updateRPCListenAddress(conf, - DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, serviceRpcAddr, cblockService); - LOG.info("CBlock manager listening for client commands on: {}", - cblockServiceRpcAddress); - // now start service for cblock client-to-cblock server communication - - InetSocketAddress serverRpcAddr = - getCblockServerRpcAddr(conf); - BlockingService serverProto = - CBlockClientServerProtocolProtos - .CBlockClientServerProtocolService - .newReflectiveBlockingService( - new CBlockClientServerProtocolServerSideTranslatorPB(this) - ); - cblockServer = startRpcServer( - conf, CBlockClientServerProtocolPB.class, - serverProto, serverRpcAddr, - DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY, - DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY, - DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT); - InetSocketAddress cblockServerRpcAddress = - updateRPCListenAddress(conf, - DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, serverRpcAddr, cblockServer); - LOG.info("CBlock server listening for client commands on: {}", - cblockServerRpcAddress); - - if (conf.getBoolean(DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED, - DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED_DEFAULT)) { - - kubernetesDynamicProvisioner = - new DynamicProvisioner(conf, storageManager); - kubernetesDynamicProvisioner.init(); - - } else { - kubernetesDynamicProvisioner = null; - } - } - - public void start() { - cblockService.start(); - cblockServer.start(); - if (kubernetesDynamicProvisioner != null) { - kubernetesDynamicProvisioner.start(); - } - LOG.info("CBlock manager started!"); - } - - public void stop() { - cblockService.stop(); - cblockServer.stop(); - if (kubernetesDynamicProvisioner != null) { - kubernetesDynamicProvisioner.stop(); - } - } - - public void join() { - try { - cblockService.join(); - cblockServer.join(); - } catch (InterruptedException e) { - LOG.error("Interrupted during join"); - Thread.currentThread().interrupt(); - } - } - - /** - * Starts an RPC server, if configured. - * - * @param conf configuration - * @param protocol RPC protocol provided by RPC server - * @param instance RPC protocol implementation instance - * @param addr configured address of RPC server - * @param bindHostKey configuration key for setting explicit bind host. If - * the property is not configured, then the bind host is taken from addr. - * @param handlerCountKey configuration key for RPC server handler count - * @param handlerCountDefault default RPC server handler count if unconfigured - * @return RPC server, or null if addr is null - * @throws IOException if there is an I/O error while creating RPC server - */ - private static RPC.Server startRpcServer(OzoneConfiguration conf, - Class<?> protocol, BlockingService instance, - InetSocketAddress addr, String bindHostKey, - String handlerCountKey, int handlerCountDefault) throws IOException { - if (addr == null) { - return null; - } - String bindHost = conf.getTrimmed(bindHostKey); - if (bindHost == null || bindHost.isEmpty()) { - bindHost = addr.getHostName(); - } - int numHandlers = conf.getInt(handlerCountKey, handlerCountDefault); - RPC.Server rpcServer = new RPC.Builder(conf) - .setProtocol(protocol) - .setInstance(instance) - .setBindAddress(bindHost) - .setPort(addr.getPort()) - .setNumHandlers(numHandlers) - .setVerbose(false) - .setSecretManager(null) - .build(); - return rpcServer; - } - - @Override - public synchronized MountVolumeResponse mountVolume( - String userName, String volumeName) throws IOException { - return storageManager.isVolumeValid(userName, volumeName); - } - - @Override - public List<VolumeInfo> listVolumes() throws IOException { - return listVolume(null); - } - - @Override - public synchronized void createVolume(String userName, String volumeName, - long volumeSize, int blockSize) throws IOException { - LOG.info("Create volume received: userName: {} volumeName: {} " + - "volumeSize: {} blockSize: {}", userName, volumeName, - volumeSize, blockSize); - // It is important to create in-memory representation of the - // volume first, then writes to persistent storage (levelDB) - // such that it is guaranteed that when there is an entry in - // levelDB, the volume is allocated. (more like a UNDO log fashion) - // TODO: what if creation failed? we allocated containers but lost - // the reference to the volume and all it's containers. How to release - // the containers? - storageManager.createVolume(userName, volumeName, volumeSize, blockSize); - VolumeDescriptor volume = storageManager.getVolume(userName, volumeName); - if (volume == null) { - throw new IOException("Volume creation failed!"); - } - String volumeKey = KeyUtil.getVolumeKey(userName, volumeName); - writeToPersistentStore(volumeKey.getBytes(encoding), - volume.toProtobuf().toByteArray()); - } - - @Override - public synchronized void deleteVolume(String userName, - String volumeName, boolean force) throws IOException { - LOG.info("Delete volume received: volume: {} {} ", volumeName, force); - storageManager.deleteVolume(userName, volumeName, force); - // being here means volume is successfully deleted now - String volumeKey = KeyUtil.getVolumeKey(userName, volumeName); - removeFromPersistentStore(volumeKey.getBytes(encoding)); - } - - // No need to synchronize on the following three methods, since write and - // remove's caller are synchronized. read's caller is the constructor and - // no other method call can happen at that time. - @VisibleForTesting - public void writeToPersistentStore(byte[] key, byte[] value) { - levelDBStore.put(key, value); - } - - @VisibleForTesting - public void removeFromPersistentStore(byte[] key) { - levelDBStore.delete(key); - } - - public void readFromPersistentStore() throws IOException { - try (DBIterator iter = levelDBStore.getIterator()) { - iter.seekToFirst(); - while (iter.hasNext()) { - Map.Entry<byte[], byte[]> entry = iter.next(); - String volumeKey = new String(entry.getKey(), encoding); - try { - VolumeDescriptor volumeDescriptor = - VolumeDescriptor.fromProtobuf(entry.getValue()); - storageManager.addVolume(volumeDescriptor); - } catch (IOException e) { - LOG.error("Loading volume " + volumeKey + " error " + e); - } - } - } - } - - @Override - public synchronized VolumeInfo infoVolume(String userName, String volumeName - ) throws IOException { - LOG.info("Info volume received: volume: {}", volumeName); - return storageManager.infoVolume(userName, volumeName); - } - - @VisibleForTesting - public synchronized List<VolumeDescriptor> getAllVolumes() { - return storageManager.getAllVolume(null); - } - - public synchronized List<VolumeDescriptor> getAllVolumes(String userName) { - return storageManager.getAllVolume(userName); - } - - public synchronized void close() { - try { - levelDBStore.close(); - } catch (IOException e) { - LOG.error("Error when closing levelDB " + e); - } - } - - public synchronized void clean() { - try { - levelDBStore.close(); - levelDBStore.destroy(); - } catch (IOException e) { - LOG.error("Error when deleting levelDB " + e); - } - } - - @Override - public synchronized List<VolumeInfo> listVolume(String userName) - throws IOException { - ArrayList<VolumeInfo> response = new ArrayList<>(); - List<VolumeDescriptor> allVolumes = - storageManager.getAllVolume(userName); - for (VolumeDescriptor volume : allVolumes) { - VolumeInfo info = - new VolumeInfo(volume.getUserName(), volume.getVolumeName(), - volume.getVolumeSize(), volume.getBlockSize()); - response.add(info); - } - return response; - } - - public static void main(String[] args) throws Exception { - long version = RPC.getProtocolVersion( - StorageContainerLocationProtocolPB.class); - CblockUtils.activateConfigs(); - OzoneConfiguration ozoneConf = new OzoneConfiguration(); - String scmAddress = ozoneConf.get(DFS_CBLOCK_SCM_IPADDRESS_KEY, - DFS_CBLOCK_SCM_IPADDRESS_DEFAULT); - int scmPort = ozoneConf.getInt(DFS_CBLOCK_SCM_PORT_KEY, - DFS_CBLOCK_SCM_PORT_DEFAULT); - int containerSizeGB = ozoneConf.getInt(DFS_CBLOCK_CONTAINER_SIZE_GB_KEY, - DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT); - ContainerOperationClient.setContainerSizeB(containerSizeGB* OzoneConsts.GB); - InetSocketAddress address = new InetSocketAddress(scmAddress, scmPort); - - ozoneConf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, - OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT); - LOG.info( - "Creating StorageContainerLocationProtocol RPC client with address {}", - address); - RPC.setProtocolEngine(ozoneConf, StorageContainerLocationProtocolPB.class, - ProtobufRpcEngine.class); - StorageContainerLocationProtocolClientSideTranslatorPB client = - new StorageContainerLocationProtocolClientSideTranslatorPB( - RPC.getProxy(StorageContainerLocationProtocolPB.class, version, - address, UserGroupInformation.getCurrentUser(), ozoneConf, - NetUtils.getDefaultSocketFactory(ozoneConf), - Client.getRpcTimeout(ozoneConf))); - ScmClient storageClient = new ContainerOperationClient( - client, new XceiverClientManager(ozoneConf)); - CBlockManager cbm = new CBlockManager(ozoneConf, storageClient); - cbm.start(); - cbm.join(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CblockUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CblockUtils.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CblockUtils.java deleted file mode 100644 index f0f1d05..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/CblockUtils.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.cblock; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.net.NetUtils; - -import com.google.common.base.Optional; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_JSCSI_PORT_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SERVICERPC_ADDRESS_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SERVICERPC_PORT_DEFAULT; -import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys; -import static org.apache.hadoop.hdds.HddsUtils.getPortNumberFromConfigKeys; - -/** - * Generic stateless utility functions for CBlock components. - */ -public class CblockUtils { - - private CblockUtils() { - } - - /** - * Retrieve the socket address that is used by CBlock Service. - * - * @param conf - * @return Target InetSocketAddress for the CBlock Service endpoint. - */ - public static InetSocketAddress getCblockServiceRpcAddr(Configuration conf) { - final Optional<String> host = - getHostNameFromConfigKeys(conf, DFS_CBLOCK_SERVICERPC_ADDRESS_KEY); - - // If no port number is specified then we'll just try the defaultBindPort. - final Optional<Integer> port = - getPortNumberFromConfigKeys(conf, DFS_CBLOCK_SERVICERPC_ADDRESS_KEY); - - return NetUtils.createSocketAddr( - host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" + port - .or(DFS_CBLOCK_SERVICERPC_PORT_DEFAULT)); - } - - /** - * Retrieve the socket address that is used by CBlock Server. - * - * @param conf - * @return Target InetSocketAddress for the CBlock Server endpoint. - */ - public static InetSocketAddress getCblockServerRpcAddr(Configuration conf) { - final Optional<String> host = - getHostNameFromConfigKeys(conf, DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY); - - // If no port number is specified then we'll just try the defaultBindPort. - final Optional<Integer> port = - getPortNumberFromConfigKeys(conf, DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY); - - return NetUtils.createSocketAddr( - host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" + port - .or(DFS_CBLOCK_JSCSI_PORT_DEFAULT)); - } - - /** - * Parse size with size prefix string and return in bytes. - * - */ - public static long parseSize(String volumeSizeArgs) throws IOException { - long multiplier = 1; - - Pattern p = Pattern.compile("([0-9]+)([a-zA-Z]+)"); - Matcher m = p.matcher(volumeSizeArgs); - - if (!m.find()) { - throw new IOException("Invalid volume size args " + volumeSizeArgs); - } - - int size = Integer.parseInt(m.group(1)); - String s = m.group(2); - - if (s.equalsIgnoreCase("MB") || - s.equalsIgnoreCase("Mi")) { - multiplier = 1024L * 1024; - } else if (s.equalsIgnoreCase("GB") || - s.equalsIgnoreCase("Gi")) { - multiplier = 1024L * 1024 * 1024; - } else if (s.equalsIgnoreCase("TB") || - s.equalsIgnoreCase("Ti")) { - multiplier = 1024L * 1024 * 1024 * 1024; - } else { - throw new IOException("Invalid volume size args " + volumeSizeArgs); - } - return size * multiplier; - } - - public static void activateConfigs(){ - Configuration.addDefaultResource("hdfs-default.xml"); - Configuration.addDefaultResource("hdfs-site.xml"); - Configuration.addDefaultResource("ozone-default.xml"); - Configuration.addDefaultResource("ozone-site.xml"); - Configuration.addDefaultResource("cblock-default.xml"); - Configuration.addDefaultResource("cblock-site.xml"); - - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/CBlockServiceProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/CBlockServiceProtocolClientSideTranslatorPB.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/CBlockServiceProtocolClientSideTranslatorPB.java deleted file mode 100644 index ec38c9c..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/CBlockServiceProtocolClientSideTranslatorPB.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.client; - -import com.google.protobuf.ServiceException; -import org.apache.hadoop.cblock.meta.VolumeInfo; -import org.apache.hadoop.cblock.proto.CBlockServiceProtocol; -import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos; -import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.ipc.ProtobufHelper; -import org.apache.hadoop.ipc.ProtocolTranslator; -import org.apache.hadoop.ipc.RPC; - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -/** - * The client side implement of CBlockServiceProtocol. - */ -@InterfaceAudience.Private -public final class CBlockServiceProtocolClientSideTranslatorPB - implements CBlockServiceProtocol, ProtocolTranslator, Closeable { - - private final CBlockServiceProtocolPB rpcProxy; - - public CBlockServiceProtocolClientSideTranslatorPB( - CBlockServiceProtocolPB rpcProxy) { - this.rpcProxy = rpcProxy; - } - - @Override - public void close() throws IOException { - RPC.stopProxy(rpcProxy); - } - - @Override - public void createVolume(String userName, String volumeName, - long volumeSize, int blockSize) throws IOException { - CBlockServiceProtocolProtos.CreateVolumeRequestProto.Builder req = - CBlockServiceProtocolProtos.CreateVolumeRequestProto.newBuilder(); - req.setUserName(userName); - req.setVolumeName(volumeName); - req.setVolumeSize(volumeSize); - req.setBlockSize(blockSize); - try { - rpcProxy.createVolume(null, req.build()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - } - - @Override - public void deleteVolume(String userName, String volumeName, boolean force) - throws IOException { - CBlockServiceProtocolProtos.DeleteVolumeRequestProto.Builder req = - CBlockServiceProtocolProtos.DeleteVolumeRequestProto.newBuilder(); - req.setUserName(userName); - req.setVolumeName(volumeName); - req.setForce(force); - try { - rpcProxy.deleteVolume(null, req.build()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - } - - @Override - public Object getUnderlyingProxyObject() { - return rpcProxy; - } - - @Override - public VolumeInfo infoVolume(String userName, String volumeName) - throws IOException { - CBlockServiceProtocolProtos.InfoVolumeRequestProto.Builder req = - CBlockServiceProtocolProtos.InfoVolumeRequestProto.newBuilder(); - req.setUserName(userName); - req.setVolumeName(volumeName); - try { - CBlockServiceProtocolProtos.InfoVolumeResponseProto resp = - rpcProxy.infoVolume(null, req.build()); - return new VolumeInfo(resp.getVolumeInfo().getUserName(), - resp.getVolumeInfo().getVolumeName(), - resp.getVolumeInfo().getVolumeSize(), - resp.getVolumeInfo().getBlockSize(), - resp.getVolumeInfo().getUsage()); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - } - - @Override - public List<VolumeInfo> listVolume(String userName) throws IOException { - CBlockServiceProtocolProtos.ListVolumeRequestProto.Builder req = - CBlockServiceProtocolProtos.ListVolumeRequestProto.newBuilder(); - if (userName != null) { - req.setUserName(userName); - } - try { - CBlockServiceProtocolProtos.ListVolumeResponseProto resp = - rpcProxy.listVolume(null, req.build()); - List<VolumeInfo> respList = new ArrayList<>(); - for (CBlockServiceProtocolProtos.VolumeInfoProto entry : - resp.getVolumeEntryList()) { - VolumeInfo volumeInfo = new VolumeInfo( - entry.getUserName(), entry.getVolumeName(), entry.getVolumeSize(), - entry.getBlockSize()); - respList.add(volumeInfo); - } - return respList; - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } catch (Exception e) { - throw new IOException("got" + e.getCause() + " " + e.getMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java deleted file mode 100644 index 9227a28..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.client; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.cblock.CBlockConfigKeys; -import org.apache.hadoop.cblock.meta.VolumeInfo; -import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.UserGroupInformation; - -import static org.apache.hadoop.cblock.CblockUtils.getCblockServiceRpcAddr; - -/** - * Implementation of client used by CBlock command line tool. - */ -public class CBlockVolumeClient { - private final CBlockServiceProtocolClientSideTranslatorPB cblockClient; - - public CBlockVolumeClient(OzoneConfiguration conf) throws IOException { - this(conf, null); - } - - public CBlockVolumeClient(OzoneConfiguration conf, - InetSocketAddress serverAddress) throws IOException { - InetSocketAddress address = serverAddress != null ? serverAddress : - getCblockServiceRpcAddr(conf); - long version = RPC.getProtocolVersion(CBlockServiceProtocolPB.class); - int rpcTimeout = Math.toIntExact( - conf.getTimeDuration(CBlockConfigKeys.DFS_CBLOCK_RPC_TIMEOUT, - CBlockConfigKeys.DFS_CBLOCK_RPC_TIMEOUT_DEFAULT, - TimeUnit.MILLISECONDS)); - cblockClient = new CBlockServiceProtocolClientSideTranslatorPB( - RPC.getProtocolProxy(CBlockServiceProtocolPB.class, version, - address, UserGroupInformation.getCurrentUser(), conf, - NetUtils.getDefaultSocketFactory(conf), rpcTimeout, RetryPolicies - .retryUpToMaximumCountWithFixedSleep( - 300, 1, TimeUnit.SECONDS)).getProxy()); - } - - public void createVolume(String userName, String volumeName, - long volumeSize, int blockSize) throws IOException { - cblockClient.createVolume(userName, volumeName, - volumeSize, blockSize); - } - - public void deleteVolume(String userName, String volumeName, boolean force) - throws IOException { - cblockClient.deleteVolume(userName, volumeName, force); - } - - public VolumeInfo infoVolume(String userName, String volumeName) - throws IOException { - return cblockClient.infoVolume(userName, volumeName); - } - - public List<VolumeInfo> listVolume(String userName) - throws IOException { - return cblockClient.listVolume(userName); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/package-info.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/package-info.java deleted file mode 100644 index 761b71e..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/client/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.client; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/exception/CBlockException.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/exception/CBlockException.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/exception/CBlockException.java deleted file mode 100644 index 8f6b82b..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/exception/CBlockException.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.exception; - -import java.io.IOException; - -/** - * The exception class used in CBlock. - */ -public class CBlockException extends IOException { - public CBlockException(String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/exception/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/exception/package-info.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/exception/package-info.java deleted file mode 100644 index 268b8cb..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/exception/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.exception; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java deleted file mode 100644 index f2d289e..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.jscsiHelper; - -import com.google.common.base.Preconditions; -import com.google.common.primitives.Longs; -import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; -import org.apache.hadoop.cblock.jscsiHelper.cache.impl.AsyncBlockWriter; -import org.apache.hadoop.hdds.scm.XceiverClientSpi; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; -import org.apache.hadoop.util.Time; -import org.apache.hadoop.utils.LevelDBStore; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.file.Paths; - -/** - * The blockWriter task. - */ -public class BlockWriterTask implements Runnable { - private final LogicalBlock block; - private int tryCount; - private final ContainerCacheFlusher flusher; - private final String dbPath; - private final String fileName; - private final int maxRetryCount; - - /** - * Constructs a BlockWriterTask. - * - * @param block - Block Information. - * @param flusher - ContainerCacheFlusher. - */ - public BlockWriterTask(LogicalBlock block, ContainerCacheFlusher flusher, - String dbPath, int tryCount, String fileName, int maxRetryCount) { - this.block = block; - this.flusher = flusher; - this.dbPath = dbPath; - this.tryCount = tryCount; - this.fileName = fileName; - this.maxRetryCount = maxRetryCount; - } - - /** - * When an object implementing interface <code>Runnable</code> is used - * to create a thread, starting the thread causes the object's - * <code>run</code> method to be called in that separately executing - * thread. - * <p> - * The general contract of the method <code>run</code> is that it may - * take any action whatsoever. - * - * @see Thread#run() - */ - @Override - public void run() { - String containerName = null; - XceiverClientSpi client = null; - LevelDBStore levelDBStore = null; - String traceID = flusher.getTraceID(new File(dbPath), block.getBlockID()); - flusher.getLOG().debug( - "Writing block to remote. block ID: {}", block.getBlockID()); - try { - incTryCount(); - Pipeline pipeline = flusher.getPipeline(this.dbPath, block.getBlockID()); - client = flusher.getXceiverClientManager().acquireClient(pipeline); - containerName = pipeline.getContainerName(); - byte[] keybuf = Longs.toByteArray(block.getBlockID()); - byte[] data; - long startTime = Time.monotonicNow(); - levelDBStore = flusher.getCacheDB(this.dbPath); - data = levelDBStore.get(keybuf); - Preconditions.checkNotNull(data); - long endTime = Time.monotonicNow(); - Preconditions.checkState(data.length > 0, "Block data is zero length"); - startTime = Time.monotonicNow(); - ContainerProtocolCalls.writeSmallFile(client, containerName, - Long.toString(block.getBlockID()), data, traceID); - endTime = Time.monotonicNow(); - flusher.getTargetMetrics().updateContainerWriteLatency( - endTime - startTime); - flusher.getLOG().debug("Time taken for Write Small File : {} ms", - endTime - startTime); - - flusher.incrementRemoteIO(); - - } catch (Exception ex) { - flusher.getLOG().error("Writing of block:{} failed, We have attempted " + - "to write this block {} times to the container {}.Trace ID:{}", - block.getBlockID(), this.getTryCount(), containerName, traceID, ex); - writeRetryBlock(block); - if (ex instanceof IOException) { - flusher.getTargetMetrics().incNumWriteIOExceptionRetryBlocks(); - } else { - flusher.getTargetMetrics().incNumWriteGenericExceptionRetryBlocks(); - } - if (this.getTryCount() >= maxRetryCount) { - flusher.getTargetMetrics().incNumWriteMaxRetryBlocks(); - } - } finally { - flusher.incFinishCount(fileName); - if (levelDBStore != null) { - flusher.releaseCacheDB(dbPath); - } - if(client != null) { - flusher.getXceiverClientManager().releaseClient(client); - } - } - } - - - private void writeRetryBlock(LogicalBlock currentBlock) { - boolean append = false; - String retryFileName = - String.format("%s.%d.%s.%s", AsyncBlockWriter.RETRY_LOG_PREFIX, - currentBlock.getBlockID(), Time.monotonicNow(), tryCount); - File logDir = new File(this.dbPath); - if (!logDir.exists() && !logDir.mkdirs()) { - flusher.getLOG().error( - "Unable to create the log directory, Critical error cannot continue"); - return; - } - String log = Paths.get(this.dbPath, retryFileName).toString(); - ByteBuffer buffer = ByteBuffer.allocate(Long.SIZE / Byte.SIZE); - buffer.putLong(currentBlock.getBlockID()); - buffer.flip(); - try { - FileChannel channel = new FileOutputStream(log, append).getChannel(); - channel.write(buffer); - channel.close(); - flusher.processDirtyBlocks(this.dbPath, retryFileName); - } catch (IOException e) { - flusher.getTargetMetrics().incNumFailedRetryLogFileWrites(); - flusher.getLOG().error("Unable to write the retry block. Block ID: {}", - currentBlock.getBlockID(), e); - } - } - - /** - * Increments the try count. This is done each time we try this block - * write to the container. - */ - private void incTryCount() { - tryCount++; - } - - /** - * Get the retry count. - * - * @return int - */ - public int getTryCount() { - return tryCount; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea85801c/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockClientProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockClientProtocolClientSideTranslatorPB.java b/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockClientProtocolClientSideTranslatorPB.java deleted file mode 100644 index 0b6c5f3..0000000 --- a/hadoop-cblock/server/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockClientProtocolClientSideTranslatorPB.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.jscsiHelper; - -import com.google.common.primitives.Longs; -import com.google.protobuf.ServiceException; -import org.apache.hadoop.cblock.exception.CBlockException; -import org.apache.hadoop.cblock.meta.VolumeInfo; -import org.apache.hadoop.cblock.proto.CBlockClientProtocol; -import org.apache.hadoop.cblock.proto.MountVolumeResponse; -import org.apache.hadoop.cblock.protocol.proto - .CBlockClientServerProtocolProtos.ContainerIDProto; -import org.apache.hadoop.cblock.protocol.proto - .CBlockClientServerProtocolProtos.ListVolumesRequestProto; -import org.apache.hadoop.cblock.protocol.proto - .CBlockClientServerProtocolProtos.ListVolumesResponseProto; -import org.apache.hadoop.cblock.protocol.proto - .CBlockClientServerProtocolProtos.MountVolumeRequestProto; -import org.apache.hadoop.cblock.protocol.proto - .CBlockClientServerProtocolProtos.MountVolumeResponseProto; -import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos - .VolumeInfoProto; -import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB; -import org.apache.hadoop.ipc.ProtobufHelper; -import org.apache.hadoop.ipc.ProtocolTranslator; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -/** - * The client side of CBlockClientProtocol. - * - * CBlockClientProtocol is the protocol used between cblock client side - * and cblock manager (cblock client side is just the node where jscsi daemon - * process runs. a machines talks to jscsi daemon for mounting a volume). - * - * Right now, the only communication carried by this protocol is for client side - * to request mounting a volume. - */ -public class CBlockClientProtocolClientSideTranslatorPB - implements CBlockClientProtocol, ProtocolTranslator, Closeable { - - private final CBlockClientServerProtocolPB rpcProxy; - - public CBlockClientProtocolClientSideTranslatorPB( - CBlockClientServerProtocolPB rpcProxy) { - this.rpcProxy = rpcProxy; - } - - - @Override - public void close() throws IOException { - RPC.stopProxy(rpcProxy); - } - - @Override - public Object getUnderlyingProxyObject() { - return rpcProxy; - } - - @Override - public MountVolumeResponse mountVolume( - String userName, String volumeName) throws IOException { - MountVolumeRequestProto.Builder - request - = MountVolumeRequestProto - .newBuilder(); - request.setUserName(userName); - request.setVolumeName(volumeName); - try { - MountVolumeResponseProto resp - = rpcProxy.mountVolume(null, request.build()); - if (!resp.getIsValid()) { - throw new CBlockException( - "Not a valid volume:" + userName + ":" + volumeName); - } - List<Pipeline> containerIDs = new ArrayList<>(); - HashMap<String, Pipeline> containerPipelines = new HashMap<>(); - if (resp.getAllContainerIDsList().size() == 0) { - throw new CBlockException("Mount volume request returned no container"); - } - for (ContainerIDProto containerID : - resp.getAllContainerIDsList()) { - if (containerID.hasPipeline()) { - // it should always have a pipeline only except for tests. - Pipeline p = Pipeline.getFromProtoBuf(containerID.getPipeline()); - p.setData(Longs.toByteArray(containerID.getIndex())); - containerIDs.add(p); - containerPipelines.put(containerID.getContainerID(), p); - } else { - throw new CBlockException("ContainerID does not have pipeline!"); - } - } - return new MountVolumeResponse( - resp.getIsValid(), - resp.getUserName(), - resp.getVolumeName(), - resp.getVolumeSize(), - resp.getBlockSize(), - containerIDs, - containerPipelines); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - } - - @Override - public List<VolumeInfo> listVolumes() throws IOException { - try { - List<VolumeInfo> result = new ArrayList<>(); - ListVolumesResponseProto - listVolumesResponseProto = this.rpcProxy.listVolumes(null, - ListVolumesRequestProto.newBuilder() - .build()); - for (VolumeInfoProto volumeInfoProto : - listVolumesResponseProto - .getVolumeEntryList()) { - result.add(new VolumeInfo(volumeInfoProto.getUserName(), - volumeInfoProto.getVolumeName(), volumeInfoProto.getVolumeSize(), - volumeInfoProto.getBlockSize())); - } - return result; - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org