This is an automated email from the ASF dual-hosted git repository.
vogievetsky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 0ca37c20a6 Python 3 support for post-index-task. (#12841)
0ca37c20a6 is described below
commit 0ca37c20a6ab8e774931e6d504a3a4aa27a149bb
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Aug 2 17:53:34 2022 -0700
Python 3 support for post-index-task. (#12841)
* Python 3 support for post-index-task.
Useful when running on macOS or any other system that
doesn't have Python 2.
* Encode the JSON string returned by read_task_file to bytes before posting it.
* Adjust.
* Skip needless json.loads calls by reusing the already-parsed task JSON.
* Decode HTTP response bytes as UTF-8 before passing them to json.loads.
* Additional UTF-8 decodes needed for the other HTTP responses.
---
examples/bin/post-index-task | 5 ++-
examples/bin/post-index-task-main | 2 +-
...{post-index-task-main => post-index-task-main3} | 39 +++++++++++-----------
3 files changed, 25 insertions(+), 21 deletions(-)
diff --git a/examples/bin/post-index-task b/examples/bin/post-index-task
index d7711baf2c..05cf674444 100755
--- a/examples/bin/post-index-task
+++ b/examples/bin/post-index-task
@@ -21,7 +21,10 @@ PWD="$(pwd)"
WHEREAMI="$(dirname "$0")"
WHEREAMI="$(cd "$WHEREAMI" && pwd)"
-if [ -x "$(command -v python2)" ]
+if [ -x "$(command -v python3)" ]
+then
+ exec python3 "$WHEREAMI/post-index-task-main3" "$@"
+elif [ -x "$(command -v python2)" ]
then
exec python2 "$WHEREAMI/post-index-task-main" "$@"
else
diff --git a/examples/bin/post-index-task-main b/examples/bin/post-index-task-main
index 03436bc394..10b962ae90 100755
--- a/examples/bin/post-index-task-main
+++ b/examples/bin/post-index-task-main
@@ -152,7 +152,7 @@ def main():
if task_json['type'] == "compact":
datasource = task_json['dataSource']
else:
- datasource = json.loads(task_contents)["spec"]["dataSchema"]["dataSource"]
+ datasource = task_json["spec"]["dataSchema"]["dataSource"]
sys.stderr.write("Beginning indexing data for {0}\n".format(datasource))
task_id = json.loads(post_task(args, task_contents,
submit_timeout_at))["task"]
diff --git a/examples/bin/post-index-task-main b/examples/bin/post-index-task-main3
old mode 100755
new mode 100644
similarity index 85%
copy from examples/bin/post-index-task-main
copy to examples/bin/post-index-task-main3
index 03436bc394..6e90861e43
--- a/examples/bin/post-index-task-main
+++ b/examples/bin/post-index-task-main3
@@ -23,16 +23,17 @@ import json
import re
import sys
import time
-import urllib2
-import urlparse
+import urllib.request, urllib.error, urllib.parse
+import urllib.parse
+# Read a file. Returns string.
def read_task_file(args):
with open(args.file, 'r') as f:
contents = f.read()
# We don't use the parsed data, but we want to throw early if it's invalid
try:
json.loads(contents)
- except Exception, e:
+ except Exception as e:
sys.stderr.write('Invalid JSON in task file "{0}":
{1}\n'.format(args.file, repr(e)))
sys.exit(1)
return contents
@@ -42,27 +43,27 @@ def add_basic_auth_header(args, req):
basic_auth_encoded = base64.b64encode('%s:%s' % (args.user, args.password))
req.add_header("Authorization", "Basic %s" % basic_auth_encoded)
-# Keep trying until timeout_at, maybe die then
+# Keep trying until timeout_at, maybe die then. Returns bytes.
def post_task(args, task_json, timeout_at):
try:
url = args.url.rstrip("/") + "/druid/indexer/v1/task"
- req = urllib2.Request(url, task_json, {'Content-Type' :
'application/json'})
+ req = urllib.request.Request(url, task_json, {'Content-Type' :
'application/json'})
add_basic_auth_header(args, req)
timeleft = timeout_at - time.time()
response_timeout = min(max(timeleft, 5), 10)
- response = urllib2.urlopen(req, None, response_timeout)
+ response = urllib.request.urlopen(req, None, response_timeout)
return response.read().rstrip()
- except urllib2.URLError as e:
- if isinstance(e, urllib2.HTTPError) and e.code >= 400 and e.code <= 500:
+ except urllib.error.URLError as e:
+ if isinstance(e, urllib.error.HTTPError) and e.code >= 400 and e.code <=
500:
# 4xx (problem with the request) or 500 (something wrong on the server)
raise_friendly_error(e)
elif time.time() >= timeout_at:
# No futher retries
raise_friendly_error(e)
- elif isinstance(e, urllib2.HTTPError) and e.code in [301, 302, 303, 305,
307] and \
+ elif isinstance(e, urllib.error.HTTPError) and e.code in [301, 302, 303,
305, 307] and \
e.info().getheader("Location") is not None:
# Set the new location in args.url so it can be used by
await_task_completion and re-issue the request
- location = urlparse.urlparse(e.info().getheader("Location"))
+ location = urllib.parse.urlparse(e.info().getheader("Location"))
args.url = "{0}://{1}".format(location.scheme, location.netloc)
sys.stderr.write("Redirect response received, setting url to
[{0}]\n".format(args.url))
return post_task(args, task_json, timeout_at)
@@ -82,12 +83,12 @@ def post_task(args, task_json, timeout_at):
def await_task_completion(args, task_id, timeout_at):
while True:
url = args.url.rstrip("/") +
"/druid/indexer/v1/task/{0}/status".format(task_id)
- req = urllib2.Request(url)
+ req = urllib.request.Request(url)
add_basic_auth_header(args, req)
timeleft = timeout_at - time.time()
response_timeout = min(max(timeleft, 5), 10)
- response = urllib2.urlopen(req, None, response_timeout)
- response_obj = json.loads(response.read())
+ response = urllib.request.urlopen(req, None, response_timeout)
+ response_obj = json.loads(response.read().decode('utf-8'))
response_status_code = response_obj["status"]["statusCode"]
if response_status_code in ['SUCCESS', 'FAILED']:
return response_status_code
@@ -101,7 +102,7 @@ def await_task_completion(args, task_id, timeout_at):
raise Exception("Task {0} did not finish in time!".format(task_id))
def raise_friendly_error(e):
- if isinstance(e, urllib2.HTTPError):
+ if isinstance(e, urllib.error.HTTPError):
text = e.read().strip()
reresult = re.search(r'<pre>(.*?)</pre>', text, re.DOTALL)
if reresult:
@@ -112,12 +113,12 @@ def raise_friendly_error(e):
def await_load_completion(args, datasource, timeout_at):
while True:
url = args.coordinator_url.rstrip("/") + "/druid/coordinator/v1/loadstatus"
- req = urllib2.Request(url)
+ req = urllib.request.Request(url)
add_basic_auth_header(args, req)
timeleft = timeout_at - time.time()
response_timeout = min(max(timeleft, 5), 10)
- response = urllib2.urlopen(req, None, response_timeout)
- response_obj = json.loads(response.read())
+ response = urllib.request.urlopen(req, None, response_timeout)
+ response_obj = json.loads(response.read().decode('utf-8'))
load_status = response_obj.get(datasource, 0.0)
if load_status >= 100.0:
sys.stderr.write("{0} loading complete! You may now query your
data\n".format(datasource))
@@ -152,10 +153,10 @@ def main():
if task_json['type'] == "compact":
datasource = task_json['dataSource']
else:
- datasource = json.loads(task_contents)["spec"]["dataSchema"]["dataSource"]
+ datasource = task_json["spec"]["dataSchema"]["dataSource"]
sys.stderr.write("Beginning indexing data for {0}\n".format(datasource))
- task_id = json.loads(post_task(args, task_contents,
submit_timeout_at))["task"]
+ task_id = json.loads(post_task(args, task_contents.encode(),
submit_timeout_at).decode('utf-8'))["task"]
sys.stderr.write('\033[1m' + "Task started: " + '\033[0m' +
"{0}\n".format(task_id))
sys.stderr.write('\033[1m' + "Task log: " + '\033[0m' +
"{0}/druid/indexer/v1/task/{1}/log\n".format(args.url.rstrip("/"),task_id))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]