HDFS-11081. Ozone:SCM: Add support for registerNode in datanode. Contributed by Anu Engineer.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d10f39e7 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d10f39e7 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d10f39e7 Branch: refs/heads/HDFS-7240 Commit: d10f39e751ab43d7dc60c1ad86ea5cb80d5d095e Parents: 23eba15 Author: Anu Engineer <[email protected]> Authored: Wed Nov 16 13:44:24 2016 -0800 Committer: Anu Engineer <[email protected]> Committed: Wed Nov 16 13:44:24 2016 -0800 ---------------------------------------------------------------------- .../hadoop/hdfs/server/datanode/DataNode.java | 2 +- .../apache/hadoop/ozone/OzoneClientUtils.java | 165 +++++++--- .../apache/hadoop/ozone/OzoneConfigKeys.java | 52 ++- .../statemachine/DatanodeStateMachine.java | 213 +++++++++++++ .../statemachine/EndpointStateMachine.java | 265 ++++++++++++++++ .../statemachine/SCMConnectionManager.java | 174 ++++++++++ .../common/statemachine/StateContext.java | 191 +++++++++++ .../common/statemachine/package-info.java | 28 ++ .../container/common/states/DatanodeState.java | 55 ++++ .../states/datanode/InitDatanodeState.java | 135 ++++++++ .../states/datanode/RunningDatanodeState.java | 297 ++++++++++++++++++ .../common/states/datanode/package-info.java | 21 ++ .../states/endpoint/HeartbeatEndpointTask.java | 181 +++++++++++ .../states/endpoint/RegisterEndpointTask.java | 198 ++++++++++++ .../states/endpoint/VersionEndpointTask.java | 66 ++++ .../common/states/endpoint/package-info.java | 20 ++ .../container/common/states/package-info.java | 18 ++ .../container/ozoneimpl/OzoneContainer.java | 4 +- .../StorageContainerDatanodeProtocol.java | 58 ++++ .../ozone/protocol/commands/NullCommand.java | 2 +- .../protocol/commands/RegisteredCommand.java | 27 +- .../ozone/protocol/commands/SCMCommand.java | 2 +- ...rDatanodeProtocolClientSideTranslatorPB.java | 154 +++++++++ .../StorageContainerDatanodeProtocolPB.java | 32 ++ 
...rDatanodeProtocolServerSideTranslatorPB.java | 86 +++++ .../apache/hadoop/ozone/scm/VersionInfo.java | 2 + .../hadoop/ozone/scm/node/SCMNodeManager.java | 37 +-- .../StorageContainerDatanodeProtocol.proto | 70 +++-- .../ozone/container/common/SCMTestUtils.java | 188 +++++++++++ .../ozone/container/common/ScmTestMock.java | 149 +++++++++ .../common/TestDatanodeStateMachine.java | 274 ++++++++++++++++ .../ozone/container/common/TestEndPoint.java | 314 +++++++++++++++++++ .../hadoop/ozone/scm/node/TestNodeManager.java | 85 +---- 33 files changed, 3383 insertions(+), 182 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index c1aefe9..eab1956 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -873,7 +873,7 @@ public class DataNode extends ReconfigurableBase * @throws UnknownHostException if the dfs.datanode.dns.interface * option is used and the hostname can not be determined */ - private static String getHostName(Configuration config) + public static String getHostName(Configuration config) throws UnknownHostException { String name = config.get(DFS_DATANODE_HOST_NAME_KEY); if (name == null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java index 549dc80..5d1aed8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java @@ -1,43 +1,61 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
*/ package org.apache.hadoop.ozone; import com.google.common.base.Optional; +import com.google.common.net.HostAndPort; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.scm.ScmConfigKeys; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.apache.hadoop.ozone.OzoneConfigKeys.*; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_DEFAULT; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_MS; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_STALENODE_INTERVAL_DEFAULT; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_STALENODE_INTERVAL_MS; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_SCM_DEADNODE_INTERVAL_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_SCM_DEADNODE_INTERVAL_MS; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS; + +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS; + +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + 
.OZONE_SCM_STALENODE_INTERVAL_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_SCM_STALENODE_INTERVAL_MS; /** * Utility methods for Ozone and Container Clients. @@ -51,6 +69,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_STALENODE_INTERV public final class OzoneClientUtils { private static final Logger LOG = LoggerFactory.getLogger( OzoneClientUtils.class); + private static final int NO_PORT = -1; /** * The service ID of the solitary Ozone SCM service. @@ -139,7 +158,7 @@ public final class OzoneClientUtils { return NetUtils.createSocketAddr( host.or(OZONE_SCM_CLIENT_BIND_HOST_DEFAULT) + ":" + - port.or(OZONE_SCM_CLIENT_PORT_DEFAULT)); + port.or(OZONE_SCM_CLIENT_PORT_DEFAULT)); } /** @@ -160,7 +179,7 @@ public final class OzoneClientUtils { return NetUtils.createSocketAddr( host.or(OZONE_SCM_DATANODE_BIND_HOST_DEFAULT) + ":" + - port.or(OZONE_SCM_DATANODE_PORT_DEFAULT)); + port.or(OZONE_SCM_DATANODE_PORT_DEFAULT)); } /** @@ -168,7 +187,7 @@ public final class OzoneClientUtils { * Each config value may be absent, or if present in the format * host:port (the :port part is optional). * - * @param conf + * @param conf - Conf * @param keys a list of configuration key names. * * @return first hostname component found from the given keys, or absent. @@ -176,51 +195,65 @@ public final class OzoneClientUtils { * or host:port format. */ static Optional<String> getHostNameFromConfigKeys( - Configuration conf, String ... keys) { + Configuration conf, String... keys) { for (final String key : keys) { final String value = conf.getTrimmed(key); - if (value != null && !value.isEmpty()) { - String[] splits = value.split(":"); - - if(splits.length < 1 || splits.length > 2) { - throw new IllegalArgumentException( - "Invalid value " + value + " for config key " + key + - ". 
It should be in 'host' or 'host:port' format"); - } - return Optional.of(splits[0]); + final Optional<String> hostName = getHostName(value); + if (hostName.isPresent()) { + return hostName; } } return Optional.absent(); } /** + * Gets the hostname or Indicates that it is absent. + * @param value host or host:port + * @return hostname + */ + public static Optional<String> getHostName(String value) { + if ((value == null) || value.isEmpty()) { + return Optional.absent(); + } + return Optional.of(HostAndPort.fromString(value).getHostText()); + } + + /** + * Gets the port if there is one, throws otherwise. + * @param value String in host:port format. + * @return Port + */ + public static Optional<Integer> getHostPort(String value) { + if((value == null) || value.isEmpty()) { + return Optional.absent(); + } + int port = HostAndPort.fromString(value).getPortOrDefault(NO_PORT); + if (port == NO_PORT) { + return Optional.absent(); + } else { + return Optional.of(port); + } + } + + /** * Retrieve the port number, trying the supplied config keys in order. * Each config value may be absent, or if present in the format * host:port (the :port part is optional). * - * @param conf + * @param conf Conf * @param keys a list of configuration key names. * * @return first port number component found from the given keys, or absent. * @throws IllegalArgumentException if any values are not in the 'host' * or host:port format. */ - static Optional<Integer> getPortNumberFromConfigKeys( - Configuration conf, String ... keys) { + public static Optional<Integer> getPortNumberFromConfigKeys( + Configuration conf, String... keys) { for (final String key : keys) { final String value = conf.getTrimmed(key); - if (value != null && !value.isEmpty()) { - String[] splits = value.split(":"); - - if(splits.length < 1 || splits.length > 2) { - throw new IllegalArgumentException( - "Invalid value " + value + " for config key " + key + - ". 
It should be in 'host' or 'host:port' format"); - } - - if (splits.length == 2) { - return Optional.of(Integer.parseInt(splits[1])); - } + final Optional<Integer> hostPort = getHostPort(value); + if (hostPort.isPresent()) { + return hostPort; } } return Optional.absent(); @@ -259,7 +292,7 @@ public final class OzoneClientUtils { * @return long */ private static long sanitizeUserArgs(long valueTocheck, long baseValue, - long minFactor, long maxFactor) + long minFactor, long maxFactor) throws IllegalArgumentException { if ((valueTocheck >= (baseValue * minFactor)) && (valueTocheck <= (baseValue * maxFactor))) { @@ -270,7 +303,6 @@ public final class OzoneClientUtils { throw new IllegalArgumentException(errMsg); } - /** * Returns the interval in which the heartbeat processor thread runs. * @@ -282,7 +314,6 @@ public final class OzoneClientUtils { OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS_DEFAULT); } - /** * Heartbeat Interval - Defines the heartbeat frequency from a datanode to * SCM. @@ -295,7 +326,6 @@ public final class OzoneClientUtils { OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT); } - /** * Get the Stale Node interval, which is used by SCM to flag a datanode as * stale, if the heartbeat from that node has been missing for this duration. @@ -340,7 +370,6 @@ public final class OzoneClientUtils { return staleNodeIntevalMs; } - /** * Gets the interval for dead node flagging. This has to be a value that is * greater than stale node value, and by transitive relation we also know @@ -374,8 +403,42 @@ public final class OzoneClientUtils { * @param conf Configration * @return - int -- Number of HBs to process */ - public static int getMaxHBToProcessPerLoop(Configuration conf){ + public static int getMaxHBToProcessPerLoop(Configuration conf) { return conf.getInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, OZONE_SCM_MAX_HB_COUNT_TO_PROCESS_DEFAULT); } + + /** + * Timeout value for the RPC from Datanode to SCM, primarily used for + * Heartbeats and container reports. 
+ * + * @param conf - Ozone Config + * @return - Rpc timeout in Milliseconds. + */ + public static long getScmRpcTimeOutInMilliseconds(Configuration conf) { + return conf.getTimeDuration(OZONE_SCM_HEARTBEAT_RPC_TIMEOUT, + OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); + } + + /** + * Log Warn interval. + * + * @param conf - Ozone Config + * @return - Log warn interval. + */ + public static int getLogWarnInterval(Configuration conf) { + return conf.getInt(OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT, + OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT); + } + + /** + * returns the Container port. + * @param conf - Conf + * @return port number. + */ + public static int getContainerPort(Configuration conf) { + return conf.getInt(ScmConfigKeys.DFS_CONTAINER_IPC_PORT, ScmConfigKeys + .DFS_CONTAINER_IPC_PORT_DEFAULT); + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index ec133aa..746fefe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -43,8 +43,8 @@ public final class OzoneConfigKeys { "ozone.trace.enabled"; public static final boolean OZONE_TRACE_ENABLED_DEFAULT = false; - public static final String OZONE_METADATA_DIRS = - "ozone.metadata.dirs"; + public static final String OZONE_CONTAINER_METADATA_DIRS = + "ozone.container.metadata.dirs"; public static final String OZONE_KEY_CACHE = "ozone.key.cache.size"; public static final int OZONE_KEY_CACHE_DEFAULT = 1024; @@ -94,6 +94,54 @@ public final class OzoneConfigKeys { public static final long 
OZONE_SCM_STALENODE_INTERVAL_DEFAULT = OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT * 1000L * 3L; + public static final String OZONE_SCM_CONTAINER_THREADS = + "ozone.scm.container.threads"; + public static final int OZONE_SCM_CONTAINER_THREADS_DEFAULT = + Runtime.getRuntime().availableProcessors() * 2; + + public static final String OZONE_SCM_HEARTBEAT_RPC_TIMEOUT = + "ozone.scm.heartbeat.rpc-timeout"; + public static final long OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT = + 100; + + /** + * Defines how frequently we will log the missing of heartbeat to a specific + * SCM. In the default case we will write a warning message for each 10 + * sequential heart beats that we miss to a specific SCM. This is to avoid + * overrunning the log with lots of HB missed Log statements. + */ + public static final String OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT = + "ozone.scm.heartbeat.log.warn.interval.count"; + public static final int OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT = + 10; + + public static final String OZONE_CONTAINER_TASK_WAIT = + "ozone.container.task.wait.seconds"; + public static final long OZONE_CONTAINER_TASK_WAIT_DEFAULT = 5; + + + // ozone.scm.names key is a set of DNS | DNS:PORT | IP Address | IP:PORT. + // Written as a comma separated string. e.g. scm1, scm2:8020, 7.7.7.7:7777 + // + // If this key is not specified datanodes will not be able to find + // SCM. The SCM membership can be dynamic, so this key should contain + // all possible SCM names. Once the SCM leader is discovered datanodes will + // get the right list of SCMs to heartbeat to from the leader. + // While it is good for the datanodes to know the names of all SCM nodes, + // it is sufficient to actually know the name of on working SCM. That SCM + // will be able to return the information about other SCMs that are part of + // the SCM replicated Log. + // + //In case of a membership change, any one of the SCM machines will be + // able to send back a new list to the datanodes. 
+ public static final String OZONE_SCM_NAMES = "ozone.scm.names"; + + public static final int OZONE_SCM_DEFAULT_PORT = 9862; + // File Name and path where datanode ID is to written to. + // if this value is not set then container startup will fail. + public static final String OZONE_SCM_DATANODE_ID = "ozone.scm.datanode.id"; + + /** * There is no need to instantiate this class. http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java new file mode 100644 index 0000000..5574c9f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.ozone.container.common.statemachine; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.OzoneClientUtils; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * State Machine Class. + */ +public class DatanodeStateMachine implements Closeable { + @VisibleForTesting + static final Logger LOG = + LoggerFactory.getLogger(DatanodeStateMachine.class); + private final ExecutorService executorService; + private final Configuration conf; + private final SCMConnectionManager connectionManager; + private final long taskWaitTime; + private final long heartbeatFrequency; + private StateContext context; + + /** + * Constructs a container state machine. + * + * @param conf - Configration. 
+ */ + public DatanodeStateMachine(Configuration conf) { + this.conf = conf; + executorService = HadoopExecutors.newScheduledThreadPool( + this.conf.getInt(OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS, + OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS_DEFAULT), + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Container State Machine Thread - %d").build()); + connectionManager = new SCMConnectionManager(conf); + context = new StateContext(this.conf, DatanodeStates.getInitState(), this); + taskWaitTime = this.conf.getLong(OzoneConfigKeys.OZONE_CONTAINER_TASK_WAIT, + OzoneConfigKeys.OZONE_CONTAINER_TASK_WAIT_DEFAULT); + heartbeatFrequency = OzoneClientUtils.getScmHeartbeatInterval(conf); + } + + /** + * Returns the Connection manager for this state machine. + * + * @return - SCMConnectionManager. + */ + public SCMConnectionManager getConnectionManager() { + return connectionManager; + } + + /** + * Runs the state machine at a fixed frequency. + */ + public void start() throws IOException { + long now = 0; + long nextHB = 0; + while (context.getState() != DatanodeStates.SHUTDOWN) { + try { + nextHB = Time.monotonicNow() + heartbeatFrequency; + context.execute(executorService, taskWaitTime, TimeUnit.SECONDS); + now = Time.monotonicNow(); + if (now < nextHB) { + Thread.sleep(nextHB - now); + } + } catch (InterruptedException | ExecutionException | TimeoutException e) { + LOG.error("Unable to finish the execution", e); + } + } + } + + /** + * Gets the current context. + * + * @return StateContext + */ + public StateContext getContext() { + return context; + } + + /** + * Sets the current context. + * + * @param context - Context + */ + public void setContext(StateContext context) { + this.context = context; + } + + /** + * Closes this stream and releases any system resources associated with it. If + * the stream is already closed then invoking this method has no effect. 
+ * <p> + * <p> As noted in {@link AutoCloseable#close()}, cases where the close may + * fail require careful attention. It is strongly advised to relinquish the + * underlying resources and to internally <em>mark</em> the {@code Closeable} + * as closed, prior to throwing the {@code IOException}. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + executorService.shutdown(); + try { + if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + + if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + LOG.error("Unable to shutdown statemachine properly."); + } + } catch (InterruptedException e) { + LOG.error("Error attempting to shutdown.", e); + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + + for (EndpointStateMachine endPoint : connectionManager.getValues()) { + endPoint.close(); + } + } + + /** + * States that a datanode can be in. GetNextState will move this enum from + * getInitState to getLastState. + */ + public enum DatanodeStates { + INIT(1), + RUNNING(2), + SHUTDOWN(3); + private final int value; + + /** + * Constructs ContainerStates. + * + * @param value + */ + DatanodeStates(int value) { + this.value = value; + } + + /** + * Returns the first State. + * + * @return First State. + */ + public static DatanodeStates getInitState() { + return INIT; + } + + /** + * The last state of endpoint states. + * + * @return last state. + */ + public static DatanodeStates getLastState() { + return SHUTDOWN; + } + + /** + * returns the numeric value associated with the endPoint. + * + * @return int. + */ + public int getValue() { + return value; + } + + /** + * Returns the next logical state that endPoint should move to. This + * function assumes the States are sequentially numbered. + * + * @return NextState. 
+ */ + public DatanodeStates getNextState() { + if (this.value < getLastState().getValue()) { + int stateValue = this.getValue() + 1; + for (DatanodeStates iter : values()) { + if (stateValue == iter.getValue()) { + return iter; + } + } + } + return getLastState(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java new file mode 100644 index 0000000..2900a55 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.ozone.container.common.statemachine; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.OzoneClientUtils; +import org.apache.hadoop.ozone.protocol.VersionResponse; +import org.apache.hadoop.ozone.protocolPB + .StorageContainerDatanodeProtocolClientSideTranslatorPB; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Endpoint is used as holder class that keeps state around the RPC endpoint. + */ +public class EndpointStateMachine implements Closeable { + static final Logger + LOG = LoggerFactory.getLogger(EndpointStateMachine.class); + private final StorageContainerDatanodeProtocolClientSideTranslatorPB endPoint; + private final AtomicLong missedCount; + private final InetSocketAddress address; + private final Lock lock; + private final Configuration conf; + private EndPointStates state; + private VersionResponse version; + + /** + * Constructs RPC Endpoints. + * + * @param endPoint - RPC endPoint. + */ + public EndpointStateMachine(InetSocketAddress address, + StorageContainerDatanodeProtocolClientSideTranslatorPB endPoint, + Configuration conf) { + this.endPoint = endPoint; + this.missedCount = new AtomicLong(0); + this.address = address; + state = EndPointStates.getInitState(); + lock = new ReentrantLock(); + this.conf = conf; + } + + /** + * Takes a lock on this EndPoint so that other threads don't use this while we + * are trying to communicate via this endpoint. + */ + public void lock() { + lock.lock(); + } + + /** + * Unlocks this endpoint. + */ + public void unlock() { + lock.unlock(); + } + + /** + * Returns the version that we read from the server if anyone asks . + * + * @return - Version Response. 
+ */ + public VersionResponse getVersion() { + return version; + } + + /** + * Sets the Version reponse we recieved from the SCM. + * + * @param version VersionResponse + */ + public void setVersion(VersionResponse version) { + this.version = version; + } + + /** + * Returns the current State this end point is in. + * + * @return - getState. + */ + public EndPointStates getState() { + return state; + } + + /** + * Sets the endpoint state. + * + * @param state - state. + */ + public EndPointStates setState(EndPointStates state) { + this.state = state; + return this.state; + } + + /** + * Closes the connection. + * + * @throws IOException + */ + @Override + public void close() throws IOException { + if (endPoint != null) { + endPoint.close(); + } + } + + /** + * We maintain a count of how many times we missed communicating with a + * specific SCM. This is not made atomic since the access to this is always + * guarded by the read or write lock. That is, it is serialized. + */ + public void incMissed() { + this.missedCount.incrementAndGet(); + } + + /** + * Returns the value of the missed count. + * + * @return int + */ + public long getMissedCount() { + return this.missedCount.get(); + } + + public void zeroMissedCount() { + this.missedCount.set(0); + } + + /** + * Returns the InetAddress of the endPoint. + * + * @return - EndPoint. + */ + public InetSocketAddress getAddress() { + return this.address; + } + + /** + * Returns real RPC endPoint. + * + * @return rpc client. + */ + public StorageContainerDatanodeProtocolClientSideTranslatorPB + getEndPoint() { + return endPoint; + } + + /** + * Returns the string that represents this endpoint. + * + * @return - String + */ + public String toString() { + return address.toString(); + } + + /** + * Logs exception if needed. + * @param ex - Exception + */ + public void logIfNeeded(Exception ex) { + LOG.trace("Incrementing the Missed count. 
Ex : {}", ex); + this.incMissed(); + if (this.getMissedCount() % OzoneClientUtils.getLogWarnInterval(conf) == + 0) { + LOG.warn("Unable to communicate to SCM server at {}. We have not been " + + "able to communicate to this SCM server for past {} seconds.", + this.getAddress().getHostString() + ":" + this.getAddress().getPort(), + this.getMissedCount() * OzoneClientUtils.getScmHeartbeatInterval( + this.conf)); + } + } + + + /** + * States that an Endpoint can be in. + * <p> + * This is a sorted list of states that EndPoint will traverse. + * <p> + * GetNextState will move this enum from getInitState to getLastState. + */ + public enum EndPointStates { + GETVERSION(1), + REGISTER(2), + HEARTBEAT(3), + SHUTDOWN(4); // if you add value after this please edit getLastState too. + private final int value; + + /** + * Constructs endPointStates. + * + * @param value state. + */ + EndPointStates(int value) { + this.value = value; + } + + /** + * Returns the first State. + * + * @return First State. + */ + public static EndPointStates getInitState() { + return GETVERSION; + } + + /** + * The last state of endpoint states. + * + * @return last state. + */ + public static EndPointStates getLastState() { + return SHUTDOWN; + } + + /** + * returns the numeric value associated with the endPoint. + * + * @return int. + */ + public int getValue() { + return value; + } + + /** + * Returns the next logical state that endPoint should move to. + * The next state is computed by adding 1 to the current state. + * + * @return NextState. 
+ */ + public EndPointStates getNextState() { + if (this.getValue() < getLastState().getValue()) { + int stateValue = this.getValue() + 1; + for (EndPointStates iter : values()) { + if (stateValue == iter.getValue()) { + return iter; + } + } + } + return getLastState(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java new file mode 100644 index 0000000..33d361b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.ozone.container.common.statemachine; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.OzoneClientUtils; +import org.apache.hadoop.ozone.protocolPB + .StorageContainerDatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * SCMConnectionManager - Acts as a class that manages the membership + * information of the SCMs that we are working with. + */ +public class SCMConnectionManager { + private static final Logger LOG = + LoggerFactory.getLogger(SCMConnectionManager.class); + + private final ReadWriteLock mapLock; + private final Map<InetSocketAddress, EndpointStateMachine> scmMachines; + + private final int rpcTimeout; + private final Configuration conf; + + + public SCMConnectionManager(Configuration conf) { + this.mapLock = new ReentrantReadWriteLock(); + Long timeOut = OzoneClientUtils.getScmRpcTimeOutInMilliseconds(conf); + this.rpcTimeout = timeOut.intValue(); + this.scmMachines = new HashMap<>(); + this.conf = conf; + } + + /** + * Returns Config. + * + * @return ozoneConfig. + */ + public Configuration getConf() { + return conf; + } + + /** + * Get RpcTimeout. + * + * @return - Return RPC timeout. + */ + public long getRpcTimeout() { + return rpcTimeout; + } + + + /** + * Takes a read lock. + */ + public void readLock() { + this.mapLock.readLock().lock(); + } + + /** + * Releases the read lock. 
+ */ + public void readUnlock() { + this.mapLock.readLock().unlock(); + } + + /** + * Takes the write lock. + */ + public void writeLock() { + this.mapLock.writeLock().lock(); + } + + /** + * Releases the write lock. + */ + public void writeUnlock() { + this.mapLock.writeLock().unlock(); + } + + /** + * adds a new SCM machine to the target set. + * + * @param address - Address of the SCM machine to send heatbeat to. + * @throws IOException + */ + public void addSCMServer(InetSocketAddress address) throws IOException { + writeLock(); + try { + if (scmMachines.containsKey(address)) { + LOG.warn("Trying to add an existing SCM Machine to Machines group. " + + "Ignoring the request."); + return; + } + RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class, + ProtobufRpcEngine.class); + long version = + RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class); + + StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProxy( + StorageContainerDatanodeProtocolPB.class, version, + address, UserGroupInformation.getCurrentUser(), conf, + NetUtils.getDefaultSocketFactory(conf), rpcTimeout); + + StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient = + new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy); + EndpointStateMachine endPoint = + new EndpointStateMachine(address, rpcClient, conf); + scmMachines.put(address, endPoint); + } finally { + writeUnlock(); + } + } + + /** + * Removes a SCM machine for the target set. + * + * @param address - Address of the SCM machine to send heatbeat to. + * @throws IOException + */ + public void removeSCMServer(InetSocketAddress address) throws IOException { + writeLock(); + try { + if (!scmMachines.containsKey(address)) { + LOG.warn("Trying to remove a non-existent SCM machine. 
" + + "Ignoring the request."); + return; + } + + EndpointStateMachine endPoint = scmMachines.get(address); + endPoint.close(); + scmMachines.remove(address); + } finally { + writeUnlock(); + } + } + + /** + * Returns all known RPCEndpoints. + * + * @return - List of RPC Endpoints. + */ + public Collection<EndpointStateMachine> getValues() { + return scmMachines.values(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java new file mode 100644 index 0000000..0a20945 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.ozone.container.common.statemachine; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState; +import org.apache.hadoop.ozone.container.common.states.DatanodeState; +import org.apache.hadoop.ozone.container.common.states.datanode + .RunningDatanodeState; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Current Context of State Machine. + */ +public class StateContext { + private final Queue<SCMCommand> commandQueue; + private final Lock lock; + private final DatanodeStateMachine parent; + private final AtomicLong stateExecutionCount; + private final Configuration conf; + private DatanodeStateMachine.DatanodeStates state; + + /** + * Constructs a StateContext. + * + * @param conf - Configration + * @param state - State + * @param parent Parent State Machine + */ + public StateContext(Configuration conf, DatanodeStateMachine.DatanodeStates + state, DatanodeStateMachine parent) { + this.conf = conf; + this.state = state; + this.parent = parent; + commandQueue = new LinkedList<>(); + lock = new ReentrantLock(); + stateExecutionCount = new AtomicLong(0); + } + + /** + * Returns the ContainerStateMachine class that holds this state. + * + * @return ContainerStateMachine. + */ + public DatanodeStateMachine getParent() { + return parent; + } + + /** + * Returns true if we are entering a new state. + * + * @return boolean + */ + boolean isEntering() { + return stateExecutionCount.get() == 0; + } + + /** + * Returns true if we are exiting from the current state. 
+ * + * @param newState - newState. + * @return boolean + */ + boolean isExiting(DatanodeStateMachine.DatanodeStates newState) { + boolean isExiting = state != newState && stateExecutionCount.get() > 0; + if(isExiting) { + stateExecutionCount.set(0); + } + return isExiting; + } + + /** + * Returns the current state the machine is in. + * + * @return state. + */ + public DatanodeStateMachine.DatanodeStates getState() { + return state; + } + + /** + * Sets the current state of the machine. + * + * @param state state. + */ + public void setState(DatanodeStateMachine.DatanodeStates state) { + this.state = state; + } + + /** + * Returns the next task to get executed by the datanode state machine. + * @return A callable that will be executed by the + * {@link DatanodeStateMachine} + */ + @SuppressWarnings("unchecked") + public DatanodeState<DatanodeStateMachine.DatanodeStates> getTask() { + switch (this.state) { + case INIT: + return new InitDatanodeState(this.conf, parent.getConnectionManager(), + this); + case RUNNING: + return new RunningDatanodeState(this.conf, parent.getConnectionManager(), + this); + case SHUTDOWN: + return null; + default: + throw new IllegalArgumentException("Not Implemented yet."); + } + } + + /** + * Executes the required state function. + * + * @param service - Executor Service + * @param time - seconds to wait + * @param unit - Seconds. 
+ * @throws InterruptedException + * @throws ExecutionException + * @throws TimeoutException + */ + public void execute(ExecutorService service, long time, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + stateExecutionCount.incrementAndGet(); + DatanodeState<DatanodeStateMachine.DatanodeStates> task = getTask(); + if (this.isEntering()) { + task.onEnter(); + } + task.execute(service); + DatanodeStateMachine.DatanodeStates newState = task.await(time, unit); + if (this.state != newState) { + if (isExiting(newState)) { + task.onExit(); + } + this.setState(newState); + } + } + + /** + * Returns the next command or null if it is empty. + * + * @return SCMCommand or Null. + */ + public SCMCommand getNextCommand() { + lock.lock(); + try { + return commandQueue.poll(); + } finally { + lock.unlock(); + } + } + + /** + * Adds a command to the State Machine queue. + * + * @param command - SCMCommand. + */ + public void addCommand(SCMCommand command) { + lock.lock(); + try { + commandQueue.add(command); + } finally { + lock.unlock(); + } + } + + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java new file mode 100644 index 0000000..feb2f81 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine; +/** + + State machine class is used by the container to denote various states a + container can be in and also is used for command processing. + + Container has the following states. + + Start -> getVersion -> Register -> Running -> Shutdown + + */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java new file mode 100644 index 0000000..75142af --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.states;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * State Interface that allows tasks to maintain states.
+ * <p>
+ * A state is driven in four steps: {@link #onEnter()},
+ * {@link #execute(ExecutorService)}, {@link #await(long, TimeUnit)} and
+ * {@link #onExit()}.
+ *
+ * @param <T> type of the value produced by {@link #await(long, TimeUnit)}.
+ */
+public interface DatanodeState<T> {
+  /**
+   * Called before entering this state.
+   */
+  void onEnter();
+
+  /**
+   * Called After exiting this state.
+   */
+  void onExit();
+
+  /**
+   * Executes one or more tasks that is needed by this state.
+   *
+   * @param executor - ExecutorService
+   */
+  void execute(ExecutorService executor);
+
+  /**
+   * Wait for execute to finish.
+   *
+   * @param time     - Time
+   * @param timeUnit - Unit of time.
+   * @return the value produced by the work started in
+   * {@link #execute(ExecutorService)}.
+   * @throws InterruptedException if the wait is interrupted.
+   * @throws ExecutionException   if the executed work failed.
+   * @throws TimeoutException     if the work did not finish within the given
+   *                              time.
+   */
+  T await(long time, TimeUnit timeUnit)
+      throws InterruptedException, ExecutionException, TimeoutException;
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
new file mode 100644
index 0000000..233cac1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */ +package org.apache.hadoop.ozone.container.common.states.datanode; + +import com.google.common.base.Optional; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.OzoneClientUtils; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.common.states.DatanodeState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Init Datanode State is the task that gets run when we are in Init State. + */ +public class InitDatanodeState implements DatanodeState, + Callable<DatanodeStateMachine.DatanodeStates> { + static final Logger LOG = LoggerFactory.getLogger(InitDatanodeState.class); + private final SCMConnectionManager connectionManager; + private final Configuration conf; + private final StateContext context; + private Future<DatanodeStateMachine.DatanodeStates> result; + + /** + * Create InitDatanodeState Task. + * + * @param conf - Conf + * @param connectionManager - Connection Manager + * @param context - Current Context + */ + public InitDatanodeState(Configuration conf, + SCMConnectionManager connectionManager, + StateContext context) { + this.conf = conf; + this.connectionManager = connectionManager; + this.context = context; + } + + /** + * Computes a result, or throws an exception if unable to do so. 
+ * + * @return computed result + * @throws Exception if unable to compute a result + */ + @Override + public DatanodeStateMachine.DatanodeStates call() throws Exception { + String[] addresses = conf.getStrings(OzoneConfigKeys.OZONE_SCM_NAMES); + final Optional<Integer> defaultPort = Optional.of(OzoneConfigKeys + .OZONE_SCM_DEFAULT_PORT); + + if (addresses == null || addresses.length <= 0) { + LOG.error("SCM addresses need to be a set of valid DNS names " + + "or IP addresses. Null or empty address list found. Aborting " + + "containers."); + return DatanodeStateMachine.DatanodeStates.SHUTDOWN; + } + for (String address : addresses) { + Optional<String> hostname = OzoneClientUtils.getHostName(address); + if (!hostname.isPresent()) { + LOG.error("Invalid hostname for SCM."); + return DatanodeStateMachine.DatanodeStates.SHUTDOWN; + } + Optional<Integer> port = OzoneClientUtils.getHostPort(address); + InetSocketAddress addr = NetUtils.createSocketAddr(hostname.get(), + port.or(defaultPort.get())); + connectionManager.addSCMServer(addr); + } + return this.context.getState().getNextState(); + } + + /** + * Called before entering this state. + */ + @Override + public void onEnter() { + LOG.trace("Entering init container state"); + } + + /** + * Called After exiting this state. + */ + @Override + public void onExit() { + LOG.trace("Exiting init container state"); + } + + /** + * Executes one or more tasks that is needed by this state. + * + * @param executor - ExecutorService + */ + @Override + public void execute(ExecutorService executor) { + result = executor.submit(this); + } + + /** + * Wait for execute to finish. + * + * @param time - Time + * @param timeUnit - Unit of time. 
+ */ + @Override + public DatanodeStateMachine.DatanodeStates await(long time, + TimeUnit timeUnit) throws InterruptedException, + ExecutionException, TimeoutException { + return result.get(time, timeUnit); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java new file mode 100644 index 0000000..69eabe6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java @@ -0,0 +1,297 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.ozone.container.common.states.datanode; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.ozone.OzoneClientUtils; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.statemachine + .DatanodeStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine + .EndpointStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine + .SCMConnectionManager; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.common.states.DatanodeState; +import org.apache.hadoop.ozone.container.common.states.endpoint + .HeartbeatEndpointTask; +import org.apache.hadoop.ozone.container.common.states.endpoint + .RegisterEndpointTask; +import org.apache.hadoop.ozone.container.common.states.endpoint + .VersionEndpointTask; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Class that implements handshake with SCM. 
+ */ +public class RunningDatanodeState implements DatanodeState { + static final Logger + LOG = LoggerFactory.getLogger(RunningDatanodeState.class); + private final SCMConnectionManager connectionManager; + private final Configuration conf; + private final StateContext context; + private CompletionService<EndpointStateMachine.EndPointStates> ecs; + + public RunningDatanodeState(Configuration conf, + SCMConnectionManager connectionManager, + StateContext context) { + this.connectionManager = connectionManager; + this.conf = conf; + this.context = context; + } + + /** + * Reads a datanode ID from the persisted information. + * + * @param idPath - Path to the ID File. + * @return DatanodeID + * @throws IOException + */ + private StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto + readPersistedDatanodeID(Path idPath) throws IOException { + Preconditions.checkNotNull(idPath); + StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto + containerIDProto; + try (FileInputStream stream = new FileInputStream(idPath.toFile())) { + containerIDProto = StorageContainerDatanodeProtocolProtos + .ContainerNodeIDProto.parseFrom(stream); + return containerIDProto; + } + } + + /** + * Create a DatanodeID from the datanode information. + * + * @return DatanodeID + * @throws UnknownHostException + */ + private DatanodeID createDatanodeID() throws UnknownHostException { + DatanodeID temp = new DatanodeID( + //TODO : Replace this with proper network and kerberos + // support code. + InetAddress.getLocalHost().getHostAddress().toString(), + DataNode.getHostName(conf), + UUID.randomUUID().toString(), + 0, /** XferPort - SCM does not use this port */ + 0, /** Info port - SCM does not use this port */ + 0, /** Info Secure Port - SCM does not use this port */ + 0); /** IPC port - SCM does not use this port */ + + // TODO: make this dynamically discoverable. SCM can hand out this + // port number to calling applications. 
This makes it easy to run multiple + // container endpoints on the same machine. + temp.setContainerPort(OzoneClientUtils.getContainerPort(conf)); + return temp; + } + + /** + * Creates a new ContainerID that persists both DatanodeID and ClusterID. + * + * @param idPath Path to the id file. + * @return ContainerNodeIDProto + * @throws UnknownHostException + */ + private StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto + createNewContainerID(Path idPath) + throws IOException { + StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto + containerIDProto = StorageContainerDatanodeProtocolProtos + .ContainerNodeIDProto.newBuilder() + .setDatanodeID(createDatanodeID().getProtoBufMessage()).build(); + try (FileOutputStream stream = new FileOutputStream(idPath.toFile())) { + stream.write(containerIDProto.toByteArray()); + return containerIDProto; + } + } + + /** + * Returns ContainerNodeIDProto or null in case of Error. + * + * @return ContainerNodeIDProto + */ + private StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto + getContainerNodeID() { + String dataNodeIDPath = conf.get(OzoneConfigKeys.OZONE_SCM_DATANODE_ID); + if (dataNodeIDPath == null || dataNodeIDPath.isEmpty()) { + LOG.error("A valid file path is needed for config setting {}", + OzoneConfigKeys.OZONE_SCM_DATANODE_ID); + + // This is an unrecoverable error. + this.context.setState(DatanodeStateMachine.DatanodeStates.SHUTDOWN); + return null; + } + StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto nodeID; + // try to read an existing ContainerNode ID. + try { + nodeID = readPersistedDatanodeID(Paths.get(dataNodeIDPath)); + if (nodeID != null) { + LOG.trace("Read Node ID :", nodeID.getDatanodeID().getDatanodeUuid()); + return nodeID; + } + } catch (IOException ex) { + LOG.trace("Not able to find container Node ID, creating it.", ex); + } + // Not found, let us create a new datanode ID, persist it and return that + // info to SCM. 
+ try { + nodeID = createNewContainerID(Paths.get(dataNodeIDPath)); + LOG.trace("Created Node ID :", nodeID.getDatanodeID().getDatanodeUuid()); + return nodeID; + } catch (IOException ex) { + LOG.error("Creating new node ID failed.", ex); + } + return null; + } + + /** + * Called before entering this state. + */ + @Override + public void onEnter() { + LOG.trace("Entering handshake task."); + } + + /** + * Called After exiting this state. + */ + @Override + public void onExit() { + LOG.trace("Exiting handshake task."); + } + + /** + * Executes one or more tasks that is needed by this state. + * + * @param executor - ExecutorService + */ + @Override + public void execute(ExecutorService executor) { + ecs = new ExecutorCompletionService<>(executor); + for (EndpointStateMachine endpoint : connectionManager.getValues()) { + Callable<EndpointStateMachine.EndPointStates> endpointTask + = getEndPointTask(endpoint); + ecs.submit(endpointTask); + } + } + + private Callable<EndpointStateMachine.EndPointStates> + getEndPointTask(EndpointStateMachine endpoint) { + switch (endpoint.getState()) { + case GETVERSION: + return new VersionEndpointTask(endpoint, conf); + case REGISTER: + return RegisterEndpointTask.newBuilder() + .setConfig(conf) + .setEndpointStateMachine(endpoint) + .setNodeID(getContainerNodeID()) + .build(); + case HEARTBEAT: + return HeartbeatEndpointTask.newBuilder() + .setConfig(conf) + .setEndpointStateMachine(endpoint) + .setNodeID(getContainerNodeID()) + .build(); + case SHUTDOWN: + break; + default: + throw new IllegalArgumentException("Illegal Argument."); + } + return null; + } + + /** + * Computes the next state the container state machine must move to by looking + * at all the state of endpoints. + * <p> + * if any endpoint state has moved to Shutdown, either we have an + * unrecoverable error or we have been told to shutdown. Either case the + * datanode state machine should move to Shutdown state, otherwise we + * remain in the Running state. 
+ * + * @return next container state. + */ + private DatanodeStateMachine.DatanodeStates + computeNextContainerState( + List<Future<EndpointStateMachine.EndPointStates>> results) { + for (Future<EndpointStateMachine.EndPointStates> state : results) { + try { + if (state.get() == EndpointStateMachine.EndPointStates.SHUTDOWN) { + // if any endpoint tells us to shutdown we move to shutdown state. + return DatanodeStateMachine.DatanodeStates.SHUTDOWN; + } + } catch (InterruptedException | ExecutionException e) { + LOG.error("Error in executing end point task.", e); + } + } + return DatanodeStateMachine.DatanodeStates.RUNNING; + } + + /** + * Wait for execute to finish. + * + * @param duration - Time + * @param timeUnit - Unit of duration. + */ + @Override + public DatanodeStateMachine.DatanodeStates + await(long duration, TimeUnit timeUnit) + throws InterruptedException, ExecutionException, TimeoutException { + int count = connectionManager.getValues().size(); + int returned = 0; + long timeLeft = timeUnit.toMillis(duration); + long startTime = Time.monotonicNow(); + List<Future<EndpointStateMachine.EndPointStates>> results = new + LinkedList<>(); + + while (returned < count && timeLeft > 0) { + Future<EndpointStateMachine.EndPointStates> result = + ecs.poll(timeLeft, TimeUnit.MILLISECONDS); + if (result != null) { + results.add(result); + returned++; + } + timeLeft = timeLeft - (Time.monotonicNow() - startTime); + } + return computeNextContainerState(results); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java 
new file mode 100644 index 0000000..6b8d16c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.states.datanode; +/** + This package contains files that guide the state transitions from + Init->Running->Shutdown for the datanode. 
+ */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java new file mode 100644 index 0000000..4f877ff --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.ozone.container.common.states.endpoint;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .EndpointStateMachine;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+/**
+ * Heartbeat class for SCMs.
+ *
+ * Each call sends a single heartbeat through the wrapped endpoint state
+ * machine and returns the endpoint state observed afterwards.
+ */
+public class HeartbeatEndpointTask
+    implements Callable<EndpointStateMachine.EndPointStates> {
+  static final Logger LOG =
+      LoggerFactory.getLogger(HeartbeatEndpointTask.class);
+  private final EndpointStateMachine rpcEndpoint;
+  private final Configuration conf;
+  private ContainerNodeIDProto containerNodeIDProto;
+
+  /**
+   * Constructs a SCM heart beat.
+   *
+   * @param rpcEndpoint - Endpoint state machine the heartbeat is sent to.
+   * @param conf Config.
+   */
+  public HeartbeatEndpointTask(EndpointStateMachine rpcEndpoint,
+      Configuration conf) {
+    this.rpcEndpoint = rpcEndpoint;
+    this.conf = conf;
+  }
+
+  /**
+   * Get the container Node ID proto.
+   *
+   * @return ContainerNodeIDProto
+   */
+  public ContainerNodeIDProto getContainerNodeIDProto() {
+    return containerNodeIDProto;
+  }
+
+  /**
+   * Set container node ID proto.
+   *
+   * @param containerNodeIDProto - the node id.
+   */
+  public void setContainerNodeIDProto(ContainerNodeIDProto
+      containerNodeIDProto) {
+    this.containerNodeIDProto = containerNodeIDProto;
+  }
+
+  /**
+   * Sends one heartbeat for this datanode while holding the endpoint lock.
+   *
+   * On success the missed count of the endpoint is zeroed. An IOException
+   * raised by the call is passed to logIfNeeded and is not rethrown.
+   *
+   * @return the state reported by the endpoint after the attempt.
+   * @throws Exception if unable to compute a result; an
+   * IllegalStateException is raised when no container node ID was set.
+   */
+  @Override
+  public EndpointStateMachine.EndPointStates call() throws Exception {
+    rpcEndpoint.lock();
+    try {
+      Preconditions.checkState(this.containerNodeIDProto != null);
+      DatanodeID datanodeID = DatanodeID.getFromProtoBuf(this
+          .containerNodeIDProto.getDatanodeID());
+      // TODO : Add the command to command processor queue.
+      rpcEndpoint.getEndPoint().sendHeartbeat(datanodeID);
+      rpcEndpoint.zeroMissedCount();
+    } catch (IOException ex) {
+      rpcEndpoint.logIfNeeded(ex);
+    } finally {
+      // Released on every path, including the checkState failure above.
+      rpcEndpoint.unlock();
+    }
+    return rpcEndpoint.getState();
+  }
+
+  /**
+   * Returns a builder class for HeartbeatEndpointTask task.
+   * @return Builder.
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder class for HeartbeatEndpointTask.
+   */
+  public static class Builder {
+    private EndpointStateMachine endPointStateMachine;
+    private Configuration conf;
+    private ContainerNodeIDProto containerNodeIDProto;
+
+    /**
+     * Constructs the builder class.
+     */
+    public Builder() {
+    }
+
+    /**
+     * Sets the endpoint state machine.
+     *
+     * @param rpcEndPoint - Endpoint state machine.
+     * @return Builder
+     */
+    public Builder setEndpointStateMachine(EndpointStateMachine rpcEndPoint) {
+      this.endPointStateMachine = rpcEndPoint;
+      return this;
+    }
+
+    /**
+     * Sets the Config.
+     *
+     * @param config - config
+     * @return Builder
+     */
+    public Builder setConfig(Configuration config) {
+      this.conf = config;
+      return this;
+    }
+
+    /**
+     * Sets the NodeID.
+     *
+     * @param nodeID - NodeID proto
+     * @return Builder
+     */
+    public Builder setNodeID(ContainerNodeIDProto nodeID) {
+      this.containerNodeIDProto = nodeID;
+      return this;
+    }
+
+    /**
+     * Builds the task once endpoint, config and node ID have all been set.
+     *
+     * @return HeartbeatEndpointTask carrying the configured node ID.
+     * @throws IllegalArgumentException if any of the three values is null.
+     */
+    public HeartbeatEndpointTask build() {
+      if (endPointStateMachine == null) {
+        LOG.error("No endpoint specified.");
+        throw new IllegalArgumentException("A valid endpoint state machine is" +
+            " needed to construct HeartbeatEndpointTask task");
+      }
+
+      if (conf == null) {
+        LOG.error("No config specified.");
+        throw new IllegalArgumentException("A valid configuration is needed" +
+            " to construct HeartbeatEndpointTask task");
+      }
+
+      if (containerNodeIDProto == null) {
+        LOG.error("No nodeID specified.");
+        throw new IllegalArgumentException("A valid Node ID is needed to " +
+            "construct HeartbeatEndpointTask task");
+      }
+
+      HeartbeatEndpointTask task = new HeartbeatEndpointTask(this
+          .endPointStateMachine, this.conf);
+      task.setContainerNodeIDProto(containerNodeIDProto);
+      return task;
+    }
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
new file mode 100644
index 0000000..63d19ea
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.
The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.endpoint;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .EndpointStateMachine;
+
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+/**
+ * Register a container with SCM.
+ *
+ * Each call performs one register attempt through the wrapped endpoint
+ * state machine and returns the endpoint state observed afterwards.
+ */
+public final class RegisterEndpointTask implements
+    Callable<EndpointStateMachine.EndPointStates> {
+  static final Logger LOG = LoggerFactory.getLogger(RegisterEndpointTask.class);
+
+  private final EndpointStateMachine rpcEndPoint;
+  private final Configuration conf;
+  private Future<EndpointStateMachine.EndPointStates> result;
+  private ContainerNodeIDProto containerNodeIDProto;
+
+  /**
+   * Creates a register endpoint task.
+   *
+   * @param rpcEndPoint - endpoint
+   * @param conf - conf
+   */
+  @VisibleForTesting
+  public RegisterEndpointTask(EndpointStateMachine rpcEndPoint,
+      Configuration conf) {
+    this.rpcEndPoint = rpcEndPoint;
+    this.conf = conf;
+
+  }
+
+  /**
+   * Get the ContainerNodeID Proto.
+   *
+   * @return ContainerNodeIDProto
+   */
+  public ContainerNodeIDProto getContainerNodeIDProto() {
+    return containerNodeIDProto;
+  }
+
+  /**
+   * Set the containerNodeID Proto.
+   *
+   * @param containerNodeIDProto - Container Node ID.
+   */
+  public void setContainerNodeIDProto(ContainerNodeIDProto
+      containerNodeIDProto) {
+    this.containerNodeIDProto = containerNodeIDProto;
+  }
+
+  /**
+   * Registers this datanode with the endpoint while holding the endpoint lock.
+   *
+   * Without a container node ID the endpoint is moved to SHUTDOWN. Otherwise
+   * register is called with the datanode ID and the configured SCM names;
+   * on success the endpoint advances to its next state and the missed count
+   * is zeroed. An IOException is passed to logIfNeeded and is not rethrown.
+   *
+   * @return the endpoint state after the attempt.
+   * @throws Exception if unable to compute a result
+   */
+  @Override
+  public EndpointStateMachine.EndPointStates call() throws Exception {
+
+    rpcEndPoint.lock();
+    try {
+      // Checked under the lock so that this state change is guarded the
+      // same way as the transition on the success path below.
+      if (getContainerNodeIDProto() == null) {
+        LOG.error("Container ID proto cannot be null in RegisterEndpoint task, " +
+            "shutting down the endpoint.");
+        return rpcEndPoint.setState(
+            EndpointStateMachine.EndPointStates.SHUTDOWN);
+      }
+
+      DatanodeID dnNodeID = DatanodeID.getFromProtoBuf(
+          getContainerNodeIDProto().getDatanodeID());
+
+      // TODO : Add responses to the command Queue.
+      rpcEndPoint.getEndPoint().register(dnNodeID,
+          conf.getStrings(OzoneConfigKeys.OZONE_SCM_NAMES));
+      EndpointStateMachine.EndPointStates nextState =
+          rpcEndPoint.getState().getNextState();
+      rpcEndPoint.setState(nextState);
+      rpcEndPoint.zeroMissedCount();
+    } catch (IOException ex) {
+      rpcEndPoint.logIfNeeded(ex);
+    } finally {
+      rpcEndPoint.unlock();
+    }
+
+    return rpcEndPoint.getState();
+  }
+
+  /**
+   * Returns a builder class for RegisterEndPoint task.
+   *
+   * @return Builder.
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder class for RegisterEndPoint task.
+   */
+  public static class Builder {
+    private EndpointStateMachine endPointStateMachine;
+    private Configuration conf;
+    private ContainerNodeIDProto containerNodeIDProto;
+
+    /**
+     * Constructs the builder class.
+     */
+    public Builder() {
+    }
+
+    /**
+     * Sets the endpoint state machine.
+     *
+     * @param rpcEndPoint - Endpoint state machine.
+     * @return Builder
+     */
+    public Builder setEndpointStateMachine(EndpointStateMachine rpcEndPoint) {
+      this.endPointStateMachine = rpcEndPoint;
+      return this;
+    }
+
+    /**
+     * Sets the Config.
+     *
+     * @param config - config
+     * @return Builder.
+     */
+    public Builder setConfig(Configuration config) {
+      this.conf = config;
+      return this;
+    }
+
+    /**
+     * Sets the NodeID.
+     *
+     * @param nodeID - NodeID proto
+     * @return Builder
+     */
+    public Builder setNodeID(ContainerNodeIDProto nodeID) {
+      this.containerNodeIDProto = nodeID;
+      return this;
+    }
+
+    /**
+     * Builds the task once endpoint, config and node ID have all been set.
+     *
+     * @return RegisterEndpointTask carrying the configured node ID.
+     * @throws IllegalArgumentException if any of the three values is null.
+     */
+    public RegisterEndpointTask build() {
+      if (endPointStateMachine == null) {
+        LOG.error("No endpoint specified.");
+        throw new IllegalArgumentException("A valid endpoint state machine is" +
+            " needed to construct RegisterEndPoint task");
+      }
+
+      if (conf == null) {
+        LOG.error("No config specified.");
+        throw new IllegalArgumentException("A valid configuration is needed" +
+            " to construct RegisterEndpoint task");
+      }
+
+      if (containerNodeIDProto == null) {
+        LOG.error("No nodeID specified.");
+        throw new IllegalArgumentException("A valid Node ID is needed to " +
+            "construct RegisterEndpoint task");
+      }
+
+      RegisterEndpointTask task = new RegisterEndpointTask(this
+          .endPointStateMachine, this.conf);
+      task.setContainerNodeIDProto(containerNodeIDProto);
+      return task;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
new file mode 100644
index 0000000..1dfc432
--- /dev/null
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.endpoint;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
+import org.apache.hadoop.ozone.protocol.VersionResponse;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+/**
+ * Endpoint task that fetches the version from the endpoint, stores it on the
+ * endpoint state machine and moves the endpoint to its next state.
+ */
+public class VersionEndpointTask implements
+    Callable<EndpointStateMachine.EndPointStates> {
+  private final EndpointStateMachine rpcEndPoint;
+  private final Configuration configuration;
+
+  /**
+   * Creates a version endpoint task.
+   *
+   * @param rpcEndPoint - endpoint state machine to query and update.
+   * @param conf - configuration kept by the task.
+   */
+  public VersionEndpointTask(EndpointStateMachine rpcEndPoint,
+      Configuration conf) {
+    this.configuration = conf;
+    this.rpcEndPoint = rpcEndPoint;
+  }
+
+  /**
+   * Queries the endpoint version while holding the endpoint lock.
+   *
+   * On success the version is recorded, the endpoint advances to its next
+   * state and the missed count is zeroed. An IOException is passed to
+   * logIfNeeded and is not rethrown.
+   *
+   * @return the endpoint state after the attempt.
+   * @throws Exception if unable to compute a result
+   */
+  @Override
+  public EndpointStateMachine.EndPointStates call() throws Exception {
+    rpcEndPoint.lock();
+    try {
+      SCMVersionResponseProto proto = rpcEndPoint.getEndPoint().getVersion();
+      rpcEndPoint.setVersion(VersionResponse.getFromProtobuf(proto));
+
+      // Only reached when the version call succeeded.
+      rpcEndPoint.setState(rpcEndPoint.getState().getNextState());
+      rpcEndPoint.zeroMissedCount();
+    } catch (IOException ioe) {
+      rpcEndPoint.logIfNeeded(ioe);
+    } finally {
+      rpcEndPoint.unlock();
+    }
+    return rpcEndPoint.getState();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
new file mode 100644
index 0000000..1122598
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.states.endpoint; +/** + This package contains code for RPC endpoints transitions. + */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d10f39e7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java new file mode 100644 index 0000000..92c953f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.states; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
