Github user ictmalili commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/940#discussion_r83184770
  
    --- Diff: tools/bin/hawqsync-falcon ---
    @@ -0,0 +1,1331 @@
    +#!/usr/bin/env python
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#   http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing,
    +# software distributed under the License is distributed on an
    +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +# KIND, either express or implied.  See the License for the
    +# specific language governing permissions and limitations
    +# under the License.
    +
    +import os
    +import sys
    +from optparse import OptionParser
    +from subprocess import Popen, PIPE
    +from hashlib import md5
    +from json import loads
    +from time import strftime, sleep, time
    +from collections import defaultdict
    +# TODO - make use of these common HAWQ libs instead of print
    +#from gppylib.gplog import setup_hawq_tool_logging, enable_verbose_logging
    +#from gppylib.commands.unix import getLocalHostname, getUserName
    +try:
    +    from xml.etree import cElementTree as ElementTree
    +except ImportError, e:
    +    from xml.etree import ElementTree
    +
    +def parseargs():
    +    parser = OptionParser(usage="HAWQ sync options.")
    +    parser.add_option('-v', '--verbose', action='store_true',
    +                      default=False)
    +    parser.add_option("-a", "--prompt", action="store_false",
    +                      dest="prompt", default=True,
    +                      help="Execute without prompt.")
    +    parser.add_option("-l", "--logdir", dest="logDir",
    +                      help="Sets the directory for log files")
    +    parser.add_option('-d', '--dryRun', action='store_true',
    +                      default=False,
    +                      dest='testMode', help="Execute in test mode")
    +    parser.add_option('-u', '--user', dest='userName', default="gpadmin",
    +                      help="The user to own Falcon ACLs and run job as")
    +    parser.add_option('--maxMaps', dest='distcpMaxMaps',
    +                      default="10",
    +                      help="The maximum number of map jobs to allow")
    +    parser.add_option('--mapBandwidth', dest='distcpMaxMBpsPerMap',
    +                      default="100",
    +                      help="The maximum allowable bandwidth for each map job, in MB/s")
    +    parser.add_option('-s', '--sourceNamenode', dest='sourceNamenode',
    +                      default="",
    +                      help="The IP or FQDN of the source HDFS namenode")
    +    parser.add_option('-S', '--sourceEntity', dest='sourceClusterEntityName',
    +                      default="source",
    +                      help="The Falcon cluster entity name of the source")
    +    parser.add_option('-m', '--sourceHawqMaster', dest='sourceHawqMaster',
    +                      default="",
    +                      help="The IP or FQDN of the source HAWQ master")
    +    parser.add_option('-M', '--targetHawqMaster', dest='targetHawqMaster',
    +                      default="",
    +                      help="The IP or FQDN of the target HAWQ master")
    +    parser.add_option('-f', '--falconUri', dest='falconUri',
    +                      default="http://localhost:15000",
    +                      help="The URI to use for issuing Falcon REST calls")
    +    parser.add_option('-t', '--targetNamenode', dest='targetNamenode',
    +                      default="",
    +                      help="The IP or FQDN of the target HDFS namenode")
    +    parser.add_option('-T', '--targetEntity', dest='targetClusterEntityName',
    +                      default="target",
    +                      help="The Falcon cluster entity name of the target")
    +    parser.add_option('-e', '--executionEntity',
    +                      dest='executionClusterEntityName',
    +                      default="source",
    +                      help="The Falcon cluster entity name specifying where to execute the job")
    +    parser.add_option('-w', '--workflowHdfsFilename', dest='workflowFilename',
    +                      default="/apps/data-mirroring/workflows/hdfs-replication-workflow.xml",
    +                      help="The HDFS location of the underlying Oozie workflow to use for sync job")
    +    parser.add_option('-p', '--pathToSync', dest='pathToSync',
    +                      default="/tmp/syncTest",
    +                      help="The root directory to be synchronized")
    +    parser.add_option('-j', '--jobName', dest='jobName', default="drSync",
    +                      help="The Falcon job entity name to be executed")
    +
    +    (options, args) = parser.parse_args()
    +    return (options, args)
    +
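    +# Example invocation (hypothetical hosts, shown for illustration only):
    +#   hawqsync-falcon -s nn-source.example.com -t nn-target.example.com \
    +#       -M hawq-master-target.example.com -f http://falcon.example.com:15000 \
    +#       -p /hawq_default -j drSync --maxMaps 10 --mapBandwidth 100
    +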
    +def extractFilenameAndSize(line, hdfsPort):
    +    """Utility function to extract filename and file
    +    size from a line of output from `hdfs dfs -ls -R`
    +
    +    """
    +
    +    tokens = line.split()
    +    return tokens[-1].split(":" + hdfsPort)[-1], tokens[4]
    +
    +def flattenFilelist(data, hdfsPort):
    +    """Utility function to convert a list of output
    +    lines from `hdfs dfs -ls -R` into a single, sorted, 
    +    delimited string to be used as a synchronization
    +    fingerprint
    +
    +    """
    +
    +    # Ensure record contains expected number of fields
    +    isValid = lambda r: len(r.strip().split()) == 8
    +
    +    # Subset the records to only filename and size fields
    +    filenameAndSize = [extractFilenameAndSize(line, hdfsPort) for line in data.split("\n") if isValid(line)]
    +
    +    # Reverse sort the list by filename column
    +    sortedFilenameAndSize = sorted(filenameAndSize, key=lambda r: r[0], reverse=True)
    +
    +    # Flatten a single line into a delimited string
    +    mergeLines = lambda l: "-".join(l)
    +
    +    # Perform the flatten for every line and join lines into a string
    +    return "\n".join(map(mergeLines, sortedFilenameAndSize))
    +
    +def computeMd5(data):
    +    """Utility function to compute MD5 checksum
    +
    +    """
    +    hasher = md5()
    +    hasher.update(data)
    +
    +    return hasher.hexdigest()
    +
    +def getHdfsFingerprint(hdfsUri="", hdfsDir="/hawq_default", 
isTesting=False):
    +    """Utility function to compute an MD5 
    +    hash from the output of a recursive HDFS 
    +    directory listing
    +
    +    """
    +
    +    retVal = -1
    +
    +    hdfsPort = hdfsUri.split(":")[-1]
    +
    +    hdfsCommand = "hdfs dfs -ls -R {u}{d}".format(u=hdfsUri, d=hdfsDir)
    +    #print hdfsCommand
    +
    +    filelist = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as gpadmin user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'gpadmin'
    +        hdfsProcess = Popen(hdfsCommand.split(), env=env,
    +                            stdout=PIPE, stderr=PIPE)
    +
    +        (filelist, stderr) = hdfsProcess.communicate()
    +
    +        retVal = hdfsProcess.returncode
    +
    +        if retVal != 0:
    +            return retVal, stderr
    +
    +    # Sample output to follow
    +    else:
    +        filelist = """
    +        drwx------   - gpadmin gpadmin          0 2016-07-14 12:32 hdfs://sandbox:8020/hawq_default/16385
    +        drwx------   - gpadmin gpadmin          0 2016-08-04 18:58 hdfs://sandbox:8020/hawq_default/16385/16387
    +        drwx------   - gpadmin gpadmin          0 2016-07-14 12:14 hdfs://sandbox:8020/hawq_default/16385/16387/18947
    +        """;
    +
    +        retVal = 0
    +
    +    data = flattenFilelist(filelist, hdfsPort)
    +
    +    # sample yields: 342f414e7519f8c6a9eacce94777ba08
    +    return retVal, computeMd5(data)
    +
    +def etree_to_dict(t):
    +    """Utility function to turn an XML 
    +    element tree into a dictionary
    +
    +    """
    +
    +    d = {t.tag: {} if t.attrib else None}
    +    children = list(t)
    +    if children:
    +        dd = defaultdict(list)
    +        for dc in map(etree_to_dict, children):
    +            for k, v in dc.iteritems():
    +                dd[k].append(v)
    +
    +        d[t.tag] = dict((k, v[0] if len(v) == 1 else v) for k, v in dd.iteritems())
    +    if t.attrib:
    +        d[t.tag].update(('@' + k, v) for k, v in t.attrib.iteritems())
    +    if t.text:
    +        text = t.text.strip()
    +        if children or t.attrib:
    +            if text:
    +              d[t.tag]['#text'] = text
    +        else:
    +            d[t.tag] = text
    +    return d
    +
    +def xmlToDict(data):
    +    """Wrapper function to convert 
    +    XML string into a Python dictionary
    +
    +    """
    +
    +    e = ElementTree.XML(data)
    +    return etree_to_dict(e)
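    +
    +# For illustration, a Falcon reply such as
    +#   <result><status>SUCCEEDED</status><message>ok</message></result>
    +# becomes {'result': {'status': 'SUCCEEDED', 'message': 'ok'}}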
    +
    +def getFalconStatus(falconUri="http://localhost:15000";, 
entity="drSyncTest",
    +                    user="gpadmin", onlyRuntime=False, isTesting=False,
    +                    doDebug=False):
    +    """Get the current status of an existing Falcon process/job entity
    +
    +    Args:
    +        falconUri (str): the URI for the Falcon server (e.g. http://host:port)
    +        entity (str): the Falcon process entity name to get status for
    +        user (str): the username used for authorization
    +        onlyRuntime (bool): only query for process runtime (e.g. post-completion)
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +        doDebug (bool): debugging mode for additional verbosity
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise. message can contain process start time or logfile
    +        message (str): message can contain process end time or status string
    +
    +    """
    +
    +    retVal = -1
    +
    +    # Example REST call:
    +    # GET http://localhost:15000/api/entities/status/process/drSyncTest?user.name=falcon&fields=status,clusters,tags
    +
    +    endpoint = "/api/instance/status/process/{e}?user.name={u}&fields=status".format(u=user, e=entity)
    +
    +    curlCommand = "curl -X GET {0}".format(falconUri + endpoint)
    +
    +    if doDebug:
    +        print curlCommand
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        curlProcess = Popen(curlCommand.split(), stdout=PIPE, stderr=PIPE)
    +
    +        (stdout, stderr) = curlProcess.communicate()
    +
    +        retVal = curlProcess.returncode
    +
    +    if doDebug:
    +        print "stdout:", stdout, "stderr:", stderr
    +
    +    try:
    +        result = loads(stdout)
    +        if 'instances' not in result:
    +            print "No instance was started, try deleting the job and re-running"
    +            return -1, "stdout: {0}, stderr: {1}".format(stdout, stderr)
    +        if onlyRuntime:
    +            # Parse the start/end times in JSON result from cURL
    +            resultPayload = result['instances'][0]
    +            return resultPayload['startTime'], resultPayload['endTime']
    +        else:
    +            # Parse the logfile/status in JSON result from cURL
    +            resultPayload = result['instances'][0] #['actions'][0]
    +            return resultPayload['logFile'], resultPayload['status']
    +    except KeyError as e:
    +        print "KeyError in  getFalconStatus()", str(e), "\n", stdout
    +        return -1, str(e)
    +    except ValueError as e:
    +        print "ValueError in  getFalconStatus()", str(e), "\n", stdout
    +        print "Is Falcon running at : {} ?".format(falconUri)
    +        return -1, str(e)
    +
    +    # Example output follows:
    +    else:
    +        stdout = """{
    +            "status":"SUCCEEDED",
    +            "message":"default/STATUS\n",
    +            "requestId":"default/1436392466@qtp-1730704097-152 - 8dd8f7fa-2024-4bdb-a048-c188759c2f47\n",
    +            "instances":[{
    +                "instance":"2016-08-17T13:22Z",
    +                "status":"SUSPENDED",
    +                "logFile":"http://sandbox2.hortonworks.com:11000/oozie?job=0000014-160805194358788-oozie-oozi-W",
    +                "cluster":"secondaryIDP",
    +                "startTime":"2016-08-17T12:25:23-07:00",
    +                "details":"",
    +                "actions":[{
    +                    "action":"user-action",
    +                    "status":"RUNNING"
    +                }, {
    +                    "action":"failed-post-processing",
    +                    "status":"RUNNING",
    +                    "logFile":"http://sandbox2.hortonworks.com:8088/proxy/application_1470437437449_0002/"
    +                }]
    +            }]
    +        }""".replace("\n", "");
    +
    +        return 0, loads(stdout)['instances'][0]['actions'][0]
    +
    +def doFalconSchedule(falconUri="http://localhost:15000";, 
jobName="drSyncTest",
    +                     userName="gpadmin", isTesting=False):
    +    """Schedule an existing Falcon process/job entity for execution
    +
    +    Args:
    +        falconUri (str): the URI for the Falcon server (e.g. http://host:port)
    +        jobName (str): the Falcon process entity name to get status for
    +        userName (str): the username used for authorization
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int): Zero for success, negative one otherwise 
    +        message (str): message status string (e.g. SUCCEEDED)
    +
    +    """
    +
    +    retVal = -1
    +
    +    # Example REST call:
    +    #  "Content-Type:text/xml"
    +    #  POST http://localhost:15000/api/entities/schedule/process/SampleProcess?skipDryRun=false
    +
    +    endpoint = "/api/entities/schedule/process/{n}?user.name={u}&skipDryRun=true".format(n=jobName, u=userName)
    +
    +    curlCommand = "curl -H Content-Type:text/xml -X POST 
{0}".format(falconUri + endpoint)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        curlProcess = Popen(curlCommand.split(), stdout=PIPE, stderr=PIPE)
    +
    +        (stdout, stderr) = curlProcess.communicate()
    +
    +        retVal = curlProcess.returncode
    +
    +        try:
    +            # Parse the XML result from cURL into a dictionary
    +            resultPayload = xmlToDict(stdout)['result']
    +            return resultPayload['status'], resultPayload['message']
    +        except KeyError:
    +            print "Parse error in doFalconSchedule()", stdout
    +            return -1, stdout
    +        except:
    +            print "Parse error in doFalconSchedule()", stdout
    +            return -1, stdout
    +
    +    # Example output follows:
    +    else:
    +        stdout = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    +        <result>
    +            <status>SUCCEEDED</status>
    +            <message>default/drSyncTest(process) scheduled successfully</message>
    +            <requestId>default/2028387903@qtp-1730704097-6 - 89554f01-91cf-4bbd-97c2-ee175711b2ba</requestId>
    +        </result>
    +        """.replace("\n", "");
    +
    +        return 0, xmlToDict(stdout)['result']['status']
    +
    +# Falcon process entity template used to create/update job attributes
    +drSyncTemplate="""<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    +<process name="{name}" xmlns="uri:falcon:process:0.1">
    +    <tags>_falcon_mirroring_type=HDFS</tags>
    +    <clusters>
    +        <cluster name="{executionClusterEntityName}">
    +            <validity start="{startTime}" end="{endTime}"/>
    +        </cluster>
    +    </clusters>
    +    <parallel>1</parallel>
    +    <order>LAST_ONLY</order>
    +    <frequency>days(7)</frequency>
    +    <timezone>GMT{gmtOffset}</timezone>
    +    <properties>
    +        <property name="oozie.wf.subworkflow.classpath.inheritance" 
value="true"/>
    +        <property name="distcpMaxMaps" value="{distcpMaxMaps}"/>
    +        <property name="distcpMapBandwidth" value="{distcpMaxMBpsPerMap}"/>
    +        <property name="drSourceDir" value="{pathToSync}"/>
    +        <property name="drTargetDir" value="{pathToSync}"/>
    +        <property name="drTargetClusterFS" value="{targetHdfsUri}"/>
    +        <property name="drSourceClusterFS" value="{sourceHdfsUri}"/>
    +        <!-- This can be a list of emails for notifications -->
    +        <property name="drNotificationReceivers" value="NA"/>
    +        <property name="targetCluster" value="{targetClusterEntityName}"/>
    +        <property name="sourceCluster" value="{sourceClusterEntityName}"/>
    +        <property name="queueName" value="default"/>
    +        <property name="jobPriority" value="HIGH"/>
    +    </properties>
    +    <workflow name="drSyncTest-WF" engine="oozie" 
path="{workflowFilename}" lib=""/>
    +    <retry policy="periodic" delay="minutes(1)" attempts="3"/>
    +    <ACL owner="{userName}" group="users" permission="0755"/>
    +</process>"""
    +
    +def doFalconSubmit(falconUri="http://localhost:15000";, jobParameters=None,
    +                   isTesting=False):
    +    """Submit/create a Falcon process/job entity
    +
    +    Args:
    +        falconUri (str): the URI for the Falcon server (e.g. http://host:port)
    +        jobParameters (dict): a dictionary containing process entity configuration
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int): Zero for success, negative one otherwise 
    +        message (str): message status string (e.g. SUCCEEDED)
    +
    +    """
    +
    +    retVal = -1
    +
    +    if jobParameters is None:
    +        return retVal, "You must provide a job parameters dictionary"
    +
    +    # Example REST call:
    +    #  "Content-Type:text/xml"
    +    #  POST http://localhost:15000/api/entities/submit/process?user.name=falcon
    +
    +    endpoint = "/api/entities/submit/process?user.name={u}".format(u=jobParameters['userName'])
    +
    +    thisMinute = int(strftime("%M"))
    +    thisYear = int(strftime("%Y"))
    +    gmtOffset = strftime("%z")
    +
    +    oneMinuteLater = strftime("%Y-%m-%dT%H:{0:02d}Z".format(thisMinute + 1))
    +    oneYearLater = strftime("{0}-%m-%dT%H:%MZ".format(thisYear + 1))
    +
    +    # TODO long term would be to encapsulate Falcon functions in a class structure
    +    # -- (i.e. full Python Falcon abstraction/API)
    +    # -- Targeting S3 or Azure Blob will require a different template
    +    payload = drSyncTemplate.format(startTime=oneMinuteLater,
    +                                    endTime=oneYearLater,
    +                                    gmtOffset=gmtOffset,
    +                                    distcpMaxMaps=jobParameters['distcpMaxMaps'],
    +                                    distcpMaxMBpsPerMap=jobParameters['distcpMaxMBpsPerMap'],
    +                                    sourceClusterEntityName=jobParameters['sourceClusterEntityName'],
    +                                    sourceHdfsUri=jobParameters['sourceHdfsUri'],
    +                                    targetClusterEntityName=jobParameters['targetClusterEntityName'],
    +                                    targetHdfsUri=jobParameters['targetHdfsUri'],
    +                                    executionClusterEntityName=jobParameters['executionClusterEntityName'],
    +                                    workflowFilename=jobParameters['workflowFilename'],
    +                                    pathToSync=jobParameters['pathToSync'],
    +                                    name=jobParameters['jobName'],
    +                                    userName=jobParameters['userName'])
    +
    +    curlCommand = "curl -X GET {0}".format(falconUri + endpoint)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Pass the XML payload as a single argument so it isn't split on spaces
    +        curlProcess = Popen(curlCommand.split() + [payload], stdout=PIPE, stderr=PIPE)
    +
    +        (stdout, stderr) = curlProcess.communicate()
    +
    +        retVal = curlProcess.returncode
    +
    +        try:
    +            # Parse the XML result from cURL into a dictionary
    +            resultPayload = xmlToDict(stdout)['result']
    +            return resultPayload['status'], resultPayload['message']
    +        except ElementTree.ParseError:
    +            print "Parse error in doFalconSubmit()", stdout
    +            return -1, stdout
    +
    +    # Example output follows:
    +    else:
    +        stdout = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    +        <result>
    +            <status>SUCCEEDED</status>
    +            <message>falcon/default/Submit successful (process) drSyncTest</message>
    +            <requestId>falcon/default/2028387903@qtp-1730704097-6 - 7ddba052-527b-462f-823f-e7dd0a1a08fa</requestId>
    +        </result>
    +        """.replace("\n", "");
    +
    +        return 0, xmlToDict(stdout)['result']['status']
    +
    +def doFalconSoar(falconUri="http://localhost:15000";, jobParameters=None,
    +                 isTesting=False):
    +    """Update, schedule, and monitor a Falcon process/job entity
    +
    +    Args:
    +        falconUri (str): the URI for the Falcon server (e.g. http://host:port)
    +        jobParameters (dict): a dictionary containing process entity configuration
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int): Zero for success, negative one otherwise 
    +        message (str): message status string (e.g. SUCCEEDED)
    +
    +    """
    +
    +    retVal = -1
    +
    +    if jobParameters is None:
    +        return retVal, "You must provide a job parameters dictionary"
    +
    +    # Example REST call:
    +    #  "Content-Type:text/xml"
    +    #  POST http://localhost:15000/api/entities/update/process/drSyncTest?user.name=falcon
    +
    +    endpoint = "/api/entities/update/process/{n}?user.name={u}".format(n=jobParameters['jobName'],
    +                                                                       u=jobParameters['userName'])
    +
    +    thisMinute = int(strftime("%M"))
    +    thisYear = int(strftime("%Y"))
    +    gmtOffset = strftime("%z")[:3] + ":00"
    +
    +    oneMinuteLater = strftime("%Y-%m-%dT%H:{0:02d}Z".format(thisMinute + 1))
    +    oneYearLater = strftime("{0}-%m-%dT%H:%MZ".format(thisYear + 1))
    +
    +    print "Scheduling for", oneMinuteLater
    +    print "Ending on", oneYearLater
    +
    +    # TODO encapsulate everything in a class structure
    +    # -- (i.e. full Python Falcon abstraction/API)
    +    # -- Targeting AWS S3 or Azure Blob will require a different template
    +    payload = drSyncTemplate.format(startTime=oneMinuteLater,
    +                                    endTime=oneYearLater,
    +                                    gmtOffset=gmtOffset,
    +                                    distcpMaxMaps=jobParameters['distcpMaxMaps'],
    +                                    distcpMaxMBpsPerMap=jobParameters['distcpMaxMBpsPerMap'],
    +                                    sourceClusterEntityName=jobParameters['sourceClusterEntityName'],
    +                                    sourceHdfsUri=jobParameters['sourceHdfsUri'],
    +                                    targetClusterEntityName=jobParameters['targetClusterEntityName'],
    +                                    targetHdfsUri=jobParameters['targetHdfsUri'],
    +                                    executionClusterEntityName=jobParameters['executionClusterEntityName'],
    +                                    workflowFilename=jobParameters['workflowFilename'],
    +                                    pathToSync=jobParameters['pathToSync'],
    +                                    name=jobParameters['jobName'],
    +                                    userName=jobParameters['userName'])
    +
    +    # TODO output for debug level
    +    #from pprint import pprint
    +    #pprint (payload)
    +
    +    curlCommand = "curl -H Content-Type:text/xml -X POST {uri} -d 
".format(uri=falconUri + endpoint)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Note, needed to separate out the payload as it can't be split on spaces
    +        curlProcess = Popen(curlCommand.split() + [payload],
    +                            stdout=PIPE, stderr=PIPE)
    +
    +        (stdout, stderr) = curlProcess.communicate()
    +
    +        retVal = curlProcess.returncode
    +
    +        # Curl process completed successfully
    +        if retVal == 0:
    +
    +            try:
    +                # Parse the XML result from cURL into a dictionary
    +                result = xmlToDict(stdout)['result']
    +                stderr = result['message']
    +
    +                # Falcon REST update operation successful
    +                if "SUCCEEDED" in result['status']:
    +                    print "Falcon process update was successful"
    +
    +                    # We should doFalconSchedule() here
    +                    status, message = doFalconSchedule(falconUri=falconUri,
    +                                                       jobName=jobParameters['jobName'],
    +                                                       isTesting=False)
    +
    +                    # If we succeeded in scheduling
    +                    if "SUCCEEDED" in status:
    +                        print "Falcon process scheduling was successful"
    +
    +                        # Reset retVal to catch error between scheduled and running states
    +                        retVal = -1
    +                        sleep(5)
    +
    +                        message, status = getFalconStatus(falconUri=falconUri,
    +                                                          entity=jobParameters['jobName'])
    +
    +                        # Continuously poll for hdfs-mirroring status
    +                        while "RUNNING" in status:
    +                            message, status = getFalconStatus(falconUri=falconUri,
    +                                                              entity=jobParameters['jobName'])
    +                            print status
    +
    +                            # flag RUNNING state reached using retVal
    +                            retVal = 0
    +                            sleep(10)
    +
    +                        if status == "KILLED":
    +                            return -1, message
    +
    +                        # Poll one last time for runtimes
    +                        start, finish = getFalconStatus(falconUri=falconUri,
    +                                                        entity=jobParameters['jobName'],
    +                                                        onlyRuntime=True)
    +
    +                        return retVal, "logfile: {0} started: {1} finished: {2}".format(message, start, finish)
    +
    +                    # Scheduling failed
    +                    else:
    +                        return -1, message
    +
    +                # Falcon REST update operation NOT successful
    +                else:
    +                    print "Falcon REST operation not successful"
    +                    return result['status'], stderr
    +
    +            except KeyError:
    +                print "Are you using the correct Falcon server URI?", falconUri
    +                return -1, stdout
    +
    +        # Curl process did not complete successfully
    +        else:
    +            print "Curl command failed"
    +            return retVal, stderr
    +
    +    # Example output follows:
    +    else:
    +        stdout = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    +            <result>
    +                <status>SUCCEEDED</status>
    +                <message>falcon/update/default/Updated successfully</message>
    +                <requestId>falcon/update/default/868391317@qtp-1730704097-47 - b2391bd7-3ae0-468e-b39c-5d002099a446</requestId>
    +            </result>
    +        """.replace("\n", "");
    +
    +        # Parse the XML result from cURL into a dictionary
    +
    +        return 0, xmlToDict(stdout)['result']['status']
    +
    +def hdfsFingerprintsMatch(reference, comparison):
    +    """Helper function to compare two fingerprints / md5 hashes
    +
    +    Args:
    +        reference (str): the reference MD5 checksum string
    +        comparison (str): the comparison MD5 checksum string
    +    
    +    Returns:
    +        isEqual (bool): True if the fingerprints match, False otherwise
    +
    +    """
    +
    +    return reference == comparison
    +
    +
    +def stopHawq(masterHost=None, isTesting=False):
    +    """Optionally connect to a remote HAWQ master
    +    and do a quick stop of the HAWQ master process
    +
    +    Args:
    +        masterHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains a stderr string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hawqStopCommand = "hawq stop master -a -M fast"
    +
    +    if masterHost is not None:
    +        hawqStopCommand = "ssh {h} -- '{c}'".format(h=masterHost,
    +                                                    c=hawqStopCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        hawqStopProcess = Popen(hawqStopCommand,
    +                                stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hawqStopProcess.communicate()
    +
    +        return hawqStopProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TEST MODE"
    +
    +def startHawq(masterHost=None, isTesting=False):
    +    """Optionally connect to a remote HAWQ master
    +    and do a start of the HAWQ master process
    +
    +    Args:
    +        masterHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains a stderr string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hawqStartCommand = "hawq start master -a"
    +
    +    if masterHost is not None:
    +        hawqStartCommand = "ssh {h} -- '{c}'".format(h=masterHost,
    +                                                     c=hawqStartCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        hawqStartProcess = Popen(hawqStartCommand,
    +                                 stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hawqStartProcess.communicate()
    +
    +        return hawqStartProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TEST MODE"
    +
    +
    +def copyToHdfs(source, dest, isTesting=False):
    +    """Utility function to copy a source file
    +    to the destination HDFS directory/file
    +
    +    Args:
    +        source (str): the source file on the local FS
    +        dest (str): the target HDFS directory and filename
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hdfsCommand = "/usr/bin/hdfs dfs -copyFromLocal {s} 
{d}".format(s=source,
    +                                                                    d=dest)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as gpadmin user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'gpadmin'
    +        hdfsProcess = Popen(hdfsCommand.split(), env=env,
    +                            stdout=PIPE, stderr=PIPE)
    +
    +        (stdout, stderr) = hdfsProcess.communicate()
    +
    +        return hdfsProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TESTING"
    +
    +def checkHdfsSafemode(namenodeHost=None, isTesting=False):
    +    """Utility function to query HDFS for 
    +    safemode enabled or disabled
    +
    +    Args:
    +        namenodeHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +    
    +    retVal = -1
    +
    +    hdfsCommand = "/usr/bin/hdfs dfsadmin -safemode get"
    +
    +    if namenodeHost is not None:
    +        hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs 
{c}'".format(h=namenodeHost,
    +                                                                          
c=hdfsCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as hdfs user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'hdfs'
    +        hdfsProcess = Popen(hdfsCommand, env=env,
    +                            stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hdfsProcess.communicate()
    +
    +        try:
    +            offOrOn = True if "ON" in stdout.split()[-1] else False
    +        except IndexError as e:
    +            return -1, str(e)
    +
    +        return hdfsProcess.returncode, offOrOn
    +
    +    else:
    +        return 0, "TESTING"
    +
    +def enableHdfsSafemode(namenodeHost=None, isTesting=False):
    +    """Utility function to enable safemode
    +
    +    Args:
    +        namenodeHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hdfsCommand = "/usr/bin/hdfs dfsadmin -safemode enter"
    +
    +    if namenodeHost is not None:
    +        hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs 
{c}'".format(h=namenodeHost,
    +                                                                          
c=hdfsCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as hdfs user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'hdfs'
    +        hdfsProcess = Popen(hdfsCommand, env=env,
    +                            stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hdfsProcess.communicate()
    +
    +        return hdfsProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TESTING"
    +
    +def disableHdfsSafemode(namenodeHost=None, isTesting=False):
    +    """Utility function to disable safemode
    +
    +    Args:
    +        namenodeHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hdfsCommand = "/usr/bin/hdfs dfsadmin -safemode leave"
    +
    +    if namenodeHost is not None:
    +        hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs 
{c}'".format(h=namenodeHost,
    +                                                                          
c=hdfsCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as hdfs user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'hdfs'
    +        hdfsProcess = Popen(hdfsCommand, env=env,
    +                            stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hdfsProcess.communicate()
    +
    +        return hdfsProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TESTING"
    +
    +def forceHdfsCheckpoint(namenodeHost=None, isTesting=False):
    +    """Utility function to force an HDFS checkpoint
    +
    +    Args:
    +        namenodeHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hdfsCommand = "/usr/bin/hdfs dfsadmin -saveNamespace"
    +
    +    if namenodeHost is not None:
    +        hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs 
{c}'".format(h=namenodeHost,
    +                                                                          
c=hdfsCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as hdfs user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'hdfs'
    +        hdfsProcess = Popen(hdfsCommand, env=env,
    +                            stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hdfsProcess.communicate()
    +
    +        return hdfsProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TESTING"
    +
    +
    +def createTarball(masterDataBase="/data/hawq/",
    +                  targetTarball="/tmp/hawqMdd-{t}.tar", isTesting=False):
    +    """Utility function to create a tarball of the HAWQ 
MASTER_DATA_DIRECTORY
    +
    +    Args:
    +        masterDataBase (str): the base directory containing the MASTER_DATA_DIRECTORY
    +        targetTarball (str): the target directory and filename of the tarball
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +    checksum = None
    +
    +    # Example invocation (note: excluding most of pg_log contents)
    +    # tar cpf /tmp/test.tar.bz2 --exclude=csv -C /data/hawq master
    +
    +    theTime = strftime("%Y-%m-%d-%H%M")
    +
    +    tarCommand = "tar -cpf {t} --exclude=csv -C {c} 
master".format(t=targetTarball.format(t=theTime),
    +                                                                   
c=masterDataBase)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +
    +        try:
    +            tarProcess = Popen(tarCommand.split(), stdout=PIPE, stderr=PIPE)
    +
    +            (stdout, stderr) = tarProcess.communicate()
    +
    +        except OSError as e:
    +            return -1, str(e), -1
    +
    +        if tarProcess.returncode == 0:
    +
    +            md5Command = "md5sum {f}".format(f=targetTarball.format(t=theTime))
    +
    +            try:
    +                md5Process = Popen(md5Command.split(),
    +                                   stdout=PIPE, stderr=PIPE)
    +
    +                (stdout2, stderr2) = md5Process.communicate()
    +
    +                checksum = stdout2.split()[0].strip()
    +
    +                if md5Process.returncode != 0:
    +                    return -1, "md5 checksum creation failed : " + stderr2, -1
    +                else:
    +                    return 0, targetTarball.format(t=theTime), checksum
    +
    +            except OSError as e:
    +                return -1, str(e), -1
    +
    +        else:
    +            print "Tarball creation failed : " + stderr
    +            return -1, stderr, -1
    +
    +    else:
    +        return 0, "TEST BRANCH", -1
    +
    +def cleanupTarball(filename, isTesting=False):
    +    """Utility function to delete a tarball of the HAWQ 
MASTER_DATA_DIRECTORY
    +
    +    Args:
    +        filename (str): the target directory and filename of the tarball to clean up
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +    checksum = None
    +
    +    # Example invocation (note: excluding most of pg_log contents)
    +    # rm -f /tmp/test.tar
    +
    +    rmCommand = "rm -f {f}".format(f=filename)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +
    +        try:
    +            rmProcess = Popen(rmCommand.split(), stdout=PIPE, stderr=PIPE)
    +
    +            (stdout, stderr) = rmProcess.communicate()
    +
    +            retVal = rmProcess.returncode
    +
    +            return retVal, stderr
    +
    +        except OSError as e:
    +            return -1, str(e)
    +    else:
    +        return 0, "TEST BRANCH"
    +
    +if __name__ == '__main__':
    +    options, args = parseargs()
    +
    +    #if options.verbose:
    +    #    enable_verbose_logging()
    +
    +    # TODO - switch prints to this once using gppylibs
    +    #logger, log_filename = setup_hawq_tool_logging('hawq_sync',getLocalHostname(),getUserName(), options.logDir)
    +
    +
    +    # ### HAWQ Extract every non-system table (source)
    +    # Note: the assumption is this has been done in
    +    # advance of executing this tool.
    +    if options.prompt:
    +        # TODO - switch to this once using gppylibs
    +        #if not userinput.ask_yesno(None, "\nContinue with HAWQ standby master activate", 'N'):
    +
    +        # TODO - switch to gppylib-based logging
    +        print "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
    +        print ""
    +        print "Please confirm you've performed hawq_extract() on all 
critical data tables"
    +        print "and saved this information outside of the cluster (e.g. 
version control)"
    +        print "or are using Falcon with an atomic option (i.e. in HDP-2.5: 
snapshot-based replication)"
    +        print ""
    +        print "This is critical for data recovery if a sync operation 
partially completes!"
    +        print ""
    +        print "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
    +        answer = raw_input("y or n: ")
    +        if "y" not in answer and "Y" not in answer:
    +            print "Exiting."
    +            sys.exit(1)
    +
    +    # ### Falcon cluster entities
    +    # Note: the assumption is both source and target
    +    # cluster entities have already been created in Falcon
    +    # TODO add a confirmation step, later a REST call to check
    +    if options.prompt:
    +        # TODO - switch to this once using gppylibs
    +        #if not userinput.ask_yesno(None, "\nContinue with HAWQ standby master activate", 'N'):
    +
    +        # TODO - switch to gppylib-based logging
    +        print ""
    +        print "Please confirm you've created both source and target Falcon 
cluster entities:"
    +        print ""
    +        answer = raw_input("y or n: ")
    +        if "y" not in answer and "Y" not in answer:
    +            print "Exiting."
    +            sys.exit(1)
    +
    +    # ### Stop HAWQ
    +    #
    +    # TODO?: switch to Ambari REST, followed by pkill -5 <<some hawq filter>>
    +
    +    # Source
    +    earlier = int(time())
    +    print "Stopping source HAWQ" if options.verbose else None;
    +    retVal, stderr = stopHawq(isTesting=options.testMode)
    +    print retVal, stderr if options.verbose else None;
    +
    +    if retVal != 0:
    +        print "Failed to stop source HAWQ master"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # Target
    +    # TODO - either use Ambari REST call or use SSH
    +    print "Stopping target HAWQ" if options.verbose else None;
    +    retVal, stderr = stopHawq(masterHost=options.targetHawqMaster,
    +                              isTesting=options.testMode)
    +    print retVal, stderr if options.verbose else None;
    +
    +    if retVal != 0:
    +        print "Failed to stop target HAWQ master"
    +        print "Error message was " + stderr
    +        print "Restarting source HAWQ" if options.verbose else None;
    +        retVal, stderr = startHawq(isTesting=options.testMode)
    +        later = int(time())
    +
    +    if retVal == 0:
    +        print "HAWQ was down for {0} seconds".format(later-earlier) if 
options.verbose else None;
    +    else:
    +        print "Source HAWQ failed to restart after pre-sync failure."
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # ### Create HAWQ Master Data Directory archive
    +    print "Creating MDD tarball" if options.verbose else None;
    +    retVal, filenameOrStderr, md5sum = createTarball(masterDataBase="/data/hawq/",
    +                                                     isTesting=options.testMode)
    +    print retVal, filenameOrStderr, md5sum if options.verbose else None;
    +
    +    if retVal != 0:
    +        print "Failed to create archive of source HAWQ 
MASTER_DATA_DIRECTORY"
    +        print "Error message was : " + filenameOrStderr
    +        print "Cleaning up MDD tarball on local FS"
    +        print cleanupTarball(filenameOrStderr, isTesting=options.testMode)
    +        retVal, stderr = startHawq(isTesting=options.testMode)
    +        later = int(time())
    +        if retVal == 0:
    +            print "HAWQ was down for {0} seconds".format(later-earlier) if options.verbose else None;
    +        else:
    +            print "Source HAWQ failed to restart after pre-sync failure."
    +            print "Exiting."
    +            sys.exit(1)
    +
    +    # ### Start HAWQ
    +    print "Starting source HAWQ" if options.verbose else None;
    +    retVal, stderr = startHawq(isTesting=options.testMode)
    +    later = int(time())
    +
    +    if retVal != 0:
    +        print "Failed to start source HAWQ master"
    +        print "Error message was " + stderr
    +        print "Cleaning up MDD tarball on local FS"
    +        print cleanupTarball(filenameOrStderr, isTesting=options.testMode)
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    print "HAWQ was down for {0} seconds".format(later-earlier) if 
options.verbose else None;
    +
    +    # TODO add a CLI flag to force source into read-write
    +    if checkHdfsSafemode()[1] == True:
    +        print "Source cluster HDFS is read-only, cannot proceed"
    +        print "Cleaning up MDD tarball on local FS"
    +        print cleanupTarball(filenameOrStderr, isTesting=options.testMode)
    +        sys.exit(1)
    +
    +    # ### Copy MDD archive to HDFS
    +    print "Copying MDD tarball to HDFS" if options.verbose else None;
    +    retVal, stderr = copyToHdfs(source=filenameOrStderr,
    +                                dest=options.pathToSync,
    +                                isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to copy MDD tarball to HDFS"
    +        print "Error message was " + stderr
    +        print "Cleaning up MDD tarball on local FS"
    +        print cleanupTarball(filenameOrStderr, isTesting=options.testMode)
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # ### Cleanup MDD archive from /tmp
    +    print "Cleaning up MDD tarball on local FS" if options.verbose else 
None;
    +    retVal, stderr = cleanupTarball(filenameOrStderr,
    +                                    isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to clean up MDD tarball"
    +        print "Error message was " + stderr
    +        print ""
    +        print "You will need to manually remove the following file"
    +        print filenameOrStderr
    +        print ""
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    """
    +    # ### Force HDFS checkpoint and enable safemode on source
    +    print "Enabling HDFS safemode on source cluster" if options.verbose 
else None;
    +    retVal, stderr = enableHdfsSafemode(isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to enable HDFS safemode on source cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    print "Forcing HDFS checkpoint on source cluster" if options.verbose 
else None;
    +    retVal, stderr = forceHdfsCheckpoint(isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to force HDFS checkpoint on source cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +    """;
    +
    +    # ### Leave safemode on target HDFS
    +    print "Disabling HDFS safemode on target" if options.verbose else None;
    +    retVal, stderr = disableHdfsSafemode(namenodeHost=options.targetNamenode,
    +                                         isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to leave HDFS safemode on target cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # Note, the entity names refer to Falcon
    +    # entities that have been created prior
    +    # to execution of this tool
    +    """
    +    jobParameters = dict(userName="gpadmin",
    +                         distcpMaxMaps="100",
    +                         distcpMaxMBpsPerMap="1000",
    +                         sourceClusterEntityName="sourceCluster",
    +                         sourceHdfsUri="hdfs://{0}:8020".format(sourceNamenode),
    +                         targetClusterEntityName="targetCluster",
    +                         targetHdfsUri="hdfs://{0}:8020".format(targetNamenode),
    +                         executionClusterEntityName="sourceCluster",
    +                         workflowFilename="/apps/data-mirroring/workflows/hdfs-replication-workflow.xml",
    +                         pathToSync="/tmp/syncTest",
    +                         jobName="drSync")
    +    """;
    +
    +    jobParameters = dict(userName=options.userName,
    +                         distcpMaxMaps=options.distcpMaxMaps,
    +                         distcpMaxMBpsPerMap=options.distcpMaxMBpsPerMap,
    +                         sourceClusterEntityName=options.sourceClusterEntityName,
    +                         sourceHdfsUri="hdfs://{0}:8020".format(options.sourceNamenode),
    +                         targetClusterEntityName=options.targetClusterEntityName,
    +                         targetHdfsUri="hdfs://{0}:8020".format(options.targetNamenode),
    +                         executionClusterEntityName=options.executionClusterEntityName,
    +                         workflowFilename=options.workflowFilename,
    +                         pathToSync=options.pathToSync,
    +                         jobName=options.jobName)
    +
    +    print jobParameters if options.verbose else None;
    +
    +    # ### Update and Schedule Job - monitor until completion
    +    print "Falcon Soar" if options.verbose else None;
    +    retVal, stderr = doFalconSoar(falconUri=options.falconUri,
    +                                  jobParameters=jobParameters,
    +                                  isTesting=options.testMode)
    +    falconOutput = stderr
    +
    +    if retVal != 0:
    +        print "Falcon replication job failed"
    +        print "Error message was " + stderr
    +        print "Source cluster will be left in safemode for remediation"
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # ### Leave safemode on source HDFS
    +    print "Disable HDFS safemode on source cluster" if options.verbose 
else None;
    +    retVal, stderr = disableHdfsSafemode(isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to leave HDFS safemode on source cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # ### Force HDFS checkpoint and enable safemode on target
    +    print "Enabling HDFS safemode on target cluster" if options.verbose 
else None
    +    retVal, stderr = 
enableHdfsSafemode(namenodeHost=options.targetNamenode, 
isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to enable HDFS safemode on target cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    print "Forcing HDFS checkpoint on target cluster" if options.verbose 
else None
    +    retVal, stderr = 
forceHdfsCheckpoint(namenodeHost=options.targetNamenode, 
isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to force HDFS checkpoint on target cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # ### HDFS Fingerprint comparison
    +
    +    print "Validating HDFS fingerprints match between source and target 
clusters" if options.verbose else None
    +
    +    retVal, md5OrStderr = getHdfsFingerprint(hdfsUri=jobParameters['sourceHdfsUri'],
    +                                             hdfsDir=jobParameters['pathToSync'],
    +                                             isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to generate HDFS fingerprint on source cluster"
    +        print "Error message was " + md5OrStderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    retVal, md5OrStderr2 = getHdfsFingerprint(hdfsUri=jobParameters['targetHdfsUri'],
    +                                              hdfsDir=jobParameters['pathToSync'],
    +                                              isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to generate HDFS fingerprint on target cluster"
    +        print "Error message was " + md5OrStderr2
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    isValidSync = hdfsFingerprintsMatch(md5OrStderr, md5OrStderr2)
    +
    +    if not isValidSync:
    +        print "Source and target cluster HDFS fingerprints do not match."
    +        print "Source checksum : " + md5OrStderr
    +        print "Target checksum : " + md5OrStderr2
    +        print "This is bad, please check Falcon sync logs : " + 
falconOutput
    +        sys.exit(1)
    +    else:
    +        print "Source and target HDFS fingerprints match."
    +
    +    # Target
    +    # TODO - either use Ambari REST call or use SSH
    +    print "Starting target HAWQ" if options.verbose else None;
    +    retVal, stderr = startHawq(masterHost=options.targetHawqMaster,
    +                               isTesting=options.testMode)
    +    print retVal, stderr if options.verbose else None;
    +
    +    if retVal != 0:
    +        print "Failed to start target HAWQ master"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +    else:
    +        print "HAWQ sync completed successfully!"
    +        print """
    +        ## Manual runbook during DR event
    +        1. Copy MDD archive from HDFS to target master (CLI)
    +        2. Restore archive in /data/hawq/ (CLI)
    --- End diff --
    
    Which specific directory does this map to?

