http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java deleted file mode 100644 index 994b245..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ /dev/null @@ -1,281 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package org.apache.hadoop.ozone.container.common.statemachine; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState; -import org.apache.hadoop.ozone.container.common.states.DatanodeState; -import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.LinkedList; -import java.util.Queue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT; -import static org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.ReportState.states - .noContainerReports; - -/** - * Current Context of State Machine. - */ -public class StateContext { - static final Logger LOG = - LoggerFactory.getLogger(StateContext.class); - private final Queue<SCMCommand> commandQueue; - private final Lock lock; - private final DatanodeStateMachine parent; - private final AtomicLong stateExecutionCount; - private final Configuration conf; - private DatanodeStateMachine.DatanodeStates state; - private SCMNodeReport nrState; - private ReportState reportState; - private static final ReportState DEFAULT_REPORT_STATE = - ReportState.newBuilder().setState(noContainerReports).setCount(0).build(); - - /** - * Constructs a StateContext. 
- * - * @param conf - Configration - * @param state - State - * @param parent Parent State Machine - */ - public StateContext(Configuration conf, DatanodeStateMachine.DatanodeStates - state, DatanodeStateMachine parent) { - this.conf = conf; - this.state = state; - this.parent = parent; - commandQueue = new LinkedList<>(); - lock = new ReentrantLock(); - stateExecutionCount = new AtomicLong(0); - nrState = SCMNodeReport.getDefaultInstance(); - } - - /** - * Returns the ContainerStateMachine class that holds this state. - * - * @return ContainerStateMachine. - */ - public DatanodeStateMachine getParent() { - return parent; - } - - /** - * Get the container server port. - * @return The container server port if available, return -1 if otherwise - */ - public int getContainerPort() { - return parent == null ? - INVALID_PORT : parent.getContainer().getContainerServerPort(); - } - - /** - * Gets the Ratis Port. - * @return int , return -1 if not valid. - */ - public int getRatisPort() { - return parent == null ? - INVALID_PORT : parent.getContainer().getRatisContainerServerPort(); - } - - /** - * Returns true if we are entering a new state. - * - * @return boolean - */ - boolean isEntering() { - return stateExecutionCount.get() == 0; - } - - /** - * Returns true if we are exiting from the current state. - * - * @param newState - newState. - * @return boolean - */ - boolean isExiting(DatanodeStateMachine.DatanodeStates newState) { - boolean isExiting = state != newState && stateExecutionCount.get() > 0; - if(isExiting) { - stateExecutionCount.set(0); - } - return isExiting; - } - - /** - * Returns the current state the machine is in. - * - * @return state. - */ - public DatanodeStateMachine.DatanodeStates getState() { - return state; - } - - /** - * Sets the current state of the machine. - * - * @param state state. 
- */ - public void setState(DatanodeStateMachine.DatanodeStates state) { - this.state = state; - } - - /** - * Returns the node report of the datanode state context. - * @return the node report. - */ - public SCMNodeReport getNodeReport() { - return nrState; - } - - /** - * Sets the storage location report of the datanode state context. - * @param nrReport - node report - */ - public void setReportState(SCMNodeReport nrReport) { - this.nrState = nrReport; - } - - /** - * Returns the next task to get executed by the datanode state machine. - * @return A callable that will be executed by the - * {@link DatanodeStateMachine} - */ - @SuppressWarnings("unchecked") - public DatanodeState<DatanodeStateMachine.DatanodeStates> getTask() { - switch (this.state) { - case INIT: - return new InitDatanodeState(this.conf, parent.getConnectionManager(), - this); - case RUNNING: - return new RunningDatanodeState(this.conf, parent.getConnectionManager(), - this); - case SHUTDOWN: - return null; - default: - throw new IllegalArgumentException("Not Implemented yet."); - } - } - - /** - * Executes the required state function. - * - * @param service - Executor Service - * @param time - seconds to wait - * @param unit - Seconds. 
- * @throws InterruptedException - * @throws ExecutionException - * @throws TimeoutException - */ - public void execute(ExecutorService service, long time, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - stateExecutionCount.incrementAndGet(); - DatanodeState<DatanodeStateMachine.DatanodeStates> task = getTask(); - if (this.isEntering()) { - task.onEnter(); - } - task.execute(service); - DatanodeStateMachine.DatanodeStates newState = task.await(time, unit); - if (this.state != newState) { - if (LOG.isDebugEnabled()) { - LOG.debug("Task {} executed, state transited from {} to {}", - task.getClass().getSimpleName(), this.state, newState); - } - if (isExiting(newState)) { - task.onExit(); - } - this.clearReportState(); - this.setState(newState); - } - } - - /** - * Returns the next command or null if it is empty. - * - * @return SCMCommand or Null. - */ - public SCMCommand getNextCommand() { - lock.lock(); - try { - return commandQueue.poll(); - } finally { - lock.unlock(); - } - } - - /** - * Adds a command to the State Machine queue. - * - * @param command - SCMCommand. - */ - public void addCommand(SCMCommand command) { - lock.lock(); - try { - commandQueue.add(command); - } finally { - lock.unlock(); - } - } - - /** - * Returns the count of the Execution. - * @return long - */ - public long getExecutionCount() { - return stateExecutionCount.get(); - } - - - /** - * Gets the ReportState. - * @return ReportState. - */ - public synchronized ReportState getContainerReportState() { - if (reportState == null) { - return DEFAULT_REPORT_STATE; - } - return reportState; - } - - /** - * Sets the ReportState. - * @param rState - ReportState. - */ - public synchronized void setContainerReportState(ReportState rState) { - this.reportState = rState; - } - - /** - * Clears report state after it has been communicated. - */ - public synchronized void clearReportState() { - if(reportState != null) { - setContainerReportState(null); - } - } - -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java deleted file mode 100644 index 80844cf..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java +++ /dev/null @@ -1,237 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.hadoop.ozone.container.common.statemachine.background; - -import com.google.common.collect.Lists; -import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.container.common.helpers.ContainerData; -import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; -import org.apache.hadoop.ozone.container.common.helpers.KeyUtils; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; -import org.apache.hadoop.scm.container.common.helpers.StorageContainerException; -import org.apache.hadoop.util.Time; -import org.apache.hadoop.utils.BackgroundService; -import org.apache.hadoop.utils.BackgroundTaskResult; -import org.apache.hadoop.utils.BackgroundTaskQueue; -import org.apache.hadoop.utils.BackgroundTask; -import org.apache.hadoop.utils.BatchOperation; -import org.apache.hadoop.utils.MetadataStore; -import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.util.List; -import java.util.LinkedList; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT; - -/** - * A per-datanode container block deleting service takes in charge - * of deleting staled ozone blocks. 
- */ -public class BlockDeletingService extends BackgroundService{ - - private static final Logger LOG = - LoggerFactory.getLogger(BlockDeletingService.class); - - private final ContainerManager containerManager; - private final Configuration conf; - - // Throttle number of blocks to delete per task, - // set to 1 for testing - private final int blockLimitPerTask; - - // Throttle the number of containers to process concurrently at a time, - private final int containerLimitPerInterval; - - // Task priority is useful when a to-delete block has weight. - private final static int TASK_PRIORITY_DEFAULT = 1; - // Core pool size for container tasks - private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 10; - - public BlockDeletingService(ContainerManager containerManager, - long serviceInterval, long serviceTimeout, Configuration conf) { - super("BlockDeletingService", serviceInterval, - TimeUnit.MILLISECONDS, BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, - serviceTimeout); - this.containerManager = containerManager; - this.conf = conf; - this.blockLimitPerTask = conf.getInt( - OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, - OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT); - this.containerLimitPerInterval = conf.getInt( - OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, - OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT); - } - - @Override - public BackgroundTaskQueue getTasks() { - BackgroundTaskQueue queue = new BackgroundTaskQueue(); - List<ContainerData> containers = Lists.newArrayList(); - try { - // We at most list a number of containers a time, - // in case there are too many containers and start too many workers. - // We must ensure there is no empty container in this result. - // The chosen result depends on what container deletion policy is - // configured. 
- containers = containerManager.chooseContainerForBlockDeletion( - containerLimitPerInterval); - LOG.info("Plan to choose {} containers for block deletion, " - + "actually returns {} valid containers.", - containerLimitPerInterval, containers.size()); - - for(ContainerData container : containers) { - BlockDeletingTask containerTask = - new BlockDeletingTask(container, TASK_PRIORITY_DEFAULT); - queue.add(containerTask); - } - } catch (StorageContainerException e) { - LOG.warn("Failed to initiate block deleting tasks, " - + "caused by unable to get containers info. " - + "Retry in next interval. ", e); - } catch (Exception e) { - // In case listContainer call throws any uncaught RuntimeException. - if (LOG.isDebugEnabled()) { - LOG.debug("Unexpected error occurs during deleting blocks.", e); - } - } - return queue; - } - - private static class ContainerBackgroundTaskResult - implements BackgroundTaskResult { - private List<String> deletedBlockIds; - - ContainerBackgroundTaskResult() { - deletedBlockIds = new LinkedList<>(); - } - - public void addBlockId(String blockId) { - deletedBlockIds.add(blockId); - } - - public void addAll(List<String> blockIds) { - deletedBlockIds.addAll(blockIds); - } - - public List<String> getDeletedBlocks() { - return deletedBlockIds; - } - - @Override - public int getSize() { - return deletedBlockIds.size(); - } - } - - private class BlockDeletingTask - implements BackgroundTask<BackgroundTaskResult> { - - private final int priority; - private final ContainerData containerData; - - BlockDeletingTask(ContainerData containerName, int priority) { - this.priority = priority; - this.containerData = containerName; - } - - @Override - public BackgroundTaskResult call() throws Exception { - ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult(); - long startTime = Time.monotonicNow(); - // Scan container's db and get list of under deletion blocks - MetadataStore meta = KeyUtils.getDB(containerData, conf); - // # of blocks to 
delete is throttled - KeyPrefixFilter filter = new KeyPrefixFilter( - OzoneConsts.DELETING_KEY_PREFIX); - List<Map.Entry<byte[], byte[]>> toDeleteBlocks = - meta.getSequentialRangeKVs(null, blockLimitPerTask, filter); - if (toDeleteBlocks.isEmpty()) { - LOG.debug("No under deletion block found in container : {}", - containerData.getContainerName()); - } - - List<String> succeedBlocks = new LinkedList<>(); - LOG.debug("Container : {}, To-Delete blocks : {}", - containerData.getContainerName(), toDeleteBlocks.size()); - File dataDir = ContainerUtils.getDataDirectory(containerData).toFile(); - if (!dataDir.exists() || !dataDir.isDirectory()) { - LOG.error("Invalid container data dir {} : " - + "not exist or not a directory", dataDir.getAbsolutePath()); - return crr; - } - - toDeleteBlocks.forEach(entry -> { - String blockName = DFSUtil.bytes2String(entry.getKey()); - LOG.debug("Deleting block {}", blockName); - try { - ContainerProtos.KeyData data = - ContainerProtos.KeyData.parseFrom(entry.getValue()); - for (ContainerProtos.ChunkInfo chunkInfo : data.getChunksList()) { - File chunkFile = dataDir.toPath() - .resolve(chunkInfo.getChunkName()).toFile(); - if (FileUtils.deleteQuietly(chunkFile)) { - LOG.debug("block {} chunk {} deleted", blockName, - chunkFile.getAbsolutePath()); - } - } - succeedBlocks.add(blockName); - } catch (InvalidProtocolBufferException e) { - LOG.error("Failed to parse block info for block {}", blockName, e); - } - }); - - // Once files are deleted ... 
clean up DB - BatchOperation batch = new BatchOperation(); - succeedBlocks.forEach(entry -> - batch.delete(DFSUtil.string2Bytes(entry))); - meta.writeBatch(batch); - // update count of pending deletion blocks in in-memory container status - containerManager.decrPendingDeletionBlocks(succeedBlocks.size(), - containerData.getContainerName()); - - if (!succeedBlocks.isEmpty()) { - LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms", - containerData.getContainerName(), succeedBlocks.size(), - Time.monotonicNow() - startTime); - } - crr.addAll(succeedBlocks); - return crr; - } - - @Override - public int getPriority() { - return priority; - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java deleted file mode 100644 index a9e202e..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.common.statemachine.background; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java deleted file mode 100644 index f5fbcd7..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; - -import org.apache.hadoop.ozone.container.common.statemachine - .SCMConnectionManager; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCloseContainerCmdResponseProto; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.Type; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Container Report handler. - */ -public class CloseContainerHandler implements CommandHandler { - static final Logger LOG = - LoggerFactory.getLogger(CloseContainerHandler.class); - private int invocationCount; - private long totalTime; - - /** - * Constructs a ContainerReport handler. - */ - public CloseContainerHandler() { - } - - /** - * Handles a given SCM command. - * - * @param command - SCM Command - * @param container - Ozone Container. - * @param context - Current Context. - * @param connectionManager - The SCMs that we are talking to. 
- */ - @Override - public void handle(SCMCommand command, OzoneContainer container, - StateContext context, SCMConnectionManager connectionManager) { - LOG.debug("Processing Close Container command."); - invocationCount++; - long startTime = Time.monotonicNow(); - String containerName = "UNKNOWN"; - try { - - SCMCloseContainerCmdResponseProto - closeContainerProto = - SCMCloseContainerCmdResponseProto - .parseFrom(command.getProtoBufMessage()); - containerName = closeContainerProto.getContainerName(); - - container.getContainerManager().closeContainer(containerName); - - } catch (Exception e) { - LOG.error("Can't close container " + containerName, e); - } finally { - long endTime = Time.monotonicNow(); - totalTime += endTime - startTime; - } - } - - /** - * Returns the command type that this command handler handles. - * - * @return Type - */ - @Override - public Type getCommandType() { - return Type.closeContainerCommand; - } - - /** - * Returns number of times this handler has been invoked. - * - * @return int - */ - @Override - public int getInvocationCount() { - return invocationCount; - } - - /** - * Returns the average time this function takes to run. 
- * - * @return long - */ - @Override - public long getAverageRunTime() { - if (invocationCount > 0) { - return totalTime / invocationCount; - } - return 0; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java deleted file mode 100644 index 6670ea9..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java +++ /dev/null @@ -1,177 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type; -import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -/** - * Dispatches command to the correct handler. - */ -public final class CommandDispatcher { - static final Logger LOG = - LoggerFactory.getLogger(CommandDispatcher.class); - private final StateContext context; - private final Map<Type, CommandHandler> handlerMap; - private final OzoneContainer container; - private final SCMConnectionManager connectionManager; - - /** - * Constructs a command Dispatcher. - * @param context - Context. - */ - /** - * Constructs a command dispatcher. - * - * @param container - Ozone Container - * @param context - Context - * @param handlers - Set of handlers. - */ - private CommandDispatcher(OzoneContainer container, SCMConnectionManager - connectionManager, StateContext context, - CommandHandler... handlers) { - Preconditions.checkNotNull(context); - Preconditions.checkNotNull(handlers); - Preconditions.checkArgument(handlers.length > 0); - Preconditions.checkNotNull(container); - Preconditions.checkNotNull(connectionManager); - this.context = context; - this.container = container; - this.connectionManager = connectionManager; - handlerMap = new HashMap<>(); - for (CommandHandler h : handlers) { - if(handlerMap.containsKey(h.getCommandType())){ - LOG.error("Duplicate handler for the same command. Exiting. 
Handle " + - "key : { }", h.getCommandType().getDescriptorForType().getName()); - throw new IllegalArgumentException("Duplicate handler for the same " + - "command."); - } - handlerMap.put(h.getCommandType(), h); - } - } - - /** - * Dispatch the command to the correct handler. - * - * @param command - SCM Command. - */ - public void handle(SCMCommand command) { - Preconditions.checkNotNull(command); - CommandHandler handler = handlerMap.get(command.getType()); - if (handler != null) { - handler.handle(command, container, context, connectionManager); - } else { - LOG.error("Unknown SCM Command queued. There is no handler for this " + - "command. Command: {}", command.getType().getDescriptorForType() - .getName()); - } - } - - public static Builder newBuilder() { - return new Builder(); - } - - /** - * Helper class to construct command dispatcher. - */ - public static class Builder { - private final List<CommandHandler> handlerList; - private OzoneContainer container; - private StateContext context; - private SCMConnectionManager connectionManager; - - public Builder() { - handlerList = new LinkedList<>(); - } - - /** - * Adds a handler. - * - * @param handler - handler - * @return Builder - */ - public Builder addHandler(CommandHandler handler) { - Preconditions.checkNotNull(handler); - handlerList.add(handler); - return this; - } - - /** - * Add the OzoneContainer. - * - * @param ozoneContainer - ozone container. - * @return Builder - */ - public Builder setContainer(OzoneContainer ozoneContainer) { - Preconditions.checkNotNull(ozoneContainer); - this.container = ozoneContainer; - return this; - } - - /** - * Set the Connection Manager. - * - * @param scmConnectionManager - * @return this - */ - public Builder setConnectionManager(SCMConnectionManager - scmConnectionManager) { - Preconditions.checkNotNull(scmConnectionManager); - this.connectionManager = scmConnectionManager; - return this; - } - - /** - * Sets the Context. 
- * - * @param stateContext - StateContext - * @return this - */ - public Builder setContext(StateContext stateContext) { - Preconditions.checkNotNull(stateContext); - this.context = stateContext; - return this; - } - - /** - * Builds a command Dispatcher. - * @return Command Dispatcher. - */ - public CommandDispatcher build() { - Preconditions.checkNotNull(this.connectionManager, "Missing connection" + - " manager."); - Preconditions.checkNotNull(this.container, "Missing container."); - Preconditions.checkNotNull(this.context, "Missing context."); - Preconditions.checkArgument(this.handlerList.size() > 0); - return new CommandDispatcher(this.container, this.connectionManager, - this.context, handlerList.toArray( - new CommandHandler[handlerList.size()])); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java deleted file mode 100644 index b30d481..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-
-import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
-import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-
-/**
- * Generic interface for handlers of commands sent by SCM to a datanode.
- * <p>
- * Each implementation handles exactly one command {@link Type} and keeps
- * simple statistics (invocation count and average run time) about itself.
- */
-public interface CommandHandler {
-
-  /**
-   * Handles a given SCM command.
-   *
-   * @param command - SCM Command to process.
-   * @param container - Ozone Container the command operates on.
-   * @param context - Current Context of the datanode state machine.
-   * @param connectionManager - The SCMs that we are talking to; handlers
-   *                          use it to send results back to SCM.
-   */
-  void handle(SCMCommand command, OzoneContainer container,
-      StateContext context, SCMConnectionManager connectionManager);
-
-  /**
-   * Returns the command type that this command handler handles.
-   *
-   * @return the single command {@link Type} served by this handler.
-   */
-  Type getCommandType();
-
-  /**
-   * Returns number of times this handler has been invoked.
-   *
-   * @return invocation count since the handler was created.
-   */
-  int getInvocationCount();
-
-  /**
-   * Returns the average time a single invocation of this handler takes.
-   * <p>
-   * The implementations visible alongside this interface compute it as total
-   * elapsed time divided by the invocation count and return 0 when the
-   * handler has never been invoked.
-   *
-   * @return average run time of {@link #handle}; presumably milliseconds, as
-   * the implementations measure with Time.monotonicNow() - TODO confirm.
-   */
-  long getAverageRunTime();
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java
deleted file mode 100644
index 57a1516..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */ -package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; - -import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine; -import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * Container Report handler. - */ -public class ContainerReportHandler implements CommandHandler { - static final Logger LOG = - LoggerFactory.getLogger(ContainerReportHandler.class); - private int invocationCount; - private long totalTime; - - /** - * Constructs a ContainerReport handler. - */ - public ContainerReportHandler() { - } - - /** - * Handles a given SCM command. - * - * @param command - SCM Command - * @param container - Ozone Container. - * @param context - Current Context. - * @param connectionManager - The SCMs that we are talking to. - */ - @Override - public void handle(SCMCommand command, OzoneContainer container, - StateContext context, SCMConnectionManager connectionManager) { - LOG.debug("Processing Container Report."); - invocationCount++; - long startTime = Time.monotonicNow(); - try { - ContainerReportsRequestProto contianerReport = - container.getContainerReport(); - - // TODO : We send this report to all SCMs.Check if it is enough only to - // send to the leader once we have RAFT enabled SCMs. 
- for (EndpointStateMachine endPoint : connectionManager.getValues()) { - endPoint.getEndPoint().sendContainerReport(contianerReport); - } - } catch (IOException ex) { - LOG.error("Unable to process the Container Report command.", ex); - } finally { - long endTime = Time.monotonicNow(); - totalTime += endTime - startTime; - } - } - - /** - * Returns the command type that this command handler handles. - * - * @return Type - */ - @Override - public Type getCommandType() { - return Type.sendContainerReport; - } - - /** - * Returns number of times this handler has been invoked. - * - * @return int - */ - @Override - public int getInvocationCount() { - return invocationCount; - } - - /** - * Returns the average time this function takes to run. - * - * @return long - */ - @Override - public long getAverageRunTime() { - if (invocationCount > 0) { - return totalTime / invocationCount; - } - return 0; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java deleted file mode 100644 index 1c859ea..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java +++ /dev/null @@ -1,204 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. 
The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; - - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.container.common.helpers.ContainerData; -import org.apache.hadoop.ozone.container.common.helpers.DeletedContainerBlocksSummary; -import org.apache.hadoop.ozone.container.common.helpers.KeyUtils; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; -import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine; -import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; -import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; -import 
org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type; -import org.apache.hadoop.util.Time; -import org.apache.hadoop.utils.BatchOperation; -import org.apache.hadoop.utils.MetadataStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; - -/** - * Handle block deletion commands. - */ -public class DeleteBlocksCommandHandler implements CommandHandler { - - private static final Logger LOG = - LoggerFactory.getLogger(DeleteBlocksCommandHandler.class); - - private ContainerManager containerManager; - private Configuration conf; - private int invocationCount; - private long totalTime; - - public DeleteBlocksCommandHandler(ContainerManager containerManager, - Configuration conf) { - this.containerManager = containerManager; - this.conf = conf; - } - - @Override - public void handle(SCMCommand command, OzoneContainer container, - StateContext context, SCMConnectionManager connectionManager) { - if (command.getType() != Type.deleteBlocksCommand) { - LOG.warn("Skipping handling command, expected command " - + "type {} but found {}", - Type.deleteBlocksCommand, command.getType()); - return; - } - LOG.debug("Processing block deletion command."); - invocationCount++; - long startTime = Time.monotonicNow(); - - // move blocks to deleting state. - // this is a metadata update, the actual deletion happens in another - // recycling thread. 
- DeleteBlocksCommand cmd = (DeleteBlocksCommand) command; - List<DeletedBlocksTransaction> containerBlocks = cmd.blocksTobeDeleted(); - - - DeletedContainerBlocksSummary summary = - DeletedContainerBlocksSummary.getFrom(containerBlocks); - LOG.info("Start to delete container blocks, TXIDs={}, " - + "numOfContainers={}, numOfBlocks={}", - summary.getTxIDSummary(), - summary.getNumOfContainers(), - summary.getNumOfBlocks()); - - ContainerBlocksDeletionACKProto.Builder resultBuilder = - ContainerBlocksDeletionACKProto.newBuilder(); - containerBlocks.forEach(entry -> { - DeleteBlockTransactionResult.Builder txResultBuilder = - DeleteBlockTransactionResult.newBuilder(); - txResultBuilder.setTxID(entry.getTxID()); - try { - deleteContainerBlocks(entry, conf); - txResultBuilder.setSuccess(true); - } catch (IOException e) { - LOG.warn("Failed to delete blocks for container={}, TXID={}", - entry.getContainerName(), entry.getTxID(), e); - txResultBuilder.setSuccess(false); - } - resultBuilder.addResults(txResultBuilder.build()); - }); - ContainerBlocksDeletionACKProto blockDeletionACK = resultBuilder.build(); - - // Send ACK back to SCM as long as meta updated - // TODO Or we should wait until the blocks are actually deleted? - if (!containerBlocks.isEmpty()) { - for (EndpointStateMachine endPoint : connectionManager.getValues()) { - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Sending following block deletion ACK to SCM"); - for (DeleteBlockTransactionResult result : - blockDeletionACK.getResultsList()) { - LOG.debug(result.getTxID() + " : " + result.getSuccess()); - } - } - endPoint.getEndPoint() - .sendContainerBlocksDeletionACK(blockDeletionACK); - } catch (IOException e) { - LOG.error("Unable to send block deletion ACK to SCM {}", - endPoint.getAddress().toString(), e); - } - } - } - - long endTime = Time.monotonicNow(); - totalTime += endTime - startTime; - } - - /** - * Move a bunch of blocks from a container to deleting state. 
- * This is a meta update, the actual deletes happen in async mode. - * - * @param delTX a block deletion transaction. - * @param config configuration. - * @throws IOException if I/O error occurs. - */ - private void deleteContainerBlocks(DeletedBlocksTransaction delTX, - Configuration config) throws IOException { - String containerId = delTX.getContainerName(); - ContainerData containerInfo = containerManager.readContainer(containerId); - if (LOG.isDebugEnabled()) { - LOG.debug("Processing Container : {}, DB path : {}", containerId, - containerInfo.getDBPath()); - } - - int newDeletionBlocks = 0; - MetadataStore containerDB = KeyUtils.getDB(containerInfo, config); - for (String blk : delTX.getBlockIDList()) { - BatchOperation batch = new BatchOperation(); - byte[] blkBytes = DFSUtil.string2Bytes(blk); - byte[] blkInfo = containerDB.get(blkBytes); - if (blkInfo != null) { - // Found the block in container db, - // use an atomic update to change its state to deleting. - batch.put(DFSUtil.string2Bytes(OzoneConsts.DELETING_KEY_PREFIX + blk), - blkInfo); - batch.delete(blkBytes); - try { - containerDB.writeBatch(batch); - newDeletionBlocks++; - LOG.debug("Transited Block {} to DELETING state in container {}", - blk, containerId); - } catch (IOException e) { - // if some blocks failed to delete, we fail this TX, - // without sending this ACK to SCM, SCM will resend the TX - // with a certain number of retries. 
- throw new IOException( - "Failed to delete blocks for TXID = " + delTX.getTxID(), e); - } - } else { - LOG.debug("Block {} not found or already under deletion in" - + " container {}, skip deleting it.", blk, containerId); - } - } - - // update pending deletion blocks count in in-memory container status - containerManager.incrPendingDeletionBlocks(newDeletionBlocks, containerId); - } - - @Override - public Type getCommandType() { - return Type.deleteBlocksCommand; - } - - @Override - public int getInvocationCount() { - return this.invocationCount; - } - - @Override - public long getAverageRunTime() { - if (invocationCount > 0) { - return totalTime / invocationCount; - } - return 0; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java deleted file mode 100644 index 1e9c8dc..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java deleted file mode 100644 index feb2f81..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.common.statemachine; -/** - - State machine class is used by the container to denote various states a - container can be in and also is used for command processing. - - Container has the following states. - - Start - > getVersion -> Register -> Running -> Shutdown - - */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java deleted file mode 100644 index 75142af..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-package org.apache.hadoop.ozone.container.common.states;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * State Interface that allows tasks to maintain states.
- *
- * @param <T> type of the result handed back by {@link #await}.
- */
-public interface DatanodeState<T> {
-  /**
-   * Called before entering this state.
-   */
-  void onEnter();
-
-  /**
-   * Called After exiting this state.
-   */
-  void onExit();
-
-  /**
-   * Executes one or more tasks that is needed by this state.
-   *
-   * @param executor - ExecutorService the tasks are submitted to.
-   */
-  void execute(ExecutorService executor);
-
-  /**
-   * Wait for execute to finish.
-   *
-   * @param time - Maximum time to wait.
-   * @param timeUnit - Unit of time.
-   * @return the result produced by the tasks started in {@link #execute}.
-   * @throws InterruptedException if the waiting thread is interrupted.
-   * @throws ExecutionException if a task failed with an exception.
-   * @throws TimeoutException if the wait timed out.
-   */
-  T await(long time, TimeUnit timeUnit)
-      throws InterruptedException, ExecutionException, TimeoutException;
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
deleted file mode 100644
index 6ca94bf..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.common.states.datanode; - -import com.google.common.base.Strings; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.ozone.client.OzoneClientUtils; -import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; -import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; -import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.common.states.DatanodeState; -import org.apache.hadoop.ozone.web.utils.OzoneUtils; -import org.apache.hadoop.scm.ScmConfigKeys; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Collection; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * Init Datanode State is the task that gets run when we are in Init State. 
- */ -public class InitDatanodeState implements DatanodeState, - Callable<DatanodeStateMachine.DatanodeStates> { - static final Logger LOG = LoggerFactory.getLogger(InitDatanodeState.class); - private final SCMConnectionManager connectionManager; - private final Configuration conf; - private final StateContext context; - private Future<DatanodeStateMachine.DatanodeStates> result; - - /** - * Create InitDatanodeState Task. - * - * @param conf - Conf - * @param connectionManager - Connection Manager - * @param context - Current Context - */ - public InitDatanodeState(Configuration conf, - SCMConnectionManager connectionManager, - StateContext context) { - this.conf = conf; - this.connectionManager = connectionManager; - this.context = context; - } - - /** - * Computes a result, or throws an exception if unable to do so. - * - * @return computed result - * @throws Exception if unable to compute a result - */ - @Override - public DatanodeStateMachine.DatanodeStates call() throws Exception { - Collection<InetSocketAddress> addresses = null; - try { - addresses = OzoneClientUtils.getSCMAddresses(conf); - } catch (IllegalArgumentException e) { - if(!Strings.isNullOrEmpty(e.getMessage())) { - LOG.error("Failed to get SCM addresses: " + e.getMessage()); - } - return DatanodeStateMachine.DatanodeStates.SHUTDOWN; - } - - if (addresses == null || addresses.isEmpty()) { - LOG.error("Null or empty SCM address list found."); - return DatanodeStateMachine.DatanodeStates.SHUTDOWN; - } else { - for (InetSocketAddress addr : addresses) { - connectionManager.addSCMServer(addr); - } - } - - // If datanode ID is set, persist it to the ID file. - persistContainerDatanodeID(); - - return this.context.getState().getNextState(); - } - - /** - * Update Ozone container port to the datanode ID, - * and persist the ID to a local file. 
- */ - private void persistContainerDatanodeID() throws IOException { - String dataNodeIDPath = OzoneUtils.getDatanodeIDPath(conf); - if (Strings.isNullOrEmpty(dataNodeIDPath)) { - LOG.error("A valid file path is needed for config setting {}", - ScmConfigKeys.OZONE_SCM_DATANODE_ID); - this.context.setState(DatanodeStateMachine.DatanodeStates.SHUTDOWN); - return; - } - File idPath = new File(dataNodeIDPath); - int containerPort = this.context.getContainerPort(); - int ratisPort = this.context.getRatisPort(); - DatanodeID datanodeID = this.context.getParent().getDatanodeID(); - if (datanodeID != null) { - datanodeID.setContainerPort(containerPort); - datanodeID.setRatisPort(ratisPort); - ContainerUtils.writeDatanodeIDTo(datanodeID, idPath); - LOG.info("Datanode ID is persisted to {}", dataNodeIDPath); - } - } - - /** - * Called before entering this state. - */ - @Override - public void onEnter() { - LOG.trace("Entering init container state"); - } - - /** - * Called After exiting this state. - */ - @Override - public void onExit() { - LOG.trace("Exiting init container state"); - } - - /** - * Executes one or more tasks that is needed by this state. - * - * @param executor - ExecutorService - */ - @Override - public void execute(ExecutorService executor) { - result = executor.submit(this); - } - - /** - * Wait for execute to finish. - * - * @param time - Time - * @param timeUnit - Unit of time. 
- */ - @Override - public DatanodeStateMachine.DatanodeStates await(long time, - TimeUnit timeUnit) throws InterruptedException, - ExecutionException, TimeoutException { - return result.get(time, timeUnit); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java deleted file mode 100644 index fc9af5b..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java +++ /dev/null @@ -1,253 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package org.apache.hadoop.ozone.container.common.states.datanode; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; -import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; -import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine; -import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.common.states.DatanodeState; -import org.apache.hadoop.ozone.container.common.states.endpoint.HeartbeatEndpointTask; -import org.apache.hadoop.ozone.container.common.states.endpoint.RegisterEndpointTask; -import org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.ozone.web.utils.OzoneUtils; -import org.apache.hadoop.scm.ScmConfigKeys; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * Class that implements handshake with SCM. 
- */
public class RunningDatanodeState implements DatanodeState {
  static final Logger
      LOG = LoggerFactory.getLogger(RunningDatanodeState.class);
  private final SCMConnectionManager connectionManager;
  private final Configuration conf;
  private final StateContext context;
  private CompletionService<EndpointStateMachine.EndPointStates> ecs;
  // Number of tasks handed to the completion service by the last execute()
  // call; await() waits for exactly this many results.
  private int submittedTasks;
  // Set by execute() when an endpoint was already in the SHUTDOWN state and
  // therefore had no task to run.
  private boolean endpointShutdown;

  /**
   * Creates the running state.
   *
   * @param conf - Configuration handed to every endpoint task.
   * @param connectionManager - Source of the endpoints to run tasks against.
   * @param context - State context of the datanode state machine.
   */
  public RunningDatanodeState(Configuration conf,
      SCMConnectionManager connectionManager,
      StateContext context) {
    this.connectionManager = connectionManager;
    this.conf = conf;
    this.context = context;
  }

  /**
   * Reads a datanode ID from the persisted information. Only the ID whose
   * container port matches the port reported by the context is accepted.
   *
   * @param idPath - Path to the ID File.
   * @return ContainerNodeIDProto wrapping the matching DatanodeID.
   * @throws IOException if the file cannot be read or no ID matches the
   * container port.
   */
  private StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
      readPersistedDatanodeID(Path idPath) throws IOException {
    Preconditions.checkNotNull(idPath);
    List<DatanodeID> datanodeIDs =
        ContainerUtils.readDatanodeIDsFrom(idPath.toFile());
    int containerPort = this.context.getContainerPort();
    DatanodeID datanodeID = null;
    for (DatanodeID dnId : datanodeIDs) {
      if (dnId.getContainerPort() == containerPort) {
        datanodeID = dnId;
        break;
      }
    }

    if (datanodeID == null) {
      throw new IOException("No valid datanode ID found from "
          + idPath.toFile().getAbsolutePath()
          + " that matches container port "
          + containerPort);
    }
    return StorageContainerDatanodeProtocolProtos
        .ContainerNodeIDProto
        .newBuilder()
        .setDatanodeID(datanodeID.getProtoBufMessage())
        .build();
  }

  /**
   * Returns ContainerNodeIDProto or null in case of Error. On every error
   * path the context state is set to SHUTDOWN before null is returned.
   *
   * @return ContainerNodeIDProto
   */
  private StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
      getContainerNodeID() {
    String dataNodeIDPath = OzoneUtils.getDatanodeIDPath(conf);
    if (dataNodeIDPath == null || dataNodeIDPath.isEmpty()) {
      LOG.error("A valid file path is needed for config setting {}",
          ScmConfigKeys.OZONE_SCM_DATANODE_ID);

      // This is an unrecoverable error.
      this.context.setState(DatanodeStateMachine.DatanodeStates.SHUTDOWN);
      return null;
    }
    // try to read an existing ContainerNode ID.
    try {
      StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto nodeID =
          readPersistedDatanodeID(Paths.get(dataNodeIDPath));
      if (nodeID != null) {
        // The placeholder was missing, so the UUID was silently dropped.
        LOG.trace("Read Node ID :{}",
            nodeID.getDatanodeID().getDatanodeUuid());
        return nodeID;
      }
    } catch (IOException ex) {
      // Nothing is created here: the datanode is moved to SHUTDOWN below, so
      // the cause must be visible at the default log level.
      LOG.error("Not able to find container Node ID, shutting down.", ex);
    }
    this.context.setState(DatanodeStateMachine.DatanodeStates.SHUTDOWN);
    return null;
  }

  /**
   * Called before entering this state.
   */
  @Override
  public void onEnter() {
    LOG.trace("Entering handshake task.");
  }

  /**
   * Called After exiting this state.
   */
  @Override
  public void onExit() {
    LOG.trace("Exiting handshake task.");
  }

  /**
   * Executes one or more tasks that is needed by this state. One task is
   * submitted per endpoint; an endpoint that is already in SHUTDOWN has no
   * task and is recorded so that await() can report SHUTDOWN.
   *
   * @param executor -  ExecutorService
   */
  @Override
  public void execute(ExecutorService executor) {
    ecs = new ExecutorCompletionService<>(executor);
    submittedTasks = 0;
    endpointShutdown = false;
    for (EndpointStateMachine endpoint : connectionManager.getValues()) {
      Callable<EndpointStateMachine.EndPointStates> endpointTask
          = getEndPointTask(endpoint);
      if (endpointTask == null) {
        // submit(null) would throw NullPointerException.
        LOG.error("Endpoint is in SHUTDOWN state, no task to execute.");
        endpointShutdown = true;
        continue;
      }
      ecs.submit(endpointTask);
      submittedTasks++;
    }
  }

  //TODO : Cache some of these tasks instead of creating them
  //all the time.
  /**
   * Builds the task matching the current state of the endpoint.
   *
   * @param endpoint - Endpoint to build the task for.
   * @return the task, or null when the endpoint is in SHUTDOWN.
   * @throws IllegalArgumentException if the endpoint state is not handled.
   */
  private Callable<EndpointStateMachine.EndPointStates>
      getEndPointTask(EndpointStateMachine endpoint) {
    switch (endpoint.getState()) {
    case GETVERSION:
      return new VersionEndpointTask(endpoint, conf);
    case REGISTER:
      return RegisterEndpointTask.newBuilder()
          .setConfig(conf)
          .setEndpointStateMachine(endpoint)
          .setNodeID(getContainerNodeID())
          .build();
    case HEARTBEAT:
      return HeartbeatEndpointTask.newBuilder()
          .setConfig(conf)
          .setEndpointStateMachine(endpoint)
          .setNodeID(getContainerNodeID())
          .setContext(context)
          .build();
    case SHUTDOWN:
      break;
    default:
      throw new IllegalArgumentException("Illegal Argument.");
    }
    return null;
  }

  /**
   * Computes the next state the container state machine must move to by looking
   * at all the state of endpoints.
   * <p>
   * if any endpoint state has moved to Shutdown, either we have an
   * unrecoverable error or we have been told to shutdown. Either case the
   * datanode state machine should move to Shutdown state, otherwise we
   * remain in the Running state.
   *
   * @param results - Completed endpoint task futures.
   * @return next container state.
   */
  private DatanodeStateMachine.DatanodeStates
      computeNextContainerState(
      List<Future<EndpointStateMachine.EndPointStates>> results) {
    for (Future<EndpointStateMachine.EndPointStates> state : results) {
      try {
        if (state.get() == EndpointStateMachine.EndPointStates.SHUTDOWN) {
          // if any endpoint tells us to shutdown we move to shutdown state.
          return DatanodeStateMachine.DatanodeStates.SHUTDOWN;
        }
      } catch (InterruptedException e) {
        // Keep the interrupt visible to the caller instead of swallowing it.
        Thread.currentThread().interrupt();
        LOG.error("Error in executing end point task.", e);
      } catch (ExecutionException e) {
        LOG.error("Error in executing end point task.", e);
      }
    }
    return DatanodeStateMachine.DatanodeStates.RUNNING;
  }

  /**
   * Wait for execute to finish. Waits until every submitted task has
   * completed or the given duration has elapsed, whichever comes first.
   *
   * @param duration - Time
   * @param timeUnit - Unit of duration.
   * @return SHUTDOWN if any endpoint is, or reports, SHUTDOWN; RUNNING
   * otherwise.
   */
  @Override
  public DatanodeStateMachine.DatanodeStates
      await(long duration, TimeUnit timeUnit)
      throws InterruptedException, ExecutionException, TimeoutException {
    final int count = submittedTasks;
    final long timeoutMillis = timeUnit.toMillis(duration);
    final long startTime = Time.monotonicNow();
    long timeLeft = timeoutMillis;
    int returned = 0;
    List<Future<EndpointStateMachine.EndPointStates>> results = new
        LinkedList<>();

    while (returned < count && timeLeft > 0) {
      Future<EndpointStateMachine.EndPointStates> result =
          ecs.poll(timeLeft, TimeUnit.MILLISECONDS);
      if (result != null) {
        results.add(result);
        returned++;
      }
      // Always measure against the full budget: the elapsed time is
      // cumulative, so subtracting it from the already reduced timeLeft
      // would count earlier iterations more than once.
      timeLeft = timeoutMillis - (Time.monotonicNow() - startTime);
    }
    if (endpointShutdown) {
      return DatanodeStateMachine.DatanodeStates.SHUTDOWN;
    }
    return computeNextContainerState(results);
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java deleted file mode 100644 index 6b8d16c..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.common.states.datanode; -/** - This package contains files that guide the state transitions from - Init->Running->Shutdown for the datanode. - */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java deleted file mode 100644 index b4cce23..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ /dev/null @@ -1,270 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.common.states.endpoint; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.ozone.container.common.helpers - .DeletedContainerBlocksSummary; -import org.apache.hadoop.ozone.container.common.statemachine - .EndpointStateMachine; -import org.apache.hadoop.ozone.container.common.statemachine - .EndpointStateMachine.EndPointStates; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; -import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto; -import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.time.ZonedDateTime; -import java.util.concurrent.Callable; - -/** - * Heartbeat class for SCMs. 
- */
public class HeartbeatEndpointTask
    implements Callable<EndpointStateMachine.EndPointStates> {
  static final Logger LOG =
      LoggerFactory.getLogger(HeartbeatEndpointTask.class);
  private final EndpointStateMachine rpcEndpoint;
  private final Configuration conf;
  private final StateContext context;
  private ContainerNodeIDProto containerNodeIDProto;

  /**
   * Constructs a SCM heart beat.
   *
   * @param rpcEndpoint - Endpoint state machine the heartbeat is sent to.
   * @param conf Config.
   * @param context - State context that supplies the reports and receives
   * the commands returned by the heartbeat.
   */
  public HeartbeatEndpointTask(EndpointStateMachine rpcEndpoint,
      Configuration conf, StateContext context) {
    this.rpcEndpoint = rpcEndpoint;
    this.conf = conf;
    this.context = context;
  }

  /**
   * Get the container Node ID proto.
   *
   * @return ContainerNodeIDProto
   */
  public ContainerNodeIDProto getContainerNodeIDProto() {
    return containerNodeIDProto;
  }

  /**
   * Set container node ID proto.
   *
   * @param containerNodeIDProto - the node id.
   */
  public void setContainerNodeIDProto(ContainerNodeIDProto
      containerNodeIDProto) {
    this.containerNodeIDProto = containerNodeIDProto;
  }

  /**
   * Sends one heartbeat and queues the commands found in the response. The
   * endpoint lock is held for the whole exchange. An IOException is passed
   * to the endpoint's logIfNeeded and does not propagate.
   *
   * @return the endpoint state after the heartbeat.
   * @throws Exception if unable to compute a result
   */
  @Override
  public EndpointStateMachine.EndPointStates call() throws Exception {
    rpcEndpoint.lock();
    try {
      Preconditions.checkState(this.containerNodeIDProto != null);
      // Fail with a clear message instead of a bare NullPointerException
      // when the task was built without a context.
      Preconditions.checkState(this.context != null,
          "A valid state context is needed to send a heartbeat.");
      DatanodeID datanodeID = DatanodeID.getFromProtoBuf(this
          .containerNodeIDProto.getDatanodeID());

      SCMHeartbeatResponseProto response = rpcEndpoint.getEndPoint()
          .sendHeartbeat(datanodeID, this.context.getNodeReport(),
              this.context.getContainerReportState());
      processResponse(response, datanodeID);
      rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now());
      rpcEndpoint.zeroMissedCount();
    } catch (IOException ex) {
      rpcEndpoint.logIfNeeded(ex);
    } finally {
      rpcEndpoint.unlock();
    }
    return rpcEndpoint.getState();
  }

  /**
   * Returns a builder class for HeartbeatEndpointTask task.
   * @return   Builder.
   */
  public static Builder newBuilder() {
    return new Builder();
  }

  /**
   * Add this command to command processing Queue.
   *
   * @param response - SCMHeartbeat response.
   * @param datanodeID - ID of this datanode; every command in the response
   * must be addressed to it.
   * @throws IllegalStateException if a command carries another datanode UUID.
   * @throws IllegalArgumentException if a command type is not handled.
   */
  private void processResponse(SCMHeartbeatResponseProto response,
      final DatanodeID datanodeID) {
    for (SCMCommandResponseProto commandResponseProto : response
        .getCommandsList()) {
      // Verify the response is indeed for this datanode.
      Preconditions.checkState(commandResponseProto.getDatanodeUUID()
          .equalsIgnoreCase(datanodeID.getDatanodeUuid().toString()),
          "Unexpected datanode ID in the response.");
      switch (commandResponseProto.getCmdType()) {
      case sendContainerReport:
        this.context.addCommand(SendContainerCommand.getFromProtobuf(
            commandResponseProto.getSendReport()));
        break;
      case reregisterCommand:
        if (rpcEndpoint.getState() == EndPointStates.HEARTBEAT) {
          LOG.debug("Received SCM notification to register."
              + " Interrupt HEARTBEAT and transit to REGISTER state.");
          rpcEndpoint.setState(EndPointStates.REGISTER);
        } else {
          LOG.debug("Illegal state {} found, expecting {}.",
              rpcEndpoint.getState().name(), EndPointStates.HEARTBEAT);
        }
        break;
      case deleteBlocksCommand:
        DeleteBlocksCommand db = DeleteBlocksCommand
            .getFromProtobuf(commandResponseProto.getDeleteBlocksProto());
        if (!db.blocksTobeDeleted().isEmpty()) {
          // Guard kept: the summary is only worth building for debug output.
          if (LOG.isDebugEnabled()) {
            LOG.debug(DeletedContainerBlocksSummary
                .getFrom(db.blocksTobeDeleted())
                .toString());
          }
          this.context.addCommand(db);
        }
        break;
      case closeContainerCommand:
        CloseContainerCommand closeContainer =
            CloseContainerCommand.getFromProtobuf(
                commandResponseProto.getCloseContainerProto());
        LOG.debug("Received SCM container close request for container {}",
            closeContainer.getContainerName());
        this.context.addCommand(closeContainer);
        break;
      default:
        throw new IllegalArgumentException("Unknown response : "
            + commandResponseProto.getCmdType().name());
      }
    }
  }

  /**
   * Builder class for HeartbeatEndpointTask.
   */
  public static class Builder {
    private EndpointStateMachine endPointStateMachine;
    private Configuration conf;
    private ContainerNodeIDProto containerNodeIDProto;
    private StateContext context;

    /**
     * Constructs the builder class.
     */
    public Builder() {
    }

    /**
     * Sets the endpoint state machine.
     *
     * @param rpcEndPoint - Endpoint state machine.
     * @return Builder
     */
    public Builder setEndpointStateMachine(EndpointStateMachine rpcEndPoint) {
      this.endPointStateMachine = rpcEndPoint;
      return this;
    }

    /**
     * Sets the Config.
     *
     * @param config - config
     * @return Builder
     */
    public Builder setConfig(Configuration config) {
      this.conf = config;
      return this;
    }

    /**
     * Sets the NodeID.
     *
     * @param nodeID - NodeID proto
     * @return Builder
     */
    public Builder setNodeID(ContainerNodeIDProto nodeID) {
      this.containerNodeIDProto = nodeID;
      return this;
    }

    /**
     * Sets the context.
     * @param stateContext - State context.
     * @return this.
     */
    public Builder setContext(StateContext stateContext) {
      this.context = stateContext;
      return this;
    }

    /**
     * Builds the task from the values set so far.
     *
     * @return a HeartbeatEndpointTask carrying the configured node ID.
     * @throws IllegalArgumentException if the endpoint state machine, the
     * config or the node ID was not set.
     */
    public HeartbeatEndpointTask build() {
      if (endPointStateMachine == null) {
        LOG.error("No endpoint specified.");
        throw new IllegalArgumentException("A valid endpoint state machine is" +
            " needed to construct HeartbeatEndpointTask task");
      }

      if (conf == null) {
        LOG.error("No config specified.");
        throw new IllegalArgumentException("A valid configuration is needed" +
            " to construct HeartbeatEndpointTask task");
      }

      if (containerNodeIDProto == null) {
        LOG.error("No nodeID specified.");
        throw new IllegalArgumentException("A valid Node ID is needed to " +
            "construct HeartbeatEndpointTask task");
      }

      HeartbeatEndpointTask task = new HeartbeatEndpointTask(this
          .endPointStateMachine, this.conf, this.context);
      task.setContainerNodeIDProto(containerNodeIDProto);
      return task;
    }
  }
}
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
