http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientFactory.java deleted file mode 100644 index 5069220..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientFactory.java +++ /dev/null @@ -1,308 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.client; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.OzoneConfiguration; -import org.apache.hadoop.ozone.client.protocol.ClientProtocol; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Proxy; - -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_CLIENT_PROTOCOL; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_CLIENT_PROTOCOL_REST; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_CLIENT_PROTOCOL_RPC; -import static org.apache.hadoop.ozone.ksm.KSMConfigKeys - .OZONE_KSM_HTTP_ADDRESS_KEY; -import static org.apache.hadoop.ozone.ksm.KSMConfigKeys - .OZONE_KSM_HTTP_BIND_PORT_DEFAULT; -import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_ADDRESS_KEY; -import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_PORT_DEFAULT; - -/** - * Factory class to create different types of OzoneClients. - * Based on <code>ozone.client.protocol</code>, it decides which - * protocol to use for the communication. - * Default value is - * <code>org.apache.hadoop.ozone.client.rpc.RpcClient</code>.<br> - * OzoneClientFactory constructs a proxy using - * {@link OzoneClientInvocationHandler} - * and creates OzoneClient instance with it. - * {@link OzoneClientInvocationHandler} dispatches the call to - * underlying {@link ClientProtocol} implementation. - */ -public final class OzoneClientFactory { - - private static final Logger LOG = LoggerFactory.getLogger( - OzoneClientFactory.class); - - /** - * Private constructor, class is not meant to be initialized. - */ - private OzoneClientFactory(){} - - - /** - * Constructs and return an OzoneClient with default configuration. - * - * @return OzoneClient - * - * @throws IOException - */ - public static OzoneClient getClient() throws IOException { - LOG.info("Creating OzoneClient with default configuration."); - return getClient(new OzoneConfiguration()); - } - - /** - * Constructs and return an OzoneClient based on the configuration object. - * Protocol type is decided by <code>ozone.client.protocol</code>. - * - * @param config - * Configuration to be used for OzoneClient creation - * - * @return OzoneClient - * - * @throws IOException - */ - public static OzoneClient getClient(Configuration config) - throws IOException { - Preconditions.checkNotNull(config); - Class<? extends ClientProtocol> clazz = (Class<? extends ClientProtocol>) - config.getClass(OZONE_CLIENT_PROTOCOL, OZONE_CLIENT_PROTOCOL_RPC); - return getClient(getClientProtocol(clazz, config), config); - } - - /** - * Returns an OzoneClient which will use RPC protocol. - * - * @param ksmHost - * hostname of KeySpaceManager to connect. - * - * @return OzoneClient - * - * @throws IOException - */ - public static OzoneClient getRpcClient(String ksmHost) - throws IOException { - return getRpcClient(ksmHost, OZONE_KSM_PORT_DEFAULT, - new OzoneConfiguration()); - } - - /** - * Returns an OzoneClient which will use RPC protocol. - * - * @param ksmHost - * hostname of KeySpaceManager to connect. - * - * @param ksmRpcPort - * RPC port of KeySpaceManager. - * - * @return OzoneClient - * - * @throws IOException - */ - public static OzoneClient getRpcClient(String ksmHost, Integer ksmRpcPort) - throws IOException { - return getRpcClient(ksmHost, ksmRpcPort, new OzoneConfiguration()); - } - - /** - * Returns an OzoneClient which will use RPC protocol. - * - * @param ksmHost - * hostname of KeySpaceManager to connect. - * - * @param ksmRpcPort - * RPC port of KeySpaceManager. - * - * @param config - * Configuration to be used for OzoneClient creation - * - * @return OzoneClient - * - * @throws IOException - */ - public static OzoneClient getRpcClient(String ksmHost, Integer ksmRpcPort, - Configuration config) - throws IOException { - Preconditions.checkNotNull(ksmHost); - Preconditions.checkNotNull(ksmRpcPort); - Preconditions.checkNotNull(config); - config.set(OZONE_KSM_ADDRESS_KEY, ksmHost + ":" + ksmRpcPort); - return getRpcClient(config); - } - - /** - * Returns an OzoneClient which will use RPC protocol. - * - * @param config - * used for OzoneClient creation - * - * @return OzoneClient - * - * @throws IOException - */ - public static OzoneClient getRpcClient(Configuration config) - throws IOException { - Preconditions.checkNotNull(config); - return getClient(getClientProtocol(OZONE_CLIENT_PROTOCOL_RPC, config), - config); - } - - /** - * Returns an OzoneClient which will use REST protocol. - * - * @param ksmHost - * hostname of KeySpaceManager to connect. - * - * @return OzoneClient - * - * @throws IOException - */ - public static OzoneClient getRestClient(String ksmHost) - throws IOException { - return getRestClient(ksmHost, OZONE_KSM_HTTP_BIND_PORT_DEFAULT); - } - - /** - * Returns an OzoneClient which will use REST protocol. - * - * @param ksmHost - * hostname of KeySpaceManager to connect. - * - * @param ksmHttpPort - * HTTP port of KeySpaceManager. - * - * @return OzoneClient - * - * @throws IOException - */ - public static OzoneClient getRestClient(String ksmHost, Integer ksmHttpPort) - throws IOException { - return getRestClient(ksmHost, ksmHttpPort, new OzoneConfiguration()); - } - - /** - * Returns an OzoneClient which will use REST protocol. - * - * @param ksmHost - * hostname of KeySpaceManager to connect. - * - * @param ksmHttpPort - * HTTP port of KeySpaceManager. - * - * @param config - * Configuration to be used for OzoneClient creation - * - * @return OzoneClient - * - * @throws IOException - */ - public static OzoneClient getRestClient(String ksmHost, Integer ksmHttpPort, - Configuration config) - throws IOException { - Preconditions.checkNotNull(ksmHost); - Preconditions.checkNotNull(ksmHttpPort); - Preconditions.checkNotNull(config); - config.set(OZONE_KSM_HTTP_ADDRESS_KEY, ksmHost + ":" + ksmHttpPort); - return getRestClient(config); - } - - /** - * Returns an OzoneClient which will use REST protocol. - * - * @param config - * Configuration to be used for OzoneClient creation - * - * @return OzoneClient - * - * @throws IOException - */ - public static OzoneClient getRestClient(Configuration config) - throws IOException { - Preconditions.checkNotNull(config); - return getClient(getClientProtocol(OZONE_CLIENT_PROTOCOL_REST, config), - config); - } - - /** - * Creates OzoneClient with the given ClientProtocol and Configuration. - * - * @param clientProtocol - * Protocol to be used by the OzoneClient - * - * @param config - * Configuration to be used for OzoneClient creation - */ - private static OzoneClient getClient(ClientProtocol clientProtocol, - Configuration config) { - OzoneClientInvocationHandler clientHandler = - new OzoneClientInvocationHandler(clientProtocol); - ClientProtocol proxy = (ClientProtocol) Proxy.newProxyInstance( - OzoneClientInvocationHandler.class.getClassLoader(), - new Class<?>[]{ClientProtocol.class}, clientHandler); - return new OzoneClient(config, proxy); - } - - /** - * Returns an instance of Protocol class. - * - * @param protocolClass - * Class object of the ClientProtocol. - * - * @param config - * Configuration used to initialize ClientProtocol. - * - * @return ClientProtocol - * - * @throws IOException - */ - private static ClientProtocol getClientProtocol( - Class<? extends ClientProtocol> protocolClass, Configuration config) - throws IOException { - try { - LOG.debug("Using {} as client protocol.", - protocolClass.getCanonicalName()); - Constructor<? extends ClientProtocol> ctor = - protocolClass.getConstructor(Configuration.class); - return ctor.newInstance(config); - } catch (Exception e) { - final String message = "Couldn't create protocol " + protocolClass; - LOG.error(message + " exception:" + e); - if (e.getCause() instanceof IOException) { - throw (IOException) e.getCause(); - } else if (e instanceof InvocationTargetException) { - throw new IOException(message, - ((InvocationTargetException) e).getTargetException()); - } else { - throw new IOException(message, e); - } - } - } - -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientInvocationHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientInvocationHandler.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientInvocationHandler.java deleted file mode 100644 index 3051e2d..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientInvocationHandler.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.client; - -import org.apache.hadoop.ozone.client.protocol.ClientProtocol; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; - -/** - * Invocation Handler for ozone client which dispatches the call to underlying - * ClientProtocol implementation. - */ -public class OzoneClientInvocationHandler implements InvocationHandler { - - - private static final Logger LOG = LoggerFactory.getLogger(OzoneClient.class); - private final ClientProtocol target; - - /** - * Constructs OzoneClientInvocationHandler with the proxy. - * @param target proxy to be used for method invocation. - */ - public OzoneClientInvocationHandler(ClientProtocol target) { - this.target = target; - } - - @Override - public Object invoke(Object proxy, Method method, Object[] args) - throws Throwable { - LOG.trace("Invoking method {} on proxy {}", method, proxy); - try { - long startTime = Time.monotonicNow(); - Object result = method.invoke(target, args); - LOG.debug("Call: {} took {} ms", method, - Time.monotonicNow() - startTime); - return result; - } catch(InvocationTargetException iEx) { - throw iEx.getCause(); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java deleted file mode 100644 index 1830bdf..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java +++ /dev/null @@ -1,874 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.client; - -import com.google.common.base.Optional; - -import com.google.common.base.Preconditions; -import com.google.common.net.HostAndPort; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.conf.OzoneConfiguration; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.scm.ScmConfigKeys; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.HttpRequestBase; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetSocketAddress; -import java.text.ParseException; -import java.time.Instant; -import java.time.ZoneId; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SERVICERPC_ADDRESS_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_SERVICERPC_PORT_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_JSCSI_PORT_DEFAULT; -import static org.apache.hadoop.ozone.ksm.KSMConfigKeys - .OZONE_KSM_ADDRESS_KEY; -import static org.apache.hadoop.ozone.ksm.KSMConfigKeys - .OZONE_KSM_BIND_HOST_DEFAULT; -import static org.apache.hadoop.ozone.ksm.KSMConfigKeys - .OZONE_KSM_PORT_DEFAULT; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_DEADNODE_INTERVAL_DEFAULT; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_DEADNODE_INTERVAL; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_HEARTBEAT_INTERVAL; - -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL; - -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_STALENODE_INTERVAL_DEFAULT; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_STALENODE_INTERVAL; - -/** - * Utility methods for Ozone and Container Clients. - * - * The methods to retrieve SCM service endpoints assume there is a single - * SCM service instance. This will change when we switch to replicated service - * instances for redundancy. - */ [email protected] [email protected] -public final class OzoneClientUtils { - private static final Logger LOG = LoggerFactory.getLogger( - OzoneClientUtils.class); - private static final int NO_PORT = -1; - - /** - * Date format that used in ozone. Here the format is thread safe to use. - */ - private static final ThreadLocal<DateTimeFormatter> DATE_FORMAT = - ThreadLocal.withInitial(() -> { - DateTimeFormatter format = - DateTimeFormatter.ofPattern(OzoneConsts.OZONE_DATE_FORMAT); - return format.withZone(ZoneId.of(OzoneConsts.OZONE_TIME_ZONE)); - }); - - /** - * The service ID of the solitary Ozone SCM service. - */ - public static final String OZONE_SCM_SERVICE_ID = "OzoneScmService"; - public static final String OZONE_SCM_SERVICE_INSTANCE_ID = - "OzoneScmServiceInstance"; - - private OzoneClientUtils() { - // Never constructed - } - - /** - * Retrieve the socket addresses of all storage container managers. - * - * @param conf - * @return A collection of SCM addresses - * @throws IllegalArgumentException If the configuration is invalid - */ - public static Collection<InetSocketAddress> getSCMAddresses( - Configuration conf) throws IllegalArgumentException { - Collection<InetSocketAddress> addresses = - new HashSet<InetSocketAddress>(); - Collection<String> names = - conf.getTrimmedStringCollection(ScmConfigKeys.OZONE_SCM_NAMES); - if (names == null || names.isEmpty()) { - throw new IllegalArgumentException(ScmConfigKeys.OZONE_SCM_NAMES - + " need to be a set of valid DNS names or IP addresses." - + " Null or empty address list found."); - } - - final com.google.common.base.Optional<Integer> - defaultPort = com.google.common.base.Optional.of(ScmConfigKeys - .OZONE_SCM_DEFAULT_PORT); - for (String address : names) { - com.google.common.base.Optional<String> hostname = - OzoneClientUtils.getHostName(address); - if (!hostname.isPresent()) { - throw new IllegalArgumentException("Invalid hostname for SCM: " - + hostname); - } - com.google.common.base.Optional<Integer> port = - OzoneClientUtils.getHostPort(address); - InetSocketAddress addr = NetUtils.createSocketAddr(hostname.get(), - port.or(defaultPort.get())); - addresses.add(addr); - } - return addresses; - } - - /** - * Retrieve the socket address that should be used by clients to connect - * to the SCM. - * - * @param conf - * @return Target InetSocketAddress for the SCM client endpoint. - */ - public static InetSocketAddress getScmAddressForClients(Configuration conf) { - final Optional<String> host = getHostNameFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY); - - if (!host.isPresent()) { - throw new IllegalArgumentException( - ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY + - " must be defined. See" + - " https://wiki.apache.org/hadoop/Ozone#Configuration for details" + - " on configuring Ozone."); - } - - final Optional<Integer> port = getPortNumberFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY); - - return NetUtils.createSocketAddr(host.get() + ":" + - port.or(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT)); - } - - /** - * Retrieve the socket address that should be used by clients to connect - * to the SCM for block service. If - * {@link ScmConfigKeys#OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY} is not defined - * then {@link ScmConfigKeys#OZONE_SCM_CLIENT_ADDRESS_KEY} is used. - * - * @param conf - * @return Target InetSocketAddress for the SCM block client endpoint. - * @throws IllegalArgumentException if configuration is not defined. - */ - public static InetSocketAddress getScmAddressForBlockClients( - Configuration conf) { - Optional<String> host = getHostNameFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY); - - if (!host.isPresent()) { - host = getHostNameFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY); - if (!host.isPresent()) { - throw new IllegalArgumentException( - ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY + - " must be defined. See" + - " https://wiki.apache.org/hadoop/Ozone#Configuration" + - " for details on configuring Ozone."); - } - } - - final Optional<Integer> port = getPortNumberFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY); - - return NetUtils.createSocketAddr(host.get() + ":" + - port.or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT)); - } - - /** - * Retrieve the socket address that should be used by DataNodes to connect - * to the SCM. - * - * @param conf - * @return Target InetSocketAddress for the SCM service endpoint. - */ - public static InetSocketAddress getScmAddressForDataNodes( - Configuration conf) { - // We try the following settings in decreasing priority to retrieve the - // target host. - // - OZONE_SCM_DATANODE_ADDRESS_KEY - // - OZONE_SCM_CLIENT_ADDRESS_KEY - // - final Optional<String> host = getHostNameFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, - ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY); - - if (!host.isPresent()) { - throw new IllegalArgumentException( - ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY + - " must be defined. See" + - " https://wiki.apache.org/hadoop/Ozone#Configuration for details" + - " on configuring Ozone."); - } - - // If no port number is specified then we'll just try the defaultBindPort. - final Optional<Integer> port = getPortNumberFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY); - - InetSocketAddress addr = NetUtils.createSocketAddr(host.get() + ":" + - port.or(ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT)); - - return addr; - } - - /** - * Retrieve the socket address that should be used by clients to connect - * to the SCM. - * - * @param conf - * @return Target InetSocketAddress for the SCM client endpoint. - */ - public static InetSocketAddress getScmClientBindAddress( - Configuration conf) { - final Optional<String> host = getHostNameFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_KEY); - - final Optional<Integer> port = getPortNumberFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY); - - return NetUtils.createSocketAddr( - host.or(ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_DEFAULT) + ":" + - port.or(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT)); - } - - /** - * Retrieve the socket address that should be used by clients to connect - * to the SCM Block service. - * - * @param conf - * @return Target InetSocketAddress for the SCM block client endpoint. - */ - public static InetSocketAddress getScmBlockClientBindAddress( - Configuration conf) { - final Optional<String> host = getHostNameFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_BIND_HOST_KEY); - - final Optional<Integer> port = getPortNumberFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY); - - return NetUtils.createSocketAddr( - host.or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_BIND_HOST_DEFAULT) + - ":" + port.or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT)); - } - - /** - * Retrieve the socket address that should be used by DataNodes to connect - * to the SCM. - * - * @param conf - * @return Target InetSocketAddress for the SCM service endpoint. - */ - public static InetSocketAddress getScmDataNodeBindAddress( - Configuration conf) { - final Optional<String> host = getHostNameFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_DATANODE_BIND_HOST_KEY); - - // If no port number is specified then we'll just try the defaultBindPort. - final Optional<Integer> port = getPortNumberFromConfigKeys(conf, - ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY); - - return NetUtils.createSocketAddr( - host.or(ScmConfigKeys.OZONE_SCM_DATANODE_BIND_HOST_DEFAULT) + ":" + - port.or(ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT)); - } - - - /** - * Retrieve the socket address that is used by KSM. - * @param conf - * @return Target InetSocketAddress for the SCM service endpoint. - */ - public static InetSocketAddress getKsmAddress( - Configuration conf) { - final Optional<String> host = getHostNameFromConfigKeys(conf, - OZONE_KSM_ADDRESS_KEY); - - // If no port number is specified then we'll just try the defaultBindPort. - final Optional<Integer> port = getPortNumberFromConfigKeys(conf, - OZONE_KSM_ADDRESS_KEY); - - return NetUtils.createSocketAddr( - host.or(OZONE_KSM_BIND_HOST_DEFAULT) + ":" + - port.or(OZONE_KSM_PORT_DEFAULT)); - } - - /** - * Retrieve the socket address that should be used by clients to connect - * to KSM. - * @param conf - * @return Target InetSocketAddress for the KSM service endpoint. - */ - public static InetSocketAddress getKsmAddressForClients( - Configuration conf) { - final Optional<String> host = getHostNameFromConfigKeys(conf, - OZONE_KSM_ADDRESS_KEY); - - if (!host.isPresent()) { - throw new IllegalArgumentException( - OZONE_KSM_ADDRESS_KEY + " must be defined. See" + - " https://wiki.apache.org/hadoop/Ozone#Configuration for" + - " details on configuring Ozone."); - } - - // If no port number is specified then we'll just try the defaultBindPort. - final Optional<Integer> port = getPortNumberFromConfigKeys(conf, - OZONE_KSM_ADDRESS_KEY); - - return NetUtils.createSocketAddr( - host.get() + ":" + port.or(OZONE_KSM_PORT_DEFAULT)); - } - - /** - * Retrieve the socket address that is used by CBlock Service. - * @param conf - * @return Target InetSocketAddress for the CBlock Service endpoint. - */ - public static InetSocketAddress getCblockServiceRpcAddr( - Configuration conf) { - final Optional<String> host = getHostNameFromConfigKeys(conf, - DFS_CBLOCK_SERVICERPC_ADDRESS_KEY); - - // If no port number is specified then we'll just try the defaultBindPort. - final Optional<Integer> port = getPortNumberFromConfigKeys(conf, - DFS_CBLOCK_SERVICERPC_ADDRESS_KEY); - - return NetUtils.createSocketAddr( - host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" + - port.or(DFS_CBLOCK_SERVICERPC_PORT_DEFAULT)); - } - - /** - * Retrieve the socket address that is used by CBlock Server. - * @param conf - * @return Target InetSocketAddress for the CBlock Server endpoint. - */ - public static InetSocketAddress getCblockServerRpcAddr( - Configuration conf) { - final Optional<String> host = getHostNameFromConfigKeys(conf, - DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY); - - // If no port number is specified then we'll just try the defaultBindPort. - final Optional<Integer> port = getPortNumberFromConfigKeys(conf, - DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY); - - return NetUtils.createSocketAddr( - host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" + - port.or(DFS_CBLOCK_JSCSI_PORT_DEFAULT)); - } - - /** - * Retrieve the hostname, trying the supplied config keys in order. - * Each config value may be absent, or if present in the format - * host:port (the :port part is optional). - * - * @param conf - Conf - * @param keys a list of configuration key names. - * - * @return first hostname component found from the given keys, or absent. - * @throws IllegalArgumentException if any values are not in the 'host' - * or host:port format. - */ - public static Optional<String> getHostNameFromConfigKeys(Configuration conf, - String... keys) { - for (final String key : keys) { - final String value = conf.getTrimmed(key); - final Optional<String> hostName = getHostName(value); - if (hostName.isPresent()) { - return hostName; - } - } - return Optional.absent(); - } - - /** - * Gets the hostname or Indicates that it is absent. - * @param value host or host:port - * @return hostname - */ - public static Optional<String> getHostName(String value) { - if ((value == null) || value.isEmpty()) { - return Optional.absent(); - } - return Optional.of(HostAndPort.fromString(value).getHostText()); - } - - /** - * Gets the port if there is one, throws otherwise. - * @param value String in host:port format. - * @return Port - */ - public static Optional<Integer> getHostPort(String value) { - if((value == null) || value.isEmpty()) { - return Optional.absent(); - } - int port = HostAndPort.fromString(value).getPortOrDefault(NO_PORT); - if (port == NO_PORT) { - return Optional.absent(); - } else { - return Optional.of(port); - } - } - - /** - * Returns the cache value to be used for list calls. - * @param conf Configuration object - * @return list cache size - */ - public static int getListCacheSize(Configuration conf) { - return conf.getInt(OzoneConfigKeys.OZONE_CLIENT_LIST_CACHE_SIZE, - OzoneConfigKeys.OZONE_CLIENT_LIST_CACHE_SIZE_DEFAULT); - } - - /** - * Retrieve the port number, trying the supplied config keys in order. - * Each config value may be absent, or if present in the format - * host:port (the :port part is optional). - * - * @param conf Conf - * @param keys a list of configuration key names. - * - * @return first port number component found from the given keys, or absent. - * @throws IllegalArgumentException if any values are not in the 'host' - * or host:port format. - */ - public static Optional<Integer> getPortNumberFromConfigKeys( - Configuration conf, String... keys) { - for (final String key : keys) { - final String value = conf.getTrimmed(key); - final Optional<Integer> hostPort = getHostPort(value); - if (hostPort.isPresent()) { - return hostPort; - } - } - return Optional.absent(); - } - - /** - * Return the list of service addresses for the Ozone SCM. This method is used - * by the DataNodes to determine the service instances to connect to. - * - * @param conf - * @return list of SCM service addresses. - */ - public static Map<String, ? extends Map<String, InetSocketAddress>> - getScmServiceRpcAddresses(Configuration conf) { - final Map<String, InetSocketAddress> serviceInstances = new HashMap<>(); - serviceInstances.put(OZONE_SCM_SERVICE_INSTANCE_ID, - getScmAddressForDataNodes(conf)); - - final Map<String, Map<String, InetSocketAddress>> services = - new HashMap<>(); - services.put(OZONE_SCM_SERVICE_ID, serviceInstances); - return services; - } - - /** - * Checks that a given value is with a range. - * - * For example, sanitizeUserArgs(17, 3, 5, 10) - * ensures that 17 is greater/equal than 3 * 5 and less/equal to 3 * 10. - * - * @param valueTocheck - value to check - * @param baseValue - the base value that is being used. - * @param minFactor - range min - a 2 here makes us ensure that value - * valueTocheck is at least twice the baseValue. - * @param maxFactor - range max - * @return long - */ - private static long sanitizeUserArgs(long valueTocheck, long baseValue, - long minFactor, long maxFactor) - throws IllegalArgumentException { - if ((valueTocheck >= (baseValue * minFactor)) && - (valueTocheck <= (baseValue * maxFactor))) { - return valueTocheck; - } - String errMsg = String.format("%d is not within min = %d or max = " + - "%d", valueTocheck, baseValue * minFactor, baseValue * maxFactor); - throw new IllegalArgumentException(errMsg); - } - - /** - * Returns the interval in which the heartbeat processor thread runs. - * - * @param conf - Configuration - * @return long in Milliseconds. - */ - public static long getScmheartbeatCheckerInterval(Configuration conf) { - return conf.getTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, - ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_DEFAULT, - TimeUnit.MILLISECONDS); - } - - /** - * Heartbeat Interval - Defines the heartbeat frequency from a datanode to - * SCM. - * - * @param conf - Ozone Config - * @return - HB interval in seconds. - */ - public static long getScmHeartbeatInterval(Configuration conf) { - return conf.getTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL, - ScmConfigKeys.OZONE_SCM_HEARBEAT_INTERVAL_DEFAULT, - TimeUnit.SECONDS); - } - - /** - * Get the Stale Node interval, which is used by SCM to flag a datanode as - * stale, if the heartbeat from that node has been missing for this duration. - * - * @param conf - Configuration. - * @return - Long, Milliseconds to wait before flagging a node as stale. - */ - public static long getStaleNodeInterval(Configuration conf) { - - long staleNodeIntervalMs = - conf.getTimeDuration(OZONE_SCM_STALENODE_INTERVAL, - OZONE_SCM_STALENODE_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); - - long heartbeatThreadFrequencyMs = getScmheartbeatCheckerInterval(conf); - - long heartbeatIntervalMs = getScmHeartbeatInterval(conf) * 1000; - - - // Make sure that StaleNodeInterval is configured way above the frequency - // at which we run the heartbeat thread. - // - // Here we check that staleNodeInterval is at least five times more than the - // frequency at which the accounting thread is going to run. - try { - sanitizeUserArgs(staleNodeIntervalMs, heartbeatThreadFrequencyMs, - 5, 1000); - } catch (IllegalArgumentException ex) { - LOG.error("Stale Node Interval is cannot be honored due to " + - "mis-configured {}. ex: {}", - OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, ex); - throw ex; - } - - // Make sure that stale node value is greater than configured value that - // datanodes are going to send HBs. - try { - sanitizeUserArgs(staleNodeIntervalMs, heartbeatIntervalMs, 3, 1000); - } catch (IllegalArgumentException ex) { - LOG.error("Stale Node Interval MS is cannot be honored due to " + - "mis-configured {}. ex: {}", OZONE_SCM_HEARTBEAT_INTERVAL, ex); - throw ex; - } - return staleNodeIntervalMs; - } - - /** - * Gets the interval for dead node flagging. This has to be a value that is - * greater than stale node value, and by transitive relation we also know - * that this value is greater than heartbeat interval and heartbeatProcess - * Interval. - * - * @param conf - Configuration. - * @return - the interval for dead node flagging. - */ - public static long getDeadNodeInterval(Configuration conf) { - long staleNodeIntervalMs = getStaleNodeInterval(conf); - long deadNodeIntervalMs = conf.getTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, - OZONE_SCM_DEADNODE_INTERVAL_DEFAULT, - TimeUnit.MILLISECONDS); - - try { - // Make sure that dead nodes Ms is at least twice the time for staleNodes - // with a max of 1000 times the staleNodes. - sanitizeUserArgs(deadNodeIntervalMs, staleNodeIntervalMs, 2, 1000); - } catch (IllegalArgumentException ex) { - LOG.error("Dead Node Interval MS is cannot be honored due to " + - "mis-configured {}. ex: {}", OZONE_SCM_STALENODE_INTERVAL, ex); - throw ex; - } - return deadNodeIntervalMs; - } - - /** - * Returns the maximum number of heartbeat to process per loop of the process - * thread. - * @param conf Configuration - * @return - int -- Number of HBs to process - */ - public static int getMaxHBToProcessPerLoop(Configuration conf) { - return conf.getInt(ScmConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, - ScmConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS_DEFAULT); - } - - /** - * Timeout value for the RPC from Datanode to SCM, primarily used for - * Heartbeats and container reports. - * - * @param conf - Ozone Config - * @return - Rpc timeout in Milliseconds. - */ - public static long getScmRpcTimeOutInMilliseconds(Configuration conf) { - return conf.getTimeDuration(OZONE_SCM_HEARTBEAT_RPC_TIMEOUT, - OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); - } - - /** - * Log Warn interval. - * - * @param conf - Ozone Config - * @return - Log warn interval. - */ - public static int getLogWarnInterval(Configuration conf) { - return conf.getInt(OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT, - OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT); - } - - /** - * returns the Container port. - * @param conf - Conf - * @return port number. - */ - public static int getContainerPort(Configuration conf) { - return conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, - OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); - } - - /** - * After starting an RPC server, updates configuration with the actual - * listening address of that server. The listening address may be different - * from the configured address if, for example, the configured address uses - * port 0 to request use of an ephemeral port. - * - * @param conf configuration to update - * @param rpcAddressKey configuration key for RPC server address - * @param addr configured address - * @param rpcServer started RPC server. - */ - public static InetSocketAddress updateRPCListenAddress( - OzoneConfiguration conf, String rpcAddressKey, - InetSocketAddress addr, RPC.Server rpcServer) { - return updateListenAddress(conf, rpcAddressKey, addr, - rpcServer.getListenerAddress()); - } - - /** - * After starting an server, updates configuration with the actual - * listening address of that server. The listening address may be different - * from the configured address if, for example, the configured address uses - * port 0 to request use of an ephemeral port. - * - * @param conf configuration to update - * @param addressKey configuration key for RPC server address - * @param addr configured address - * @param listenAddr the real listening address. - */ - public static InetSocketAddress updateListenAddress(OzoneConfiguration conf, - String addressKey, InetSocketAddress addr, InetSocketAddress listenAddr) { - InetSocketAddress updatedAddr = new InetSocketAddress(addr.getHostString(), - listenAddr.getPort()); - conf.set(addressKey, - addr.getHostString() + ":" + listenAddr.getPort()); - return updatedAddr; - } - - /** - * Releases a http connection if the request is not null. - * @param request - */ - public static void releaseConnection(HttpRequestBase request) { - if (request != null) { - request.releaseConnection(); - } - } - - /** - * @return a default instance of {@link CloseableHttpClient}. - */ - public static CloseableHttpClient newHttpClient() { - return OzoneClientUtils.newHttpClient(new OzoneConfiguration()); - } - - /** - * Returns a {@link CloseableHttpClient} configured by given configuration. - * If conf is null, returns a default instance. - * - * @param conf configuration - * @return a {@link CloseableHttpClient} instance. - */ - public static CloseableHttpClient newHttpClient(Configuration conf) { - long socketTimeout = OzoneConfigKeys - .OZONE_CLIENT_SOCKET_TIMEOUT_DEFAULT; - long connectionTimeout = OzoneConfigKeys - .OZONE_CLIENT_CONNECTION_TIMEOUT_DEFAULT; - if (conf != null) { - socketTimeout = conf.getTimeDuration( - OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT, - OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_DEFAULT, - TimeUnit.MILLISECONDS); - connectionTimeout = conf.getTimeDuration( - OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT, - OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT_DEFAULT, - TimeUnit.MILLISECONDS); - } - - CloseableHttpClient client = HttpClients.custom() - .setDefaultRequestConfig( - RequestConfig.custom() - .setSocketTimeout(Math.toIntExact(socketTimeout)) - .setConnectTimeout(Math.toIntExact(connectionTimeout)) - .build()) - .build(); - return client; - } - - /** - * verifies that bucket name / volume name is a valid DNS name. - * - * @param resName Bucket or volume Name to be validated - * - * @throws IllegalArgumentException - */ - public static void verifyResourceName(String resName) - throws IllegalArgumentException { - - if (resName == null) { - throw new IllegalArgumentException("Bucket or Volume name is null"); - } - - if ((resName.length() < OzoneConsts.OZONE_MIN_BUCKET_NAME_LENGTH) || - (resName.length() > OzoneConsts.OZONE_MAX_BUCKET_NAME_LENGTH)) { - throw new IllegalArgumentException( - "Bucket or Volume length is illegal, " + - "valid length is 3-63 characters"); - } - - if ((resName.charAt(0) == '.') || (resName.charAt(0) == '-')) { - throw new IllegalArgumentException( - "Bucket or Volume name cannot start with a period or dash"); - } - - if ((resName.charAt(resName.length() - 1) == '.') || - (resName.charAt(resName.length() - 1) == '-')) { - throw new IllegalArgumentException( - "Bucket or Volume name cannot end with a period or dash"); - } - - boolean isIPv4 = true; - char prev = (char) 0; - - for (int index = 0; index < resName.length(); index++) { - char currChar = resName.charAt(index); - - if (currChar != '.') { - isIPv4 = ((currChar >= '0') && (currChar <= '9')) && isIPv4; - } - - if (currChar > 'A' && currChar < 'Z') { - throw new IllegalArgumentException( - "Bucket or Volume name does not support uppercase characters"); - } - - if ((currChar != '.') && (currChar != '-')) { - if ((currChar < '0') || (currChar > '9' && currChar < 'a') || - (currChar > 'z')) { - throw new IllegalArgumentException("Bucket or Volume name has an " + - "unsupported character : " + - currChar); - } - } - - if ((prev == '.') && (currChar == '.')) { - throw new IllegalArgumentException("Bucket or Volume name should not " + - "have two contiguous periods"); - } - - if ((prev == '-') && (currChar == '.')) { - throw new IllegalArgumentException( - "Bucket or Volume name should not have period after dash"); - } - - if ((prev == '.') && (currChar == '-')) { - throw new IllegalArgumentException( - "Bucket or Volume name should not have dash after period"); - } - prev = currChar; - } - - if (isIPv4) { - throw new IllegalArgumentException( - "Bucket or Volume name cannot be an IPv4 address or all numeric"); - } - } - - /** - * Convert time in millisecond to a human readable format required in ozone. - * @return a human readable string for the input time - */ - public static String formatDateTime(long millis) { - ZonedDateTime dateTime = ZonedDateTime.ofInstant( - Instant.ofEpochSecond(millis), DATE_FORMAT.get().getZone()); - return DATE_FORMAT.get().format(dateTime); - } - - /** - * Convert time in ozone date format to millisecond. - * @return time in milliseconds - */ - public static long formatDateTime(String date) throws ParseException { - Preconditions.checkNotNull(date, "Date string should not be null."); - return ZonedDateTime.parse(date, DATE_FORMAT.get()) - .toInstant().getEpochSecond(); - } - - /** - * Returns the maximum no of outstanding async requests to be handled by - * Standalone and Ratis client. - */ - public static int getMaxOutstandingRequests(Configuration config) { - return config - .getInt(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS, - ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS_DEFAULT); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneKey.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneKey.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneKey.java deleted file mode 100644 index 0c723dd..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneKey.java +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.client; - -/** - * A class that encapsulates OzoneKey. - */ -public class OzoneKey { - - /** - * Name of the Volume the Key belongs to. - */ - private final String volumeName; - /** - * Name of the Bucket the Key belongs to. - */ - private final String bucketName; - /** - * Name of the Key. - */ - private final String name; - /** - * Size of the data. - */ - private final long dataSize; - /** - * Creation time of the key. - */ - private long creationTime; - /** - * Modification time of the key. - */ - private long modificationTime; - - /** - * Constructs OzoneKey from KsmKeyInfo. - * - */ - public OzoneKey(String volumeName, String bucketName, - String keyName, long size, long creationTime, - long modificationTime) { - this.volumeName = volumeName; - this.bucketName = bucketName; - this.name = keyName; - this.dataSize = size; - this.creationTime = creationTime; - this.modificationTime = modificationTime; - } - - /** - * Returns Volume Name associated with the Key. - * - * @return volumeName - */ - public String getVolumeName() { - return volumeName; - } - - /** - * Returns Bucket Name associated with the Key. - * - * @return bucketName - */ - public String getBucketName(){ - return bucketName; - } - - /** - * Returns the Key Name. - * - * @return keyName - */ - public String getName() { - return name; - } - - /** - * Returns the size of the data. - * - * @return dataSize - */ - public long getDataSize() { - return dataSize; - } - - /** - * Returns the creation time of the key. - * - * @return creation time - */ - public long getCreationTime() { - return creationTime; - } - - /** - * Returns the modification time of the key. - * - * @return modification time - */ - public long getModificationTime() { - return modificationTime; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneQuota.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneQuota.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneQuota.java deleted file mode 100644 index 032dd60..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneQuota.java +++ /dev/null @@ -1,203 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.client; - -import org.apache.hadoop.ozone.OzoneConsts; - - -/** - * represents an OzoneQuota Object that can be applied to - * a storage volume. - */ -public class OzoneQuota { - - public static final String OZONE_QUOTA_BYTES = "BYTES"; - public static final String OZONE_QUOTA_MB = "MB"; - public static final String OZONE_QUOTA_GB = "GB"; - public static final String OZONE_QUOTA_TB = "TB"; - - private Units unit; - private long size; - - /** Quota Units.*/ - public enum Units {UNDEFINED, BYTES, KB, MB, GB, TB} - - /** - * Returns size. - * - * @return long - */ - public long getSize() { - return size; - } - - /** - * Returns Units. - * - * @return Unit in MB, GB or TB - */ - public Units getUnit() { - return unit; - } - - /** - * Constructs a default Quota object. - */ - public OzoneQuota() { - this.size = 0; - this.unit = Units.UNDEFINED; - } - - /** - * Constructor for Ozone Quota. - * - * @param size Long Size - * @param unit MB, GB or TB - */ - public OzoneQuota(long size, Units unit) { - this.size = size; - this.unit = unit; - } - - /** - * Formats a quota as a string. - * - * @param quota the quota to format - * @return string representation of quota - */ - public static String formatQuota(OzoneQuota quota) { - return String.valueOf(quota.size) + quota.unit; - } - - /** - * Parses a user provided string and returns the - * Quota Object. - * - * @param quotaString Quota String - * - * @return OzoneQuota object - * - * @throws IllegalArgumentException - */ - public static OzoneQuota parseQuota(String quotaString) - throws IllegalArgumentException { - - if ((quotaString == null) || (quotaString.isEmpty())) { - throw new IllegalArgumentException( - "Quota string cannot be null or empty."); - } - - String uppercase = quotaString.toUpperCase().replaceAll("\\s+", ""); - String size = ""; - int nSize; - Units currUnit = Units.MB; - Boolean found = false; - if (uppercase.endsWith(OZONE_QUOTA_MB)) { - size = uppercase - .substring(0, uppercase.length() - OZONE_QUOTA_MB.length()); - currUnit = Units.MB; - found = true; - } - - if (uppercase.endsWith(OZONE_QUOTA_GB)) { - size = uppercase - .substring(0, uppercase.length() - OZONE_QUOTA_GB.length()); - currUnit = Units.GB; - found = true; - } - - if (uppercase.endsWith(OZONE_QUOTA_TB)) { - size = uppercase - .substring(0, uppercase.length() - OZONE_QUOTA_TB.length()); - currUnit = Units.TB; - found = true; - } - - if (uppercase.endsWith(OZONE_QUOTA_BYTES)) { - size = uppercase - .substring(0, uppercase.length() - OZONE_QUOTA_BYTES.length()); - currUnit = Units.BYTES; - found = true; - } - - if (!found) { - throw new IllegalArgumentException( - "Quota unit not recognized. Supported values are BYTES, MB, GB and " + - "TB."); - } - - nSize = Integer.parseInt(size); - if (nSize < 0) { - throw new IllegalArgumentException("Quota cannot be negative."); - } - - return new OzoneQuota(nSize, currUnit); - } - - - /** - * Returns size in Bytes or -1 if there is no Quota. - */ - public long sizeInBytes() { - switch (this.unit) { - case BYTES: - return this.getSize(); - case MB: - return this.getSize() * OzoneConsts.MB; - case GB: - return this.getSize() * OzoneConsts.GB; - case TB: - return this.getSize() * OzoneConsts.TB; - case UNDEFINED: - default: - return -1; - } - } - - /** - * Returns OzoneQuota corresponding to size in bytes. - * - * @param sizeInBytes size in bytes to be converted - * - * @return OzoneQuota object - */ - public static OzoneQuota getOzoneQuota(long sizeInBytes) { - long size; - Units unit; - if (sizeInBytes % OzoneConsts.TB == 0) { - size = sizeInBytes / OzoneConsts.TB; - unit = Units.TB; - } else if (sizeInBytes % OzoneConsts.GB == 0) { - size = sizeInBytes / OzoneConsts.GB; - unit = Units.GB; - } else if (sizeInBytes % OzoneConsts.MB == 0) { - size = sizeInBytes / OzoneConsts.MB; - unit = Units.MB; - } else { - size = sizeInBytes; - unit = Units.BYTES; - } - return new OzoneQuota((int)size, unit); - } - - @Override - public String toString() { - return size + " " + unit; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneVolume.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneVolume.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneVolume.java deleted file mode 100644 index 31cbb40..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneVolume.java +++ /dev/null @@ -1,293 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.client; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ozone.OzoneAcl; -import org.apache.hadoop.ozone.client.protocol.ClientProtocol; - -import java.io.IOException; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; - -/** - * A class that encapsulates OzoneVolume. - */ -public class OzoneVolume { - - /** - * The proxy used for connecting to the cluster and perform - * client operations. - */ - private final ClientProtocol proxy; - - /** - * Name of the Volume. - */ - private final String name; - - /** - * Admin Name of the Volume. - */ - private String admin; - /** - * Owner of the Volume. - */ - private String owner; - /** - * Quota allocated for the Volume. - */ - private long quotaInBytes; - /** - * Creation time of the volume. - */ - private long creationTime; - /** - * Volume ACLs. - */ - private List<OzoneAcl> acls; - - private int listCacheSize; - - /** - * Constructs OzoneVolume instance. - * @param conf Configuration object. - * @param proxy ClientProtocol proxy. - * @param name Name of the volume. - * @param admin Volume admin. - * @param owner Volume owner. - * @param quotaInBytes Volume quota in bytes. - * @param creationTime creation time of the volume - * @param acls ACLs associated with the volume. - */ - public OzoneVolume(Configuration conf, ClientProtocol proxy, String name, - String admin, String owner, long quotaInBytes, - long creationTime, List<OzoneAcl> acls) { - this.proxy = proxy; - this.name = name; - this.admin = admin; - this.owner = owner; - this.quotaInBytes = quotaInBytes; - this.creationTime = creationTime; - this.acls = acls; - this.listCacheSize = OzoneClientUtils.getListCacheSize(conf); - } - - /** - * Returns Volume name. - * - * @return volumeName - */ - public String getName() { - return name; - } - - /** - * Returns Volume's admin name. - * - * @return adminName - */ - public String getAdmin() { - return admin; - } - - /** - * Returns Volume's owner name. - * - * @return ownerName - */ - public String getOwner() { - return owner; - } - - /** - * Returns Quota allocated for the Volume in bytes. - * - * @return quotaInBytes - */ - public long getQuota() { - return quotaInBytes; - } - - /** - * Returns creation time of the volume. - * - * @return creation time. - */ - public long getCreationTime() { - return creationTime; - } - - /** - * Returns OzoneAcl list associated with the Volume. - * - * @return aclMap - */ - public List<OzoneAcl> getAcls() { - return acls; - } - - /** - * Sets/Changes the owner of this Volume. - * @param owner new owner - * @throws IOException - */ - public void setOwner(String owner) throws IOException { - Preconditions.checkNotNull(proxy, "Client proxy is not set."); - Preconditions.checkNotNull(owner); - proxy.setVolumeOwner(name, owner); - this.owner = owner; - } - - /** - * Sets/Changes the quota of this Volume. - * @param quota new quota - * @throws IOException - */ - public void setQuota(OzoneQuota quota) throws IOException { - Preconditions.checkNotNull(proxy, "Client proxy is not set."); - Preconditions.checkNotNull(quota); - proxy.setVolumeQuota(name, quota); - this.quotaInBytes = quota.sizeInBytes(); - } - - /** - * Creates a new Bucket in this Volume, with default values. - * @param bucketName Name of the Bucket - * @throws IOException - */ - public void createBucket(String bucketName) - throws IOException { - Preconditions.checkNotNull(proxy, "Client proxy is not set."); - Preconditions.checkNotNull(bucketName); - OzoneClientUtils.verifyResourceName(bucketName); - proxy.createBucket(name, bucketName); - } - - /** - * Creates a new Bucket in this Volume, with properties set in bucketArgs. - * @param bucketName Name of the Bucket - * @param bucketArgs Properties to be set - * @throws IOException - */ - public void createBucket(String bucketName, BucketArgs bucketArgs) - throws IOException { - Preconditions.checkNotNull(proxy, "Client proxy is not set."); - Preconditions.checkNotNull(bucketName); - Preconditions.checkNotNull(bucketArgs); - OzoneClientUtils.verifyResourceName(bucketName); - proxy.createBucket(name, bucketName, bucketArgs); - } - - /** - * Get the Bucket from this Volume. - * @param bucketName Name of the Bucket - * @return OzoneBucket - * @throws IOException - */ - public OzoneBucket getBucket(String bucketName) throws IOException { - Preconditions.checkNotNull(proxy, "Client proxy is not set."); - Preconditions.checkNotNull(bucketName); - OzoneClientUtils.verifyResourceName(bucketName); - OzoneBucket bucket = proxy.getBucketDetails(name, bucketName); - return bucket; - } - - /** - * Returns Iterator to iterate over all buckets in the volume. - * The result can be restricted using bucket prefix, will return all - * buckets if bucket prefix is null. - * - * @param bucketPrefix Bucket prefix to match - * @return {@code Iterator<OzoneBucket>} - */ - public Iterator<OzoneBucket> listBuckets(String bucketPrefix) { - return new BucketIterator(bucketPrefix); - } - - /** - * Deletes the Bucket from this Volume. - * @param bucketName Name of the Bucket - * @throws IOException - */ - public void deleteBucket(String bucketName) throws IOException { - Preconditions.checkNotNull(proxy, "Client proxy is not set."); - Preconditions.checkNotNull(bucketName); - OzoneClientUtils.verifyResourceName(bucketName); - proxy.deleteBucket(name, bucketName); - } - - - /** - * An Iterator to iterate over {@link OzoneBucket} list. - */ - private class BucketIterator implements Iterator<OzoneBucket> { - - private String bucketPrefix = null; - - private Iterator<OzoneBucket> currentIterator; - private OzoneBucket currentValue; - - - /** - * Creates an Iterator to iterate over all buckets in the volume, - * which matches volume prefix. - * @param bucketPrefix - */ - BucketIterator(String bucketPrefix) { - this.bucketPrefix = bucketPrefix; - this.currentValue = null; - this.currentIterator = getNextListOfBuckets(null).iterator(); - } - - @Override - public boolean hasNext() { - if(!currentIterator.hasNext()) { - currentIterator = getNextListOfBuckets( - currentValue != null ? currentValue.getName() : null) - .iterator(); - } - return currentIterator.hasNext(); - } - - @Override - public OzoneBucket next() { - if(hasNext()) { - currentValue = currentIterator.next(); - return currentValue; - } - throw new NoSuchElementException(); - } - - /** - * Gets the next set of bucket list using proxy. - * @param prevBucket - * @return {@code List<OzoneVolume>} - */ - private List<OzoneBucket> getNextListOfBuckets(String prevBucket) { - try { - return proxy.listBuckets(name, bucketPrefix, prevBucket, listCacheSize); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/ReplicationFactor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/ReplicationFactor.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/ReplicationFactor.java deleted file mode 100644 index 971cfec..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/ReplicationFactor.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.client; - -/** - * The replication factor to be used while writing key into ozone. - */ -public enum ReplicationFactor { - ONE(1), - THREE(3); - - /** - * Integer representation of replication. - */ - private int value; - - /** - * Initializes ReplicationFactor with value. - * @param value replication value - */ - ReplicationFactor(int value) { - this.value = value; - } - - /** - * Returns enum value corresponding to the int value. - * @param value replication value - * @return ReplicationFactor - */ - public static ReplicationFactor valueOf(int value) { - if(value == 1) { - return ONE; - } - if (value == 3) { - return THREE; - } - throw new IllegalArgumentException("Unsupported value: " + value); - } - - /** - * Returns integer representation of ReplicationFactor. - * @return replication value - */ - public int getValue() { - return value; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/ReplicationType.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/ReplicationType.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/ReplicationType.java deleted file mode 100644 index 537c336..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/ReplicationType.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.client; - -/** - * The replication type to be used while writing key into ozone. - */ -public enum ReplicationType { - RATIS, - STAND_ALONE, - CHAINED -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/VolumeArgs.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/VolumeArgs.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/VolumeArgs.java deleted file mode 100644 index f1aa031..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/VolumeArgs.java +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.client; - -import org.apache.hadoop.ozone.OzoneAcl; - -import java.io.IOException; -import java.util.List; - -/** - * This class encapsulates the arguments that are - * required for creating a volume. - */ -public final class VolumeArgs { - - private final String admin; - private final String owner; - private final String quota; - private final List<OzoneAcl> acls; - - /** - * Private constructor, constructed via builder. - * @param admin Administrator's name. - * @param owner Volume owner's name - * @param quota Volume Quota. - * @param acls User to access rights map. - */ - private VolumeArgs(String admin, String owner, - String quota, List<OzoneAcl> acls) { - this.admin = admin; - this.owner = owner; - this.quota = quota; - this.acls = acls; - } - - /** - * Returns the Admin Name. - * @return String. - */ - public String getAdmin() { - return admin; - } - - /** - * Returns the owner Name. - * @return String - */ - public String getOwner() { - return owner; - } - - /** - * Returns Volume Quota. - * @return Quota. - */ - public String getQuota() { - return quota; - } - - public List<OzoneAcl> getAcls() { - return acls; - } - /** - * Returns new builder class that builds a KsmVolumeArgs. - * - * @return Builder - */ - public static VolumeArgs.Builder newBuilder() { - return new VolumeArgs.Builder(); - } - - /** - * Builder for KsmVolumeArgs. - */ - public static class Builder { - private String adminName; - private String ownerName; - private String volumeQuota; - private List<OzoneAcl> listOfAcls; - - - public VolumeArgs.Builder setAdmin(String admin) { - this.adminName = admin; - return this; - } - - public VolumeArgs.Builder setOwner(String owner) { - this.ownerName = owner; - return this; - } - - public VolumeArgs.Builder setQuota(String quota) { - this.volumeQuota = quota; - return this; - } - - public VolumeArgs.Builder setAcls(List<OzoneAcl> acls) - throws IOException { - this.listOfAcls = acls; - return this; - } - - /** - * Constructs a CreateVolumeArgument. - * @return CreateVolumeArgs. - */ - public VolumeArgs build() { - return new VolumeArgs(adminName, ownerName, volumeQuota, listOfAcls); - } - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java deleted file mode 100644 index afe5e45..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java +++ /dev/null @@ -1,319 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.client.io; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.fs.FSExceptionMessages; -import org.apache.hadoop.fs.Seekable; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; -import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo; -import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo; -import org.apache.hadoop.scm.XceiverClientManager; -import org.apache.hadoop.scm.XceiverClientSpi; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.scm.storage.ChunkInputStream; -import org.apache.hadoop.scm.storage.ContainerProtocolCalls; -import org.apache.ratis.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -/** - * Maintaining a list of ChunkInputStream. Read based on offset. - */ -public class ChunkGroupInputStream extends InputStream implements Seekable { - - private static final Logger LOG = - LoggerFactory.getLogger(ChunkGroupInputStream.class); - - private static final int EOF = -1; - - private final ArrayList<ChunkInputStreamEntry> streamEntries; - // streamOffset[i] stores the offset at which chunkInputStream i stores - // data in the key - private long[] streamOffset = null; - private int currentStreamIndex; - private long length = 0; - private boolean closed = false; - private String key; - - public ChunkGroupInputStream() { - streamEntries = new ArrayList<>(); - currentStreamIndex = 0; - } - - @VisibleForTesting - public synchronized int getCurrentStreamIndex() { - return currentStreamIndex; - } - - @VisibleForTesting - public long getRemainingOfIndex(int index) { - return streamEntries.get(index).getRemaining(); - } - - /** - * Append another stream to the end of the list. - * - * @param stream the stream instance. - * @param streamLength the max number of bytes that should be written to this - * stream. - */ - public synchronized void addStream(ChunkInputStream stream, - long streamLength) { - streamEntries.add(new ChunkInputStreamEntry(stream, streamLength)); - } - - - @Override - public synchronized int read() throws IOException { - byte[] buf = new byte[1]; - if (read(buf, 0, 1) == EOF) { - return EOF; - } - return Byte.toUnsignedInt(buf[0]); - } - - @Override - public synchronized int read(byte[] b, int off, int len) throws IOException { - checkNotClosed(); - if (b == null) { - throw new NullPointerException(); - } - if (off < 0 || len < 0 || len > b.length - off) { - throw new IndexOutOfBoundsException(); - } - if (len == 0) { - return 0; - } - int totalReadLen = 0; - while (len > 0) { - if (streamEntries.size() <= currentStreamIndex) { - return totalReadLen == 0 ? EOF : totalReadLen; - } - ChunkInputStreamEntry current = streamEntries.get(currentStreamIndex); - int readLen = Math.min(len, (int)current.getRemaining()); - int actualLen = current.read(b, off, readLen); - // this means the underlying stream has nothing at all, return - if (actualLen == EOF) { - return totalReadLen > 0 ? totalReadLen : EOF; - } - totalReadLen += actualLen; - // this means there is no more data to read beyond this point, return - if (actualLen != readLen) { - return totalReadLen; - } - off += readLen; - len -= readLen; - if (current.getRemaining() <= 0) { - currentStreamIndex += 1; - } - } - return totalReadLen; - } - - @Override - public void seek(long pos) throws IOException { - checkNotClosed(); - if (pos < 0 || pos >= length) { - if (pos == 0) { - // It is possible for length and pos to be zero in which case - // seek should return instead of throwing exception - return; - } - throw new EOFException( - "EOF encountered at pos: " + pos + " for key: " + key); - } - Preconditions.assertTrue(currentStreamIndex >= 0); - if (currentStreamIndex >= streamEntries.size()) { - currentStreamIndex = Arrays.binarySearch(streamOffset, pos); - } else if (pos < streamOffset[currentStreamIndex]) { - currentStreamIndex = - Arrays.binarySearch(streamOffset, 0, currentStreamIndex, pos); - } else if (pos >= streamOffset[currentStreamIndex] + streamEntries - .get(currentStreamIndex).length) { - currentStreamIndex = Arrays - .binarySearch(streamOffset, currentStreamIndex + 1, - streamEntries.size(), pos); - } - if (currentStreamIndex < 0) { - // Binary search returns -insertionPoint - 1 if element is not present - // in the array. insertionPoint is the point at which element would be - // inserted in the sorted array. We need to adjust the currentStreamIndex - // accordingly so that currentStreamIndex = insertionPoint - 1 - currentStreamIndex = -currentStreamIndex - 2; - } - // seek to the proper offset in the ChunkInputStream - streamEntries.get(currentStreamIndex) - .seek(pos - streamOffset[currentStreamIndex]); - } - - @Override - public long getPos() throws IOException { - return length == 0 ? 0 : - streamOffset[currentStreamIndex] + streamEntries.get(currentStreamIndex) - .getPos(); - } - - @Override - public boolean seekToNewSource(long targetPos) throws IOException { - return false; - } - - @Override - public int available() throws IOException { - checkNotClosed(); - long remaining = length - getPos(); - return remaining <= Integer.MAX_VALUE ? (int) remaining : Integer.MAX_VALUE; - } - - @Override - public void close() throws IOException { - closed = true; - for (int i = 0; i < streamEntries.size(); i++) { - streamEntries.get(i).close(); - } - } - - /** - * Encapsulates ChunkInputStream. - */ - public static class ChunkInputStreamEntry extends InputStream - implements Seekable { - - private final ChunkInputStream chunkInputStream; - private final long length; - private long currentPosition; - - public ChunkInputStreamEntry(ChunkInputStream chunkInputStream, - long length) { - this.chunkInputStream = chunkInputStream; - this.length = length; - this.currentPosition = 0; - } - - synchronized long getRemaining() { - return length - currentPosition; - } - - @Override - public synchronized int read(byte[] b, int off, int len) - throws IOException { - int readLen = chunkInputStream.read(b, off, len); - currentPosition += readLen; - return readLen; - } - - @Override - public synchronized int read() throws IOException { - int data = chunkInputStream.read(); - currentPosition += 1; - return data; - } - - @Override - public synchronized void close() throws IOException { - chunkInputStream.close(); - } - - @Override - public void seek(long pos) throws IOException { - chunkInputStream.seek(pos); - } - - @Override - public long getPos() throws IOException { - return chunkInputStream.getPos(); - } - - @Override - public boolean seekToNewSource(long targetPos) throws IOException { - return false; - } - } - - public static LengthInputStream getFromKsmKeyInfo(KsmKeyInfo keyInfo, - XceiverClientManager xceiverClientManager, - StorageContainerLocationProtocolClientSideTranslatorPB - storageContainerLocationClient, String requestId) - throws IOException { - long length = 0; - String containerKey; - ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream(); - groupInputStream.key = keyInfo.getKeyName(); - List<KsmKeyLocationInfo> keyLocationInfos = - keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly(); - groupInputStream.streamOffset = new long[keyLocationInfos.size()]; - for (int i = 0; i < keyLocationInfos.size(); i++) { - KsmKeyLocationInfo ksmKeyLocationInfo = keyLocationInfos.get(i); - String containerName = ksmKeyLocationInfo.getContainerName(); - Pipeline pipeline = - storageContainerLocationClient.getContainer(containerName); - XceiverClientSpi xceiverClient = - xceiverClientManager.acquireClient(pipeline); - boolean success = false; - containerKey = ksmKeyLocationInfo.getBlockID(); - try { - LOG.debug("get key accessing {} {}", - xceiverClient.getPipeline().getContainerName(), containerKey); - groupInputStream.streamOffset[i] = length; - ContainerProtos.KeyData containerKeyData = OzoneContainerTranslation - .containerKeyDataForRead( - xceiverClient.getPipeline().getContainerName(), containerKey); - ContainerProtos.GetKeyResponseProto response = ContainerProtocolCalls - .getKey(xceiverClient, containerKeyData, requestId); - List<ContainerProtos.ChunkInfo> chunks = - response.getKeyData().getChunksList(); - for (ContainerProtos.ChunkInfo chunk : chunks) { - length += chunk.getLen(); - } - success = true; - ChunkInputStream inputStream = new ChunkInputStream( - containerKey, xceiverClientManager, xceiverClient, - chunks, requestId); - groupInputStream.addStream(inputStream, - ksmKeyLocationInfo.getLength()); - } finally { - if (!success) { - xceiverClientManager.releaseClient(xceiverClient); - } - } - } - groupInputStream.length = length; - return new LengthInputStream(groupInputStream, length); - } - - /** - * Verify that the input stream is open. Non blocking; this gives - * the last state of the volatile {@link #closed} field. - * @throws IOException if the connection is closed. - */ - private void checkNotClosed() throws IOException { - if (closed) { - throw new IOException( - ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + key); - } - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
