http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java deleted file mode 100644 index 3bb284e..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java +++ /dev/null @@ -1,206 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license - * agreements. See the NOTICE file distributed with this work for additional - * information regarding - * copyright ownership. The ASF licenses this file to you under the Apache - * License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the - * License. You may obtain a - * copy of the License at - * - * <p>http://www.apache.org/licenses/LICENSE-2.0 - * - * <p>Unless required by applicable law or agreed to in writing, software - * distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm.server; - -import com.google.protobuf.BlockingService; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos; -import org.apache.hadoop.hdds.scm.HddsServerUtil; -import org.apache.hadoop.hdds.scm.ScmInfo; -import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; -import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; -import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ozone.common.BlockGroup; -import org.apache.hadoop.hdds.client.BlockID; -import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; -import org.apache.hadoop.ozone.protocolPB - .ScmBlockLocationProtocolServerSideTranslatorPB; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; - -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_HANDLER_COUNT_DEFAULT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_HANDLER_COUNT_KEY; -import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress; -import static org.apache.hadoop.hdds.scm.server.StorageContainerManager - .startRpcServer; - -/** - * SCM block protocol is the protocol used by Namenode and OzoneManager to get - * blocks from the SCM. - */ -public class SCMBlockProtocolServer implements ScmBlockLocationProtocol { - private static final Logger LOG = - LoggerFactory.getLogger(SCMBlockProtocolServer.class); - - private final StorageContainerManager scm; - private final OzoneConfiguration conf; - private final RPC.Server blockRpcServer; - private final InetSocketAddress blockRpcAddress; - - /** - * The RPC server that listens to requests from block service clients. - */ - public SCMBlockProtocolServer(OzoneConfiguration conf, - StorageContainerManager scm) throws IOException { - this.scm = scm; - this.conf = conf; - final int handlerCount = - conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY, - OZONE_SCM_HANDLER_COUNT_DEFAULT); - - RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class, - ProtobufRpcEngine.class); - // SCM Block Service RPC - BlockingService blockProtoPbService = - ScmBlockLocationProtocolProtos.ScmBlockLocationProtocolService - .newReflectiveBlockingService( - new ScmBlockLocationProtocolServerSideTranslatorPB(this)); - - final InetSocketAddress scmBlockAddress = HddsServerUtil - .getScmBlockClientBindAddress(conf); - blockRpcServer = - startRpcServer( - conf, - scmBlockAddress, - ScmBlockLocationProtocolPB.class, - blockProtoPbService, - handlerCount); - blockRpcAddress = - updateRPCListenAddress( - conf, OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, scmBlockAddress, - blockRpcServer); - - } - - public RPC.Server getBlockRpcServer() { - return blockRpcServer; - } - - public InetSocketAddress getBlockRpcAddress() { - return blockRpcAddress; - } - - public void start() { - LOG.info( - StorageContainerManager.buildRpcServerStartMessage( - "RPC server for Block Protocol", getBlockRpcAddress())); - getBlockRpcServer().start(); - } - - public void stop() { - try { - LOG.info("Stopping the RPC server for Block Protocol"); - getBlockRpcServer().stop(); - } catch (Exception ex) { - LOG.error("Block Protocol RPC stop failed.", ex); - } - IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager()); - } - - public void join() throws InterruptedException { - LOG.trace("Join RPC server for Block Protocol"); - getBlockRpcServer().join(); - } - - @Override - public AllocatedBlock allocateBlock(long size, HddsProtos.ReplicationType - type, HddsProtos.ReplicationFactor factor, String owner) throws - IOException { - return scm.getScmBlockManager().allocateBlock(size, type, factor, owner); - } - - /** - * Delete blocks for a set of object keys. - * - * @param keyBlocksInfoList list of block keys with object keys to delete. - * @return deletion results. - */ - @Override - public List<DeleteBlockGroupResult> deleteKeyBlocks( - List<BlockGroup> keyBlocksInfoList) throws IOException { - LOG.info("SCM is informed by OM to delete {} blocks", keyBlocksInfoList - .size()); - List<DeleteBlockGroupResult> results = new ArrayList<>(); - for (BlockGroup keyBlocks : keyBlocksInfoList) { - ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result resultCode; - try { - // We delete blocks in an atomic operation to prevent getting - // into state like only a partial of blocks are deleted, - // which will leave key in an inconsistent state. - scm.getScmBlockManager().deleteBlocks(keyBlocks.getBlockIDList()); - resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult - .Result.success; - } catch (SCMException scmEx) { - LOG.warn("Fail to delete block: {}", keyBlocks.getGroupID(), scmEx); - switch (scmEx.getResult()) { - case CHILL_MODE_EXCEPTION: - resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult - .Result.chillMode; - break; - case FAILED_TO_FIND_BLOCK: - resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult - .Result.errorNotFound; - break; - default: - resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult - .Result.unknownFailure; - } - } catch (IOException ex) { - LOG.warn("Fail to delete blocks for object key: {}", keyBlocks - .getGroupID(), ex); - resultCode = ScmBlockLocationProtocolProtos.DeleteScmBlockResult - .Result.unknownFailure; - } - List<DeleteBlockResult> blockResultList = new ArrayList<>(); - for (BlockID blockKey : keyBlocks.getBlockIDList()) { - blockResultList.add(new DeleteBlockResult(blockKey, resultCode)); - } - results.add(new DeleteBlockGroupResult(keyBlocks.getGroupID(), - blockResultList)); - } - return results; - } - - @Override - public ScmInfo getScmInfo() throws IOException { - ScmInfo.Builder builder = - new ScmInfo.Builder() - .setClusterId(scm.getScmStorage().getClusterID()) - .setScmId(scm.getScmStorage().getScmId()); - return builder.build(); - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMChillModeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMChillModeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMChillModeManager.java deleted file mode 100644 index 3c1cc8f..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMChillModeManager.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements.ââSee the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.ââThe ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.ââYou may obtain a copy of the License at - * - * ââââ http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm.server; - -import com.google.common.annotations.VisibleForTesting; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.HddsConfigKeys; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; -import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer - .NodeRegistrationContainerReport; -import org.apache.hadoop.hdds.server.events.EventHandler; -import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.apache.hadoop.hdds.server.events.EventQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * StorageContainerManager enters chill mode on startup to allow system to - * reach a stable state before becoming fully functional. SCM will wait - * for certain resources to be reported before coming out of chill mode. - * - * ChillModeExitRule defines format to define new rules which must be satisfied - * to exit Chill mode. - * ContainerChillModeRule defines the only exit criteria right now. - * On every new datanode registration event this class adds replicas - * for reported containers and validates if cutoff threshold for - * containers is meet. - */ -public class SCMChillModeManager implements - EventHandler<NodeRegistrationContainerReport> { - - private static final Logger LOG = - LoggerFactory.getLogger(SCMChillModeManager.class); - private AtomicBoolean inChillMode = new AtomicBoolean(true); - private AtomicLong containerWithMinReplicas = new AtomicLong(0); - private Map<String, ChillModeExitRule> exitRules = new HashMap(1); - private Configuration config; - private static final String CONT_EXIT_RULE = "ContainerChillModeRule"; - private final EventQueue eventPublisher; - - SCMChillModeManager(Configuration conf, List<ContainerInfo> allContainers, - EventQueue eventQueue) { - this.config = conf; - this.eventPublisher = eventQueue; - exitRules - .put(CONT_EXIT_RULE, new ContainerChillModeRule(config, allContainers)); - if (!conf.getBoolean(HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED, - HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED_DEFAULT)) { - exitChillMode(eventQueue); - } - emitChillModeStatus(); - } - - /** - * Emit Chill mode status. - */ - @VisibleForTesting - public void emitChillModeStatus() { - eventPublisher.fireEvent(SCMEvents.CHILL_MODE_STATUS, inChillMode.get()); - } - - private void validateChillModeExitRules(EventPublisher eventQueue) { - for (ChillModeExitRule exitRule : exitRules.values()) { - if (!exitRule.validate()) { - return; - } - } - exitChillMode(eventQueue); - } - - /** - * Exit chill mode. It does following actions: - * 1. Set chill mode status to fale. - * 2. Emits START_REPLICATION for ReplicationManager. - * 3. Cleanup resources. - * 4. Emit chill mode status. - * @param eventQueue - */ - @VisibleForTesting - public void exitChillMode(EventPublisher eventQueue) { - LOG.info("SCM exiting chill mode."); - setInChillMode(false); - - // TODO: Remove handler registration as there is no need to listen to - // register events anymore. - - for (ChillModeExitRule e : exitRules.values()) { - e.cleanup(); - } - emitChillModeStatus(); - } - - @Override - public void onMessage( - NodeRegistrationContainerReport nodeRegistrationContainerReport, - EventPublisher publisher) { - if (getInChillMode()) { - exitRules.get(CONT_EXIT_RULE).process(nodeRegistrationContainerReport); - validateChillModeExitRules(publisher); - } - } - - public boolean getInChillMode() { - return inChillMode.get(); - } - - /** - * Set chill mode status. - */ - public void setInChillMode(boolean inChillMode) { - this.inChillMode.set(inChillMode); - } - - /** - * Interface for defining chill mode exit rules. - * - * @param <T> - */ - public interface ChillModeExitRule<T> { - - boolean validate(); - - void process(T report); - - void cleanup(); - } - - /** - * Class defining Chill mode exit criteria for Containers. - */ - public class ContainerChillModeRule implements - ChillModeExitRule<NodeRegistrationContainerReport> { - - // Required cutoff % for containers with at least 1 reported replica. - private double chillModeCutoff; - // Containers read from scm db. - private Map<Long, ContainerInfo> containerMap; - private double maxContainer; - - public ContainerChillModeRule(Configuration conf, - List<ContainerInfo> containers) { - chillModeCutoff = conf - .getDouble(HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT, - HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT); - containerMap = new ConcurrentHashMap<>(); - if(containers != null) { - containers.forEach(c -> { - if (c != null) { - containerMap.put(c.getContainerID(), c); - } - }); - maxContainer = containers.size(); - } - } - - @Override - public boolean validate() { - if (maxContainer == 0) { - return true; - } - return getCurrentContainerThreshold() >= chillModeCutoff; - } - - @VisibleForTesting - public double getCurrentContainerThreshold() { - return (containerWithMinReplicas.doubleValue() / maxContainer); - } - - @Override - public void process(NodeRegistrationContainerReport reportsProto) { - if (maxContainer == 0) { - // No container to check. - return; - } - - reportsProto.getReport().getReportsList().forEach(c -> { - if (containerMap.containsKey(c.getContainerID())) { - if(containerMap.remove(c.getContainerID()) != null) { - containerWithMinReplicas.getAndAdd(1); - } - } - }); - if(inChillMode.get()) { - LOG.info("SCM in chill mode. {} % containers have at least one" - + " reported replica.", - (containerWithMinReplicas.get() / maxContainer) * 100); - } - } - - @Override - public void cleanup() { - containerMap.clear(); - } - } - - @VisibleForTesting - public static Logger getLogger() { - return LOG; - } - - @VisibleForTesting - public double getCurrentContainerThreshold() { - return ((ContainerChillModeRule) exitRules.get(CONT_EXIT_RULE)) - .getCurrentContainerThreshold(); - } - - /** - * Operations restricted in SCM chill mode. - */ - public static class ChillModeRestrictedOps { - private static EnumSet restrictedOps = EnumSet.noneOf(ScmOps.class); - - static { - restrictedOps.add(ScmOps.allocateBlock); - restrictedOps.add(ScmOps.allocateContainer); - } - - public static boolean isRestrictedInChillMode(ScmOps opName) { - return restrictedOps.contains(opName); - } - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java deleted file mode 100644 index 66136f1..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ /dev/null @@ -1,380 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license - * agreements. See the NOTICE file distributed with this work for additional - * information regarding - * copyright ownership. The ASF licenses this file to you under the Apache - * License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the - * License. You may obtain a - * copy of the License at - * - * <p>http://www.apache.org/licenses/LICENSE-2.0 - * - * <p>Unless required by applicable law or agreed to in writing, software - * distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm.server; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.protobuf.BlockingService; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos; -import org.apache.hadoop.hdds.scm.HddsServerUtil; -import org.apache.hadoop.hdds.scm.ScmInfo; -import org.apache.hadoop.hdds.scm.ScmUtils; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes; -import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; -import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB; -import org.apache.hadoop.hdds.server.events.EventHandler; -import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ozone.protocolPB - .StorageContainerLocationProtocolServerSideTranslatorPB; -import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; - -import static org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos - .StorageContainerLocationProtocolService.newReflectiveBlockingService; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_CLIENT_ADDRESS_KEY; - -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_HANDLER_COUNT_DEFAULT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_HANDLER_COUNT_KEY; -import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress; -import static org.apache.hadoop.hdds.scm.server.StorageContainerManager - .startRpcServer; - -/** - * The RPC server that listens to requests from clients. - */ -public class SCMClientProtocolServer implements - StorageContainerLocationProtocol, EventHandler<Boolean> { - private static final Logger LOG = - LoggerFactory.getLogger(SCMClientProtocolServer.class); - private final RPC.Server clientRpcServer; - private final InetSocketAddress clientRpcAddress; - private final StorageContainerManager scm; - private final OzoneConfiguration conf; - private ChillModePrecheck chillModePrecheck = new ChillModePrecheck(); - - public SCMClientProtocolServer(OzoneConfiguration conf, - StorageContainerManager scm) throws IOException { - this.scm = scm; - this.conf = conf; - final int handlerCount = - conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY, - OZONE_SCM_HANDLER_COUNT_DEFAULT); - RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class, - ProtobufRpcEngine.class); - - // SCM Container Service RPC - BlockingService storageProtoPbService = - newReflectiveBlockingService( - new StorageContainerLocationProtocolServerSideTranslatorPB(this)); - - final InetSocketAddress scmAddress = HddsServerUtil - .getScmClientBindAddress(conf); - clientRpcServer = - startRpcServer( - conf, - scmAddress, - StorageContainerLocationProtocolPB.class, - storageProtoPbService, - handlerCount); - clientRpcAddress = - updateRPCListenAddress(conf, OZONE_SCM_CLIENT_ADDRESS_KEY, - scmAddress, clientRpcServer); - - } - - public RPC.Server getClientRpcServer() { - return clientRpcServer; - } - - public InetSocketAddress getClientRpcAddress() { - return clientRpcAddress; - } - - public void start() { - LOG.info( - StorageContainerManager.buildRpcServerStartMessage( - "RPC server for Client ", getClientRpcAddress())); - getClientRpcServer().start(); - } - - public void stop() { - try { - LOG.info("Stopping the RPC server for Client Protocol"); - getClientRpcServer().stop(); - } catch (Exception ex) { - LOG.error("Client Protocol RPC stop failed.", ex); - } - IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager()); - } - - public void join() throws InterruptedException { - LOG.trace("Join RPC server for Client Protocol"); - getClientRpcServer().join(); - } - - @VisibleForTesting - public String getRpcRemoteUsername() { - UserGroupInformation user = ProtobufRpcEngine.Server.getRemoteUser(); - return user == null ? null : user.getUserName(); - } - - @Override - public ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType - replicationType, HddsProtos.ReplicationFactor factor, - String owner) throws IOException { - ScmUtils.preCheck(ScmOps.allocateContainer, chillModePrecheck); - String remoteUser = getRpcRemoteUsername(); - getScm().checkAdminAccess(remoteUser); - - return scm.getScmContainerManager() - .allocateContainer(replicationType, factor, owner); - } - - @Override - public ContainerInfo getContainer(long containerID) throws IOException { - String remoteUser = getRpcRemoteUsername(); - getScm().checkAdminAccess(remoteUser); - return scm.getScmContainerManager() - .getContainer(containerID); - } - - @Override - public ContainerWithPipeline getContainerWithPipeline(long containerID) - throws IOException { - if (chillModePrecheck.isInChillMode()) { - ContainerInfo contInfo = scm.getScmContainerManager() - .getContainer(containerID); - if (contInfo.isContainerOpen()) { - if (!hasRequiredReplicas(contInfo)) { - throw new SCMException("Open container " + containerID + " doesn't" - + " have enough replicas to service this operation in " - + "Chill mode.", ResultCodes.CHILL_MODE_EXCEPTION); - } - } - } - String remoteUser = getRpcRemoteUsername(); - getScm().checkAdminAccess(remoteUser); - return scm.getScmContainerManager() - .getContainerWithPipeline(containerID); - } - - /** - * Check if container reported replicas are equal or greater than required - * replication factor. - */ - private boolean hasRequiredReplicas(ContainerInfo contInfo) { - try{ - return getScm().getScmContainerManager().getStateManager() - .getContainerReplicas(contInfo.containerID()) - .size() >= contInfo.getReplicationFactor().getNumber(); - } catch (SCMException ex) { - // getContainerReplicas throws exception if no replica's exist for given - // container. - return false; - } - } - - @Override - public List<ContainerInfo> listContainer(long startContainerID, - int count) throws IOException { - return scm.getScmContainerManager(). - listContainer(startContainerID, count); - } - - @Override - public void deleteContainer(long containerID) throws IOException { - String remoteUser = getRpcRemoteUsername(); - getScm().checkAdminAccess(remoteUser); - scm.getScmContainerManager().deleteContainer(containerID); - - } - - @Override - public List<HddsProtos.Node> queryNode(HddsProtos.NodeState state, - HddsProtos.QueryScope queryScope, String poolName) throws - IOException { - - if (queryScope == HddsProtos.QueryScope.POOL) { - throw new IllegalArgumentException("Not Supported yet"); - } - - List<HddsProtos.Node> result = new ArrayList<>(); - queryNode(state).forEach(node -> result.add(HddsProtos.Node.newBuilder() - .setNodeID(node.getProtoBufMessage()) - .addNodeStates(state) - .build())); - - return result; - - } - - @Override - public void notifyObjectStageChange(StorageContainerLocationProtocolProtos - .ObjectStageChangeRequestProto.Type type, long id, - StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto.Op - op, StorageContainerLocationProtocolProtos - .ObjectStageChangeRequestProto.Stage stage) throws IOException { - - LOG.info("Object type {} id {} op {} new stage {}", type, id, op, - stage); - if (type == StorageContainerLocationProtocolProtos - .ObjectStageChangeRequestProto.Type.container) { - if (op == StorageContainerLocationProtocolProtos - .ObjectStageChangeRequestProto.Op.create) { - if (stage == StorageContainerLocationProtocolProtos - .ObjectStageChangeRequestProto.Stage.begin) { - scm.getScmContainerManager().updateContainerState(id, HddsProtos - .LifeCycleEvent.CREATE); - } else { - scm.getScmContainerManager().updateContainerState(id, HddsProtos - .LifeCycleEvent.CREATED); - } - } else { - if (op == StorageContainerLocationProtocolProtos - .ObjectStageChangeRequestProto.Op.close) { - if (stage == StorageContainerLocationProtocolProtos - .ObjectStageChangeRequestProto.Stage.begin) { - scm.getScmContainerManager().updateContainerState(id, HddsProtos - .LifeCycleEvent.FINALIZE); - } else { - scm.getScmContainerManager().updateContainerState(id, HddsProtos - .LifeCycleEvent.CLOSE); - } - } - } - } // else if (type == ObjectStageChangeRequestProto.Type.pipeline) { - // TODO: pipeline state update will be addressed in future patch. - // } - - } - - @Override - public Pipeline createReplicationPipeline(HddsProtos.ReplicationType type, - HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool) - throws IOException { - // TODO: will be addressed in future patch. - // This is needed only for debugging purposes to make sure cluster is - // working correctly. - return null; - } - - @Override - public ScmInfo getScmInfo() throws IOException { - ScmInfo.Builder builder = - new ScmInfo.Builder() - .setClusterId(scm.getScmStorage().getClusterID()) - .setScmId(scm.getScmStorage().getScmId()); - return builder.build(); - } - - /** - * Check if SCM is in chill mode. - * - * @return Returns true if SCM is in chill mode else returns false. - * @throws IOException - */ - @Override - public boolean inChillMode() throws IOException { - return scm.isInChillMode(); - } - - /** - * Force SCM out of Chill mode. - * - * @return returns true if operation is successful. - * @throws IOException - */ - @Override - public boolean forceExitChillMode() throws IOException { - return scm.exitChillMode(); - } - - /** - * Queries a list of Node that match a set of statuses. - * - * <p>For example, if the nodeStatuses is HEALTHY and RAFT_MEMBER, then - * this call will return all - * healthy nodes which members in Raft pipeline. - * - * <p>Right now we don't support operations, so we assume it is an AND - * operation between the - * operators. - * - * @param state - NodeStates. - * @return List of Datanodes. - */ - public List<DatanodeDetails> queryNode(HddsProtos.NodeState state) { - Preconditions.checkNotNull(state, "Node Query set cannot be null"); - return new LinkedList<>(queryNodeState(state)); - } - - @VisibleForTesting - public StorageContainerManager getScm() { - return scm; - } - - /** - * Set chill mode status based on SCMEvents.CHILL_MODE_STATUS event. - */ - @Override - public void onMessage(Boolean inChillMOde, EventPublisher publisher) { - chillModePrecheck.setInChillMode(inChillMOde); - } - - /** - * Set chill mode status based on . - */ - public boolean getChillModeStatus() { - return chillModePrecheck.isInChillMode(); - } - - - /** - * Query the System for Nodes. - * - * @param nodeState - NodeState that we are interested in matching. - * @return Set of Datanodes that match the NodeState. - */ - private Set<DatanodeDetails> queryNodeState(HddsProtos.NodeState nodeState) { - Set<DatanodeDetails> returnSet = new TreeSet<>(); - List<DatanodeDetails> tmp = scm.getScmNodeManager().getNodes(nodeState); - if ((tmp != null) && (tmp.size() > 0)) { - returnSet.addAll(tmp); - } - return returnSet; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java deleted file mode 100644 index d9a0875..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java +++ /dev/null @@ -1,231 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.hdds.scm.server; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineActionsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerActionsProto; -import org.apache.hadoop.hdds.protocol.proto. - StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.NodeReportProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; -import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.server.events.EventPublisher; - -import com.google.protobuf.GeneratedMessage; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_ACTIONS; -import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT; -import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT; -import static org.apache.hadoop.hdds.scm.events.SCMEvents.CMD_STATUS_REPORT; -import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_ACTIONS; -import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_REPORT; - -/** - * This class is responsible for dispatching heartbeat from datanode to - * appropriate EventHandler at SCM. - */ -public final class SCMDatanodeHeartbeatDispatcher { - - private static final Logger LOG = - LoggerFactory.getLogger(SCMDatanodeHeartbeatDispatcher.class); - - private final NodeManager nodeManager; - private final EventPublisher eventPublisher; - - - public SCMDatanodeHeartbeatDispatcher(NodeManager nodeManager, - EventPublisher eventPublisher) { - Preconditions.checkNotNull(nodeManager); - Preconditions.checkNotNull(eventPublisher); - this.nodeManager = nodeManager; - this.eventPublisher = eventPublisher; - } - - - /** - * Dispatches heartbeat to registered event handlers. - * - * @param heartbeat heartbeat to be dispatched. - * - * @return list of SCMCommand - */ - public List<SCMCommand> dispatch(SCMHeartbeatRequestProto heartbeat) { - DatanodeDetails datanodeDetails = - DatanodeDetails.getFromProtoBuf(heartbeat.getDatanodeDetails()); - // should we dispatch heartbeat through eventPublisher? - List<SCMCommand> commands = nodeManager.processHeartbeat(datanodeDetails); - if (heartbeat.hasNodeReport()) { - LOG.debug("Dispatching Node Report."); - eventPublisher.fireEvent(NODE_REPORT, - new NodeReportFromDatanode(datanodeDetails, - heartbeat.getNodeReport())); - } - - if (heartbeat.hasContainerReport()) { - LOG.debug("Dispatching Container Report."); - eventPublisher.fireEvent(CONTAINER_REPORT, - new ContainerReportFromDatanode(datanodeDetails, - heartbeat.getContainerReport())); - - } - - if (heartbeat.hasContainerActions()) { - LOG.debug("Dispatching Container Actions."); - eventPublisher.fireEvent(CONTAINER_ACTIONS, - new ContainerActionsFromDatanode(datanodeDetails, - heartbeat.getContainerActions())); - } - - if (heartbeat.hasPipelineReports()) { - LOG.debug("Dispatching Pipeline Report."); - eventPublisher.fireEvent(PIPELINE_REPORT, - new PipelineReportFromDatanode(datanodeDetails, - heartbeat.getPipelineReports())); - - } - - if (heartbeat.hasPipelineActions()) { - LOG.debug("Dispatching Pipeline Actions."); - eventPublisher.fireEvent(PIPELINE_ACTIONS, - new PipelineActionsFromDatanode(datanodeDetails, - heartbeat.getPipelineActions())); - } - - if (heartbeat.getCommandStatusReportsCount() != 0) { - for (CommandStatusReportsProto commandStatusReport : heartbeat - .getCommandStatusReportsList()) { - eventPublisher.fireEvent(CMD_STATUS_REPORT, - new CommandStatusReportFromDatanode(datanodeDetails, - commandStatusReport)); - } - } - - return commands; - } - - /** - * Wrapper class for events with the datanode origin. - */ - public static class ReportFromDatanode<T extends GeneratedMessage> { - - private final DatanodeDetails datanodeDetails; - - private final T report; - - public ReportFromDatanode(DatanodeDetails datanodeDetails, T report) { - this.datanodeDetails = datanodeDetails; - this.report = report; - } - - public DatanodeDetails getDatanodeDetails() { - return datanodeDetails; - } - - public T getReport() { - return report; - } - } - - /** - * Node report event payload with origin. - */ - public static class NodeReportFromDatanode - extends ReportFromDatanode<NodeReportProto> { - - public NodeReportFromDatanode(DatanodeDetails datanodeDetails, - NodeReportProto report) { - super(datanodeDetails, report); - } - } - - /** - * Container report event payload with origin. - */ - public static class ContainerReportFromDatanode - extends ReportFromDatanode<ContainerReportsProto> { - - public ContainerReportFromDatanode(DatanodeDetails datanodeDetails, - ContainerReportsProto report) { - super(datanodeDetails, report); - } - } - - /** - * Container action event payload with origin. - */ - public static class ContainerActionsFromDatanode - extends ReportFromDatanode<ContainerActionsProto> { - - public ContainerActionsFromDatanode(DatanodeDetails datanodeDetails, - ContainerActionsProto actions) { - super(datanodeDetails, actions); - } - } - - /** - * Pipeline report event payload with origin. - */ - public static class PipelineReportFromDatanode - extends ReportFromDatanode<PipelineReportsProto> { - - public PipelineReportFromDatanode(DatanodeDetails datanodeDetails, - PipelineReportsProto report) { - super(datanodeDetails, report); - } - } - - /** - * Pipeline action event payload with origin. - */ - public static class PipelineActionsFromDatanode - extends ReportFromDatanode<PipelineActionsProto> { - - public PipelineActionsFromDatanode(DatanodeDetails datanodeDetails, - PipelineActionsProto actions) { - super(datanodeDetails, actions); - } - } - - /** - * Container report event payload with origin. - */ - public static class CommandStatusReportFromDatanode - extends ReportFromDatanode<CommandStatusReportsProto> { - - public CommandStatusReportFromDatanode(DatanodeDetails datanodeDetails, - CommandStatusReportsProto report) { - super(datanodeDetails, report); - } - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java deleted file mode 100644 index 9c6fa88..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java +++ /dev/null @@ -1,315 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license - * agreements. See the NOTICE file distributed with this work for additional - * information regarding - * copyright ownership. The ASF licenses this file to you under the Apache - * License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the - * License. You may obtain a - * copy of the License at - * - * <p>http://www.apache.org/licenses/LICENSE-2.0 - * - * <p>Unless required by applicable law or agreed to in writing, software - * distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm.server; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.protobuf.BlockingService; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; - -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ReregisterCommandProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.NodeReportProto; - -import static org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto - .Type.closeContainerCommand; -import static org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto - .Type.deleteBlocksCommand; -import static org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type - .replicateContainerCommand; -import static org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto - .Type.reregisterCommand; - - - -import org.apache.hadoop.hdds.scm.HddsServerUtil; -import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher - .ReportFromDatanode; -import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher - .PipelineReportFromDatanode; -import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; -import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; -import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; -import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; -import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB; -import org.apache.hadoop.ozone.protocolPB - .StorageContainerDatanodeProtocolServerSideTranslatorPB; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.LinkedList; -import java.util.List; -import java.util.stream.Collectors; - -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY; - -import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_REPORT; -import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer; -import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress; - -/** - * Protocol Handler for Datanode Protocol. - */ -public class SCMDatanodeProtocolServer implements - StorageContainerDatanodeProtocol { - - private static final Logger LOG = LoggerFactory.getLogger( - SCMDatanodeProtocolServer.class); - - /** - * The RPC server that listens to requests from DataNodes. - */ - private final RPC.Server datanodeRpcServer; - - private final StorageContainerManager scm; - private final InetSocketAddress datanodeRpcAddress; - private final SCMDatanodeHeartbeatDispatcher heartbeatDispatcher; - private final EventPublisher eventPublisher; - - public SCMDatanodeProtocolServer(final OzoneConfiguration conf, - StorageContainerManager scm, EventPublisher eventPublisher) - throws IOException { - - Preconditions.checkNotNull(scm, "SCM cannot be null"); - Preconditions.checkNotNull(eventPublisher, "EventPublisher cannot be null"); - - this.scm = scm; - this.eventPublisher = eventPublisher; - final int handlerCount = - conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY, - OZONE_SCM_HANDLER_COUNT_DEFAULT); - - heartbeatDispatcher = new SCMDatanodeHeartbeatDispatcher( - scm.getScmNodeManager(), eventPublisher); - - RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class, - ProtobufRpcEngine.class); - BlockingService dnProtoPbService = - StorageContainerDatanodeProtocolProtos - .StorageContainerDatanodeProtocolService - .newReflectiveBlockingService( - new StorageContainerDatanodeProtocolServerSideTranslatorPB( - this)); - - InetSocketAddress datanodeRpcAddr = - HddsServerUtil.getScmDataNodeBindAddress(conf); - - datanodeRpcServer = - startRpcServer( - conf, - datanodeRpcAddr, - StorageContainerDatanodeProtocolPB.class, - dnProtoPbService, - handlerCount); - - datanodeRpcAddress = - updateRPCListenAddress( - conf, OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, - datanodeRpcServer); - - } - - public void start() { - LOG.info( - StorageContainerManager.buildRpcServerStartMessage( - "RPC server for DataNodes", datanodeRpcAddress)); - datanodeRpcServer.start(); - } - - public InetSocketAddress getDatanodeRpcAddress() { - return datanodeRpcAddress; - } - - @Override - public SCMVersionResponseProto getVersion(SCMVersionRequestProto - versionRequest) - throws IOException { - return scm.getScmNodeManager().getVersion(versionRequest) - .getProtobufMessage(); - } - - @Override - public SCMRegisteredResponseProto register( - HddsProtos.DatanodeDetailsProto datanodeDetailsProto, - NodeReportProto nodeReport, - ContainerReportsProto containerReportsProto, - PipelineReportsProto pipelineReportsProto) - throws IOException { - DatanodeDetails datanodeDetails = DatanodeDetails - .getFromProtoBuf(datanodeDetailsProto); - // TODO : Return the list of Nodes that forms the SCM HA. - RegisteredCommand registeredCommand = scm.getScmNodeManager() - .register(datanodeDetails, nodeReport, pipelineReportsProto); - if (registeredCommand.getError() - == SCMRegisteredResponseProto.ErrorCode.success) { - scm.getScmContainerManager().processContainerReports(datanodeDetails, - containerReportsProto, true); - eventPublisher.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT, - new NodeRegistrationContainerReport(datanodeDetails, - containerReportsProto)); - eventPublisher.fireEvent(PIPELINE_REPORT, - new PipelineReportFromDatanode(datanodeDetails, - pipelineReportsProto)); - } - return getRegisteredResponse(registeredCommand); - } - - @VisibleForTesting - public static SCMRegisteredResponseProto getRegisteredResponse( - RegisteredCommand cmd) { - return SCMRegisteredResponseProto.newBuilder() - // TODO : Fix this later when we have multiple SCM support. - // .setAddressList(addressList) - .setErrorCode(cmd.getError()) - .setClusterID(cmd.getClusterID()) - .setDatanodeUUID(cmd.getDatanodeUUID()) - .build(); - } - - @Override - public SCMHeartbeatResponseProto sendHeartbeat( - SCMHeartbeatRequestProto heartbeat) throws IOException { - List<SCMCommandProto> cmdResponses = new LinkedList<>(); - for (SCMCommand cmd : heartbeatDispatcher.dispatch(heartbeat)) { - cmdResponses.add(getCommandResponse(cmd)); - } - return SCMHeartbeatResponseProto.newBuilder() - .setDatanodeUUID(heartbeat.getDatanodeDetails().getUuid()) - .addAllCommands(cmdResponses).build(); - } - - /** - * Returns a SCMCommandRepose from the SCM Command. - * - * @param cmd - Cmd - * @return SCMCommandResponseProto - * @throws IOException - */ - @VisibleForTesting - public SCMCommandProto getCommandResponse(SCMCommand cmd) - throws IOException { - SCMCommandProto.Builder builder = - SCMCommandProto.newBuilder(); - switch (cmd.getType()) { - case reregisterCommand: - return builder - .setCommandType(reregisterCommand) - .setReregisterCommandProto(ReregisterCommandProto - .getDefaultInstance()) - .build(); - case deleteBlocksCommand: - // Once SCM sends out the deletion message, increment the count. - // this is done here instead of when SCM receives the ACK, because - // DN might not be able to response the ACK for sometime. In case - // it times out, SCM needs to re-send the message some more times. - List<Long> txs = - ((DeleteBlocksCommand) cmd) - .blocksTobeDeleted() - .stream() - .map(tx -> tx.getTxID()) - .collect(Collectors.toList()); - scm.getScmBlockManager().getDeletedBlockLog().incrementCount(txs); - return builder - .setCommandType(deleteBlocksCommand) - .setDeleteBlocksCommandProto(((DeleteBlocksCommand) cmd).getProto()) - .build(); - case closeContainerCommand: - return builder - .setCommandType(closeContainerCommand) - .setCloseContainerCommandProto( - ((CloseContainerCommand) cmd).getProto()) - .build(); - case replicateContainerCommand: - return builder - .setCommandType(replicateContainerCommand) - .setReplicateContainerCommandProto( - ((ReplicateContainerCommand)cmd).getProto()) - .build(); - default: - throw new IllegalArgumentException("Not implemented"); - } - } - - - public void join() throws InterruptedException { - LOG.trace("Join RPC server for DataNodes"); - datanodeRpcServer.join(); - } - - public void stop() { - try { - LOG.info("Stopping the RPC server for DataNodes"); - datanodeRpcServer.stop(); - } catch (Exception ex) { - LOG.error(" datanodeRpcServer stop failed.", ex); - } - IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager()); - } - - /** - * Wrapper class for events with the datanode origin. - */ - public static class NodeRegistrationContainerReport extends - ReportFromDatanode<ContainerReportsProto> { - - public NodeRegistrationContainerReport(DatanodeDetails datanodeDetails, - ContainerReportsProto report) { - super(datanodeDetails, report); - } - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMMXBean.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMMXBean.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMMXBean.java deleted file mode 100644 index 22d4d56..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMMXBean.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm.server; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdds.server.ServiceRuntimeInfo; - -import java.util.Map; - -/** - * - * This is the JMX management interface for scm information. - */ -@InterfaceAudience.Private -public interface SCMMXBean extends ServiceRuntimeInfo { - - /** - * Get the SCM RPC server port that used to listen to datanode requests. - * @return SCM datanode RPC server port - */ - String getDatanodeRpcPort(); - - /** - * Get the SCM RPC server port that used to listen to client requests. - * @return SCM client RPC server port - */ - String getClientRpcPort(); - - /** - * Get container report info that includes container IO stats of nodes. - * @return The datanodeUUid to report json string mapping - */ - Map<String, String> getContainerReport(); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorage.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorage.java deleted file mode 100644 index be6c1af..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorage.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm.server; - -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType; -import org.apache.hadoop.ozone.common.Storage; - -import java.io.IOException; -import java.util.Properties; -import java.util.UUID; - -import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath; -import static org.apache.hadoop.ozone.OzoneConsts.SCM_ID; -import static org.apache.hadoop.ozone.OzoneConsts.STORAGE_DIR; - -/** - * SCMStorage is responsible for management of the StorageDirectories used by - * the SCM. - */ -public class SCMStorage extends Storage { - - /** - * Construct SCMStorage. - * @throws IOException if any directories are inaccessible. - */ - public SCMStorage(OzoneConfiguration conf) throws IOException { - super(NodeType.SCM, getOzoneMetaDirPath(conf), STORAGE_DIR); - } - - public void setScmId(String scmId) throws IOException { - if (getState() == StorageState.INITIALIZED) { - throw new IOException("SCM is already initialized."); - } else { - getStorageInfo().setProperty(SCM_ID, scmId); - } - } - - /** - * Retrieves the SCM ID from the version file. - * @return SCM_ID - */ - public String getScmId() { - return getStorageInfo().getProperty(SCM_ID); - } - - @Override - protected Properties getNodeProperties() { - String scmId = getScmId(); - if (scmId == null) { - scmId = UUID.randomUUID().toString(); - } - Properties scmProperties = new Properties(); - scmProperties.setProperty(SCM_ID, scmId); - return scmProperties; - } - -} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org