Repository: hadoop Updated Branches: refs/heads/HDFS-7240 23eba1540 -> d10f39e75
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index cf1e9bd..2565a04 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -64,8 +64,8 @@ public class OzoneContainer { Configuration ozoneConfig, FsDatasetSpi<? extends FsVolumeSpi> dataSet) throws Exception { List<StorageLocation> locations = new LinkedList<>(); - String[] paths = ozoneConfig.getStrings(OzoneConfigKeys - .OZONE_METADATA_DIRS); + String[] paths = ozoneConfig.getStrings( + OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS); if (paths != null && paths.length > 0) { for (String p : paths) { locations.add(StorageLocation.parse(p)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java new file mode 100644 index 0000000..86ca946 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + *
contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; + +import java.io.IOException; + +/** + * The protocol spoken between datanodes and SCM. For specifics please see + * StorageContainerDatanodeProtocol.proto, which defines this protocol. + */ +@InterfaceAudience.Private +public interface StorageContainerDatanodeProtocol { + /** + * Returns SCM version. + * @return Version info. + */ + SCMVersionResponseProto getVersion() throws IOException; + + /** + * Used by data node to send a Heartbeat. + * @param datanodeID - Datanode ID. + * @return - SCMHeartbeatResponseProto + * @throws IOException + */ + SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID) + throws IOException; + + /** + * Register Datanode. + * @param datanodeID - DatanodeID. + * @param scmAddresses - List of SCMs this datanode is configured to + * communicate with.
+ * @return SCM Command. + */ + SCMRegisteredCmdResponseProto register(DatanodeID datanodeID, + String[] scmAddresses) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/NullCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/NullCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/NullCommand.java index a44bad5..4bdf422 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/NullCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/NullCommand.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.ozone.protocol.commands; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto.Type; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.NullCmdResponseProto; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java index e8cc660..f2944ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java @@ -18,15
+18,19 @@ package org.apache.hadoop.ozone.protocol.commands; import com.google.common.base.Preconditions; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.RegisteredCmdResponseProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.RegisteredCmdResponseProto.ErrorCode; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto.Type; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto + .ErrorCode; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.Type; /** * Response to Datanode Register call. */ -public class RegisteredCommand extends SCMCommand<RegisteredCmdResponseProto> { - +public class RegisteredCommand extends + SCMCommand<SCMRegisteredCmdResponseProto> { private String datanodeUUID; private String clusterID; private ErrorCode error; @@ -38,8 +42,6 @@ public class RegisteredCommand extends SCMCommand<RegisteredCmdResponseProto> { this.error = error; } - - /** * Returns a new builder. * @@ -56,11 +58,12 @@ public class RegisteredCommand extends SCMCommand<RegisteredCmdResponseProto> { */ @Override Type getType() { - return Type.registeredCmd; + return Type.registeredCommand; } /** * Returns datanode UUID. + * * @return - Datanode ID. */ public String getDatanodeUUID() { @@ -69,6 +72,7 @@ public class RegisteredCommand extends SCMCommand<RegisteredCmdResponseProto> { /** * Returns cluster ID. + * * @return -- ClusterID */ public String getClusterID() { @@ -77,6 +81,7 @@ public class RegisteredCommand extends SCMCommand<RegisteredCmdResponseProto> { /** * Returns ErrorCode.
+ * * @return - ErrorCode */ public ErrorCode getError() { @@ -89,8 +94,8 @@ public class RegisteredCommand extends SCMCommand<RegisteredCmdResponseProto> { * @return A protobuf message. */ @Override - RegisteredCmdResponseProto getProtoBufMessage() { - return RegisteredCmdResponseProto.newBuilder() + SCMRegisteredCmdResponseProto getProtoBufMessage() { + return SCMRegisteredCmdResponseProto.newBuilder() .setClusterID(this.clusterID) .setDatanodeUUID(this.datanodeUUID) .setErrorCode(this.error) @@ -122,7 +127,7 @@ public class RegisteredCommand extends SCMCommand<RegisteredCmdResponseProto> { * @param response - RegisteredCmdResponseProto * @return RegisteredCommand */ - public RegisteredCommand getFromProtobuf(RegisteredCmdResponseProto + public RegisteredCommand getFromProtobuf(SCMRegisteredCmdResponseProto response) { Preconditions.checkNotNull(response); return new RegisteredCommand(response.getErrorCode(), http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java index d0e3e85..a6acf4e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java @@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.protocol.commands; import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto.Type; + .StorageContainerDatanodeProtocolProtos.Type; import com.google.protobuf.GeneratedMessage; /**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java new file mode 100644 index 0000000..e71684c3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License.
+ */ +package org.apache.hadoop.ozone.protocolPB; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ipc.ProtobufHelper; +import org.apache.hadoop.ipc.ProtocolTranslator; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeAddressList; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; + +/** + * This class is the client-side translator to translate the requests made on + * the {@link StorageContainerDatanodeProtocol} interface to the RPC server + * implementing {@link StorageContainerDatanodeProtocolPB}. + */ +public class StorageContainerDatanodeProtocolClientSideTranslatorPB + implements StorageContainerDatanodeProtocol, ProtocolTranslator, Closeable { + + /** + * RpcController is not used and hence is set to null. + */ + private static final RpcController NULL_RPC_CONTROLLER = null; + private final StorageContainerDatanodeProtocolPB rpcProxy; + + /** + * Constructs a Client side interface that calls into SCM datanode protocol. + * + * @param rpcProxy - Proxy for RPC.
+ */ + public StorageContainerDatanodeProtocolClientSideTranslatorPB( + StorageContainerDatanodeProtocolPB rpcProxy) { + this.rpcProxy = rpcProxy; + } + + /** + * Stops the RPC proxy used to communicate with SCM. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + RPC.stopProxy(rpcProxy); + } + + /** + * Return the proxy object underlying this protocol translator. + * + * @return the proxy object underlying this protocol translator. + */ + @Override + public Object getUnderlyingProxyObject() { + return rpcProxy; + } + + /** + * Returns SCM version. + * + * @return Version info. + * @throws IOException if the RPC to SCM fails + */ + @Override + public SCMVersionResponseProto getVersion() throws IOException { + + SCMVersionRequestProto request = + SCMVersionRequestProto.newBuilder().build(); + final SCMVersionResponseProto response; + try { + response = rpcProxy.getVersion(NULL_RPC_CONTROLLER, request); + } catch (ServiceException ex) { + throw ProtobufHelper.getRemoteException(ex); + } + return response; + } + + /** + * Sends a heartbeat from the datanode to SCM.
+ * + * @param datanodeID - DatanodeID + * @return - SCMHeartbeatResponseProto + * @throws IOException if the RPC to SCM fails + */ + @Override + public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID) + throws IOException { + SCMHeartbeatRequestProto.Builder req = + SCMHeartbeatRequestProto.newBuilder(); + req.setDatanodeID(datanodeID.getProtoBufMessage()); + final SCMHeartbeatResponseProto resp; + try { + resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, req.build()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + return resp; + } + + /** + * Register Datanode. + * + * @param datanodeID - DatanodeID. + * @param scmAddresses - Addresses of the SCMs this datanode is configured + * to communicate with; may be null, in which case none are sent. + * @return SCM Command. + * @throws IOException if the RPC to SCM fails + */ + @Override + public SCMRegisteredCmdResponseProto register(DatanodeID datanodeID, + String[] scmAddresses) throws IOException { + SCMRegisterRequestProto.Builder req = + SCMRegisterRequestProto.newBuilder(); + req.setDatanodeID(datanodeID.getProtoBufMessage()); + // addressList is optional: leave it unset when scmAddresses is null so + // the server side sees it as absent. + if (scmAddresses != null) { + req.setAddressList(SCMNodeAddressList.newBuilder() + .addAllAddressList(Arrays.asList(scmAddresses)).build()); + } + final SCMRegisteredCmdResponseProto response; + try { + response = rpcProxy.register(NULL_RPC_CONTROLLER, req.build()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + return response; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolPB.java new file mode 100644 index 0000000..3e6f392 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolPB.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.protocolPB; + +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageContainerDatanodeProtocolService; + +/** + * Protocol used from a datanode to StorageContainerManager. This extends + * the Protocol Buffers service interface to add Hadoop-specific annotations.
+ */ + +@ProtocolInfo(protocolName = + "org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol", + protocolVersion = 1) +public interface StorageContainerDatanodeProtocolPB extends + StorageContainerDatanodeProtocolService.BlockingInterface { +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java new file mode 100644 index 0000000..70ad414 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License.
+ */ +package org.apache.hadoop.ozone.protocolPB; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; + +import java.io.IOException; + +/** + * This class is the server-side translator that forwards requests received on + * {@link StorageContainerDatanodeProtocolPB} to the {@link + * StorageContainerDatanodeProtocol} server implementation. + */ +public class StorageContainerDatanodeProtocolServerSideTranslatorPB + implements StorageContainerDatanodeProtocolPB { + + private final StorageContainerDatanodeProtocol impl; // delegate for every RPC + + public StorageContainerDatanodeProtocolServerSideTranslatorPB( + StorageContainerDatanodeProtocol impl) { + this.impl = impl; + } + + @Override + public StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto + getVersion(RpcController controller, + StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto request) + throws ServiceException { + try { + return impl.getVersion(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto + register(RpcController controller, StorageContainerDatanodeProtocolProtos + .SCMRegisterRequestProto request) throws ServiceException { + String[] addressArray = null; // stays null if addressList is absent + + if (request.hasAddressList()) { + addressArray = request.getAddressList().getAddressListList() + .toArray(new String[0]); + } + + try { + return impl.register(DatanodeID.getFromProtoBuf(request + .getDatanodeID()), addressArray); + } catch (IOException e) { + throw
new ServiceException(e); + } + } + + @Override + public SCMHeartbeatResponseProto + sendHeartbeat(RpcController controller, + SCMHeartbeatRequestProto request) throws ServiceException { + try { + return impl.sendHeartbeat(DatanodeID.getFromProtoBuf(request + .getDatanodeID())); + } catch (IOException e) { + throw new ServiceException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/VersionInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/VersionInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/VersionInfo.java index eb21eb3..6bb3a22 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/VersionInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/VersionInfo.java @@ -26,6 +26,8 @@ public final class VersionInfo { private final static VersionInfo[] VERSION_INFOS = {new VersionInfo("First version of SCM", 1)}; + + public static final String DESCRIPTION_KEY = "Description"; private final String description; private final int version; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java index abf7d52..3fe7084 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java @@ -29,8 +29,11 @@ import
org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol; import org.apache.hadoop.ozone.protocol.VersionResponse; import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.RegisteredCmdResponseProto.ErrorCode; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto + .ErrorCode; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.ozone.scm.VersionInfo; import org.apache.hadoop.util.concurrent.HadoopExecutors; @@ -43,7 +46,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Queue; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ScheduledExecutorService; @@ -580,21 +582,20 @@ public class SCMNodeManager @Override public SCMCommand register(DatanodeID datanodeID) { - SCMCommand errorCode = verifyDatanodeUUID(datanodeID); - if (errorCode != null) { - return errorCode; + SCMCommand responseCommand = verifyDatanodeUUID(datanodeID); + if (responseCommand != null) { + return responseCommand; } - DatanodeID newDatanodeID = new DatanodeID(UUID.randomUUID().toString(), - datanodeID); - nodes.put(newDatanodeID.getDatanodeUuid(), newDatanodeID); + // NOTE(review): keyed by the datanode-supplied UUID; confirm never null. + nodes.put(datanodeID.getDatanodeUuid(), datanodeID); totalNodes.incrementAndGet(); - healthyNodes.put(newDatanodeID.getDatanodeUuid(), monotonicNow()); + healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow()); healthyNodeCount.incrementAndGet(); LOG.info("Data node with ID: {} Registered.", - newDatanodeID.getDatanodeUuid()); + datanodeID.getDatanodeUuid()); return
RegisteredCommand.newBuilder() .setErrorCode(ErrorCode.success) - .setDatanodeUUID(newDatanodeID.getDatanodeUuid()) + .setDatanodeUUID(datanodeID.getDatanodeUuid()) .setClusterID(this.clusterID) .build(); } @@ -607,20 +608,12 @@ public class SCMNodeManager */ private SCMCommand verifyDatanodeUUID(DatanodeID datanodeID) { - // Make sure that we return the right error code, so that - // data node can log the correct error. if it is already registered then - // datanode should move to heartbeat state. It implies that somehow we - // have an error where the data node is trying to re-register. - // - // We are going to let the datanode know that there is an error but allow it - // to recover by sending it the right info that is needed for recovery. - if (datanodeID.getDatanodeUuid() != null && nodes.containsKey(datanodeID.getDatanodeUuid())) { - LOG.error("Datanode is already registered. Datanode: {}", + LOG.trace("Datanode is already registered. Datanode: {}", datanodeID.toString()); return RegisteredCommand.newBuilder() - .setErrorCode(ErrorCode.errorNodeAlreadyRegistered) + .setErrorCode(ErrorCode.success) // re-registering is not an error .setClusterID(this.clusterID) .setDatanodeUUID(datanodeID.getDatanodeUuid()) .build(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto index 78e7e74..5dea5cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -31,9 +31,13 @@ option java_generic_services = true; option java_generate_equals_and_hash = true; package hadoop.hdfs; + import "hdfs.proto"; + import
"HdfsServer.proto"; + import "DatanodeProtocol.proto"; + import "DatanodeContainerProtocol.proto"; @@ -45,6 +49,10 @@ message SCMHeartbeatRequestProto { required DatanodeIDProto datanodeID = 1; } +message SCMRegisterRequestProto { + required DatanodeIDProto datanodeID = 1; + optional SCMNodeAddressList addressList = 2; +} /** * Request for version info of the software stack on the server. @@ -59,24 +67,38 @@ message SCMVersionRequestProto { */ message SCMVersionResponseProto { required uint32 softwareVersion = 1; - repeated hadoop.hdfs.ozone.KeyValue keys = 2; + repeated hadoop.hdfs.ozone.KeyValue keys = 2; +} + +message SCMNodeAddressList { + repeated string addressList = 1; } /** * Datanode ID returned by the SCM. This is similar to name node * registeration of a datanode. */ -message RegisteredCmdResponseProto { +message SCMRegisteredCmdResponseProto { enum ErrorCode { success = 1; - errorNodeAlreadyRegistered = 2; - errorNodeNotPermitted = 3; + errorNodeNotPermitted = 2; } - required ErrorCode errorCode = 1; - optional string datanodeUUID = 2; - optional string clusterID = 3; + required ErrorCode errorCode = 2; + optional string datanodeUUID = 3; + optional string clusterID = 4; + optional SCMNodeAddressList addressList = 5; +} + +/** + * Container ID maintains the container's identity along with cluster ID + * after the registration is done. + */ +message ContainerNodeIDProto { + required DatanodeIDProto datanodeID = 1; + optional string clusterID = 2; } + /** * Empty Command Response */ @@ -84,18 +106,21 @@ message NullCmdResponseProto { } +/** + * Type of commands supported by the SCM to datanode protocol. + */ +enum Type { + nullCmd = 1; + versionCommand = 2; + registeredCommand = 3; +} + /* * These are commands returned by SCM for to the datanode to execute. */ -message SCMCommandResponseProto { - enum Type { - nullCmd = 1; - registeredCmd = 2; // Returns the datanode ID after registeration.
- } - - required Type cmdType = 1; // Type of the command - optional NullCmdResponseProto nullCommand = 2; - optional RegisteredCmdResponseProto registerNode = 3; +message SCMCommandResponseProto { + required Type cmdType = 2; // Type of the command + optional NullCmdResponseProto nullCommand = 3; } @@ -160,12 +185,11 @@ message SCMHeartbeatResponseProto { * registered with some SCM. If this file is not found, datanode assumes that * it needs to do a registration. * - * If registration is need datanode moves into REGISTERING_NODE state. It will - * send a register call with datanodeID data structure, but with datanode UUID - * will be set to an empty string. + * If registration is needed, the datanode moves into REGISTER state. It will + * send a register call with datanodeID data structure and persist that info. * - * The response to the command contains the datanode UUID and clusterID. This - * information is persisted by the datanode and moves into heartbeat state. + * The response to the command contains the clusterID. This information is + * also persisted by the datanode, which then moves into heartbeat state. * * Once in the heartbeat state, datanode sends heartbeats and container reports * to SCM and process commands issued by SCM until it is shutdown. @@ -176,12 +200,12 @@ service StorageContainerDatanodeProtocolService { /** * Gets the version information from the SCM. */ - rpc getVersion(SCMVersionRequestProto) returns (SCMVersionResponseProto); + rpc getVersion (SCMVersionRequestProto) returns (SCMVersionResponseProto); /** * Registers a data node with SCM. */ - rpc register(SCMHeartbeatRequestProto) returns (SCMCommandResponseProto); + rpc register (SCMRegisterRequestProto) returns (SCMRegisteredCmdResponseProto); /** * Send heartbeat from datanode to SCM.
HB's under SCM looks more http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java new file mode 100644 index 0000000..c09693b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.ozone.container.common; + +import com.google.protobuf.BlockingService; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine; +import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; +import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageContainerDatanodeProtocolService; +import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB; +import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.ozone.scm.node.SCMNodeManager; +import org.apache.hadoop.security.UserGroupInformation; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.util.Random; +import java.util.UUID; + +/** + * Test Endpoint class. + */ +public final class SCMTestUtils { + /** + * Never constructed. + */ + private SCMTestUtils() { + } + + /** + * Starts an RPC server, if configured. 
+ * + * @param conf configuration + * @param addr configured address of RPC server + * @param protocol RPC protocol provided by RPC server + * @param instance RPC protocol implementation instance + * @param handlerCount RPC server handler count + * @return RPC server + * @throws IOException if there is an I/O error while creating RPC server + */ + private static RPC.Server startRpcServer(Configuration conf, + InetSocketAddress addr, Class<?> + protocol, BlockingService instance, int handlerCount) + throws IOException { + RPC.Server rpcServer = new RPC.Builder(conf) + .setProtocol(protocol) + .setInstance(instance) + .setBindAddress(addr.getHostString()) + .setPort(addr.getPort()) + .setNumHandlers(handlerCount) + .setVerbose(false) + .setSecretManager(null) + .build(); + + DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer); + return rpcServer; + } + + /** + * Creates an Endpoint class for testing purpose. + * + * @param conf - Conf + * @param address - InetAddres + * @param rpcTimeout - rpcTimeOut + * @return EndPoint + * @throws Exception + */ + public static EndpointStateMachine createEndpoint(Configuration conf, + InetSocketAddress address, int rpcTimeout) throws Exception { + RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class, + ProtobufRpcEngine.class); + long version = + RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class); + + StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProtocolProxy( + StorageContainerDatanodeProtocolPB.class, version, + address, UserGroupInformation.getCurrentUser(), conf, + NetUtils.getDefaultSocketFactory(conf), rpcTimeout, + RetryPolicies.TRY_ONCE_THEN_FAIL).getProxy(); + + StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient = + new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy); + return new EndpointStateMachine(address, rpcClient, conf); + } + + /** + * Start Datanode RPC server. 
+ */ + public static RPC.Server startScmRpcServer(Configuration configuration, + StorageContainerDatanodeProtocol server, + InetSocketAddress rpcServerAddresss, int handlerCount) throws + IOException { + RPC.setProtocolEngine(configuration, + StorageContainerDatanodeProtocolPB.class, + ProtobufRpcEngine.class); + + BlockingService scmDatanodeService = + StorageContainerDatanodeProtocolService. + newReflectiveBlockingService( + new StorageContainerDatanodeProtocolServerSideTranslatorPB( + server)); + + RPC.Server scmServer = startRpcServer(configuration, rpcServerAddresss, + StorageContainerDatanodeProtocolPB.class, scmDatanodeService, + handlerCount); + + scmServer.start(); + return scmServer; + } + + public static InetSocketAddress getReuseableAddress() throws IOException { + try (ServerSocket socket = new ServerSocket(0)) { + socket.setReuseAddress(true); + int port = socket.getLocalPort(); + String addr = InetAddress.getLoopbackAddress().getHostAddress() + .toString(); + return new InetSocketAddress(addr, port); + } + } + + public static Configuration getConf() { + return new Configuration(); + } + + public static DatanodeID getDatanodeID(SCMNodeManager nodeManager) { + + return getDatanodeID(nodeManager, UUID.randomUUID().toString()); + } + + /** + * Create a new DatanodeID with NodeID set to the string. + * + * @param uuid - node ID, it is generally UUID. + * @return DatanodeID. + */ + public static DatanodeID getDatanodeID(SCMNodeManager nodeManager, String + uuid) { + DatanodeID tempDataNode = getDatanodeID(uuid); + RegisteredCommand command = + (RegisteredCommand) nodeManager.register(tempDataNode); + return new DatanodeID(command.getDatanodeUUID(), tempDataNode); + } + + /** + * Get a datanode ID. 
+ * + * @return DatanodeID + */ + public static DatanodeID getDatanodeID() { + return getDatanodeID(UUID.randomUUID().toString()); + } + + private static DatanodeID getDatanodeID(String uuid) { + Random random = new Random(); + String ipAddress = random.nextInt(256) + "." + + random.nextInt(256) + "." + + random.nextInt(256) + "." + + random.nextInt(256); + + String hostName = uuid; + return new DatanodeID(ipAddress, + hostName, uuid, 0, 0, 0, 0); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java new file mode 100644 index 0000000..ad805a7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.ozone.container.common; + +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; +import org.apache.hadoop.ozone.protocol.VersionResponse; +import org.apache.hadoop.ozone.protocol.commands.NullCommand; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.ozone.scm.VersionInfo; + +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * SCM RPC mock class. + */ +public class ScmTestMock implements StorageContainerDatanodeProtocol { + private int rpcResponseDelay; + private AtomicInteger heartbeatCount = new AtomicInteger(0); + private AtomicInteger rpcCount = new AtomicInteger(0); + + /** + * Returns the number of heartbeats made to this class. + * + * @return int + */ + public int getHeartbeatCount() { + return heartbeatCount.get(); + } + + /** + * Returns the number of RPC calls made to this mock class instance. + * + * @return - Number of RPC calls serviced by this class. + */ + public int getRpcCount() { + return rpcCount.get(); + } + + /** + * Gets the RPC response delay. + * + * @return delay in milliseconds. + */ + public int getRpcResponseDelay() { + return rpcResponseDelay; + } + + /** + * Sets the RPC response delay. + * + * @param rpcResponseDelay - delay in milliseconds. + */ + public void setRpcResponseDelay(int rpcResponseDelay) { + this.rpcResponseDelay = rpcResponseDelay; + } + + /** + * Returns SCM version. + * + * @return Version info. 
+ */ + @Override + public StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto + getVersion() throws IOException { + rpcCount.incrementAndGet(); + sleepIfNeeded(); + VersionInfo versionInfo = VersionInfo.getLatestVersion(); + return VersionResponse.newBuilder() + .setVersion(versionInfo.getVersion()) + .addValue(VersionInfo.DESCRIPTION_KEY, versionInfo.getDescription()) + .build().getProtobufMessage(); + } + + private void sleepIfNeeded() { + if (getRpcResponseDelay() > 0) { + try { + Thread.sleep(getRpcResponseDelay()); + } catch (InterruptedException ex) { + // Just ignore this exception. + } + } + } + + /** + * Used by data node to send a Heartbeat. + * + * @param datanodeID - Datanode ID. + * @return - SCMHeartbeatResponseProto + * @throws IOException + */ + @Override + public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto + sendHeartbeat(DatanodeID datanodeID) + throws IOException { + rpcCount.incrementAndGet(); + heartbeatCount.incrementAndGet(); + sleepIfNeeded(); + StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto + cmdResponse = StorageContainerDatanodeProtocolProtos + .SCMCommandResponseProto + .newBuilder().setCmdType(StorageContainerDatanodeProtocolProtos + .Type.nullCmd) + .setNullCommand( + NullCommand.newBuilder().build().getProtoBufMessage()).build(); + return StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto + .newBuilder() + .addCommands(cmdResponse).build(); + } + + /** + * Register Datanode. + * + * @param datanodeID - DatanodID. + * @param scmAddresses - List of SCMs this datanode is configured to + * communicate. + * @return SCM Command. 
+ */ + @Override + public StorageContainerDatanodeProtocolProtos + .SCMRegisteredCmdResponseProto register(DatanodeID datanodeID, + String[] scmAddresses) throws IOException { + rpcCount.incrementAndGet(); + sleepIfNeeded(); + return StorageContainerDatanodeProtocolProtos + .SCMRegisteredCmdResponseProto + .newBuilder().setClusterID(UUID.randomUUID().toString()) + .setDatanodeUUID(datanodeID.getDatanodeUuid()).setErrorCode( + StorageContainerDatanodeProtocolProtos + .SCMRegisteredCmdResponseProto.ErrorCode.success).build(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java new file mode 100644 index 0000000..b75a925 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java @@ -0,0 +1,274 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.statemachine + .DatanodeStateMachine; + +import org.apache.hadoop.ozone.container.common.statemachine + .EndpointStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine + .SCMConnectionManager; + +import org.apache.hadoop.ozone.container.common.states.DatanodeState; +import org.apache.hadoop.ozone.container.common.states.datanode + .InitDatanodeState; +import org.apache.hadoop.ozone.container.common.states.datanode + .RunningDatanodeState; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URL; +import java.nio.file.Paths; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Tests the datanode state machine class and its states. 
+ */ +public class TestDatanodeStateMachine { + private final int scmServerCount = 3; + private List<String> serverAddresses; + private List<RPC.Server> scmServers; + private List<ScmTestMock> mockServers; + private ExecutorService executorService; + private Configuration conf; + private static final Logger LOG = + LoggerFactory.getLogger(TestDatanodeStateMachine.class); + + @Before + public void setUp() throws Exception { + conf = SCMTestUtils.getConf(); + serverAddresses = new LinkedList<>(); + scmServers = new LinkedList<>(); + mockServers = new LinkedList<>(); + for (int x = 0; x < scmServerCount; x++) { + int port = SCMTestUtils.getReuseableAddress().getPort(); + String address = "127.0.0.1"; + serverAddresses.add(address + ":" + port); + ScmTestMock mock = new ScmTestMock(); + + scmServers.add(SCMTestUtils.startScmRpcServer(conf, mock, + new InetSocketAddress(address, port), 10)); + mockServers.add(mock); + } + + conf.setStrings(OzoneConfigKeys.OZONE_SCM_NAMES, + serverAddresses.toArray(new String[0])); + + URL p = this.getClass().getResource(""); + String path = p.getPath().concat( + TestDatanodeStateMachine.class.getSimpleName()); + File f = new File(path); + if(!f.mkdirs()) { + LOG.info("Required directories already exist."); + } + + path = Paths.get(path.toString(), + TestDatanodeStateMachine.class.getSimpleName() + ".id").toString(); + conf.set(OzoneConfigKeys.OZONE_SCM_DATANODE_ID, path); + + executorService = HadoopExecutors.newScheduledThreadPool( + conf.getInt( + OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS, + OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS_DEFAULT), + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Test Data Node State Machine Thread - %d").build()); + } + + @After + public void tearDown() throws Exception { + try { + executorService.shutdownNow(); + for (RPC.Server s : scmServers) { + s.stop(); + } + } catch (Exception e) { + //ignore all execption from the shutdown + } + } + + /** + * Assert that starting statemachine 
executes the Init State. + * + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testDatanodeStateMachineStartThread() throws IOException, + InterruptedException, TimeoutException { + final DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf); + Runnable startStateMachineTask = () -> { + try { + stateMachine.start(); + } catch (IOException ex) { + } + }; + Thread thread1 = new Thread(startStateMachineTask); + thread1.setDaemon(true); + thread1.start(); + + SCMConnectionManager connectionManager = + stateMachine.getConnectionManager(); + + GenericTestUtils.waitFor(() -> connectionManager.getValues().size() == 3 , + 100, 1000); + + stateMachine.close(); + } + + /** + * This test explores the state machine by invoking each call in sequence just + * like as if the state machine would call it. Because this is a test we are + * able to verify each of the assumptions. + * <p> + * Here is what happens at High level. + * <p> + * 1. We start the datanodeStateMachine in the INIT State. + * <p> + * 2. We invoke the INIT state task. + * <p> + * 3. That creates a set of RPC endpoints that are ready to connect to SCMs. + * <p> + * 4. We assert that we have moved to the running state for the + * DatanodeStateMachine. + * <p> + * 5. We get the task for the Running State - Executing that running state, + * makes the first network call in of the state machine. The Endpoint is in + * the GETVERSION State and we invoke the task. + * <p> + * 6. We assert that this call was a success by checking that each of the + * endponts now have version response that it got from the SCM server that it + * was talking to and also each of the mock server serviced one RPC call. + * <p> + * 7. Since the Register is done now, next calls to get task will return + * HeartbeatTask, which sends heartbeats to SCM. We assert that we get right + * task from sub-system below. 
+ * + * @throws IOException + */ + @Test + public void testDatanodeStateContext() throws IOException, + InterruptedException, ExecutionException, TimeoutException { + final DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf); + DatanodeStateMachine.DatanodeStates currentState = + stateMachine.getContext().getState(); + Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT, + currentState); + + DatanodeState<DatanodeStateMachine.DatanodeStates> task = + stateMachine.getContext().getTask(); + Assert.assertEquals(InitDatanodeState.class, task.getClass()); + + task.execute(executorService); + DatanodeStateMachine.DatanodeStates newState = + task.await(2, TimeUnit.SECONDS); + + for (EndpointStateMachine endpoint : + stateMachine.getConnectionManager().getValues()) { + // We assert that each of the is in State GETVERSION. + Assert.assertEquals(EndpointStateMachine.EndPointStates.GETVERSION, + endpoint.getState()); + } + + // The Datanode has moved into Running State, since endpoints are created. + // We move to running state when we are ready to issue RPC calls to SCMs. + Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING, + newState); + + // If we had called context.execute instead of calling into each state + // this would have happened automatically. + stateMachine.getContext().setState(newState); + task = stateMachine.getContext().getTask(); + Assert.assertEquals(RunningDatanodeState.class, task.getClass()); + + // This execute will invoke getVersion calls against all SCM endpoints + // that we know of. + task.execute(executorService); + newState = task.await(2, TimeUnit.SECONDS); + + // If we are in running state, we should be in running. + Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING, + newState); + + for (EndpointStateMachine endpoint : + stateMachine.getConnectionManager().getValues()) { + + // Since the earlier task.execute called into GetVersion, the + // endPointState Machine should move to REGISTER state. 
+ Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER, + endpoint.getState()); + + // We assert that each of the end points have gotten a version from the + // SCM Server. + Assert.assertNotNull(endpoint.getVersion()); + } + + // We can also assert that all mock servers have received only one RPC + // call at this point of time. + for (ScmTestMock mock : mockServers) { + Assert.assertEquals(1, mock.getRpcCount()); + } + + // This task is the Running task, but running task executes tasks based + // on the state of Endpoints, hence this next call will be a Register at + // the endpoint RPC level. + task = stateMachine.getContext().getTask(); + task.execute(executorService); + newState = task.await(2, TimeUnit.SECONDS); + + // If we are in running state, we should be in running. + Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING, + newState); + + for (ScmTestMock mock : mockServers) { + Assert.assertEquals(2, mock.getRpcCount()); + } + + // This task is the Running task, but running task executes tasks based + // on the state of Endpoints, hence this next call will be a + // HeartbeatTask at the endpoint RPC level. + task = stateMachine.getContext().getTask(); + task.execute(executorService); + newState = task.await(2, TimeUnit.SECONDS); + + // If we are in running state, we should be in running. 
+ Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING, + newState); + + for (ScmTestMock mock : mockServers) { + Assert.assertEquals(1, mock.getHeartbeatCount()); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java new file mode 100644 index 0000000..45de6d9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java @@ -0,0 +1,314 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.ozone.container.common; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ozone.container.common.statemachine + .EndpointStateMachine; +import org.apache.hadoop.ozone.container.common.states.endpoint + .HeartbeatEndpointTask; +import org.apache.hadoop.ozone.container.common.states.endpoint + .RegisterEndpointTask; +import org.apache.hadoop.ozone.container.common.states.endpoint + .VersionEndpointTask; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.Type; +import org.apache.hadoop.ozone.scm.VersionInfo; +import org.apache.hadoop.util.Time; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.internal.matchers.LessOrEqual; + +import java.net.InetSocketAddress; +import java.util.UUID; + +/** + * Tests the endpoints. + */ +public class TestEndPoint { + private static InetSocketAddress serverAddress; + private static RPC.Server scmServer; + private static ScmTestMock scmServerImpl; + + @Test + /** + * This test asserts that we are able to make a version call to SCM server + * and gets back the expected values. 
+ */ + public void testGetVersion() throws Exception { + try (EndpointStateMachine rpcEndPoint = + SCMTestUtils.createEndpoint(SCMTestUtils.getConf(), + serverAddress, 1000)) { + SCMVersionResponseProto responseProto = rpcEndPoint.getEndPoint() + .getVersion(); + Assert.assertNotNull(responseProto); + Assert.assertEquals(responseProto.getKeys(0).getKey(), + VersionInfo.DESCRIPTION_KEY); + Assert.assertEquals(responseProto.getKeys(0).getValue(), + VersionInfo.getLatestVersion().getDescription()); + } + } + + @Test + /** + * We make getVersion RPC call, but via the VersionEndpointTask which is + * how the state machine would make the call. + */ + public void testGetVersionTask() throws Exception { + Configuration conf = SCMTestUtils.getConf(); + try (EndpointStateMachine rpcEndPoint = SCMTestUtils.createEndpoint(conf, + serverAddress, 1000)) { + rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION); + VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint, + conf); + EndpointStateMachine.EndPointStates newState = versionTask.call(); + + // if version call worked the endpoint should automatically move to the + // next state. + Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER, + newState); + + // Now rpcEndpoint should remember the version it got from SCM + Assert.assertNotNull(rpcEndPoint.getVersion()); + } + } + + @Test + /** + * This test makes a call to end point where there is no SCM server. We + * expect that versionTask should be able to handle it. 
+ */ + public void testGetVersionToInvalidEndpoint() throws Exception { + Configuration conf = SCMTestUtils.getConf(); + InetSocketAddress nonExistentServerAddress = SCMTestUtils + .getReuseableAddress(); + try (EndpointStateMachine rpcEndPoint = SCMTestUtils.createEndpoint(conf, + nonExistentServerAddress, 1000)) { + rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION); + VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint, + conf); + EndpointStateMachine.EndPointStates newState = versionTask.call(); + + // This version call did NOT work, so endpoint should remain in the same + // state. + Assert.assertEquals(EndpointStateMachine.EndPointStates.GETVERSION, + newState); + } + } + + @Test + /** + * This test makes a getVersionRPC call, but the DummyStorageServer is + * going to respond little slowly. We will assert that we are still in the + * GETVERSION state after the timeout. + */ + public void testGetVersionAssertRpcTimeOut() throws Exception { + final long rpcTimeout = 1000; + final long tolerance = 100; + Configuration conf = SCMTestUtils.getConf(); + + try (EndpointStateMachine rpcEndPoint = SCMTestUtils.createEndpoint(conf, + serverAddress, (int) rpcTimeout)) { + rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION); + VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint, + conf); + + scmServerImpl.setRpcResponseDelay(1500); + long start = Time.monotonicNow(); + EndpointStateMachine.EndPointStates newState = versionTask.call(); + long end = Time.monotonicNow(); + scmServerImpl.setRpcResponseDelay(0); + Assert.assertThat(end - start, new LessOrEqual<>(rpcTimeout + tolerance)); + Assert.assertEquals(EndpointStateMachine.EndPointStates.GETVERSION, + newState); + } + } + + @Test + public void testRegister() throws Exception { + String[] scmAddressArray = new String[1]; + scmAddressArray[0] = serverAddress.toString(); + DatanodeID nodeToRegister = SCMTestUtils.getDatanodeID(); + try 
(EndpointStateMachine rpcEndPoint = + SCMTestUtils.createEndpoint( + SCMTestUtils.getConf(), serverAddress, 1000)) { + SCMRegisteredCmdResponseProto responseProto = rpcEndPoint.getEndPoint() + .register(nodeToRegister, scmAddressArray); + Assert.assertNotNull(responseProto); + Assert.assertEquals(responseProto.getDatanodeUUID(), + nodeToRegister.getDatanodeUuid()); + Assert.assertNotNull(responseProto.getClusterID()); + } + } + + private EndpointStateMachine registerTaskHelper(InetSocketAddress scmAddress, + int rpcTimeout, boolean clearContainerID) throws Exception { + EndpointStateMachine rpcEndPoint = + SCMTestUtils.createEndpoint(SCMTestUtils.getConf(), + scmAddress, rpcTimeout); + rpcEndPoint.setState(EndpointStateMachine.EndPointStates.REGISTER); + RegisterEndpointTask endpointTask = + new RegisterEndpointTask(rpcEndPoint, SCMTestUtils.getConf()); + if (!clearContainerID) { + ContainerNodeIDProto containerNodeID = ContainerNodeIDProto.newBuilder() + .setClusterID(UUID.randomUUID().toString()) + .setDatanodeID(SCMTestUtils.getDatanodeID().getProtoBufMessage()) + .build(); + endpointTask.setContainerNodeIDProto(containerNodeID); + } + endpointTask.call(); + return rpcEndPoint; + } + + @Test + public void testRegisterTask() throws Exception { + try (EndpointStateMachine rpcEndpoint = + registerTaskHelper(serverAddress, 1000, false)) { + // Successful register should move us to Heartbeat state. 
+ Assert.assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT, + rpcEndpoint.getState()); + } + } + + @Test + public void testRegisterToInvalidEndpoint() throws Exception { + InetSocketAddress address = SCMTestUtils.getReuseableAddress(); + try (EndpointStateMachine rpcEndpoint = + registerTaskHelper(address, 1000, false)) { + Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER, + rpcEndpoint.getState()); + } + } + + @Test + public void testRegisterNoContainerID() throws Exception { + InetSocketAddress address = SCMTestUtils.getReuseableAddress(); + try (EndpointStateMachine rpcEndpoint = + registerTaskHelper(address, 1000, true)) { + // No Container ID, therefore we tell the datanode that we would like to + // shutdown. + Assert.assertEquals(EndpointStateMachine.EndPointStates.SHUTDOWN, + rpcEndpoint.getState()); + } + } + + @Test + public void testRegisterRpcTimeout() throws Exception { + final long rpcTimeout = 1000; + final long tolerance = 200; + scmServerImpl.setRpcResponseDelay(1500); + long start = Time.monotonicNow(); + registerTaskHelper(serverAddress, 1000, false).close(); + long end = Time.monotonicNow(); + scmServerImpl.setRpcResponseDelay(0); + Assert.assertThat(end - start, new LessOrEqual<>(rpcTimeout + tolerance)); + } + + @Test + public void testHeartbeat() throws Exception { + DatanodeID dataNode = SCMTestUtils.getDatanodeID(); + try (EndpointStateMachine rpcEndPoint = + SCMTestUtils.createEndpoint(SCMTestUtils.getConf(), + serverAddress, 1000)) { + SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint() + .sendHeartbeat(dataNode); + Assert.assertNotNull(responseProto); + Assert.assertEquals(1, responseProto.getCommandsCount()); + Assert.assertNotNull(responseProto.getCommandsList().get(0)); + Assert.assertEquals(responseProto.getCommandsList().get(0).getCmdType(), + Type.nullCmd); + } + } + + private EndpointStateMachine heartbeatTaskHelper(InetSocketAddress scmAddress, + int rpcTimeout) throws Exception { + 
EndpointStateMachine rpcEndPoint = SCMTestUtils.createEndpoint( + SCMTestUtils.getConf(), + scmAddress, rpcTimeout); + ContainerNodeIDProto containerNodeID = ContainerNodeIDProto.newBuilder() + .setClusterID(UUID.randomUUID().toString()) + .setDatanodeID(SCMTestUtils.getDatanodeID().getProtoBufMessage()) + .build(); + rpcEndPoint.setState(EndpointStateMachine.EndPointStates.HEARTBEAT); + HeartbeatEndpointTask endpointTask = + new HeartbeatEndpointTask(rpcEndPoint, SCMTestUtils.getConf()); + endpointTask.setContainerNodeIDProto(containerNodeID); + endpointTask.call(); + Assert.assertNotNull(endpointTask.getContainerNodeIDProto()); + return rpcEndPoint; + } + + private void heartbeatTaskHelper(InetSocketAddress address) + throws Exception { + try (EndpointStateMachine rpcEndpoint = + heartbeatTaskHelper(address, 1000)) { + Assert.assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT, + rpcEndpoint.getState()); + } + } + + @Test + public void testHeartbeatTask() throws Exception { + heartbeatTaskHelper(serverAddress); + } + + @Test + public void testHeartbeatTaskToInvalidNode() throws Exception { + InetSocketAddress invalidAddress = SCMTestUtils.getReuseableAddress(); + heartbeatTaskHelper(invalidAddress); + } + + @Test + public void testHeartbeatTaskRpcTimeOut() throws Exception { + final long rpcTimeout = 1000; + final long tolerance = 200; + scmServerImpl.setRpcResponseDelay(1500); + long start = Time.monotonicNow(); + InetSocketAddress invalidAddress = SCMTestUtils.getReuseableAddress(); + heartbeatTaskHelper(invalidAddress); + long end = Time.monotonicNow(); + scmServerImpl.setRpcResponseDelay(0); + Assert.assertThat(end - start, new LessOrEqual<>(rpcTimeout + tolerance)); + } + + @AfterClass + public static void tearDown() throws Exception { + if (scmServer != null) { + scmServer.stop(); + } + } + + @BeforeClass + public static void setUp() throws Exception { + serverAddress = SCMTestUtils.getReuseableAddress(); + scmServerImpl = new ScmTestMock(); + 
scmServer = SCMTestUtils.startScmRpcServer(SCMTestUtils.getConf(), + scmServerImpl, serverAddress, 10); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java index 9928721..0f02871 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java @@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfiguration; -import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; +import org.apache.hadoop.ozone.container.common.SCMTestUtils; import org.apache.hadoop.test.GenericTestUtils; import org.hamcrest.CoreMatchers; import org.junit.Assert; @@ -33,7 +33,6 @@ import org.junit.rules.ExpectedException; import java.io.IOException; import java.util.LinkedList; import java.util.List; -import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeoutException; @@ -73,37 +72,6 @@ public class TestNodeManager { } /** - * Create a new datanode ID. - * - * @return DatanodeID - */ - DatanodeID getDatanodeID(SCMNodeManager nodeManager) { - - return getDatanodeID(nodeManager, UUID.randomUUID().toString()); - } - - /** - * Create a new DatanodeID with NodeID set to the string. - * - * @param uuid - node ID, it is generally UUID. - * @return DatanodeID. 
- */ - DatanodeID getDatanodeID(SCMNodeManager nodeManager, String uuid) { - Random random = new Random(); - String ipAddress = random.nextInt(256) + "." - + random.nextInt(256) + "." - + random.nextInt(256) + "." - + random.nextInt(256); - - String hostName = uuid; - DatanodeID tempDataNode = new DatanodeID(ipAddress, - hostName, uuid, 0, 0, 0, 0); - RegisteredCommand command = - (RegisteredCommand) nodeManager.register(tempDataNode); - return new DatanodeID(command.getDatanodeUUID(), tempDataNode); - } - - /** * Creates a NodeManager. * * @param config - Config for the node manager. @@ -134,7 +102,7 @@ public class TestNodeManager { try (SCMNodeManager nodeManager = createNodeManager(getConf())) { // Send some heartbeats from different nodes. for (int x = 0; x < nodeManager.getMinimumChillModeNodes(); x++) { - DatanodeID datanodeID = getDatanodeID(nodeManager); + DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager); nodeManager.sendHeartbeat(datanodeID); } @@ -181,7 +149,7 @@ public class TestNodeManager { // Need 100 nodes to come out of chill mode, only one node is sending HB. nodeManager.setMinimumChillModeNodes(100); - nodeManager.sendHeartbeat(getDatanodeID(nodeManager)); + nodeManager.sendHeartbeat(SCMTestUtils.getDatanodeID(nodeManager)); GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100, 4 * 1000); assertFalse("Not enough heartbeat, Node manager should have been in " + @@ -203,7 +171,7 @@ public class TestNodeManager { try (SCMNodeManager nodeManager = createNodeManager(getConf())) { nodeManager.setMinimumChillModeNodes(3); - DatanodeID datanodeID = getDatanodeID(nodeManager); + DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager); // Send 10 heartbeat from same node, and assert we never leave chill mode. 
for (int x = 0; x < 10; x++) { @@ -232,7 +200,7 @@ public class TestNodeManager { Configuration conf = getConf(); conf.setInt(OzoneConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100); SCMNodeManager nodeManager = createNodeManager(conf); - DatanodeID datanodeID = getDatanodeID(nodeManager); + DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager); nodeManager.close(); // These should never be processed. @@ -262,7 +230,7 @@ public class TestNodeManager { try (SCMNodeManager nodeManager = createNodeManager(conf)) { for (int x = 0; x < count; x++) { - DatanodeID datanodeID = getDatanodeID(nodeManager); + DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager); nodeManager.sendHeartbeat(datanodeID); } GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100, @@ -346,7 +314,7 @@ public class TestNodeManager { try (SCMNodeManager nodeManager = createNodeManager(conf)) { List<DatanodeID> nodeList = createNodeSet(nodeManager, nodeCount, "staleNode"); - DatanodeID staleNode = getDatanodeID(nodeManager); + DatanodeID staleNode = SCMTestUtils.getDatanodeID(nodeManager); // Heartbeat once nodeManager.sendHeartbeat(staleNode); @@ -396,7 +364,7 @@ public class TestNodeManager { List<DatanodeID> nodeList = createNodeSet(nodeManager, nodeCount, "Node"); - DatanodeID deadNode = getDatanodeID(nodeManager); + DatanodeID deadNode = SCMTestUtils.getDatanodeID(nodeManager); // Heartbeat once nodeManager.sendHeartbeat(deadNode); @@ -428,28 +396,6 @@ public class TestNodeManager { } /** - * Asserts that if we get duplicate registration calls for a datanode, we will - * ignore it and LOG the error. 
- * - * @throws IOException - * @throws InterruptedException - * @throws TimeoutException - */ - @Test - public void testScmDuplicateRegistrationLogsError() throws IOException, - InterruptedException, TimeoutException { - try (SCMNodeManager nodeManager = createNodeManager(getConf())) { - GenericTestUtils.LogCapturer logCapturer = - GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG); - DatanodeID duplicateNodeID = getDatanodeID(nodeManager); - nodeManager.register(duplicateNodeID); - logCapturer.stopCapturing(); - assertThat(logCapturer.getOutput(), containsString("Datanode is already" + - " registered.")); - } - } - - /** * Asserts that we log an error for null in datanode ID. * * @throws IOException @@ -532,9 +478,12 @@ public class TestNodeManager { * Cluster state: Healthy: All nodes are heartbeat-ing like normal. */ try (SCMNodeManager nodeManager = createNodeManager(conf)) { - DatanodeID healthyNode = getDatanodeID(nodeManager, "HealthyNode"); - DatanodeID staleNode = getDatanodeID(nodeManager, "StaleNode"); - DatanodeID deadNode = getDatanodeID(nodeManager, "DeadNode"); + DatanodeID healthyNode = + SCMTestUtils.getDatanodeID(nodeManager, "HealthyNode"); + DatanodeID staleNode = + SCMTestUtils.getDatanodeID(nodeManager, "StaleNode"); + DatanodeID deadNode = + SCMTestUtils.getDatanodeID(nodeManager, "DeadNode"); nodeManager.sendHeartbeat(healthyNode); nodeManager.sendHeartbeat(staleNode); nodeManager.sendHeartbeat(deadNode); @@ -659,7 +608,7 @@ public class TestNodeManager { prefix) { List<DatanodeID> list = new LinkedList<>(); for (int x = 0; x < count; x++) { - list.add(getDatanodeID(nodeManager, prefix + x)); + list.add(SCMTestUtils.getDatanodeID(nodeManager, prefix + x)); } return list; } @@ -878,7 +827,7 @@ public class TestNodeManager { try (SCMNodeManager nodeManager = createNodeManager(conf)) { nodeManager.setMinimumChillModeNodes(10); - DatanodeID datanodeID = getDatanodeID(nodeManager); + DatanodeID datanodeID = 
SCMTestUtils.getDatanodeID(nodeManager); nodeManager.sendHeartbeat(datanodeID); String status = nodeManager.getChillModeStatus(); Assert.assertThat(status, CoreMatchers.containsString("Still in chill " + @@ -908,7 +857,7 @@ public class TestNodeManager { // Assert that node manager force enter cannot be overridden by nodes HBs. for(int x= 0; x < 20; x++) { - DatanodeID datanode = getDatanodeID(nodeManager); + DatanodeID datanode = SCMTestUtils.getDatanodeID(nodeManager); nodeManager.sendHeartbeat(datanode); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
