http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/scm/TestArchive.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/scm/TestArchive.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/scm/TestArchive.java new file mode 100644 index 0000000..6fff47e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/scm/TestArchive.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.scm; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.hadoop.fs.FileUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Iterator; +import java.util.Random; +import java.util.zip.Adler32; +import java.util.zip.Checksum; + +/** + * Test archive creation and unpacking. + */ +public class TestArchive { + private static final int DIR_COUNT = 10; + private static final int SUB_DIR_COUNT = 3; + private static final int FILE_COUNT = 10; + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Rule + public TemporaryFolder outputFolder = new TemporaryFolder(); + + Checksum crc = new Adler32(); + + @Before + public void setUp() throws Exception { + Random r = new Random(); + final int megaByte = 1024 * 1024; + + for (int x = 0; x < DIR_COUNT; x++) { + File subdir = folder.newFolder(String.format("dir%d", x)); + for (int y = 0; y < SUB_DIR_COUNT; y++) { + File targetDir = new File(subdir.getPath().concat(File.separator) + .concat(String.format("subdir%d%d", x, y))); + if(!targetDir.mkdirs()) { + throw new IOException("Failed to create subdirectory. " + + targetDir.toString()); + } + for (int z = 0; z < FILE_COUNT; z++) { + Path temp = Paths.get(targetDir.getPath().concat(File.separator) + .concat(String.format("File%d.txt", z))); + byte[] buf = RandomStringUtils.randomAlphanumeric(r.nextInt(megaByte)) + .getBytes("UTF-8"); + Files.write(temp, buf); + crc.update(buf, 0, buf.length); + } + } + } + } + + @Test + public void testArchive() throws Exception { + Checksum readCrc = new Adler32(); + File archiveFile = new File(outputFolder.getRoot() + File.separator + + "test.container.zip"); + long zipCheckSum = FileUtil.zip(folder.getRoot(), archiveFile); + Assert.assertTrue(zipCheckSum > 0); + File decomp = new File(outputFolder.getRoot() + File.separator + + "decompress"); + if (!decomp.exists() && !decomp.mkdirs()) { + throw new IOException("Unable to create the destination directory. " + + decomp.getPath()); + } + + FileUtil.unZip(archiveFile, decomp); + String[] patterns = {"txt"}; + Iterator<File> iter = FileUtils.iterateFiles(decomp, patterns, true); + int count = 0; + while (iter.hasNext()) { + count++; + byte[] buf = Files.readAllBytes(iter.next().toPath()); + readCrc.update(buf, 0, buf.length); + } + Assert.assertEquals(DIR_COUNT * SUB_DIR_COUNT * FILE_COUNT, count); + Assert.assertEquals(crc.getValue(), readCrc.getValue()); + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/scm/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/scm/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/scm/package-info.java new file mode 100644 index 0000000..9c480d6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/scm/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.scm; +/** + Test cases for SCM client classes. + */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java deleted file mode 100644 index 35b8961..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock; - -import static java.lang.Thread.NORM_PRIORITY; - -/** - * This class contains constants for configuration keys used in CBlock. - */ -public final class CBlockConfigKeys { - public static final String DFS_CBLOCK_SERVICERPC_ADDRESS_KEY = - "dfs.cblock.servicerpc-address"; - public static final int DFS_CBLOCK_SERVICERPC_PORT_DEFAULT = - 9810; - public static final String DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT = - "0.0.0.0"; - - public static final String DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY = - "dfs.cblock.jscsi-address"; - - //The port on CBlockManager node for jSCSI to ask - public static final String DFS_CBLOCK_JSCSI_PORT_KEY = - "dfs.cblock.jscsi.port"; - public static final int DFS_CBLOCK_JSCSI_PORT_DEFAULT = - 9811; - - public static final String DFS_CBLOCK_SERVICERPC_BIND_HOST_KEY = - "dfs.cblock.service.rpc-bind-host"; - public static final String DFS_CBLOCK_JSCSIRPC_BIND_HOST_KEY = - "dfs.cblock.jscsi.rpc-bind-host"; - - // default block size is 4KB - public static final int DFS_CBLOCK_SERVICE_BLOCK_SIZE_DEFAULT = - 4096; - - public static final String DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY = - "dfs.storage.service.handler.count"; - public static final int DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT = 10; - - public static final String DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY = - "dfs.cblock.service.leveldb.path"; - //TODO : find a better place - public static final String DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT = - "/tmp/cblock_levelDB.dat"; - - - public static final String DFS_CBLOCK_DISK_CACHE_PATH_KEY = - "dfs.cblock.disk.cache.path"; - public static final String DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT = - "/tmp/cblockCacheDB"; - /** - * Setting this flag to true makes the block layer compute a sha256 hash of - * the data and log that information along with block ID. This is very - * useful for doing trace based simulation of various workloads. Since it is - * computing a hash for each block this could be expensive, hence default - * is false. - */ - public static final String DFS_CBLOCK_TRACE_IO = "dfs.cblock.trace.io"; - public static final boolean DFS_CBLOCK_TRACE_IO_DEFAULT = false; - - public static final String DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO = - "dfs.cblock.short.circuit.io"; - public static final boolean DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO_DEFAULT = - false; - - /** - * Cache size in 1000s of entries. 256 indicates 256 * 1024. - */ - public static final String DFS_CBLOCK_CACHE_QUEUE_SIZE_KB = - "dfs.cblock.cache.cache.size.in.kb"; - public static final int DFS_CBLOCK_CACHE_QUEUE_SIZE_KB_DEFAULT = 256; - - /** - * Minimum Number of threads that cache pool will use for background I/O. - */ - public static final String DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE = - "dfs.cblock.cache.core.min.pool.size"; - public static final int DFS_CBLOCK_CACHE_CORE_MIN_POOL_SIZE_DEFAULT = 16; - - /** - * Maximum Number of threads that cache pool will use for background I/O. - */ - - public static final String DFS_CBLOCK_CACHE_MAX_POOL_SIZE = - "dfs.cblock.cache.max.pool.size"; - public static final int DFS_CBLOCK_CACHE_MAX_POOL_SIZE_DEFAULT = 256; - - /** - * Number of seconds to keep the Thread alive when it is idle. - */ - public static final String DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS = - "dfs.cblock.cache.keep.alive.seconds"; - public static final long DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS_DEFAULT = 60; - - /** - * Priority of cache flusher thread, affecting the relative performance of - * write and read. - */ - public static final String DFS_CBLOCK_CACHE_THREAD_PRIORITY = - "dfs.cblock.cache.thread.priority"; - public static final int DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT = - NORM_PRIORITY; - - /** - * Block Buffer size in terms of blockID entries, 512 means 512 blockIDs. - */ - public static final String DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE = - "dfs.cblock.cache.block.buffer.size"; - public static final int DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT = 512; - - public static final String DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS = - "dfs.cblock.block.buffer.flush.interval.seconds"; - public static final int - DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS_DEFAULT = 60; - - // jscsi server settings - public static final String DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY = - "dfs.cblock.jscsi.server.address"; - public static final String DFS_CBLOCK_JSCSI_SERVER_ADDRESS_DEFAULT = - "127.0.0.1"; - public static final String DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_KEY = - "dfs.cblock.jscsi.cblock.server.address"; - public static final String DFS_CBLOCK_JSCSI_CBLOCK_SERVER_ADDRESS_DEFAULT = - "127.0.0.1"; - - // to what address cblock server should talk to scm? - public static final String DFS_CBLOCK_SCM_IPADDRESS_KEY = - "dfs.cblock.scm.ipaddress"; - public static final String DFS_CBLOCK_SCM_IPADDRESS_DEFAULT = - "127.0.0.1"; - public static final String DFS_CBLOCK_SCM_PORT_KEY = - "dfs.cblock.scm.port"; - public static final int DFS_CBLOCK_SCM_PORT_DEFAULT = 9860; - - public static final String DFS_CBLOCK_CONTAINER_SIZE_GB_KEY = - "dfs.cblock.container.size"; - public static final int DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT = - 5; - - // LevelDB cache file uses an off-heap cache in LevelDB of 256 MB. - public static final String DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_KEY = - "dfs.cblock.cache.leveldb.cache.size.mb"; - public static final int DFS_CBLOCK_CACHE_LEVELDB_CACHE_SIZE_MB_DEFAULT = 256; - - /** - * Cache does an best case attempt to write a block to a container. - * At some point of time, we will need to handle the case where we did try - * 64K times and is till not able to write to the container. - * - * TODO: We will need cBlock Server to allow us to do a remapping of the - * block location in case of failures, at that point we should reduce the - * retry count to a more normal number. This is approximately 18 hours of - * retry. - */ - public static final String DFS_CBLOCK_CACHE_MAX_RETRY_KEY = - "dfs.cblock.cache.max.retry"; - public static final int DFS_CBLOCK_CACHE_MAX_RETRY_DEFAULT = - 64 * 1024; - - private CBlockConfigKeys() { - - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java index 144c3bb..d7349cf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java @@ -34,7 +34,7 @@ import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB; import org.apache.hadoop.cblock.protocolPB .CBlockServiceProtocolServerSideTranslatorPB; import org.apache.hadoop.ipc.Client; -import org.apache.hadoop.ozone.OzoneClientUtils; +import org.apache.hadoop.ozone.client.OzoneClientUtils; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.scm.XceiverClientManager; http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java index f70e8a4..90a16ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java @@ -22,7 +22,7 @@ import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.ozone.OzoneClientUtils; +import org.apache.hadoop.ozone.client.OzoneClientUtils; import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.security.UserGroupInformation; http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java index 5b76179..e6dff98 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java @@ -31,12 +31,13 @@ import com.sun.jersey.api.container.ContainerFactory; import com.sun.jersey.api.core.ApplicationAdapter; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ksm.protocolPB +import org.apache.hadoop.ozone.ksm.protocolPB .KeySpaceManagerProtocolClientSideTranslatorPB; -import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB; -import org.apache.hadoop.ozone.OzoneClientUtils; +import org.apache.hadoop.ozone.ksm.protocolPB.KeySpaceManagerProtocolPB; +import org.apache.hadoop.ozone.client.OzoneClientUtils; import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.scm.protocolPB + .ScmBlockLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +48,8 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.ozone.OzoneConfiguration; -import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.scm.protocolPB + .StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB; import org.apache.hadoop.ozone.web.handlers.ServiceFilter; import org.apache.hadoop.ozone.web.interfaces.StorageHandler; http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java index bfdecce..5114298 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java @@ -25,7 +25,7 @@ import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.HttpRequest; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler; -import org.apache.hadoop.ozone.web.headers.Header; +import org.apache.hadoop.ozone.client.rest.headers.Header; import org.apache.hadoop.ozone.web.netty.ObjectStoreJerseyContainer; import org.apache.hadoop.ozone.web.netty.RequestDispatchObjectStoreChannelHandler; http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java index 725b20f..d2aaf73 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java @@ -34,7 +34,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress; -import org.apache.hadoop.ozone.OzoneClientUtils; +import org.apache.hadoop.ozone.client.OzoneClientUtils; import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneBucket.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneBucket.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneBucket.java deleted file mode 100644 index 51eefdc..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneBucket.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone; - - -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.ksm.helpers.KsmBucketInfo; -import org.apache.hadoop.ozone.OzoneConsts.Versioning; - -import java.util.List; - -/** - * A class that encapsulates OzoneBucket. - */ -public class OzoneBucket { - - /** - * Name of the volume in which the bucket belongs to. - */ - private final String volumeName; - /** - * Name of the bucket. - */ - private final String bucketName; - /** - * Bucket ACLs. - */ - private final List<OzoneAcl> acls; - - /** - * Type of storage to be used for this bucket. - * [RAM_DISK, SSD, DISK, ARCHIVE] - */ - private final StorageType storageType; - - /** - * Bucket Version flag. - */ - private final Versioning versioning; - - - /** - * Constructs OzoneBucket from KsmBucketInfo. - * - * @param ksmBucketInfo - */ - public OzoneBucket(KsmBucketInfo ksmBucketInfo) { - this.volumeName = ksmBucketInfo.getVolumeName(); - this.bucketName = ksmBucketInfo.getBucketName(); - this.acls = ksmBucketInfo.getAcls(); - this.storageType = ksmBucketInfo.getStorageType(); - this.versioning = ksmBucketInfo.getIsVersionEnabled() ? - Versioning.ENABLED : Versioning.DISABLED; - } - - /** - * Returns Volume Name. - * - * @return volumeName - */ - public String getVolumeName() { - return volumeName; - } - - /** - * Returns Bucket Name. - * - * @return bucketName - */ - public String getBucketName() { - return bucketName; - } - - /** - * Returns ACL's associated with the Bucket. - * - * @return acls - */ - public List<OzoneAcl> getAcls() { - return acls; - } - - /** - * Returns StorageType of the Bucket. - * - * @return storageType - */ - public StorageType getStorageType() { - return storageType; - } - - /** - * Returns Versioning associated with the Bucket. - * - * @return versioning - */ - public Versioning getVersioning() { - return versioning; - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClient.java deleted file mode 100644 index dd52a57..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClient.java +++ /dev/null @@ -1,414 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.ozone; - -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.ozone.OzoneConsts.Versioning; -import org.apache.hadoop.ozone.io.OzoneInputStream; -import org.apache.hadoop.ozone.io.OzoneOutputStream; - -import java.io.IOException; -import java.util.Iterator; -import java.util.List; - -/** - * OzoneClient can connect to a Ozone Cluster and - * perform basic operations. - */ -public interface OzoneClient { - - /** - * Creates a new Volume. - * - * @param volumeName Name of the Volume - * - * @throws IOException - */ - void createVolume(String volumeName) - throws IOException; - - /** - * Creates a new Volume, with owner set. - * - * @param volumeName Name of the Volume - * @param owner Owner to be set for Volume - * - * @throws IOException - */ - void createVolume(String volumeName, String owner) - throws IOException; - - /** - * Creates a new Volume, with owner and quota set. - * - * @param volumeName Name of the Volume - * @param owner Owner to be set for Volume - * @param acls ACLs to be added to the Volume - * - * @throws IOException - */ - void createVolume(String volumeName, String owner, - OzoneAcl... acls) - throws IOException; - - /** - * Creates a new Volume, with owner and quota set. - * - * @param volumeName Name of the Volume - * @param owner Owner to be set for Volume - * @param quota Volume Quota - * - * @throws IOException - */ - void createVolume(String volumeName, String owner, - long quota) - throws IOException; - - /** - * Creates a new Volume, with owner and quota set. - * - * @param volumeName Name of the Volume - * @param owner Owner to be set for Volume - * @param quota Volume Quota - * @param acls ACLs to be added to the Volume - * - * @throws IOException - */ - void createVolume(String volumeName, String owner, - long quota, OzoneAcl... acls) - throws IOException; - - /** - * Sets the owner of the volume. - * - * @param volumeName Name of the Volume - * @param owner to be set for the Volume - * - * @throws IOException - */ - void setVolumeOwner(String volumeName, String owner) throws IOException; - - /** - * Set Volume Quota. - * - * @param volumeName Name of the Volume - * @param quota Quota to be set for the Volume - * - * @throws IOException - */ - void setVolumeQuota(String volumeName, long quota) - throws IOException; - - /** - * Returns {@link OzoneVolume}. - * - * @param volumeName Name of the Volume - * - * @return KsmVolumeArgs - * - * @throws OzoneVolume - * */ - OzoneVolume getVolumeDetails(String volumeName) - throws IOException; - - /** - * Checks if a Volume exists and the user with a role specified has access - * to the Volume. - * - * @param volumeName Name of the Volume - * @param acl requested acls which needs to be checked for access - * - * @return Boolean - True if the user with a role can access the volume. - * This is possible for owners of the volume and admin users - * - * @throws IOException - */ - boolean checkVolumeAccess(String volumeName, OzoneAcl acl) - throws IOException; - - /** - * Deletes an Empty Volume. - * - * @param volumeName Name of the Volume - * - * @throws IOException - */ - void deleteVolume(String volumeName) throws IOException; - - /** - * Returns the List of Volumes owned by current user. - * - * @param volumePrefix Volume prefix to match - * - * @return KsmVolumeArgs Iterator - * - * @throws IOException - */ - Iterator<OzoneVolume> listVolumes(String volumePrefix) - throws IOException; - - /** - * Returns the List of Volumes owned by the specific user. - * - * @param volumePrefix Volume prefix to match - * @param user User Name - * - * @return KsmVolumeArgs Iterator - * - * @throws IOException - */ - Iterator<OzoneVolume> listVolumes(String volumePrefix, String user) - throws IOException; - - /** - * Creates a new Bucket in the Volume. - * - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * - * @throws IOException - */ - void createBucket(String volumeName, String bucketName) - throws IOException; - - /** - * Creates a new Bucket in the Volume, with versioning set. - * - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * @param versioning Bucket versioning - * - * @throws IOException - */ - void createBucket(String volumeName, String bucketName, - Versioning versioning) - throws IOException; - - /** - * Creates a new Bucket in the Volume, with storage type set. - * - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * @param storageType StorageType for the Bucket - * - * @throws IOException - */ - void createBucket(String volumeName, String bucketName, - StorageType storageType) - throws IOException; - - /** - * Creates a new Bucket in the Volume, with ACLs set. - * - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * @param acls OzoneAcls for the Bucket - * - * @throws IOException - */ - void createBucket(String volumeName, String bucketName, - OzoneAcl... acls) - throws IOException; - - - /** - * Creates a new Bucket in the Volume, with versioning - * storage type and ACLs set. - * - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * @param storageType StorageType for the Bucket - * - * @throws IOException - */ - void createBucket(String volumeName, String bucketName, - OzoneConsts.Versioning versioning, - StorageType storageType, OzoneAcl... acls) - throws IOException; - - /** - * Adds or Removes ACLs from a Bucket. - * - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * - * @throws IOException - */ - void addBucketAcls(String volumeName, String bucketName, - List<OzoneAcl> addAcls) - throws IOException; - - /** - * Adds or Removes ACLs from a Bucket. - * - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * - * @throws IOException - */ - void removeBucketAcls(String volumeName, String bucketName, - List<OzoneAcl> removeAcls) - throws IOException; - - - /** - * Enables or disables Bucket Versioning. - * - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * - * @throws IOException - */ - void setBucketVersioning(String volumeName, String bucketName, - Versioning versioning) - throws IOException; - - /** - * Sets the Storage Class of a Bucket. - * - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * - * @throws IOException - */ - void setBucketStorageType(String volumeName, String bucketName, - StorageType storageType) - throws IOException; - - /** - * Deletes a bucket if it is empty. - * - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * - * @throws IOException - */ - void deleteBucket(String volumeName, String bucketName) - throws IOException; - - /** - * true if the bucket exists and user has read access - * to the bucket else throws Exception. - * - * @param volumeName Name of the Volume - * - * @throws IOException - */ - void checkBucketAccess(String volumeName, String bucketName) - throws IOException; - - /** - * Returns {@link OzoneBucket}. - * - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * - * @return OzoneBucket - * - * @throws IOException - */ - OzoneBucket getBucketDetails(String volumeName, String bucketName) - throws IOException; - - /** - * Returns the List of Buckets in the Volume. - * - * @param volumeName Name of the Volume - * @param bucketPrefix Bucket prefix to match - * - * @return KsmVolumeArgs Iterator - * - * @throws IOException - */ - Iterator<OzoneBucket> listBuckets(String volumeName, String bucketPrefix) - throws IOException; - - /** - * Writes a key in an existing bucket. - * - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * @param size Size of the data - * - * @return OutputStream - * - */ - OzoneOutputStream createKey(String volumeName, String bucketName, - String keyName, long size) - throws IOException; - - /** - * Reads a key from an existing bucket. - * - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * - * @return LengthInputStream - * - * @throws IOException - */ - OzoneInputStream getKey(String volumeName, String bucketName, String keyName) - throws IOException; - - - /** - * Deletes an existing key. - * - * @param volumeName Name of the Volume - * - * @throws IOException - */ - void deleteKey(String volumeName, String bucketName, String keyName) - throws IOException; - - - /** - * Returns list of {@link OzoneKey} in Volume/Bucket. - * - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * - * @return OzoneKey - * - * @throws IOException - */ - List<OzoneKey> listKeys(String volumeName, String bucketName, - String keyPrefix) - throws IOException; - - - /** - * Get OzoneKey. - * - * @param volumeName Name of the Volume - * @param bucketName Name of the Bucket - * @param keyName Key name - * - * @return OzoneKey - * - * @throws IOException - */ - OzoneKey getKeyDetails(String volumeName, String bucketName, - String keyName) - throws IOException; - - /** - * Close and release the resources. - */ - void close() throws IOException; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientFactory.java deleted file mode 100644 index 7866f58..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientFactory.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone; - -import org.apache.hadoop.conf.Configuration; - -import java.io.IOException; - -/** - * Factory class to create different types of OzoneClients. - */ -public final class OzoneClientFactory { - - /** - * Private constructor, class is not meant to be initialized. - */ - private OzoneClientFactory(){} - - private static Configuration configuration; - - /** - * Returns an OzoneClient which will use RPC protocol to perform - * client operations. - * - * @return OzoneClient - * @throws IOException - */ - public static OzoneClient getRpcClient() throws IOException { - return new OzoneClientImpl(getConfiguration()); - } - - /** - * Sets the configuration, which will be used while creating OzoneClient. - * - * @param conf - */ - public static void setConfiguration(Configuration conf) { - configuration = conf; - } - - /** - * Returns the configuration if it's already set, else creates a new - * {@link OzoneConfiguration} and returns it. - * - * @return Configuration - */ - private static synchronized Configuration getConfiguration() { - if(configuration == null) { - setConfiguration(new OzoneConfiguration()); - } - return configuration; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java deleted file mode 100644 index feb4586..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java +++ /dev/null @@ -1,570 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.ozone; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ipc.Client; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ksm.helpers.KsmBucketArgs; -import org.apache.hadoop.ksm.helpers.KsmBucketInfo; -import org.apache.hadoop.ksm.helpers.KsmKeyArgs; -import org.apache.hadoop.ksm.helpers.KsmKeyInfo; -import org.apache.hadoop.ksm.helpers.KsmVolumeArgs; -import org.apache.hadoop.ksm.protocolPB - .KeySpaceManagerProtocolClientSideTranslatorPB; -import org.apache.hadoop.ksm.protocolPB - .KeySpaceManagerProtocolPB; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.ozone.web.storage.ChunkGroupInputStream; -import org.apache.hadoop.ozone.io.OzoneInputStream; -import org.apache.hadoop.ozone.io.OzoneOutputStream; -import org.apache.hadoop.ozone.ksm.KSMConfigKeys; -import org.apache.hadoop.ozone.OzoneConsts.Versioning; -import org.apache.hadoop.ozone.protocolPB.KSMPBHelper; -import org.apache.hadoop.ozone.web.storage.ChunkGroupOutputStream; -import org.apache.hadoop.scm.ScmConfigKeys; -import org.apache.hadoop.scm.XceiverClientManager; -import org.apache.hadoop.scm.protocolPB - .StorageContainerLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.scm.protocolPB - .StorageContainerLocationProtocolPB; -import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.UUID; -import java.util.stream.Collectors; - -/** - * Ozone Client Implementation, it connects to KSM, SCM and DataNode - * to execute client calls. This uses RPC protocol for communication - * with the servers. - */ -public class OzoneClientImpl implements OzoneClient, Closeable { - - private static final Logger LOG = - LoggerFactory.getLogger(OzoneClient.class); - - private final StorageContainerLocationProtocolClientSideTranslatorPB - storageContainerLocationClient; - private final KeySpaceManagerProtocolClientSideTranslatorPB - keySpaceManagerClient; - private final XceiverClientManager xceiverClientManager; - private final int chunkSize; - - - private final UserGroupInformation ugi; - private final OzoneAcl.OzoneACLRights userRights; - private final OzoneAcl.OzoneACLRights groupRights; - - /** - * Creates OzoneClientImpl instance with new OzoneConfiguration. - * - * @throws IOException - */ - public OzoneClientImpl() throws IOException { - this(new OzoneConfiguration()); - } - - /** - * Creates OzoneClientImpl instance with the given configuration. - * - * @param conf - * - * @throws IOException - */ - public OzoneClientImpl(Configuration conf) throws IOException { - Preconditions.checkNotNull(conf); - this.ugi = UserGroupInformation.getCurrentUser(); - this.userRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_USER_RIGHTS, - KSMConfigKeys.OZONE_KSM_USER_RIGHTS_DEFAULT); - this.groupRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS, - KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS_DEFAULT); - - long scmVersion = - RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class); - InetSocketAddress scmAddress = - OzoneClientUtils.getScmAddressForClients(conf); - RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class, - ProtobufRpcEngine.class); - this.storageContainerLocationClient = - new StorageContainerLocationProtocolClientSideTranslatorPB( - RPC.getProxy(StorageContainerLocationProtocolPB.class, scmVersion, - scmAddress, UserGroupInformation.getCurrentUser(), conf, - NetUtils.getDefaultSocketFactory(conf), - Client.getRpcTimeout(conf))); - - long ksmVersion = - RPC.getProtocolVersion(KeySpaceManagerProtocolPB.class); - InetSocketAddress ksmAddress = OzoneClientUtils.getKsmAddress(conf); - RPC.setProtocolEngine(conf, KeySpaceManagerProtocolPB.class, - ProtobufRpcEngine.class); - this.keySpaceManagerClient = - new KeySpaceManagerProtocolClientSideTranslatorPB( - RPC.getProxy(KeySpaceManagerProtocolPB.class, ksmVersion, - ksmAddress, UserGroupInformation.getCurrentUser(), conf, - NetUtils.getDefaultSocketFactory(conf), - Client.getRpcTimeout(conf))); - - this.xceiverClientManager = new XceiverClientManager(conf); - - int configuredChunkSize = conf.getInt( - ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, - ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT); - if(configuredChunkSize > ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE) { - LOG.warn("The chunk size ({}) is not allowed to be more than" - + " the maximum size ({})," - + " resetting to the maximum size.", - configuredChunkSize, ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE); - chunkSize = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE; - } else { - chunkSize = configuredChunkSize; - } - } - - @Override - public void createVolume(String volumeName) - throws IOException { - createVolume(volumeName, ugi.getUserName()); - } - - @Override - public void createVolume(String volumeName, String owner) - throws IOException { - - createVolume(volumeName, owner, OzoneConsts.MAX_QUOTA_IN_BYTES, - (OzoneAcl[])null); - } - - @Override - public void createVolume(String volumeName, String owner, - OzoneAcl... acls) - throws IOException { - createVolume(volumeName, owner, OzoneConsts.MAX_QUOTA_IN_BYTES, acls); - } - - @Override - public void createVolume(String volumeName, String owner, - long quota) - throws IOException { - createVolume(volumeName, owner, quota, (OzoneAcl[])null); - } - - @Override - public void createVolume(String volumeName, String owner, - long quota, OzoneAcl... acls) - throws IOException { - Preconditions.checkNotNull(volumeName); - Preconditions.checkNotNull(owner); - Preconditions.checkNotNull(quota); - Preconditions.checkState(quota >= 0); - OzoneAcl userAcl = - new OzoneAcl(OzoneAcl.OzoneACLType.USER, - owner, userRights); - KsmVolumeArgs.Builder builder = KsmVolumeArgs.newBuilder(); - builder.setAdminName(ugi.getUserName()) - .setOwnerName(owner) - .setVolume(volumeName) - .setQuotaInBytes(quota) - .addOzoneAcls(KSMPBHelper.convertOzoneAcl(userAcl)); - - List<OzoneAcl> listOfAcls = new ArrayList<>(); - - //Group ACLs of the User - List<String> userGroups = Arrays.asList(UserGroupInformation - .createRemoteUser(owner).getGroupNames()); - userGroups.stream().forEach((group) -> listOfAcls.add( - new OzoneAcl(OzoneAcl.OzoneACLType.GROUP, group, groupRights))); - - //ACLs passed as argument - if(acls != null) { - listOfAcls.addAll(Arrays.asList(acls)); - } - - //Remove duplicates and set - for (OzoneAcl ozoneAcl : - listOfAcls.stream().distinct().collect(Collectors.toList())) { - builder.addOzoneAcls(KSMPBHelper.convertOzoneAcl(ozoneAcl)); - } - - LOG.info("Creating Volume: {}, with {} as owner and quota set to {} bytes.", - volumeName, owner, quota); - keySpaceManagerClient.createVolume(builder.build()); - } - - @Override - public void setVolumeOwner(String volumeName, String owner) - throws IOException { - Preconditions.checkNotNull(volumeName); - Preconditions.checkNotNull(owner); - keySpaceManagerClient.setOwner(volumeName, owner); - } - - @Override - public void setVolumeQuota(String volumeName, long quota) - throws IOException { - Preconditions.checkNotNull(volumeName); - Preconditions.checkNotNull(quota); - Preconditions.checkState(quota >= 0); - keySpaceManagerClient.setQuota(volumeName, quota); - } - - @Override - public OzoneVolume getVolumeDetails(String volumeName) - throws IOException { - Preconditions.checkNotNull(volumeName); - KsmVolumeArgs volumeArgs = - keySpaceManagerClient.getVolumeInfo(volumeName); - return new OzoneVolume(volumeArgs); - } - - @Override - public boolean checkVolumeAccess(String volumeName, OzoneAcl acl) - throws IOException { - Preconditions.checkNotNull(volumeName); - return keySpaceManagerClient.checkVolumeAccess(volumeName, - KSMPBHelper.convertOzoneAcl(acl)); - } - - @Override - public void deleteVolume(String volumeName) - throws IOException { - Preconditions.checkNotNull(volumeName); - keySpaceManagerClient.deleteVolume(volumeName); - } - - @Override - public Iterator<OzoneVolume> listVolumes(String volumePrefix) - throws IOException { - throw new UnsupportedOperationException("Not yet implemented."); - } - - @Override - public Iterator<OzoneVolume> listVolumes(String volumePrefix, - String user) - throws IOException { - throw new UnsupportedOperationException("Not yet implemented."); - } - - @Override - public void createBucket(String volumeName, String bucketName) - throws IOException { - createBucket(volumeName, bucketName, Versioning.NOT_DEFINED, - StorageType.DEFAULT, (OzoneAcl[])null); - } - - @Override - public void createBucket(String volumeName, String bucketName, - Versioning versioning) - throws IOException { - createBucket(volumeName, bucketName, versioning, - StorageType.DEFAULT, (OzoneAcl[])null); - } - - @Override - public void createBucket(String volumeName, String bucketName, - StorageType storageType) - throws IOException { - createBucket(volumeName, bucketName, Versioning.NOT_DEFINED, - storageType, (OzoneAcl[])null); - } - - @Override - public void createBucket(String volumeName, String bucketName, - OzoneAcl... acls) - throws IOException { - createBucket(volumeName, bucketName, Versioning.NOT_DEFINED, - StorageType.DEFAULT, acls); - } - - @Override - public void createBucket(String volumeName, String bucketName, - Versioning versioning, StorageType storageType, - OzoneAcl... acls) - throws IOException { - Preconditions.checkNotNull(volumeName); - Preconditions.checkNotNull(bucketName); - Preconditions.checkNotNull(versioning); - Preconditions.checkNotNull(storageType); - - KsmBucketInfo.Builder builder = KsmBucketInfo.newBuilder(); - builder.setVolumeName(volumeName) - .setBucketName(bucketName) - .setStorageType(storageType) - .setIsVersionEnabled(getBucketVersioningProtobuf( - versioning)); - - String owner = ugi.getUserName(); - final List<OzoneAcl> listOfAcls = new ArrayList<>(); - - //User ACL - OzoneAcl userAcl = - new OzoneAcl(OzoneAcl.OzoneACLType.USER, - owner, userRights); - listOfAcls.add(userAcl); - - //Group ACLs of the User - List<String> userGroups = Arrays.asList(UserGroupInformation - .createRemoteUser(owner).getGroupNames()); - userGroups.stream().forEach((group) -> listOfAcls.add( - new OzoneAcl(OzoneAcl.OzoneACLType.GROUP, group, groupRights))); - - //ACLs passed as argument - if(acls != null) { - Arrays.stream(acls).forEach((acl) -> listOfAcls.add(acl)); - } - - //Remove duplicates and set - builder.setAcls(listOfAcls.stream().distinct() - .collect(Collectors.toList())); - LOG.info("Creating Bucket: {}/{}, with Versioning {} and " + - "Storage Type set to {}", volumeName, bucketName, versioning, - storageType); - keySpaceManagerClient.createBucket(builder.build()); - } - - /** - * Converts OzoneConts.Versioning enum to boolean. - * - * @param version - * @return corresponding boolean value - */ - private boolean getBucketVersioningProtobuf( - Versioning version) { - if(version != null) { - switch(version) { - case ENABLED: - return true; - case NOT_DEFINED: - case DISABLED: - default: - return false; - } - } - return false; - } - - @Override - public void addBucketAcls(String volumeName, String bucketName, - List<OzoneAcl> addAcls) - throws IOException { - Preconditions.checkNotNull(volumeName); - Preconditions.checkNotNull(bucketName); - Preconditions.checkNotNull(addAcls); - KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder(); - builder.setVolumeName(volumeName) - .setBucketName(bucketName) - .setAddAcls(addAcls); - keySpaceManagerClient.setBucketProperty(builder.build()); - } - - @Override - public void removeBucketAcls(String volumeName, String bucketName, - List<OzoneAcl> removeAcls) - throws IOException { - Preconditions.checkNotNull(volumeName); - Preconditions.checkNotNull(bucketName); - Preconditions.checkNotNull(removeAcls); - KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder(); - builder.setVolumeName(volumeName) - .setBucketName(bucketName) - .setRemoveAcls(removeAcls); - keySpaceManagerClient.setBucketProperty(builder.build()); - } - - @Override - public void setBucketVersioning(String volumeName, String bucketName, - Versioning versioning) - throws IOException { - Preconditions.checkNotNull(volumeName); - Preconditions.checkNotNull(bucketName); - Preconditions.checkNotNull(versioning); - KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder(); - builder.setVolumeName(volumeName) - .setBucketName(bucketName) - .setIsVersionEnabled(getBucketVersioningFlag( - versioning)); - keySpaceManagerClient.setBucketProperty(builder.build()); - } - - @Override - public void setBucketStorageType(String volumeName, String bucketName, - StorageType storageType) - throws IOException { - Preconditions.checkNotNull(volumeName); - Preconditions.checkNotNull(bucketName); - Preconditions.checkNotNull(storageType); - KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder(); - builder.setVolumeName(volumeName) - .setBucketName(bucketName) - .setStorageType(storageType); - keySpaceManagerClient.setBucketProperty(builder.build()); - } - - @Override - public void deleteBucket(String volumeName, String bucketName) - throws IOException { - Preconditions.checkNotNull(volumeName); - Preconditions.checkNotNull(bucketName); - keySpaceManagerClient.deleteBucket(volumeName, bucketName); - } - - @Override - public void checkBucketAccess(String volumeName, String bucketName) - throws IOException { - throw new UnsupportedOperationException("Not yet implemented."); - } - - @Override - public OzoneBucket getBucketDetails(String volumeName, - String bucketName) - throws IOException { - Preconditions.checkNotNull(volumeName); - Preconditions.checkNotNull(bucketName); - KsmBucketInfo bucketInfo = - keySpaceManagerClient.getBucketInfo(volumeName, bucketName); - return new OzoneBucket(bucketInfo); - } - - @Override - public Iterator<OzoneBucket> listBuckets(String volumeName, - String bucketPrefix) - throws IOException { - throw new UnsupportedOperationException("Not yet implemented."); - } - - @Override - public OzoneOutputStream createKey(String volumeName, String bucketName, - String keyName, long size) - throws IOException { - String requestId = UUID.randomUUID().toString(); - KsmKeyArgs keyArgs = new KsmKeyArgs.Builder() - .setVolumeName(volumeName) - .setBucketName(bucketName) - .setKeyName(keyName) - .setDataSize(size) - .build(); - - KsmKeyInfo keyInfo = keySpaceManagerClient.allocateKey(keyArgs); - ChunkGroupOutputStream groupOutputStream = - ChunkGroupOutputStream.getFromKsmKeyInfo(keyInfo, xceiverClientManager, - storageContainerLocationClient, chunkSize, requestId); - return new OzoneOutputStream(groupOutputStream); - } - - @Override - public OzoneInputStream getKey(String volumeName, String bucketName, - String keyName) - throws IOException { - Preconditions.checkNotNull(volumeName); - Preconditions.checkNotNull(bucketName); - Preconditions.checkNotNull(keyName); - String requestId = UUID.randomUUID().toString(); - KsmKeyArgs keyArgs = new KsmKeyArgs.Builder() - .setVolumeName(volumeName) - .setBucketName(bucketName) - .setKeyName(keyName) - .build(); - KsmKeyInfo keyInfo = keySpaceManagerClient.lookupKey(keyArgs); - LengthInputStream lengthInputStream = - ChunkGroupInputStream.getFromKsmKeyInfo( - keyInfo, xceiverClientManager, storageContainerLocationClient, - requestId); - return new OzoneInputStream( - (ChunkGroupInputStream)lengthInputStream.getWrappedStream()); - } - - @Override - public void deleteKey(String volumeName, String bucketName, - String keyName) - throws IOException { - Preconditions.checkNotNull(volumeName); - Preconditions.checkNotNull(bucketName); - Preconditions.checkNotNull(keyName); - KsmKeyArgs keyArgs = new KsmKeyArgs.Builder() - .setVolumeName(volumeName) - .setBucketName(bucketName) - .setKeyName(keyName) - .build(); - keySpaceManagerClient.deleteKey(keyArgs); - } - - @Override - public List<OzoneKey> listKeys(String volumeName, String bucketName, - String keyPrefix) - throws IOException { - throw new UnsupportedOperationException("Not yet implemented."); - } - - @Override - public OzoneKey getKeyDetails(String volumeName, String bucketName, - String keyName) - throws IOException { - Preconditions.checkNotNull(volumeName); - Preconditions.checkNotNull(bucketName); - Preconditions.checkNotNull(keyName); - KsmKeyArgs keyArgs = new KsmKeyArgs.Builder() - .setVolumeName(volumeName) - .setBucketName(bucketName) - .setKeyName(keyName) - .build(); - KsmKeyInfo keyInfo = - keySpaceManagerClient.lookupKey(keyArgs); - return new OzoneKey(keyInfo); - } - - /** - * Converts Versioning to boolean. - * - * @param version - * @return corresponding boolean value - */ - private boolean getBucketVersioningFlag( - Versioning version) { - if(version != null) { - switch(version) { - case ENABLED: - return true; - case DISABLED: - case NOT_DEFINED: - default: - return false; - } - } - return false; - } - - @Override - public void close() throws IOException { - IOUtils.cleanupWithLogger(LOG, storageContainerLocationClient); - IOUtils.cleanupWithLogger(LOG, keySpaceManagerClient); - IOUtils.cleanupWithLogger(LOG, xceiverClientManager); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java deleted file mode 100644 index d92f49f..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java +++ /dev/null @@ -1,705 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.ozone; - -import com.google.common.base.Optional; - -import com.google.common.net.HostAndPort; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.scm.ScmConfigKeys; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.HttpRequestBase; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetSocketAddress; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SERVICERPC_ADDRESS_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SERVICERPC_PORT_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_JSCSI_PORT_DEFAULT; - -import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_ADDRESS_KEY; -import static org.apache.hadoop.ozone.ksm.KSMConfigKeys - .OZONE_KSM_BIND_HOST_DEFAULT; -import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_PORT_DEFAULT; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_DEADNODE_INTERVAL_DEFAULT; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_DEADNODE_INTERVAL_MS; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS; - -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS; - -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_STALENODE_INTERVAL_DEFAULT; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_STALENODE_INTERVAL_MS; - -/** - * Utility methods for Ozone and Container Clients. - * - * The methods to retrieve SCM service endpoints assume there is a single - * SCM service instance. This will change when we switch to replicated service - * instances for redundancy. - */ -@InterfaceAudience.Public -@InterfaceStability.Unstable -public final class OzoneClientUtils { - private static final Logger LOG = LoggerFactory.getLogger( - OzoneClientUtils.class); - private static final int NO_PORT = -1; - - /** - * The service ID of the solitary Ozone SCM service. - */ - public static final String OZONE_SCM_SERVICE_ID = "OzoneScmService"; - public static final String OZONE_SCM_SERVICE_INSTANCE_ID = - "OzoneScmServiceInstance"; - - private OzoneClientUtils() { - // Never constructed - } - - /** - * Retrieve the socket addresses of all storage container managers. - * - * @param conf - * @return A collection of SCM addresses - * @throws IllegalArgumentException If the configuration is invalid - */ - public static Collection<InetSocketAddress> getSCMAddresses( - Configuration conf) throws IllegalArgumentException { - Collection<InetSocketAddress> addresses = - new HashSet<InetSocketAddress>(); - Collection<String> names = - conf.getTrimmedStringCollection(ScmConfigKeys.OZONE_SCM_NAMES); - if (names == null || names.isEmpty()) { - throw new IllegalArgumentException(ScmConfigKeys.OZONE_SCM_NAMES - + " need to be a set of valid DNS names or IP addresses." - + " Null or empty address list found."); - } - - final com.google.common.base.Optional<Integer> - defaultPort = com.google.common.base.Optional.of(ScmConfigKeys - .OZONE_SCM_DEFAULT_PORT); - for (String address : names) { - com.google.common.base.Optional<String> hostname = - OzoneClientUtils.getHostName(address); - if (!hostname.isPresent()) { - throw new IllegalArgumentException("Invalid hostname for SCM: " - + hostname); - } - com.google.common.base.Optional<Integer> port = - OzoneClientUtils.getHostPort(address); - InetSocketAddress addr = NetUtils.createSocketAddr(hostname.get(), - port.or(defaultPort.get())); - addresses.add(addr); - } - return addresses; - } - - /** - * Retrieve the socket address that should be used by clients to connect - * to the SCM. - * - * @param conf - * @return Target InetSocketAddress for the SCM client endpoint. - */ - public static InetSocketAddress getScmAddressForClients(Configuration conf) { - final Optional<String> host = getHostNameFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY); - - if (!host.isPresent()) { - throw new IllegalArgumentException( - ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY + - " must be defined. See" + - " https://wiki.apache.org/hadoop/Ozone#Configuration for details" + - " on configuring Ozone."); - } - - final Optional<Integer> port = getPortNumberFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY); - - return NetUtils.createSocketAddr(host.get() + ":" + - port.or(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT)); - } - - /** - * Retrieve the socket address that should be used by clients to connect - * to the SCM for block service. If - * {@link ScmConfigKeys#OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY} is not defined - * then {@link ScmConfigKeys#OZONE_SCM_CLIENT_ADDRESS_KEY} is used. - * - * @param conf - * @return Target InetSocketAddress for the SCM block client endpoint. - * @throws IllegalArgumentException if configuration is not defined. - */ - public static InetSocketAddress getScmAddressForBlockClients( - Configuration conf) { - Optional<String> host = getHostNameFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY); - - if (!host.isPresent()) { - host = getHostNameFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY); - if (!host.isPresent()) { - throw new IllegalArgumentException( - ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY + - " must be defined. See" + - " https://wiki.apache.org/hadoop/Ozone#Configuration for details" + - " on configuring Ozone."); - } - } - - final Optional<Integer> port = getPortNumberFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY); - - return NetUtils.createSocketAddr(host.get() + ":" + - port.or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT)); - } - - /** - * Retrieve the socket address that should be used by DataNodes to connect - * to the SCM. - * - * @param conf - * @return Target InetSocketAddress for the SCM service endpoint. - */ - public static InetSocketAddress getScmAddressForDataNodes( - Configuration conf) { - // We try the following settings in decreasing priority to retrieve the - // target host. - // - OZONE_SCM_DATANODE_ADDRESS_KEY - // - OZONE_SCM_CLIENT_ADDRESS_KEY - // - final Optional<String> host = getHostNameFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, - ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY); - - if (!host.isPresent()) { - throw new IllegalArgumentException( - ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY + - " must be defined. See" + - " https://wiki.apache.org/hadoop/Ozone#Configuration for details" + - " on configuring Ozone."); - } - - // If no port number is specified then we'll just try the defaultBindPort. - final Optional<Integer> port = getPortNumberFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY); - - InetSocketAddress addr = NetUtils.createSocketAddr(host.get() + ":" + - port.or(ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT)); - - return addr; - } - - /** - * Retrieve the socket address that should be used by clients to connect - * to the SCM. - * - * @param conf - * @return Target InetSocketAddress for the SCM client endpoint. - */ - public static InetSocketAddress getScmClientBindAddress( - Configuration conf) { - final Optional<String> host = getHostNameFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_KEY); - - final Optional<Integer> port = getPortNumberFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY); - - return NetUtils.createSocketAddr( - host.or(ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_DEFAULT) + ":" + - port.or(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT)); - } - - /** - * Retrieve the socket address that should be used by clients to connect - * to the SCM Block service. - * - * @param conf - * @return Target InetSocketAddress for the SCM block client endpoint. - */ - public static InetSocketAddress getScmBlockClientBindAddress( - Configuration conf) { - final Optional<String> host = getHostNameFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_BIND_HOST_KEY); - - final Optional<Integer> port = getPortNumberFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY); - - return NetUtils.createSocketAddr( - host.or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_BIND_HOST_DEFAULT) + - ":" + port.or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT)); - } - - /** - * Retrieve the socket address that should be used by DataNodes to connect - * to the SCM. - * - * @param conf - * @return Target InetSocketAddress for the SCM service endpoint. - */ - public static InetSocketAddress getScmDataNodeBindAddress( - Configuration conf) { - final Optional<String> host = getHostNameFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_DATANODE_BIND_HOST_KEY); - - // If no port number is specified then we'll just try the defaultBindPort. - final Optional<Integer> port = getPortNumberFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY); - - return NetUtils.createSocketAddr( - host.or(ScmConfigKeys.OZONE_SCM_DATANODE_BIND_HOST_DEFAULT) + ":" + - port.or(ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT)); - } - - - /** - * Retrieve the socket address that is used by KSM. - * @param conf - * @return Target InetSocketAddress for the SCM service endpoint. - */ - public static InetSocketAddress getKsmAddress( - Configuration conf) { - final Optional<String> host = getHostNameFromConfigKeys(conf, - OZONE_KSM_ADDRESS_KEY); - - // If no port number is specified then we'll just try the defaultBindPort. - final Optional<Integer> port = getPortNumberFromConfigKeys(conf, - OZONE_KSM_ADDRESS_KEY); - - return NetUtils.createSocketAddr( - host.or(OZONE_KSM_BIND_HOST_DEFAULT) + ":" + - port.or(OZONE_KSM_PORT_DEFAULT)); - } - - /** - * Retrieve the socket address that is used by CBlock Service. - * @param conf - * @return Target InetSocketAddress for the CBlock Service endpoint. - */ - public static InetSocketAddress getCblockServiceRpcAddr( - Configuration conf) { - final Optional<String> host = getHostNameFromConfigKeys(conf, - DFS_CBLOCK_SERVICERPC_ADDRESS_KEY); - - // If no port number is specified then we'll just try the defaultBindPort. - final Optional<Integer> port = getPortNumberFromConfigKeys(conf, - DFS_CBLOCK_SERVICERPC_ADDRESS_KEY); - - return NetUtils.createSocketAddr( - host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" + - port.or(DFS_CBLOCK_SERVICERPC_PORT_DEFAULT)); - } - - /** - * Retrieve the socket address that is used by CBlock Server. - * @param conf - * @return Target InetSocketAddress for the CBlock Server endpoint. - */ - public static InetSocketAddress getCblockServerRpcAddr( - Configuration conf) { - final Optional<String> host = getHostNameFromConfigKeys(conf, - DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY); - - // If no port number is specified then we'll just try the defaultBindPort. - final Optional<Integer> port = getPortNumberFromConfigKeys(conf, - DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY); - - return NetUtils.createSocketAddr( - host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" + - port.or(DFS_CBLOCK_JSCSI_PORT_DEFAULT)); - } - - /** - * Retrieve the hostname, trying the supplied config keys in order. - * Each config value may be absent, or if present in the format - * host:port (the :port part is optional). - * - * @param conf - Conf - * @param keys a list of configuration key names. - * - * @return first hostname component found from the given keys, or absent. - * @throws IllegalArgumentException if any values are not in the 'host' - * or host:port format. - */ - public static Optional<String> getHostNameFromConfigKeys(Configuration conf, - String... keys) { - for (final String key : keys) { - final String value = conf.getTrimmed(key); - final Optional<String> hostName = getHostName(value); - if (hostName.isPresent()) { - return hostName; - } - } - return Optional.absent(); - } - - /** - * Gets the hostname or Indicates that it is absent. - * @param value host or host:port - * @return hostname - */ - public static Optional<String> getHostName(String value) { - if ((value == null) || value.isEmpty()) { - return Optional.absent(); - } - return Optional.of(HostAndPort.fromString(value).getHostText()); - } - - /** - * Gets the port if there is one, throws otherwise. - * @param value String in host:port format. - * @return Port - */ - public static Optional<Integer> getHostPort(String value) { - if((value == null) || value.isEmpty()) { - return Optional.absent(); - } - int port = HostAndPort.fromString(value).getPortOrDefault(NO_PORT); - if (port == NO_PORT) { - return Optional.absent(); - } else { - return Optional.of(port); - } - } - - /** - * Retrieve the port number, trying the supplied config keys in order. - * Each config value may be absent, or if present in the format - * host:port (the :port part is optional). - * - * @param conf Conf - * @param keys a list of configuration key names. - * - * @return first port number component found from the given keys, or absent. - * @throws IllegalArgumentException if any values are not in the 'host' - * or host:port format. - */ - public static Optional<Integer> getPortNumberFromConfigKeys( - Configuration conf, String... keys) { - for (final String key : keys) { - final String value = conf.getTrimmed(key); - final Optional<Integer> hostPort = getHostPort(value); - if (hostPort.isPresent()) { - return hostPort; - } - } - return Optional.absent(); - } - - /** - * Return the list of service addresses for the Ozone SCM. This method is used - * by the DataNodes to determine the service instances to connect to. - * - * @param conf - * @return list of SCM service addresses. - */ - public static Map<String, ? extends Map<String, InetSocketAddress>> - getScmServiceRpcAddresses(Configuration conf) { - final Map<String, InetSocketAddress> serviceInstances = new HashMap<>(); - serviceInstances.put(OZONE_SCM_SERVICE_INSTANCE_ID, - getScmAddressForDataNodes(conf)); - - final Map<String, Map<String, InetSocketAddress>> services = - new HashMap<>(); - services.put(OZONE_SCM_SERVICE_ID, serviceInstances); - return services; - } - - /** - * Checks that a given value is with a range. - * - * For example, sanitizeUserArgs(17, 3, 5, 10) - * ensures that 17 is greater/equal than 3 * 5 and less/equal to 3 * 10. - * - * @param valueTocheck - value to check - * @param baseValue - the base value that is being used. - * @param minFactor - range min - a 2 here makes us ensure that value - * valueTocheck is at least twice the baseValue. - * @param maxFactor - range max - * @return long - */ - private static long sanitizeUserArgs(long valueTocheck, long baseValue, - long minFactor, long maxFactor) - throws IllegalArgumentException { - if ((valueTocheck >= (baseValue * minFactor)) && - (valueTocheck <= (baseValue * maxFactor))) { - return valueTocheck; - } - String errMsg = String.format("%d is not within min = %d or max = " + - "%d", valueTocheck, baseValue * minFactor, baseValue * maxFactor); - throw new IllegalArgumentException(errMsg); - } - - /** - * Returns the interval in which the heartbeat processor thread runs. - * - * @param conf - Configuration - * @return long in Milliseconds. - */ - public static long getScmheartbeatCheckerInterval(Configuration conf) { - return conf.getLong(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, - ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS_DEFAULT); - } - - /** - * Heartbeat Interval - Defines the heartbeat frequency from a datanode to - * SCM. - * - * @param conf - Ozone Config - * @return - HB interval in seconds. - */ - public static long getScmHeartbeatInterval(Configuration conf) { - return conf.getTimeDuration( - OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, - ScmConfigKeys.OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT, - TimeUnit.SECONDS); - } - - /** - * Get the Stale Node interval, which is used by SCM to flag a datanode as - * stale, if the heartbeat from that node has been missing for this duration. - * - * @param conf - Configuration. - * @return - Long, Milliseconds to wait before flagging a node as stale. - */ - public static long getStaleNodeInterval(Configuration conf) { - - long staleNodeIntevalMs = conf.getLong(OZONE_SCM_STALENODE_INTERVAL_MS, - OZONE_SCM_STALENODE_INTERVAL_DEFAULT); - - long heartbeatThreadFrequencyMs = getScmheartbeatCheckerInterval(conf); - - long heartbeatIntervalMs = getScmHeartbeatInterval(conf) * 1000; - - - // Make sure that StaleNodeInterval is configured way above the frequency - // at which we run the heartbeat thread. - // - // Here we check that staleNodeInterval is at least five times more than the - // frequency at which the accounting thread is going to run. - try { - sanitizeUserArgs(staleNodeIntevalMs, heartbeatThreadFrequencyMs, 5, 1000); - } catch (IllegalArgumentException ex) { - LOG.error("Stale Node Interval MS is cannot be honored due to " + - "mis-configured {}. ex: {}", - OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, ex); - throw ex; - } - - // Make sure that stale node value is greater than configured value that - // datanodes are going to send HBs. - try { - sanitizeUserArgs(staleNodeIntevalMs, heartbeatIntervalMs, 3, 1000); - } catch (IllegalArgumentException ex) { - LOG.error("Stale Node Interval MS is cannot be honored due to " + - "mis-configured {}. ex: {}", - OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, ex); - throw ex; - } - return staleNodeIntevalMs; - } - - /** - * Gets the interval for dead node flagging. This has to be a value that is - * greater than stale node value, and by transitive relation we also know - * that this value is greater than heartbeat interval and heartbeatProcess - * Interval. - * - * @param conf - Configuration. - * @return - the interval for dead node flagging. - */ - public static long getDeadNodeInterval(Configuration conf) { - long staleNodeIntervalMs = getStaleNodeInterval(conf); - long deadNodeIntervalMs = conf.getLong( - OZONE_SCM_DEADNODE_INTERVAL_MS, OZONE_SCM_DEADNODE_INTERVAL_DEFAULT); - - try { - // Make sure that dead nodes Ms is at least twice the time for staleNodes - // with a max of 1000 times the staleNodes. - sanitizeUserArgs(deadNodeIntervalMs, staleNodeIntervalMs, 2, 1000); - } catch (IllegalArgumentException ex) { - LOG.error("Dead Node Interval MS is cannot be honored due to " + - "mis-configured {}. ex: {}", - OZONE_SCM_STALENODE_INTERVAL_MS, ex); - throw ex; - } - return deadNodeIntervalMs; - } - - /** - * Returns the maximum number of heartbeat to process per loop of the process - * thread. - * @param conf Configuration - * @return - int -- Number of HBs to process - */ - public static int getMaxHBToProcessPerLoop(Configuration conf) { - return conf.getInt(ScmConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, - ScmConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS_DEFAULT); - } - - /** - * Timeout value for the RPC from Datanode to SCM, primarily used for - * Heartbeats and container reports. - * - * @param conf - Ozone Config - * @return - Rpc timeout in Milliseconds. - */ - public static long getScmRpcTimeOutInMilliseconds(Configuration conf) { - return conf.getTimeDuration(OZONE_SCM_HEARTBEAT_RPC_TIMEOUT, - OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); - } - - /** - * Log Warn interval. - * - * @param conf - Ozone Config - * @return - Log warn interval. - */ - public static int getLogWarnInterval(Configuration conf) { - return conf.getInt(OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT, - OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT); - } - - /** - * returns the Container port. - * @param conf - Conf - * @return port number. - */ - public static int getContainerPort(Configuration conf) { - return conf.getInt(ScmConfigKeys.DFS_CONTAINER_IPC_PORT, ScmConfigKeys - .DFS_CONTAINER_IPC_PORT_DEFAULT); - } - - /** - * After starting an RPC server, updates configuration with the actual - * listening address of that server. The listening address may be different - * from the configured address if, for example, the configured address uses - * port 0 to request use of an ephemeral port. - * - * @param conf configuration to update - * @param rpcAddressKey configuration key for RPC server address - * @param addr configured address - * @param rpcServer started RPC server. - */ - public static InetSocketAddress updateRPCListenAddress( - OzoneConfiguration conf, String rpcAddressKey, - InetSocketAddress addr, RPC.Server rpcServer) { - return updateListenAddress(conf, rpcAddressKey, addr, - rpcServer.getListenerAddress()); - } - - /** - * After starting an server, updates configuration with the actual - * listening address of that server. The listening address may be different - * from the configured address if, for example, the configured address uses - * port 0 to request use of an ephemeral port. - * - * @param conf configuration to update - * @param addressKey configuration key for RPC server address - * @param addr configured address - * @param listenAddr the real listening address. - */ - public static InetSocketAddress updateListenAddress(OzoneConfiguration conf, - String addressKey, InetSocketAddress addr, InetSocketAddress listenAddr) { - InetSocketAddress updatedAddr = new InetSocketAddress(addr.getHostString(), - listenAddr.getPort()); - conf.set(addressKey, - addr.getHostString() + ":" + listenAddr.getPort()); - return updatedAddr; - } - - /** - * Releases a http connection if the request is not null. - * @param request - */ - public static void releaseConnection(HttpRequestBase request) { - if (request != null) { - request.releaseConnection(); - } - } - - /** - * @return a default instance of {@link CloseableHttpClient}. - */ - public static CloseableHttpClient newHttpClient() { - return OzoneClientUtils.newHttpClient(null); - } - - /** - * Returns a {@link CloseableHttpClient} configured by given configuration. - * If conf is null, returns a default instance. - * - * @param conf configuration - * @return a {@link CloseableHttpClient} instance. - */ - public static CloseableHttpClient newHttpClient(Configuration conf) { - int socketTimeout = OzoneConfigKeys - .OZONE_CLIENT_SOCKET_TIMEOUT_MS_DEFAULT; - int connectionTimeout = OzoneConfigKeys - .OZONE_CLIENT_CONNECTION_TIMEOUT_MS_DEFAULT; - if (conf != null) { - socketTimeout = conf.getInt( - OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_MS, - OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_MS_DEFAULT); - connectionTimeout = conf.getInt( - OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT_MS, - OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT_MS_DEFAULT); - } - - CloseableHttpClient client = HttpClients.custom() - .setDefaultRequestConfig( - RequestConfig.custom() - .setSocketTimeout(socketTimeout) - .setConnectTimeout(connectionTimeout) - .build()) - .build(); - return client; - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org