On Tue, Jul 12, 2022, 02:49 <zjf...@apache.org> wrote:
> This is an automated email from the ASF dual-hosted git repository.
>
> zjffdu pushed a commit to branch master
> in repository https://gitbox.apache.org/repos/asf/zeppelin.git
>
>
> The following commit(s) were added to refs/heads/master by this push:
>      new ca216a512e [ZEPPELIN-5755] Support Spark 3.3 (#4388)
> ca216a512e is described below
>
> commit ca216a512e5c6ad474ddece5ecc17cfe594de6bc
> Author: Jeff Zhang <zjf...@apache.org>
> AuthorDate: Tue Jul 12 08:49:35 2022 +0800
>
>     [ZEPPELIN-5755] Support Spark 3.3 (#4388)
>
>     * [ZEPPELIN-5755] Support Spark 3.3
>
>     * use hadoop3 profile for spark 3.3
> ---
>  .github/workflows/core.yml                         | 10 +++-
>  pom.xml                                            |  2 +-
>  spark/interpreter/pom.xml                          | 14 +++++-
>  .../apache/zeppelin/spark/IPySparkInterpreter.java |  5 ++
>  .../apache/zeppelin/spark/PySparkInterpreter.java  |  6 +++
>  .../src/main/resources/python/zeppelin_ipyspark.py |  6 ++-
>  .../src/main/resources/python/zeppelin_pyspark.py  |  8 ++--
>  .../zeppelin/spark/SparkSqlInterpreterTest.java    |  8 ++--
>  spark/scala-2.13/pom.xml                           |  2 +-
>  .../org/apache/zeppelin/spark/SparkVersion.java    |  6 ++-
>  testing/env_python_3.7_with_R.yml                  |  2 +-
>  testing/env_python_3.8_with_R.yml                  |  2 +-
>  testing/env_python_3_with_R.yml                    |  2 +-
>  testing/env_python_3_with_R_and_tensorflow.yml     |  2 +-
>  .../integration/SparkIntegrationTest33.java        | 56 ++++++++++++++++++++++
>  .../integration/ZeppelinSparkClusterTest33.java    | 40 ++++++++++++++++
>  16 files changed, 155 insertions(+), 16 deletions(-)
>
> diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
> index b9b3953fc8..aceaf9fa11 100644
> --- a/.github/workflows/core.yml
> +++ b/.github/workflows/core.yml
> @@ -325,7 +325,7 @@ jobs:
>          run: |
>            R -e "IRkernel::installspec()"
>        - name: run tests on hadoop${{ matrix.hadoop }}
> -        run: ./mvnw test -DskipRat -pl zeppelin-interpreter-integration -Phadoop${{ matrix.hadoop }} -Pintegration -B -Dtest=SparkSubmitIntegrationTest,ZeppelinSparkClusterTest24,SparkIntegrationTest24,ZeppelinSparkClusterTest30,SparkIntegrationTest30,ZeppelinSparkClusterTest31,SparkIntegrationTest31,ZeppelinSparkClusterTest32,SparkIntegrationTest32 -DfailIfNoTests=false
> +        run: ./mvnw test -DskipRat -pl zeppelin-interpreter-integration -Phadoop${{ matrix.hadoop }} -Pintegration -B -Dtest=SparkSubmitIntegrationTest,ZeppelinSparkClusterTest24,SparkIntegrationTest24,ZeppelinSparkClusterTest30,SparkIntegrationTest30,ZeppelinSparkClusterTest31,SparkIntegrationTest31,ZeppelinSparkClusterTest32,SparkIntegrationTest32,ZeppelinSparkClusterTest33,SparkIntegrationTest33 -DfailIfNoTests=false
>
>    # test on spark for each spark version & scala version
>    spark-test:
> @@ -395,6 +395,14 @@ jobs:
>          run: |
>            rm -rf spark/interpreter/metastore_db
>            ./mvnw test -DskipRat -pl spark-submit,spark/interpreter -Pspark-3.2 -Pspark-scala-2.13 -Phadoop2 -Pintegration -B -DfailIfNoTests=false
> +      - name: run spark-3.3 tests with scala-2.12 and python-${{ matrix.python }}
> +        run: |
> +          rm -rf spark/interpreter/metastore_db
> +          ./mvnw test -DskipRat -pl spark-submit,spark/interpreter -Pspark-3.3 -Pspark-scala-2.12 -Phadoop3 -Pintegration -B -DfailIfNoTests=false
> +      - name: run spark-3.3 tests with scala-2.13 and python-${{ matrix.python }}
> +        run: |
> +          rm -rf spark/interpreter/metastore_db
> +          ./mvnw test -DskipRat -pl spark-submit,spark/interpreter -Pspark-3.3 -Pspark-scala-2.13 -Phadoop3 -Pintegration -B -DfailIfNoTests=false
>
>    livy-0-5-with-spark-2-2-0-under-python3:
>      runs-on: ubuntu-20.04
> diff --git a/pom.xml b/pom.xml
> index 668bb99d70..78b011339c 100644
> --- a/pom.xml
> +++ b/pom.xml
> @@ -131,7 +131,7 @@
>      <httpcomponents.client.version>4.5.13</httpcomponents.client.version>
>      <httpcomponents.asyncclient.version>4.0.2</httpcomponents.asyncclient.version>
>      <commons.compress.version>1.21</commons.compress.version>
> -    <commons.lang3.version>3.10</commons.lang3.version>
> +    <commons.lang3.version>3.12.0</commons.lang3.version>
>      <commons.text.version>1.8</commons.text.version>
>      <commons.configuration2.version>2.7</commons.configuration2.version>
>      <commons.exec.version>1.3</commons.exec.version>
> diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml
> index e539f99732..8f154ba609 100644
> --- a/spark/interpreter/pom.xml
> +++ b/spark/interpreter/pom.xml
> @@ -572,10 +572,22 @@
>      <!-- profile spark-x only affect spark version used in test -->
>
>      <profile>
> -      <id>spark-3.2</id>
> +      <id>spark-3.3</id>
>        <activation>
>          <activeByDefault>true</activeByDefault>
>        </activation>
> +      <properties>
> +        <datanucleus.core.version>4.1.17</datanucleus.core.version>
> +        <datanucleus.rdbms.version>4.1.19</datanucleus.rdbms.version>
> +        <datanucleus.apijdo.version>4.2.4</datanucleus.apijdo.version>
> +        <spark.version>3.3.0</spark.version>
> +        <protobuf.version>2.5.0</protobuf.version>
> +        <py4j.version>0.10.9.5</py4j.version>
> +      </properties>
> +    </profile>
> +
> +    <profile>
> +      <id>spark-3.2</id>
>        <properties>
>          <datanucleus.core.version>4.1.17</datanucleus.core.version>
>          <datanucleus.rdbms.version>4.1.19</datanucleus.rdbms.version>
> diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
> index ab6b3dbf29..2e945ed2bb 100644
> --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
> +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
> @@ -161,6 +161,11 @@ public class IPySparkInterpreter extends IPythonInterpreter {
>      return sparkInterpreter.getSparkVersion().getMajorVersion() == 3;
>    }
>
> +  // Used by PySpark
> +  public boolean isAfterSpark33() {
> +    return sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_3_3_0);
> +  }
> +
>    public JavaSparkContext getJavaSparkContext() {
>      return sparkInterpreter.getJavaSparkContext();
>    }
> diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
> index 514ffa99fa..737bef8f4b 100644
> --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
> +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
> @@ -233,7 +233,13 @@ public class PySparkInterpreter extends PythonInterpreter {
>      }
>    }
>
> +  // Used by PySpark
>    public boolean isSpark3() {
>      return sparkInterpreter.getSparkVersion().getMajorVersion() == 3;
>    }
> +
> +  // Used by PySpark
> +  public boolean isAfterSpark33() {
> +    return sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_3_3_0);
> +  }
>  }
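A naming nuance worth noting here: isAfterSpark33() is built on newerThanEquals(SparkVersion.SPARK_3_3_0), so it is true for 3.3.0 itself, not only for later releases. A conceptual Python mirror of the check (illustrative only; the authoritative implementation is the Java above):

    # Conceptual mirror of isAfterSpark33(): "after 3.3" here means
    # 3.3.0 itself or any newer release.
    def is_after_spark_33(version: str) -> bool:
        major, minor = (int(p) for p in version.split(".")[:2])
        return (major, minor) >= (3, 3)

    assert is_after_spark_33("3.3.0")
    assert is_after_spark_33("3.3.1")
    assert not is_after_spark_33("3.2.1")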
> diff --git a/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py b/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
> index fdf7b97918..958802ccdb 100644
> --- a/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
> +++ b/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
> @@ -57,8 +57,12 @@ conf = SparkConf(_jvm=gateway.jvm, _jconf=jconf)
>  sc = _zsc_ = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
>
>  from pyspark.sql import SparkSession
> +from pyspark.sql import SQLContext
>  spark = __zSpark__ = SparkSession(sc, intp.getSparkSession())
> -sqlContext = sqlc = __zSqlc__ = __zSpark__._wrapped
> +if intp.isAfterSpark33():
> +    sqlContext = sqlc = __zSqlc__ = SQLContext._get_or_create(sc)
> +else:
> +    sqlContext = sqlc = __zSqlc__ = __zSpark__._wrapped
>
>  class IPySparkZeppelinContext(PyZeppelinContext):
>
> diff --git a/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py b/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
> index 8dd3224baf..a77c383886 100644
> --- a/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
> +++ b/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
> @@ -49,10 +49,12 @@ conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
>  sc = _zsc_ = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
>
>  from pyspark.sql import SparkSession
> +from pyspark.sql import SQLContext
>  spark = __zSpark__ = SparkSession(sc, intp.getSparkSession())
> -sqlc = __zSqlc__ = __zSpark__._wrapped
> -
> -sqlContext = __zSqlc__
> +if intp.isAfterSpark33():
> +    sqlContext = sqlc = __zSqlc__ = SQLContext._get_or_create(sc)
> +else:
> +    sqlContext = sqlc = __zSqlc__ = __zSpark__._wrapped
>
>  from zeppelin_context import PyZeppelinContext
>
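This Python bootstrap change is the heart of the commit: Spark 3.3 dropped SparkSession._wrapped, which the old scripts relied on to obtain the SQLContext, so on 3.3+ it is now created or fetched explicitly via SQLContext._get_or_create(sc). A minimal standalone sketch of the same fallback, using feature detection in place of Zeppelin's Py4J call to intp.isAfterSpark33():

    # Minimal sketch, assuming a plain local SparkSession rather than the
    # Py4J-injected objects (intp, gateway) that the Zeppelin scripts receive.
    from pyspark.sql import SparkSession, SQLContext

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    sc = spark.sparkContext

    if hasattr(spark, "_wrapped"):
        # Spark < 3.3: SparkSession still carries a pre-built SQLContext.
        sqlContext = spark._wrapped
    else:
        # Spark >= 3.3: _wrapped is gone; build/fetch the SQLContext directly.
        sqlContext = SQLContext._get_or_create(sc)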
> diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
> index f469fc5ff2..c4a1d8e7d0 100644
> --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
> +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
> @@ -225,15 +225,17 @@ public class SparkSqlInterpreterTest {
>      assertEquals(InterpreterResult.Code.ERROR, ret.code());
>      assertEquals(context.out.toString(), 2, context.out.toInterpreterResultMessage().size());
>      assertEquals(context.out.toString(), Type.TABLE, context.out.toInterpreterResultMessage().get(0).getType());
> -    assertTrue(context.out.toString(), context.out.toInterpreterResultMessage().get(1).getData().contains("mismatched input"));
> +    assertTrue(context.out.toString(), context.out.toString().contains("mismatched input") ||
> +        context.out.toString().contains("Syntax error"));
>
>      // One correct sql + One invalid sql + One valid sql (skipped)
>      ret = sqlInterpreter.interpret("select * from gr;invalid_sql; select count(1) from gr", context);
>      assertEquals(InterpreterResult.Code.ERROR, ret.code());
>      assertEquals(context.out.toString(), 2, context.out.toInterpreterResultMessage().size());
>      assertEquals(context.out.toString(), Type.TABLE, context.out.toInterpreterResultMessage().get(0).getType());
> -    assertTrue(context.out.toString(), context.out.toInterpreterResultMessage().get(1).getData().contains("mismatched input"));
> -
> +    assertTrue(context.out.toString(), context.out.toString().contains("mismatched input") ||
> +        context.out.toString().contains("Syntax error"));
> +
>      // Two 2 comments
>      ret = sqlInterpreter.interpret(
>          "--comment_1\n--comment_2", context);
> diff --git a/spark/scala-2.13/pom.xml b/spark/scala-2.13/pom.xml
> index c8f3bbc1f7..d2f337ed1b 100644
> --- a/spark/scala-2.13/pom.xml
> +++ b/spark/scala-2.13/pom.xml
> @@ -31,7 +31,7 @@
>    <name>Zeppelin: Spark Interpreter Scala_2.13</name>
>
>    <properties>
> -    <spark.version>3.2.0</spark.version>
> +    <spark.version>3.3.0</spark.version>
>      <spark.scala.version>2.13.4</spark.scala.version>
>      <spark.scala.binary.version>2.13</spark.scala.binary.version>
>      <spark.scala.compile.version>${spark.scala.version}</spark.scala.compile.version>
> diff --git a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
> index eb41f43b12..8f88c1b6e1 100644
> --- a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
> +++ b/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
> @@ -30,11 +30,15 @@ public class SparkVersion {
>    public static final SparkVersion SPARK_2_3_0 = SparkVersion.fromVersionString("2.3.0");
>    public static final SparkVersion SPARK_2_3_1 = SparkVersion.fromVersionString("2.3.1");
>    public static final SparkVersion SPARK_2_4_0 = SparkVersion.fromVersionString("2.4.0");
> +
>    public static final SparkVersion SPARK_3_1_0 = SparkVersion.fromVersionString("3.1.0");
> +
>    public static final SparkVersion SPARK_3_3_0 = SparkVersion.fromVersionString("3.3.0");
>
> +  public static final SparkVersion SPARK_3_4_0 = SparkVersion.fromVersionString("3.4.0");
> +
>    public static final SparkVersion MIN_SUPPORTED_VERSION = SPARK_2_0_0;
> -  public static final SparkVersion UNSUPPORTED_FUTURE_VERSION = SPARK_3_3_0;
> +  public static final SparkVersion UNSUPPORTED_FUTURE_VERSION = SPARK_3_4_0;
>
>    private int version;
>    private int majorVersion;
> diff --git a/testing/env_python_3.7_with_R.yml b/testing/env_python_3.7_with_R.yml
> index 10d46aa9d1..34e228e9d0 100644
> --- a/testing/env_python_3.7_with_R.yml
> +++ b/testing/env_python_3.7_with_R.yml
> @@ -24,7 +24,7 @@ dependencies:
>    - plotly
>    - jinja2=3.0.3
>    - pip
> -  - r-base=3
> +  - r-base=3.6
>    - r-data.table
>    - r-evaluate
>    - r-base64enc
> diff --git a/testing/env_python_3.8_with_R.yml b/testing/env_python_3.8_with_R.yml
> index 10d46aa9d1..34e228e9d0 100644
> --- a/testing/env_python_3.8_with_R.yml
> +++ b/testing/env_python_3.8_with_R.yml
> @@ -24,7 +24,7 @@ dependencies:
>    - plotly
>    - jinja2=3.0.3
>    - pip
> -  - r-base=3
> +  - r-base=3.6
>    - r-data.table
>    - r-evaluate
>    - r-base64enc
> diff --git a/testing/env_python_3_with_R.yml b/testing/env_python_3_with_R.yml
> index 016e7082b2..bd6a5781c0 100644
> --- a/testing/env_python_3_with_R.yml
> +++ b/testing/env_python_3_with_R.yml
> @@ -25,7 +25,7 @@ dependencies:
>    - plotly
>    - jinja2=3.0.3
>    - pip
> -  - r-base=3
> +  - r-base=3.6
>    - r-data.table
>    - r-evaluate
>    - r-base64enc
> diff --git a/testing/env_python_3_with_R_and_tensorflow.yml b/testing/env_python_3_with_R_and_tensorflow.yml
> index 498a00dd01..bbe0d80db3 100644
> --- a/testing/env_python_3_with_R_and_tensorflow.yml
> +++ b/testing/env_python_3_with_R_and_tensorflow.yml
> @@ -25,7 +25,7 @@ dependencies:
>    - plotly
>    - jinja2=3.0.3
>    - pip
> -  - r-base=3
> +  - r-base=3.6
>    - r-data.table
>    - r-evaluate
>    - r-base64enc
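The SparkVersion change above is what actually lets 3.3.x through: supported versions form a half-open window from MIN_SUPPORTED_VERSION (2.0.0) up to, but excluding, UNSUPPORTED_FUTURE_VERSION, and this commit moves the upper bound from 3.3.0 to 3.4.0. The same window rendered as illustrative Python (not the project's Java):

    # The supported window after this commit: [2.0.0, 3.4.0).
    def version_tuple(v: str):
        return tuple(int(p) for p in v.split(".")[:3])

    MIN_SUPPORTED = version_tuple("2.0.0")
    UNSUPPORTED_FUTURE = version_tuple("3.4.0")

    def is_supported(v: str) -> bool:
        return MIN_SUPPORTED <= version_tuple(v) < UNSUPPORTED_FUTURE

    assert is_supported("3.3.0")      # newly in range
    assert not is_supported("3.4.0")  # still future / unsupported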
> diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest33.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest33.java
> new file mode 100644
> index 0000000000..44fa8ebb99
> --- /dev/null
> +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest33.java
> @@ -0,0 +1,56 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *    http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.zeppelin.integration;
> +
> +import org.apache.zeppelin.interpreter.InterpreterSetting;
> +import org.junit.runner.RunWith;
> +import org.junit.runners.Parameterized;
> +
> +import java.util.Arrays;
> +import java.util.List;
> +
> +@RunWith(value = Parameterized.class)
> +public class SparkIntegrationTest33 extends SparkIntegrationTest {
> +
> +  public SparkIntegrationTest33(String sparkVersion, String hadoopVersion) {
> +    super(sparkVersion, hadoopVersion);
> +  }
> +
> +  @Parameterized.Parameters
> +  public static List<Object[]> data() {
> +    return Arrays.asList(new Object[][]{
> +        {"3.3.0", "2"},
> +    });
> +  }
> +
> +  @Override
> +  protected void setUpSparkInterpreterSetting(InterpreterSetting interpreterSetting) {
> +    // spark3 doesn't support yarn-client and yarn-cluster anymore, use
> +    // spark.master and spark.submit.deployMode instead
> +    String sparkMaster = interpreterSetting.getJavaProperties().getProperty("spark.master");
> +    if (sparkMaster.equals("yarn-client")) {
> +      interpreterSetting.setProperty("spark.master", "yarn");
> +      interpreterSetting.setProperty("spark.submit.deployMode", "client");
> +    } else if (sparkMaster.equals("yarn-cluster")){
> +      interpreterSetting.setProperty("spark.master", "yarn");
> +      interpreterSetting.setProperty("spark.submit.deployMode", "cluster");
> +    } else if (sparkMaster.startsWith("local")) {
> +      interpreterSetting.setProperty("spark.submit.deployMode", "client");
> +    }
> +  }
> +}
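The setUpSparkInterpreterSetting override above handles the fact that Spark 3 no longer accepts the yarn-client and yarn-cluster pseudo-master URLs: each is split into spark.master=yarn plus a spark.submit.deployMode. The same mapping as a small illustrative Python sketch (the authoritative code is the Java above):

    # Conceptual mirror of the master-URL translation in the test above.
    def translate_master(props: dict) -> dict:
        master = props.get("spark.master", "")
        if master == "yarn-client":
            props["spark.master"] = "yarn"
            props["spark.submit.deployMode"] = "client"
        elif master == "yarn-cluster":
            props["spark.master"] = "yarn"
            props["spark.submit.deployMode"] = "cluster"
        elif master.startswith("local"):
            props["spark.submit.deployMode"] = "client"
        return props

    assert translate_master({"spark.master": "yarn-client"}) == {
        "spark.master": "yarn", "spark.submit.deployMode": "client"}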
> diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest33.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest33.java
> new file mode 100644
> index 0000000000..11f62217e0
> --- /dev/null
> +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest33.java
> @@ -0,0 +1,40 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *    http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.zeppelin.integration;
> +
> +import org.junit.runner.RunWith;
> +import org.junit.runners.Parameterized;
> +
> +import java.util.Arrays;
> +import java.util.List;
> +
> +@RunWith(value = Parameterized.class)
> +public class ZeppelinSparkClusterTest33 extends ZeppelinSparkClusterTest {
> +
> +  public ZeppelinSparkClusterTest33(String sparkVersion, String hadoopVersion) throws Exception {
> +    super(sparkVersion, hadoopVersion);
> +  }
> +
> +  @Parameterized.Parameters
> +  public static List<Object[]> data() {
> +    return Arrays.asList(new Object[][]{
> +        {"3.3.0", "2"},
> +        {"3.3.0", "3"}
> +    });
> +  }
> +}