This is an automated email from the ASF dual-hosted git repository.

ggal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b4e3480 [LIVY-1024] Upgrade Livy to Python3
5b4e3480 is described below

commit 5b4e3480c51298d02242eb427717d97d91eae011
Author: Arnav Balyan <[email protected]>
AuthorDate: Thu Dec 4 20:34:38 2025 +0530

    [LIVY-1024] Upgrade Livy to Python3
    
    ## What changes were proposed in this pull request?
    
     * Python 2 was deprecated a while back; most systems and environments no longer support it
     * Bump Livy to Python 3 and remove Python 2 support, since Spark 2 has also been deprecated (the recurring source change is sketched below)
     * Closes LIVY-1024
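    
    For illustration, the recurring source change in this patch: Python 3's
    filter() and map() return lazy iterators rather than lists, so results that
    were indexed directly are now built with list comprehensions. A minimal
    sketch, using hypothetical version data rather than anything from the Livy
    scripts:
    
        # Python 2 style -- filter() returned a list, so indexing worked:
        #     latest = filter(lambda v: not v["released"], versions)[-1]
        # Python 3 style used throughout this patch -- build an eager,
        # indexable list with a comprehension instead:
        versions = [{"name": "0.7.0", "released": True},
                    {"name": "0.8.0", "released": False}]  # hypothetical data
        unreleased = [v for v in versions if not v["released"]]
        latest = unreleased[-1]  # -> {'name': '0.8.0', 'released': False}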
    
    ## How was this patch tested?
    
     * CI, plus manual local testing of the Python 3 files
---
 README.md                                          |  6 +--
 dev/docker/livy-dev-base/Dockerfile                | 28 ++--------
 dev/merge_livy_pr.py                               | 58 ++++++--------------
 examples/src/main/python/pi_app.py                 |  4 +-
 integration-test/src/test/resources/batch.py       |  2 +-
 .../src/test/resources/test_python_api.py          | 44 +++++----------
 .../test/scala/org/apache/livy/test/JobApiIT.scala |  2 +-
 python-api/setup.py                                |  6 +--
 python-api/src/main/python/livy/client.py          | 11 ++--
 python-api/src/main/python/livy/job_context.py     |  5 +-
 python-api/src/main/python/livy/job_handle.py      |  7 +--
 .../src/test/python/livy-tests/client_test.py      | 23 ++++----
 repl/src/main/resources/fake_shell.py              | 63 +++-------------------
 .../org/apache/livy/repl/PythonInterpreter.scala   |  2 +-
 14 files changed, 71 insertions(+), 190 deletions(-)

diff --git a/README.md b/README.md
index 5fbe4f46..150243f6 100644
--- a/README.md
+++ b/README.md
@@ -29,20 +29,20 @@ To build Livy, you will need:
 Debian/Ubuntu:
   * mvn (from ``maven`` package or maven3 tarball)
   * openjdk-8-jdk (or Oracle JDK 8)
-  * Python 2.7+
+  * Python 3.x+
   * R 3.x
 
 Redhat/CentOS:
   * mvn (from ``maven`` package or maven3 tarball)
   * java-1.8.0-openjdk (or Oracle JDK 8)
-  * Python 2.7+
+  * Python 3.x+
   * R 3.x
 
 MacOS:
   * Xcode command line tools
   * Oracle's JDK 1.8
   * Maven (Homebrew)
-  * Python 2.7+
+  * Python 3.x+
   * R 3.x
 
 Required python packages for building Livy:
diff --git a/dev/docker/livy-dev-base/Dockerfile b/dev/docker/livy-dev-base/Dockerfile
index 07711a5f..5686e61d 100644
--- a/dev/docker/livy-dev-base/Dockerfile
+++ b/dev/docker/livy-dev-base/Dockerfile
@@ -70,32 +70,11 @@ RUN git clone https://github.com/pyenv/pyenv.git $HOME/pyenv
 ENV PYENV_ROOT=$HOME/pyenv
 ENV PATH="$HOME/pyenv/shims:$HOME/pyenv/bin:$HOME/bin:$PATH"
 
-RUN pyenv install -v 2.7.18 && \
-  pyenv install -v 3.9.21 && \
-  pyenv global 2.7.18 3.9.21 && \
+RUN pyenv install -v 3.9.21 && \
+  pyenv global 3.9.21 && \
   pyenv rehash
 
-# Add build dependencies for python2
-# - First we upgrade pip because that makes a lot of things better
-# - Then we remove the provided version of setuptools and install a different version
-# - Then we install additional dependencies
-RUN python2 -m pip install -U "pip < 21.0" && \
-        apt-get remove -y python-setuptools && \
-        python2 -m pip install "setuptools < 36" && \
-        python2 -m pip install \
-        cloudpickle \
-        codecov \
-        flake8 \
-        flaky \
-        "future>=0.15.2" \
-        "futures>=3.0.5" \
-        pytest \
-        pytest-runner \
-        requests-kerberos \
-        "requests >= 2.10.0" \
-        "responses >= 0.5.1"
-
-# Now do the same for python3
+# Install build dependencies for python3
 RUN python3 -m pip install -U pip && pip3 install \
         cloudpickle \
         codecov \
@@ -112,4 +91,3 @@ RUN pyenv rehash
 RUN apt remove -y openjdk-11-jre-headless
 
 WORKDIR /workspace
-
diff --git a/dev/merge_livy_pr.py b/dev/merge_livy_pr.py
index 85ce7417..335607ff 100755
--- a/dev/merge_livy_pr.py
+++ b/dev/merge_livy_pr.py
@@ -33,21 +33,15 @@
 #   usage: ./merge_livy_pr.py    (see config env vars below)
 #
 
-
 import json
 import os
 import re
 import subprocess
 import sys
+import urllib.request
+from urllib.error import HTTPError
 
-if sys.version_info[0] < 3:
-    import urllib2
-    from urllib2 import HTTPError
-    input_prompt_fn = raw_input
-else:
-    import urllib.request as urllib2
-    from urllib.error import HTTPError
-    input_prompt_fn = input
+input_prompt_fn = input
 
 try:
     import jira.client
@@ -71,7 +65,6 @@ JIRA_PASSWORD = os.environ.get("JIRA_PASSWORD", "")
 # https://github.com/settings/tokens. This script only requires the "public_repo" scope.
 GITHUB_OAUTH_KEY = os.environ.get("GITHUB_OAUTH_KEY")
 
-
 GITHUB_BASE = "https://github.com/apache/incubator-livy/pull";
 GITHUB_API_BASE = "https://api.github.com/repos/apache/incubator-livy";
 JIRA_BASE = "https://issues.apache.org/jira/browse";
@@ -79,13 +72,12 @@ JIRA_API_BASE = "https://issues.apache.org/jira";
 # Prefix added to temporary branches
 BRANCH_PREFIX = "PR_TOOL"
 
-
 def get_json(url):
     try:
-        request = urllib2.Request(url)
+        request = urllib.request.Request(url)
         if GITHUB_OAUTH_KEY:
             request.add_header('Authorization', 'token %s' % GITHUB_OAUTH_KEY)
-        return json.load(urllib2.urlopen(request))
+        return json.load(urllib.request.urlopen(request))
     except HTTPError as e:
         if "X-RateLimit-Remaining" in e.headers and 
e.headers["X-RateLimit-Remaining"] == '0':
             print("Exceeded the GitHub API rate limit; see the instructions in 
" +
@@ -95,42 +87,34 @@ def get_json(url):
             print("Unable to fetch URL, exiting: %s" % url)
         sys.exit(-1)
 
-
 def fail(msg):
     print(msg)
     clean_up()
     sys.exit(-1)
 
-
 def run_cmd(cmd):
     print(cmd)
     if isinstance(cmd, list):
         out_bytes = subprocess.check_output(cmd)
     else:
         out_bytes = subprocess.check_output(cmd.split(" "))
-    if sys.version_info[0] > 2:
-        return out_bytes.decode()
-    else:
-        return out_bytes
-
+    return out_bytes.decode()
 
 def continue_maybe(prompt):
     result = input_prompt_fn("\n%s (y/n): " % prompt)
     if result.lower() != "y":
         fail("Okay, exiting")
 
-
 def clean_up():
     print("Restoring head pointer to %s" % original_head)
     run_cmd("git checkout %s" % original_head)
 
     branches = run_cmd("git branch").replace(" ", "").split("\n")
 
-    for branch in filter(lambda x: x.startswith(BRANCH_PREFIX), branches):
+    for branch in [x for x in branches if x.startswith(BRANCH_PREFIX)]:
         print("Deleting local branch %s" % branch)
         run_cmd("git branch -D %s" % branch)
 
-
 # merge the requested PR and return the merge hash
 def merge_pr(pr_num, target_ref, title, body, pr_repo_desc):
     pr_branch_name = "%s_MERGE_PR_%s" % (BRANCH_PREFIX, pr_num)
@@ -201,7 +185,6 @@ def merge_pr(pr_num, target_ref, title, body, pr_repo_desc):
     print("Merge hash: %s" % merge_hash)
     return merge_hash
 
-
 def cherry_pick(pr_num, merge_hash, default_branch):
     pick_ref = input_prompt_fn("Enter a branch name [%s]: " % default_branch)
     if pick_ref == "":
@@ -236,15 +219,13 @@ def cherry_pick(pr_num, merge_hash, default_branch):
     print("Pick hash: %s" % pick_hash)
     return pick_ref
 
-
 def fix_version_from_branch(branch, versions):
     # Note: Assumes this is a sorted (newest->oldest) list of un-released versions
     if branch == "master":
         return versions[0]
     else:
         branch_ver = branch.replace("branch-", "")
-        return filter(lambda x: x.name.startswith(branch_ver), versions)[-1]
-
+        return [x for x in versions if x.name.startswith(branch_ver)][-1]
 
 def resolve_jira_issue(merge_branches, comment, default_jira_id=""):
     asf_jira = jira.client.JIRA({'server': JIRA_API_BASE},
@@ -275,11 +256,11 @@ def resolve_jira_issue(merge_branches, comment, default_jira_id=""):
 
     versions = asf_jira.project_versions("LIVY")
     versions = sorted(versions, key=lambda x: x.name, reverse=True)
-    versions = filter(lambda x: x.raw['released'] is False, versions)
+    versions = [x for x in versions if x.raw['released'] is False]
     # Consider only x.y.z versions
-    versions = filter(lambda x: re.match('\d+\.\d+\.\d+', x.name), versions)
+    versions = [x for x in versions if re.match(r'\d+\.\d+\.\d+', x.name)]
 
-    default_fix_versions = map(lambda x: fix_version_from_branch(x, versions).name, merge_branches)
+    default_fix_versions = [fix_version_from_branch(x, versions).name for x in merge_branches]
     for v in default_fix_versions:
         # Handles the case where we have forked a release branch but not yet made the release.
         # In this case, if the PR is committed to the master branch and the release branch, we
@@ -289,7 +270,7 @@ def resolve_jira_issue(merge_branches, comment, default_jira_id=""):
         if patch == "0":
             previous = "%s.%s.%s" % (major, int(minor) - 1, 0)
             if previous in default_fix_versions:
-                default_fix_versions = filter(lambda x: x != v, default_fix_versions)
+                default_fix_versions = [x for x in default_fix_versions if x != v]
     default_fix_versions = ",".join(default_fix_versions)
 
     fix_versions = input_prompt_fn(
@@ -299,19 +280,18 @@ def resolve_jira_issue(merge_branches, comment, default_jira_id=""):
     fix_versions = fix_versions.replace(" ", "").split(",")
 
     def get_version_json(version_str):
-        return filter(lambda v: v.name == version_str, versions)[0].raw
+        return [v for v in versions if v.name == version_str][0].raw
 
-    jira_fix_versions = map(lambda v: get_version_json(v), fix_versions)
+    jira_fix_versions = [get_version_json(v) for v in fix_versions]
 
-    resolve = filter(lambda a: a['name'] == "Resolve Issue", asf_jira.transitions(jira_id))[0]
-    resolution = filter(lambda r: r.raw['name'] == "Fixed", asf_jira.resolutions())[0]
+    resolve = [a for a in asf_jira.transitions(jira_id) if a['name'] == "Resolve Issue"][0]
+    resolution = [r for r in asf_jira.resolutions() if r.raw['name'] == "Fixed"][0]
     asf_jira.transition_issue(
         jira_id, resolve["id"], fixVersions=jira_fix_versions,
         comment=comment, resolution={'id': resolution.raw['id']})
 
     print("Successfully resolved %s with fixVersions=%s!" % (jira_id, 
fix_versions))
 
-
 def resolve_jira_issues(title, merge_branches, comment):
     jira_ids = re.findall("LIVY-[0-9]{3,6}", title)
 
@@ -320,7 +300,6 @@ def resolve_jira_issues(title, merge_branches, comment):
     for jira_id in jira_ids:
         resolve_jira_issue(merge_branches, comment, jira_id)
 
-
 def standardize_jira_ref(text):
     """
     Standardize the [LIVY-XXXXX] [MODULE] prefix
@@ -362,7 +341,6 @@ def standardize_jira_ref(text):
 
     return clean_text
 
-
 def get_current_ref():
     ref = run_cmd("git rev-parse --abbrev-ref HEAD").strip()
     if ref == 'HEAD':
@@ -371,7 +349,6 @@ def get_current_ref():
     else:
         return ref
 
-
 def main():
     global original_head
 
@@ -379,7 +356,7 @@ def main():
     original_head = get_current_ref()
 
     branches = get_json("%s/branches" % GITHUB_API_BASE)
-    branch_names = filter(lambda x: x.startswith("branch-"), [x['name'] for x in branches])
+    branch_names = [x for x in [x['name'] for x in branches] if x.startswith("branch-")]
     # Assumes branch names can be sorted lexicographically
     latest_branch = sorted(branch_names, reverse=True)[0]
 
@@ -462,7 +439,6 @@ def main():
         print("Could not find jira-python library. Run 'sudo pip install jira' 
to install.")
         print("Exiting without trying to close the associated JIRA.")
 
-
 if __name__ == "__main__":
     import doctest
     (failure_count, test_count) = doctest.testmod()
diff --git a/examples/src/main/python/pi_app.py b/examples/src/main/python/pi_app.py
index 2945d4b3..c283476c 100644
--- a/examples/src/main/python/pi_app.py
+++ b/examples/src/main/python/pi_app.py
@@ -15,8 +15,6 @@
 # limitations under the License.
 #
 
-from __future__ import print_function
-
 import sys
 from random import random
 from operator import add
@@ -48,7 +46,7 @@ if __name__ == "__main__":
         return 1 if x ** 2 + y ** 2 <= 1 else 0
 
     def pi_job(context):
-        count = context.sc.parallelize(range(1, samples + 1), slices).map(f).reduce(add)
+        count = context.sc.parallelize(list(range(1, samples + 1)), slices).map(f).reduce(add)
         return 4.0 * count / samples
 
     pi = client.submit(pi_job).result()
diff --git a/integration-test/src/test/resources/batch.py b/integration-test/src/test/resources/batch.py
index 56a53cef..e371652a 100644
--- a/integration-test/src/test/resources/batch.py
+++ b/integration-test/src/test/resources/batch.py
@@ -22,6 +22,6 @@ from pyspark import SparkContext
 output = sys.argv[1]
 sc = SparkContext(appName="PySpark Test")
 try:
-  sc.parallelize(range(100), 10).map(lambda x: (x, x * 2)).saveAsTextFile(output)
+  sc.parallelize(list(range(100)), 10).map(lambda x: (x, x * 2)).saveAsTextFile(output)
 finally:
   sc.stop()
diff --git a/integration-test/src/test/resources/test_python_api.py b/integration-test/src/test/resources/test_python_api.py
index f89f85d8..bef67bd0 100644
--- a/integration-test/src/test/resources/test_python_api.py
+++ b/integration-test/src/test/resources/test_python_api.py
@@ -14,22 +14,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
 import os
 import base64
 import json
 import time
-try:
-    from urllib.parse import urlparse
-except ImportError:
-     from urlparse import urlparse
+from urllib.parse import urlparse
 import requests
 from requests_kerberos import HTTPKerberosAuth, REQUIRED, OPTIONAL
 import cloudpickle
 import pytest
-try:
-    import httplib
-except ImportError:
-    from http import HTTPStatus as httplib
+import http.client
 from flaky import flaky
 
 global session_id, job_id
@@ -58,7 +53,6 @@ upload_pyfile_url = os.environ.get("UPLOAD_PYFILE_URL")
 def after_all(request):
     request.addfinalizer(stop_session)
 
-
 def process_job(job, expected_result, is_error_job=False):
     global job_id
 
@@ -69,7 +63,7 @@ def process_job(job, expected_result, is_error_job=False):
     header = {'Content-Type': 'application/json', 'X-Requested-By': 'livy'}
     response = requests.request('POST', request_url, headers=header, data=base64_pickled_job_json, auth=request_auth, verify=ssl_cert)
 
-    assert response.status_code == httplib.CREATED
+    assert response.status_code == http.client.CREATED
     job_id = response.json()['id']
 
     poll_time = 1
@@ -85,7 +79,7 @@ def process_job(job, expected_result, is_error_job=False):
         poll_time *= 2
 
     assert poll_response.json()['id'] == job_id
-    assert poll_response.status_code == httplib.OK
+    assert poll_response.status_code == http.client.OK
     if not is_error_job:
         assert poll_response.json()['error'] is None
         result = poll_response.json()['result']
@@ -97,20 +91,17 @@ def process_job(job, expected_result, is_error_job=False):
         error = poll_response.json()['error']
         assert expected_result in error
 
-
 def delay_rerun(*args):
     time.sleep(10)
     return True
 
-
 def stop_session():
     global session_id
 
     request_url = livy_end_point + "/sessions/" + str(session_id)
     headers = {'X-Requested-By': 'livy'}
     response = requests.request('DELETE', request_url, headers=headers, auth=request_auth, verify=ssl_cert)
-    assert response.status_code == httplib.OK
-
+    assert response.status_code == http.client.OK
 
 def test_create_session():
     global session_id
@@ -121,21 +112,19 @@ def test_create_session():
     json_data = json.dumps({'kind': 'pyspark', 'conf': {'livy.uri': uri.geturl()}})
     response = requests.request('POST', request_url, headers=header, data=json_data, auth=request_auth, verify=ssl_cert)
 
-    assert response.status_code == httplib.CREATED
+    assert response.status_code == http.client.CREATED
     session_id = response.json()['id']
 
-
 @flaky(max_runs=6, rerun_filter=delay_rerun)
 def test_wait_for_session_to_become_idle():
     request_url = livy_end_point + "/sessions/" + str(session_id)
     header = {'X-Requested-By': 'livy'}
     response = requests.request('GET', request_url, headers=header, auth=request_auth, verify=ssl_cert)
-    assert response.status_code == httplib.OK
+    assert response.status_code == http.client.OK
     session_state = response.json()['state']
 
     assert session_state == 'idle'
 
-
 def test_spark_job():
     def simple_spark_job(context):
         elements = [10, 20, 30]
@@ -144,7 +133,6 @@ def test_spark_job():
 
     process_job(simple_spark_job, 3)
 
-
 def test_error_job():
     def error_job(context):
         return "hello" + 1
@@ -152,7 +140,6 @@ def test_error_job():
     process_job(error_job,
         "TypeError: ", True)
 
-
 def test_reconnect():
     global session_id
 
@@ -160,10 +147,9 @@ def test_reconnect():
     header = {'Content-Type': 'application/json', 'X-Requested-By': 'livy'}
     response = requests.request('POST', request_url, headers=header, auth=request_auth, verify=ssl_cert)
 
-    assert response.status_code == httplib.OK
+    assert response.status_code == http.client.OK
     assert session_id == response.json()['id']
 
-
 def test_add_file():
     add_file_name = os.path.basename(add_file_url)
     json_data = json.dumps({'uri': add_file_url})
@@ -171,7 +157,7 @@ def test_add_file():
     header = {'Content-Type': 'application/json', 'X-Requested-By': 'livy'}
     response = requests.request('POST', request_url, headers=header, data=json_data, auth=request_auth, verify=ssl_cert)
 
-    assert response.status_code == httplib.OK
+    assert response.status_code == http.client.OK
 
     def add_file_job(context):
         from pyspark import SparkFiles
@@ -181,7 +167,6 @@ def test_add_file():
 
     process_job(add_file_job, "hello from addfile")
 
-
 def test_add_pyfile():
     add_pyfile_name_with_ext = os.path.basename(add_pyfile_url)
     add_pyfile_name = add_pyfile_name_with_ext.rsplit('.', 1)[0]
@@ -190,7 +175,7 @@ def test_add_pyfile():
     header = {'Content-Type': 'application/json', 'X-Requested-By': 'livy'}
     response_add_pyfile = requests.request('POST', request_url, headers=header, data=json_data, auth=request_auth, verify=ssl_cert)
 
-    assert response_add_pyfile.status_code == httplib.OK
+    assert response_add_pyfile.status_code == http.client.OK
 
     def add_pyfile_job(context):
        pyfile_module = __import__ (add_pyfile_name)
@@ -198,7 +183,6 @@ def test_add_pyfile():
 
     process_job(add_pyfile_job, "hello from addpyfile")
 
-
 def test_upload_file():
     upload_file = open(upload_file_url)
     upload_file_name = os.path.basename(upload_file.name)
@@ -207,7 +191,7 @@ def test_upload_file():
     header = {'X-Requested-By': 'livy'}
     response = requests.request('POST', request_url, headers=header, files=files, auth=request_auth, verify=ssl_cert)
 
-    assert response.status_code == httplib.OK
+    assert response.status_code == http.client.OK
 
     def upload_file_job(context):
         from pyspark import SparkFiles
@@ -217,7 +201,6 @@ def test_upload_file():
 
     process_job(upload_file_job, "hello from uploadfile")
 
-
 def test_upload_pyfile():
     upload_pyfile = open(upload_pyfile_url)
     upload_pyfile_name_with_ext = os.path.basename(upload_pyfile.name)
@@ -226,14 +209,13 @@ def test_upload_pyfile():
     files = {'file': upload_pyfile}
     header = {'X-Requested-By': 'livy'}
     response = requests.request('POST', request_url, headers=header, files=files, auth=request_auth, verify=ssl_cert)
-    assert response.status_code == httplib.OK
+    assert response.status_code == http.client.OK
 
     def upload_pyfile_job(context):
         pyfile_module = __import__ (upload_pyfile_name)
         return pyfile_module.test_upload_pyfile()
     process_job(upload_pyfile_job, "hello from uploadpyfile")
 
-
 if __name__ == '__main__':
     value = pytest.main([os.path.dirname(__file__)])
     if value != 0:
diff --git a/integration-test/src/test/scala/org/apache/livy/test/JobApiIT.scala b/integration-test/src/test/scala/org/apache/livy/test/JobApiIT.scala
index 92c3ea24..d62906ae 100644
--- a/integration-test/src/test/scala/org/apache/livy/test/JobApiIT.scala
+++ b/integration-test/src/test/scala/org/apache/livy/test/JobApiIT.scala
@@ -255,7 +255,7 @@ class JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll with Logg
     val testDir = Files.createTempDirectory(tmpDir.toPath(), "python-tests-").toFile()
     val testFile = createPyTestsForPythonAPI(testDir)
 
-    val builder = new ProcessBuilder(Seq("python", 
testFile.getAbsolutePath()).asJava)
+    val builder = new ProcessBuilder(Seq("python3", 
testFile.getAbsolutePath()).asJava)
     builder.directory(testDir)
 
     val env = builder.environment()
diff --git a/python-api/setup.py b/python-api/setup.py
index 59bec11f..511592f2 100644
--- a/python-api/setup.py
+++ b/python-api/setup.py
@@ -23,15 +23,13 @@ CLASSIFIERS = [
     'Development Status :: 1 - Planning',
     'Intended Audience :: Developers',
     'Operating System :: OS Independent',
-    'Programming Language :: Python :: 2.7',
+    'Programming Language :: Python :: 3',
+    'Programming Language :: Python :: 3.9',
     'Topic :: Software Development :: Libraries :: Python Modules',
 ]
 
 requirements = [
     'cloudpickle>=0.2.1',
-    'configparser>=3.5.0',
-    'future>=0.15.2',
-    'mock~=3.0.5',
     'requests>=2.10.0',
     'responses>=0.5.1',
     'requests-kerberos>=0.11.0',
diff --git a/python-api/src/main/python/livy/client.py b/python-api/src/main/python/livy/client.py
index d9830f32..22b88132 100644
--- a/python-api/src/main/python/livy/client.py
+++ b/python-api/src/main/python/livy/client.py
@@ -14,7 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from __future__ import absolute_import
 
 import base64
 import cloudpickle
@@ -25,13 +24,13 @@ import threading
 import traceback
 from configparser import ConfigParser
 from concurrent.futures import ThreadPoolExecutor
-from future.moves.urllib.parse import ParseResult, urlparse
-from io import open, StringIO
+from urllib.parse import ParseResult, urlparse
+from io import StringIO
 from requests_kerberos import HTTPKerberosAuth, REQUIRED
 from livy.job_handle import JobHandle
 
 
-class HttpClient(object):
+class HttpClient:
     """A http based client for submitting Spark-based jobs to a Livy backend.
 
     Parameters
@@ -357,7 +356,7 @@ class HttpClient(object):
         self._config.remove_option(self._CONFIG_SECTION, key)
 
     def _set_multiple_conf(self, conf_dict):
-        for key, value in conf_dict.items():
+        for key, value in list(conf_dict.items()):
             self._set_conf(key, value)
 
     def _load_config(self, load_defaults, conf_dict):
@@ -426,7 +425,7 @@ class HttpClient(object):
             data=data, headers=headers).content
 
 
-class _LivyConnection(object):
+class _LivyConnection:
 
     _SESSIONS_URI = '/sessions'
     # Timeout in seconds
diff --git a/python-api/src/main/python/livy/job_context.py b/python-api/src/main/python/livy/job_context.py
index 83236499..a36dbc2a 100644
--- a/python-api/src/main/python/livy/job_context.py
+++ b/python-api/src/main/python/livy/job_context.py
@@ -14,10 +14,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
 from abc import ABCMeta, abstractproperty, abstractmethod
 
 
-class JobContext:
+class JobContext(metaclass=ABCMeta):
     """
     An abstract class that holds runtime information about the job execution
     context.
@@ -28,8 +29,6 @@ class JobContext:
 
     """
 
-    __metaclass__ = ABCMeta
-
     @abstractproperty
     def sc(self):
         """
diff --git a/python-api/src/main/python/livy/job_handle.py b/python-api/src/main/python/livy/job_handle.py
index 278834c2..fc4dcde2 100644
--- a/python-api/src/main/python/livy/job_handle.py
+++ b/python-api/src/main/python/livy/job_handle.py
@@ -14,9 +14,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
 import base64
 import cloudpickle
-import sys
 import threading
 import traceback
 from concurrent.futures import Future
@@ -217,10 +217,7 @@ class JobHandle(Future):
         raise NotImplementedError("This operation is not supported.")
 
     def set_job_exception(self, exception, error_msg=None):
-        if sys.version >= '3':
-            super(JobHandle, self).set_exception(exception)
-        else:
-            super(JobHandle, self).set_exception_info(exception, error_msg)
+        super(JobHandle, self).set_exception(exception)
 
     class _RepeatedTimer(object):
         def __init__(self, interval, polling_job, executor):
diff --git a/python-api/src/test/python/livy-tests/client_test.py b/python-api/src/test/python/livy-tests/client_test.py
index b6426ae1..efa3d446 100644
--- a/python-api/src/test/python/livy-tests/client_test.py
+++ b/python-api/src/test/python/livy-tests/client_test.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
 import os
 import pytest
 import responses
@@ -27,7 +28,7 @@ from livy.client import HttpClient
 session_id = 0
 job_id = 1
 # Make sure host name is lower case. See LIVY-582
-base_uri = 'http://{0}:{1}'.format(socket.gethostname().lower(), 8998)
+base_uri = f'http://{socket.gethostname().lower()}:{8998}'
 client_test = None
 invoked_queued_callback = False
 invoked_running_callback = False
@@ -40,8 +41,8 @@ def mock_and_validate_create_new_session(defaults):
     app_name = 'Test App'
     conf_dict = {'spark.app.name': app_name}
     json_data = {
-        u'kind': u'pyspark', u'log': [], u'proxyUser': None,
-        u'state': u'starting', u'owner': None, u'id': session_id
+        'kind': 'pyspark', 'log': [], 'proxyUser': None,
+        'state': 'starting', 'owner': None, 'id': session_id
     }
     responses.add(responses.POST, create_session_request_mock_uri,
         json=json_data, status=201, content_type='application/json')
@@ -68,13 +69,13 @@ def mock_submit_job_and_poll_result(
         + "/jobs/" + str(job_id)
 
     post_json_data = {
-        u'state': u'SENT', u'error': None, u'id': job_id, u'result': None
+        'state': 'SENT', 'error': None, 'id': job_id, 'result': None
     }
     responses.add(responses.POST, submit_request_mock_uri, status=201,
         json=post_json_data, content_type='application/json')
 
     get_json_data = {
-        u'state': job_state, u'error': error, u'id': job_id, u'result': result
+        'state': job_state, 'error': error, 'id': job_id, 'result': result
     }
     responses.add(responses.GET, poll_request_mock_uri, status=200,
         json=get_json_data, content_type='application/json')
@@ -117,8 +118,8 @@ def test_connect_to_existing_session():
         "/connect"
     reconnect_session_uri = base_uri + "/sessions/" + str(session_id)
     json_data = {
-        u'kind': u'pyspark', u'log': [], u'proxyUser': None,
-        u'state': u'starting', u'owner': None, u'id': session_id
+        'kind': 'pyspark', 'log': [], 'proxyUser': None,
+        'state': 'starting', 'owner': None, 'id': session_id
     }
     with responses.RequestsMock() as rsps:
         rsps.add(responses.POST, reconnect_mock_request_uri, json=json_data,
@@ -143,7 +144,7 @@ def create_test_archive(ext):
 @responses.activate
 def test_submit_job_verify_running_state():
     submit_job_future = mock_submit_job_and_poll_result(simple_spark_job,
-        u'STARTED')
+        'STARTED')
     lock = threading.Event()
 
     def handle_job_running_callback(f):
@@ -158,7 +159,7 @@ def test_submit_job_verify_running_state():
 @responses.activate
 def test_submit_job_verify_queued_state():
     submit_job_future = mock_submit_job_and_poll_result(simple_spark_job,
-        u'QUEUED')
+        'QUEUED')
     lock = threading.Event()
 
     def handle_job_queued_callback(f):
@@ -173,7 +174,7 @@ def test_submit_job_verify_queued_state():
 @responses.activate
 def test_submit_job_verify_succeeded_state():
     submit_job_future = mock_submit_job_and_poll_result(simple_spark_job,
-        u'SUCCEEDED',
+        'SUCCEEDED',
         result='Z0FKVkZGc3hNREFzSURJd01Dd2dNekF3TENBME1EQmRjUUF1')
     result = submit_job_future.result(15)
     assert result == '[100, 200, 300, 400]'
@@ -181,7 +182,7 @@ def test_submit_job_verify_succeeded_state():
 
 @responses.activate
 def test_submit_job_verify_failed_state():
-    submit_job_future = mock_submit_job_and_poll_result(failure_job, u'FAILED',
+    submit_job_future = mock_submit_job_and_poll_result(failure_job, 'FAILED',
         error='Error job')
     exception = submit_job_future.exception(15)
     assert isinstance(exception, Exception)
diff --git a/repl/src/main/resources/fake_shell.py b/repl/src/main/resources/fake_shell.py
index 5472f533..2c67a542 100644
--- a/repl/src/main/resources/fake_shell.py
+++ b/repl/src/main/resources/fake_shell.py
@@ -15,7 +15,6 @@
 # limitations under the License.
 #
 
-from __future__ import print_function
 import ast
 from collections import OrderedDict
 import datetime
@@ -34,12 +33,6 @@ import shutil
 import pickle
 import textwrap
 
-if sys.version >= '3':
-    unicode = str
-else:
-    import cStringIO
-    import StringIO
-
 if sys.version_info > (3,8):
     from ast import Module
 else :
@@ -66,31 +59,25 @@ def execute_reply(status, content):
         )
     }
 
-
 def execute_reply_ok(data):
     return execute_reply('ok', {
         'data': data,
     })
 
-
 def execute_reply_error(exc_type, exc_value, tb):
     LOG.error('execute_reply', exc_info=True)
-    if sys.version >= '3':
-      formatted_tb = traceback.format_exception(exc_type, exc_value, tb, chain=False)
-    else:
-      formatted_tb = traceback.format_exception(exc_type, exc_value, tb)
+    formatted_tb = traceback.format_exception(exc_type, exc_value, tb, chain=False)
     for i in range(len(formatted_tb)):
         if TOP_FRAME_REGEX.match(formatted_tb[i]):
             formatted_tb = formatted_tb[:1] + formatted_tb[i + 1:]
             break
 
     return execute_reply('error', {
-        'ename': unicode(exc_type.__name__),
-        'evalue': unicode(exc_value),
+        'ename': str(exc_type.__name__),
+        'evalue': str(exc_value),
         'traceback': formatted_tb,
     })
 
-
 def execute_reply_internal_error(message, exc_info=None):
     LOG.error('execute_reply_internal_error', exc_info=exc_info)
     return execute_reply('error', {
@@ -99,7 +86,6 @@ def execute_reply_internal_error(message, exc_info=None):
         'traceback': [],
     })
 
-
 class JobContextImpl(object):
     def __init__(self):
         self.lock = threading.Lock()
@@ -185,14 +171,10 @@ class JobContextImpl(object):
             except:
                 pass
 
-
 class PySparkJobProcessorImpl(object):
     def processBypassJob(self, serialized_job):
         try:
-            if sys.version >= '3':
-                deserialized_job = pickle.loads(serialized_job, encoding="bytes")
-            else:
-                deserialized_job = pickle.loads(serialized_job)
+            deserialized_job = pickle.loads(serialized_job, encoding="bytes")
             result = deserialized_job(job_context)
             serialized_result = global_dict['cloudpickle'].dumps(result)
             response = bytearray(base64.b64encode(serialized_result))
@@ -209,15 +191,13 @@ class PySparkJobProcessorImpl(object):
     def getLocalTmpDirPath(self):
         return os.path.join(job_context.get_local_tmp_dir_path(), '__livy__')
 
-    class Scala:
+    class Scala(object):
         extends = ['org.apache.livy.repl.PySparkJobProcessor']
 
-
 class ExecutionError(Exception):
     def __init__(self, exc_info):
         self.exc_info = exc_info
 
-
 class NormalNode(object):
     def __init__(self, code):
         self.code = compile(code, '<stdin>', 'exec', ast.PyCF_ONLY_AST, 1)
@@ -240,11 +220,9 @@ class NormalNode(object):
             # code and passing the error along.
             raise ExecutionError(sys.exc_info())
 
-
 class UnknownMagic(Exception):
     pass
 
-
 class MagicNode(object):
     def __init__(self, line):
         parts = line[1:].split(' ', 1)
@@ -264,7 +242,6 @@ class MagicNode(object):
 
         return handler(*self.rest)
 
-
 def parse_code_into_nodes(code):
     nodes = []
     try:
@@ -304,7 +281,6 @@ def parse_code_into_nodes(code):
 
     return nodes
 
-
 def execute_request(content):
     try:
         code = content['code']
@@ -354,7 +330,6 @@ def execute_request(content):
 
     return execute_reply_ok(result)
 
-
 def magic_table_convert(value):
     try:
         converter = magic_table_types[type(value)]
@@ -363,7 +338,6 @@ def magic_table_convert(value):
 
     return converter(value)
 
-
 def magic_table_convert_seq(items):
     last_item_type = None
     converted_items = []
@@ -380,7 +354,6 @@ def magic_table_convert_seq(items):
 
     return 'ARRAY_TYPE', converted_items
 
-
 def magic_table_convert_map(m):
     last_key_type = None
     last_value_type = None
@@ -404,7 +377,6 @@ def magic_table_convert_map(m):
 
     return 'MAP_TYPE', converted_items
 
-
 magic_table_types = {
     type(None): lambda x: ('NULL_TYPE', x),
     bool: lambda x: ('BOOLEAN_TYPE', x),
@@ -419,15 +391,6 @@ magic_table_types = {
     dict: magic_table_convert_map,
 }
 
-# python 2.x only
-if sys.version < '3':
-    magic_table_types.update({
-        long: lambda x: ('BIGINT_TYPE', x),
-        unicode: lambda x: ('STRING_TYPE', x.encode('utf-8'))
-    })
-
-
-
 def magic_table(name):
     try:
         value = global_dict[name]
@@ -444,7 +407,7 @@ def magic_table(name):
     for row in value:
         cols = []
         data.append(cols)
-        
+
         if 'Row' == row.__class__.__name__:
             row = row.asDict()
 
@@ -488,7 +451,6 @@ def magic_table(name):
         }
     }
 
-
 def magic_json(name):
     try:
         value = global_dict[name]
@@ -507,9 +469,7 @@ def magic_matplot(name):
         imgdata = io.BytesIO()
         fig.savefig(imgdata, format='png')
         imgdata.seek(0)
-        encode = base64.b64encode(imgdata.getvalue())
-        if sys.version >= '3':
-            encode = encode.decode()
+        encode = base64.b64encode(imgdata.getvalue()).decode()
 
     except:
         exc_type, exc_value, tb = sys.exc_info()
@@ -523,7 +483,6 @@ def magic_matplot(name):
 def shutdown_request(_content):
     sys.exit()
 
-
 magic_router = {
     'table': magic_table,
     'json': magic_json,
@@ -541,24 +500,18 @@ class UnicodeDecodingStringIO(io.StringIO):
             s = s.decode("utf-8")
         super(UnicodeDecodingStringIO, self).write(s)
 
-
 def clearOutputs():
     sys.stdout.close()
     sys.stderr.close()
     sys.stdout = UnicodeDecodingStringIO()
     sys.stderr = UnicodeDecodingStringIO()
 
-
 def main():
     sys_stdin = sys.stdin
     sys_stdout = sys.stdout
     sys_stderr = sys.stderr
 
-    if sys.version >= '3':
-        sys.stdin = io.StringIO()
-    else:
-        sys.stdin = cStringIO.StringIO()
-
+    sys.stdin = io.StringIO()
     sys.stdout = UnicodeDecodingStringIO()
     sys.stderr = UnicodeDecodingStringIO()
 
diff --git a/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala b/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala
index 58b7147a..40a25c55 100644
--- a/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala
+++ b/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala
@@ -49,7 +49,7 @@ object PythonInterpreter extends Logging {
     val pythonExec = conf.getOption("spark.pyspark.python")
       .orElse(sys.env.get("PYSPARK_PYTHON"))
       .orElse(sys.props.get("pyspark.python")) // This java property is only 
used for internal UT.
-      .getOrElse("python")
+      .getOrElse("python3")
 
     val secretKey = Utils.createSecret(256)
     val gatewayServer = createGatewayServer(sparkEntries, secretKey)

