Repository: ambari Updated Branches: refs/heads/branch-windows-dev 5fdb540eb -> b0e3eb9d7
AMBARI-8033 - Add HDFS Rebalance functionality in HDPWIN 2.1 stack Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b0e3eb9d Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b0e3eb9d Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b0e3eb9d Branch: refs/heads/branch-windows-dev Commit: b0e3eb9d750e29a803fd95ae9beb0c145ad9b628 Parents: 5fdb540 Author: Artem Baranchuk <[email protected]> Authored: Wed Oct 29 00:53:40 2014 +0200 Committer: Artem Baranchuk <[email protected]> Committed: Mon Nov 3 15:34:15 2014 +0200 ---------------------------------------------------------------------- .../HDFS/package/scripts/hdfs_rebalance.py | 130 +++++++++++++++++++ .../services/HDFS/package/scripts/namenode.py | 54 ++++++++ .../2.1/services/HDFS/package/scripts/params.py | 2 + 3 files changed, 186 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/b0e3eb9d/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/hdfs_rebalance.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/hdfs_rebalance.py b/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/hdfs_rebalance.py new file mode 100644 index 0000000..aea6fce --- /dev/null +++ b/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/hdfs_rebalance.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
# (Remainder of the Apache License 2.0 header of hdfs_rebalance.py, which
# begins on the previous line of this message.)
#
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re


class AmbariException(Exception):
    """Raised when a line of 'hdfs balancer' output cannot be parsed.

    The parsing code below raises this name; it is defined here because the
    module imports nothing but ``re``, so without a definition every error
    path would fail with a NameError instead of the intended exception.
    """


class HdfsParser(object):
    """Stateful, line-by-line parser for the output of 'hdfs balancer'.

    Attributes:
      initialLine: the first successfully parsed progress line (an HdfsLine),
        or None until one has been seen.
      state: None, 'PROCESS_STARTED', 'PROGRESS' or 'PROCESS_FINISED',
        updated by parseLine according to the last recognized line.
    """

    def __init__(self):
        self.initialLine = None
        self.state = None

    def parseLine(self, line):
        """Classify one output line and update the parser state.

        Returns the parsed HdfsLine for a progress line, and None for every
        other kind of line (header, end marker, unrecognized text).
        Unrecognized lines leave ``state`` unchanged.
        """
        hdfsLine = HdfsLine()
        lineType, matcher = hdfsLine.recognizeType(line)

        if lineType == HdfsLine.LineType.HeaderStart:
            self.state = 'PROCESS_STARTED'
        elif lineType == HdfsLine.LineType.Progress:
            self.state = 'PROGRESS'
            hdfsLine.parseProgressLog(line, matcher)
            # Remember the first progress line: it is the baseline callers
            # compare later lines against.
            if self.initialLine is None:
                self.initialLine = hdfsLine
            return hdfsLine
        elif lineType == HdfsLine.LineType.ProgressEnd:
            # The spelling of this state value is kept exactly as it was:
            # code outside this module compares against the same literal.
            self.state = 'PROCESS_FINISED'
        return None


