wypoon commented on pull request #3415:
URL: https://github.com/apache/iceberg/pull/3415#issuecomment-954438654


   ```
   iceberg % diff -r spark/v3.0 spark/v3.1
   diff -r spark/v3.0/build.gradle spark/v3.1/build.gradle
   21,23c21,23
   <     project(':iceberg-spark:iceberg-spark3'),
   <     project(":iceberg-spark:iceberg-spark3-extensions"),
   <     project(':iceberg-spark:iceberg-spark3-runtime')
   ---
   >     project(':iceberg-spark:iceberg-spark-3.1'),
   >     project(":iceberg-spark:iceberg-spark-3.1-extensions"),
   >     project(':iceberg-spark:iceberg-spark-3.1-runtime')
   28c28
   <     sparkVersion = '3.0.3'
   ---
   >     sparkVersion = '3.1.2'
   41c41
   < project(':iceberg-spark:iceberg-spark3') {
   ---
   > project(':iceberg-spark:iceberg-spark-3.1') {
   106c106
   < project(":iceberg-spark:iceberg-spark3-extensions") {
   ---
   > project(":iceberg-spark:iceberg-spark-3.1-extensions") {
   130,131c130
   <     compileOnly project(':iceberg-spark')
   <     compileOnly project(':iceberg-spark:iceberg-spark3')
   ---
   >     compileOnly project(':iceberg-spark:iceberg-spark-3.1')
   142,143c141
   <     testImplementation project(path: ':iceberg-spark', configuration: 'testArtifacts')
   <     testImplementation project(path: ':iceberg-spark:iceberg-spark3', configuration: 'testArtifacts')
   ---
   >     testImplementation project(path: ':iceberg-spark:iceberg-spark-3.1', configuration: 'testArtifacts')
   159c157
   < project(':iceberg-spark:iceberg-spark3-runtime') {
   ---
   > project(':iceberg-spark:iceberg-spark-3.1-runtime') {
   193,194c191,192
   <     implementation project(':iceberg-spark:iceberg-spark3')
   <     implementation project(':iceberg-spark:iceberg-spark3-extensions')
   ---
   >     implementation project(':iceberg-spark:iceberg-spark-3.1')
   >     implementation project(':iceberg-spark:iceberg-spark-3.1-extensions')
   205,207c203,204
   <     integrationImplementation project(path: ':iceberg-spark', configuration: 'testArtifacts')
   <     integrationImplementation project(path: ':iceberg-spark:iceberg-spark3', configuration: 'testArtifacts')
   <     integrationImplementation project(path: ':iceberg-spark:iceberg-spark3-extensions', configuration: 'testArtifacts')
   ---
   >     integrationImplementation project(path: ':iceberg-spark:iceberg-spark-3.1', configuration: 'testArtifacts')
   >     integrationImplementation project(path: ':iceberg-spark:iceberg-spark-3.1-extensions', configuration: 'testArtifacts')
   209,210c206,207
   <     integrationCompileOnly project(':iceberg-spark:iceberg-spark3-extensions')
   <     integrationCompileOnly project(':iceberg-spark:iceberg-spark3')
   ---
   >     integrationCompileOnly project(':iceberg-spark:iceberg-spark-3.1-extensions')
   >     integrationCompileOnly project(':iceberg-spark:iceberg-spark-3.1')
   249c246
   <     description = "Test Spark3 Runtime Jar against Spark 3.0"
   ---
   >     description = "Test Spark3 Runtime Jar against Spark 3.1"
   Only in spark/v3.0/spark/src/main/java/org/apache/iceberg/spark: Spark3VersionUtil.java
   diff -r spark/v3.0/spark/src/main/scala/org/apache/spark/sql/catalyst/utils/DistributionAndOrderingUtils.scala spark/v3.1/spark/src/main/scala/org/apache/spark/sql/catalyst/utils/DistributionAndOrderingUtils.scala
   22,23d21
   < import org.apache.iceberg.common.DynConstructors
   < import org.apache.iceberg.spark.Spark3VersionUtil
   34a33
   > import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
   80d78
   <       val numShufflePartitions = conf.numShufflePartitions
   84c82
   <       PlanUtils.createRepartitionByExpression(distribution.toSeq, query, numShufflePartitions)
   ---
   >       RepartitionByExpression(distribution.toSeq, query, None)
   101,131d98
   <   private val sortOrderCtor: DynConstructors.Ctor[catalyst.expressions.SortOrder] =
   <     DynConstructors.builder()
   <       .impl(classOf[catalyst.expressions.SortOrder],
   <         classOf[catalyst.expressions.Expression],
   <         classOf[catalyst.expressions.SortDirection],
   <         classOf[catalyst.expressions.NullOrdering],
   <         classOf[Seq[catalyst.expressions.Expression]])
   <       .impl(classOf[catalyst.expressions.SortOrder],
   <         classOf[catalyst.expressions.Expression],
   <         classOf[catalyst.expressions.SortDirection],
   <         classOf[catalyst.expressions.NullOrdering],
   <         classOf[Set[catalyst.expressions.Expression]])
   <       .build()
   < 
   <   def createSortOrder(
   <       child: catalyst.expressions.Expression,
   <       direction: catalyst.expressions.SortDirection): catalyst.expressions.SortOrder = {
   <     createSortOrder(child, direction, direction.defaultNullOrdering)
   <   }
   < 
   <   def createSortOrder(
   <       child: catalyst.expressions.Expression,
   <       direction: catalyst.expressions.SortDirection,
   <       nullOrdering: catalyst.expressions.NullOrdering): catalyst.expressions.SortOrder = {
   <     if (Spark3VersionUtil.isSpark30) {
   <       sortOrderCtor.newInstance(child, direction, nullOrdering, Set.empty)
   <     } else {
   <       sortOrderCtor.newInstance(child, direction, nullOrdering, Seq.empty)
   <     }
   <   }
   < 
   152c119
   <         createSortOrder(catalystChild, toCatalyst(s.direction), toCatalyst(s.nullOrdering))
   ---
   >         catalyst.expressions.SortOrder(catalystChild, toCatalyst(s.direction), toCatalyst(s.nullOrdering), Seq.empty)
   diff -r spark/v3.0/spark/src/main/scala/org/apache/spark/sql/catalyst/utils/PlanUtils.scala spark/v3.1/spark/src/main/scala/org/apache/spark/sql/catalyst/utils/PlanUtils.scala
   22,23d21
   < import org.apache.iceberg.common.DynConstructors
   < import org.apache.iceberg.spark.Spark3VersionUtil
   25d22
   < import org.apache.spark.sql.catalyst.expressions.Expression
   27d23
   < import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
   44,67d39
   < 
   <   private val repartitionByExpressionCtor: DynConstructors.Ctor[RepartitionByExpression] =
   <     DynConstructors.builder()
   <       .impl(classOf[RepartitionByExpression],
   <         classOf[Seq[Expression]],
   <         classOf[LogicalPlan],
   <         classOf[Option[Int]])
   <       .impl(classOf[RepartitionByExpression],
   <         classOf[Seq[Expression]],
   <         classOf[LogicalPlan],
   <         Integer.TYPE)
   <       .build()
   < 
   <   def createRepartitionByExpression(
   <       partitionExpressions: Seq[Expression],
   <       child: LogicalPlan,
   <       numPartitions: Int): RepartitionByExpression = {
   <     if (Spark3VersionUtil.isSpark30) {
   <       repartitionByExpressionCtor.newInstance(partitionExpressions, child, Integer.valueOf(numPartitions))
   <     } else {
   <       // Do not pass numPartitions because it is set automatically for AQE
   <       repartitionByExpressionCtor.newInstance(partitionExpressions, child, None)
   <     }
   <   }
   diff -r spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkDateTimes.java spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkDateTimes.java
   72c72
   <     String sparkTimestamp = DateTimeUtils.timestampToString(formatter, ts.value());
   ---
   >     String sparkTimestamp = formatter.format(ts.value());
   diff -r spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java
   23d22
   < import org.apache.iceberg.AssertHelpers;
   27d25
   < import org.apache.iceberg.spark.Spark3VersionUtil;
   31d28
   < import org.junit.Assume;
   45,108d41
   <   public void testDeleteFromUnpartitionedTable() {
   <     // This test fails in Spark 3.1. `canDeleteWhere` was added to `SupportsDelete` in Spark 3.1,
   <     // but logic to rewrite the query if `canDeleteWhere` returns false was left to be implemented
   <     // later.
   <     Assume.assumeTrue(Spark3VersionUtil.isSpark30());
   <     // set the shuffle partitions to 1 to force the write to use a single task and produce 1 file
   <     String originalParallelism = spark.conf().get("spark.sql.shuffle.partitions");
   <     spark.conf().set("spark.sql.shuffle.partitions", "1");
   <     try {
   <       sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
   <       sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", tableName);
   < 
   <       assertEquals("Should have expected rows",
   <           ImmutableList.of(row(1L, "a"), row(2L, "b"), row(3L, "c")),
   <           sql("SELECT * FROM %s ORDER BY id", tableName));
   < 
   <       AssertHelpers.assertThrows("Should not delete when not all rows of a file match the filter",
   <           IllegalArgumentException.class, "Failed to cleanly delete data files",
   <           () -> sql("DELETE FROM %s WHERE id < 2", tableName));
   < 
   <       sql("DELETE FROM %s WHERE id < 4", tableName);
   < 
   <       Assert.assertEquals("Should have no rows after successful delete",
   <           0L, scalarSql("SELECT count(1) FROM %s", tableName));
   < 
   <     } finally {
   <       spark.conf().set("spark.sql.shuffle.partitions", originalParallelism);
   <     }
   <   }
   < 
   <   @Test
   <   public void testDeleteFromPartitionedTable() {
   <     // This test fails in Spark 3.1. `canDeleteWhere` was added to `SupportsDelete` in Spark 3.1,
   <     // but logic to rewrite the query if `canDeleteWhere` returns false was left to be implemented
   <     // later.
   <     Assume.assumeTrue(Spark3VersionUtil.isSpark30());
   <     // set the shuffle partitions to 1 to force the write to use a single task and produce 1 file per partition
   <     String originalParallelism = spark.conf().get("spark.sql.shuffle.partitions");
   <     spark.conf().set("spark.sql.shuffle.partitions", "1");
   <     try {
   <       sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg " +
   <           "PARTITIONED BY (truncate(id, 2))", tableName);
   <       sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", tableName);
   < 
   <       assertEquals("Should have 3 rows in 2 partitions",
   <           ImmutableList.of(row(1L, "a"), row(2L, "b"), row(3L, "c")),
   <           sql("SELECT * FROM %s ORDER BY id", tableName));
   < 
   <       AssertHelpers.assertThrows("Should not delete when not all rows of a file match the filter",
   <           IllegalArgumentException.class, "Failed to cleanly delete data files",
   <           () -> sql("DELETE FROM %s WHERE id > 2", tableName));
   < 
   <       sql("DELETE FROM %s WHERE id < 2", tableName);
   < 
   <       assertEquals("Should have two rows in the second partition",
   <           ImmutableList.of(row(2L, "b"), row(3L, "c")),
   <           sql("SELECT * FROM %s ORDER BY id", tableName));
   < 
   <     } finally {
   <       spark.conf().set("spark.sql.shuffle.partitions", originalParallelism);
   <     }
   <   }
   < 
   <   @Test
   diff -r spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
   28d27
   < import org.apache.iceberg.spark.Spark3VersionUtil;
   31d29
   < import org.junit.Assume;
   54,56d51
   <     Assume.assumeFalse("Spark 3.0 Spark Session Catalog does not use V2 Catalogs so Iceberg refresh is impossible",
   <         Spark3VersionUtil.isSpark30() && catalogName.equals("spark_catalog"));
   < 
   diff -r spark/v3.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentAlignmentSupport.scala spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentAlignmentSupport.scala
   35d34
   < import org.apache.spark.sql.catalyst.utils.RewriteRowLevelOperationHelper.createAlias
   103c102
   <                 createAlias(GetStructField(col, ordinal, Some(field.name)), field.name)
   ---
   >                 Alias(GetStructField(col, ordinal, Some(field.name)), field.name)()
   diff -r spark/v3.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
   30a31
   > import org.apache.spark.sql.catalyst.expressions.SortOrder
   35a37
   > import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
   40d41
   < import org.apache.spark.sql.catalyst.utils.PlanUtils.createRepartitionByExpression
   97,98c98
   <         val numShufflePartitions = conf.numShufflePartitions
   <         createRepartitionByExpression(Seq(fileNameCol), remainingRowsPlan, numShufflePartitions)
   ---
   >         RepartitionByExpression(Seq(fileNameCol), remainingRowsPlan, None)
   101c101
   <     val order = Seq(createSortOrder(fileNameCol, Ascending), createSortOrder(rowPosCol, Ascending))
   ---
   >     val order = Seq(SortOrder(fileNameCol, Ascending), SortOrder(rowPosCol, Ascending))
   diff -r spark/v3.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
   61d60
   <   import RewriteRowLevelOperationHelper._
   85c84
   <         val outputCols = outputExprs.zip(outputColNames).map { case (expr, name) => createAlias(expr, name) }
   ---
   >         val outputCols = outputExprs.zip(outputColNames).map { case (expr, name) => Alias(expr, name)() }
   127c126
   <         val sourceTableProj = source.output ++ Seq(createAlias(TRUE_LITERAL, ROW_FROM_SOURCE))
   ---
   >         val sourceTableProj = source.output ++ Seq(Alias(TRUE_LITERAL, ROW_FROM_SOURCE)())
   157c156
   <         val sourceTableProj = source.output ++ Seq(createAlias(TRUE_LITERAL, ROW_FROM_SOURCE))
   ---
   >         val sourceTableProj = source.output ++ Seq(Alias(TRUE_LITERAL, ROW_FROM_SOURCE)())
   160c159
   <         val targetTableProj = targetTableScan.output ++ Seq(createAlias(TRUE_LITERAL, ROW_FROM_TARGET))
   ---
   >         val targetTableProj = targetTableScan.output ++ Seq(Alias(TRUE_LITERAL, ROW_FROM_TARGET)())
   diff -r spark/v3.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteUpdate.scala spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteUpdate.scala
   49d48
   <   import RewriteRowLevelOperationHelper._
   117c116
   <         createAlias(assignedExpr, attr.name)
   ---
   >         Alias(assignedExpr, attr.name)()
   120c119
   <         createAlias(updatedExpr, attr.name)
   ---
   >         Alias(updatedExpr, attr.name)()
   diff -r spark/v3.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/RewriteRowLevelOperationHelper.scala spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/RewriteRowLevelOperationHelper.scala
   24d23
   < import org.apache.iceberg.spark.Spark3VersionUtil
   92c91
   <     val scanRelation = createScanRelation(relation, scan, outputAttrs)
   ---
   >     val scanRelation = DataSourceV2ScanRelation(relation, scan, outputAttrs)
   112c111
   <     val scanRelation = createScanRelation(relation, scan, outputAttrs)
   ---
   >     val scanRelation = DataSourceV2ScanRelation(relation, scan, outputAttrs)
   180c179
   <     val accumulatorExpr = createAlias(AccumulateFiles(filesAccumulator, fileAttr), AFFECTED_FILES_ACC_ALIAS_NAME)
   ---
   >     val accumulatorExpr = Alias(AccumulateFiles(filesAccumulator, fileAttr), AFFECTED_FILES_ACC_ALIAS_NAME)()
   184c183
   <     val aggSumCol = createAlias(AggregateExpression(Sum(affectedFilesAttr), Complete, false), SUM_ROW_ID_ALIAS_NAME)
   ---
   >     val aggSumCol = Alias(AggregateExpression(Sum(affectedFilesAttr), Complete, false), SUM_ROW_ID_ALIAS_NAME)()
   238,281d236
   < 
   <   private val scanRelationCtor: DynConstructors.Ctor[DataSourceV2ScanRelation] =
   <     DynConstructors.builder()
   <       .impl(classOf[DataSourceV2ScanRelation],
   <         classOf[DataSourceV2Relation],
   <         classOf[Scan],
   <         classOf[Seq[AttributeReference]])
   <       .impl(classOf[DataSourceV2ScanRelation],
   <         classOf[Table],
   <         classOf[Scan],
   <         classOf[Seq[AttributeReference]])
   <       .build()
   < 
   <   def createScanRelation(
   <       relation: DataSourceV2Relation,
   <       scan: Scan,
   <       outputAttrs: Seq[AttributeReference]): DataSourceV2ScanRelation = {
   <     if (Spark3VersionUtil.isSpark30) {
   <       scanRelationCtor.newInstance(relation.table, scan, outputAttrs)
   <     } else {
   <       scanRelationCtor.newInstance(relation, scan, outputAttrs)
   <     }
   <   }
   < 
   <   private val aliasCtor: DynConstructors.Ctor[Alias] =
   <     DynConstructors.builder()
   <       .impl(classOf[Alias],
   <         classOf[Expression],
   <         classOf[String],
   <         classOf[ExprId],
   <         classOf[Seq[String]],
   <         classOf[Option[Metadata]],
   <         classOf[Seq[String]])
   <       .impl(classOf[Alias],
   <         classOf[Expression],
   <         classOf[String],
   <         classOf[ExprId],
   <         classOf[Seq[String]],
   <         classOf[Option[Metadata]])
   <       .build()
   < 
   <   def createAlias(child: Expression, name: String): Alias = {
   <     aliasCtor.newInstance(child, name, NamedExpression.newExprId, Seq.empty, None, Seq.empty)
   <   }
   iceberg % 
   ```
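
To make the diff easier to read: apart from the Gradle project renames and the Spark version bump, the source differences are the cross-version shims that the v3.0 tree keeps and the v3.1 tree drops (plus the Spark 3.0-only tests and assumptions removed from the test files). The v3.0 sources go through `Spark3VersionUtil` and Iceberg's `DynConstructors`-based helpers (`createAlias`, `createSortOrder`, `createRepartitionByExpression`, `createScanRelation`) because `Alias`, `SortOrder`, `RepartitionByExpression`, and `DataSourceV2ScanRelation` changed constructor signatures between Spark 3.0 and 3.1, while the v3.1 sources call the Spark 3.1 constructors directly. Below is a condensed, illustrative sketch of that idiom, pieced together from the removed `DistributionAndOrderingUtils` lines above; names are taken from the diff, and it is not meant as a compilable drop-in.

```scala
// Sketch of the reflective dispatch the spark/v3.0 tree retains (from the removed lines above).
// DynConstructors tries each declared .impl(...) in order and binds the first constructor that
// exists on the classpath, so one jar can link against either the Spark 3.0 SortOrder
// (Set-based sameOrderExpressions) or the Spark 3.1 one (Seq-based).
import org.apache.iceberg.common.DynConstructors
import org.apache.iceberg.spark.Spark3VersionUtil
import org.apache.spark.sql.catalyst.expressions.{Expression, NullOrdering, SortDirection, SortOrder}

object SortOrderCompat {
  private val sortOrderCtor: DynConstructors.Ctor[SortOrder] =
    DynConstructors.builder()
      .impl(classOf[SortOrder], classOf[Expression], classOf[SortDirection],
        classOf[NullOrdering], classOf[Seq[Expression]])   // Spark 3.1 signature
      .impl(classOf[SortOrder], classOf[Expression], classOf[SortDirection],
        classOf[NullOrdering], classOf[Set[Expression]])   // Spark 3.0 signature
      .build()

  def createSortOrder(
      child: Expression,
      direction: SortDirection,
      nullOrdering: NullOrdering): SortOrder = {
    if (Spark3VersionUtil.isSpark30) {
      // Spark 3.0 constructor takes a Set for the trailing argument
      sortOrderCtor.newInstance(child, direction, nullOrdering, Set.empty)
    } else {
      // Spark 3.1 constructor takes a Seq
      sortOrderCtor.newInstance(child, direction, nullOrdering, Seq.empty)
    }
  }
}
```

In the v3.1-only tree the same call site collapses to a direct `catalyst.expressions.SortOrder(catalystChild, toCatalyst(s.direction), toCatalyst(s.nullOrdering), Seq.empty)`, as the diff shows, which is what makes the whole shim layer removable.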

