wypoon commented on pull request #3415:
URL: https://github.com/apache/iceberg/pull/3415#issuecomment-954438654
```
iceberg % diff -r spark/v3.0 spark/v3.1
diff -r spark/v3.0/build.gradle spark/v3.1/build.gradle
21,23c21,23
< project(':iceberg-spark:iceberg-spark3'),
< project(":iceberg-spark:iceberg-spark3-extensions"),
< project(':iceberg-spark:iceberg-spark3-runtime')
---
> project(':iceberg-spark:iceberg-spark-3.1'),
> project(":iceberg-spark:iceberg-spark-3.1-extensions"),
> project(':iceberg-spark:iceberg-spark-3.1-runtime')
28c28
< sparkVersion = '3.0.3'
---
> sparkVersion = '3.1.2'
41c41
< project(':iceberg-spark:iceberg-spark3') {
---
> project(':iceberg-spark:iceberg-spark-3.1') {
106c106
< project(":iceberg-spark:iceberg-spark3-extensions") {
---
> project(":iceberg-spark:iceberg-spark-3.1-extensions") {
130,131c130
< compileOnly project(':iceberg-spark')
< compileOnly project(':iceberg-spark:iceberg-spark3')
---
> compileOnly project(':iceberg-spark:iceberg-spark-3.1')
142,143c141
< testImplementation project(path: ':iceberg-spark', configuration: 'testArtifacts')
< testImplementation project(path: ':iceberg-spark:iceberg-spark3', configuration: 'testArtifacts')
---
> testImplementation project(path: ':iceberg-spark:iceberg-spark-3.1', configuration: 'testArtifacts')
159c157
< project(':iceberg-spark:iceberg-spark3-runtime') {
---
> project(':iceberg-spark:iceberg-spark-3.1-runtime') {
193,194c191,192
< implementation project(':iceberg-spark:iceberg-spark3')
< implementation project(':iceberg-spark:iceberg-spark3-extensions')
---
> implementation project(':iceberg-spark:iceberg-spark-3.1')
> implementation project(':iceberg-spark:iceberg-spark-3.1-extensions')
205,207c203,204
< integrationImplementation project(path: ':iceberg-spark', configuration: 'testArtifacts')
< integrationImplementation project(path: ':iceberg-spark:iceberg-spark3', configuration: 'testArtifacts')
< integrationImplementation project(path: ':iceberg-spark:iceberg-spark3-extensions', configuration: 'testArtifacts')
---
> integrationImplementation project(path: ':iceberg-spark:iceberg-spark-3.1', configuration: 'testArtifacts')
> integrationImplementation project(path: ':iceberg-spark:iceberg-spark-3.1-extensions', configuration: 'testArtifacts')
209,210c206,207
< integrationCompileOnly project(':iceberg-spark:iceberg-spark3-extensions')
< integrationCompileOnly project(':iceberg-spark:iceberg-spark3')
---
> integrationCompileOnly project(':iceberg-spark:iceberg-spark-3.1-extensions')
> integrationCompileOnly project(':iceberg-spark:iceberg-spark-3.1')
249c246
< description = "Test Spark3 Runtime Jar against Spark 3.0"
---
> description = "Test Spark3 Runtime Jar against Spark 3.1"
Only in spark/v3.0/spark/src/main/java/org/apache/iceberg/spark: Spark3VersionUtil.java
diff -r spark/v3.0/spark/src/main/scala/org/apache/spark/sql/catalyst/utils/DistributionAndOrderingUtils.scala spark/v3.1/spark/src/main/scala/org/apache/spark/sql/catalyst/utils/DistributionAndOrderingUtils.scala
22,23d21
< import org.apache.iceberg.common.DynConstructors
< import org.apache.iceberg.spark.Spark3VersionUtil
34a33
> import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
80d78
< val numShufflePartitions = conf.numShufflePartitions
84c82
< PlanUtils.createRepartitionByExpression(distribution.toSeq, query, numShufflePartitions)
---
> RepartitionByExpression(distribution.toSeq, query, None)
101,131d98
< private val sortOrderCtor: DynConstructors.Ctor[catalyst.expressions.SortOrder] =
< DynConstructors.builder()
< .impl(classOf[catalyst.expressions.SortOrder],
< classOf[catalyst.expressions.Expression],
< classOf[catalyst.expressions.SortDirection],
< classOf[catalyst.expressions.NullOrdering],
< classOf[Seq[catalyst.expressions.Expression]])
< .impl(classOf[catalyst.expressions.SortOrder],
< classOf[catalyst.expressions.Expression],
< classOf[catalyst.expressions.SortDirection],
< classOf[catalyst.expressions.NullOrdering],
< classOf[Set[catalyst.expressions.Expression]])
< .build()
<
< def createSortOrder(
< child: catalyst.expressions.Expression,
< direction: catalyst.expressions.SortDirection): catalyst.expressions.SortOrder = {
< createSortOrder(child, direction, direction.defaultNullOrdering)
< }
<
< def createSortOrder(
< child: catalyst.expressions.Expression,
< direction: catalyst.expressions.SortDirection,
< nullOrdering: catalyst.expressions.NullOrdering): catalyst.expressions.SortOrder = {
< if (Spark3VersionUtil.isSpark30) {
< sortOrderCtor.newInstance(child, direction, nullOrdering, Set.empty)
< } else {
< sortOrderCtor.newInstance(child, direction, nullOrdering, Seq.empty)
< }
< }
<
152c119
< createSortOrder(catalystChild, toCatalyst(s.direction), toCatalyst(s.nullOrdering))
---
> catalyst.expressions.SortOrder(catalystChild, toCatalyst(s.direction), toCatalyst(s.nullOrdering), Seq.empty)
diff -r spark/v3.0/spark/src/main/scala/org/apache/spark/sql/catalyst/utils/PlanUtils.scala spark/v3.1/spark/src/main/scala/org/apache/spark/sql/catalyst/utils/PlanUtils.scala
22,23d21
< import org.apache.iceberg.common.DynConstructors
< import org.apache.iceberg.spark.Spark3VersionUtil
25d22
< import org.apache.spark.sql.catalyst.expressions.Expression
27d23
< import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
44,67d39
<
< private val repartitionByExpressionCtor: DynConstructors.Ctor[RepartitionByExpression] =
< DynConstructors.builder()
< .impl(classOf[RepartitionByExpression],
< classOf[Seq[Expression]],
< classOf[LogicalPlan],
< classOf[Option[Int]])
< .impl(classOf[RepartitionByExpression],
< classOf[Seq[Expression]],
< classOf[LogicalPlan],
< Integer.TYPE)
< .build()
<
< def createRepartitionByExpression(
< partitionExpressions: Seq[Expression],
< child: LogicalPlan,
< numPartitions: Int): RepartitionByExpression = {
< if (Spark3VersionUtil.isSpark30) {
< repartitionByExpressionCtor.newInstance(partitionExpressions, child, Integer.valueOf(numPartitions))
< } else {
< // Do not pass numPartitions because it is set automatically for AQE
< repartitionByExpressionCtor.newInstance(partitionExpressions, child, None)
< }
< }
diff -r spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkDateTimes.java spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkDateTimes.java
72c72
< String sparkTimestamp = DateTimeUtils.timestampToString(formatter, ts.value());
---
> String sparkTimestamp = formatter.format(ts.value());
diff -r spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java
23d22
< import org.apache.iceberg.AssertHelpers;
27d25
< import org.apache.iceberg.spark.Spark3VersionUtil;
31d28
< import org.junit.Assume;
45,108d41
< public void testDeleteFromUnpartitionedTable() {
< // This test fails in Spark 3.1. `canDeleteWhere` was added to `SupportsDelete` in Spark 3.1,
< // but logic to rewrite the query if `canDeleteWhere` returns false was left to be implemented
< // later.
< Assume.assumeTrue(Spark3VersionUtil.isSpark30());
< // set the shuffle partitions to 1 to force the write to use a single task and produce 1 file
< String originalParallelism = spark.conf().get("spark.sql.shuffle.partitions");
< spark.conf().set("spark.sql.shuffle.partitions", "1");
< try {
< sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
< sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", tableName);
<
< assertEquals("Should have expected rows",
< ImmutableList.of(row(1L, "a"), row(2L, "b"), row(3L, "c")),
< sql("SELECT * FROM %s ORDER BY id", tableName));
<
< AssertHelpers.assertThrows("Should not delete when not all rows of a file match the filter",
< IllegalArgumentException.class, "Failed to cleanly delete data files",
< () -> sql("DELETE FROM %s WHERE id < 2", tableName));
<
< sql("DELETE FROM %s WHERE id < 4", tableName);
<
< Assert.assertEquals("Should have no rows after successful delete",
< 0L, scalarSql("SELECT count(1) FROM %s", tableName));
<
< } finally {
< spark.conf().set("spark.sql.shuffle.partitions", originalParallelism);
< }
< }
<
< @Test
< public void testDeleteFromPartitionedTable() {
< // This test fails in Spark 3.1. `canDeleteWhere` was added to `SupportsDelete` in Spark 3.1,
< // but logic to rewrite the query if `canDeleteWhere` returns false was left to be implemented
< // later.
< Assume.assumeTrue(Spark3VersionUtil.isSpark30());
< // set the shuffle partitions to 1 to force the write to use a single task and produce 1 file per partition
< String originalParallelism = spark.conf().get("spark.sql.shuffle.partitions");
< spark.conf().set("spark.sql.shuffle.partitions", "1");
< try {
< sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg " +
< "PARTITIONED BY (truncate(id, 2))", tableName);
< sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", tableName);
<
< assertEquals("Should have 3 rows in 2 partitions",
< ImmutableList.of(row(1L, "a"), row(2L, "b"), row(3L, "c")),
< sql("SELECT * FROM %s ORDER BY id", tableName));
<
< AssertHelpers.assertThrows("Should not delete when not all rows of a file match the filter",
< IllegalArgumentException.class, "Failed to cleanly delete data files",
< () -> sql("DELETE FROM %s WHERE id > 2", tableName));
<
< sql("DELETE FROM %s WHERE id < 2", tableName);
<
< assertEquals("Should have two rows in the second partition",
< ImmutableList.of(row(2L, "b"), row(3L, "c")),
< sql("SELECT * FROM %s ORDER BY id", tableName));
<
< } finally {
< spark.conf().set("spark.sql.shuffle.partitions", originalParallelism);
< }
< }
<
< @Test
diff -r spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
28d27
< import org.apache.iceberg.spark.Spark3VersionUtil;
31d29
< import org.junit.Assume;
54,56d51
< Assume.assumeFalse("Spark 3.0 Spark Session Catalog does not use V2 Catalogs so Iceberg refresh is impossible",
< Spark3VersionUtil.isSpark30() && catalogName.equals("spark_catalog"));
<
diff -r spark/v3.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentAlignmentSupport.scala spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentAlignmentSupport.scala
35d34
< import org.apache.spark.sql.catalyst.utils.RewriteRowLevelOperationHelper.createAlias
103c102
< createAlias(GetStructField(col, ordinal, Some(field.name)), field.name)
---
> Alias(GetStructField(col, ordinal, Some(field.name)), field.name)()
diff -r spark/v3.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
30a31
> import org.apache.spark.sql.catalyst.expressions.SortOrder
35a37
> import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
40d41
< import org.apache.spark.sql.catalyst.utils.PlanUtils.createRepartitionByExpression
97,98c98
< val numShufflePartitions = conf.numShufflePartitions
< createRepartitionByExpression(Seq(fileNameCol), remainingRowsPlan, numShufflePartitions)
---
> RepartitionByExpression(Seq(fileNameCol), remainingRowsPlan, None)
101c101
< val order = Seq(createSortOrder(fileNameCol, Ascending), createSortOrder(rowPosCol, Ascending))
---
> val order = Seq(SortOrder(fileNameCol, Ascending), SortOrder(rowPosCol, Ascending))
diff -r spark/v3.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
61d60
< import RewriteRowLevelOperationHelper._
85c84
< val outputCols = outputExprs.zip(outputColNames).map { case (expr, name) => createAlias(expr, name) }
---
> val outputCols = outputExprs.zip(outputColNames).map { case (expr, name) => Alias(expr, name)() }
127c126
< val sourceTableProj = source.output ++ Seq(createAlias(TRUE_LITERAL, ROW_FROM_SOURCE))
---
> val sourceTableProj = source.output ++ Seq(Alias(TRUE_LITERAL, ROW_FROM_SOURCE)())
157c156
< val sourceTableProj = source.output ++ Seq(createAlias(TRUE_LITERAL, ROW_FROM_SOURCE))
---
> val sourceTableProj = source.output ++ Seq(Alias(TRUE_LITERAL, ROW_FROM_SOURCE)())
160c159
< val targetTableProj = targetTableScan.output ++ Seq(createAlias(TRUE_LITERAL, ROW_FROM_TARGET))
---
> val targetTableProj = targetTableScan.output ++ Seq(Alias(TRUE_LITERAL, ROW_FROM_TARGET)())
diff -r spark/v3.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteUpdate.scala spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteUpdate.scala
49d48
< import RewriteRowLevelOperationHelper._
117c116
< createAlias(assignedExpr, attr.name)
---
> Alias(assignedExpr, attr.name)()
120c119
< createAlias(updatedExpr, attr.name)
---
> Alias(updatedExpr, attr.name)()
diff -r spark/v3.0/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/RewriteRowLevelOperationHelper.scala spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/RewriteRowLevelOperationHelper.scala
24d23
< import org.apache.iceberg.spark.Spark3VersionUtil
92c91
< val scanRelation = createScanRelation(relation, scan, outputAttrs)
---
> val scanRelation = DataSourceV2ScanRelation(relation, scan, outputAttrs)
112c111
< val scanRelation = createScanRelation(relation, scan, outputAttrs)
---
> val scanRelation = DataSourceV2ScanRelation(relation, scan, outputAttrs)
180c179
< val accumulatorExpr = createAlias(AccumulateFiles(filesAccumulator, fileAttr), AFFECTED_FILES_ACC_ALIAS_NAME)
---
> val accumulatorExpr = Alias(AccumulateFiles(filesAccumulator, fileAttr), AFFECTED_FILES_ACC_ALIAS_NAME)()
184c183
< val aggSumCol = createAlias(AggregateExpression(Sum(affectedFilesAttr), Complete, false), SUM_ROW_ID_ALIAS_NAME)
---
> val aggSumCol = Alias(AggregateExpression(Sum(affectedFilesAttr), Complete, false), SUM_ROW_ID_ALIAS_NAME)()
238,281d236
<
< private val scanRelationCtor: DynConstructors.Ctor[DataSourceV2ScanRelation] =
< DynConstructors.builder()
< .impl(classOf[DataSourceV2ScanRelation],
< classOf[DataSourceV2Relation],
< classOf[Scan],
< classOf[Seq[AttributeReference]])
< .impl(classOf[DataSourceV2ScanRelation],
< classOf[Table],
< classOf[Scan],
< classOf[Seq[AttributeReference]])
< .build()
<
< def createScanRelation(
< relation: DataSourceV2Relation,
< scan: Scan,
< outputAttrs: Seq[AttributeReference]): DataSourceV2ScanRelation = {
< if (Spark3VersionUtil.isSpark30) {
< scanRelationCtor.newInstance(relation.table, scan, outputAttrs)
< } else {
< scanRelationCtor.newInstance(relation, scan, outputAttrs)
< }
< }
<
< private val aliasCtor: DynConstructors.Ctor[Alias] =
< DynConstructors.builder()
< .impl(classOf[Alias],
< classOf[Expression],
< classOf[String],
< classOf[ExprId],
< classOf[Seq[String]],
< classOf[Option[Metadata]],
< classOf[Seq[String]])
< .impl(classOf[Alias],
< classOf[Expression],
< classOf[String],
< classOf[ExprId],
< classOf[Seq[String]],
< classOf[Option[Metadata]])
< .build()
<
< def createAlias(child: Expression, name: String): Alias = {
< aliasCtor.newInstance(child, name, NamedExpression.newExprId, Seq.empty, None, Seq.empty)
< }
iceberg %
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]