http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java new file mode 100644 index 0000000..8e9482f --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -0,0 +1,387 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.ozone.container.common.statemachine; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.ozone.container.common.statemachine.commandhandler + .CloseContainerHandler; +import org.apache.hadoop.ozone.container.common.statemachine.commandhandler + .CommandDispatcher; +import org.apache.hadoop.ozone.container.common.statemachine.commandhandler + .ContainerReportHandler; +import org.apache.hadoop.ozone.container.common.statemachine.commandhandler + .DeleteBlocksCommandHandler; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.hadoop.hdds.scm.HddsServerUtil.getScmHeartbeatInterval; + +/** + * State Machine Class. 
+ */ +public class DatanodeStateMachine implements Closeable { + @VisibleForTesting + static final Logger LOG = + LoggerFactory.getLogger(DatanodeStateMachine.class); + private final ExecutorService executorService; + private final Configuration conf; + private final SCMConnectionManager connectionManager; + private final long heartbeatFrequency; + private StateContext context; + private final OzoneContainer container; + private DatanodeDetails datanodeDetails; + private final CommandDispatcher commandDispatcher; + private long commandsHandled; + private AtomicLong nextHB; + private Thread stateMachineThread = null; + private Thread cmdProcessThread = null; + + /** + * Constructs a a datanode state machine. + * + * @param datanodeDetails - DatanodeDetails used to identify a datanode + * @param conf - Configuration. + */ + public DatanodeStateMachine(DatanodeDetails datanodeDetails, + Configuration conf) throws IOException { + this.conf = conf; + this.datanodeDetails = datanodeDetails; + executorService = HadoopExecutors.newCachedThreadPool( + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Datanode State Machine Thread - %d").build()); + connectionManager = new SCMConnectionManager(conf); + context = new StateContext(this.conf, DatanodeStates.getInitState(), this); + heartbeatFrequency = TimeUnit.SECONDS.toMillis( + getScmHeartbeatInterval(conf)); + container = new OzoneContainer(this.datanodeDetails, + new OzoneConfiguration(conf)); + nextHB = new AtomicLong(Time.monotonicNow()); + + // When we add new handlers just adding a new handler here should do the + // trick. 
+ commandDispatcher = CommandDispatcher.newBuilder() + .addHandler(new ContainerReportHandler()) + .addHandler(new CloseContainerHandler()) + .addHandler(new DeleteBlocksCommandHandler( + container.getContainerManager(), conf)) + .setConnectionManager(connectionManager) + .setContainer(container) + .setContext(context) + .build(); + } + + /** + * + * Return DatanodeDetails if set, return null otherwise. + * + * @return DatanodeDetails + */ + public DatanodeDetails getDatanodeDetails() { + return datanodeDetails; + } + + + /** + * Returns the Connection manager for this state machine. + * + * @return - SCMConnectionManager. + */ + public SCMConnectionManager getConnectionManager() { + return connectionManager; + } + + public OzoneContainer getContainer() { + return this.container; + } + + /** + * Runs the state machine at a fixed frequency. + */ + private void start() throws IOException { + long now = 0; + + container.start(); + initCommandHandlerThread(conf); + while (context.getState() != DatanodeStates.SHUTDOWN) { + try { + LOG.debug("Executing cycle Number : {}", context.getExecutionCount()); + nextHB.set(Time.monotonicNow() + heartbeatFrequency); + context.setReportState(container.getNodeReport()); + context.setContainerReportState(container.getContainerReportState()); + context.execute(executorService, heartbeatFrequency, + TimeUnit.MILLISECONDS); + now = Time.monotonicNow(); + if (now < nextHB.get()) { + Thread.sleep(nextHB.get() - now); + } + } catch (InterruptedException e) { + // Ignore this exception. + } catch (Exception e) { + LOG.error("Unable to finish the execution.", e); + } + } + } + + /** + * Gets the current context. + * + * @return StateContext + */ + public StateContext getContext() { + return context; + } + + /** + * Sets the current context. + * + * @param context - Context + */ + public void setContext(StateContext context) { + this.context = context; + } + + /** + * Closes this stream and releases any system resources associated with it. 
If + * the stream is already closed then invoking this method has no effect. + * <p> + * <p> As noted in {@link AutoCloseable#close()}, cases where the close may + * fail require careful attention. It is strongly advised to relinquish the + * underlying resources and to internally <em>mark</em> the {@code Closeable} + * as closed, prior to throwing the {@code IOException}. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + if (stateMachineThread != null) { + stateMachineThread.interrupt(); + } + if (cmdProcessThread != null) { + cmdProcessThread.interrupt(); + } + context.setState(DatanodeStates.getLastState()); + executorService.shutdown(); + try { + if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + + if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + LOG.error("Unable to shutdown state machine properly."); + } + } catch (InterruptedException e) { + LOG.error("Error attempting to shutdown.", e); + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + + if (connectionManager != null) { + connectionManager.close(); + } + + if(container != null) { + container.stop(); + } + } + + /** + * States that a datanode can be in. GetNextState will move this enum from + * getInitState to getLastState. + */ + public enum DatanodeStates { + INIT(1), + RUNNING(2), + SHUTDOWN(3); + private final int value; + + /** + * Constructs states. + * + * @param value Enum Value + */ + DatanodeStates(int value) { + this.value = value; + } + + /** + * Returns the first State. + * + * @return First State. + */ + public static DatanodeStates getInitState() { + return INIT; + } + + /** + * The last state of endpoint states. + * + * @return last state. + */ + public static DatanodeStates getLastState() { + return SHUTDOWN; + } + + /** + * returns the numeric value associated with the endPoint. + * + * @return int. 
+ */ + public int getValue() { + return value; + } + + /** + * Returns the next logical state that endPoint should move to. This + * function assumes the States are sequentially numbered. + * + * @return NextState. + */ + public DatanodeStates getNextState() { + if (this.value < getLastState().getValue()) { + int stateValue = this.getValue() + 1; + for (DatanodeStates iter : values()) { + if (stateValue == iter.getValue()) { + return iter; + } + } + } + return getLastState(); + } + } + + /** + * Start datanode state machine as a single thread daemon. + */ + public void startDaemon() { + Runnable startStateMachineTask = () -> { + try { + start(); + LOG.info("Ozone container server started."); + } catch (Exception ex) { + LOG.error("Unable to start the DatanodeState Machine", ex); + } + }; + stateMachineThread = new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("Datanode State Machine Thread - %d") + .build().newThread(startStateMachineTask); + stateMachineThread.start(); + } + + /** + * Stop the daemon thread of the datanode state machine. + */ + public synchronized void stopDaemon() { + try { + context.setState(DatanodeStates.SHUTDOWN); + this.close(); + LOG.info("Ozone container server stopped."); + } catch (IOException e) { + LOG.error("Stop ozone container server failed.", e); + } + } + + /** + * + * Check if the datanode state machine daemon is stopped. + * + * @return True if datanode state machine daemon is stopped + * and false otherwise. + */ + @VisibleForTesting + public boolean isDaemonStopped() { + return this.executorService.isShutdown() + && this.getContext().getExecutionCount() == 0 + && this.getContext().getState() == DatanodeStates.SHUTDOWN; + } + + /** + * Create a command handler thread. + * + * @param config + */ + private void initCommandHandlerThread(Configuration config) { + + /** + * Task that periodically checks if we have any outstanding commands. + * It is assumed that commands can be processed slowly and in order. 
+ * This assumption might change in future. Right now due to this assumption + * we have single command queue process thread. + */ + Runnable processCommandQueue = () -> { + long now; + while (getContext().getState() != DatanodeStates.SHUTDOWN) { + SCMCommand command = getContext().getNextCommand(); + if (command != null) { + commandDispatcher.handle(command); + commandsHandled++; + } else { + try { + // Sleep till the next HB + 1 second. + now = Time.monotonicNow(); + if (nextHB.get() > now) { + Thread.sleep((nextHB.get() - now) + 1000L); + } + } catch (InterruptedException e) { + // Ignore this exception. + } + } + } + }; + + // We will have only one thread for command processing in a datanode. + cmdProcessThread = getCommandHandlerThread(processCommandQueue); + cmdProcessThread.start(); + } + + private Thread getCommandHandlerThread(Runnable processCommandQueue) { + Thread handlerThread = new Thread(processCommandQueue); + handlerThread.setDaemon(true); + handlerThread.setName("Command processor thread"); + handlerThread.setUncaughtExceptionHandler((Thread t, Throwable e) -> { + // Let us just restart this thread after logging a critical error. + // if this thread is not running we cannot handle commands from SCM. + LOG.error("Critical Error : Command processor thread encountered an " + + "error. Thread: {}", t.toString(), e); + getCommandHandlerThread(processCommandQueue).start(); + }); + return handlerThread; + } + + /** + * Returns the number of commands handled by the datanode. + * @return count + */ + @VisibleForTesting + public long getCommandHandled() { + return commandsHandled; + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java new file mode 100644 index 0000000..7e85923 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java @@ -0,0 +1,294 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.ozone.container.common.statemachine; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.protocol.VersionResponse; +import org.apache.hadoop.ozone.protocolPB + .StorageContainerDatanodeProtocolClientSideTranslatorPB; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.time.ZonedDateTime; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.hadoop.hdds.scm.HddsServerUtil.getLogWarnInterval; +import static org.apache.hadoop.hdds.scm.HddsServerUtil.getScmHeartbeatInterval; + +/** + * Endpoint is used as holder class that keeps state around the RPC endpoint. + */ +public class EndpointStateMachine + implements Closeable, EndpointStateMachineMBean { + static final Logger + LOG = LoggerFactory.getLogger(EndpointStateMachine.class); + private final StorageContainerDatanodeProtocolClientSideTranslatorPB endPoint; + private final AtomicLong missedCount; + private final InetSocketAddress address; + private final Lock lock; + private final Configuration conf; + private EndPointStates state; + private VersionResponse version; + private ZonedDateTime lastSuccessfulHeartbeat; + + /** + * Constructs RPC Endpoints. + * + * @param endPoint - RPC endPoint. + */ + public EndpointStateMachine(InetSocketAddress address, + StorageContainerDatanodeProtocolClientSideTranslatorPB endPoint, + Configuration conf) { + this.endPoint = endPoint; + this.missedCount = new AtomicLong(0); + this.address = address; + state = EndPointStates.getInitState(); + lock = new ReentrantLock(); + this.conf = conf; + } + + /** + * Takes a lock on this EndPoint so that other threads don't use this while we + * are trying to communicate via this endpoint. 
+ */ + public void lock() { + lock.lock(); + } + + /** + * Unlocks this endpoint. + */ + public void unlock() { + lock.unlock(); + } + + /** + * Returns the version that we read from the server if anyone asks . + * + * @return - Version Response. + */ + public VersionResponse getVersion() { + return version; + } + + /** + * Sets the Version reponse we recieved from the SCM. + * + * @param version VersionResponse + */ + public void setVersion(VersionResponse version) { + this.version = version; + } + + /** + * Returns the current State this end point is in. + * + * @return - getState. + */ + public EndPointStates getState() { + return state; + } + + @Override + public int getVersionNumber() { + if (version != null) { + return version.getProtobufMessage().getSoftwareVersion(); + } else { + return -1; + } + } + + /** + * Sets the endpoint state. + * + * @param epState - end point state. + */ + public EndPointStates setState(EndPointStates epState) { + this.state = epState; + return this.state; + } + + /** + * Closes the connection. + * + * @throws IOException + */ + @Override + public void close() throws IOException { + if (endPoint != null) { + endPoint.close(); + } + } + + /** + * We maintain a count of how many times we missed communicating with a + * specific SCM. This is not made atomic since the access to this is always + * guarded by the read or write lock. That is, it is serialized. + */ + public void incMissed() { + this.missedCount.incrementAndGet(); + } + + /** + * Returns the value of the missed count. + * + * @return int + */ + public long getMissedCount() { + return this.missedCount.get(); + } + + @Override + public String getAddressString() { + return getAddress().toString(); + } + + public void zeroMissedCount() { + this.missedCount.set(0); + } + + /** + * Returns the InetAddress of the endPoint. + * + * @return - EndPoint. + */ + public InetSocketAddress getAddress() { + return this.address; + } + + /** + * Returns real RPC endPoint. 
+ * + * @return rpc client. + */ + public StorageContainerDatanodeProtocolClientSideTranslatorPB + getEndPoint() { + return endPoint; + } + + /** + * Returns the string that represents this endpoint. + * + * @return - String + */ + public String toString() { + return address.toString(); + } + + /** + * Logs exception if needed. + * @param ex - Exception + */ + public void logIfNeeded(Exception ex) { + LOG.trace("Incrementing the Missed count. Ex : {}", ex); + this.incMissed(); + if (this.getMissedCount() % getLogWarnInterval(conf) == + 0) { + LOG.warn("Unable to communicate to SCM server at {}. We have not been " + + "able to communicate to this SCM server for past {} seconds.", + this.getAddress().getHostString() + ":" + this.getAddress().getPort(), + this.getMissedCount() * getScmHeartbeatInterval( + this.conf)); + } + } + + + /** + * States that an Endpoint can be in. + * <p> + * This is a sorted list of states that EndPoint will traverse. + * <p> + * GetNextState will move this enum from getInitState to getLastState. + */ + public enum EndPointStates { + GETVERSION(1), + REGISTER(2), + HEARTBEAT(3), + SHUTDOWN(4); // if you add value after this please edit getLastState too. + private final int value; + + /** + * Constructs endPointStates. + * + * @param value state. + */ + EndPointStates(int value) { + this.value = value; + } + + /** + * Returns the first State. + * + * @return First State. + */ + public static EndPointStates getInitState() { + return GETVERSION; + } + + /** + * The last state of endpoint states. + * + * @return last state. + */ + public static EndPointStates getLastState() { + return SHUTDOWN; + } + + /** + * returns the numeric value associated with the endPoint. + * + * @return int. + */ + public int getValue() { + return value; + } + + /** + * Returns the next logical state that endPoint should move to. + * The next state is computed by adding 1 to the current state. + * + * @return NextState. 
+ */ + public EndPointStates getNextState() { + if (this.getValue() < getLastState().getValue()) { + int stateValue = this.getValue() + 1; + for (EndPointStates iter : values()) { + if (stateValue == iter.getValue()) { + return iter; + } + } + } + return getLastState(); + } + } + + public long getLastSuccessfulHeartbeat() { + return lastSuccessfulHeartbeat == null ? + 0 : + lastSuccessfulHeartbeat.toEpochSecond(); + } + + public void setLastSuccessfulHeartbeat( + ZonedDateTime lastSuccessfulHeartbeat) { + this.lastSuccessfulHeartbeat = lastSuccessfulHeartbeat; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachineMBean.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachineMBean.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachineMBean.java new file mode 100644 index 0000000..4f64bde --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachineMBean.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine; + + +/** + * JMX representation of an EndpointStateMachine. + */ +public interface EndpointStateMachineMBean { + /** + * Returns how many times communication with this SCM endpoint was missed. + */ + long getMissedCount(); + /** + * Returns the address of the SCM endpoint as a string. + */ + String getAddressString(); + /** + * Returns the state the endpoint is currently in. + */ + EndpointStateMachine.EndPointStates getState(); + /** + * Returns the software version reported by the SCM; the EndpointStateMachine implementation in this patch returns -1 when no version response has been set. + */ + int getVersionNumber(); + /** + * Returns the time of the last successful heartbeat; the EndpointStateMachine implementation in this patch reports epoch seconds, or 0 when none has been recorded. + */ + long getLastSuccessfulHeartbeat(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java new file mode 100644 index 0000000..19722f0 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.protocolPB + .StorageContainerDatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.management.ObjectName; +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.hadoop.hdds.scm.HddsServerUtil + .getScmRpcTimeOutInMilliseconds; + +/** + * SCMConnectionManager - Acts as a class that manages the membership + * information of the SCMs that we are working with. 
+ */ +public class SCMConnectionManager + implements Closeable, SCMConnectionManagerMXBean { + private static final Logger LOG = + LoggerFactory.getLogger(SCMConnectionManager.class); + + private final ReadWriteLock mapLock; + private final Map<InetSocketAddress, EndpointStateMachine> scmMachines; + + private final int rpcTimeout; + private final Configuration conf; + private final ObjectName jmxBean; + + public SCMConnectionManager(Configuration conf) { + this.mapLock = new ReentrantReadWriteLock(); + Long timeOut = getScmRpcTimeOutInMilliseconds(conf); + this.rpcTimeout = timeOut.intValue(); + this.scmMachines = new HashMap<>(); + this.conf = conf; + jmxBean = MBeans.register("OzoneDataNode", + "SCMConnectionManager", + this); + } + + + /** + * Returns Config. + * + * @return ozoneConfig. + */ + public Configuration getConf() { + return conf; + } + + /** + * Get RpcTimeout. + * + * @return - Return RPC timeout. + */ + public int getRpcTimeout() { + return rpcTimeout; + } + + + /** + * Takes a read lock. + */ + public void readLock() { + this.mapLock.readLock().lock(); + } + + /** + * Releases the read lock. + */ + public void readUnlock() { + this.mapLock.readLock().unlock(); + } + + /** + * Takes the write lock. + */ + public void writeLock() { + this.mapLock.writeLock().lock(); + } + + /** + * Releases the write lock. + */ + public void writeUnlock() { + this.mapLock.writeLock().unlock(); + } + + /** + * adds a new SCM machine to the target set. + * + * @param address - Address of the SCM machine to send heatbeat to. + * @throws IOException + */ + public void addSCMServer(InetSocketAddress address) throws IOException { + writeLock(); + try { + if (scmMachines.containsKey(address)) { + LOG.warn("Trying to add an existing SCM Machine to Machines group. 
" + + "Ignoring the request."); + return; + } + RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class, + ProtobufRpcEngine.class); + long version = + RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class); + + StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProxy( + StorageContainerDatanodeProtocolPB.class, version, + address, UserGroupInformation.getCurrentUser(), conf, + NetUtils.getDefaultSocketFactory(conf), getRpcTimeout()); + + StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient = + new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy); + + EndpointStateMachine endPoint = + new EndpointStateMachine(address, rpcClient, conf); + scmMachines.put(address, endPoint); + } finally { + writeUnlock(); + } + } + + /** + * Removes a SCM machine for the target set. + * + * @param address - Address of the SCM machine to send heatbeat to. + * @throws IOException + */ + public void removeSCMServer(InetSocketAddress address) throws IOException { + writeLock(); + try { + if (!scmMachines.containsKey(address)) { + LOG.warn("Trying to remove a non-existent SCM machine. " + + "Ignoring the request."); + return; + } + + EndpointStateMachine endPoint = scmMachines.get(address); + endPoint.close(); + scmMachines.remove(address); + } finally { + writeUnlock(); + } + } + + /** + * Returns all known RPCEndpoints. + * + * @return - List of RPC Endpoints. 
+ */ + public Collection<EndpointStateMachine> getValues() { + return scmMachines.values(); + } + + @Override + public void close() throws IOException { + getValues().forEach(endpointStateMachine + -> IOUtils.cleanupWithLogger(LOG, endpointStateMachine)); + MBeans.unregister(jmxBean); + } + + @Override + public List<EndpointStateMachineMBean> getSCMServers() { + readLock(); + try { + return Collections + .unmodifiableList(new ArrayList<>(scmMachines.values())); + + } finally { + readUnlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManagerMXBean.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManagerMXBean.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManagerMXBean.java new file mode 100644 index 0000000..25ef163 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManagerMXBean.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine; + +import java.util.List; + +/** + * JMX information about the connected SCM servers. + */ +public interface SCMConnectionManagerMXBean { + /** + * Returns the JMX view of the SCM endpoints known to the connection manager; the SCMConnectionManager implementation in this patch returns an unmodifiable snapshot list. + * + * @return list of endpoint MBeans. + */ + List<EndpointStateMachineMBean> getSCMServers(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java new file mode 100644 index 0000000..55476fd --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License.
+ */
package org.apache.hadoop.ozone.container.common.statemachine;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode
    .InitDatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode
    .RunningDatanodeState;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.ReportState.states
    .noContainerReports;
import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;

/**
 * Current context of the datanode state machine: the current state, the
 * number of times that state has been executed, the queue of pending SCM
 * commands, and the latest node / container report state.
 */
public class StateContext {
  static final Logger LOG =
      LoggerFactory.getLogger(StateContext.class);
  private final Queue<SCMCommand> commandQueue;
  private final Lock lock;
  private final DatanodeStateMachine parent;
  private final AtomicLong stateExecutionCount;
  private final Configuration conf;
  private DatanodeStateMachine.DatanodeStates state;
  private SCMNodeReport nrState;
  private ReportState reportState;
  private static final ReportState DEFAULT_REPORT_STATE =
      ReportState.newBuilder().setState(noContainerReports).setCount(0).build();

  /**
   * Constructs a StateContext.
   *
   * @param conf   - Configuration
   * @param state  - Initial state
   * @param parent - Parent state machine
   */
  public StateContext(Configuration conf, DatanodeStateMachine.DatanodeStates
      state, DatanodeStateMachine parent) {
    this.conf = conf;
    this.state = state;
    this.parent = parent;
    commandQueue = new LinkedList<>();
    lock = new ReentrantLock();
    stateExecutionCount = new AtomicLong(0);
    nrState = SCMNodeReport.getDefaultInstance();
  }

  /**
   * Returns the DatanodeStateMachine that holds this context.
   *
   * @return the parent DatanodeStateMachine.
   */
  public DatanodeStateMachine getParent() {
    return parent;
  }

  /**
   * Get the container server port.
   *
   * @return the container server port, or {@code INVALID_PORT} when this
   * context has no parent state machine.
   */
  public int getContainerPort() {
    return parent == null ?
        INVALID_PORT : parent.getContainer().getContainerServerPort();
  }

  /**
   * Gets the Ratis port.
   *
   * @return the Ratis container server port, or {@code INVALID_PORT} when
   * this context has no parent state machine.
   */
  public int getRatisPort() {
    return parent == null ?
        INVALID_PORT : parent.getContainer().getRatisContainerServerPort();
  }

  /**
   * Returns true if we are entering a new state, i.e. the current state has
   * not been executed yet.
   *
   * @return boolean
   */
  boolean isEntering() {
    return stateExecutionCount.get() == 0;
  }

  /**
   * Returns true if we are exiting from the current state. As a side effect
   * the execution count is reset to zero when that is the case, so that the
   * next state is seen as "entering".
   *
   * @param newState - newState.
   * @return boolean
   */
  boolean isExiting(DatanodeStateMachine.DatanodeStates newState) {
    boolean isExiting = state != newState && stateExecutionCount.get() > 0;
    if (isExiting) {
      stateExecutionCount.set(0);
    }
    return isExiting;
  }

  /**
   * Returns the current state the machine is in.
   *
   * @return state.
   */
  public DatanodeStateMachine.DatanodeStates getState() {
    return state;
  }

  /**
   * Sets the current state of the machine.
   *
   * @param state state.
   */
  public void setState(DatanodeStateMachine.DatanodeStates state) {
    this.state = state;
  }

  /**
   * Returns the node report of the datanode state context.
   *
   * @return the node report.
   */
  public SCMNodeReport getNodeReport() {
    return nrState;
  }

  /**
   * Sets the node report of the datanode state context. Despite its name
   * this does not touch the container {@link ReportState}; see
   * {@link #setContainerReportState(ReportState)} for that.
   *
   * @param nrReport - node report
   */
  public void setReportState(SCMNodeReport nrReport) {
    this.nrState = nrReport;
  }

  /**
   * Returns the next task to get executed by the datanode state machine.
   *
   * @return A callable that will be executed by the
   * {@link DatanodeStateMachine}, or null when the state is SHUTDOWN.
   */
  @SuppressWarnings("unchecked")
  public DatanodeState<DatanodeStateMachine.DatanodeStates> getTask() {
    switch (this.state) {
    case INIT:
      return new InitDatanodeState(this.conf, parent.getConnectionManager(),
          this);
    case RUNNING:
      return new RunningDatanodeState(this.conf, parent.getConnectionManager(),
          this);
    case SHUTDOWN:
      return null;
    default:
      throw new IllegalArgumentException("Not Implemented yet.");
    }
  }

  /**
   * Executes the required state function.
   *
   * @param service - Executor Service
   * @param time    - time to wait for the task to finish
   * @param unit    - unit of {@code time}
   * @throws InterruptedException
   * @throws ExecutionException
   * @throws TimeoutException
   */
  public void execute(ExecutorService service, long time, TimeUnit unit)
      throws InterruptedException, ExecutionException, TimeoutException {
    // Sample "entering" BEFORE bumping the counter. isEntering() tests for a
    // zero count, so checking it after the increment could never be true and
    // onEnter() would never run.
    final boolean entering = isEntering();
    stateExecutionCount.incrementAndGet();
    DatanodeState<DatanodeStateMachine.DatanodeStates> task = getTask();
    if (task == null) {
      // getTask() returns null for SHUTDOWN: there is nothing to execute.
      return;
    }
    if (entering) {
      task.onEnter();
    }
    task.execute(service);
    DatanodeStateMachine.DatanodeStates newState = task.await(time, unit);
    if (this.state != newState) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Task {} executed, state transited from {} to {}",
            task.getClass().getSimpleName(), this.state, newState);
      }
      if (isExiting(newState)) {
        task.onExit();
      }
      this.clearReportState();
      this.setState(newState);
    }
  }

  /**
   * Returns the next command or null if it is empty.
   *
   * @return SCMCommand or Null.
   */
  public SCMCommand getNextCommand() {
    lock.lock();
    try {
      return commandQueue.poll();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Adds a command to the State Machine queue.
   *
   * @param command - SCMCommand.
   */
  public void addCommand(SCMCommand command) {
    lock.lock();
    try {
      commandQueue.add(command);
    } finally {
      lock.unlock();
    }
  }

  /**
   * Returns the number of times the current state has been executed.
   *
   * @return long
   */
  public long getExecutionCount() {
    return stateExecutionCount.get();
  }


  /**
   * Gets the ReportState.
   *
   * @return the stored ReportState, or a default "no container reports"
   * state with count 0 when none has been set.
   */
  public synchronized ReportState getContainerReportState() {
    if (reportState == null) {
      return DEFAULT_REPORT_STATE;
    }
    return reportState;
  }

  /**
   * Sets the ReportState.
   *
   * @param rState - ReportState.
   */
  public synchronized void setContainerReportState(ReportState rState) {
    this.reportState = rState;
  }

  /**
   * Clears report state after it has been communicated.
   */
  public synchronized void clearReportState() {
    if (reportState != null) {
      setContainerReportState(null);
    }
  }

}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java new file mode 100644 index 0000000..ac95b2a --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java @@ -0,0 +1,239 @@ + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */

package org.apache.hadoop.ozone.container.common.statemachine.background;

import com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.container.common.helpers
    .StorageContainerException;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.utils.BackgroundService;
import org.apache.hadoop.utils.BackgroundTask;
import org.apache.hadoop.utils.BackgroundTaskQueue;
import org.apache.hadoop.utils.BackgroundTaskResult;
import org.apache.hadoop.utils.BatchOperation;
import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.utils.MetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.apache.hadoop.ozone.OzoneConfigKeys
    .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys
    .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
    .OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER;
import static org.apache.hadoop.ozone.OzoneConfigKeys
    .OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT;

/**
 * A per-datanode background service in charge of deleting stale ozone
 * blocks. On every interval it asks the container manager for a bounded
 * number of containers and queues one {@link BlockDeletingTask} per
 * container; each task removes a bounded number of blocks that are marked
 * with the deleting-key prefix in the container's metadata store.
 */
public class BlockDeletingService extends BackgroundService{

  private static final Logger LOG =
      LoggerFactory.getLogger(BlockDeletingService.class);

  private final ContainerManager containerManager;
  private final Configuration conf;

  // Throttle number of blocks to delete per task,
  // set to 1 for testing
  private final int blockLimitPerTask;

  // Throttle the number of containers to process concurrently at a time,
  private final int containerLimitPerInterval;

  // Task priority is useful when a to-delete block has weight.
  private final static int TASK_PRIORITY_DEFAULT = 1;
  // Core pool size for container tasks
  private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 10;

  /**
   * Creates the service and reads both throttling limits from {@code conf}.
   *
   * @param containerManager - source of containers and pending-deletion counts
   * @param serviceInterval  - interval between runs, in milliseconds
   * @param serviceTimeout   - timeout handed to the background service
   * @param conf             - configuration
   */
  public BlockDeletingService(ContainerManager containerManager,
      long serviceInterval, long serviceTimeout, Configuration conf) {
    super("BlockDeletingService", serviceInterval,
        TimeUnit.MILLISECONDS, BLOCK_DELETING_SERVICE_CORE_POOL_SIZE,
        serviceTimeout);
    this.containerManager = containerManager;
    this.conf = conf;
    this.blockLimitPerTask = conf.getInt(
        OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER,
        OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT);
    this.containerLimitPerInterval = conf.getInt(
        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL,
        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT);
  }

  /**
   * Builds the task queue for one service interval.
   *
   * @return a queue with one task per chosen container; empty when the
   * containers could not be listed.
   */
  @Override
  public BackgroundTaskQueue getTasks() {
    BackgroundTaskQueue queue = new BackgroundTaskQueue();
    List<ContainerData> containers = Lists.newArrayList();
    try {
      // We at most list a number of containers a time,
      // in case there are too many containers and start too many workers.
      // We must ensure there is no empty container in this result.
      // The chosen result depends on what container deletion policy is
      // configured.
      containers = containerManager.chooseContainerForBlockDeletion(
          containerLimitPerInterval);
      LOG.info("Plan to choose {} containers for block deletion, "
              + "actually returns {} valid containers.",
          containerLimitPerInterval, containers.size());

      for (ContainerData container : containers) {
        queue.add(new BlockDeletingTask(container, TASK_PRIORITY_DEFAULT));
      }
    } catch (StorageContainerException e) {
      LOG.warn("Failed to initiate block deleting tasks, "
          + "caused by unable to get containers info. "
          + "Retry in next interval. ", e);
    } catch (Exception e) {
      // In case listContainer call throws any uncaught RuntimeException.
      // Logged at error level: this is unexpected, and a debug-only message
      // would make the failure invisible under a default logging setup.
      LOG.error("Unexpected error occurs during deleting blocks.", e);
    }
    return queue;
  }

  /**
   * Result of one {@link BlockDeletingTask}: the names of the blocks that
   * were removed.
   */
  private static class ContainerBackgroundTaskResult
      implements BackgroundTaskResult {
    private final List<String> deletedBlockIds;

    ContainerBackgroundTaskResult() {
      deletedBlockIds = new LinkedList<>();
    }

    public void addBlockId(String blockId) {
      deletedBlockIds.add(blockId);
    }

    public void addAll(List<String> blockIds) {
      deletedBlockIds.addAll(blockIds);
    }

    public List<String> getDeletedBlocks() {
      return deletedBlockIds;
    }

    @Override
    public int getSize() {
      return deletedBlockIds.size();
    }
  }

  /**
   * Deletes up to {@code blockLimitPerTask} pending-deletion blocks of a
   * single container.
   */
  private class BlockDeletingTask
      implements BackgroundTask<BackgroundTaskResult> {

    private final int priority;
    private final ContainerData containerData;

    BlockDeletingTask(ContainerData containerData, int priority) {
      this.priority = priority;
      this.containerData = containerData;
    }

    /**
     * Deletes the chunk files of the pending-deletion blocks and then removes
     * their entries from the container's metadata store.
     *
     * @return the result holding the names of the deleted blocks; empty when
     * the container data directory is missing or not a directory.
     */
    @Override
    public BackgroundTaskResult call() throws Exception {
      ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
      long startTime = Time.monotonicNow();
      // Scan container's db and get list of under deletion blocks
      MetadataStore meta = KeyUtils.getDB(containerData, conf);
      // # of blocks to delete is throttled
      KeyPrefixFilter filter = new KeyPrefixFilter(
          OzoneConsts.DELETING_KEY_PREFIX);
      List<Map.Entry<byte[], byte[]>> toDeleteBlocks =
          meta.getSequentialRangeKVs(null, blockLimitPerTask, filter);
      if (toDeleteBlocks.isEmpty()) {
        LOG.debug("No under deletion block found in container : {}",
            containerData.getContainerName());
      }

      List<String> succeedBlocks = new LinkedList<>();
      LOG.debug("Container : {}, To-Delete blocks : {}",
          containerData.getContainerName(), toDeleteBlocks.size());
      File dataDir = ContainerUtils.getDataDirectory(containerData).toFile();
      if (!dataDir.exists() || !dataDir.isDirectory()) {
        LOG.error("Invalid container data dir {} : "
            + "not exist or not a directory", dataDir.getAbsolutePath());
        return crr;
      }

      for (Map.Entry<byte[], byte[]> entry : toDeleteBlocks) {
        String blockName = DFSUtil.bytes2String(entry.getKey());
        LOG.debug("Deleting block {}", blockName);
        try {
          ContainerProtos.KeyData data =
              ContainerProtos.KeyData.parseFrom(entry.getValue());
          for (ContainerProtos.ChunkInfo chunkInfo : data.getChunksList()) {
            File chunkFile = dataDir.toPath()
                .resolve(chunkInfo.getChunkName()).toFile();
            // Best effort: a chunk that cannot be removed does not stop the
            // block from being counted as processed.
            if (FileUtils.deleteQuietly(chunkFile)) {
              LOG.debug("block {} chunk {} deleted", blockName,
                  chunkFile.getAbsolutePath());
            }
          }
          succeedBlocks.add(blockName);
        } catch (InvalidProtocolBufferException e) {
          // Skip this block only; its DB entry is left in place.
          LOG.error("Failed to parse block info for block {}", blockName, e);
        }
      }

      // Once files are deleted ... clean up DB
      BatchOperation batch = new BatchOperation();
      for (String blockName : succeedBlocks) {
        batch.delete(DFSUtil.string2Bytes(blockName));
      }
      meta.writeBatch(batch);
      // update count of pending deletion blocks in in-memory container status
      containerManager.decrPendingDeletionBlocks(succeedBlocks.size(),
          containerData.getContainerName());

      if (!succeedBlocks.isEmpty()) {
        LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms",
            containerData.getContainerName(), succeedBlocks.size(),
            Time.monotonicNow() - startTime);
      }
      crr.addAll(succeedBlocks);
      return crr;
    }

    @Override
    public int getPriority() {
      return priority;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java new file mode 100644 index 0000000..a9e202e --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine.background; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java new file mode 100644 index 0000000..f7b49b7 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;

import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.SCMCloseContainerCmdResponseProto;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.SCMCmdType;
import org.apache.hadoop.ozone.container.common.statemachine
    .SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Handler for the close container command: parses the container name out of
 * the command and asks the container manager to close that container.
 */
public class CloseContainerHandler implements CommandHandler {
  static final Logger LOG =
      LoggerFactory.getLogger(CloseContainerHandler.class);
  private int invocationCount;
  private long totalTime;

  /**
   * Constructs a close container handler.
   */
  public CloseContainerHandler() {
  }

  /**
   * Closes the container named in the given command. Any failure is logged
   * and not propagated; the elapsed time is accumulated either way.
   *
   * @param command - SCM Command
   * @param container - Ozone Container.
   * @param context - Current Context.
   * @param connectionManager - The SCMs that we are talking to.
   */
  @Override
  public void handle(SCMCommand command, OzoneContainer container,
      StateContext context, SCMConnectionManager connectionManager) {
    LOG.debug("Processing Close Container command.");
    invocationCount++;
    final long begin = Time.monotonicNow();
    // Placeholder used in the error message when parsing fails before the
    // real name is known.
    String name = "UNKNOWN";
    try {
      SCMCloseContainerCmdResponseProto proto =
          SCMCloseContainerCmdResponseProto
              .parseFrom(command.getProtoBufMessage());
      name = proto.getContainerName();
      container.getContainerManager().closeContainer(name);
    } catch (Exception e) {
      LOG.error("Can't close container " + name, e);
    } finally {
      totalTime += Time.monotonicNow() - begin;
    }
  }

  /**
   * Returns the command type that this command handler handles.
   *
   * @return {@code SCMCmdType.closeContainerCommand}
   */
  @Override
  public SCMCmdType getCommandType() {
    return SCMCmdType.closeContainerCommand;
  }

  /**
   * Returns number of times this handler has been invoked.
   *
   * @return the invocation count
   */
  @Override
  public int getInvocationCount() {
    return invocationCount;
  }

  /**
   * Returns the average time one {@code handle} call takes to run, measured
   * with {@code Time.monotonicNow()}.
   *
   * @return total time divided by invocation count, or 0 before the first
   * invocation.
   */
  @Override
  public long getAverageRunTime() {
    return invocationCount > 0 ? totalTime / invocationCount : 0;
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java new file mode 100644 index 0000000..40feca3 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */

package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType;
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * Dispatches command to the correct handler. Handlers are keyed by the
 * command type they report; instances are created through {@link Builder}.
 */
public final class CommandDispatcher {
  static final Logger LOG =
      LoggerFactory.getLogger(CommandDispatcher.class);
  private final StateContext context;
  private final Map<SCMCmdType, CommandHandler> handlerMap;
  private final OzoneContainer container;
  private final SCMConnectionManager connectionManager;

  /**
   * Constructs a command dispatcher.
   *
   * @param container - Ozone Container
   * @param connectionManager - SCM connection manager handed to the handlers
   * @param context - Context
   * @param handlers - Set of handlers; must be non-empty and must not contain
   * two handlers for the same command type.
   * @throws IllegalArgumentException if no handler is given or two handlers
   * report the same command type.
   */
  private CommandDispatcher(OzoneContainer container, SCMConnectionManager
      connectionManager, StateContext context,
      CommandHandler... handlers) {
    Preconditions.checkNotNull(context);
    Preconditions.checkNotNull(handlers);
    Preconditions.checkArgument(handlers.length > 0);
    Preconditions.checkNotNull(container);
    Preconditions.checkNotNull(connectionManager);
    this.context = context;
    this.container = container;
    this.connectionManager = connectionManager;
    handlerMap = new HashMap<>();
    for (CommandHandler h : handlers) {
      if (handlerMap.containsKey(h.getCommandType())) {
        // The placeholder must be exactly "{}"; the previous "{ }" was not
        // substituted, so the offending command type never reached the log.
        LOG.error("Duplicate handler for the same command. Exiting. Handle " +
            "key : {}", h.getCommandType().getDescriptorForType().getName());
        throw new IllegalArgumentException("Duplicate handler for the same " +
            "command.");
      }
      handlerMap.put(h.getCommandType(), h);
    }
  }

  /**
   * Dispatch the command to the correct handler. A command without a
   * registered handler is logged as an error and otherwise ignored.
   *
   * @param command - SCM Command.
   */
  public void handle(SCMCommand command) {
    Preconditions.checkNotNull(command);
    CommandHandler handler = handlerMap.get(command.getType());
    if (handler != null) {
      handler.handle(command, container, context, connectionManager);
    } else {
      LOG.error("Unknown SCM Command queued. There is no handler for this " +
          "command. Command: {}", command.getType().getDescriptorForType()
          .getName());
    }
  }

  /**
   * Creates a new, empty {@link Builder}.
   *
   * @return Builder
   */
  public static Builder newBuilder() {
    return new Builder();
  }

  /**
   * Helper class to construct command dispatcher.
   */
  public static class Builder {
    private final List<CommandHandler> handlerList;
    private OzoneContainer container;
    private StateContext context;
    private SCMConnectionManager connectionManager;

    public Builder() {
      handlerList = new LinkedList<>();
    }

    /**
     * Adds a handler.
     *
     * @param handler - handler
     * @return Builder
     */
    public Builder addHandler(CommandHandler handler) {
      Preconditions.checkNotNull(handler);
      handlerList.add(handler);
      return this;
    }

    /**
     * Add the OzoneContainer.
     *
     * @param ozoneContainer - ozone container.
     * @return Builder
     */
    public Builder setContainer(OzoneContainer ozoneContainer) {
      Preconditions.checkNotNull(ozoneContainer);
      this.container = ozoneContainer;
      return this;
    }

    /**
     * Set the Connection Manager.
     *
     * @param scmConnectionManager - SCM connection manager
     * @return this
     */
    public Builder setConnectionManager(SCMConnectionManager
        scmConnectionManager) {
      Preconditions.checkNotNull(scmConnectionManager);
      this.connectionManager = scmConnectionManager;
      return this;
    }

    /**
     * Sets the Context.
     *
     * @param stateContext - StateContext
     * @return this
     */
    public Builder setContext(StateContext stateContext) {
      Preconditions.checkNotNull(stateContext);
      this.context = stateContext;
      return this;
    }

    /**
     * Builds a command Dispatcher. The connection manager, container and
     * context must have been set and at least one handler added.
     *
     * @return Command Dispatcher.
     */
    public CommandDispatcher build() {
      Preconditions.checkNotNull(this.connectionManager, "Missing connection" +
          " manager.");
      Preconditions.checkNotNull(this.container, "Missing container.");
      Preconditions.checkNotNull(this.context, "Missing context.");
      Preconditions.checkArgument(this.handlerList.size() > 0);
      return new CommandDispatcher(this.container, this.connectionManager,
          this.context, handlerList.toArray(
              new CommandHandler[handlerList.size()]));
    }
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java new file mode 100644 index 0000000..13d9f72 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType; +import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; + +/** + * Generic interface for handlers of {@link SCMCommand}s. + * Each implementation handles the single command type it reports from + * {@link #getCommandType()} and exposes simple invocation statistics. + */ +public interface CommandHandler { + + /** + * Handles a given SCM command. + * @param command - SCM Command + * @param container - Ozone Container. + * @param context - Current Context. + * @param connectionManager - The SCMs that we are talking to. + */ + void handle(SCMCommand command, OzoneContainer container, + StateContext context, SCMConnectionManager connectionManager); + + /** + * Returns the command type that this command handler handles. + * @return the command type handled by this handler + */ + SCMCmdType getCommandType(); + + /** + * Returns number of times this handler has been invoked. + * @return the invocation count + */ + int getInvocationCount(); + + /** + * Returns the average time an invocation of this handler takes to run. 
+ * @return the average run time (unit is not fixed by this interface; presumably milliseconds, confirm against the implementations) + */ + long getAverageRunTime(); + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java new file mode 100644 index 0000000..ba6b418 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;

import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.SCMCmdType;
import org.apache.hadoop.ozone.container.common.statemachine
    .EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine
    .SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * Handler for the send-container-report command: obtains the container
 * report from the local OzoneContainer and sends it to every SCM endpoint
 * known to the connection manager.
 */
public class ContainerReportHandler implements CommandHandler {
  static final Logger LOG =
      LoggerFactory.getLogger(ContainerReportHandler.class);
  private int invocationCount;
  private long totalTime;

  /**
   * Constructs a ContainerReport handler.
   */
  public ContainerReportHandler() {
  }

  /**
   * Sends the current container report to all SCM endpoints. An IOException
   * is logged and not propagated; the elapsed time is accumulated either way.
   *
   * @param command - SCM Command
   * @param container - Ozone Container.
   * @param context - Current Context.
   * @param connectionManager - The SCMs that we are talking to.
   */
  @Override
  public void handle(SCMCommand command, OzoneContainer container,
      StateContext context, SCMConnectionManager connectionManager) {
    LOG.debug("Processing Container Report.");
    invocationCount++;
    final long begin = Time.monotonicNow();
    try {
      ContainerReportsRequestProto report = container.getContainerReport();

      // TODO : We send this report to all SCMs.Check if it is enough only to
      // send to the leader once we have RAFT enabled SCMs.
      for (EndpointStateMachine scm : connectionManager.getValues()) {
        scm.getEndPoint().sendContainerReport(report);
      }
    } catch (IOException ex) {
      LOG.error("Unable to process the Container Report command.", ex);
    } finally {
      totalTime += Time.monotonicNow() - begin;
    }
  }

  /**
   * Returns the command type that this command handler handles.
   *
   * @return {@code SCMCmdType.sendContainerReport}
   */
  @Override
  public SCMCmdType getCommandType() {
    return SCMCmdType.sendContainerReport;
  }

  /**
   * Returns number of times this handler has been invoked.
   *
   * @return the invocation count
   */
  @Override
  public int getInvocationCount() {
    return invocationCount;
  }

  /**
   * Returns the average time one {@code handle} call takes to run, measured
   * with {@code Time.monotonicNow()}.
   *
   * @return total time divided by invocation count, or 0 before the first
   * invocation.
   */
  @Override
  public long getAverageRunTime() {
    return invocationCount > 0 ? totalTime / invocationCount : 0;
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java new file mode 100644 index 0000000..f106e3d --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto + .DeleteBlockTransactionResult; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCmdType; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.helpers.ContainerData; +import org.apache.hadoop.ozone.container.common.helpers + .DeletedContainerBlocksSummary; +import org.apache.hadoop.ozone.container.common.helpers.KeyUtils; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; +import org.apache.hadoop.ozone.container.common.statemachine + .EndpointStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine + .SCMConnectionManager; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; 
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.utils.BatchOperation; +import org.apache.hadoop.utils.MetadataStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +/** + * Handle block deletion commands. + */ +public class DeleteBlocksCommandHandler implements CommandHandler { + + private static final Logger LOG = + LoggerFactory.getLogger(DeleteBlocksCommandHandler.class); + + private ContainerManager containerManager; + private Configuration conf; + private int invocationCount; + private long totalTime; + + public DeleteBlocksCommandHandler(ContainerManager containerManager, + Configuration conf) { + this.containerManager = containerManager; + this.conf = conf; + } + + @Override + public void handle(SCMCommand command, OzoneContainer container, + StateContext context, SCMConnectionManager connectionManager) { + if (command.getType() != SCMCmdType.deleteBlocksCommand) { + LOG.warn("Skipping handling command, expected command " + + "type {} but found {}", + SCMCmdType.deleteBlocksCommand, command.getType()); + return; + } + LOG.debug("Processing block deletion command."); + invocationCount++; + long startTime = Time.monotonicNow(); + + // move blocks to deleting state. + // this is a metadata update, the actual deletion happens in another + // recycling thread. 
+ DeleteBlocksCommand cmd = (DeleteBlocksCommand) command; + List<DeletedBlocksTransaction> containerBlocks = cmd.blocksTobeDeleted(); + + + DeletedContainerBlocksSummary summary = + DeletedContainerBlocksSummary.getFrom(containerBlocks); + LOG.info("Start to delete container blocks, TXIDs={}, " + + "numOfContainers={}, numOfBlocks={}", + summary.getTxIDSummary(), + summary.getNumOfContainers(), + summary.getNumOfBlocks()); + + ContainerBlocksDeletionACKProto.Builder resultBuilder = + ContainerBlocksDeletionACKProto.newBuilder(); + containerBlocks.forEach(entry -> { + DeleteBlockTransactionResult.Builder txResultBuilder = + DeleteBlockTransactionResult.newBuilder(); + txResultBuilder.setTxID(entry.getTxID()); + try { + deleteContainerBlocks(entry, conf); + txResultBuilder.setSuccess(true); + } catch (IOException e) { + LOG.warn("Failed to delete blocks for container={}, TXID={}", + entry.getContainerName(), entry.getTxID(), e); + txResultBuilder.setSuccess(false); + } + resultBuilder.addResults(txResultBuilder.build()); + }); + ContainerBlocksDeletionACKProto blockDeletionACK = resultBuilder.build(); + + // Send ACK back to SCM as long as meta updated + // TODO Or we should wait until the blocks are actually deleted? + if (!containerBlocks.isEmpty()) { + for (EndpointStateMachine endPoint : connectionManager.getValues()) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Sending following block deletion ACK to SCM"); + for (DeleteBlockTransactionResult result : + blockDeletionACK.getResultsList()) { + LOG.debug(result.getTxID() + " : " + result.getSuccess()); + } + } + endPoint.getEndPoint() + .sendContainerBlocksDeletionACK(blockDeletionACK); + } catch (IOException e) { + LOG.error("Unable to send block deletion ACK to SCM {}", + endPoint.getAddress().toString(), e); + } + } + } + + long endTime = Time.monotonicNow(); + totalTime += endTime - startTime; + } + + /** + * Move a bunch of blocks from a container to deleting state. 
+ * This is a meta update, the actual deletes happen in async mode. + * + * @param delTX a block deletion transaction. + * @param config configuration. + * @throws IOException if I/O error occurs. + */ + private void deleteContainerBlocks(DeletedBlocksTransaction delTX, + Configuration config) throws IOException { + String containerId = delTX.getContainerName(); + ContainerData containerInfo = containerManager.readContainer(containerId); + if (LOG.isDebugEnabled()) { + LOG.debug("Processing Container : {}, DB path : {}", containerId, + containerInfo.getDBPath()); + } + + int newDeletionBlocks = 0; + MetadataStore containerDB = KeyUtils.getDB(containerInfo, config); + for (String blk : delTX.getBlockIDList()) { + BatchOperation batch = new BatchOperation(); + byte[] blkBytes = DFSUtil.string2Bytes(blk); + byte[] blkInfo = containerDB.get(blkBytes); + if (blkInfo != null) { + // Found the block in container db, + // use an atomic update to change its state to deleting. + batch.put(DFSUtil.string2Bytes(OzoneConsts.DELETING_KEY_PREFIX + blk), + blkInfo); + batch.delete(blkBytes); + try { + containerDB.writeBatch(batch); + newDeletionBlocks++; + LOG.debug("Transited Block {} to DELETING state in container {}", + blk, containerId); + } catch (IOException e) { + // if some blocks failed to delete, we fail this TX, + // without sending this ACK to SCM, SCM will resend the TX + // with a certain number of retries. 
+ throw new IOException( + "Failed to delete blocks for TXID = " + delTX.getTxID(), e); + } + } else { + LOG.debug("Block {} not found or already under deletion in" + + " container {}, skip deleting it.", blk, containerId); + } + } + + // update pending deletion blocks count in in-memory container status + containerManager.incrPendingDeletionBlocks(newDeletionBlocks, containerId); + } + + @Override + public SCMCmdType getCommandType() { + return SCMCmdType.deleteBlocksCommand; + } + + @Override + public int getInvocationCount() { + return this.invocationCount; + } + + @Override + public long getAverageRunTime() { + if (invocationCount > 0) { + return totalTime / invocationCount; + } + return 0; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java new file mode 100644 index 0000000..1e9c8dc --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java new file mode 100644 index 0000000..feb2f81 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine; +/** + + State machine class is used by the container to denote various states a + container can be in and also is used for command processing. + + Container has the following states. + + Start - > getVersion -> Register -> Running -> Shutdown + + */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java new file mode 100644 index 0000000..75142af --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.ozone.container.common.states; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * State Interface that allows tasks to maintain states. + */ +public interface DatanodeState<T> { + /** + * Called before entering this state. + */ + void onEnter(); + + /** + * Called After exiting this state. + */ + void onExit(); + + /** + * Executes one or more tasks that is needed by this state. + * + * @param executor - ExecutorService + */ + void execute(ExecutorService executor); + + /** + * Wait for execute to finish. + * + * @param time - Time + * @param timeUnit - Unit of time. + */ + T await(long time, TimeUnit timeUnit) + throws InterruptedException, ExecutionException, TimeoutException; + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