class HdfsLine(object):
    """One line of 'hdfs balancer' output, with its parsed progress fields."""

    class LineType:
        """Integer tags for the kinds of line recognizeType distinguishes."""
        HeaderStart, Progress, ProgressEnd, Unknown = range(4)

    # Unit suffixes in ascending order; the index is the power of 1024.
    MEMORY_SUFFIX = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB']
    # Template for one "<number> <unit>" column. The three %d placeholders
    # number the named groups. The decimal separator is an explicit [.,]
    # (an unescaped '.' would match any character).
    MEMORY_PATTERN = (
        r'(?P<memmult_%d>(?P<memory_%d>\d+(?:[.,]\d+)?) (?P<mult_%d>'
        + "|".join(MEMORY_SUFFIX)
        + r'))'
    )

    # Column titles are separated by whitespace, hence \s+ (\w+ can never
    # match the gaps between them).
    HEADER_BEGIN_PATTERN = re.compile(
        r'Time Stamp\s+Iteration#\s+Bytes Already Moved\s+'
        r'Bytes Left To Move\s+Bytes Being Moved'
    )
    PROGRESS_PATTERN = re.compile(
        r"(?P<date>.*?)\s+"
        + r"(?P<iteration>\d+)\s+"
        + MEMORY_PATTERN % (1, 1, 1) + r"\s+"
        + MEMORY_PATTERN % (2, 2, 2) + r"\s+"
        + MEMORY_PATTERN % (3, 3, 3)
    )
    # Any amount of whitespace is accepted between the two sentences.
    PROGRESS_END_PATTERN = re.compile(r'The cluster is balanced\.\s+Exiting\.\.\.')

    def __init__(self):
        self.date = None
        self.iteration = None
        self.bytesAlreadyMoved = None
        self.bytesLeftToMove = None
        self.bytesBeingMoved = None
        self.bytesAlreadyMovedStr = None
        self.bytesLeftToMoveStr = None
        self.bytesBeingMovedStr = None

    def recognizeType(self, line):
        """Return ``(LineType, match)`` for the first pattern matching *line*.

        Patterns are tried in the order header, progress, end marker, each
        anchored at the start of the line. Returns ``(LineType.Unknown, None)``
        when none of them matches.
        """
        candidates = (
            (HdfsLine.LineType.HeaderStart, self.HEADER_BEGIN_PATTERN),
            (HdfsLine.LineType.Progress, self.PROGRESS_PATTERN),
            (HdfsLine.LineType.ProgressEnd, self.PROGRESS_END_PATTERN),
        )
        for lineType, pattern in candidates:
            m = pattern.match(line)
            if m:
                return lineType, m
        return HdfsLine.LineType.Unknown, None

    def parseProgressLog(self, line, m=None):
        '''
        Parse the line of 'hdfs rebalancer' output. The example output being parsed:

        Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved
        Jul 28, 2014 5:01:49 PM            0                  0 B             5.74 GB            9.79 GB
        Jul 28, 2014 5:03:00 PM            1                  0 B             5.58 GB            9.79 GB

        ``m`` may be the match object already produced by recognizeType for
        this line; it is reused when it comes from PROGRESS_PATTERN, otherwise
        the line is matched again.

        Throws AmbariException in case of parsing errors
        '''
        if m is None or getattr(m, 're', None) is not self.PROGRESS_PATTERN:
            m = self.PROGRESS_PATTERN.match(line)
        if not m:
            raise AmbariException("Failed to parse line [%s]" % (line,))

        self.date = m.group('date')
        self.iteration = int(m.group('iteration'))

        self.bytesAlreadyMoved = self.parseMemory(m.group('memory_1'), m.group('mult_1'))
        self.bytesLeftToMove = self.parseMemory(m.group('memory_2'), m.group('mult_2'))
        self.bytesBeingMoved = self.parseMemory(m.group('memory_3'), m.group('mult_3'))

        self.bytesAlreadyMovedStr = m.group('memmult_1')
        self.bytesLeftToMoveStr = m.group('memmult_2')
        self.bytesBeingMovedStr = m.group('memmult_3')

    def parseMemory(self, memorySize, multiplier_type):
        """Convert a size such as ('5.74', 'GB') to a number of bytes (float).

        A comma is accepted as the decimal separator. Raises AmbariException
        when the unit is not in MEMORY_SUFFIX or the number is not numeric.
        """
        try:
            factor = self.MEMORY_SUFFIX.index(multiplier_type)
            value = float(str(memorySize).replace(',', '.'))
        except ValueError:
            raise AmbariException(
                "Failed to parse memory value [%s %s]" % (memorySize, multiplier_type))
        return value * (1024 ** factor)

    def toJson(self):
        """Return the parsed fields as a plain dict (raw strings and byte counts)."""
        return {
            'timeStamp': self.date,
            'iteration': self.iteration,

            'dataMoved': self.bytesAlreadyMovedStr,
            'dataLeft': self.bytesLeftToMoveStr,
            'dataBeingMoved': self.bytesBeingMovedStr,

            'bytesMoved': self.bytesAlreadyMoved,
            'bytesLeft': self.bytesLeftToMove,
            'bytesBeingMoved': self.bytesBeingMoved,
        }

    def __str__(self):
        def whole(value):
            # Byte counts are printed as whole numbers; fields that were
            # never parsed stay None instead of breaking the formatting.
            return value if value is None else int(value)

        return "[ date=%s,iteration=%s, bytesAlreadyMoved=%s, bytesLeftToMove=%s, bytesBeingMoved=%s]" % (
            self.date,
            self.iteration,
            whole(self.bytesAlreadyMoved),
            whole(self.bytesLeftToMove),
            whole(self.bytesBeingMoved),
        )

