Repository: ambari
Updated Branches:
  refs/heads/branch-windows-dev 5fdb540eb -> b0e3eb9d7


AMBARI-8033 - Add HDFS Rebalance functionality in HDPWIN 2.1 stack


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b0e3eb9d
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b0e3eb9d
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b0e3eb9d

Branch: refs/heads/branch-windows-dev
Commit: b0e3eb9d750e29a803fd95ae9beb0c145ad9b628
Parents: 5fdb540
Author: Artem Baranchuk <[email protected]>
Authored: Wed Oct 29 00:53:40 2014 +0200
Committer: Artem Baranchuk <[email protected]>
Committed: Mon Nov 3 15:34:15 2014 +0200

----------------------------------------------------------------------
 .../HDFS/package/scripts/hdfs_rebalance.py      | 130 +++++++++++++++++++
 .../services/HDFS/package/scripts/namenode.py   |  54 ++++++++
 .../2.1/services/HDFS/package/scripts/params.py |   2 +
 3 files changed, 186 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/b0e3eb9d/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/hdfs_rebalance.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/hdfs_rebalance.py b/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/hdfs_rebalance.py
new file mode 100644
index 0000000..aea6fce
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/hdfs_rebalance.py
@@ -0,0 +1,130 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import re
+
+class HdfsParser():
+  def __init__(self):
+    self.initialLine = None
+    self.state = None
+
+  def parseLine(self, line):
+    hdfsLine = HdfsLine()
+    type, matcher = hdfsLine.recognizeType(line)
+    if(type == HdfsLine.LineType.HeaderStart):
+      self.state = 'PROCESS_STARTED'
+    elif (type == HdfsLine.LineType.Progress):
+      self.state = 'PROGRESS'
+      hdfsLine.parseProgressLog(line, matcher)
+      if(self.initialLine == None): self.initialLine = hdfsLine
+
+      return hdfsLine
+    elif (type == HdfsLine.LineType.ProgressEnd):
+      self.state = 'PROCESS_FINISED'
+    return None
+
+class HdfsLine():
+
+  class LineType:
+    HeaderStart, Progress, ProgressEnd, Unknown = range(4)
+
+
+  MEMORY_SUFFIX = ['B','KB','MB','GB','TB','PB','EB']
+  MEMORY_PATTERN = '(?P<memmult_%d>(?P<memory_%d>(\d+)(.|,)?(\d+)?) 
(?P<mult_%d>'+"|".join(MEMORY_SUFFIX)+'))'
+
+  HEADER_BEGIN_PATTERN = re.compile('Time Stamp\w+Iteration#\w+Bytes Already 
Moved\w+Bytes Left To Move\w+Bytes Being Moved')
+  PROGRESS_PATTERN = re.compile(
+                            "(?P<date>.*?)\s+" +
+                            "(?P<iteration>\d+)\s+" +
+                            MEMORY_PATTERN % (1,1,1) + "\s+" +
+                            MEMORY_PATTERN % (2,2,2) + "\s+" +
+                            MEMORY_PATTERN % (3,3,3)
+                            )
+  PROGRESS_END_PATTERN = re.compile('(The cluster is balanced. Exiting...|The 
cluster is balanced. Exiting...)')
+
+  def __init__(self):
+    self.date = None
+    self.iteration = None
+    self.bytesAlreadyMoved = None
+    self.bytesLeftToMove = None
+    self.bytesBeingMoved = None
+    self.bytesAlreadyMovedStr = None
+    self.bytesLeftToMoveStr = None
+    self.bytesBeingMovedStr = None
+
+  def recognizeType(self, line):
+    for (type, pattern) in (
+                            (HdfsLine.LineType.HeaderStart, 
self.HEADER_BEGIN_PATTERN),
+                            (HdfsLine.LineType.Progress, 
self.PROGRESS_PATTERN),
+                            (HdfsLine.LineType.ProgressEnd, 
self.PROGRESS_END_PATTERN)
+                            ):
+      m = re.match(pattern, line)
+      if m:
+        return type, m
+    return HdfsLine.LineType.Unknown, None
+
+  def parseProgressLog(self, line, m):
+    '''
+    Parse the line of 'hdfs rebalancer' output. The example output being 
parsed:
+
+    Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To 
Move  Bytes Being Moved
+    Jul 28, 2014 5:01:49 PM           0                  0 B             5.74 
GB            9.79 GB
+    Jul 28, 2014 5:03:00 PM           1                  0 B             5.58 
GB            9.79 GB
+
+    Throws AmbariException in case of parsing errors
+
+    '''
+    m = re.match(self.PROGRESS_PATTERN, line)
+    if m:
+      self.date = m.group('date')
+      self.iteration = int(m.group('iteration'))
+
+      self.bytesAlreadyMoved = self.parseMemory(m.group('memory_1'), 
m.group('mult_1'))
+      self.bytesLeftToMove = self.parseMemory(m.group('memory_2'), 
m.group('mult_2'))
+      self.bytesBeingMoved = self.parseMemory(m.group('memory_3'), 
m.group('mult_3'))
+
+      self.bytesAlreadyMovedStr = m.group('memmult_1')
+      self.bytesLeftToMoveStr = m.group('memmult_2')
+      self.bytesBeingMovedStr = m.group('memmult_3')
+    else:
+      raise AmbariException("Failed to parse line [%s]")
+
+  def parseMemory(self, memorySize, multiplier_type):
+    try:
+      factor = self.MEMORY_SUFFIX.index(multiplier_type)
+    except ValueError:
+      raise AmbariException("Failed to memory value [%s %s]" % (memorySize, 
multiplier_type))
+
+    return float(memorySize) * (1024 ** factor)
+  def toJson(self):
+    return {
+            'timeStamp' : self.date,
+            'iteration' : self.iteration,
+
+            'dataMoved': self.bytesAlreadyMovedStr,
+            'dataLeft' : self.bytesLeftToMoveStr,
+            'dataBeingMoved': self.bytesBeingMovedStr,
+
+            'bytesMoved': self.bytesAlreadyMoved,
+            'bytesLeft' : self.bytesLeftToMove,
+            'bytesBeingMoved': self.bytesBeingMoved,
+          }
+  def __str__(self):
+    return "[ date=%s,iteration=%d, bytesAlreadyMoved=%d, bytesLeftToMove=%d, 
bytesBeingMoved=%d]"%(self.date, self.iteration, self.bytesAlreadyMoved, 
self.bytesLeftToMove, self.bytesBeingMoved)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/b0e3eb9d/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/namenode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/namenode.py b/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/namenode.py
index 7f7c8a0..32fc681 100644
--- a/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/namenode.py
+++ b/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/namenode.py
@@ -20,6 +20,14 @@ limitations under the License.
 from resource_management import *
 from hdfs import hdfs
 import service_mapping
