HDDS-49. Standalone protocol should use grpc in place of netty. Contributed by Mukul Kumar Singh.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5a914069 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5a914069 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5a914069 Branch: refs/heads/HDDS-48 Commit: 5a9140690aba295ba1226a3190b52f34347a8372 Parents: 3e5f7ea Author: Anu Engineer <aengin...@apache.org> Authored: Tue May 22 16:51:43 2018 -0700 Committer: Anu Engineer <aengin...@apache.org> Committed: Tue May 22 19:56:15 2018 -0700 ---------------------------------------------------------------------- .../hadoop/hdds/scm/XceiverClientGrpc.java | 217 +++++++++++++++++++ .../hadoop/hdds/scm/XceiverClientManager.java | 21 +- .../hadoop/hdds/scm/XceiverClientMetrics.java | 8 +- .../common/dev-support/findbugsExcludeFile.xml | 3 + hadoop-hdds/common/pom.xml | 17 ++ .../apache/hadoop/hdds/scm/ScmConfigKeys.java | 4 + .../main/proto/DatanodeContainerProtocol.proto | 7 + .../common/src/main/resources/ozone-default.xml | 9 + .../common/helpers/ContainerMetrics.java | 14 +- .../transport/server/GrpcXceiverService.java | 82 +++++++ .../transport/server/XceiverServerGrpc.java | 105 +++++++++ .../container/ozoneimpl/OzoneContainer.java | 11 +- .../hadoop/ozone/MiniOzoneClusterImpl.java | 10 +- .../ozone/scm/TestXceiverClientManager.java | 67 ++++-- hadoop-project/pom.xml | 1 + 15 files changed, 540 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java new file mode 100644 index 0000000..84790e8 --- /dev/null +++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc; +import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceStub; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.util.Time; +import org.apache.ratis.shaded.io.grpc.ManagedChannel; +import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder; +import 
org.apache.ratis.shaded.io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +/** + * A Client for the storageContainer protocol. + */ +public class XceiverClientGrpc extends XceiverClientSpi { + static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class); + private final Pipeline pipeline; + private final Configuration config; + private XceiverClientProtocolServiceStub asyncStub; + private XceiverClientMetrics metrics; + private ManagedChannel channel; + private final Semaphore semaphore; + + /** + * Constructs a client that can communicate with the Container framework on + * data nodes. + * + * @param pipeline - Pipeline that defines the machines. + * @param config -- Ozone Config + */ + public XceiverClientGrpc(Pipeline pipeline, Configuration config) { + super(); + Preconditions.checkNotNull(pipeline); + Preconditions.checkNotNull(config); + this.pipeline = pipeline; + this.config = config; + this.semaphore = + new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config)); + this.metrics = XceiverClientManager.getXceiverClientMetrics(); + } + + @Override + public void connect() throws Exception { + DatanodeDetails leader = this.pipeline.getLeader(); + + // read port from the data node, on failure use default configured + // port. 
+ int port = leader.getContainerPort(); + if (port == 0) { + port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, + OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); + } + LOG.debug("Connecting to server Port : " + leader.getIpAddress()); + channel = NettyChannelBuilder.forAddress(leader.getIpAddress(), port) + .usePlaintext(true) + .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE) + .build(); + asyncStub = XceiverClientProtocolServiceGrpc.newStub(channel); + } + + /** + * Returns if the xceiver client connects to a server. + * + * @return True if the connection is alive, false otherwise. + */ + @VisibleForTesting + public boolean isConnected() { + return !channel.isTerminated() && !channel.isShutdown(); + } + + @Override + public void close() { + channel.shutdownNow(); + try { + channel.awaitTermination(60, TimeUnit.MINUTES); + } catch (Exception e) { + LOG.error("Unexpected exception while waiting for channel termination", + e); + } + } + + @Override + public Pipeline getPipeline() { + return pipeline; + } + + @Override + public ContainerCommandResponseProto sendCommand( + ContainerCommandRequestProto request) throws IOException { + try { + return sendCommandAsync(request).get(); + } catch (ExecutionException | InterruptedException e) { + /** + * In case the grpc channel handler throws an exception, + * the exception thrown will be wrapped within {@link ExecutionException}. + * Unwrapping here so that original exception gets passed + * to the client. + */ + if (e instanceof ExecutionException) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw (IOException) cause; + } + } + throw new IOException( + "Unexpected exception during execution:" + e.getMessage()); + } + } + + /** + * Sends a given command to the server and gets a waitable future back. 
+ * + * @param request Request + * @return Response to the command + * @throws IOException + */ + @Override + public CompletableFuture<ContainerCommandResponseProto> + sendCommandAsync(ContainerCommandRequestProto request) + throws IOException, ExecutionException, InterruptedException { + final CompletableFuture<ContainerCommandResponseProto> replyFuture = + new CompletableFuture<>(); + semaphore.acquire(); + long requestTime = Time.monotonicNowNanos(); + metrics.incrPendingContainerOpsMetrics(request.getCmdType()); + // create a new grpc stream for each non-async call. + final StreamObserver<ContainerCommandRequestProto> requestObserver = + asyncStub.send(new StreamObserver<ContainerCommandResponseProto>() { + @Override + public void onNext(ContainerCommandResponseProto value) { + replyFuture.complete(value); + metrics.decrPendingContainerOpsMetrics(request.getCmdType()); + metrics.addContainerOpsLatency(request.getCmdType(), + Time.monotonicNowNanos() - requestTime); + semaphore.release(); + } + @Override + public void onError(Throwable t) { + replyFuture.completeExceptionally(t); + metrics.decrPendingContainerOpsMetrics(request.getCmdType()); + metrics.addContainerOpsLatency(request.getCmdType(), + Time.monotonicNowNanos() - requestTime); + semaphore.release(); + } + + @Override + public void onCompleted() { + if (!replyFuture.isDone()) { + replyFuture.completeExceptionally( + new IOException("Stream completed but no reply for request " + + request)); + } + } + }); + requestObserver.onNext(request); + requestObserver.onCompleted(); + return replyFuture; + } + + /** + * Create a pipeline. + * + * @param pipelineID - Name of the pipeline. + * @param datanodes - Datanodes + */ + @Override + public void createPipeline(String pipelineID, List<DatanodeDetails> datanodes) + throws IOException { + // For stand alone pipeline, there is no notion called setup pipeline. + return; + } + + /** + * Returns pipeline Type. + * + * @return - Stand Alone as the type. 
+ */ + @Override + public HddsProtos.ReplicationType getPipelineType() { + return HddsProtos.ReplicationType.STAND_ALONE; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java index dcaa576..8919797 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java @@ -41,8 +41,6 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys .SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys .SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos - .ReplicationType.RATIS; /** * XceiverClientManager is responsible for the lifecycle of XceiverClient @@ -62,6 +60,7 @@ public class XceiverClientManager implements Closeable { private final Configuration conf; private final Cache<Long, XceiverClientSpi> clientCache; private final boolean useRatis; + private final boolean useGrpc; private static XceiverClientMetrics metrics; /** @@ -79,6 +78,8 @@ public class XceiverClientManager implements Closeable { this.useRatis = conf.getBoolean( ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT); + this.useGrpc = conf.getBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY, + ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_DEFAULT); this.conf = conf; this.clientCache = CacheBuilder.newBuilder() .expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS) @@ -146,9 +147,19 @@ public class XceiverClientManager implements Closeable { new Callable<XceiverClientSpi>() { @Override 
public XceiverClientSpi call() throws Exception { - XceiverClientSpi client = pipeline.getType() == RATIS ? - XceiverClientRatis.newXceiverClientRatis(pipeline, conf) - : new XceiverClient(pipeline, conf); + XceiverClientSpi client = null; + switch (pipeline.getType()) { + case RATIS: + client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf); + break; + case STAND_ALONE: + client = useGrpc ? new XceiverClientGrpc(pipeline, conf) : + new XceiverClient(pipeline, conf); + break; + case CHAINED: + default: + throw new IOException ("not implemented" + pipeline.getType()); + } client.connect(); return client; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java index fbc348c..a430400 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java @@ -49,13 +49,13 @@ public class XceiverClientMetrics { this.containerOpsLatency = new MutableRate[numEnumEntries]; for (int i = 0; i < numEnumEntries; i++) { pendingOpsArray[i] = registry.newCounter( - "numPending" + ContainerProtos.Type.valueOf(i + 1), - "number of pending" + ContainerProtos.Type.valueOf(i + 1) + " ops", + "numPending" + ContainerProtos.Type.forNumber(i + 1), + "number of pending" + ContainerProtos.Type.forNumber(i + 1) + " ops", (long) 0); containerOpsLatency[i] = registry.newRate( - ContainerProtos.Type.valueOf(i + 1) + "Latency", - "latency of " + ContainerProtos.Type.valueOf(i + 1) + ContainerProtos.Type.forNumber(i + 1) + "Latency", + "latency of " + ContainerProtos.Type.forNumber(i + 1) + " ops"); } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml b/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml index 3571a89..daf6fec 100644 --- a/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml +++ b/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml @@ -18,4 +18,7 @@ <Match> <Package name="org.apache.hadoop.hdds.protocol.proto"/> </Match> + <Match> + <Package name="org.apache.hadoop.hdds.protocol.datanode.proto"/> + </Match> </FindBugsFilter> http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/common/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/pom.xml b/hadoop-hdds/common/pom.xml index 6310df1..a8a634c 100644 --- a/hadoop-hdds/common/pom.xml +++ b/hadoop-hdds/common/pom.xml @@ -61,6 +61,12 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> <artifactId>ratis-grpc</artifactId> <groupId>org.apache.ratis</groupId> </dependency> + <dependency> + <groupId>com.google.errorprone</groupId> + <artifactId>error_prone_annotations</artifactId> + <version>2.2.0</version> + <optional>true</optional> + </dependency> <dependency> <groupId>org.rocksdb</groupId> @@ -108,7 +114,15 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> <goals> <goal>compile</goal> <goal>test-compile</goal> + <goal>compile-custom</goal> + <goal>test-compile-custom</goal> </goals> + <configuration> + <pluginId>grpc-java</pluginId> + <pluginArtifact> + io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + </pluginArtifact> + </configuration> </execution> </executions> </plugin> @@ -122,6 +136,9 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> <replace token="com.google.protobuf" value="org.apache.ratis.shaded.com.google.protobuf" 
dir="target/generated-sources/java/org/apache/hadoop/hdds/protocol/datanode/proto"> </replace> + <replace token="io.grpc" value="org.apache.ratis.shaded.io.grpc" + dir="target/generated-sources/java/org/apache/hadoop/hdds/protocol/datanode/proto"> + </replace> </tasks> </configuration> <goals> http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 29ccf30..85407e6 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -49,6 +49,10 @@ public final class ScmConfigKeys { = "dfs.container.ratis.enabled"; public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT = false; + public static final String DFS_CONTAINER_GRPC_ENABLED_KEY + = "dfs.container.grpc.enabled"; + public static final boolean DFS_CONTAINER_GRPC_ENABLED_DEFAULT + = false; public static final String DFS_CONTAINER_RATIS_RPC_TYPE_KEY = "dfs.container.ratis.rpc.type"; public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto index 95b7cbb..1138297 100644 --- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto @@ -24,6 +24,7 @@ // This file contains protocol buffers that are used to transfer data // to and from the 
datanode. +syntax = "proto2"; option java_package = "org.apache.hadoop.hdds.protocol.datanode.proto"; option java_outer_classname = "ContainerProtos"; option java_generate_equals_and_hash = true; @@ -418,3 +419,9 @@ message CopyContainerResponseProto { repeated bytes data = 5; optional int64 checksum = 6; } + +service XceiverClientProtocolService { + // A client-to-datanode RPC to send container commands + rpc send(stream ContainerCommandRequestProto) returns + (stream ContainerCommandResponseProto) {} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/common/src/main/resources/ozone-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index e0aca67..7a91610 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -72,6 +72,15 @@ </description> </property> <property> + <name>dfs.container.grpc.enabled</name> + <value>false</value> + <tag>OZONE, MANAGEMENT, PIPELINE, RATIS</tag> + <description>Ozone supports different kinds of replication pipelines + protocols. grpc is one of the replication pipeline protocol supported by + ozone. 
+ </description> + </property> + <property> <name>dfs.container.ratis.ipc</name> <value>9858</value> <tag>OZONE, CONTAINER, PIPELINE, RATIS, MANAGEMENT</tag> http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java index 4300b2d..714db59 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java @@ -63,20 +63,20 @@ public class ContainerMetrics { this.registry = new MetricsRegistry("StorageContainerMetrics"); for (int i = 0; i < numEnumEntries; i++) { numOpsArray[i] = registry.newCounter( - "num" + ContainerProtos.Type.valueOf(i + 1), - "number of " + ContainerProtos.Type.valueOf(i + 1) + " ops", + "num" + ContainerProtos.Type.forNumber(i + 1), + "number of " + ContainerProtos.Type.forNumber(i + 1) + " ops", (long) 0); opsBytesArray[i] = registry.newCounter( - "bytes" + ContainerProtos.Type.valueOf(i + 1), - "bytes used by " + ContainerProtos.Type.valueOf(i + 1) + "op", + "bytes" + ContainerProtos.Type.forNumber(i + 1), + "bytes used by " + ContainerProtos.Type.forNumber(i + 1) + "op", (long) 0); opsLatency[i] = registry.newRate( - "latency" + ContainerProtos.Type.valueOf(i + 1), - ContainerProtos.Type.valueOf(i + 1) + " op"); + "latency" + ContainerProtos.Type.forNumber(i + 1), + ContainerProtos.Type.forNumber(i + 1) + " op"); for (int j = 0; j < len; j++) { int interval = intervals[j]; - String quantileName = ContainerProtos.Type.valueOf(i + 1) + 
"Nanos" + String quantileName = ContainerProtos.Type.forNumber(i + 1) + "Nanos" + interval + "s"; opsLatQuantiles[i][j] = registry.newQuantiles(quantileName, "latency of Container ops", "ops", "latency", interval); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java new file mode 100644 index 0000000..df6220c --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.container.common.transport.server; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ContainerCommandResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto + .XceiverClientProtocolServiceGrpc; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Grpc Service for handling Container Commands on datanode. + */ +public class GrpcXceiverService extends + XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceImplBase { + public static final Logger + LOG = LoggerFactory.getLogger(GrpcXceiverService.class); + + private final ContainerDispatcher dispatcher; + + public GrpcXceiverService(ContainerDispatcher dispatcher) { + this.dispatcher = dispatcher; + } + + @Override + public StreamObserver<ContainerCommandRequestProto> send( + StreamObserver<ContainerCommandResponseProto> responseObserver) { + return new StreamObserver<ContainerCommandRequestProto>() { + private final AtomicBoolean isClosed = new AtomicBoolean(false); + + @Override + public void onNext(ContainerCommandRequestProto request) { + try { + ContainerCommandResponseProto resp = dispatcher.dispatch(request); + responseObserver.onNext(resp); + } catch (Throwable e) { + if (LOG.isDebugEnabled()) { + LOG.debug("{} got exception when processing" + + " ContainerCommandRequestProto {}: {}", request, e); + } + responseObserver.onError(e); + } + } + + @Override + public void onError(Throwable t) { + // for now we just log a msg + LOG.info("{}: ContainerCommand send on error. 
Exception: {}", t); + } + + @Override + public void onCompleted() { + if (isClosed.compareAndSet(false, true)) { + LOG.info("{}: ContainerCommand send completed"); + responseObserver.onCompleted(); + } + } + }; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java new file mode 100644 index 0000000..30a2f87 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.container.common.transport.server; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.ratis.shaded.io.grpc.Server; +import org.apache.ratis.shaded.io.grpc.ServerBuilder; +import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.SocketAddress; + +/** + * Creates a Grpc server endpoint that acts as the communication layer for + * Ozone containers. + */ +public final class XceiverServerGrpc implements XceiverServerSpi { + private static final Logger + LOG = LoggerFactory.getLogger(XceiverServerGrpc.class); + private int port; + private Server server; + + /** + * Constructs a Grpc server class. 
+ * + * @param conf - Configuration + */ + public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf, + ContainerDispatcher dispatcher) { + Preconditions.checkNotNull(conf); + + this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, + OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); + // Get an available port on current node and + // use that as the container port + if (conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, + OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT)) { + try (ServerSocket socket = new ServerSocket()) { + socket.setReuseAddress(true); + SocketAddress address = new InetSocketAddress(0); + socket.bind(address); + this.port = socket.getLocalPort(); + LOG.info("Found a free port for the server : {}", this.port); + } catch (IOException e) { + LOG.error("Unable find a random free port for the server, " + + "fallback to use default port {}", this.port, e); + } + } + datanodeDetails.setContainerPort(port); + server = ((NettyServerBuilder) ServerBuilder.forPort(port)) + .maxMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE) + .addService(new GrpcXceiverService(dispatcher)) + .build(); + } + + @Override + public int getIPCPort() { + return this.port; + } + + /** + * Returns the Replication type supported by this end-point. 
+ * + * @return enum -- {Stand_Alone, Ratis, Grpc, Chained} + */ + @Override + public HddsProtos.ReplicationType getServerType() { + return HddsProtos.ReplicationType.STAND_ALONE; + } + + @Override + public void start() throws IOException { + server.start(); + } + + @Override + public void stop() { + server.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index 1fc79d7..b497cdc 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.ozoneimpl; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -40,6 +41,8 @@ import org.apache.hadoop.ozone.container.common.statemachine.background .BlockDeletingService; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer; import org.apache.hadoop.ozone.container.common.transport.server + .XceiverServerGrpc; +import org.apache.hadoop.ozone.container.common.transport.server .XceiverServerSpi; import org.apache.hadoop.ozone.container.common.transport.server.ratis .XceiverServerRatis; @@ -121,8 +124,14 @@ public class OzoneContainer { this.dispatcher = new 
Dispatcher(manager, this.ozoneConfig); + boolean useGrpc = this.ozoneConfig.getBoolean( + ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY, + ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_DEFAULT); server = new XceiverServerSpi[]{ - new XceiverServer(datanodeDetails, this.ozoneConfig, this.dispatcher), + useGrpc ? new XceiverServerGrpc(datanodeDetails, + this.ozoneConfig, this.dispatcher) : + new XceiverServer(datanodeDetails, + this.ozoneConfig, this.dispatcher), XceiverServerRatis .newXceiverServerRatis(datanodeDetails, this.ozoneConfig, dispatcher) }; http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index 08d7176..9936815 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -220,13 +220,13 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster { datanodeService.stop(); datanodeService.join(); // ensure same ports are used across restarts. 
- Configuration conf = datanodeService.getConf(); + Configuration config = datanodeService.getConf(); int currentPort = datanodeService.getDatanodeDetails().getContainerPort(); - conf.setInt(DFS_CONTAINER_IPC_PORT, currentPort); - conf.setBoolean(DFS_CONTAINER_IPC_RANDOM_PORT, false); + config.setInt(DFS_CONTAINER_IPC_PORT, currentPort); + config.setBoolean(DFS_CONTAINER_IPC_RANDOM_PORT, false); int ratisPort = datanodeService.getDatanodeDetails().getRatisPort(); - conf.setInt(DFS_CONTAINER_RATIS_IPC_PORT, ratisPort); - conf.setBoolean(DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, false); + config.setInt(DFS_CONTAINER_RATIS_IPC_PORT, ratisPort); + config.setBoolean(DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, false); datanodeService.start(null); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java index 07ad6ef..77e4e1b 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.scm; import com.google.common.cache.Cache; import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ozone.MiniOzoneCluster; @@ -30,13 +31,17 @@ import org.apache.hadoop.hdds.scm.protocolPB .StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; import 
org.junit.Assert; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.After; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import static org.apache.hadoop.hdds.scm .ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY; @@ -44,19 +49,32 @@ import static org.apache.hadoop.hdds.scm /** * Test for XceiverClientManager caching and eviction. */ +@RunWith(Parameterized.class) public class TestXceiverClientManager { private static OzoneConfiguration config; private static MiniOzoneCluster cluster; private static StorageContainerLocationProtocolClientSideTranslatorPB storageContainerLocationClient; private static String containerOwner = "OZONE"; + private static boolean shouldUseGrpc; + + @Parameterized.Parameters + public static Collection<Object[]> withGrpc() { + return Arrays.asList(new Object[][] {{false}, {true}}); + } + + public TestXceiverClientManager(boolean useGrpc) { + shouldUseGrpc = useGrpc; + } @Rule public ExpectedException exception = ExpectedException.none(); - @BeforeClass - public static void init() throws Exception { + @Before + public void init() throws Exception { config = new OzoneConfiguration(); + config.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY, + shouldUseGrpc); cluster = MiniOzoneCluster.newBuilder(config) .setNumDatanodes(3) .build(); @@ -65,8 +83,8 @@ public class TestXceiverClientManager { .getStorageContainerLocationClient(); } - @AfterClass - public static void shutdown() { + @After + public void shutdown() { if (cluster != null) { cluster.shutdown(); } @@ -76,6 +94,8 @@ public class TestXceiverClientManager { @Test public void testCaching() throws IOException { OzoneConfiguration conf = new OzoneConfiguration(); + conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY, + 
shouldUseGrpc); XceiverClientManager clientManager = new XceiverClientManager(conf); ContainerInfo container1 = storageContainerLocationClient @@ -106,6 +126,8 @@ public class TestXceiverClientManager { public void testFreeByReference() throws IOException { OzoneConfiguration conf = new OzoneConfiguration(); conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1); + conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY, + shouldUseGrpc); XceiverClientManager clientManager = new XceiverClientManager(conf); Cache<Long, XceiverClientSpi> cache = clientManager.getClientCache(); @@ -140,10 +162,18 @@ public class TestXceiverClientManager { // After releasing the client, this connection should be closed // and any container operations should fail clientManager.releaseClient(client1); - exception.expect(IOException.class); - exception.expectMessage("This channel is not connected."); - ContainerProtocolCalls.createContainer(client1, - container1.getContainerID(), traceID1); + + String expectedMessage = shouldUseGrpc ? 
"Channel shutdown invoked" : + "This channel is not connected."; + try { + ContainerProtocolCalls.createContainer(client1, + container1.getContainerID(), traceID1); + Assert.fail("Create container should throw exception on closed" + + "client"); + } catch (Exception e) { + Assert.assertEquals(e.getClass(), IOException.class); + Assert.assertTrue(e.getMessage().contains(expectedMessage)); + } clientManager.releaseClient(client2); } @@ -151,6 +181,8 @@ public class TestXceiverClientManager { public void testFreeByEviction() throws IOException { OzoneConfiguration conf = new OzoneConfiguration(); conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1); + conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY, + shouldUseGrpc); XceiverClientManager clientManager = new XceiverClientManager(conf); Cache<Long, XceiverClientSpi> cache = clientManager.getClientCache(); @@ -181,10 +213,17 @@ public class TestXceiverClientManager { // Any container operation should now fail String traceID2 = "trace" + RandomStringUtils.randomNumeric(4); - exception.expect(IOException.class); - exception.expectMessage("This channel is not connected."); - ContainerProtocolCalls.createContainer(client1, - container1.getContainerID(), traceID2); + String expectedMessage = shouldUseGrpc ? 
"Channel shutdown invoked" : + "This channel is not connected."; + try { + ContainerProtocolCalls.createContainer(client1, + container1.getContainerID(), traceID2); + Assert.fail("Create container should throw exception on closed" + + "client"); + } catch (Exception e) { + Assert.assertEquals(e.getClass(), IOException.class); + Assert.assertTrue(e.getMessage().contains(expectedMessage)); + } clientManager.releaseClient(client2); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-project/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index a916108..73c3f5b 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -107,6 +107,7 @@ <!-- Maven protoc compiler --> <protobuf-maven-plugin.version>0.5.1</protobuf-maven-plugin.version> <protobuf-compile.version>3.5.0</protobuf-compile.version> + <grpc.version>1.10.0</grpc.version> <os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version> <!-- define the Java language version used by the compiler --> --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org