This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 1c4bcf3a97e [SPARK-42573][CONNECT][SCALA] Enable binary compatibility tests on all major client APIs
1c4bcf3a97e is described below

commit 1c4bcf3a97e7773a3f23524e2545cd85dbd14ef1
Author: Zhen Li <zhenli...@users.noreply.github.com>
AuthorDate: Sat Feb 25 13:59:21 2023 -0400

    [SPARK-42573][CONNECT][SCALA] Enable binary compatibility tests on all major client APIs

    ### What changes were proposed in this pull request?
    Enable the binary compatibility check for SparkSession, Dataset, Column, functions, and the other major client APIs.

    ### Why are the changes needed?
    This gives us a clear picture of the Scala client's current API coverage.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Existing tests.

    Closes #40168 from zhenlineo/comp-it.

    Authored-by: Zhen Li <zhenli...@users.noreply.github.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
    (cherry picked from commit 2470b753171a3765e778ae699b4b1675ba7a023e)
    Signed-off-by: Herman van Hovell <her...@databricks.com>
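For context, the suite drives MiMa's library API directly. A minimal sketch of the comparison it performs, assuming `sqlJar` and `clientJar` are the two jars under test; the `findProblems` helper and the single exclude rule are illustrative only, not the suite's actual code:

```scala
import java.io.File

import com.typesafe.tools.mima.core.{Problem, ProblemFilter, ProblemFilters}
import com.typesafe.tools.mima.lib.MiMaLib

object BinaryCompatSketch {
  // Compare the client jar against the sql jar and drop the muted problems.
  def findProblems(sqlJar: File, clientJar: File): List[Problem] = {
    val mima = new MiMaLib(Seq(clientJar, sqlJar))
    // Every API mismatch between the two jars is reported as a Problem.
    val allProblems = mima.collectProblems(sqlJar, clientJar, List.empty)
    // A ProblemFilter returns false for problems that should be muted.
    val excludeRules: Seq[ProblemFilter] =
      Seq(ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.rdd"))
    allProblems.filter(p => excludeRules.forall(rule => rule(p)))
  }
}
```

Any problems that survive the include/exclude filtering fail the test, which is what turns MiMa into a regression gate for the client API surface.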
---
 .../org/apache/spark/sql/DataFrameWriter.scala     |   2 +-
 .../org/apache/spark/sql/DataFrameWriterV2.scala   |   2 +-
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  60 ++++----
 .../spark/sql/RelationalGroupedDataset.scala       |   2 +-
 .../sql/connect/client/CompatibilitySuite.scala    | 156 +++++++++++++++------
 5 files changed, 149 insertions(+), 73 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index b7c4ed7bcab..8434addec92 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -252,7 +252,7 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
       builder.putOptions(k, v)
     }
 
-    ds.session.execute(proto.Command.newBuilder().setWriteOperation(builder).build())
+    ds.sparkSession.execute(proto.Command.newBuilder().setWriteOperation(builder).build())
   }
 
   /**
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
index ed149223129..b698e1dfaa1 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
@@ -157,7 +157,7 @@ final class DataFrameWriterV2[T] private[sql] (table: String, ds: Dataset[T])
 
     overwriteCondition.foreach(builder.setOverwriteCondition)
 
-    ds.session.execute(proto.Command.newBuilder().setWriteOperationV2(builder).build())
+    ds.sparkSession.execute(proto.Command.newBuilder().setWriteOperationV2(builder).build())
   }
 }
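The two writer changes above are purely the accessor rename: `Dataset`'s public session handle becomes `sparkSession`, the same name sql core's `Dataset` exposes. A tiny illustration of the user-visible effect; the `SessionAccessor` helper is hypothetical:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object SessionAccessor {
  // Code written against sql core's `Dataset.sparkSession` now compiles
  // unchanged against the Connect client's Dataset as well.
  def sessionOf(ds: Dataset[_]): SparkSession = ds.sparkSession
}
```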
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 87aadfe437b..83ed7bdc071 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -115,7 +115,7 @@ import org.apache.spark.util.Utils
  *
  * @since 3.4.0
  */
-class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan: proto.Plan)
+class Dataset[T] private[sql] (val sparkSession: SparkSession, private[sql] val plan: proto.Plan)
     extends Serializable {
 
   // Make sure we don't forget to set plan id.
   assert(plan.getRoot.getCommon.hasPlanId)
@@ -169,7 +169,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @since 3.4.0
    */
   @scala.annotation.varargs
-  def toDF(colNames: String*): DataFrame = session.newDataset { builder =>
+  def toDF(colNames: String*): DataFrame = sparkSession.newDataset { builder =>
     builder.getToDfBuilder
       .setInput(plan.getRoot)
       .addAllColumnNames(colNames.asJava)
@@ -191,7 +191,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @group basic
    * @since 3.4.0
    */
-  def to(schema: StructType): DataFrame = session.newDataset { builder =>
+  def to(schema: StructType): DataFrame = sparkSession.newDataset { builder =>
     builder.getToSchemaBuilder
       .setInput(plan.getRoot)
       .setSchema(DataTypeProtoConverter.toConnectProtoType(schema))
@@ -277,7 +277,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
 
   private def explain(mode: proto.Explain.ExplainMode): Unit = {
     // scalastyle:off println
-    println(session.analyze(plan, mode).getExplainString)
+    println(sparkSession.analyze(plan, mode).getExplainString)
     // scalastyle:on println
   }
@@ -468,7 +468,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @since 3.4.0
    */
   def show(numRows: Int, truncate: Int, vertical: Boolean): Unit = {
-    val df = session.newDataset { builder =>
+    val df = sparkSession.newDataset { builder =>
       builder.getShowStringBuilder
         .setInput(plan.getRoot)
         .setNumRows(numRows)
@@ -485,7 +485,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
   }
 
   private def buildJoin(right: Dataset[_])(f: proto.Join.Builder => Unit): DataFrame = {
-    session.newDataset { builder =>
+    sparkSession.newDataset { builder =>
       val joinBuilder = builder.getJoinBuilder
       joinBuilder.setLeft(plan.getRoot).setRight(right.plan.getRoot)
       f(joinBuilder)
@@ -751,7 +751,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
   }
 
   private def buildSort(global: Boolean, sortExprs: Seq[Column]): Dataset[T] = {
-    session.newDataset { builder =>
+    sparkSession.newDataset { builder =>
       builder.getSortBuilder
         .setInput(plan.getRoot)
         .setIsGlobal(global)
@@ -859,7 +859,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @since 3.4.0
    */
   @scala.annotation.varargs
-  def hint(name: String, parameters: Any*): Dataset[T] = session.newDataset { builder =>
+  def hint(name: String, parameters: Any*): Dataset[T] = sparkSession.newDataset { builder =>
     builder.getHintBuilder
       .setInput(plan.getRoot)
       .setName(name)
@@ -899,7 +899,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @group typedrel
    * @since 3.4.0
    */
-  def as(alias: String): Dataset[T] = session.newDataset { builder =>
+  def as(alias: String): Dataset[T] = sparkSession.newDataset { builder =>
     builder.getSubqueryAliasBuilder
       .setInput(plan.getRoot)
       .setAlias(alias)
@@ -939,7 +939,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @since 3.4.0
    */
   @scala.annotation.varargs
-  def select(cols: Column*): DataFrame = session.newDataset { builder =>
+  def select(cols: Column*): DataFrame = sparkSession.newDataset { builder =>
     builder.getProjectBuilder
       .setInput(plan.getRoot)
       .addAllExpressions(cols.map(_.expr).asJava)
@@ -989,7 +989,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @group typedrel
    * @since 3.4.0
    */
-  def filter(condition: Column): Dataset[T] = session.newDataset { builder =>
+  def filter(condition: Column): Dataset[T] = sparkSession.newDataset { builder =>
     builder.getFilterBuilder.setInput(plan.getRoot).setCondition(condition.expr)
   }
@@ -1032,7 +1032,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
       ids: Array[Column],
       valuesOption: Option[Array[Column]],
       variableColumnName: String,
-      valueColumnName: String): DataFrame = session.newDataset { builder =>
+      valueColumnName: String): DataFrame = sparkSession.newDataset { builder =>
     val unpivot = builder.getUnpivotBuilder
       .setInput(plan.getRoot)
       .addAllIds(ids.toSeq.map(_.expr).asJava)
@@ -1393,7 +1393,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @group typedrel
    * @since 3.4.0
    */
-  def limit(n: Int): Dataset[T] = session.newDataset { builder =>
+  def limit(n: Int): Dataset[T] = sparkSession.newDataset { builder =>
     builder.getLimitBuilder
       .setInput(plan.getRoot)
       .setLimit(n)
@@ -1405,7 +1405,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @group typedrel
    * @since 3.4.0
    */
-  def offset(n: Int): Dataset[T] = session.newDataset { builder =>
+  def offset(n: Int): Dataset[T] = sparkSession.newDataset { builder =>
     builder.getOffsetBuilder
       .setInput(plan.getRoot)
       .setOffset(n)
@@ -1413,7 +1413,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
 
   private def buildSetOp(right: Dataset[T], setOpType: proto.SetOperation.SetOpType)(
       f: proto.SetOperation.Builder => Unit): Dataset[T] = {
-    session.newDataset { builder =>
+    sparkSession.newDataset { builder =>
       f(
         builder.getSetOpBuilder
           .setSetOpType(setOpType)
@@ -1677,7 +1677,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @since 3.4.0
    */
   def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T] = {
-    session.newDataset { builder =>
+    sparkSession.newDataset { builder =>
       builder.getSampleBuilder
         .setInput(plan.getRoot)
         .setWithReplacement(withReplacement)
@@ -1745,7 +1745,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
     normalizedCumWeights
       .sliding(2)
       .map { case Array(low, high) =>
-        session.newDataset[T] { builder =>
+        sparkSession.newDataset[T] { builder =>
           builder.getSampleBuilder
             .setInput(sortedInput)
             .setWithReplacement(false)
@@ -1789,7 +1789,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
     val aliases = values.zip(names).map { case (value, name) =>
       value.name(name).expr.getAlias
     }
-    session.newDataset { builder =>
+    sparkSession.newDataset { builder =>
       builder.getWithColumnsBuilder
         .setInput(plan.getRoot)
         .addAllAliases(aliases.asJava)
@@ -1880,7 +1880,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @since 3.4.0
    */
   def withColumnsRenamed(colsMap: java.util.Map[String, String]): DataFrame = {
-    session.newDataset { builder =>
+    sparkSession.newDataset { builder =>
       builder.getWithColumnsRenamedBuilder
         .setInput(plan.getRoot)
         .putAllRenameColumnsMap(colsMap)
@@ -1899,7 +1899,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
       .setExpr(col(columnName).expr)
       .addName(columnName)
       .setMetadata(metadata.json)
-    session.newDataset { builder =>
+    sparkSession.newDataset { builder =>
       builder.getWithColumnsBuilder
         .setInput(plan.getRoot)
         .addAliases(newAlias)
@@ -1959,7 +1959,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
   @scala.annotation.varargs
   def drop(col: Column, cols: Column*): DataFrame = buildDrop(col +: cols)
 
-  private def buildDrop(cols: Seq[Column]): DataFrame = session.newDataset { builder =>
+  private def buildDrop(cols: Seq[Column]): DataFrame = sparkSession.newDataset { builder =>
     builder.getDropBuilder
       .setInput(plan.getRoot)
       .addAllCols(cols.map(_.expr).asJava)
@@ -1972,7 +1972,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @group typedrel
    * @since 3.4.0
    */
-  def dropDuplicates(): Dataset[T] = session.newDataset { builder =>
+  def dropDuplicates(): Dataset[T] = sparkSession.newDataset { builder =>
     builder.getDeduplicateBuilder
       .setInput(plan.getRoot)
       .setAllColumnsAsKeys(true)
@@ -1985,7 +1985,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @group typedrel
    * @since 3.4.0
    */
-  def dropDuplicates(colNames: Seq[String]): Dataset[T] = session.newDataset { builder =>
+  def dropDuplicates(colNames: Seq[String]): Dataset[T] = sparkSession.newDataset { builder =>
     builder.getDeduplicateBuilder
       .setInput(plan.getRoot)
       .addAllColumnNames(colNames.asJava)
@@ -2042,7 +2042,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @since 3.4.0
    */
   @scala.annotation.varargs
-  def describe(cols: String*): DataFrame = session.newDataset { builder =>
+  def describe(cols: String*): DataFrame = sparkSession.newDataset { builder =>
     builder.getDescribeBuilder
       .setInput(plan.getRoot)
       .addAllCols(cols.asJava)
@@ -2117,7 +2117,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @since 3.4.0
    */
   @scala.annotation.varargs
-  def summary(statistics: String*): DataFrame = session.newDataset { builder =>
+  def summary(statistics: String*): DataFrame = sparkSession.newDataset { builder =>
     builder.getSummaryBuilder
       .setInput(plan.getRoot)
       .addAllStatistics(statistics.asJava)
@@ -2185,7 +2185,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
    * @since 3.4.0
    */
   def tail(n: Int): Array[T] = {
-    val lastN = session.newDataset[T] { builder =>
+    val lastN = sparkSession.newDataset[T] { builder =>
      builder.getTailBuilder
        .setInput(plan.getRoot)
        .setLimit(n)
@@ -2257,7 +2257,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
   }
 
   private def buildRepartition(numPartitions: Int, shuffle: Boolean): Dataset[T] = {
-    session.newDataset { builder =>
+    sparkSession.newDataset { builder =>
       builder.getRepartitionBuilder
         .setInput(plan.getRoot)
         .setNumPartitions(numPartitions)
@@ -2267,7 +2267,7 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
 
   private def buildRepartitionByExpression(
       numPartitions: Option[Int],
-      partitionExprs: Seq[Column]): Dataset[T] = session.newDataset { builder =>
+      partitionExprs: Seq[Column]): Dataset[T] = sparkSession.newDataset { builder =>
     val repartitionBuilder = builder.getRepartitionByExpressionBuilder
       .setInput(plan.getRoot)
       .addAllPartitionExprs(partitionExprs.map(_.expr).asJava)
@@ -2462,10 +2462,10 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
   }
 
   private[sql] def analyze: proto.AnalyzePlanResponse = {
-    session.analyze(plan, proto.Explain.ExplainMode.SIMPLE)
+    sparkSession.analyze(plan, proto.Explain.ExplainMode.SIMPLE)
   }
 
-  def collectResult(): SparkResult = session.execute(plan)
+  def collectResult(): SparkResult = sparkSession.execute(plan)
 
   private[sql] def withResult[E](f: SparkResult => E): E = {
     val result = collectResult()
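The Dataset.scala hunks above all follow one shape: each operation delegates to the session, which mints a new `Dataset` from a callback that wraps the current plan root in one more relation. A toy sketch of that shape, with hypothetical simplified types standing in for the real proto builders:

```scala
// Toy sketch only: Relation/Plan/Session are simplified stand-ins, not the
// Spark Connect proto API.
object PlanBuilderSketch {
  final case class Relation(op: String, input: Option[Relation] = None)
  final case class Plan(root: Relation)

  final class Session {
    // The session owns plan construction; ops hand it a callback that
    // produces the new root relation from the current plan.
    def newDataset[T](f: Plan => Relation)(parent: Plan): Dataset[T] =
      new Dataset[T](this, Plan(f(parent)))
  }

  final class Dataset[T](val sparkSession: Session, val plan: Plan) {
    // Mirrors `def limit(n: Int): Dataset[T] = sparkSession.newDataset { ... }`.
    def limit(n: Int): Dataset[T] =
      sparkSession.newDataset[T](p => Relation(s"limit($n)", Some(p.root)))(plan)
  }
}
```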
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index c918061ac46..76d3ab5cf09 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -42,7 +42,7 @@ class RelationalGroupedDataset protected[sql] (
     pivot: Option[proto.Aggregate.Pivot] = None) {
 
   private[this] def toDF(aggExprs: Seq[Column]): DataFrame = {
-    df.session.newDataset { builder =>
+    df.sparkSession.newDataset { builder =>
       builder.getAggregateBuilder
         .setInput(df.plan.getRoot)
         .addAllGroupingExpressions(groupingExprs.asJava)
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
index 010f3c616e6..d6d21773732 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
@@ -69,30 +69,131 @@ class CompatibilitySuite extends AnyFunSuite { // scalastyle:ignore funsuite
     val mima = new MiMaLib(Seq(clientJar, sqlJar))
     val allProblems = mima.collectProblems(sqlJar, clientJar, List.empty)
     val includedRules = Seq(
-      IncludeByName("org.apache.spark.sql.Column"),
-      IncludeByName("org.apache.spark.sql.Column$"),
-      IncludeByName("org.apache.spark.sql.Dataset"),
-      // TODO(SPARK-42175) Add the Dataset object definition
-      // IncludeByName("org.apache.spark.sql.Dataset$"),
-      IncludeByName("org.apache.spark.sql.DataFrame"),
+      IncludeByName("org.apache.spark.sql.Column.*"),
+      IncludeByName("org.apache.spark.sql.DataFrame.*"),
       IncludeByName("org.apache.spark.sql.DataFrameReader.*"),
       IncludeByName("org.apache.spark.sql.DataFrameWriter.*"),
       IncludeByName("org.apache.spark.sql.DataFrameWriterV2.*"),
-      IncludeByName("org.apache.spark.sql.SparkSession"),
-      IncludeByName("org.apache.spark.sql.SparkSession$")) ++ includeImplementedMethods(clientJar)
+      IncludeByName("org.apache.spark.sql.Dataset.*"),
+      IncludeByName("org.apache.spark.sql.functions.*"),
+      IncludeByName("org.apache.spark.sql.RelationalGroupedDataset.*"),
+      IncludeByName("org.apache.spark.sql.SparkSession.*"))
     val excludeRules = Seq(
       // Filter unsupported rules:
-      // Two sql overloading methods are marked experimental in the API and skipped in the client.
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sql"),
-      // Deprecated json methods and RDD related methods are skipped in the client.
+      // Note when muting errors for a method, checks on all overloading methods are also muted.
+
+      // Skip all shaded dependencies and proto files in the client.
+      ProblemFilters.exclude[Problem]("org.sparkproject.*"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"),
+
+      // DataFrame Reader & Writer
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.json"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.csv"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.jdbc"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameWriter.jdbc"),
-      // Skip all shaded dependencies in the client.
-      ProblemFilters.exclude[Problem]("org.sparkproject.*"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"),
-      // Disable Range until we support typed APIs
+
+      // Dataset
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.ofRows"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.DATASET_ID_TAG"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.COL_POS_KEY"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.DATASET_ID_KEY"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.curId"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.groupBy"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.observe"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.queryExecution"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.encoder"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.sqlContext"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.as"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.checkpoint"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.localCheckpoint"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.withWatermark"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.na"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.stat"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.joinWith"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.select"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.selectUntyped"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.reduce"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.groupByKey"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.explode"), // deprecated
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.filter"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.map"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.mapPartitions"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.flatMap"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.foreach"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.foreachPartition"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.persist"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.cache"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.storageLevel"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.unpersist"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.rdd"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.toJavaRDD"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.javaRDD"),
+      ProblemFilters.exclude[Problem](
+        "org.apache.spark.sql.Dataset.registerTempTable"
+      ), // deprecated
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.createTempView"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.createOrReplaceTempView"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.createGlobalTempView"),
+      ProblemFilters.exclude[Problem](
+        "org.apache.spark.sql.Dataset.createOrReplaceGlobalTempView"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.writeStream"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.toJSON"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.sameSemantics"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.semanticHash"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.this"), + + // functions + ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.udf"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.call_udf"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.callUDF"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.unwrap_udt"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.udaf"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.broadcast"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.count"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.typedlit"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.typedLit"), + + // RelationalGroupedDataset + ProblemFilters.exclude[Problem]("org.apache.spark.sql.RelationalGroupedDataset.as"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.RelationalGroupedDataset.pivot"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.RelationalGroupedDataset.this"), + + // SparkSession + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.active"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.getDefaultSession"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.getActiveSession"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.clearDefaultSession"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.setDefaultSession"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.implicits"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sparkContext"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.version"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sharedState"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sessionState"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sqlContext"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.conf"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.listenerManager"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.experimental"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.udf"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.streams"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.newSession"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.emptyDataFrame"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.emptyDataset"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.createDataFrame"), + ProblemFilters.exclude[Problem]( + "org.apache.spark.sql.SparkSession.baseRelationToDataFrame"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.createDataset"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.catalog"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.executeCommand"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.readStream"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.time"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.stop"), + 
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.this"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.setActiveSession"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.clearActiveSession"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.setDefaultSession"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.clearDefaultSession"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.getActiveSession"), + ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.getDefaultSession"), ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.range")) val problems = allProblems .filter { p => @@ -127,31 +228,6 @@ class CompatibilitySuite extends AnyFunSuite { // scalastyle:ignore funsuite }) } - /** - * Find all methods that are implemented in the client jar. Once all major methods are - * implemented we can switch to include all methods under the class using ".*" e.g. - * "org.apache.spark.sql.Dataset.*" - */ - private def includeImplementedMethods(clientJar: File): Seq[IncludeByName] = { - val clsNames = Seq( - "org.apache.spark.sql.Column", - // TODO(SPARK-42175) Add all overloading methods. Temporarily mute compatibility check for \ - // the Dataset methods, as too many overload methods are missing. - // "org.apache.spark.sql.Dataset", - "org.apache.spark.sql.SparkSession") - - val clientClassLoader: URLClassLoader = new URLClassLoader(Seq(clientJar.toURI.toURL).toArray) - clsNames - .flatMap { clsName => - val cls = clientClassLoader.loadClass(clsName) - // all distinct method names - cls.getMethods.map(m => s"$clsName.${m.getName}").toSet - } - .map { fullName => - IncludeByName(fullName) - } - } - private case class IncludeByName(name: String) extends ProblemFilter { private[this] val pattern = Pattern.compile(name.split("\\*", -1).map(Pattern.quote).mkString(".*")) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org