+import hdfs_rebalance
+import time
+import json
+import subprocess
+import sys
+import os
+from datetime import datetime
+from ambari_commons.os_windows import *
 
 class NameNode(Script):
   def install(self, env):
@@ -70,5 +78,51 @@ class NameNode(Script):
       nn_refresh_cmd = format('cmd /c hadoop dfsadmin -refreshNodes')
     Execute(nn_refresh_cmd, user=hdfs_user)
 
+
+  def rebalancehdfs(self, env):
+    import params
+    env.set_params(params)
+
+    hdfs_user = params.hdfs_user
+
+    name_node_parameters = json.loads( params.name_node_params )
+    threshold = name_node_parameters['threshold']
+    _print("Starting balancer with threshold = %s\n" % threshold)
+
+    def calculateCompletePercent(first, current):
+      return 1.0 - current.bytesLeftToMove/first.bytesLeftToMove
+
+    def startRebalancingProcess(threshold):
+      rebalanceCommand = 'hdfs balancer -threshold %s' % threshold
+      return ['cmd', '/C', rebalanceCommand]
+
+    command = startRebalancingProcess(threshold)
+    basedir = os.path.join(env.config.basedir, 'scripts')
+
+    _print("Executing command %s\n" % command)
+
+    parser = hdfs_rebalance.HdfsParser()
+    returncode, stdout, err = run_os_command_impersonated(' '.join(command), 
hdfs_user, Script.get_password(hdfs_user))
+
+    for line in stdout.split('\n'):
+      _print('[balancer] %s %s' % (str(datetime.now()), line ))
+      pl = parser.parseLine(line)
+      if pl:
+        res = pl.toJson()
+        res['completePercent'] = calculateCompletePercent(parser.initialLine, 
pl)
+
+        self.put_structured_out(res)
+      elif parser.state == 'PROCESS_FINISED' :
+        _print('[balancer] %s %s' % (str(datetime.now()), 'Process is 
finished' ))
+        self.put_structured_out({'completePercent' : 1})
+        break
+
+    if returncode != None and returncode != 0:
+      raise Fail('Hdfs rebalance process exited with error. See the log 
output')
+
+def _print(line):
+  sys.stdout.write(line)
+  sys.stdout.flush()
+
 if __name__ == "__main__":
   NameNode().execute()

http://git-wip-us.apache.org/repos/asf/ambari/blob/b0e3eb9d/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/params.py b/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/params.py
index cd57f5f..1abad5c 100644
--- a/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/params.py
+++ b/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/params.py
@@ -61,3 +61,5 @@ if 'integratedSecurity=true' not in dburl:
 hdfs_user = "hadoop"
 
 grep_exe = "findstr"
+
+# Raw JSON string of the namenode command parameters (for example the
+# balancer 'threshold' read by NameNode.rebalancehdfs); defaults to None.
+name_node_params = default("/commandParams/namenode", None)
\ No newline at end of file

Reply via email to