This is an automated email from the ASF dual-hosted git repository.

vogievetsky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ca37c20a6 Python 3 support for post-index-task. (#12841)
0ca37c20a6 is described below

commit 0ca37c20a6ab8e774931e6d504a3a4aa27a149bb
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Aug 2 17:53:34 2022 -0700

    Python 3 support for post-index-task. (#12841)
    
    * Python 3 support for post-index-task.
    
    Useful when running on macOS or any other system that
    doesn't have Python 2.
    
    * Encode JSON returned by read_task_file.
    
    * Adjust.
    
    * Skip needless loads.
    
    * Add a decode.
    
    * Additional decodes needed.
---
 examples/bin/post-index-task                       |  5 ++-
 examples/bin/post-index-task-main                  |  2 +-
 ...{post-index-task-main => post-index-task-main3} | 39 +++++++++++-----------
 3 files changed, 25 insertions(+), 21 deletions(-)

diff --git a/examples/bin/post-index-task b/examples/bin/post-index-task
index d7711baf2c..05cf674444 100755
--- a/examples/bin/post-index-task
+++ b/examples/bin/post-index-task
@@ -21,7 +21,10 @@ PWD="$(pwd)"
 WHEREAMI="$(dirname "$0")"
 WHEREAMI="$(cd "$WHEREAMI" && pwd)"
 
-if [ -x "$(command -v python2)" ]
+if [ -x "$(command -v python3)" ]
+then
+  exec python3 "$WHEREAMI/post-index-task-main3" "$@"
+elif [ -x "$(command -v python2)" ]
 then
   exec python2 "$WHEREAMI/post-index-task-main" "$@"
 else
diff --git a/examples/bin/post-index-task-main b/examples/bin/post-index-task-main
index 03436bc394..10b962ae90 100755
--- a/examples/bin/post-index-task-main
+++ b/examples/bin/post-index-task-main
@@ -152,7 +152,7 @@ def main():
   if task_json['type'] == "compact":
     datasource = task_json['dataSource']
   else:
-    datasource = json.loads(task_contents)["spec"]["dataSchema"]["dataSource"]
+    datasource = task_json["spec"]["dataSchema"]["dataSource"]
   sys.stderr.write("Beginning indexing data for {0}\n".format(datasource))
 
  task_id = json.loads(post_task(args, task_contents, submit_timeout_at))["task"]
diff --git a/examples/bin/post-index-task-main b/examples/bin/post-index-task-main3
old mode 100755
new mode 100644
similarity index 85%
copy from examples/bin/post-index-task-main
copy to examples/bin/post-index-task-main3
index 03436bc394..6e90861e43
--- a/examples/bin/post-index-task-main
+++ b/examples/bin/post-index-task-main3
@@ -23,16 +23,17 @@ import json
 import re
 import sys
 import time
-import urllib2
-import urlparse
+import urllib.request, urllib.error, urllib.parse
+import urllib.parse
 
+# Read a file. Returns string.
 def read_task_file(args):
   with open(args.file, 'r') as f:
     contents = f.read()
     # We don't use the parsed data, but we want to throw early if it's invalid
     try:
       json.loads(contents)
-    except Exception, e:
+    except Exception as e:
       sys.stderr.write('Invalid JSON in task file "{0}": {1}\n'.format(args.file, repr(e)))
       sys.exit(1)
     return contents
@@ -42,27 +43,27 @@ def add_basic_auth_header(args, req):
     basic_auth_encoded = base64.b64encode('%s:%s' % (args.user, args.password))
     req.add_header("Authorization", "Basic %s" % basic_auth_encoded)
 
-# Keep trying until timeout_at, maybe die then
+# Keep trying until timeout_at, maybe die then. Returns bytes.
 def post_task(args, task_json, timeout_at):
   try:
     url = args.url.rstrip("/") + "/druid/indexer/v1/task"
-    req = urllib2.Request(url, task_json, {'Content-Type' : 'application/json'})
+    req = urllib.request.Request(url, task_json, {'Content-Type' : 'application/json'})
     add_basic_auth_header(args, req)
     timeleft = timeout_at - time.time()
     response_timeout = min(max(timeleft, 5), 10)
-    response = urllib2.urlopen(req, None, response_timeout)
+    response = urllib.request.urlopen(req, None, response_timeout)
     return response.read().rstrip()
-  except urllib2.URLError as e:
-    if isinstance(e, urllib2.HTTPError) and e.code >= 400 and e.code <= 500:
+  except urllib.error.URLError as e:
+    if isinstance(e, urllib.error.HTTPError) and e.code >= 400 and e.code <= 500:
       # 4xx (problem with the request) or 500 (something wrong on the server)
       raise_friendly_error(e)
     elif time.time() >= timeout_at:
       # No futher retries
       raise_friendly_error(e)
-    elif isinstance(e, urllib2.HTTPError) and e.code in [301, 302, 303, 305, 307] and \
+    elif isinstance(e, urllib.error.HTTPError) and e.code in [301, 302, 303, 305, 307] and \
         e.info().getheader("Location") is not None:
       # Set the new location in args.url so it can be used by await_task_completion and re-issue the request
-      location = urlparse.urlparse(e.info().getheader("Location"))
+      location = urllib.parse.urlparse(e.info().getheader("Location"))
       args.url = "{0}://{1}".format(location.scheme, location.netloc)
       sys.stderr.write("Redirect response received, setting url to [{0}]\n".format(args.url))
       return post_task(args, task_json, timeout_at)
@@ -82,12 +83,12 @@ def post_task(args, task_json, timeout_at):
 def await_task_completion(args, task_id, timeout_at):
   while True:
     url = args.url.rstrip("/") + "/druid/indexer/v1/task/{0}/status".format(task_id)
-    req = urllib2.Request(url)
+    req = urllib.request.Request(url)
     add_basic_auth_header(args, req)
     timeleft = timeout_at - time.time()
     response_timeout = min(max(timeleft, 5), 10)
-    response = urllib2.urlopen(req, None, response_timeout)
-    response_obj = json.loads(response.read())
+    response = urllib.request.urlopen(req, None, response_timeout)
+    response_obj = json.loads(response.read().decode('utf-8'))
     response_status_code = response_obj["status"]["statusCode"]
     if response_status_code in ['SUCCESS', 'FAILED']:
       return response_status_code
@@ -101,7 +102,7 @@ def await_task_completion(args, task_id, timeout_at):
         raise Exception("Task {0} did not finish in time!".format(task_id))
 
 def raise_friendly_error(e):
-  if isinstance(e, urllib2.HTTPError):
+  if isinstance(e, urllib.error.HTTPError):
     text = e.read().strip()
     reresult = re.search(r'<pre>(.*?)</pre>', text, re.DOTALL)
     if reresult:
@@ -112,12 +113,12 @@ def raise_friendly_error(e):
 def await_load_completion(args, datasource, timeout_at):
   while True:
     url = args.coordinator_url.rstrip("/") + "/druid/coordinator/v1/loadstatus"
-    req = urllib2.Request(url)
+    req = urllib.request.Request(url)
     add_basic_auth_header(args, req)
     timeleft = timeout_at - time.time()
     response_timeout = min(max(timeleft, 5), 10)
-    response = urllib2.urlopen(req, None, response_timeout)
-    response_obj = json.loads(response.read())
+    response = urllib.request.urlopen(req, None, response_timeout)
+    response_obj = json.loads(response.read().decode('utf-8'))
     load_status = response_obj.get(datasource, 0.0)
     if load_status >= 100.0:
       sys.stderr.write("{0} loading complete! You may now query your data\n".format(datasource))
@@ -152,10 +153,10 @@ def main():
   if task_json['type'] == "compact":
     datasource = task_json['dataSource']
   else:
-    datasource = json.loads(task_contents)["spec"]["dataSchema"]["dataSource"]
+    datasource = task_json["spec"]["dataSchema"]["dataSource"]
   sys.stderr.write("Beginning indexing data for {0}\n".format(datasource))
 
-  task_id = json.loads(post_task(args, task_contents, submit_timeout_at))["task"]
+  task_id = json.loads(post_task(args, task_contents.encode(), submit_timeout_at).decode('utf-8'))["task"]
 
  sys.stderr.write('\033[1m' + "Task started: " + '\033[0m' + "{0}\n".format(task_id))
  sys.stderr.write('\033[1m' + "Task log:     " + '\033[0m' + "{0}/druid/indexer/v1/task/{1}/log\n".format(args.url.rstrip("/"),task_id))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to