This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 111fe81  [Spark] Support building against both Spark 3.0 and Spark 3.1. (#2512)
111fe81 is described below

commit 111fe810c81eb2fea7d849ecc21daedd1c429355
Author: Wing Yew Poon <[email protected]>
AuthorDate: Thu Jun 24 08:16:41 2021 -0700

    [Spark] Support building against both Spark 3.0 and Spark 3.1. (#2512)
    
    Code changes that allow spark3 and spark3-extensions to be tested against
    both Spark 3.0 and Spark 3.1 while still being built against a single
    Spark 3.0 version.
    
    Although additional tests are created, we still only produce a single set
    of Spark3 binaries, which are compatible with Spark 3.0 and 3.1.
---
 build.gradle                                       | 98 ++++++++++++++++++++--
 .../java/org/apache/iceberg/util/DateTimeUtil.java |  8 ++
 .../extensions/IcebergSparkSessionExtensions.scala |  2 +-
 .../analysis/AlignRowLevelOperations.scala         |  5 +-
 .../analysis/AssignmentAlignmentSupport.scala      |  8 +-
 .../sql/catalyst/optimizer/RewriteDelete.scala     | 11 ++-
 .../sql/catalyst/optimizer/RewriteMergeInto.scala  | 15 ++--
 .../sql/catalyst/optimizer/RewriteUpdate.scala     | 12 ++-
 .../IcebergSparkSqlExtensionsParser.scala          | 18 +++-
 .../sql/catalyst/plans/logical/ReplaceData.scala   | 10 ++-
 .../utils/DistributionAndOrderingUtils.scala       | 53 +++++++++++-
 .../utils/RewriteRowLevelOperationHelper.scala     | 62 ++++++++++++--
 .../apache/iceberg/spark/Spark3VersionUtil.java    | 22 +++--
 .../spark/procedures/ExpireSnapshotsProcedure.java |  4 +-
 .../procedures/RemoveOrphanFilesProcedure.java     |  4 +-
 .../procedures/RollbackToTimestampProcedure.java   |  4 +-
 .../apache/iceberg/spark/sql/TestDeleteFrom.java   | 10 +++
 versions.props                                     |  1 -
 18 files changed, 293 insertions(+), 54 deletions(-)

diff --git a/build.gradle b/build.gradle
index df1c04d..d87eace 100644
--- a/build.gradle
+++ b/build.gradle
@@ -64,6 +64,10 @@ allprojects {
     mavenCentral()
     mavenLocal()
   }
+  project.ext {
+    Spark30Version = '3.0.1'
+    Spark31Version = '3.1.1'
+  }
 }
 
 subprojects {
@@ -977,6 +981,21 @@ if (jdkVersion == '8') {
 }
 
 project(':iceberg-spark3') {
+  sourceSets {
+    // Compile test source against Spark 3.1 and main classes compiled against 
Spark 3.0
+    spark31 {
+      java.srcDir "$projectDir/src/test/java"
+      resources.srcDir "$projectDir/src/test/resources"
+      compileClasspath += sourceSets.test.output + sourceSets.main.output
+      runtimeClasspath += sourceSets.test.output
+    }
+  }
+
+  configurations {
+    spark31Implementation.extendsFrom testImplementation
+    spark31RuntimeOnly.extendsFrom testRuntimeOnly
+  }
+
   dependencies {
     compile project(':iceberg-api')
     compile project(':iceberg-common')
@@ -989,7 +1008,7 @@ project(':iceberg-spark3') {
     compile project(':iceberg-spark')
 
     compileOnly "org.apache.avro:avro"
-    compileOnly("org.apache.spark:spark-hive_2.12") {
+    
compileOnly("org.apache.spark:spark-hive_2.12:${project.ext.Spark30Version}") {
       exclude group: 'org.apache.avro', module: 'avro'
       exclude group: 'org.apache.arrow'
     }
@@ -1003,9 +1022,14 @@ project(':iceberg-spark3') {
     testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
     testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')
     testCompile "org.xerial:sqlite-jdbc"
+
+    
spark31Implementation("org.apache.spark:spark-hive_2.12:${project.ext.Spark31Version}")
 {
+      exclude group: 'org.apache.avro', module: 'avro'
+      exclude group: 'org.apache.arrow'
+    }
   }
 
-  test {
+  tasks.withType(Test) {
     // For vectorized reads
     // Allow unsafe memory access to avoid the costly check arrow does to 
check if index is within bounds
     systemProperty("arrow.enable_unsafe_memory_access", "true")
@@ -1014,8 +1038,17 @@ project(':iceberg-spark3') {
     systemProperty("arrow.enable_null_check_for_get", "false")
 
     // Vectorized reads need more memory
-    maxHeapSize '2500m'
+    maxHeapSize '2560m'
+  }
+
+  task testSpark31(type: Test) {
+    dependsOn classes
+    description = "Test against Spark 3.1"
+    testClassesDirs = sourceSets.spark31.output.classesDirs
+    classpath = sourceSets.spark31.runtimeClasspath + sourceSets.main.output
   }
+
+  test.dependsOn testSpark31
 }
 
 project(":iceberg-spark3-extensions") {
@@ -1023,7 +1056,21 @@ project(":iceberg-spark3-extensions") {
   apply plugin: 'scala'
   apply plugin: 'antlr'
 
+  sourceSets {
+    // Compile test source against Spark 3.1 and main classes compiled against 
Spark 3.0
+    spark31 {
+      // Main source is in scala, but test source is only in java
+      java.srcDir "$projectDir/src/test/java"
+      resources.srcDir "$projectDir/src/test/resources"
+      compileClasspath += sourceSets.test.output + sourceSets.main.output
+      runtimeClasspath += sourceSets.test.output
+    }
+  }
+
   configurations {
+    spark31Implementation.extendsFrom testImplementation
+    spark31RuntimeOnly.extendsFrom testRuntimeOnly
+
     /*
      The Gradle Antlr plugin erroneously adds both antlr-build and runtime 
dependencies to the runtime path. This
      bug https://github.com/gradle/gradle/issues/820 exists because older 
versions of Antlr do not have separate
@@ -1037,10 +1084,9 @@ project(":iceberg-spark3-extensions") {
   }
 
   dependencies {
-    compileOnly project(':iceberg-spark3')
-
     compileOnly "org.scala-lang:scala-library"
-    compileOnly("org.apache.spark:spark-hive_2.12") {
+    compileOnly project(':iceberg-spark3')
+    
compileOnly("org.apache.spark:spark-hive_2.12:${project.ext.Spark30Version}") {
       exclude group: 'org.apache.avro', module: 'avro'
       exclude group: 'org.apache.arrow'
     }
@@ -1050,6 +1096,11 @@ project(":iceberg-spark3-extensions") {
     testCompile project(path: ':iceberg-spark', configuration: 'testArtifacts')
     testCompile project(path: ':iceberg-spark3', configuration: 
'testArtifacts')
 
+    
spark31Implementation("org.apache.spark:spark-hive_2.12:${project.ext.Spark31Version}")
 {
+      exclude group: 'org.apache.avro', module: 'avro'
+      exclude group: 'org.apache.arrow'
+    }
+
     // Required because we remove antlr plugin dependencies from the compile 
configuration, see note above
     // We shade this in Spark3 Runtime to avoid issues with Spark's Antlr 
Runtime
     runtime "org.antlr:antlr4-runtime:4.7.1"
@@ -1060,6 +1111,15 @@ project(":iceberg-spark3-extensions") {
     maxHeapSize = "64m"
     arguments += ['-visitor', '-package', 
'org.apache.spark.sql.catalyst.parser.extensions']
   }
+
+  task testSpark31(type: Test) {
+    dependsOn classes
+    description = "Test against Spark 3.1"
+    testClassesDirs = sourceSets.spark31.output.classesDirs
+    classpath = sourceSets.spark31.runtimeClasspath + sourceSets.main.output
+  }
+
+  test.dependsOn testSpark31
 }
 
 project(':iceberg-spark3-runtime') {
@@ -1072,6 +1132,12 @@ project(':iceberg-spark3-runtime') {
       java.srcDir "$projectDir/src/integration/java"
       resources.srcDir "$projectDir/src/integration/resources"
     }
+    spark31 {
+      java.srcDir "$projectDir/src/integration/java"
+      resources.srcDir "$projectDir/src/integration/resources"
+      compileClasspath += sourceSets.integration.output
+      runtimeClasspath += sourceSets.integration.output
+    }
   }
 
   configurations {
@@ -1086,6 +1152,8 @@ project(':iceberg-spark3-runtime') {
       exclude group: 'javax.xml.bind'
       exclude group: 'javax.annotation'
     }
+    spark31Implementation.extendsFrom integrationImplementation
+    spark31CompileOnly.extendsFrom integrationCompileOnly
   }
 
   dependencies {
@@ -1096,7 +1164,7 @@ project(':iceberg-spark3-runtime') {
       exclude group: 'com.google.code.findbugs', module: 'jsr305'
     }
 
-    integrationImplementation 'org.apache.spark:spark-hive_2.12'
+    integrationImplementation 
"org.apache.spark:spark-hive_2.12:${project.ext.Spark30Version}"
     integrationImplementation 'junit:junit'
     integrationImplementation 'org.slf4j:slf4j-simple'
     integrationImplementation project(path: ':iceberg-api', configuration: 
'testArtifacts')
@@ -1107,6 +1175,8 @@ project(':iceberg-spark3-runtime') {
     // Not allowed on our classpath, only the runtime jar is allowed
     integrationCompileOnly project(':iceberg-spark3-extensions')
     integrationCompileOnly project(':iceberg-spark3')
+
+    spark31Implementation 
"org.apache.spark:spark-hive_2.12:${project.ext.Spark31Version}"
   }
 
   shadowJar {
@@ -1144,14 +1214,24 @@ project(':iceberg-spark3-runtime') {
   }
 
   task integrationTest(type: Test) {
-    description = "Test Spark3 Runtime Jar"
+    description = "Test Spark3 Runtime Jar against Spark 3.0"
     group = "verification"
     testClassesDirs = sourceSets.integration.output.classesDirs
     classpath = sourceSets.integration.runtimeClasspath + 
files(shadowJar.archiveFile.get().asFile.path)
     inputs.file(shadowJar.archiveFile.get().asFile.path)
   }
   integrationTest.dependsOn shadowJar
-  check.dependsOn integrationTest
+
+  task spark31IntegrationTest(type: Test) {
+    dependsOn classes
+    description = "Test Spark3 Runtime Jar against Spark 3.1"
+    group = "verification"
+    testClassesDirs = sourceSets.spark31.output.classesDirs
+    classpath = sourceSets.spark31.runtimeClasspath + 
files(shadowJar.archiveFile.get().asFile.path)
+  }
+  spark31IntegrationTest.dependsOn shadowJar
+
+  check.dependsOn integrationTest, spark31IntegrationTest
 
   jar {
     enabled = false
diff --git a/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java 
b/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
index 13a75db..2dcaa9f 100644
--- a/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
@@ -33,6 +33,7 @@ public class DateTimeUtil {
 
   public static final OffsetDateTime EPOCH = 
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
   public static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+  public static final long MICROS_PER_MILLIS = 1000L;
 
   public static LocalDate dateFromDays(int daysFromEpoch) {
     return ChronoUnit.DAYS.addTo(EPOCH_DAY, daysFromEpoch);
@@ -66,6 +67,13 @@ public class DateTimeUtil {
     return ChronoUnit.MICROS.between(EPOCH, dateTime.atOffset(ZoneOffset.UTC));
   }
 
+  public static long microsToMillis(long micros) {
+    // When the timestamp is negative, i.e before 1970, we need to adjust the 
milliseconds portion.
+    // Example - 1965-01-01 10:11:12.123456 is represented as 
(-157700927876544) in micro precision.
+    // In millis precision the above needs to be represented as 
(-157700927877).
+    return Math.floorDiv(micros, MICROS_PER_MILLIS);
+  }
+
   public static OffsetDateTime timestamptzFromMicros(long microsFromEpoch) {
     return ChronoUnit.MICROS.addTo(EPOCH, microsFromEpoch);
   }
diff --git 
a/spark3-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala
 
b/spark3-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala
index 78b17d3..30b5df5 100644
--- 
a/spark3-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala
+++ 
b/spark3-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala
@@ -41,7 +41,7 @@ class IcebergSparkSessionExtensions extends 
(SparkSessionExtensions => Unit) {
     // analyzer extensions
     extensions.injectResolutionRule { spark => ResolveProcedures(spark) }
     extensions.injectResolutionRule { _ => ProcedureArgumentCoercion }
-    extensions.injectPostHocResolutionRule { spark => 
AlignRowLevelOperations(spark.sessionState.conf)}
+    extensions.injectPostHocResolutionRule { spark => AlignRowLevelOperations }
     extensions.injectCheckRule { _ => RowLevelOperationsPredicateCheck }
 
     // optimizer extensions
diff --git 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlignRowLevelOperations.scala
 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlignRowLevelOperations.scala
index e6864b2..6da3ba6 100644
--- 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlignRowLevelOperations.scala
+++ 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlignRowLevelOperations.scala
@@ -31,7 +31,10 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
 import org.apache.spark.sql.internal.SQLConf
 
-case class AlignRowLevelOperations(conf: SQLConf) extends Rule[LogicalPlan] 
with AssignmentAlignmentSupport {
+case object AlignRowLevelOperations extends Rule[LogicalPlan]
+    with AssignmentAlignmentSupport with CastSupport {
+
+  override def conf: SQLConf = SQLConf.get
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case u: UpdateTable if u.resolved && isIcebergRelation(u.table)=>
diff --git 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentAlignmentSupport.scala
 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentAlignmentSupport.scala
index 673d287..c1140df 100644
--- 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentAlignmentSupport.scala
+++ 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentAlignmentSupport.scala
@@ -32,13 +32,17 @@ import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.expressions.NamedExpression
 import org.apache.spark.sql.catalyst.plans.logical.Assignment
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import 
org.apache.spark.sql.catalyst.utils.RewriteRowLevelOperationHelper.createAlias
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.types.StructType
 import scala.collection.mutable
 
-trait AssignmentAlignmentSupport extends CastSupport {
+trait AssignmentAlignmentSupport {
+
+  def conf: SQLConf
 
   private case class ColumnUpdate(ref: Seq[String], expr: Expression)
 
@@ -96,7 +100,7 @@ trait AssignmentAlignmentSupport extends CastSupport {
             case StructType(fields) =>
               // build field expressions
               val fieldExprs = fields.zipWithIndex.map { case (field, ordinal) 
=>
-                Alias(GetStructField(col, ordinal, Some(field.name)), 
field.name)()
+                createAlias(GetStructField(col, ordinal, Some(field.name)), 
field.name)
               }
 
               // recursively apply this method on nested fields
diff --git 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
index 32257ff..e5a8265 100644
--- 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
+++ 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.catalyst.expressions.EqualNullSafe
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.expressions.Not
-import org.apache.spark.sql.catalyst.expressions.SortOrder
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
 import org.apache.spark.sql.catalyst.plans.logical.Filter
@@ -38,6 +37,7 @@ import 
org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
 import org.apache.spark.sql.catalyst.plans.logical.ReplaceData
 import org.apache.spark.sql.catalyst.plans.logical.Sort
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils
 import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
 import org.apache.spark.sql.catalyst.utils.RewriteRowLevelOperationHelper
 import org.apache.spark.sql.connector.catalog.Table
@@ -52,6 +52,9 @@ case class RewriteDelete(spark: SparkSession) extends 
Rule[LogicalPlan] with Rew
 
   import ExtendedDataSourceV2Implicits._
   import RewriteRowLevelOperationHelper._
+  import DistributionAndOrderingUtils._
+
+  override def conf: SQLConf = SQLConf.get
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     // don't rewrite deletes that can be answered by passing filters to 
deleteWhere in SupportsDelete
@@ -66,7 +69,7 @@ case class RewriteDelete(spark: SparkSession) extends 
Rule[LogicalPlan] with Rew
       val mergeBuilder = r.table.asMergeable.newMergeBuilder("delete", 
writeInfo)
 
       val matchingRowsPlanBuilder = scanRelation => Filter(cond, scanRelation)
-      val scanPlan = buildDynamicFilterScanPlan(spark, r.table, r.output, 
mergeBuilder, cond, matchingRowsPlanBuilder)
+      val scanPlan = buildDynamicFilterScanPlan(spark, r, r.output, 
mergeBuilder, cond, matchingRowsPlanBuilder)
 
       val remainingRowFilter = Not(EqualNullSafe(cond, Literal(true, 
BooleanType)))
       val remainingRowsPlan = Filter(remainingRowFilter, scanPlan)
@@ -91,11 +94,11 @@ case class RewriteDelete(spark: SparkSession) extends 
Rule[LogicalPlan] with Rew
         remainingRowsPlan
       case _ =>
         // apply hash partitioning by file if the distribution mode is hash or 
range
-        val numShufflePartitions = SQLConf.get.numShufflePartitions
+        val numShufflePartitions = conf.numShufflePartitions
         RepartitionByExpression(Seq(fileNameCol), remainingRowsPlan, 
numShufflePartitions)
     }
 
-    val order = Seq(SortOrder(fileNameCol, Ascending), SortOrder(rowPosCol, 
Ascending))
+    val order = Seq(createSortOrder(fileNameCol, Ascending), 
createSortOrder(rowPosCol, Ascending))
     val sort = Sort(order, global = false, planWithDistribution)
     Project(output, sort)
   }
diff --git 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
index c30e603..a08adfb 100644
--- 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
+++ 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
@@ -55,9 +55,12 @@ import 
org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implici
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.BooleanType
 
-case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] 
with RewriteRowLevelOperationHelper  {
+case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] 
with RewriteRowLevelOperationHelper {
   import ExtendedDataSourceV2Implicits._
   import RewriteMergeInto._
+  import RewriteRowLevelOperationHelper._
+
+  override def conf: SQLConf = SQLConf.get
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan transform {
@@ -79,7 +82,7 @@ case class RewriteMergeInto(spark: SparkSession) extends 
Rule[LogicalPlan] with
 
         val outputExprs = insertAction.assignments.map(_.value)
         val outputColNames = target.output.map(_.name)
-        val outputCols = outputExprs.zip(outputColNames).map { case (expr, 
name) => Alias(expr, name)() }
+        val outputCols = outputExprs.zip(outputColNames).map { case (expr, 
name) => createAlias(expr, name) }
         val mergePlan = Project(outputCols, joinPlan)
 
         val writePlan = buildWritePlan(mergePlan, target.table)
@@ -121,7 +124,7 @@ case class RewriteMergeInto(spark: SparkSession) extends 
Rule[LogicalPlan] with
 
         // when there are no not-matched actions, use a right outer join to 
ignore source rows that do not match, but
         // keep all unmatched target rows that must be preserved.
-        val sourceTableProj = source.output ++ Seq(Alias(TRUE_LITERAL, 
ROW_FROM_SOURCE)())
+        val sourceTableProj = source.output ++ Seq(createAlias(TRUE_LITERAL, 
ROW_FROM_SOURCE))
         val newSourceTableScan = Project(sourceTableProj, source)
         val targetTableScan = buildDynamicFilterTargetScan(mergeBuilder, 
target, source, cond, matchedActions)
         val joinPlan = Join(newSourceTableScan, targetTableScan, RightOuter, 
Some(cond), JoinHint.NONE)
@@ -151,10 +154,10 @@ case class RewriteMergeInto(spark: SparkSession) extends 
Rule[LogicalPlan] with
         val (matchedConditions, matchedOutputs) = 
rewriteMatchedActions(matchedActions, target.output)
 
         // use a full outer join because there are both matched and not 
matched actions
-        val sourceTableProj = source.output ++ Seq(Alias(TRUE_LITERAL, 
ROW_FROM_SOURCE)())
+        val sourceTableProj = source.output ++ Seq(createAlias(TRUE_LITERAL, 
ROW_FROM_SOURCE))
         val newSourceTableScan = Project(sourceTableProj, source)
         val targetTableScan = buildDynamicFilterTargetScan(mergeBuilder, 
target, source, cond, matchedActions)
-        val targetTableProj = targetTableScan.output ++ 
Seq(Alias(TRUE_LITERAL, ROW_FROM_TARGET)())
+        val targetTableProj = targetTableScan.output ++ 
Seq(createAlias(TRUE_LITERAL, ROW_FROM_TARGET))
         val newTargetTableScan = Project(targetTableProj, targetTableScan)
         val joinPlan = Join(newSourceTableScan, newTargetTableScan, FullOuter, 
Some(cond), JoinHint.NONE)
 
@@ -202,7 +205,7 @@ case class RewriteMergeInto(spark: SparkSession) extends 
Rule[LogicalPlan] with
     val output = target.output
     val matchingRowsPlanBuilder = rel => Join(source, rel, Inner, Some(cond), 
JoinHint.NONE)
     val runCardinalityCheck = isCardinalityCheckEnabled(table) && 
isCardinalityCheckNeeded(matchedActions)
-    buildDynamicFilterScanPlan(spark, table, output, mergeBuilder, cond, 
matchingRowsPlanBuilder, runCardinalityCheck)
+    buildDynamicFilterScanPlan(spark, target, output, mergeBuilder, cond, 
matchingRowsPlanBuilder, runCardinalityCheck)
   }
 
   private def rewriteMatchedActions(
diff --git 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteUpdate.scala
 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteUpdate.scala
index fce7618..de5984c 100644
--- 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteUpdate.scala
+++ 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteUpdate.scala
@@ -40,11 +40,15 @@ import 
org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
 import org.apache.spark.sql.catalyst.utils.RewriteRowLevelOperationHelper
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import 
org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implicits
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.BooleanType
 
 case class RewriteUpdate(spark: SparkSession) extends Rule[LogicalPlan] with 
RewriteRowLevelOperationHelper {
 
   import ExtendedDataSourceV2Implicits._
+  import RewriteRowLevelOperationHelper._
+
+  override def conf: SQLConf = SQLConf.get
 
   // TODO: can we do any better for no-op updates? when conditions evaluate to 
false/true?
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
@@ -59,7 +63,7 @@ case class RewriteUpdate(spark: SparkSession) extends 
Rule[LogicalPlan] with Rew
       // so the first job uses DynamicFileFilter and the second one uses the 
underlying scan plan
       // both jobs share the same SparkMergeScan instance to ensure they 
operate on same files
       val matchingRowsPlanBuilder = scanRelation => Filter(cond, scanRelation)
-      val scanPlan = buildDynamicFilterScanPlan(spark, r.table, r.output, 
mergeBuilder, cond, matchingRowsPlanBuilder)
+      val scanPlan = buildDynamicFilterScanPlan(spark, r, r.output, 
mergeBuilder, cond, matchingRowsPlanBuilder)
       val underlyingScanPlan = scanPlan match {
         case DynamicFileFilter(plan, _, _) => plan.clone()
         case _ => scanPlan.clone()
@@ -85,7 +89,7 @@ case class RewriteUpdate(spark: SparkSession) extends 
Rule[LogicalPlan] with Rew
       val mergeBuilder = r.table.asMergeable.newMergeBuilder("update", 
writeInfo)
 
       val matchingRowsPlanBuilder = scanRelation => Filter(cond, scanRelation)
-      val scanPlan = buildDynamicFilterScanPlan(spark, r.table, r.output, 
mergeBuilder, cond, matchingRowsPlanBuilder)
+      val scanPlan = buildDynamicFilterScanPlan(spark, r, r.output, 
mergeBuilder, cond, matchingRowsPlanBuilder)
 
       val updateProjection = buildUpdateProjection(r, scanPlan, assignments, 
cond)
 
@@ -110,10 +114,10 @@ case class RewriteUpdate(spark: SparkSession) extends 
Rule[LogicalPlan] with Rew
       if (attr.semanticEquals(assignedExpr)) {
         attr
       } else if (cond == Literal.TrueLiteral) {
-        Alias(assignedExpr, attr.name)()
+        createAlias(assignedExpr, attr.name)
       } else {
         val updatedExpr = If(cond, assignedExpr, attr)
-        Alias(updatedExpr, attr.name)()
+        createAlias(updatedExpr, attr.name)
       }
     }
 
diff --git 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
index f11e30f..d301b75 100644
--- 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
+++ 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
@@ -25,6 +25,8 @@ import org.antlr.v4.runtime.atn.PredictionMode
 import org.antlr.v4.runtime.misc.Interval
 import org.antlr.v4.runtime.misc.ParseCancellationException
 import org.antlr.v4.runtime.tree.TerminalNodeImpl
+import org.apache.iceberg.common.DynConstructors
+import org.apache.iceberg.spark.Spark3VersionUtil
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -41,7 +43,9 @@ import org.apache.spark.sql.types.StructType
 
 class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends 
ParserInterface {
 
-  private lazy val substitutor = new VariableSubstitution(SQLConf.get)
+  import IcebergSparkSqlExtensionsParser._
+
+  private lazy val substitutor = substitutorCtor.newInstance(SQLConf.get)
   private lazy val astBuilder = new IcebergSqlExtensionsAstBuilder(delegate)
 
   /**
@@ -54,9 +58,7 @@ class IcebergSparkSqlExtensionsParser(delegate: 
ParserInterface) extends ParserI
   /**
    * Parse a string to a raw DataType without CHAR/VARCHAR replacement.
    */
-  override def parseRawDataType(sqlText: String): DataType = {
-    delegate.parseRawDataType(sqlText)
-  }
+  def parseRawDataType(sqlText: String): DataType = throw new 
UnsupportedOperationException()
 
   /**
    * Parse a string to an Expression.
@@ -161,6 +163,14 @@ class IcebergSparkSqlExtensionsParser(delegate: 
ParserInterface) extends ParserI
   }
 }
 
+object IcebergSparkSqlExtensionsParser {
+  private val substitutorCtor: DynConstructors.Ctor[VariableSubstitution] =
+    DynConstructors.builder()
+      .impl(classOf[VariableSubstitution])
+      .impl(classOf[VariableSubstitution], classOf[SQLConf])
+      .build()
+}
+
 /* Copied from Apache Spark's to avoid dependency on Spark Internals */
 class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream {
   override def consume(): Unit = wrapped.consume
diff --git 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ReplaceData.scala
 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ReplaceData.scala
index 2535e70..16fd559 100644
--- 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ReplaceData.scala
+++ 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ReplaceData.scala
@@ -25,4 +25,12 @@ import org.apache.spark.sql.connector.write.BatchWrite
 case class ReplaceData(
     table: NamedRelation,
     write: BatchWrite,
-    query: LogicalPlan) extends V2WriteCommand
+    query: LogicalPlan) extends V2WriteCommand {
+
+  def isByName: Boolean = false
+
+  def withNewQuery(newQuery: LogicalPlan): ReplaceData = copy(query = newQuery)
+
+  def withNewTable(newTable: NamedRelation): ReplaceData = copy(table = 
newTable)
+
+}
diff --git 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/DistributionAndOrderingUtils.scala
 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/DistributionAndOrderingUtils.scala
index ad46730..bcef977 100644
--- 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/DistributionAndOrderingUtils.scala
+++ 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/DistributionAndOrderingUtils.scala
@@ -19,6 +19,8 @@
 
 package org.apache.spark.sql.catalyst.utils
 
+import org.apache.iceberg.common.DynConstructors
+import org.apache.iceberg.spark.Spark3VersionUtil
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.analysis.Resolver
@@ -58,6 +60,18 @@ import org.apache.spark.sql.types.IntegerType
 
 object DistributionAndOrderingUtils {
 
+  private val repartitionByExpressionCtor: 
DynConstructors.Ctor[RepartitionByExpression] =
+    DynConstructors.builder()
+      .impl(classOf[RepartitionByExpression],
+        classOf[Seq[catalyst.expressions.Expression]],
+        classOf[LogicalPlan],
+        classOf[Option[Int]])
+      .impl(classOf[RepartitionByExpression],
+        classOf[Seq[catalyst.expressions.Expression]],
+        classOf[LogicalPlan],
+        Integer.TYPE)
+      .build()
+
   def prepareQuery(
       requiredDistribution: Distribution,
       requiredOrdering: Seq[SortOrder],
@@ -80,7 +94,11 @@ object DistributionAndOrderingUtils {
       // the conversion to catalyst expressions above produces SortOrder 
expressions
       // for OrderedDistribution and generic expressions for 
ClusteredDistribution
       // this allows RepartitionByExpression to pick either range or hash 
partitioning
-      RepartitionByExpression(distribution, query, numShufflePartitions)
+      if (Spark3VersionUtil.isSpark30) {
+        repartitionByExpressionCtor.newInstance(distribution.toSeq, query, 
Integer.valueOf(numShufflePartitions))
+      } else {
+        repartitionByExpressionCtor.newInstance(distribution.toSeq, query, 
Some(numShufflePartitions))
+      }
     } else {
       query
     }
@@ -98,6 +116,37 @@ object DistributionAndOrderingUtils {
     queryWithDistributionAndOrdering
   }
 
+  private val sortOrderCtor: 
DynConstructors.Ctor[catalyst.expressions.SortOrder] =
+    DynConstructors.builder()
+      .impl(classOf[catalyst.expressions.SortOrder],
+        classOf[catalyst.expressions.Expression],
+        classOf[catalyst.expressions.SortDirection],
+        classOf[catalyst.expressions.NullOrdering],
+        classOf[Seq[catalyst.expressions.Expression]])
+      .impl(classOf[catalyst.expressions.SortOrder],
+        classOf[catalyst.expressions.Expression],
+        classOf[catalyst.expressions.SortDirection],
+        classOf[catalyst.expressions.NullOrdering],
+        classOf[Set[catalyst.expressions.Expression]])
+      .build()
+
+  def createSortOrder(
+      child: catalyst.expressions.Expression,
+      direction: catalyst.expressions.SortDirection): 
catalyst.expressions.SortOrder = {
+    createSortOrder(child, direction, direction.defaultNullOrdering)
+  }
+
+  def createSortOrder(
+      child: catalyst.expressions.Expression,
+      direction: catalyst.expressions.SortDirection,
+      nullOrdering: catalyst.expressions.NullOrdering): 
catalyst.expressions.SortOrder = {
+    if (Spark3VersionUtil.isSpark30) {
+      sortOrderCtor.newInstance(child, direction, nullOrdering, Set.empty)
+    } else {
+      sortOrderCtor.newInstance(child, direction, nullOrdering, Seq.empty)
+    }
+  }
+
   private def toCatalyst(
       expr: Expression,
       query: LogicalPlan,
@@ -118,7 +167,7 @@ object DistributionAndOrderingUtils {
     expr match {
       case s: SortOrder =>
         val catalystChild = toCatalyst(s.expression(), query, resolver)
-        catalyst.expressions.SortOrder(catalystChild, toCatalyst(s.direction), 
toCatalyst(s.nullOrdering), Set.empty)
+        createSortOrder(catalystChild, toCatalyst(s.direction), 
toCatalyst(s.nullOrdering))
       case it: IdentityTransform =>
         resolve(it.ref.fieldNames)
       case BucketTransform(numBuckets, ref) =>
diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/RewriteRowLevelOperationHelper.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/RewriteRowLevelOperationHelper.scala
index 19a950a..910e9ce 100644
--- a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/RewriteRowLevelOperationHelper.scala
+++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/RewriteRowLevelOperationHelper.scala
@@ -19,7 +19,9 @@
 package org.apache.spark.sql.catalyst.utils
 
 import java.util.UUID
+import org.apache.iceberg.common.DynConstructors
 import org.apache.iceberg.spark.Spark3Util
+import org.apache.iceberg.spark.Spark3VersionUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.SparkSession
@@ -31,8 +33,10 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.expressions.AttributeSet
 import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.ExprId
 import org.apache.spark.sql.catalyst.expressions.GreaterThan
 import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.expressions.aggregate.Complete
@@ -48,6 +52,7 @@ import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.iceberg.distributions.OrderedDistribution
 import org.apache.spark.sql.connector.iceberg.read.SupportsFileFilter
 import org.apache.spark.sql.connector.iceberg.write.MergeBuilder
+import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.LogicalWriteInfo
 import org.apache.spark.sql.connector.write.LogicalWriteInfoImpl
@@ -59,6 +64,7 @@ import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implici
 import org.apache.spark.sql.execution.datasources.v2.PushDownUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources
+import org.apache.spark.sql.types.Metadata
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -69,7 +75,7 @@ trait RewriteRowLevelOperationHelper extends PredicateHelper with Logging {
   import ExtendedDataSourceV2Implicits.ScanBuilderHelper
 
   protected def spark: SparkSession
-  protected lazy val conf: SQLConf = spark.sessionState.conf
+  def conf: SQLConf
   protected lazy val resolver: Resolver = conf.resolver
 
   protected def buildSimpleScanPlan(
@@ -83,14 +89,14 @@ trait RewriteRowLevelOperationHelper extends PredicateHelper with Logging {
     val scan = scanBuilder.asIceberg.withMetadataColumns(FILE_NAME_COL, 
ROW_POS_COL).build()
     val outputAttrs = toOutputAttrs(scan.readSchema(), relation.output)
     val predicates = extractFilters(cond, 
relation.output).reduceLeftOption(And)
-    val scanRelation = DataSourceV2ScanRelation(relation.table, scan, 
outputAttrs)
+    val scanRelation = createScanRelation(relation, scan, outputAttrs)
 
     predicates.map(Filter(_, scanRelation)).getOrElse(scanRelation)
   }
 
   protected def buildDynamicFilterScanPlan(
       spark: SparkSession,
-      table: Table,
+      relation: DataSourceV2Relation,
       tableAttrs: Seq[AttributeReference],
       mergeBuilder: MergeBuilder,
       cond: Expression,
@@ -103,7 +109,7 @@ trait RewriteRowLevelOperationHelper extends PredicateHelper with Logging {
 
     val scan = scanBuilder.asIceberg.withMetadataColumns(FILE_NAME_COL, 
ROW_POS_COL).build()
     val outputAttrs = toOutputAttrs(scan.readSchema(), tableAttrs)
-    val scanRelation = DataSourceV2ScanRelation(table, scan, outputAttrs)
+    val scanRelation = createScanRelation(relation, scan, outputAttrs)
 
     scan match {
       case filterable: SupportsFileFilter if runCardinalityCheck =>
@@ -171,11 +177,11 @@ trait RewriteRowLevelOperationHelper extends PredicateHelper with Logging {
       prunedTargetPlan: LogicalPlan): LogicalPlan = {
     val fileAttr = findOutputAttr(prunedTargetPlan.output, FILE_NAME_COL)
     val rowPosAttr = findOutputAttr(prunedTargetPlan.output, ROW_POS_COL)
-    val accumulatorExpr = Alias(AccumulateFiles(filesAccumulator, fileAttr), 
AFFECTED_FILES_ACC_ALIAS_NAME)()
+    val accumulatorExpr = createAlias(AccumulateFiles(filesAccumulator, 
fileAttr), AFFECTED_FILES_ACC_ALIAS_NAME)
     val projectList = Seq(fileAttr, rowPosAttr, accumulatorExpr)
     val projectPlan = Project(projectList, prunedTargetPlan)
     val affectedFilesAttr = findOutputAttr(projectPlan.output, 
AFFECTED_FILES_ACC_ALIAS_NAME)
-    val aggSumCol = Alias(AggregateExpression(Sum(affectedFilesAttr), 
Complete, false), SUM_ROW_ID_ALIAS_NAME)()
+    val aggSumCol = createAlias(AggregateExpression(Sum(affectedFilesAttr), 
Complete, false), SUM_ROW_ID_ALIAS_NAME)
     // Group the rows by row id while collecting the files that need to be overwritten via the accumulator.
     val aggPlan = Aggregate(Seq(fileAttr, rowPosAttr), Seq(aggSumCol), 
projectPlan)
     val sumAttr = findOutputAttr(aggPlan.output, SUM_ROW_ID_ALIAS_NAME)
@@ -229,4 +235,48 @@ object RewriteRowLevelOperationHelper {
   private final val AFFECTED_FILES_ACC_NAME = 
"internal.metrics.merge.affectedFiles"
   private final val AFFECTED_FILES_ACC_ALIAS_NAME = "_affectedFiles_"
   private final val SUM_ROW_ID_ALIAS_NAME = "_sum_"
+
+  private val scanRelationCtor: DynConstructors.Ctor[DataSourceV2ScanRelation] 
=
+    DynConstructors.builder()
+      .impl(classOf[DataSourceV2ScanRelation],
+        classOf[DataSourceV2Relation],
+        classOf[Scan],
+        classOf[Seq[AttributeReference]])
+      .impl(classOf[DataSourceV2ScanRelation],
+        classOf[Table],
+        classOf[Scan],
+        classOf[Seq[AttributeReference]])
+      .build()
+
+  def createScanRelation(
+      relation: DataSourceV2Relation,
+      scan: Scan,
+      outputAttrs: Seq[AttributeReference]): DataSourceV2ScanRelation = {
+    if (Spark3VersionUtil.isSpark30) {
+      scanRelationCtor.newInstance(relation.table, scan, outputAttrs)
+    } else {
+      scanRelationCtor.newInstance(relation, scan, outputAttrs)
+    }
+  }
+
+  private val aliasCtor: DynConstructors.Ctor[Alias] =
+    DynConstructors.builder()
+      .impl(classOf[Alias],
+        classOf[Expression],
+        classOf[String],
+        classOf[ExprId],
+        classOf[Seq[String]],
+        classOf[Option[Metadata]],
+        classOf[Seq[String]])
+      .impl(classOf[Alias],
+        classOf[Expression],
+        classOf[String],
+        classOf[ExprId],
+        classOf[Seq[String]],
+        classOf[Option[Metadata]])
+      .build()
+
+  def createAlias(child: Expression, name: String): Alias = {
+    aliasCtor.newInstance(child, name, NamedExpression.newExprId, Seq.empty, 
None, Seq.empty)
+  }
 }
diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ReplaceData.scala b/spark3/src/main/java/org/apache/iceberg/spark/Spark3VersionUtil.java
similarity index 69%
copy from spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ReplaceData.scala
copy to spark3/src/main/java/org/apache/iceberg/spark/Spark3VersionUtil.java
index 2535e70..984c66d 100644
--- a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ReplaceData.scala
+++ b/spark3/src/main/java/org/apache/iceberg/spark/Spark3VersionUtil.java
@@ -17,12 +17,20 @@
  * under the License.
  */
 
-package org.apache.spark.sql.catalyst.plans.logical
+package org.apache.iceberg.spark;
 
-import org.apache.spark.sql.catalyst.analysis.NamedRelation
-import org.apache.spark.sql.connector.write.BatchWrite
+import org.apache.spark.package$;
 
-case class ReplaceData(
-    table: NamedRelation,
-    write: BatchWrite,
-    query: LogicalPlan) extends V2WriteCommand
+public class Spark3VersionUtil {
+
+  private Spark3VersionUtil() {
+  }
+
+  public static boolean isSpark30() {
+    return package$.MODULE$.SPARK_VERSION_SHORT().startsWith("3.0");
+  }
+
+  public static boolean isSpark31() {
+    return package$.MODULE$.SPARK_VERSION_SHORT().startsWith("3.1");
+  }
+}
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark3/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index ba1e6b0..88803f0 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -22,8 +22,8 @@ package org.apache.iceberg.spark.procedures;
 import org.apache.iceberg.actions.Actions;
 import org.apache.iceberg.actions.ExpireSnapshots;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
@@ -77,7 +77,7 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
   @Override
   public InternalRow[] call(InternalRow args) {
     Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
-    Long olderThanMillis = args.isNullAt(1) ? null : 
DateTimeUtils.toMillis(args.getLong(1));
+    Long olderThanMillis = args.isNullAt(1) ? null : 
DateTimeUtil.microsToMillis(args.getLong(1));
     Integer retainLastNum = args.isNullAt(2) ? null : args.getInt(2);
 
     return modifyIcebergTable(tableIdent, table -> {
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark3/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
index eea4951..f1dcf31 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
@@ -24,8 +24,8 @@ import org.apache.iceberg.actions.Actions;
 import org.apache.iceberg.actions.RemoveOrphanFiles;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
@@ -79,7 +79,7 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
   @Override
   public InternalRow[] call(InternalRow args) {
     Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
-    Long olderThanMillis = args.isNullAt(1) ? null : 
DateTimeUtils.toMillis(args.getLong(1));
+    Long olderThanMillis = args.isNullAt(1) ? null : 
DateTimeUtil.microsToMillis(args.getLong(1));
     String location = args.isNullAt(2) ? null : args.getString(2);
     boolean dryRun = args.isNullAt(3) ? false : args.getBoolean(3);
 
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/procedures/RollbackToTimestampProcedure.java b/spark3/src/main/java/org/apache/iceberg/spark/procedures/RollbackToTimestampProcedure.java
index 3e38c3e..94e8949 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/procedures/RollbackToTimestampProcedure.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/procedures/RollbackToTimestampProcedure.java
@@ -21,8 +21,8 @@ package org.apache.iceberg.spark.procedures;
 
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
@@ -77,7 +77,7 @@ class RollbackToTimestampProcedure extends BaseProcedure {
   public InternalRow[] call(InternalRow args) {
     Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
     // timestamps in Spark have microsecond precision so this conversion is lossy
-    long timestampMillis = DateTimeUtils.toMillis(args.getLong(1));
+    long timestampMillis = DateTimeUtil.microsToMillis(args.getLong(1));
 
     return modifyIcebergTable(tableIdent, table -> {
       Snapshot previousSnapshot = table.currentSnapshot();
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java
index 43b3bf9..eb852fb 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java
@@ -24,9 +24,11 @@ import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.Spark3VersionUtil;
 import org.apache.iceberg.spark.SparkCatalogTestBase;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 
 public class TestDeleteFrom extends SparkCatalogTestBase {
@@ -41,6 +43,10 @@ public class TestDeleteFrom extends SparkCatalogTestBase {
 
   @Test
   public void testDeleteFromUnpartitionedTable() {
+    // This test fails in Spark 3.1. `canDeleteWhere` was added to `SupportsDelete` in Spark 3.1,
+    // but logic to rewrite the query if `canDeleteWhere` returns false was left to be implemented
+    // later.
+    Assume.assumeTrue(Spark3VersionUtil.isSpark30());
     // set the shuffle partitions to 1 to force the write to use a single task and produce 1 file
     String originalParallelism = 
spark.conf().get("spark.sql.shuffle.partitions");
     spark.conf().set("spark.sql.shuffle.partitions", "1");
@@ -68,6 +74,10 @@ public class TestDeleteFrom extends SparkCatalogTestBase {
 
   @Test
   public void testDeleteFromPartitionedTable() {
+    // This test fails in Spark 3.1. `canDeleteWhere` was added to `SupportsDelete` in Spark 3.1,
+    // but logic to rewrite the query if `canDeleteWhere` returns false was left to be implemented
+    // later.
+    Assume.assumeTrue(Spark3VersionUtil.isSpark30());
     // set the shuffle partitions to 1 to force the write to use a single task and produce 1 file per partition
     String originalParallelism = 
spark.conf().get("spark.sql.shuffle.partitions");
     spark.conf().set("spark.sql.shuffle.partitions", "1");
diff --git a/versions.props b/versions.props
index 5e890f4..1b2c5f4 100644
--- a/versions.props
+++ b/versions.props
@@ -9,7 +9,6 @@ org.apache.orc:* = 1.6.8
 org.apache.parquet:* = 1.12.0
 org.apache.spark:spark-hive_2.11 = 2.4.7
 org.apache.spark:spark-avro_2.11 = 2.4.7
-org.apache.spark:spark-hive_2.12 = 3.0.1
 org.apache.pig:pig = 0.14.0
 com.fasterxml.jackson.*:* = 2.11.4
 com.google.guava:guava = 28.0-jre

Reply via email to