http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/package-info.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/package-info.java new file mode 100644 index 0000000..0630df2 --- /dev/null +++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.cli.container; \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/package-info.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/package-info.java new file mode 100644 index 0000000..4762d55 --- /dev/null +++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.cli; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/client/pom.xml b/hadoop-hdsl/client/pom.xml deleted file mode 100644 index 1f1eaf0..0000000 --- a/hadoop-hdsl/client/pom.xml +++ /dev/null @@ -1,49 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. See accompanying LICENSE file. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 -http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdsl</artifactId> - <version>3.2.0-SNAPSHOT</version> - </parent> - <artifactId>hadoop-hdsl-client</artifactId> - <version>3.2.0-SNAPSHOT</version> - <description>Apache Hadoop HDSL Client libraries</description> - <name>Apache Hadoop HDSL Client</name> - <packaging>jar</packaging> - - <properties> - <hadoop.component>hdsl</hadoop.component> - <is.hadoop.component>true</is.hadoop.component> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdsl-common</artifactId> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-all</artifactId> - </dependency> - - </dependencies> -</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java b/hadoop-hdsl/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java deleted file mode 100644 index c77f965..0000000 --- a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java +++ /dev/null @@ -1,231 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.client; - -import java.text.ParseException; -import java.time.Instant; -import java.time.ZoneId; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdsl.conf.OzoneConfiguration; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.scm.ScmConfigKeys; - -import com.google.common.base.Preconditions; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Utility methods for Ozone and Container Clients. - * - * The methods to retrieve SCM service endpoints assume there is a single - * SCM service instance. This will change when we switch to replicated service - * instances for redundancy. - */ -@InterfaceAudience.Public -@InterfaceStability.Unstable -public final class OzoneClientUtils { - - private static final Logger LOG = LoggerFactory.getLogger( - OzoneClientUtils.class); - - private static final int NO_PORT = -1; - - private OzoneClientUtils() { - } - - /** - * Date format that used in ozone. Here the format is thread safe to use. - */ - private static final ThreadLocal<DateTimeFormatter> DATE_FORMAT = - ThreadLocal.withInitial(() -> { - DateTimeFormatter format = - DateTimeFormatter.ofPattern(OzoneConsts.OZONE_DATE_FORMAT); - return format.withZone(ZoneId.of(OzoneConsts.OZONE_TIME_ZONE)); - }); - - /** - * Returns the cache value to be used for list calls. - * @param conf Configuration object - * @return list cache size - */ - public static int getListCacheSize(Configuration conf) { - return conf.getInt(OzoneConfigKeys.OZONE_CLIENT_LIST_CACHE_SIZE, - OzoneConfigKeys.OZONE_CLIENT_LIST_CACHE_SIZE_DEFAULT); - } - - /** - * @return a default instance of {@link CloseableHttpClient}. - */ - public static CloseableHttpClient newHttpClient() { - return OzoneClientUtils.newHttpClient(new OzoneConfiguration()); - } - - /** - * Returns a {@link CloseableHttpClient} configured by given configuration. - * If conf is null, returns a default instance. - * - * @param conf configuration - * @return a {@link CloseableHttpClient} instance. - */ - public static CloseableHttpClient newHttpClient(Configuration conf) { - long socketTimeout = OzoneConfigKeys - .OZONE_CLIENT_SOCKET_TIMEOUT_DEFAULT; - long connectionTimeout = OzoneConfigKeys - .OZONE_CLIENT_CONNECTION_TIMEOUT_DEFAULT; - if (conf != null) { - socketTimeout = conf.getTimeDuration( - OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT, - OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_DEFAULT, - TimeUnit.MILLISECONDS); - connectionTimeout = conf.getTimeDuration( - OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT, - OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT_DEFAULT, - TimeUnit.MILLISECONDS); - } - - CloseableHttpClient client = HttpClients.custom() - .setDefaultRequestConfig( - RequestConfig.custom() - .setSocketTimeout(Math.toIntExact(socketTimeout)) - .setConnectTimeout(Math.toIntExact(connectionTimeout)) - .build()) - .build(); - return client; - } - - /** - * verifies that bucket name / volume name is a valid DNS name. - * - * @param resName Bucket or volume Name to be validated - * - * @throws IllegalArgumentException - */ - public static void verifyResourceName(String resName) - throws IllegalArgumentException { - - if (resName == null) { - throw new IllegalArgumentException("Bucket or Volume name is null"); - } - - if ((resName.length() < OzoneConsts.OZONE_MIN_BUCKET_NAME_LENGTH) || - (resName.length() > OzoneConsts.OZONE_MAX_BUCKET_NAME_LENGTH)) { - throw new IllegalArgumentException( - "Bucket or Volume length is illegal, " + - "valid length is 3-63 characters"); - } - - if ((resName.charAt(0) == '.') || (resName.charAt(0) == '-')) { - throw new IllegalArgumentException( - "Bucket or Volume name cannot start with a period or dash"); - } - - if ((resName.charAt(resName.length() - 1) == '.') || - (resName.charAt(resName.length() - 1) == '-')) { - throw new IllegalArgumentException( - "Bucket or Volume name cannot end with a period or dash"); - } - - boolean isIPv4 = true; - char prev = (char) 0; - - for (int index = 0; index < resName.length(); index++) { - char currChar = resName.charAt(index); - - if (currChar != '.') { - isIPv4 = ((currChar >= '0') && (currChar <= '9')) && isIPv4; - } - - if (currChar > 'A' && currChar < 'Z') { - throw new IllegalArgumentException( - "Bucket or Volume name does not support uppercase characters"); - } - - if ((currChar != '.') && (currChar != '-')) { - if ((currChar < '0') || (currChar > '9' && currChar < 'a') || - (currChar > 'z')) { - throw new IllegalArgumentException("Bucket or Volume name has an " + - "unsupported character : " + - currChar); - } - } - - if ((prev == '.') && (currChar == '.')) { - throw new IllegalArgumentException("Bucket or Volume name should not " + - "have two contiguous periods"); - } - - if ((prev == '-') && (currChar == '.')) { - throw new IllegalArgumentException( - "Bucket or Volume name should not have period after dash"); - } - - if ((prev == '.') && (currChar == '-')) { - throw new IllegalArgumentException( - "Bucket or Volume name should not have dash after period"); - } - prev = currChar; - } - - if (isIPv4) { - throw new IllegalArgumentException( - "Bucket or Volume name cannot be an IPv4 address or all numeric"); - } - } - - /** - * Convert time in millisecond to a human readable format required in ozone. - * @return a human readable string for the input time - */ - public static String formatDateTime(long millis) { - ZonedDateTime dateTime = ZonedDateTime.ofInstant( - Instant.ofEpochSecond(millis), DATE_FORMAT.get().getZone()); - return DATE_FORMAT.get().format(dateTime); - } - - /** - * Convert time in ozone date format to millisecond. - * @return time in milliseconds - */ - public static long formatDateTime(String date) throws ParseException { - Preconditions.checkNotNull(date, "Date string should not be null."); - return ZonedDateTime.parse(date, DATE_FORMAT.get()) - .toInstant().getEpochSecond(); - } - - /** - * Returns the maximum no of outstanding async requests to be handled by - * Standalone and Ratis client. - */ - public static int getMaxOutstandingRequests(Configuration config) { - return config - .getInt(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS, - ScmConfigKeys - .SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS_DEFAULT); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/src/main/java/org/apache/hadoop/ozone/client/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/ozone/client/package-info.java b/hadoop-hdsl/client/src/main/java/org/apache/hadoop/ozone/client/package-info.java deleted file mode 100644 index 754ce84..0000000 --- a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/ozone/client/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.client; - -/** - * Generic helper classes for the client side of hdsl workflows.. - */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClient.java b/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClient.java deleted file mode 100644 index 06b1e99..0000000 --- a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClient.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.scm; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.logging.LogLevel; -import io.netty.handler.logging.LoggingHandler; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdsl.protocol.DatanodeDetails; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos; -import org.apache.hadoop.hdsl.protocol.proto.HdslProtos; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.client.OzoneClientUtils; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.List; -import java.util.concurrent.Semaphore; - -/** - * A Client for the storageContainer protocol. - */ -public class XceiverClient extends XceiverClientSpi { - static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class); - private final Pipeline pipeline; - private final Configuration config; - private Channel channel; - private Bootstrap b; - private EventLoopGroup group; - private final Semaphore semaphore; - - /** - * Constructs a client that can communicate with the Container framework on - * data nodes. - * - * @param pipeline - Pipeline that defines the machines. - * @param config -- Ozone Config - */ - public XceiverClient(Pipeline pipeline, Configuration config) { - super(); - Preconditions.checkNotNull(pipeline); - Preconditions.checkNotNull(config); - this.pipeline = pipeline; - this.config = config; - this.semaphore = - new Semaphore(OzoneClientUtils.getMaxOutstandingRequests(config)); - } - - @Override - public void connect() throws Exception { - if (channel != null && channel.isActive()) { - throw new IOException("This client is already connected to a host."); - } - - group = new NioEventLoopGroup(); - b = new Bootstrap(); - b.group(group) - .channel(NioSocketChannel.class) - .handler(new LoggingHandler(LogLevel.INFO)) - .handler(new XceiverClientInitializer(this.pipeline, semaphore)); - DatanodeDetails leader = this.pipeline.getLeader(); - - // read port from the data node, on failure use default configured - // port. - int port = leader.getContainerPort(); - if (port == 0) { - port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, - OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); - } - LOG.debug("Connecting to server Port : " + port); - channel = b.connect(leader.getHostName(), port).sync().channel(); - } - - /** - * Returns if the exceiver client connects to a server. - * - * @return True if the connection is alive, false otherwise. - */ - @VisibleForTesting - public boolean isConnected() { - return channel.isActive(); - } - - @Override - public void close() { - if (group != null) { - group.shutdownGracefully().awaitUninterruptibly(); - } - } - - @Override - public Pipeline getPipeline() { - return pipeline; - } - - @Override - public ContainerProtos.ContainerCommandResponseProto sendCommand( - ContainerProtos.ContainerCommandRequestProto request) throws IOException { - try { - if ((channel == null) || (!channel.isActive())) { - throw new IOException("This channel is not connected."); - } - XceiverClientHandler handler = - channel.pipeline().get(XceiverClientHandler.class); - - return handler.sendCommand(request); - } catch (ExecutionException | InterruptedException e) { - /** - * In case the netty channel handler throws an exception, - * the exception thrown will be wrapped within {@link ExecutionException}. - * Unwarpping here so that original exception gets passed - * to to the client. - */ - if (e instanceof ExecutionException) { - Throwable cause = e.getCause(); - if (cause instanceof IOException) { - throw (IOException) cause; - } - } - throw new IOException( - "Unexpected exception during execution:" + e.getMessage()); - } - } - - /** - * Sends a given command to server gets a waitable future back. - * - * @param request Request - * @return Response to the command - * @throws IOException - */ - @Override - public CompletableFuture<ContainerProtos.ContainerCommandResponseProto> - sendCommandAsync(ContainerProtos.ContainerCommandRequestProto request) - throws IOException, ExecutionException, InterruptedException { - if ((channel == null) || (!channel.isActive())) { - throw new IOException("This channel is not connected."); - } - XceiverClientHandler handler = - channel.pipeline().get(XceiverClientHandler.class); - return handler.sendCommandAsync(request); - } - - /** - * Create a pipeline. - * - * @param pipelineID - Name of the pipeline. - * @param datanodes - Datanodes - */ - @Override - public void createPipeline(String pipelineID, List<DatanodeDetails> datanodes) - throws IOException { - // For stand alone pipeline, there is no notion called setup pipeline. - return; - } - - /** - * Returns pipeline Type. - * - * @return - Stand Alone as the type. - */ - @Override - public HdslProtos.ReplicationType getPipelineType() { - return HdslProtos.ReplicationType.STAND_ALONE; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java b/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java deleted file mode 100644 index 4b2d6c4..0000000 --- a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.scm; - -import com.google.common.base.Preconditions; - -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos - .ContainerCommandResponseProto; - -import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Iterator; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.Semaphore; - -/** - * Netty client handler. - */ -public class XceiverClientHandler extends - SimpleChannelInboundHandler<ContainerCommandResponseProto> { - - static final Logger LOG = LoggerFactory.getLogger(XceiverClientHandler.class); - private final ConcurrentMap<String, ResponseFuture> responses = - new ConcurrentHashMap<>(); - - private final Pipeline pipeline; - private volatile Channel channel; - private XceiverClientMetrics metrics; - private final Semaphore semaphore; - - /** - * Constructs a client that can communicate to a container server. - */ - public XceiverClientHandler(Pipeline pipeline, Semaphore semaphore) { - super(false); - Preconditions.checkNotNull(pipeline); - this.pipeline = pipeline; - this.metrics = XceiverClientManager.getXceiverClientMetrics(); - this.semaphore = semaphore; - } - - /** - * <strong>Please keep in mind that this method will be renamed to {@code - * messageReceived(ChannelHandlerContext, I)} in 5.0.</strong> - * <p> - * Is called for each message of type {@link ContainerProtos - * .ContainerCommandResponseProto}. - * - * @param ctx the {@link ChannelHandlerContext} which this {@link - * SimpleChannelInboundHandler} belongs to - * @param msg the message to handle - * @throws Exception is thrown if an error occurred - */ - @Override - public void channelRead0(ChannelHandlerContext ctx, - ContainerProtos.ContainerCommandResponseProto msg) - throws Exception { - Preconditions.checkNotNull(msg); - metrics.decrPendingContainerOpsMetrics(msg.getCmdType()); - - String key = msg.getTraceID(); - ResponseFuture response = responses.remove(key); - semaphore.release(); - - if (response != null) { - response.getFuture().complete(msg); - - long requestTime = response.getRequestTime(); - metrics.addContainerOpsLatency(msg.getCmdType(), - Time.monotonicNowNanos() - requestTime); - } else { - LOG.error("A reply received for message that was not queued. trace " + - "ID: {}", msg.getTraceID()); - } - } - - @Override - public void channelRegistered(ChannelHandlerContext ctx) { - LOG.debug("channelRegistered: Connected to ctx"); - channel = ctx.channel(); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - LOG.info("Exception in client " + cause.toString()); - Iterator<String> keyIterator = responses.keySet().iterator(); - while (keyIterator.hasNext()) { - ResponseFuture response = responses.remove(keyIterator.next()); - response.getFuture().completeExceptionally(cause); - semaphore.release(); - } - ctx.close(); - } - - /** - * Since netty is async, we send a work request and then wait until a response - * appears in the reply queue. This is simple sync interface for clients. we - * should consider building async interfaces for client if this turns out to - * be a performance bottleneck. - * - * @param request - request. - * @return -- response - */ - - public ContainerCommandResponseProto sendCommand( - ContainerProtos.ContainerCommandRequestProto request) - throws ExecutionException, InterruptedException { - Future<ContainerCommandResponseProto> future = sendCommandAsync(request); - return future.get(); - } - - /** - * SendCommandAsyc queues a command to the Netty Subsystem and returns a - * CompletableFuture. This Future is marked compeleted in the channelRead0 - * when the call comes back. - * @param request - Request to execute - * @return CompletableFuture - */ - public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync( - ContainerProtos.ContainerCommandRequestProto request) - throws InterruptedException { - - // Throw an exception of request doesn't have traceId - if (StringUtils.isEmpty(request.getTraceID())) { - throw new IllegalArgumentException("Invalid trace ID"); - } - - // Setting the datanode ID in the commands, so that we can distinguish - // commands when the cluster simulator is running. - if(!request.hasDatanodeUuid()) { - throw new IllegalArgumentException("Invalid Datanode ID"); - } - - metrics.incrPendingContainerOpsMetrics(request.getCmdType()); - - CompletableFuture<ContainerCommandResponseProto> future - = new CompletableFuture<>(); - ResponseFuture response = new ResponseFuture(future, - Time.monotonicNowNanos()); - semaphore.acquire(); - ResponseFuture previous = responses.putIfAbsent( - request.getTraceID(), response); - if (previous != null) { - LOG.error("Command with Trace already exists. Ignoring this command. " + - "{}. Previous Command: {}", request.getTraceID(), - previous.toString()); - throw new IllegalStateException("Duplicate trace ID. Command with this " + - "trace ID is already executing. Please ensure that " + - "trace IDs are not reused. ID: " + request.getTraceID()); - } - - channel.writeAndFlush(request); - return response.getFuture(); - } - - /** - * Class wraps response future info. - */ - static class ResponseFuture { - private final long requestTime; - private final CompletableFuture<ContainerCommandResponseProto> future; - - ResponseFuture(CompletableFuture<ContainerCommandResponseProto> future, - long requestTime) { - this.future = future; - this.requestTime = requestTime; - } - - public long getRequestTime() { - return requestTime; - } - - public CompletableFuture<ContainerCommandResponseProto> getFuture() { - return future; - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientInitializer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientInitializer.java b/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientInitializer.java deleted file mode 100644 index c70a686..0000000 --- a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientInitializer.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.scm; - -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.protobuf.ProtobufDecoder; -import io.netty.handler.codec.protobuf.ProtobufEncoder; -import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; -import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; - -import java.util.concurrent.Semaphore; - -/** - * Setup the netty pipeline. - */ -public class XceiverClientInitializer extends - ChannelInitializer<SocketChannel> { - private final Pipeline pipeline; - private final Semaphore semaphore; - - /** - * Constructs an Initializer for the client pipeline. - * @param pipeline - Pipeline. - */ - public XceiverClientInitializer(Pipeline pipeline, Semaphore semaphore) { - this.pipeline = pipeline; - this.semaphore = semaphore; - } - - /** - * This method will be called once when the Channel is registered. After - * the method returns this instance will be removed from the - * ChannelPipeline of the Channel. - * - * @param ch Channel which was registered. - * @throws Exception is thrown if an error occurs. In that case the - * Channel will be closed. - */ - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline p = ch.pipeline(); - - p.addLast(new ProtobufVarint32FrameDecoder()); - p.addLast(new ProtobufDecoder(ContainerProtos - .ContainerCommandResponseProto.getDefaultInstance())); - - p.addLast(new ProtobufVarint32LengthFieldPrepender()); - p.addLast(new ProtobufEncoder()); - - p.addLast(new XceiverClientHandler(this.pipeline, this.semaphore)); - - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java b/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java deleted file mode 100644 index 3f62a3a..0000000 --- a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.scm; - -import java.io.Closeable; -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.Callable; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdsl.protocol.proto.HdslProtos; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; - -import static org.apache.hadoop.scm.ScmConfigKeys - .SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT; -import static org.apache.hadoop.scm.ScmConfigKeys - .SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY; -import static org.apache.hadoop.scm.ScmConfigKeys - .SCM_CONTAINER_CLIENT_MAX_SIZE_KEY; -import static org.apache.hadoop.scm.ScmConfigKeys - .SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT; -import static org.apache.hadoop.hdsl.protocol.proto.HdslProtos - .ReplicationType.RATIS; - -/** - * XceiverClientManager is responsible for the lifecycle of XceiverClient - * instances. Callers use this class to acquire an XceiverClient instance - * connected to the desired container pipeline. When done, the caller also uses - * this class to release the previously acquired XceiverClient instance. - * - * - * This class caches connection to container for reuse purpose, such that - * accessing same container frequently will be through the same connection - * without reestablishing connection. But the connection will be closed if - * not being used for a period of time. - */ -public class XceiverClientManager implements Closeable { - - //TODO : change this to SCM configuration class - private final Configuration conf; - private final Cache<String, XceiverClientSpi> clientCache; - private final boolean useRatis; - - private static XceiverClientMetrics metrics; - /** - * Creates a new XceiverClientManager. - * - * @param conf configuration - */ - public XceiverClientManager(Configuration conf) { - Preconditions.checkNotNull(conf); - int maxSize = conf.getInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, - SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT); - long staleThresholdMs = conf.getTimeDuration( - SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY, - SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT, TimeUnit.MILLISECONDS); - this.useRatis = conf.getBoolean( - ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, - ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT); - this.conf = conf; - this.clientCache = CacheBuilder.newBuilder() - .expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS) - .maximumSize(maxSize) - .removalListener( - new RemovalListener<String, XceiverClientSpi>() { - @Override - public void onRemoval( - RemovalNotification<String, XceiverClientSpi> - removalNotification) { - synchronized (clientCache) { - // Mark the entry as evicted - XceiverClientSpi info = removalNotification.getValue(); - info.setEvicted(); - } - } - }).build(); - } - - @VisibleForTesting - public Cache<String, XceiverClientSpi> getClientCache() { - return clientCache; - } - - /** - * Acquires a XceiverClientSpi connected to a container capable of - * storing the specified key. - * - * If there is already a cached XceiverClientSpi, simply return - * the cached otherwise create a new one. - * - * @param pipeline the container pipeline for the client connection - * @return XceiverClientSpi connected to a container - * @throws IOException if a XceiverClientSpi cannot be acquired - */ - public XceiverClientSpi acquireClient(Pipeline pipeline) - throws IOException { - Preconditions.checkNotNull(pipeline); - Preconditions.checkArgument(pipeline.getMachines() != null); - Preconditions.checkArgument(!pipeline.getMachines().isEmpty()); - - synchronized (clientCache) { - XceiverClientSpi info = getClient(pipeline); - info.incrementReference(); - return info; - } - } - - /** - * Releases a XceiverClientSpi after use. - * - * @param client client to release - */ - public void releaseClient(XceiverClientSpi client) { - Preconditions.checkNotNull(client); - synchronized (clientCache) { - client.decrementReference(); - } - } - - private XceiverClientSpi getClient(Pipeline pipeline) - throws IOException { - String containerName = pipeline.getContainerName(); - try { - return clientCache.get(containerName, - new Callable<XceiverClientSpi>() { - @Override - public XceiverClientSpi call() throws Exception { - XceiverClientSpi client = pipeline.getType() == RATIS ? - XceiverClientRatis.newXceiverClientRatis(pipeline, conf) - : new XceiverClient(pipeline, conf); - client.connect(); - return client; - } - }); - } catch (Exception e) { - throw new IOException( - "Exception getting XceiverClient: " + e.toString(), e); - } - } - - /** - * Close and remove all the cached clients. - */ - public void close() { - //closing is done through RemovalListener - clientCache.invalidateAll(); - clientCache.cleanUp(); - - if (metrics != null) { - metrics.unRegister(); - } - } - - /** - * Tells us if Ratis is enabled for this cluster. - * @return True if Ratis is enabled. - */ - public boolean isUseRatis() { - return useRatis; - } - - /** - * Returns hard coded 3 as replication factor. - * @return 3 - */ - public HdslProtos.ReplicationFactor getFactor() { - if(isUseRatis()) { - return HdslProtos.ReplicationFactor.THREE; - } - return HdslProtos.ReplicationFactor.ONE; - } - - /** - * Returns the default replication type. - * @return Ratis or Standalone - */ - public HdslProtos.ReplicationType getType() { - // TODO : Fix me and make Ratis default before release. - // TODO: Remove this as replication factor and type are pipeline properties - if(isUseRatis()) { - return HdslProtos.ReplicationType.RATIS; - } - return HdslProtos.ReplicationType.STAND_ALONE; - } - - /** - * Get xceiver client metric. - */ - public synchronized static XceiverClientMetrics getXceiverClientMetrics() { - if (metrics == null) { - metrics = XceiverClientMetrics.create(); - } - - return metrics; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientMetrics.java b/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientMetrics.java deleted file mode 100644 index bcece23..0000000 --- a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientMetrics.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.scm; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos; -import org.apache.hadoop.metrics2.MetricsSystem; -import org.apache.hadoop.metrics2.annotation.Metric; -import org.apache.hadoop.metrics2.annotation.Metrics; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.metrics2.lib.MetricsRegistry; -import org.apache.hadoop.metrics2.lib.MutableCounterLong; -import org.apache.hadoop.metrics2.lib.MutableRate; - -/** - * The client metrics for the Storage Container protocol. - */ -@InterfaceAudience.Private -@Metrics(about = "Storage Container Client Metrics", context = "dfs") -public class XceiverClientMetrics { - public static final String SOURCE_NAME = XceiverClientMetrics.class - .getSimpleName(); - - private @Metric MutableCounterLong pendingOps; - private MutableCounterLong[] pendingOpsArray; - private MutableRate[] containerOpsLatency; - private MetricsRegistry registry; - - public XceiverClientMetrics() { - int numEnumEntries = ContainerProtos.Type.values().length; - this.registry = new MetricsRegistry(SOURCE_NAME); - - this.pendingOpsArray = new MutableCounterLong[numEnumEntries]; - this.containerOpsLatency = new MutableRate[numEnumEntries]; - for (int i = 0; i < numEnumEntries; i++) { - pendingOpsArray[i] = registry.newCounter( - "numPending" + ContainerProtos.Type.valueOf(i + 1), - "number of pending" + ContainerProtos.Type.valueOf(i + 1) + " ops", - (long) 0); - - containerOpsLatency[i] = registry.newRate( - ContainerProtos.Type.valueOf(i + 1) + "Latency", - "latency of " + ContainerProtos.Type.valueOf(i + 1) - + " ops"); - } - } - - public static XceiverClientMetrics create() { - MetricsSystem ms = DefaultMetricsSystem.instance(); - return ms.register(SOURCE_NAME, "Storage Container Client Metrics", - new XceiverClientMetrics()); - } - - public void incrPendingContainerOpsMetrics(ContainerProtos.Type type) { - pendingOps.incr(); - pendingOpsArray[type.ordinal()].incr(); - } - - public void decrPendingContainerOpsMetrics(ContainerProtos.Type type) { - pendingOps.incr(-1); - pendingOpsArray[type.ordinal()].incr(-1); - } - - public void addContainerOpsLatency(ContainerProtos.Type type, - long latencyNanos) { - containerOpsLatency[type.ordinal()].add(latencyNanos); - } - - public long getContainerOpsMetrics(ContainerProtos.Type type) { - return pendingOpsArray[type.ordinal()].value(); - } - - public void unRegister() { - MetricsSystem ms = DefaultMetricsSystem.instance(); - ms.unregisterSource(SOURCE_NAME); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java b/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java deleted file mode 100644 index 084e3e5..0000000 --- a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.scm; - -import com.google.common.base.Preconditions; -import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdsl.protocol.DatanodeDetails; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ContainerCommandRequestProto; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ContainerCommandResponseProto; -import org.apache.hadoop.hdsl.protocol.proto.HdslProtos; -import org.apache.hadoop.ozone.client.OzoneClientUtils; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.apache.ratis.RatisHelper; -import org.apache.ratis.client.RaftClient; -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftGroup; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.rpc.RpcType; -import org.apache.ratis.rpc.SupportedRpcType; -import org.apache.ratis.shaded.com.google.protobuf.ByteString; -import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicReference; - -/** - * An abstract implementation of {@link XceiverClientSpi} using Ratis. - * The underlying RPC mechanism can be chosen via the constructor. - */ -public final class XceiverClientRatis extends XceiverClientSpi { - static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class); - - public static XceiverClientRatis newXceiverClientRatis( - Pipeline pipeline, Configuration ozoneConf) { - final String rpcType = ozoneConf.get( - ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, - ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); - final int maxOutstandingRequests = - OzoneClientUtils.getMaxOutstandingRequests(ozoneConf); - return new XceiverClientRatis(pipeline, - SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests); - } - - private final Pipeline pipeline; - private final RpcType rpcType; - private final AtomicReference<RaftClient> client = new AtomicReference<>(); - private final int maxOutstandingRequests; - - /** - * Constructs a client. - */ - private XceiverClientRatis(Pipeline pipeline, RpcType rpcType, - int maxOutStandingChunks) { - super(); - this.pipeline = pipeline; - this.rpcType = rpcType; - this.maxOutstandingRequests = maxOutStandingChunks; - } - - /** - * {@inheritDoc} - */ - public void createPipeline(String clusterId, List<DatanodeDetails> datanodes) - throws IOException { - RaftGroup group = RatisHelper.newRaftGroup(datanodes); - LOG.debug("initializing pipeline:{} with nodes:{}", clusterId, - group.getPeers()); - reinitialize(datanodes, group); - } - - /** - * Returns Ratis as pipeline Type. - * - * @return - Ratis - */ - @Override - public HdslProtos.ReplicationType getPipelineType() { - return HdslProtos.ReplicationType.RATIS; - } - - private void reinitialize(List<DatanodeDetails> datanodes, RaftGroup group) - throws IOException { - if (datanodes.isEmpty()) { - return; - } - - IOException exception = null; - for (DatanodeDetails d : datanodes) { - try { - reinitialize(d, group); - } catch (IOException ioe) { - if (exception == null) { - exception = new IOException( - "Failed to reinitialize some of the RaftPeer(s)", ioe); - } else { - exception.addSuppressed(ioe); - } - } - } - if (exception != null) { - throw exception; - } - } - - /** - * Adds a new peers to the Ratis Ring. - * - * @param datanode - new datanode - * @param group - Raft group - * @throws IOException - on Failure. - */ - private void reinitialize(DatanodeDetails datanode, RaftGroup group) - throws IOException { - final RaftPeer p = RatisHelper.toRaftPeer(datanode); - try (RaftClient client = RatisHelper.newRaftClient(rpcType, p)) { - client.reinitialize(group, p.getId()); - } catch (IOException ioe) { - LOG.error("Failed to reinitialize RaftPeer:{} datanode: {} ", - p, datanode, ioe); - throw new IOException("Failed to reinitialize RaftPeer " + p - + "(datanode=" + datanode + ")", ioe); - } - } - - @Override - public Pipeline getPipeline() { - return pipeline; - } - - @Override - public void connect() throws Exception { - LOG.debug("Connecting to pipeline:{} leader:{}", - getPipeline().getPipelineName(), - RatisHelper.toRaftPeerId(pipeline.getLeader())); - // TODO : XceiverClient ratis should pass the config value of - // maxOutstandingRequests so as to set the upper bound on max no of async - // requests to be handled by raft client - if (!client.compareAndSet(null, - RatisHelper.newRaftClient(rpcType, getPipeline()))) { - throw new IllegalStateException("Client is already connected."); - } - } - - @Override - public void close() { - final RaftClient c = client.getAndSet(null); - if (c != null) { - try { - c.close(); - } catch (IOException e) { - throw new IllegalStateException(e); - } - } - } - - private RaftClient getClient() { - return Objects.requireNonNull(client.get(), "client is null"); - } - - private boolean isReadOnly(ContainerCommandRequestProto proto) { - switch (proto.getCmdType()) { - case ReadContainer: - case ReadChunk: - case ListKey: - case GetKey: - case GetSmallFile: - case ListContainer: - case ListChunk: - return true; - case CloseContainer: - case WriteChunk: - case UpdateContainer: - case CompactChunk: - case CreateContainer: - case DeleteChunk: - case DeleteContainer: - case DeleteKey: - case PutKey: - case PutSmallFile: - default: - return false; - } - } - - private RaftClientReply sendRequest(ContainerCommandRequestProto request) - throws IOException { - boolean isReadOnlyRequest = isReadOnly(request); - ByteString byteString = - ShadedProtoUtil.asShadedByteString(request.toByteArray()); - LOG.debug("sendCommand {} {}", isReadOnlyRequest, request); - final RaftClientReply reply = isReadOnlyRequest ? - getClient().sendReadOnly(() -> byteString) : - getClient().send(() -> byteString); - LOG.debug("reply {} {}", isReadOnlyRequest, reply); - return reply; - } - - private CompletableFuture<RaftClientReply> sendRequestAsync( - ContainerCommandRequestProto request) throws IOException { - boolean isReadOnlyRequest = isReadOnly(request); - ByteString byteString = - ShadedProtoUtil.asShadedByteString(request.toByteArray()); - LOG.debug("sendCommandAsync {} {}", isReadOnlyRequest, request); - return isReadOnlyRequest ? getClient().sendReadOnlyAsync(() -> byteString) : - getClient().sendAsync(() -> byteString); - } - - @Override - public ContainerCommandResponseProto sendCommand( - ContainerCommandRequestProto request) throws IOException { - final RaftClientReply reply = sendRequest(request); - Preconditions.checkState(reply.isSuccess()); - return ContainerCommandResponseProto.parseFrom( - ShadedProtoUtil.asByteString(reply.getMessage().getContent())); - } - - /** - * Sends a given command to server gets a waitable future back. - * - * @param request Request - * @return Response to the command - * @throws IOException - */ - @Override - public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync( - ContainerCommandRequestProto request) - throws IOException, ExecutionException, InterruptedException { - return sendRequestAsync(request).whenComplete((reply, e) -> - LOG.debug("received reply {} for request: {} exception: {}", request, - reply, e)) - .thenApply(reply -> { - try { - return ContainerCommandResponseProto.parseFrom( - ShadedProtoUtil.asByteString(reply.getMessage().getContent())); - } catch (InvalidProtocolBufferException e) { - throw new CompletionException(e); - } - }); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java b/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java deleted file mode 100644 index 08ddfd6..0000000 --- a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java +++ /dev/null @@ -1,403 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.scm.client; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ContainerData; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ReadContainerResponseProto; -import org.apache.hadoop.hdsl.protocol.proto.HdslProtos; -import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto; -import org.apache.hadoop.scm.XceiverClientManager; -import org.apache.hadoop.scm.XceiverClientSpi; -import org.apache.hadoop.scm.container.common.helpers.ContainerInfo; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.scm.protocolPB - .StorageContainerLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.scm.storage.ContainerProtocolCalls; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.EnumSet; -import java.util.List; -import java.util.UUID; - -import static org.apache.hadoop.hdsl.protocol.proto.HdslProtos.LifeCycleState.ALLOCATED; -import static org.apache.hadoop.hdsl.protocol.proto.HdslProtos.LifeCycleState.OPEN; - -/** - * This class provides the client-facing APIs of container operations. - */ -public class ContainerOperationClient implements ScmClient { - - private static final Logger LOG = - LoggerFactory.getLogger(ContainerOperationClient.class); - private static long containerSizeB = -1; - private final StorageContainerLocationProtocolClientSideTranslatorPB - storageContainerLocationClient; - private final XceiverClientManager xceiverClientManager; - - public ContainerOperationClient( - StorageContainerLocationProtocolClientSideTranslatorPB - storageContainerLocationClient, - XceiverClientManager xceiverClientManager) { - this.storageContainerLocationClient = storageContainerLocationClient; - this.xceiverClientManager = xceiverClientManager; - } - - /** - * Return the capacity of containers. The current assumption is that all - * containers have the same capacity. Therefore one static is sufficient for - * any container. - * @return The capacity of one container in number of bytes. - */ - public static long getContainerSizeB() { - return containerSizeB; - } - - /** - * Set the capacity of container. Should be exactly once on system start. - * @param size Capacity of one container in number of bytes. - */ - public static void setContainerSizeB(long size) { - containerSizeB = size; - } - - /** - * @inheritDoc - */ - @Override - public Pipeline createContainer(String containerId, String owner) - throws IOException { - XceiverClientSpi client = null; - try { - Pipeline pipeline = - storageContainerLocationClient.allocateContainer( - xceiverClientManager.getType(), - xceiverClientManager.getFactor(), containerId, owner); - client = xceiverClientManager.acquireClient(pipeline); - - // Allocated State means that SCM has allocated this pipeline in its - // namespace. The client needs to create the pipeline on the machines - // which was choosen by the SCM. - Preconditions.checkState(pipeline.getLifeCycleState() == ALLOCATED || - pipeline.getLifeCycleState() == OPEN, "Unexpected pipeline state"); - if (pipeline.getLifeCycleState() == ALLOCATED) { - createPipeline(client, pipeline); - } - // TODO : Container Client State needs to be updated. - // TODO : Return ContainerInfo instead of Pipeline - createContainer(containerId, client, pipeline); - return pipeline; - } finally { - if (client != null) { - xceiverClientManager.releaseClient(client); - } - } - } - - /** - * Create a container over pipeline specified by the SCM. - * - * @param containerId - Container ID - * @param client - Client to communicate with Datanodes - * @param pipeline - A pipeline that is already created. - * @throws IOException - */ - public void createContainer(String containerId, XceiverClientSpi client, - Pipeline pipeline) throws IOException { - String traceID = UUID.randomUUID().toString(); - storageContainerLocationClient.notifyObjectStageChange( - ObjectStageChangeRequestProto.Type.container, - containerId, - ObjectStageChangeRequestProto.Op.create, - ObjectStageChangeRequestProto.Stage.begin); - ContainerProtocolCalls.createContainer(client, traceID); - storageContainerLocationClient.notifyObjectStageChange( - ObjectStageChangeRequestProto.Type.container, - containerId, - ObjectStageChangeRequestProto.Op.create, - ObjectStageChangeRequestProto.Stage.complete); - - // Let us log this info after we let SCM know that we have completed the - // creation state. - if (LOG.isDebugEnabled()) { - LOG.debug("Created container " + containerId - + " leader:" + pipeline.getLeader() - + " machines:" + pipeline.getMachines()); - } - } - - /** - * Creates a pipeline over the machines choosen by the SCM. - * - * @param client - Client - * @param pipeline - pipeline to be createdon Datanodes. - * @throws IOException - */ - private void createPipeline(XceiverClientSpi client, Pipeline pipeline) - throws IOException { - - Preconditions.checkNotNull(pipeline.getPipelineName(), "Pipeline " + - "name cannot be null when client create flag is set."); - - // Pipeline creation is a three step process. - // - // 1. Notify SCM that this client is doing a create pipeline on - // datanodes. - // - // 2. Talk to Datanodes to create the pipeline. - // - // 3. update SCM that pipeline creation was successful. - storageContainerLocationClient.notifyObjectStageChange( - ObjectStageChangeRequestProto.Type.pipeline, - pipeline.getPipelineName(), - ObjectStageChangeRequestProto.Op.create, - ObjectStageChangeRequestProto.Stage.begin); - - client.createPipeline(pipeline.getPipelineName(), - pipeline.getMachines()); - - storageContainerLocationClient.notifyObjectStageChange( - ObjectStageChangeRequestProto.Type.pipeline, - pipeline.getPipelineName(), - ObjectStageChangeRequestProto.Op.create, - ObjectStageChangeRequestProto.Stage.complete); - - // TODO : Should we change the state on the client side ?? - // That makes sense, but it is not needed for the client to work. - LOG.debug("Pipeline creation successful. Pipeline: {}", - pipeline.toString()); - } - - /** - * @inheritDoc - */ - @Override - public Pipeline createContainer(HdslProtos.ReplicationType type, - HdslProtos.ReplicationFactor factor, - String containerId, String owner) throws IOException { - XceiverClientSpi client = null; - try { - // allocate container on SCM. - Pipeline pipeline = - storageContainerLocationClient.allocateContainer(type, factor, - containerId, owner); - client = xceiverClientManager.acquireClient(pipeline); - - // Allocated State means that SCM has allocated this pipeline in its - // namespace. The client needs to create the pipeline on the machines - // which was choosen by the SCM. - if (pipeline.getLifeCycleState() == ALLOCATED) { - createPipeline(client, pipeline); - } - - // TODO : Return ContainerInfo instead of Pipeline - // connect to pipeline leader and allocate container on leader datanode. - client = xceiverClientManager.acquireClient(pipeline); - createContainer(containerId, client, pipeline); - return pipeline; - } finally { - if (client != null) { - xceiverClientManager.releaseClient(client); - } - } - } - - /** - * Returns a set of Nodes that meet a query criteria. - * - * @param nodeStatuses - A set of criteria that we want the node to have. - * @param queryScope - Query scope - Cluster or pool. - * @param poolName - if it is pool, a pool name is required. - * @return A set of nodes that meet the requested criteria. - * @throws IOException - */ - @Override - public HdslProtos.NodePool queryNode(EnumSet<HdslProtos.NodeState> - nodeStatuses, HdslProtos.QueryScope queryScope, String poolName) - throws IOException { - return storageContainerLocationClient.queryNode(nodeStatuses, queryScope, - poolName); - } - - /** - * Creates a specified replication pipeline. - */ - @Override - public Pipeline createReplicationPipeline(HdslProtos.ReplicationType type, - HdslProtos.ReplicationFactor factor, HdslProtos.NodePool nodePool) - throws IOException { - return storageContainerLocationClient.createReplicationPipeline(type, - factor, nodePool); - } - - /** - * Delete the container, this will release any resource it uses. - * @param pipeline - Pipeline that represents the container. - * @param force - True to forcibly delete the container. - * @throws IOException - */ - @Override - public void deleteContainer(Pipeline pipeline, boolean force) - throws IOException { - XceiverClientSpi client = null; - try { - client = xceiverClientManager.acquireClient(pipeline); - String traceID = UUID.randomUUID().toString(); - ContainerProtocolCalls.deleteContainer(client, force, traceID); - storageContainerLocationClient - .deleteContainer(pipeline.getContainerName()); - if (LOG.isDebugEnabled()) { - LOG.debug("Deleted container {}, leader: {}, machines: {} ", - pipeline.getContainerName(), - pipeline.getLeader(), - pipeline.getMachines()); - } - } finally { - if (client != null) { - xceiverClientManager.releaseClient(client); - } - } - } - - /** - * {@inheritDoc} - */ - @Override - public List<ContainerInfo> listContainer(String startName, - String prefixName, int count) - throws IOException { - return storageContainerLocationClient.listContainer( - startName, prefixName, count); - } - - /** - * Get meta data from an existing container. - * - * @param pipeline - pipeline that represents the container. - * @return ContainerInfo - a message of protobuf which has basic info - * of a container. - * @throws IOException - */ - @Override - public ContainerData readContainer(Pipeline pipeline) throws IOException { - XceiverClientSpi client = null; - try { - client = xceiverClientManager.acquireClient(pipeline); - String traceID = UUID.randomUUID().toString(); - ReadContainerResponseProto response = - ContainerProtocolCalls.readContainer(client, - pipeline.getContainerName(), traceID); - if (LOG.isDebugEnabled()) { - LOG.debug("Read container {}, leader: {}, machines: {} ", - pipeline.getContainerName(), - pipeline.getLeader(), - pipeline.getMachines()); - } - return response.getContainerData(); - } finally { - if (client != null) { - xceiverClientManager.releaseClient(client); - } - } - } - - /** - * Given an id, return the pipeline associated with the container. - * @param containerId - String Container ID - * @return Pipeline of the existing container, corresponding to the given id. - * @throws IOException - */ - @Override - public Pipeline getContainer(String containerId) throws - IOException { - return storageContainerLocationClient.getContainer(containerId); - } - - /** - * Close a container. - * - * @param pipeline the container to be closed. - * @throws IOException - */ - @Override - public void closeContainer(Pipeline pipeline) throws IOException { - XceiverClientSpi client = null; - try { - LOG.debug("Close container {}", pipeline); - /* - TODO: two orders here, revisit this later: - 1. close on SCM first, then on data node - 2. close on data node first, then on SCM - - with 1: if client failed after closing on SCM, then there is a - container SCM thinks as closed, but is actually open. Then SCM will no - longer allocate block to it, which is fine. But SCM may later try to - replicate this "closed" container, which I'm not sure is safe. - - with 2: if client failed after close on datanode, then there is a - container SCM thinks as open, but is actually closed. Then SCM will still - try to allocate block to it. Which will fail when actually doing the - write. No more data can be written, but at least the correctness and - consistency of existing data will maintain. - - For now, take the #2 way. - */ - // Actually close the container on Datanode - client = xceiverClientManager.acquireClient(pipeline); - String traceID = UUID.randomUUID().toString(); - - String containerId = pipeline.getContainerName(); - - storageContainerLocationClient.notifyObjectStageChange( - ObjectStageChangeRequestProto.Type.container, - containerId, - ObjectStageChangeRequestProto.Op.close, - ObjectStageChangeRequestProto.Stage.begin); - - ContainerProtocolCalls.closeContainer(client, traceID); - // Notify SCM to close the container - storageContainerLocationClient.notifyObjectStageChange( - ObjectStageChangeRequestProto.Type.container, - containerId, - ObjectStageChangeRequestProto.Op.close, - ObjectStageChangeRequestProto.Stage.complete); - } finally { - if (client != null) { - xceiverClientManager.releaseClient(client); - } - } - } - - /** - * Get the the current usage information. - * @param pipeline - Pipeline - * @return the size of the given container. - * @throws IOException - */ - @Override - public long getContainerSize(Pipeline pipeline) throws IOException { - // TODO : Pipeline can be null, handle it correctly. - long size = getContainerSizeB(); - if (size == -1) { - throw new IOException("Container size unknown!"); - } - return size; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/client/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/client/package-info.java b/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/client/package-info.java deleted file mode 100644 index 9febb0ac..0000000 --- a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/client/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.scm.client; - -/** - * Client facing classes for the container operations. - */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/package-info.java b/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/package-info.java deleted file mode 100644 index e23763d..0000000 --- a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.scm; - -/** - * Classes for different type of container service client. - */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java b/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java deleted file mode 100644 index 8c54d65..0000000 --- a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.scm.storage; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.List; - -import com.google.protobuf.ByteString; - -import org.apache.hadoop.fs.Seekable; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ReadChunkResponseProto; -import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ChunkInfo; -import org.apache.hadoop.scm.XceiverClientSpi; -import org.apache.hadoop.scm.XceiverClientManager; - -/** - * An {@link InputStream} used by the REST service in combination with the - * SCMClient to read the value of a key from a sequence - * of container chunks. All bytes of the key value are stored in container - * chunks. Each chunk may contain multiple underlying {@link ByteBuffer} - * instances. This class encapsulates all state management for iterating - * through the sequence of chunks and the sequence of buffers within each chunk. - */ -public class ChunkInputStream extends InputStream implements Seekable { - - private static final int EOF = -1; - - private final String key; - private final String traceID; - private XceiverClientManager xceiverClientManager; - private XceiverClientSpi xceiverClient; - private List<ChunkInfo> chunks; - private int chunkIndex; - private long[] chunkOffset; - private List<ByteBuffer> buffers; - private int bufferIndex; - - /** - * Creates a new ChunkInputStream. - * - * @param key chunk key - * @param xceiverClientManager client manager that controls client - * @param xceiverClient client to perform container calls - * @param chunks list of chunks to read - * @param traceID container protocol call traceID - */ - public ChunkInputStream(String key, XceiverClientManager xceiverClientManager, - XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID) { - this.key = key; - this.traceID = traceID; - this.xceiverClientManager = xceiverClientManager; - this.xceiverClient = xceiverClient; - this.chunks = chunks; - this.chunkIndex = -1; - // chunkOffset[i] stores offset at which chunk i stores data in - // ChunkInputStream - this.chunkOffset = new long[this.chunks.size()]; - initializeChunkOffset(); - this.buffers = null; - this.bufferIndex = 0; - } - - private void initializeChunkOffset() { - int tempOffset = 0; - for (int i = 0; i < chunks.size(); i++) { - chunkOffset[i] = tempOffset; - tempOffset += chunks.get(i).getLen(); - } - } - - @Override - public synchronized int read() - throws IOException { - checkOpen(); - int available = prepareRead(1); - return available == EOF ? EOF : - Byte.toUnsignedInt(buffers.get(bufferIndex).get()); - } - - @Override - public synchronized int read(byte[] b, int off, int len) throws IOException { - // According to the JavaDocs for InputStream, it is recommended that - // subclasses provide an override of bulk read if possible for performance - // reasons. In addition to performance, we need to do it for correctness - // reasons. The Ozone REST service uses PipedInputStream and - // PipedOutputStream to relay HTTP response data between a Jersey thread and - // a Netty thread. It turns out that PipedInputStream/PipedOutputStream - // have a subtle dependency (bug?) on the wrapped stream providing separate - // implementations of single-byte read and bulk read. Without this, get key - // responses might close the connection before writing all of the bytes - // advertised in the Content-Length. - if (b == null) { - throw new NullPointerException(); - } - if (off < 0 || len < 0 || len > b.length - off) { - throw new IndexOutOfBoundsException(); - } - if (len == 0) { - return 0; - } - checkOpen(); - int available = prepareRead(len); - if (available == EOF) { - return EOF; - } - buffers.get(bufferIndex).get(b, off, available); - return available; - } - - @Override - public synchronized void close() { - if (xceiverClientManager != null && xceiverClient != null) { - xceiverClientManager.releaseClient(xceiverClient); - xceiverClientManager = null; - xceiverClient = null; - } - } - - /** - * Checks if the stream is open. If not, throws an exception. - * - * @throws IOException if stream is closed - */ - private synchronized void checkOpen() throws IOException { - if (xceiverClient == null) { - throw new IOException("ChunkInputStream has been closed."); - } - } - - /** - * Prepares to read by advancing through chunks and buffers as needed until it - * finds data to return or encounters EOF. - * - * @param len desired length of data to read - * @return length of data available to read, possibly less than desired length - */ - private synchronized int prepareRead(int len) throws IOException { - for (;;) { - if (chunks == null || chunks.isEmpty()) { - // This must be an empty key. - return EOF; - } else if (buffers == null) { - // The first read triggers fetching the first chunk. - readChunkFromContainer(); - } else if (!buffers.isEmpty() && - buffers.get(bufferIndex).hasRemaining()) { - // Data is available from the current buffer. - ByteBuffer bb = buffers.get(bufferIndex); - return len > bb.remaining() ? bb.remaining() : len; - } else if (!buffers.isEmpty() && - !buffers.get(bufferIndex).hasRemaining() && - bufferIndex < buffers.size() - 1) { - // There are additional buffers available. - ++bufferIndex; - } else if (chunkIndex < chunks.size() - 1) { - // There are additional chunks available. - readChunkFromContainer(); - } else { - // All available input has been consumed. - return EOF; - } - } - } - - /** - * Attempts to read the chunk at the specified offset in the chunk list. If - * successful, then the data of the read chunk is saved so that its bytes can - * be returned from subsequent read calls. - * - * @throws IOException if there is an I/O error while performing the call - */ - private synchronized void readChunkFromContainer() throws IOException { - // On every chunk read chunkIndex should be increased so as to read the - // next chunk - chunkIndex += 1; - final ReadChunkResponseProto readChunkResponse; - try { - readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient, - chunks.get(chunkIndex), key, traceID); - } catch (IOException e) { - throw new IOException("Unexpected OzoneException: " + e.toString(), e); - } - ByteString byteString = readChunkResponse.getData(); - buffers = byteString.asReadOnlyByteBufferList(); - bufferIndex = 0; - } - - @Override - public synchronized void seek(long pos) throws IOException { - if (pos < 0 || (chunks.size() == 0 && pos > 0) - || pos >= chunkOffset[chunks.size() - 1] + chunks.get(chunks.size() - 1) - .getLen()) { - throw new EOFException( - "EOF encountered pos: " + pos + " container key: " + key); - } - if (chunkIndex == -1) { - chunkIndex = Arrays.binarySearch(chunkOffset, pos); - } else if (pos < chunkOffset[chunkIndex]) { - chunkIndex = Arrays.binarySearch(chunkOffset, 0, chunkIndex, pos); - } else if (pos >= chunkOffset[chunkIndex] + chunks.get(chunkIndex) - .getLen()) { - chunkIndex = - Arrays.binarySearch(chunkOffset, chunkIndex + 1, chunks.size(), pos); - } - if (chunkIndex < 0) { - // Binary search returns -insertionPoint - 1 if element is not present - // in the array. insertionPoint is the point at which element would be - // inserted in the sorted array. We need to adjust the chunkIndex - // accordingly so that chunkIndex = insertionPoint - 1 - chunkIndex = -chunkIndex -2; - } - // adjust chunkIndex so that readChunkFromContainer reads the correct chunk - chunkIndex -= 1; - readChunkFromContainer(); - adjustBufferIndex(pos); - } - - private void adjustBufferIndex(long pos) { - long tempOffest = chunkOffset[chunkIndex]; - for (int i = 0; i < buffers.size(); i++) { - if (pos - tempOffest >= buffers.get(i).capacity()) { - tempOffest += buffers.get(i).capacity(); - } else { - bufferIndex = i; - break; - } - } - buffers.get(bufferIndex).position((int) (pos - tempOffest)); - } - - @Override - public synchronized long getPos() throws IOException { - return chunkIndex == -1 ? 0 : - chunkOffset[chunkIndex] + buffers.get(bufferIndex).position(); - } - - @Override - public boolean seekToNewSource(long targetPos) throws IOException { - return false; - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org