HDFS-9681. DiskBalancer: Add QueryPlan implementation. (Contributed by Anu Engineer)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e646c2eb Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e646c2eb Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e646c2eb Branch: refs/heads/trunk Commit: e646c2eb50b9ae2b0b084d78a4ea68e106804321 Parents: 2b1b2fa Author: Arpit Agarwal <[email protected]> Authored: Wed Feb 24 16:49:30 2016 -0800 Committer: Arpit Agarwal <[email protected]> Committed: Thu Jun 23 18:18:48 2016 -0700 ---------------------------------------------------------------------- .../ClientDatanodeProtocolTranslatorPB.java | 11 +- .../server/datanode/DiskBalancerWorkStatus.java | 194 +++++++++++++++++-- .../src/main/proto/ClientDatanodeProtocol.proto | 5 +- .../hadoop-hdfs/HDFS-1312_CHANGES.txt | 5 +- ...tDatanodeProtocolServerSideTranslatorPB.java | 5 +- .../hadoop/hdfs/server/datanode/DataNode.java | 8 +- .../hdfs/server/datanode/DiskBalancer.java | 39 ++++ .../diskbalancer/TestDiskBalancerRPC.java | 26 ++- 8 files changed, 249 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e646c2eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java index 786d834..7076026 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DiskBa import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DiskBalancerSettingResponseProto; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus; +import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.ProtocolMetaInterface; @@ -392,10 +393,14 @@ public class ClientDatanodeProtocolTranslatorPB implements QueryPlanStatusRequestProto.newBuilder().build(); QueryPlanStatusResponseProto response = rpcProxy.queryDiskBalancerPlan(NULL_CONTROLLER, request); - return new DiskBalancerWorkStatus(response.hasResult() ? - response.getResult() : 0, + DiskBalancerWorkStatus.Result result = Result.NO_PLAN; + if(response.hasResult()) { + result = DiskBalancerWorkStatus.Result.values()[ + response.getResult()]; + } + + return new DiskBalancerWorkStatus(result, response.hasPlanID() ? response.getPlanID() : null, - response.hasStatus() ? response.getStatus() : null, response.hasCurrentStatus() ? response.getCurrentStatus() : null); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e646c2eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkStatus.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkStatus.java index 6b29ce8..d6943cf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancerWorkStatus.java @@ -19,8 +19,17 @@ package org.apache.hadoop.hdfs.server.datanode; + +import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.codehaus.jackson.map.ObjectMapper; + +import static org.codehaus.jackson.map.type.TypeFactory.defaultInstance; + +import java.io.IOException; +import java.util.List; +import java.util.LinkedList; /** * Helper class that reports how much work has has been done by the node. @@ -28,33 +37,69 @@ import org.apache.hadoop.classification.InterfaceStability; @InterfaceAudience.Private @InterfaceStability.Unstable public class DiskBalancerWorkStatus { - private final int result; - private final String planID; - private final String status; - private final String currentState; + + private final List<DiskBalancerWorkEntry> currentState; + private Result result; + private String planID; + + /** + * Constructs a default workStatus Object. + */ + public DiskBalancerWorkStatus() { + this.currentState = new LinkedList<>(); + } + + /** + * Constructs a workStatus Object. + * + * @param result - int + * @param planID - Plan ID + */ + public DiskBalancerWorkStatus(Result result, String planID) { + this(); + this.result = result; + this.planID = planID; + } /** * Constructs a workStatus Object. * * @param result - int * @param planID - Plan ID - * @param status - Current Status * @param currentState - Current State */ - public DiskBalancerWorkStatus(int result, String planID, String status, - String currentState) { + public DiskBalancerWorkStatus(Result result, String planID, + List<DiskBalancerWorkEntry> currentState) { this.result = result; this.planID = planID; - this.status = status; this.currentState = currentState; } + + /** + * Constructs a workStatus Object. + * + * @param result - int + * @param planID - Plan ID + * @param currentState - List of WorkEntries. + */ + public DiskBalancerWorkStatus(Result result, String planID, + String currentState) throws IOException { + this.result = result; + this.planID = planID; + ObjectMapper mapper = new ObjectMapper(); + this.currentState = mapper.readValue(currentState, + defaultInstance().constructCollectionType( + List.class, DiskBalancerWorkEntry.class)); + } + + /** * Returns result. * * @return long */ - public int getResult() { + public Result getResult() { return result; } @@ -68,20 +113,135 @@ public class DiskBalancerWorkStatus { } /** - * Returns Status. + * Gets current Status. * - * @return String + * @return - Json String */ - public String getStatus() { - return status; + public List<DiskBalancerWorkEntry> getCurrentState() { + return currentState; } /** - * Gets current Status. + * Return current state as a string. * - * @return - Json String + * @throws IOException + **/ + public String getCurrentStateString() throws IOException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(currentState); + } + + + /** + * Adds a new work entry to the list. + * + * @param entry - DiskBalancerWorkEntry */ - public String getCurrentState() { - return currentState; + + public void addWorkEntry(DiskBalancerWorkEntry entry) { + Preconditions.checkNotNull(entry); + currentState.add(entry); + } + + /** Various result values. **/ + public enum Result { + NO_PLAN(0), + PLAN_UNDER_PROGRESS(1), + PLAN_DONE(2), + PLAN_CANCELLED(3); + private int result; + + private Result(int result) { + this.result = result; + } + + /** + * Get int value of result. + * + * @return int + */ + public int getIntResult() { + return result; + } + } + + /** + * A class that is used to report each work item that we are working on. This + * class describes the Source, Destination and how much data has been already + * moved, errors encountered etc. This is useful for the disk balancer stats + * as well as the queryStatus RPC. + */ + public static class DiskBalancerWorkEntry { + private String sourcePath; + private String destPath; + private DiskBalancerWorkItem workItem; + + /** + * Constructs a Work Entry class. + * + * @param sourcePath - Source Path where we are moving data from. + * @param destPath - Destination path to where we are moving data to. + * @param workItem - Current work status of this move. + */ + public DiskBalancerWorkEntry(String sourcePath, String destPath, + DiskBalancerWorkItem workItem) { + this.sourcePath = sourcePath; + this.destPath = destPath; + this.workItem = workItem; + } + + /** + * Returns the source path. + * + * @return - Source path + */ + public String getSourcePath() { + return sourcePath; + } + + /** + * Sets the Source Path. + * + * @param sourcePath - Volume Path. + */ + public void setSourcePath(String sourcePath) { + this.sourcePath = sourcePath; + } + + /** + * Gets the Destination path. + * + * @return - Path + */ + public String getDestPath() { + return destPath; + } + + /** + * Sets the destination path. + * + * @param destPath - Path + */ + public void setDestPath(String destPath) { + this.destPath = destPath; + } + + /** + * Gets the current status of work for these volumes. + * + * @return - Work Item + */ + public DiskBalancerWorkItem getWorkItem() { + return workItem; + } + + /** + * Sets the work item. + * + * @param workItem - sets the work item information + */ + public void setWorkItem(DiskBalancerWorkItem workItem) { + this.workItem = workItem; + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e646c2eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto index c61c700..c7cd4fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto @@ -194,9 +194,8 @@ message QueryPlanStatusRequestProto { */ message QueryPlanStatusResponseProto { optional uint32 result = 1; - optional string status = 2; - optional string planID = 3; - optional string currentStatus = 4; + optional string planID = 2; + optional string currentStatus = 3; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/e646c2eb/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt index 27de7d0..07403cf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt @@ -24,6 +24,9 @@ HDFS-1312 Change Log HDFS-9647. DiskBalancer: Add getRuntimeSettings. (Anu Engineer via Arpit Agarwal) - HDFS-9671skBalancer: SubmitPlan implementation. (Anu Engineer via + HDFS-9671. DiskBalancer: SubmitPlan implementation. (Anu Engineer via + Arpit Agarwal) + + HDFS-9681. DiskBalancer: Add QueryPlan implementation. (Anu Engineer via Arpit Agarwal) http://git-wip-us.apache.org/repos/asf/hadoop/blob/e646c2eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java index 692fca3..d72a060 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java @@ -296,10 +296,9 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements DiskBalancerWorkStatus result = impl.queryDiskBalancerPlan(); return QueryPlanStatusResponseProto .newBuilder() - .setResult(result.getResult()) + .setResult(result.getResult().getIntResult()) .setPlanID(result.getPlanID()) - .setStatus(result.getStatus()) - .setCurrentStatus(result.getCurrentState()) + .setCurrentStatus(result.getCurrentStateString()) .build(); } catch (Exception e) { throw new ServiceException(e); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e646c2eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 8d805a1..56585a8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -3345,11 +3345,15 @@ public class DataNode extends ReconfigurableBase DiskBalancerException.Result.INTERNAL_ERROR); } + /** + * Returns the status of current or last executed work plan. + * @return DiskBalancerWorkStatus. + * @throws IOException + */ @Override public DiskBalancerWorkStatus queryDiskBalancerPlan() throws IOException { checkSuperuserPrivilege(); - throw new DiskBalancerException("Not Implemented", - DiskBalancerException.Result.INTERNAL_ERROR); + return this.diskBalancer.queryWorkStatus(); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/e646c2eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java index 1c8ba4cf..c01fb4e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java @@ -25,6 +25,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.DiskBalancerWorkEntry; +import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerConstants; @@ -68,6 +70,7 @@ public class DiskBalancer { private ExecutorService scheduler; private Future future; private String planID; + private DiskBalancerWorkStatus.Result currentResult; /** * Constructs a Disk Balancer object. This object takes care of reading a @@ -79,6 +82,7 @@ public class DiskBalancer { */ public DiskBalancer(String dataNodeUUID, Configuration conf, BlockMover blockMover) { + this.currentResult = Result.NO_PLAN; this.blockMover = blockMover; this.dataset = this.blockMover.getDataset(); this.dataNodeUUID = dataNodeUUID; @@ -97,6 +101,7 @@ public class DiskBalancer { lock.lock(); try { this.isDiskBalancerEnabled = false; + this.currentResult = Result.NO_PLAN; if ((this.future != null) && (!this.future.isDone())) { this.blockMover.setExitFlag(); shutdownExecutor(); @@ -151,6 +156,7 @@ public class DiskBalancer { verifyPlan(planID, planVersion, plan, bandwidth, force); createWorkPlan(nodePlan); this.planID = planID; + this.currentResult = Result.PLAN_UNDER_PROGRESS; executePlan(); } finally { lock.unlock(); @@ -158,6 +164,39 @@ public class DiskBalancer { } /** + * Returns the Current Work Status of a submitted Plan. + * + * @return DiskBalancerWorkStatus. + * @throws DiskBalancerException + */ + public DiskBalancerWorkStatus queryWorkStatus() throws DiskBalancerException { + lock.lock(); + try { + checkDiskBalancerEnabled(); + // if we had a plan in progress, check if it is finished. + if (this.currentResult == Result.PLAN_UNDER_PROGRESS && + this.future != null && + this.future.isDone()) { + this.currentResult = Result.PLAN_DONE; + } + + DiskBalancerWorkStatus status = + new DiskBalancerWorkStatus(this.currentResult, this.planID); + for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry : + workMap.entrySet()) { + DiskBalancerWorkEntry workEntry = new DiskBalancerWorkEntry( + entry.getKey().getSource().getBasePath(), + entry.getKey().getDest().getBasePath(), + entry.getValue()); + status.addWorkEntry(workEntry); + } + return status; + } finally { + lock.unlock(); + } + } + + /** * Throws if Disk balancer is disabled. * * @throws DiskBalancerException http://git-wip-us.apache.org/repos/asf/hadoop/blob/e646c2eb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java index dc24787..974e973 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus; import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector; import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory; import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster; @@ -36,6 +37,9 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_DONE; +import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_UNDER_PROGRESS; + public class TestDiskBalancerRPC { @Rule public ExpectedException thrown = ExpectedException.none(); @@ -134,28 +138,20 @@ public class TestDiskBalancerRPC { Assert.assertEquals(cluster.getDataNodes().size(), diskBalancerCluster.getNodes().size()); diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes()); - DiskBalancerDataNode node = diskBalancerCluster.getNodes().get(0); + DataNode dataNode = cluster.getDataNodes().get(dnIndex); + DiskBalancerDataNode node = diskBalancerCluster.getNodeByUUID( + dataNode.getDatanodeUuid()); GreedyPlanner planner = new GreedyPlanner(10.0f, node); NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort ()); planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan); - final int planVersion = 0; // So far we support only one version. - DataNode dataNode = cluster.getDataNodes().get(dnIndex); + final int planVersion = 1; // So far we support only one version. String planHash = DigestUtils.sha512Hex(plan.toJson()); - - // Since submitDiskBalancerPlan is not implemented yet, it throws an - // Exception, this will be modified with the actual implementation. - try { dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); - } catch (DiskBalancerException ex) { - // Let us ignore this for time being. - } - - // TODO : This will be fixed when we have implementation for this - // function in server side. - thrown.expect(DiskBalancerException.class); - dataNode.queryDiskBalancerPlan(); + DiskBalancerWorkStatus status = dataNode.queryDiskBalancerPlan(); + Assert.assertTrue(status.getResult() == PLAN_UNDER_PROGRESS || + status.getResult() == PLAN_DONE); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
