http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java deleted file mode 100644 index 690aa01..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java +++ /dev/null @@ -1,257 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.common.states.endpoint; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; -import org.apache.hadoop.ozone.container.common.statemachine - .EndpointStateMachine; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.NodeReportProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.Future; - -/** - * Register a container with SCM. - */ -public final class RegisterEndpointTask implements - Callable<EndpointStateMachine.EndPointStates> { - static final Logger LOG = LoggerFactory.getLogger(RegisterEndpointTask.class); - - private final EndpointStateMachine rpcEndPoint; - private final Configuration conf; - private Future<EndpointStateMachine.EndPointStates> result; - private DatanodeDetails datanodeDetails; - private final OzoneContainer datanodeContainerManager; - private StateContext stateContext; - - /** - * Creates a register endpoint task. 
- * - * @param rpcEndPoint - endpoint - * @param conf - conf - * @param ozoneContainer - container - */ - @VisibleForTesting - public RegisterEndpointTask(EndpointStateMachine rpcEndPoint, - Configuration conf, OzoneContainer ozoneContainer, - StateContext context) { - this.rpcEndPoint = rpcEndPoint; - this.conf = conf; - this.datanodeContainerManager = ozoneContainer; - this.stateContext = context; - - } - - /** - * Get the DatanodeDetails. - * - * @return DatanodeDetailsProto - */ - public DatanodeDetails getDatanodeDetails() { - return datanodeDetails; - } - - /** - * Set the contiainerNodeID Proto. - * - * @param datanodeDetails - Container Node ID. - */ - public void setDatanodeDetails( - DatanodeDetails datanodeDetails) { - this.datanodeDetails = datanodeDetails; - } - - /** - * Computes a result, or throws an exception if unable to do so. - * - * @return computed result - * @throws Exception if unable to compute a result - */ - @Override - public EndpointStateMachine.EndPointStates call() throws Exception { - - if (getDatanodeDetails() == null) { - LOG.error("DatanodeDetails cannot be null in RegisterEndpoint task, " + - "shutting down the endpoint."); - return rpcEndPoint.setState(EndpointStateMachine.EndPointStates.SHUTDOWN); - } - - rpcEndPoint.lock(); - try { - - ContainerReportsProto containerReport = datanodeContainerManager - .getContainerReport(); - NodeReportProto nodeReport = datanodeContainerManager.getNodeReport(); - PipelineReportsProto pipelineReportsProto = - datanodeContainerManager.getPipelineReport(); - // TODO : Add responses to the command Queue. - SCMRegisteredResponseProto response = rpcEndPoint.getEndPoint() - .register(datanodeDetails.getProtoBufMessage(), nodeReport, - containerReport, pipelineReportsProto); - Preconditions.checkState(UUID.fromString(response.getDatanodeUUID()) - .equals(datanodeDetails.getUuid()), - "Unexpected datanode ID in the response."); - Preconditions.checkState(!StringUtils.isBlank(response.getClusterID()), - "Invalid cluster ID in the response."); - if (response.hasHostname() && response.hasIpAddress()) { - datanodeDetails.setHostName(response.getHostname()); - datanodeDetails.setIpAddress(response.getIpAddress()); - } - EndpointStateMachine.EndPointStates nextState = - rpcEndPoint.getState().getNextState(); - rpcEndPoint.setState(nextState); - rpcEndPoint.zeroMissedCount(); - this.stateContext.configureHeartbeatFrequency(); - } catch (IOException ex) { - rpcEndPoint.logIfNeeded(ex); - } finally { - rpcEndPoint.unlock(); - } - - return rpcEndPoint.getState(); - } - - /** - * Returns a builder class for RegisterEndPoint task. - * - * @return Builder. - */ - public static Builder newBuilder() { - return new Builder(); - } - - /** - * Builder class for RegisterEndPoint task. - */ - public static class Builder { - private EndpointStateMachine endPointStateMachine; - private Configuration conf; - private DatanodeDetails datanodeDetails; - private OzoneContainer container; - private StateContext context; - - /** - * Constructs the builder class. - */ - public Builder() { - } - - /** - * Sets the endpoint state machine. - * - * @param rpcEndPoint - Endpoint state machine. - * @return Builder - */ - public Builder setEndpointStateMachine(EndpointStateMachine rpcEndPoint) { - this.endPointStateMachine = rpcEndPoint; - return this; - } - - /** - * Sets the Config. - * - * @param config - config - * @return Builder. - */ - public Builder setConfig(Configuration config) { - this.conf = config; - return this; - } - - /** - * Sets the NodeID. 
- * - * @param dnDetails - NodeID proto - * @return Builder - */ - public Builder setDatanodeDetails(DatanodeDetails dnDetails) { - this.datanodeDetails = dnDetails; - return this; - } - - /** - * Sets the ozonecontainer. - * @param ozoneContainer - * @return Builder - */ - public Builder setOzoneContainer(OzoneContainer ozoneContainer) { - this.container = ozoneContainer; - return this; - } - - public Builder setContext(StateContext stateContext) { - this.context = stateContext; - return this; - } - - public RegisterEndpointTask build() { - if (endPointStateMachine == null) { - LOG.error("No endpoint specified."); - throw new IllegalArgumentException("A valid endpoint state machine is" + - " needed to construct RegisterEndPoint task"); - } - - if (conf == null) { - LOG.error("No config specified."); - throw new IllegalArgumentException( - "A valid configuration is needed to construct RegisterEndpoint " - + "task"); - } - - if (datanodeDetails == null) { - LOG.error("No datanode specified."); - throw new IllegalArgumentException("A vaild Node ID is needed to " + - "construct RegisterEndpoint task"); - } - - if (container == null) { - LOG.error("Container is not specified"); - throw new IllegalArgumentException("Container is not specified to " + - "construct RegisterEndpoint task"); - } - - if (context == null) { - LOG.error("StateContext is not specified"); - throw new IllegalArgumentException("Container is not specified to " + - "construct RegisterEndpoint task"); - } - - RegisterEndpointTask task = new RegisterEndpointTask(this - .endPointStateMachine, this.conf, this.container, this.context); - task.setDatanodeDetails(datanodeDetails); - return task; - } - - } -}
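For readers skimming the deleted RegisterEndpointTask above: its call() method validates the SCM registration reply before advancing the endpoint state machine. The datanode UUID in the response must match the local one, and the cluster ID must be non-blank. The following is a minimal, JDK-only sketch of those two checks, not the Hadoop implementation itself: RegisteredResponse is a hypothetical stand-in for SCMRegisteredResponseProto, and the deleted code performs the same checks with Guava's Preconditions and commons-lang3's StringUtils.

    import java.util.UUID;

    final class RegistrationValidator {
      /** Hypothetical stand-in for SCMRegisteredResponseProto. */
      static final class RegisteredResponse {
        final String datanodeUuid;
        final String clusterId;
        RegisteredResponse(String datanodeUuid, String clusterId) {
          this.datanodeUuid = datanodeUuid;
          this.clusterId = clusterId;
        }
      }

      /** Mirrors the Preconditions.checkState calls in RegisterEndpointTask.call(). */
      static void validate(RegisteredResponse response, UUID expectedUuid) {
        if (!UUID.fromString(response.datanodeUuid).equals(expectedUuid)) {
          throw new IllegalStateException("Unexpected datanode ID in the response.");
        }
        if (response.clusterId == null || response.clusterId.trim().isEmpty()) {
          throw new IllegalStateException("Invalid cluster ID in the response.");
        }
      }

      public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        // "CID-74e7" is an illustrative sample value, not a real cluster ID.
        validate(new RegisteredResponse(id.toString(), "CID-74e7"), id);
        System.out.println("registration reply accepted");
      }
    }

Note that in the deleted task these checks throw IllegalStateException, which the surrounding catch (IOException) block does not handle, so a bad reply surfaces through the task's Future rather than being retried on the next heartbeat the way an RPC failure would be.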
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java deleted file mode 100644 index 64e078d..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.common.states.endpoint; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.container.common.statemachine - .EndpointStateMachine; -import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil; -import org.apache.hadoop.ozone.container.common.volume.HddsVolume; -import org.apache.hadoop.ozone.container.common.volume.VolumeSet; -import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; -import org.apache.hadoop.ozone.protocol.VersionResponse; -import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.Callable; - -/** - * Task that returns version. - */ -public class VersionEndpointTask implements - Callable<EndpointStateMachine.EndPointStates> { - public static final Logger LOG = LoggerFactory.getLogger(VersionEndpointTask - .class); - private final EndpointStateMachine rpcEndPoint; - private final Configuration configuration; - private final OzoneContainer ozoneContainer; - - public VersionEndpointTask(EndpointStateMachine rpcEndPoint, - Configuration conf, OzoneContainer container) { - this.rpcEndPoint = rpcEndPoint; - this.configuration = conf; - this.ozoneContainer = container; - } - - /** - * Computes a result, or throws an exception if unable to do so. 
- * - * @return computed result - * @throws Exception if unable to compute a result - */ - @Override - public EndpointStateMachine.EndPointStates call() throws Exception { - rpcEndPoint.lock(); - try{ - SCMVersionResponseProto versionResponse = - rpcEndPoint.getEndPoint().getVersion(null); - VersionResponse response = VersionResponse.getFromProtobuf( - versionResponse); - rpcEndPoint.setVersion(response); - VolumeSet volumeSet = ozoneContainer.getVolumeSet(); - Map<String, HddsVolume> volumeMap = volumeSet.getVolumeMap(); - - String scmId = response.getValue(OzoneConsts.SCM_ID); - String clusterId = response.getValue(OzoneConsts.CLUSTER_ID); - - Preconditions.checkNotNull(scmId, "Reply from SCM: scmId cannot be " + - "null"); - Preconditions.checkNotNull(clusterId, "Reply from SCM: clusterId " + - "cannot be null"); - - // If version file does not exist create version file and also set scmId - for (Map.Entry<String, HddsVolume> entry : volumeMap.entrySet()) { - HddsVolume hddsVolume = entry.getValue(); - boolean result = HddsVolumeUtil.checkVolume(hddsVolume, scmId, - clusterId, LOG); - if (!result) { - volumeSet.failVolume(hddsVolume.getHddsRootDir().getPath()); - } - } - if (volumeSet.getVolumesList().size() == 0) { - // All volumes are inconsistent state - throw new DiskOutOfSpaceException("All configured Volumes are in " + - "Inconsistent State"); - } - ozoneContainer.getDispatcher().setScmId(scmId); - - EndpointStateMachine.EndPointStates nextState = - rpcEndPoint.getState().getNextState(); - rpcEndPoint.setState(nextState); - rpcEndPoint.zeroMissedCount(); - } catch (DiskOutOfSpaceException ex) { - rpcEndPoint.setState(EndpointStateMachine.EndPointStates.SHUTDOWN); - } catch(IOException ex) { - rpcEndPoint.logIfNeeded(ex); - } finally { - rpcEndPoint.unlock(); - } - return rpcEndPoint.getState(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java deleted file mode 100644 index 1122598..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.common.states.endpoint; -/** - This package contains code for RPC endpoints transitions. 
- */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java deleted file mode 100644 index 92c953f..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.common.states; http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java deleted file mode 100644 index db4a86a..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.ozone.container.common.transport.server; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandResponseProto; -import org.apache.hadoop.hdds.protocol.datanode.proto - .XceiverClientProtocolServiceGrpc; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; -import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Grpc Service for handling Container Commands on datanode. - */ -public class GrpcXceiverService extends - XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceImplBase { - public static final Logger - LOG = LoggerFactory.getLogger(GrpcXceiverService.class); - - private final ContainerDispatcher dispatcher; - - public GrpcXceiverService(ContainerDispatcher dispatcher) { - this.dispatcher = dispatcher; - } - - @Override - public StreamObserver<ContainerCommandRequestProto> send( - StreamObserver<ContainerCommandResponseProto> responseObserver) { - return new StreamObserver<ContainerCommandRequestProto>() { - private final AtomicBoolean isClosed = new AtomicBoolean(false); - - @Override - public void onNext(ContainerCommandRequestProto request) { - try { - ContainerCommandResponseProto resp = dispatcher.dispatch(request); - responseObserver.onNext(resp); - } catch (Throwable e) { - LOG.error("{} got exception when processing" - + " ContainerCommandRequestProto {}: {}", request, e); - responseObserver.onError(e); - } - } - - @Override - public void onError(Throwable t) { - // for now we just log a msg - LOG.error("{}: ContainerCommand send on error. Exception: {}", t); - } - - @Override - public void onCompleted() { - if (isClosed.compareAndSet(false, true)) { - LOG.debug("{}: ContainerCommand send completed"); - responseObserver.onCompleted(); - } - } - }; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java deleted file mode 100644 index c51da98..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.common.transport.server; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandRequestProto; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineReport; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; -import org.apache.hadoop.hdds.scm.container.common.helpers. - StorageContainerException; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; - -import org.apache.ratis.shaded.io.grpc.BindableService; -import org.apache.ratis.shaded.io.grpc.Server; -import org.apache.ratis.shaded.io.grpc.ServerBuilder; -import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.SocketAddress; -import java.util.Collections; -import java.util.List; -import java.util.UUID; - -/** - * Creates a Grpc server endpoint that acts as the communication layer for - * Ozone containers. - */ -public final class XceiverServerGrpc implements XceiverServerSpi { - private static final Logger - LOG = LoggerFactory.getLogger(XceiverServerGrpc.class); - private int port; - private UUID id; - private Server server; - private final ContainerDispatcher storageContainer; - - /** - * Constructs a Grpc server class. - * - * @param conf - Configuration - */ - public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf, - ContainerDispatcher dispatcher, BindableService... 
additionalServices) { - Preconditions.checkNotNull(conf); - - this.id = datanodeDetails.getUuid(); - this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, - OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); - // Get an available port on current node and - // use that as the container port - if (conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, - OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT)) { - try (ServerSocket socket = new ServerSocket()) { - socket.setReuseAddress(true); - SocketAddress address = new InetSocketAddress(0); - socket.bind(address); - this.port = socket.getLocalPort(); - LOG.info("Found a free port for the server : {}", this.port); - } catch (IOException e) { - LOG.error("Unable find a random free port for the server, " - + "fallback to use default port {}", this.port, e); - } - } - datanodeDetails.setPort( - DatanodeDetails.newPort(DatanodeDetails.Port.Name.STANDALONE, port)); - server = ((NettyServerBuilder) ServerBuilder.forPort(port)) - .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE) - .addService(new GrpcXceiverService(dispatcher)) - .build(); - NettyServerBuilder nettyServerBuilder = - ((NettyServerBuilder) ServerBuilder.forPort(port)) - .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE) - .addService(new GrpcXceiverService(dispatcher)); - for (BindableService service : additionalServices) { - nettyServerBuilder.addService(service); - } - server = nettyServerBuilder.build(); - storageContainer = dispatcher; - } - - @Override - public int getIPCPort() { - return this.port; - } - - /** - * Returns the Replication type supported by this end-point. - * - * @return enum -- {Stand_Alone, Ratis, Grpc, Chained} - */ - @Override - public HddsProtos.ReplicationType getServerType() { - return HddsProtos.ReplicationType.STAND_ALONE; - } - - @Override - public void start() throws IOException { - server.start(); - } - - @Override - public void stop() { - server.shutdown(); - } - - @Override - public void submitRequest(ContainerCommandRequestProto request, - HddsProtos.PipelineID pipelineID) throws IOException { - ContainerProtos.ContainerCommandResponseProto response = - storageContainer.dispatch(request); - if (response.getResult() != ContainerProtos.Result.SUCCESS) { - throw new StorageContainerException(response.getMessage(), - response.getResult()); - } - } - - @Override - public List<PipelineReport> getPipelineReport() { - return Collections.singletonList( - PipelineReport.newBuilder() - .setPipelineID(PipelineID.valueOf(id).getProtobuf()) - .build()); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java deleted file mode 100644 index 8c3fa5c..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.common.transport.server; - -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandRequestProto; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineReport; - -import java.io.IOException; -import java.util.List; - -/** A server endpoint that acts as the communication layer for Ozone - * containers. */ -public interface XceiverServerSpi { - /** Starts the server. */ - void start() throws IOException; - - /** Stops a running server. */ - void stop(); - - /** Get server IPC port. */ - int getIPCPort(); - - /** - * Returns the Replication type supported by this end-point. - * @return enum -- {Stand_Alone, Ratis, Chained} - */ - HddsProtos.ReplicationType getServerType(); - - /** - * submits a containerRequest to be performed by the replication pipeline. - * @param request ContainerCommandRequest - */ - void submitRequest(ContainerCommandRequestProto request, - HddsProtos.PipelineID pipelineID) - throws IOException; - - /** - * Get pipeline report for the XceiverServer instance. - * @return list of report for each pipeline. - */ - List<PipelineReport> getPipelineReport(); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java deleted file mode 100644 index 59c96f1..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.common.transport.server; - -/** - * This package contains classes for the server of the storage container - * protocol. 
- */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java deleted file mode 100644 index b6aed60..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.common.transport.server.ratis; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.metrics2.MetricsSystem; -import org.apache.hadoop.metrics2.annotation.Metric; -import org.apache.hadoop.metrics2.annotation.Metrics; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.metrics2.lib.MutableCounterLong; - -/** - * This class is for maintaining Container State Machine statistics. 
- */ -@InterfaceAudience.Private -@Metrics(about="Container State Machine Metrics", context="dfs") -public class CSMMetrics { - public static final String SOURCE_NAME = - CSMMetrics.class.getSimpleName(); - - // ratis op metrics metrics - private @Metric MutableCounterLong numWriteStateMachineOps; - private @Metric MutableCounterLong numReadStateMachineOps; - private @Metric MutableCounterLong numApplyTransactionOps; - - // Failure Metrics - private @Metric MutableCounterLong numWriteStateMachineFails; - private @Metric MutableCounterLong numReadStateMachineFails; - private @Metric MutableCounterLong numApplyTransactionFails; - - public CSMMetrics() { - } - - public static CSMMetrics create() { - MetricsSystem ms = DefaultMetricsSystem.instance(); - return ms.register(SOURCE_NAME, - "Container State Machine", - new CSMMetrics()); - } - - public void incNumWriteStateMachineOps() { - numWriteStateMachineOps.incr(); - } - - public void incNumReadStateMachineOps() { - numReadStateMachineOps.incr(); - } - - public void incNumApplyTransactionsOps() { - numApplyTransactionOps.incr(); - } - - public void incNumWriteStateMachineFails() { - numWriteStateMachineFails.incr(); - } - - public void incNumReadStateMachineFails() { - numReadStateMachineFails.incr(); - } - - public void incNumApplyTransactionsFails() { - numApplyTransactionFails.incr(); - } - - @VisibleForTesting - public long getNumWriteStateMachineOps() { - return numWriteStateMachineOps.value(); - } - - @VisibleForTesting - public long getNumReadStateMachineOps() { - return numReadStateMachineOps.value(); - } - - @VisibleForTesting - public long getNumApplyTransactionsOps() { - return numApplyTransactionOps.value(); - } - - @VisibleForTesting - public long getNumWriteStateMachineFails() { - return numWriteStateMachineFails.value(); - } - - @VisibleForTesting - public long getNumReadStateMachineFails() { - return numReadStateMachineFails.value(); - } - - @VisibleForTesting - public long getNumApplyTransactionsFails() { - return numApplyTransactionFails.value(); - } - - public void unRegister() { - MetricsSystem ms = DefaultMetricsSystem.instance(); - ms.unregisterSource(SOURCE_NAME); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java deleted file mode 100644 index a7bef86..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ /dev/null @@ -1,656 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.common.transport.server.ratis; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.HddsUtils; -import org.apache.ratis.protocol.RaftGroup; -import org.apache.ratis.protocol.RaftGroupId; -import org.apache.ratis.server.RaftServer; -import org.apache.ratis.shaded.com.google.protobuf - .InvalidProtocolBufferException; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Stage; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandResponseProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .WriteChunkRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ReadChunkRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ReadChunkResponseProto; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; -import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.server.storage.RaftStorage; -import org.apache.ratis.shaded.com.google.protobuf.ByteString; -import org.apache.ratis.shaded.proto.RaftProtos.RoleInfoProto; -import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; -import org.apache.ratis.statemachine.StateMachineStorage; -import org.apache.ratis.statemachine.TransactionContext; -import org.apache.ratis.statemachine.impl.BaseStateMachine; -import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; -import org.apache.ratis.statemachine.impl.TransactionContextImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.stream.Collectors; - -/** A {@link org.apache.ratis.statemachine.StateMachine} for containers. - * - * The stateMachine is responsible for handling different types of container - * requests. The container requests can be divided into readonly and write - * requests. - * - * Read only requests are classified in - * {@link org.apache.hadoop.hdds.HddsUtils#isReadOnly} - * and these readonly requests are replied from the {@link #query(Message)}. - * - * The write requests can be divided into requests with user data - * (WriteChunkRequest) and other request without user data. - * - * Inorder to optimize the write throughput, the writeChunk request is - * processed in 2 phases. 
The 2 phases are divided in - * {@link #startTransaction(RaftClientRequest)}, in the first phase the user - * data is written directly into the state machine via - * {@link #writeStateMachineData} and in the second phase the - * transaction is committed via {@link #applyTransaction(TransactionContext)} - * - * For the requests with no stateMachine data, the transaction is directly - * committed through - * {@link #applyTransaction(TransactionContext)} - * - * There are 2 ordering operation which are enforced right now in the code, - * 1) Write chunk operation are executed after the create container operation, - * the write chunk operation will fail otherwise as the container still hasn't - * been created. Hence the create container operation has been split in the - * {@link #startTransaction(RaftClientRequest)}, this will help in synchronizing - * the calls in {@link #writeStateMachineData} - * - * 2) Write chunk commit operation is executed after write chunk state machine - * operation. This will ensure that commit operation is sync'd with the state - * machine operation. - * - * Synchronization between {@link #writeStateMachineData} and - * {@link #applyTransaction} need to be enforced in the StateMachine - * implementation. For example, synchronization between writeChunk and - * createContainer in {@link ContainerStateMachine}. - * - * PutBlock is synchronized with WriteChunk operations, PutBlock for a block is - * executed only after all the WriteChunk preceding the PutBlock have finished. - * - * CloseContainer is synchronized with WriteChunk and PutBlock operations, - * CloseContainer for a container is processed after all the preceding write - * operations for the container have finished. - * */ -public class ContainerStateMachine extends BaseStateMachine { - static final Logger LOG = LoggerFactory.getLogger( - ContainerStateMachine.class); - private final SimpleStateMachineStorage storage - = new SimpleStateMachineStorage(); - private final ContainerDispatcher dispatcher; - private ThreadPoolExecutor chunkExecutor; - private final XceiverServerRatis ratisServer; - private final ConcurrentHashMap<Long, CompletableFuture<Message>> - writeChunkFutureMap; - private final ConcurrentHashMap<Long, StateMachineHelper> stateMachineMap; - /** - * CSM metrics. - */ - private final CSMMetrics metrics; - - public ContainerStateMachine(ContainerDispatcher dispatcher, - ThreadPoolExecutor chunkExecutor, XceiverServerRatis ratisServer) { - this.dispatcher = dispatcher; - this.chunkExecutor = chunkExecutor; - this.ratisServer = ratisServer; - this.writeChunkFutureMap = new ConcurrentHashMap<>(); - this.stateMachineMap = new ConcurrentHashMap<>(); - metrics = CSMMetrics.create(); - } - - @Override - public StateMachineStorage getStateMachineStorage() { - return storage; - } - - public CSMMetrics getMetrics() { - return metrics; - } - - @Override - public void initialize( - RaftServer server, RaftGroupId id, RaftStorage raftStorage) - throws IOException { - super.initialize(server, id, raftStorage); - storage.init(raftStorage); - // TODO handle snapshots - - // TODO: Add a flag that tells you that initialize has been called. - // Check with Ratis if this feature is done in Ratis. 
- } - - @Override - public TransactionContext startTransaction(RaftClientRequest request) - throws IOException { - final ContainerCommandRequestProto proto = - getRequestProto(request.getMessage().getContent()); - - final SMLogEntryProto log; - if (proto.getCmdType() == Type.WriteChunk) { - final WriteChunkRequestProto write = proto.getWriteChunk(); - // create the state machine data proto - final WriteChunkRequestProto dataWriteChunkProto = - WriteChunkRequestProto - .newBuilder(write) - .setStage(Stage.WRITE_DATA) - .build(); - ContainerCommandRequestProto dataContainerCommandProto = - ContainerCommandRequestProto - .newBuilder(proto) - .setWriteChunk(dataWriteChunkProto) - .build(); - - // create the log entry proto - final WriteChunkRequestProto commitWriteChunkProto = - WriteChunkRequestProto.newBuilder() - .setBlockID(write.getBlockID()) - .setChunkData(write.getChunkData()) - // skipping the data field as it is - // already set in statemachine data proto - .setStage(Stage.COMMIT_DATA) - .build(); - ContainerCommandRequestProto commitContainerCommandProto = - ContainerCommandRequestProto - .newBuilder(proto) - .setWriteChunk(commitWriteChunkProto) - .build(); - - log = SMLogEntryProto.newBuilder() - .setData(commitContainerCommandProto.toByteString()) - .setStateMachineData(dataContainerCommandProto.toByteString()) - .build(); - } else if (proto.getCmdType() == Type.CreateContainer) { - log = SMLogEntryProto.newBuilder() - .setData(request.getMessage().getContent()) - .setStateMachineData(request.getMessage().getContent()) - .build(); - } else { - log = SMLogEntryProto.newBuilder() - .setData(request.getMessage().getContent()) - .build(); - } - return new TransactionContextImpl(this, request, log); - } - - private ContainerCommandRequestProto getRequestProto(ByteString request) - throws InvalidProtocolBufferException { - return ContainerCommandRequestProto.parseFrom(request); - } - - private ContainerCommandResponseProto dispatchCommand( - ContainerCommandRequestProto requestProto) { - LOG.trace("dispatch {}", requestProto); - ContainerCommandResponseProto response = dispatcher.dispatch(requestProto); - LOG.trace("response {}", response); - return response; - } - - private Message runCommand(ContainerCommandRequestProto requestProto) { - return dispatchCommand(requestProto)::toByteString; - } - - /* - * writeStateMachineData calls are not synchronized with each other - * and also with applyTransaction. 
- */ - @Override - public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) { - try { - metrics.incNumWriteStateMachineOps(); - final ContainerCommandRequestProto requestProto = - getRequestProto(entry.getSmLogEntry().getStateMachineData()); - Type cmdType = requestProto.getCmdType(); - long containerId = requestProto.getContainerID(); - stateMachineMap - .computeIfAbsent(containerId, k -> new StateMachineHelper()); - CompletableFuture<Message> stateMachineFuture = - stateMachineMap.get(containerId) - .handleStateMachineData(requestProto, entry.getIndex()); - if (stateMachineFuture == null) { - throw new IllegalStateException( - "Cmd Type:" + cmdType + " should not have state machine data"); - } - return stateMachineFuture; - } catch (IOException e) { - metrics.incNumWriteStateMachineFails(); - return completeExceptionally(e); - } - } - - @Override - public CompletableFuture<Message> query(Message request) { - try { - metrics.incNumReadStateMachineOps(); - final ContainerCommandRequestProto requestProto = - getRequestProto(request.getContent()); - return CompletableFuture.completedFuture(runCommand(requestProto)); - } catch (IOException e) { - metrics.incNumReadStateMachineFails(); - return completeExceptionally(e); - } - } - - private LogEntryProto readStateMachineData(LogEntryProto entry, - ContainerCommandRequestProto requestProto) { - WriteChunkRequestProto writeChunkRequestProto = - requestProto.getWriteChunk(); - // Assert that store log entry is for COMMIT_DATA, the WRITE_DATA is - // written through writeStateMachineData. - Preconditions.checkArgument(writeChunkRequestProto.getStage() - == Stage.COMMIT_DATA); - - // prepare the chunk to be read - ReadChunkRequestProto.Builder readChunkRequestProto = - ReadChunkRequestProto.newBuilder() - .setBlockID(writeChunkRequestProto.getBlockID()) - .setChunkData(writeChunkRequestProto.getChunkData()); - ContainerCommandRequestProto dataContainerCommandProto = - ContainerCommandRequestProto.newBuilder(requestProto) - .setCmdType(Type.ReadChunk) - .setReadChunk(readChunkRequestProto) - .build(); - - // read the chunk - ContainerCommandResponseProto response = - dispatchCommand(dataContainerCommandProto); - ReadChunkResponseProto responseProto = response.getReadChunk(); - - // assert that the response has data in it. - Preconditions.checkNotNull(responseProto.getData()); - - // reconstruct the write chunk request - final WriteChunkRequestProto.Builder dataWriteChunkProto = - WriteChunkRequestProto.newBuilder(writeChunkRequestProto) - // adding the state machine data - .setData(responseProto.getData()) - .setStage(Stage.WRITE_DATA); - - ContainerCommandRequestProto.Builder newStateMachineProto = - ContainerCommandRequestProto.newBuilder(requestProto) - .setWriteChunk(dataWriteChunkProto); - - return recreateLogEntryProto(entry, - newStateMachineProto.build().toByteString()); - } - - private LogEntryProto recreateLogEntryProto(LogEntryProto entry, - ByteString stateMachineData) { - // recreate the log entry - final SMLogEntryProto log = - SMLogEntryProto.newBuilder(entry.getSmLogEntry()) - .setStateMachineData(stateMachineData) - .build(); - return LogEntryProto.newBuilder(entry).setSmLogEntry(log).build(); - } - - /** - * Returns the combined future of all the writeChunks till the given log - * index. The Raft log worker will wait for the stateMachineData to complete - * flush as well. 
- * - * @param index log index till which the stateMachine data needs to be flushed - * @return Combined future of all writeChunks till the log index given. - */ - @Override - public CompletableFuture<Void> flushStateMachineData(long index) { - List<CompletableFuture<Message>> futureList = - writeChunkFutureMap.entrySet().stream().filter(x -> x.getKey() <= index) - .map(x -> x.getValue()).collect(Collectors.toList()); - CompletableFuture<Void> combinedFuture = CompletableFuture.allOf( - futureList.toArray(new CompletableFuture[futureList.size()])); - return combinedFuture; - } - /* - * This api is used by the leader while appending logs to the follower - * This allows the leader to read the state machine data from the - * state machine implementation in case cached state machine data has been - * evicted. - */ - @Override - public CompletableFuture<LogEntryProto> readStateMachineData( - LogEntryProto entry) { - SMLogEntryProto smLogEntryProto = entry.getSmLogEntry(); - if (!smLogEntryProto.getStateMachineData().isEmpty()) { - return CompletableFuture.completedFuture(entry); - } - - try { - final ContainerCommandRequestProto requestProto = - getRequestProto(entry.getSmLogEntry().getData()); - // readStateMachineData should only be called for "write" to Ratis. - Preconditions.checkArgument(!HddsUtils.isReadOnly(requestProto)); - - if (requestProto.getCmdType() == Type.WriteChunk) { - return CompletableFuture.supplyAsync(() -> - readStateMachineData(entry, requestProto), - chunkExecutor); - } else if (requestProto.getCmdType() == Type.CreateContainer) { - LogEntryProto log = - recreateLogEntryProto(entry, requestProto.toByteString()); - return CompletableFuture.completedFuture(log); - } else { - throw new IllegalStateException("Cmd type:" + requestProto.getCmdType() - + " cannot have state machine data"); - } - } catch (Exception e) { - LOG.error("unable to read stateMachineData:" + e); - return completeExceptionally(e); - } - } - - /* - * ApplyTransaction calls in Ratis are sequential. - */ - @Override - public CompletableFuture<Message> applyTransaction(TransactionContext trx) { - try { - metrics.incNumApplyTransactionsOps(); - ContainerCommandRequestProto requestProto = - getRequestProto(trx.getSMLogEntry().getData()); - Preconditions.checkState(!HddsUtils.isReadOnly(requestProto)); - stateMachineMap.computeIfAbsent(requestProto.getContainerID(), - k -> new StateMachineHelper()); - long index = - trx.getLogEntry() == null ? -1 : trx.getLogEntry().getIndex(); - return stateMachineMap.get(requestProto.getContainerID()) - .executeContainerCommand(requestProto, index); - } catch (IOException e) { - metrics.incNumApplyTransactionsFails(); - return completeExceptionally(e); - } - } - - private static <T> CompletableFuture<T> completeExceptionally(Exception e) { - final CompletableFuture<T> future = new CompletableFuture<>(); - future.completeExceptionally(e); - return future; - } - - @Override - public void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) { - ratisServer.handleNodeSlowness(group, roleInfoProto); - } - - @Override - public void notifyExtendedNoLeader(RaftGroup group, - RoleInfoProto roleInfoProto) { - ratisServer.handleNoLeader(group, roleInfoProto); - } - - @Override - public void close() throws IOException { - } - - /** - * Class to manage the future tasks for writeChunks. 
- */ - static class CommitChunkFutureMap { - private final ConcurrentHashMap<Long, CompletableFuture<Message>> - block2ChunkMap = new ConcurrentHashMap<>(); - - synchronized int removeAndGetSize(long index) { - block2ChunkMap.remove(index); - return block2ChunkMap.size(); - } - - synchronized CompletableFuture<Message> add(long index, - CompletableFuture<Message> future) { - return block2ChunkMap.put(index, future); - } - - synchronized List<CompletableFuture<Message>> getAll() { - return new ArrayList<>(block2ChunkMap.values()); - } - } - - /** - * This class maintains maps and provide utilities to enforce synchronization - * among createContainer, writeChunk, putBlock and closeContainer. - */ - private class StateMachineHelper { - - private CompletableFuture<Message> createContainerFuture; - - // Map for maintaining all writeChunk futures mapped to blockId - private final ConcurrentHashMap<Long, CommitChunkFutureMap> - block2ChunkMap; - - // Map for putBlock futures - private final ConcurrentHashMap<Long, CompletableFuture<Message>> - blockCommitMap; - - StateMachineHelper() { - createContainerFuture = null; - block2ChunkMap = new ConcurrentHashMap<>(); - blockCommitMap = new ConcurrentHashMap<>(); - } - - // The following section handles writeStateMachineData transactions - // on a container - - // enqueue the create container future during writeStateMachineData - // so that the write stateMachine data phase of writeChunk wait on - // create container to finish. - private CompletableFuture<Message> handleCreateContainer() { - createContainerFuture = new CompletableFuture<>(); - return CompletableFuture.completedFuture(() -> ByteString.EMPTY); - } - - // This synchronizes on create container to finish - private CompletableFuture<Message> handleWriteChunk( - ContainerCommandRequestProto requestProto, long entryIndex) { - CompletableFuture<Message> containerOpFuture; - - if (createContainerFuture != null) { - containerOpFuture = createContainerFuture - .thenApplyAsync(v -> runCommand(requestProto), chunkExecutor); - } else { - containerOpFuture = CompletableFuture - .supplyAsync(() -> runCommand(requestProto), chunkExecutor); - } - writeChunkFutureMap.put(entryIndex, containerOpFuture); - return containerOpFuture; - } - - CompletableFuture<Message> handleStateMachineData( - final ContainerCommandRequestProto requestProto, long index) { - Type cmdType = requestProto.getCmdType(); - if (cmdType == Type.CreateContainer) { - return handleCreateContainer(); - } else if (cmdType == Type.WriteChunk) { - return handleWriteChunk(requestProto, index); - } else { - return null; - } - } - - // The following section handles applyTransaction transactions - // on a container - - private CompletableFuture<Message> handlePutBlock( - ContainerCommandRequestProto requestProto) { - List<CompletableFuture<Message>> futureList = new ArrayList<>(); - long localId = - requestProto.getPutBlock().getBlockData().getBlockID().getLocalID(); - // Need not wait for create container future here as it has already - // finished. 
- if (block2ChunkMap.get(localId) != null) { - futureList.addAll(block2ChunkMap.get(localId).getAll()); - } - CompletableFuture<Message> effectiveFuture = - runCommandAfterFutures(futureList, requestProto); - - CompletableFuture<Message> putBlockFuture = - effectiveFuture.thenApply(message -> { - blockCommitMap.remove(localId); - return message; - }); - blockCommitMap.put(localId, putBlockFuture); - return putBlockFuture; - } - - // Close Container should be executed only if all pending WriteType - // container cmds get executed. Transactions which can return a future - // are WriteChunk and PutBlock. - private CompletableFuture<Message> handleCloseContainer( - ContainerCommandRequestProto requestProto) { - List<CompletableFuture<Message>> futureList = new ArrayList<>(); - - // No need to wait for create container future here as it should have - // already finished. - block2ChunkMap.values().forEach(b -> futureList.addAll(b.getAll())); - futureList.addAll(blockCommitMap.values()); - - // There are pending write Chunk/PutBlock type requests - // Queue this closeContainer request behind all these requests - CompletableFuture<Message> closeContainerFuture = - runCommandAfterFutures(futureList, requestProto); - - return closeContainerFuture.thenApply(message -> { - stateMachineMap.remove(requestProto.getContainerID()); - return message; - }); - } - - private CompletableFuture<Message> handleChunkCommit( - ContainerCommandRequestProto requestProto, long index) { - WriteChunkRequestProto write = requestProto.getWriteChunk(); - // the data field has already been removed in start Transaction - Preconditions.checkArgument(!write.hasData()); - CompletableFuture<Message> stateMachineFuture = - writeChunkFutureMap.remove(index); - CompletableFuture<Message> commitChunkFuture = stateMachineFuture - .thenComposeAsync(v -> CompletableFuture - .completedFuture(runCommand(requestProto))); - - long localId = requestProto.getWriteChunk().getBlockID().getLocalID(); - // Put the applyTransaction Future again to the Map. - // closeContainer should synchronize with this. - block2ChunkMap - .computeIfAbsent(localId, id -> new CommitChunkFutureMap()) - .add(index, commitChunkFuture); - return commitChunkFuture.thenApply(message -> { - block2ChunkMap.computeIfPresent(localId, (containerId, chunks) - -> chunks.removeAndGetSize(index) == 0? 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java deleted file mode 100644 index c2ef504..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ /dev/null @@ -1,429 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.common.transport.server.ratis; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandRequestProto; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineReport; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ClosePipelineInfo; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineAction; -import org.apache.hadoop.hdds.scm.HddsServerUtil; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.common.transport.server - .XceiverServerSpi; -import org.apache.ratis.RaftConfigKeys; -import org.apache.ratis.RatisHelper; -import org.apache.ratis.client.RaftClientConfigKeys; -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.grpc.GrpcConfigKeys; -import org.apache.ratis.netty.NettyConfigKeys; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.NotLeaderException; -import org.apache.ratis.protocol.StateMachineException; -import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.protocol.RaftGroup; -import org.apache.ratis.protocol.RaftGroupId; -import org.apache.ratis.rpc.RpcType; -import org.apache.ratis.rpc.SupportedRpcType; -import org.apache.ratis.server.RaftServer; -import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.shaded.proto.RaftProtos; -import org.apache.ratis.shaded.proto.RaftProtos.RoleInfoProto; -import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel; -import org.apache.ratis.util.SizeInBytes; -import org.apache.ratis.util.TimeDuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.SocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.UUID; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -/** - * Creates a ratis server endpoint that acts as the communication layer for - * Ozone containers. 
- */ -public final class XceiverServerRatis implements XceiverServerSpi { - static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class); - private static final AtomicLong CALL_ID_COUNTER = new AtomicLong(); - - private static long nextCallId() { - return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE; - } - - private final int port; - private final RaftServer server; - private ThreadPoolExecutor chunkExecutor; - private ClientId clientId = ClientId.randomId(); - private final StateContext context; - private final ReplicationLevel replicationLevel; - private long nodeFailureTimeoutMs; - - private XceiverServerRatis(DatanodeDetails dd, int port, - ContainerDispatcher dispatcher, Configuration conf, StateContext context) - throws IOException { - Objects.requireNonNull(dd, "id == null"); - this.port = port; - RaftProperties serverProperties = newRaftProperties(conf); - final int numWriteChunkThreads = conf.getInt( - OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY, - OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT); - chunkExecutor = - new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads, - 100, TimeUnit.SECONDS, - new ArrayBlockingQueue<>(1024), - new ThreadPoolExecutor.CallerRunsPolicy()); - this.context = context; - this.replicationLevel = - conf.getEnum(OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY, - OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT); - ContainerStateMachine stateMachine = - new ContainerStateMachine(dispatcher, chunkExecutor, this); - this.server = RaftServer.newBuilder() - .setServerId(RatisHelper.toRaftPeerId(dd)) - .setProperties(serverProperties) - .setStateMachine(stateMachine) - .build(); - } - - - private RaftProperties newRaftProperties(Configuration conf) { - final RaftProperties properties = new RaftProperties(); - - // Set rpc type - final String rpcType = conf.get( - OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, - OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); - final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType); - RaftConfigKeys.Rpc.setType(properties, rpc); - - // set raft segment size - final int raftSegmentSize = conf.getInt( - OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY, - OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT); - RaftServerConfigKeys.Log.setSegmentSizeMax(properties, - SizeInBytes.valueOf(raftSegmentSize)); - - // set raft segment pre-allocated size - final int raftSegmentPreallocatedSize = conf.getInt( - OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY, - OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT); - RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties, - SizeInBytes.valueOf(raftSegmentPreallocatedSize)); - RaftServerConfigKeys.Log.setPreallocatedSize(properties, - SizeInBytes.valueOf(raftSegmentPreallocatedSize)); - - // Set max write buffer size, which is the scm chunk size - final int maxChunkSize = OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE; - RaftServerConfigKeys.Log.setWriteBufferSize(properties, - SizeInBytes.valueOf(maxChunkSize)); - - // Set the client requestTimeout - TimeUnit timeUnit = - OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT - .getUnit(); - long duration = conf.getTimeDuration( - OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY, - OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT - .getDuration(), timeUnit); - final TimeDuration clientRequestTimeout = - TimeDuration.valueOf(duration, 
timeUnit); - RaftClientConfigKeys.Rpc - .setRequestTimeout(properties, clientRequestTimeout); - - // Set the server Request timeout - timeUnit = OzoneConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT - .getUnit(); - duration = conf.getTimeDuration( - OzoneConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_KEY, - OzoneConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT - .getDuration(), timeUnit); - final TimeDuration serverRequestTimeout = - TimeDuration.valueOf(duration, timeUnit); - RaftServerConfigKeys.Rpc - .setRequestTimeout(properties, serverRequestTimeout); - - // set timeout for a retry cache entry - timeUnit = - OzoneConfigKeys.DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_DEFAULT - .getUnit(); - duration = conf.getTimeDuration( - OzoneConfigKeys.DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_KEY, - OzoneConfigKeys.DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_DEFAULT - .getDuration(), timeUnit); - final TimeDuration retryCacheTimeout = - TimeDuration.valueOf(duration, timeUnit); - RaftServerConfigKeys.RetryCache - .setExpiryTime(properties, retryCacheTimeout); - - // Set the ratis leader election timeout - TimeUnit leaderElectionMinTimeoutUnit = - OzoneConfigKeys. - DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT - .getUnit(); - duration = conf.getTimeDuration( - OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY, - OzoneConfigKeys. - DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT - .getDuration(), leaderElectionMinTimeoutUnit); - final TimeDuration leaderElectionMinTimeout = - TimeDuration.valueOf(duration, leaderElectionMinTimeoutUnit); - RaftServerConfigKeys.Rpc - .setTimeoutMin(properties, leaderElectionMinTimeout); - long leaderElectionMaxTimeout = - leaderElectionMinTimeout.toLong(TimeUnit.MILLISECONDS) + 200; - RaftServerConfigKeys.Rpc.setTimeoutMax(properties, - TimeDuration.valueOf(leaderElectionMaxTimeout, TimeUnit.MILLISECONDS)); - // Enable batch append on raft server - RaftServerConfigKeys.Log.Appender.setBatchEnabled(properties, true); - - // Set the maximum cache segments - RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2); - - // set the node failure timeout - timeUnit = OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT - .getUnit(); - duration = conf.getTimeDuration( - OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY, - OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT - .getDuration(), timeUnit); - final TimeDuration nodeFailureTimeout = - TimeDuration.valueOf(duration, timeUnit); - RaftServerConfigKeys.setLeaderElectionTimeout(properties, - nodeFailureTimeout); - RaftServerConfigKeys.Rpc.setSlownessTimeout(properties, - nodeFailureTimeout); - nodeFailureTimeoutMs = nodeFailureTimeout.toLong(TimeUnit.MILLISECONDS); - - // Set the ratis storage directory - String storageDir = HddsServerUtil.getOzoneDatanodeRatisDirectory(conf); - RaftServerConfigKeys.setStorageDir(properties, new File(storageDir)); - - // For grpc set the maximum message size - GrpcConfigKeys.setMessageSizeMax(properties, - SizeInBytes.valueOf(maxChunkSize + raftSegmentPreallocatedSize)); - - // Set the ratis port number - if (rpc == SupportedRpcType.GRPC) { - GrpcConfigKeys.Server.setPort(properties, port); - } else if (rpc == SupportedRpcType.NETTY) { - NettyConfigKeys.Server.setPort(properties, port); - } - return properties; - } - - public static XceiverServerRatis newXceiverServerRatis( - DatanodeDetails datanodeDetails, Configuration ozoneConf, - ContainerDispatcher dispatcher, StateContext 
context) throws IOException { - int localPort = ozoneConf.getInt( - OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT, - OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT_DEFAULT); - - // Get an available port on the current node and - // use that as the container port - if (ozoneConf.getBoolean(OzoneConfigKeys - .DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, - OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT_DEFAULT)) { - try (ServerSocket socket = new ServerSocket()) { - socket.setReuseAddress(true); - SocketAddress address = new InetSocketAddress(0); - socket.bind(address); - localPort = socket.getLocalPort(); - LOG.info("Found a free port for the server: {}", localPort); - } catch (IOException e) { - LOG.error("Unable to find a random free port for the server, " - + "falling back to the default port {}", localPort, e); - } - } - datanodeDetails.setPort( - DatanodeDetails.newPort(DatanodeDetails.Port.Name.RATIS, localPort)); - return new XceiverServerRatis(datanodeDetails, localPort, - dispatcher, ozoneConf, context); - }
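
The random-port branch in newXceiverServerRatis uses a standard JDK idiom: bind a ServerSocket to port 0 so the operating system assigns a free ephemeral port, record the port, and let the socket close. A standalone sketch of just that idiom (the class name and fallback value are illustrative):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

public final class FreePortSketch {

  // Ask the OS for an unused ephemeral port by binding to port 0; fall back
  // to the supplied default if no socket can be opened.
  static int findFreePort(int fallback) {
    try (ServerSocket socket = new ServerSocket()) {
      socket.setReuseAddress(true);
      socket.bind(new InetSocketAddress(0));
      return socket.getLocalPort();
    } catch (IOException e) {
      return fallback;
    }
  }

  public static void main(String[] args) {
    System.out.println("free port: " + findFreePort(9858));
  }
}

Note the inherent race in this idiom: another process may claim the port between the probe socket closing and the Raft server binding it, which is tolerable for tests and is why a fixed default port remains the normal path.
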
- - @Override - public void start() throws IOException { - LOG.info("Starting {} {} at port {}", getClass().getSimpleName(), - server.getId(), getIPCPort()); - chunkExecutor.prestartAllCoreThreads(); - server.start(); - } - - @Override - public void stop() { - try { - chunkExecutor.shutdown(); - server.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public int getIPCPort() { - return port; - } - - /** - * Returns the Replication type supported by this end-point. - * - * @return enum -- {Stand_Alone, Ratis, Chained} - */ - @Override - public HddsProtos.ReplicationType getServerType() { - return HddsProtos.ReplicationType.RATIS; - } - - @VisibleForTesting - public RaftServer getServer() { - return server; - } - - private void processReply(RaftClientReply reply) throws IOException { - // NotLeaderException is thrown only when the raft server to which the - // request is submitted is not the leader. The request will be rejected - // and will eventually be executed once it is resubmitted via the leader - // node. - NotLeaderException notLeaderException = reply.getNotLeaderException(); - if (notLeaderException != null) { - throw notLeaderException; - } - StateMachineException stateMachineException = - reply.getStateMachineException(); - if (stateMachineException != null) { - throw stateMachineException; - } - } - - @Override - public void submitRequest(ContainerCommandRequestProto request, - HddsProtos.PipelineID pipelineID) throws IOException { - RaftClientReply reply; - RaftClientRequest raftClientRequest = - createRaftClientRequest(request, pipelineID, - RaftClientRequest.writeRequestType(replicationLevel)); - try { - reply = server.submitClientRequestAsync(raftClientRequest).get(); - } catch (Exception e) { - throw new IOException(e.getMessage(), e); - } - processReply(reply); - } - - private RaftClientRequest createRaftClientRequest( - ContainerCommandRequestProto request, HddsProtos.PipelineID pipelineID, - RaftClientRequest.Type type) { - return new RaftClientRequest(clientId, server.getId(), - PipelineID.getFromProtobuf(pipelineID).getRaftGroupID(), - nextCallId(), 0, Message.valueOf(request.toByteString()), type); - } - - private void handlePipelineFailure(RaftGroupId groupId, - RoleInfoProto roleInfoProto) { - String msg; - UUID datanode = RatisHelper.toDatanodeId(roleInfoProto.getSelf()); - RaftPeerId id = RaftPeerId.valueOf(roleInfoProto.getSelf().getId()); - switch (roleInfoProto.getRole()) { - case CANDIDATE: - msg = datanode + " is in candidate state for " + - roleInfoProto.getCandidateInfo().getLastLeaderElapsedTimeMs() + "ms"; - break; - case LEADER: - StringBuilder sb = new StringBuilder(); - sb.append(datanode).append(" has not seen follower(s)"); - for (RaftProtos.ServerRpcProto follower : roleInfoProto.getLeaderInfo() - .getFollowerInfoList()) { - if (follower.getLastRpcElapsedTimeMs() > nodeFailureTimeoutMs) { - sb.append(" ").append(RatisHelper.toDatanodeId(follower.getId())) - .append(" for ").append(follower.getLastRpcElapsedTimeMs()) - .append("ms"); - } - } - msg = sb.toString(); - break; - default: - LOG.error("Unknown role: " + roleInfoProto.getRole()); - throw new IllegalStateException("node " + id + " is in illegal role " - + roleInfoProto.getRole()); - } - - PipelineID pipelineID = PipelineID.valueOf(groupId); - ClosePipelineInfo.Builder closePipelineInfo = - ClosePipelineInfo.newBuilder() - .setPipelineID(pipelineID.getProtobuf()) - .setReason(ClosePipelineInfo.Reason.PIPELINE_FAILED) - .setDetailedReason(msg); - - PipelineAction action = PipelineAction.newBuilder() - .setClosePipeline(closePipelineInfo) - .setAction(PipelineAction.Action.CLOSE) - .build(); - context.addPipelineActionIfAbsent(action); - LOG.debug("Pipeline action " + action.getAction() + " on pipeline " - + pipelineID + ". Reason: " - + action.getClosePipeline().getDetailedReason()); - } - - @Override - public List<PipelineReport> getPipelineReport() { - try { - Iterable<RaftGroupId> gids = server.getGroupIds(); - List<PipelineReport> reports = new ArrayList<>(); - for (RaftGroupId groupId : gids) { - reports.add(PipelineReport.newBuilder() - .setPipelineID(PipelineID.valueOf(groupId).getProtobuf()) - .build()); - } - return reports; - } catch (Exception e) { - return null; - } - } - - void handleNodeSlowness(RaftGroup group, RoleInfoProto roleInfoProto) { - handlePipelineFailure(group.getGroupId(), roleInfoProto); - } - - void handleNoLeader(RaftGroup group, RoleInfoProto roleInfoProto) { - handlePipelineFailure(group.getGroupId(), roleInfoProto); - } -} \ No newline at end of file
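
newRaftProperties above repeats one pattern for every timeout: take the unit from the default TimeDuration, read the value with Configuration.getTimeDuration, and wrap the result back into a Ratis TimeDuration. A small helper sketch that factors this out (the helper class and method names are illustrative; it assumes hadoop-common and the Ratis utilities seen above on the classpath):

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.ratis.util.TimeDuration;

final class RatisConfSketch {

  // Read a duration from the Hadoop Configuration in the unit of its
  // default, then wrap it as a Ratis TimeDuration, as newRaftProperties
  // does inline for each timeout key.
  static TimeDuration getTimeDuration(Configuration conf, String key,
      TimeDuration defaultValue) {
    TimeUnit unit = defaultValue.getUnit();
    long duration = conf.getTimeDuration(
        key, defaultValue.getDuration(), unit);
    return TimeDuration.valueOf(duration, unit);
  }
}
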
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java deleted file mode 100644 index 8debfe0..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.common.transport.server.ratis; - -/** - * This package contains classes for the server implementation - * using Apache Ratis. - */