# ---------------------------------------------------------------------------
# End of hdfs_rebalance.py (the diff marks it "No newline at end of file").
#
# Next file in this commit message:
# http://git-wip-us.apache.org/repos/asf/ambari/blob/b0e3eb9d/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/namenode.py
# diff --git a/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/namenode.py b/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/namenode.py
# index 7f7c8a0..32fc681 100644
# hunk @@ -20,6 +20,14 @@ (section heading: "limitations under the License.")
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# namenode.py, hunk @@ -20,6 +20,14 @@ -- module imports.
# The first three imports are unchanged context lines of the diff; the
# remaining ones are added by this commit.
# ---------------------------------------------------------------------------
from resource_management import *
from hdfs import hdfs
import service_mapping
import hdfs_rebalance
import time
import json
import subprocess
import sys
import os
from datetime import datetime
from ambari_commons.os_windows import *


class NameNode(Script):
    # Only the parts of NameNode touched by the diff are visible in this
    # message. The hunk context shows "def install(self, env):" (body not
    # shown) and, in hunk @@ -70,5 +78,51 @@, the tail of an earlier method:
    #     nn_refresh_cmd = format('cmd /c hadoop dfsadmin -refreshNodes')
    #     Execute(nn_refresh_cmd, user=hdfs_user)
    # Everything else in the class lies outside this view and is not
    # reproduced here.

    def rebalancehdfs(self, env):
        """Run 'hdfs balancer' as the HDFS user and report its progress.

        Reads the balancer threshold from the JSON document in
        ``params.name_node_params``, runs the balancer through
        ``run_os_command_impersonated``, echoes every output line to stdout
        and publishes each parsed progress line (plus a 'completePercent'
        value) through ``put_structured_out``.

        Raises Fail when the parameters are missing or malformed, or when the
        balancer process returns a non-zero exit code.
        """
        import params
        env.set_params(params)

        hdfs_user = params.hdfs_user

        # name_node_params defaults to None when the command carries no
        # 'namenode' parameter; json.loads(None) would raise a bare TypeError.
        if params.name_node_params is None:
            raise Fail('Hdfs rebalance requires the namenode command parameter')
        try:
            name_node_parameters = json.loads(params.name_node_params)
            threshold = name_node_parameters['threshold']
            # The value is interpolated into a 'cmd /C' command line below,
            # so only accept something that parses as a number.
            float(threshold)
        except (KeyError, TypeError, ValueError):
            raise Fail('Hdfs rebalance requires a numeric threshold parameter')
        _print("Starting balancer with threshold = %s\n" % threshold)

        def calculateCompletePercent(first, current):
            # Nothing to move at the first progress line means the work is
            # already complete (and avoids a division by zero).
            if not first.bytesLeftToMove:
                return 1.0
            return 1.0 - current.bytesLeftToMove / first.bytesLeftToMove

        def startRebalancingProcess(threshold):
            rebalanceCommand = 'hdfs balancer -threshold %s' % threshold
            return ['cmd', '/C', rebalanceCommand]

        command = startRebalancingProcess(threshold)

        _print("Executing command %s\n" % command)

        parser = hdfs_rebalance.HdfsParser()
        returncode, stdout, err = run_os_command_impersonated(
            ' '.join(command), hdfs_user, Script.get_password(hdfs_user))

        # splitlines() drops the line terminators, so each echoed line gets
        # an explicit newline, as the other _print calls in this method do.
        for line in (stdout or '').splitlines():
            _print('[balancer] %s %s\n' % (str(datetime.now()), line))
            pl = parser.parseLine(line)
            if pl:
                res = pl.toJson()
                res['completePercent'] = calculateCompletePercent(parser.initialLine, pl)

                self.put_structured_out(res)
            elif parser.state == 'PROCESS_FINISED':
                _print('[balancer] %s %s\n' % (str(datetime.now()), 'Process is finished'))
                self.put_structured_out({'completePercent': 1})
                break

        if returncode is not None and returncode != 0:
            raise Fail('Hdfs rebalance process exited with error. See the log output')


def _print(line):
    """Write *line* to stdout as-is (no newline added) and flush immediately."""
    sys.stdout.write(line)
    sys.stdout.flush()


if __name__ == "__main__":
    NameNode().execute()

# ---------------------------------------------------------------------------
# Next file in this commit message:
# http://git-wip-us.apache.org/repos/asf/ambari/blob/b0e3eb9d/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/params.py
# diff --git a/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/params.py b/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/params.py
# index cd57f5f..1abad5c 100644
# hunk @@ -61,3 +61,5 @@ (section heading: "if 'integratedSecurity=true' not in dburl:")
#
# Unchanged context lines in params.py:
#     hdfs_user = "hadoop"
#     grep_exe = "findstr"
# Line added to params.py by this commit (no newline at end of file):
#     name_node_params = default("/commandParams/namenode", None)
# ---------------------------------------------------------------------------
