This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 111fe81 [Spark] Support building against both Spark 3.0 and Spark
3.1. (#2512)
111fe81 is described below
commit 111fe810c81eb2fea7d849ecc21daedd1c429355
Author: Wing Yew Poon <[email protected]>
AuthorDate: Thu Jun 24 08:16:41 2021 -0700
[Spark] Support building against both Spark 3.0 and Spark 3.1. (#2512)
Code changes that allow spark3 and spark3-extensions to be tested against
both Spark 3.0 and Spark 3.1 while still built against a single Spark 3.0
version.
Although additional tests are created, we still only produce a single
set of Spark3 binaries which
are compatible with Spark 3.0 and 3.1
---
build.gradle | 98 ++++++++++++++++++++--
.../java/org/apache/iceberg/util/DateTimeUtil.java | 8 ++
.../extensions/IcebergSparkSessionExtensions.scala | 2 +-
.../analysis/AlignRowLevelOperations.scala | 5 +-
.../analysis/AssignmentAlignmentSupport.scala | 8 +-
.../sql/catalyst/optimizer/RewriteDelete.scala | 11 ++-
.../sql/catalyst/optimizer/RewriteMergeInto.scala | 15 ++--
.../sql/catalyst/optimizer/RewriteUpdate.scala | 12 ++-
.../IcebergSparkSqlExtensionsParser.scala | 18 +++-
.../sql/catalyst/plans/logical/ReplaceData.scala | 10 ++-
.../utils/DistributionAndOrderingUtils.scala | 53 +++++++++++-
.../utils/RewriteRowLevelOperationHelper.scala | 62 ++++++++++++--
.../apache/iceberg/spark/Spark3VersionUtil.java | 22 +++--
.../spark/procedures/ExpireSnapshotsProcedure.java | 4 +-
.../procedures/RemoveOrphanFilesProcedure.java | 4 +-
.../procedures/RollbackToTimestampProcedure.java | 4 +-
.../apache/iceberg/spark/sql/TestDeleteFrom.java | 10 +++
versions.props | 1 -
18 files changed, 293 insertions(+), 54 deletions(-)
diff --git a/build.gradle b/build.gradle
index df1c04d..d87eace 100644
--- a/build.gradle
+++ b/build.gradle
@@ -64,6 +64,10 @@ allprojects {
mavenCentral()
mavenLocal()
}
+ project.ext {
+ Spark30Version = '3.0.1'
+ Spark31Version = '3.1.1'
+ }
}
subprojects {
@@ -977,6 +981,21 @@ if (jdkVersion == '8') {
}
project(':iceberg-spark3') {
+ sourceSets {
+ // Compile test source against Spark 3.1 and main classes compiled against
Spark 3.0
+ spark31 {
+ java.srcDir "$projectDir/src/test/java"
+ resources.srcDir "$projectDir/src/test/resources"
+ compileClasspath += sourceSets.test.output + sourceSets.main.output
+ runtimeClasspath += sourceSets.test.output
+ }
+ }
+
+ configurations {
+ spark31Implementation.extendsFrom testImplementation
+ spark31RuntimeOnly.extendsFrom testRuntimeOnly
+ }
+
dependencies {
compile project(':iceberg-api')
compile project(':iceberg-common')
@@ -989,7 +1008,7 @@ project(':iceberg-spark3') {
compile project(':iceberg-spark')
compileOnly "org.apache.avro:avro"
- compileOnly("org.apache.spark:spark-hive_2.12") {
+
compileOnly("org.apache.spark:spark-hive_2.12:${project.ext.Spark30Version}") {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.apache.arrow'
}
@@ -1003,9 +1022,14 @@ project(':iceberg-spark3') {
testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')
testCompile "org.xerial:sqlite-jdbc"
+
+
spark31Implementation("org.apache.spark:spark-hive_2.12:${project.ext.Spark31Version}")
{
+ exclude group: 'org.apache.avro', module: 'avro'
+ exclude group: 'org.apache.arrow'
+ }
}
- test {
+ tasks.withType(Test) {
// For vectorized reads
// Allow unsafe memory access to avoid the costly check arrow does to
check if index is within bounds
systemProperty("arrow.enable_unsafe_memory_access", "true")
@@ -1014,8 +1038,17 @@ project(':iceberg-spark3') {
systemProperty("arrow.enable_null_check_for_get", "false")
// Vectorized reads need more memory
- maxHeapSize '2500m'
+ maxHeapSize '2560m'
+ }
+
+ task testSpark31(type: Test) {
+ dependsOn classes
+ description = "Test against Spark 3.1"
+ testClassesDirs = sourceSets.spark31.output.classesDirs
+ classpath = sourceSets.spark31.runtimeClasspath + sourceSets.main.output
}
+
+ test.dependsOn testSpark31
}
project(":iceberg-spark3-extensions") {
@@ -1023,7 +1056,21 @@ project(":iceberg-spark3-extensions") {
apply plugin: 'scala'
apply plugin: 'antlr'
+ sourceSets {
+ // Compile test source against Spark 3.1 and main classes compiled against
Spark 3.0
+ spark31 {
+ // Main source is in scala, but test source is only in java
+ java.srcDir "$projectDir/src/test/java"
+ resources.srcDir "$projectDir/src/test/resources"
+ compileClasspath += sourceSets.test.output + sourceSets.main.output
+ runtimeClasspath += sourceSets.test.output
+ }
+ }
+
configurations {
+ spark31Implementation.extendsFrom testImplementation
+ spark31RuntimeOnly.extendsFrom testRuntimeOnly
+
/*
The Gradle Antlr plugin erroneously adds both antlr-build and runtime
dependencies to the runtime path. This
bug https://github.com/gradle/gradle/issues/820 exists because older
versions of Antlr do not have separate
@@ -1037,10 +1084,9 @@ project(":iceberg-spark3-extensions") {
}
dependencies {
- compileOnly project(':iceberg-spark3')
-
compileOnly "org.scala-lang:scala-library"
- compileOnly("org.apache.spark:spark-hive_2.12") {
+ compileOnly project(':iceberg-spark3')
+
compileOnly("org.apache.spark:spark-hive_2.12:${project.ext.Spark30Version}") {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.apache.arrow'
}
@@ -1050,6 +1096,11 @@ project(":iceberg-spark3-extensions") {
testCompile project(path: ':iceberg-spark', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-spark3', configuration:
'testArtifacts')
+
spark31Implementation("org.apache.spark:spark-hive_2.12:${project.ext.Spark31Version}")
{
+ exclude group: 'org.apache.avro', module: 'avro'
+ exclude group: 'org.apache.arrow'
+ }
+
// Required because we remove antlr plugin dependencies from the compile
configuration, see note above
// We shade this in Spark3 Runtime to avoid issues with Spark's Antlr
Runtime
runtime "org.antlr:antlr4-runtime:4.7.1"
@@ -1060,6 +1111,15 @@ project(":iceberg-spark3-extensions") {
maxHeapSize = "64m"
arguments += ['-visitor', '-package',
'org.apache.spark.sql.catalyst.parser.extensions']
}
+
+ task testSpark31(type: Test) {
+ dependsOn classes
+ description = "Test against Spark 3.1"
+ testClassesDirs = sourceSets.spark31.output.classesDirs
+ classpath = sourceSets.spark31.runtimeClasspath + sourceSets.main.output
+ }
+
+ test.dependsOn testSpark31
}
project(':iceberg-spark3-runtime') {
@@ -1072,6 +1132,12 @@ project(':iceberg-spark3-runtime') {
java.srcDir "$projectDir/src/integration/java"
resources.srcDir "$projectDir/src/integration/resources"
}
+ spark31 {
+ java.srcDir "$projectDir/src/integration/java"
+ resources.srcDir "$projectDir/src/integration/resources"
+ compileClasspath += sourceSets.integration.output
+ runtimeClasspath += sourceSets.integration.output
+ }
}
configurations {
@@ -1086,6 +1152,8 @@ project(':iceberg-spark3-runtime') {
exclude group: 'javax.xml.bind'
exclude group: 'javax.annotation'
}
+ spark31Implementation.extendsFrom integrationImplementation
+ spark31CompileOnly.extendsFrom integrationCompileOnly
}
dependencies {
@@ -1096,7 +1164,7 @@ project(':iceberg-spark3-runtime') {
exclude group: 'com.google.code.findbugs', module: 'jsr305'
}
- integrationImplementation 'org.apache.spark:spark-hive_2.12'
+ integrationImplementation
"org.apache.spark:spark-hive_2.12:${project.ext.Spark30Version}"
integrationImplementation 'junit:junit'
integrationImplementation 'org.slf4j:slf4j-simple'
integrationImplementation project(path: ':iceberg-api', configuration:
'testArtifacts')
@@ -1107,6 +1175,8 @@ project(':iceberg-spark3-runtime') {
// Not allowed on our classpath, only the runtime jar is allowed
integrationCompileOnly project(':iceberg-spark3-extensions')
integrationCompileOnly project(':iceberg-spark3')
+
+ spark31Implementation
"org.apache.spark:spark-hive_2.12:${project.ext.Spark31Version}"
}
shadowJar {
@@ -1144,14 +1214,24 @@ project(':iceberg-spark3-runtime') {
}
task integrationTest(type: Test) {
- description = "Test Spark3 Runtime Jar"
+ description = "Test Spark3 Runtime Jar against Spark 3.0"
group = "verification"
testClassesDirs = sourceSets.integration.output.classesDirs
classpath = sourceSets.integration.runtimeClasspath +
files(shadowJar.archiveFile.get().asFile.path)
inputs.file(shadowJar.archiveFile.get().asFile.path)
}
integrationTest.dependsOn shadowJar
- check.dependsOn integrationTest
+
+ task spark31IntegrationTest(type: Test) {
+ dependsOn classes
+ description = "Test Spark3 Runtime Jar against Spark 3.1"
+ group = "verification"
+ testClassesDirs = sourceSets.spark31.output.classesDirs
+ classpath = sourceSets.spark31.runtimeClasspath +
files(shadowJar.archiveFile.get().asFile.path)
+ }
+ spark31IntegrationTest.dependsOn shadowJar
+
+ check.dependsOn integrationTest, spark31IntegrationTest
jar {
enabled = false
diff --git a/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
b/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
index 13a75db..2dcaa9f 100644
--- a/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
@@ -33,6 +33,7 @@ public class DateTimeUtil {
public static final OffsetDateTime EPOCH =
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
public static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+ public static final long MICROS_PER_MILLIS = 1000L;
public static LocalDate dateFromDays(int daysFromEpoch) {
return ChronoUnit.DAYS.addTo(EPOCH_DAY, daysFromEpoch);
@@ -66,6 +67,13 @@ public class DateTimeUtil {
return ChronoUnit.MICROS.between(EPOCH, dateTime.atOffset(ZoneOffset.UTC));
}
+ public static long microsToMillis(long micros) {
+ // When the timestamp is negative, i.e before 1970, we need to adjust the
milliseconds portion.
+ // Example - 1965-01-01 10:11:12.123456 is represented as
(-157700927876544) in micro precision.
+ // In millis precision the above needs to be represented as
(-157700927877).
+ return Math.floorDiv(micros, MICROS_PER_MILLIS);
+ }
+
public static OffsetDateTime timestamptzFromMicros(long microsFromEpoch) {
return ChronoUnit.MICROS.addTo(EPOCH, microsFromEpoch);
}
diff --git
a/spark3-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala
b/spark3-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala
index 78b17d3..30b5df5 100644
---
a/spark3-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala
+++
b/spark3-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala
@@ -41,7 +41,7 @@ class IcebergSparkSessionExtensions extends
(SparkSessionExtensions => Unit) {
// analyzer extensions
extensions.injectResolutionRule { spark => ResolveProcedures(spark) }
extensions.injectResolutionRule { _ => ProcedureArgumentCoercion }
- extensions.injectPostHocResolutionRule { spark =>
AlignRowLevelOperations(spark.sessionState.conf)}
+ extensions.injectPostHocResolutionRule { spark => AlignRowLevelOperations }
extensions.injectCheckRule { _ => RowLevelOperationsPredicateCheck }
// optimizer extensions
diff --git
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlignRowLevelOperations.scala
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlignRowLevelOperations.scala
index e6864b2..6da3ba6 100644
---
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlignRowLevelOperations.scala
+++
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlignRowLevelOperations.scala
@@ -31,7 +31,10 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
import org.apache.spark.sql.internal.SQLConf
-case class AlignRowLevelOperations(conf: SQLConf) extends Rule[LogicalPlan]
with AssignmentAlignmentSupport {
+case object AlignRowLevelOperations extends Rule[LogicalPlan]
+ with AssignmentAlignmentSupport with CastSupport {
+
+ override def conf: SQLConf = SQLConf.get
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case u: UpdateTable if u.resolved && isIcebergRelation(u.table)=>
diff --git
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentAlignmentSupport.scala
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentAlignmentSupport.scala
index 673d287..c1140df 100644
---
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentAlignmentSupport.scala
+++
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentAlignmentSupport.scala
@@ -32,13 +32,17 @@ import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.logical.Assignment
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import
org.apache.spark.sql.catalyst.utils.RewriteRowLevelOperationHelper.createAlias
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import scala.collection.mutable
-trait AssignmentAlignmentSupport extends CastSupport {
+trait AssignmentAlignmentSupport {
+
+ def conf: SQLConf
private case class ColumnUpdate(ref: Seq[String], expr: Expression)
@@ -96,7 +100,7 @@ trait AssignmentAlignmentSupport extends CastSupport {
case StructType(fields) =>
// build field expressions
val fieldExprs = fields.zipWithIndex.map { case (field, ordinal)
=>
- Alias(GetStructField(col, ordinal, Some(field.name)),
field.name)()
+ createAlias(GetStructField(col, ordinal, Some(field.name)),
field.name)
}
// recursively apply this method on nested fields
diff --git
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
index 32257ff..e5a8265 100644
---
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
+++
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.catalyst.expressions.EqualNullSafe
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.expressions.Not
-import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
import org.apache.spark.sql.catalyst.plans.logical.Filter
@@ -38,6 +37,7 @@ import
org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
import org.apache.spark.sql.catalyst.plans.logical.ReplaceData
import org.apache.spark.sql.catalyst.plans.logical.Sort
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils
import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
import org.apache.spark.sql.catalyst.utils.RewriteRowLevelOperationHelper
import org.apache.spark.sql.connector.catalog.Table
@@ -52,6 +52,9 @@ case class RewriteDelete(spark: SparkSession) extends
Rule[LogicalPlan] with Rew
import ExtendedDataSourceV2Implicits._
import RewriteRowLevelOperationHelper._
+ import DistributionAndOrderingUtils._
+
+ override def conf: SQLConf = SQLConf.get
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// don't rewrite deletes that can be answered by passing filters to
deleteWhere in SupportsDelete
@@ -66,7 +69,7 @@ case class RewriteDelete(spark: SparkSession) extends
Rule[LogicalPlan] with Rew
val mergeBuilder = r.table.asMergeable.newMergeBuilder("delete",
writeInfo)
val matchingRowsPlanBuilder = scanRelation => Filter(cond, scanRelation)
- val scanPlan = buildDynamicFilterScanPlan(spark, r.table, r.output,
mergeBuilder, cond, matchingRowsPlanBuilder)
+ val scanPlan = buildDynamicFilterScanPlan(spark, r, r.output,
mergeBuilder, cond, matchingRowsPlanBuilder)
val remainingRowFilter = Not(EqualNullSafe(cond, Literal(true,
BooleanType)))
val remainingRowsPlan = Filter(remainingRowFilter, scanPlan)
@@ -91,11 +94,11 @@ case class RewriteDelete(spark: SparkSession) extends
Rule[LogicalPlan] with Rew
remainingRowsPlan
case _ =>
// apply hash partitioning by file if the distribution mode is hash or
range
- val numShufflePartitions = SQLConf.get.numShufflePartitions
+ val numShufflePartitions = conf.numShufflePartitions
RepartitionByExpression(Seq(fileNameCol), remainingRowsPlan,
numShufflePartitions)
}
- val order = Seq(SortOrder(fileNameCol, Ascending), SortOrder(rowPosCol,
Ascending))
+ val order = Seq(createSortOrder(fileNameCol, Ascending),
createSortOrder(rowPosCol, Ascending))
val sort = Sort(order, global = false, planWithDistribution)
Project(output, sort)
}
diff --git
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
index c30e603..a08adfb 100644
---
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
+++
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
@@ -55,9 +55,12 @@ import
org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implici
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.BooleanType
-case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan]
with RewriteRowLevelOperationHelper {
+case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan]
with RewriteRowLevelOperationHelper {
import ExtendedDataSourceV2Implicits._
import RewriteMergeInto._
+ import RewriteRowLevelOperationHelper._
+
+ override def conf: SQLConf = SQLConf.get
override def apply(plan: LogicalPlan): LogicalPlan = {
plan transform {
@@ -79,7 +82,7 @@ case class RewriteMergeInto(spark: SparkSession) extends
Rule[LogicalPlan] with
val outputExprs = insertAction.assignments.map(_.value)
val outputColNames = target.output.map(_.name)
- val outputCols = outputExprs.zip(outputColNames).map { case (expr,
name) => Alias(expr, name)() }
+ val outputCols = outputExprs.zip(outputColNames).map { case (expr,
name) => createAlias(expr, name) }
val mergePlan = Project(outputCols, joinPlan)
val writePlan = buildWritePlan(mergePlan, target.table)
@@ -121,7 +124,7 @@ case class RewriteMergeInto(spark: SparkSession) extends
Rule[LogicalPlan] with
// when there are no not-matched actions, use a right outer join to
ignore source rows that do not match, but
// keep all unmatched target rows that must be preserved.
- val sourceTableProj = source.output ++ Seq(Alias(TRUE_LITERAL,
ROW_FROM_SOURCE)())
+ val sourceTableProj = source.output ++ Seq(createAlias(TRUE_LITERAL,
ROW_FROM_SOURCE))
val newSourceTableScan = Project(sourceTableProj, source)
val targetTableScan = buildDynamicFilterTargetScan(mergeBuilder,
target, source, cond, matchedActions)
val joinPlan = Join(newSourceTableScan, targetTableScan, RightOuter,
Some(cond), JoinHint.NONE)
@@ -151,10 +154,10 @@ case class RewriteMergeInto(spark: SparkSession) extends
Rule[LogicalPlan] with
val (matchedConditions, matchedOutputs) =
rewriteMatchedActions(matchedActions, target.output)
// use a full outer join because there are both matched and not
matched actions
- val sourceTableProj = source.output ++ Seq(Alias(TRUE_LITERAL,
ROW_FROM_SOURCE)())
+ val sourceTableProj = source.output ++ Seq(createAlias(TRUE_LITERAL,
ROW_FROM_SOURCE))
val newSourceTableScan = Project(sourceTableProj, source)
val targetTableScan = buildDynamicFilterTargetScan(mergeBuilder,
target, source, cond, matchedActions)
- val targetTableProj = targetTableScan.output ++
Seq(Alias(TRUE_LITERAL, ROW_FROM_TARGET)())
+ val targetTableProj = targetTableScan.output ++
Seq(createAlias(TRUE_LITERAL, ROW_FROM_TARGET))
val newTargetTableScan = Project(targetTableProj, targetTableScan)
val joinPlan = Join(newSourceTableScan, newTargetTableScan, FullOuter,
Some(cond), JoinHint.NONE)
@@ -202,7 +205,7 @@ case class RewriteMergeInto(spark: SparkSession) extends
Rule[LogicalPlan] with
val output = target.output
val matchingRowsPlanBuilder = rel => Join(source, rel, Inner, Some(cond),
JoinHint.NONE)
val runCardinalityCheck = isCardinalityCheckEnabled(table) &&
isCardinalityCheckNeeded(matchedActions)
- buildDynamicFilterScanPlan(spark, table, output, mergeBuilder, cond,
matchingRowsPlanBuilder, runCardinalityCheck)
+ buildDynamicFilterScanPlan(spark, target, output, mergeBuilder, cond,
matchingRowsPlanBuilder, runCardinalityCheck)
}
private def rewriteMatchedActions(
diff --git
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteUpdate.scala
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteUpdate.scala
index fce7618..de5984c 100644
---
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteUpdate.scala
+++
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteUpdate.scala
@@ -40,11 +40,15 @@ import
org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
import org.apache.spark.sql.catalyst.utils.RewriteRowLevelOperationHelper
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import
org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implicits
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.BooleanType
case class RewriteUpdate(spark: SparkSession) extends Rule[LogicalPlan] with
RewriteRowLevelOperationHelper {
import ExtendedDataSourceV2Implicits._
+ import RewriteRowLevelOperationHelper._
+
+ override def conf: SQLConf = SQLConf.get
// TODO: can we do any better for no-op updates? when conditions evaluate to
false/true?
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
@@ -59,7 +63,7 @@ case class RewriteUpdate(spark: SparkSession) extends
Rule[LogicalPlan] with Rew
// so the first job uses DynamicFileFilter and the second one uses the
underlying scan plan
// both jobs share the same SparkMergeScan instance to ensure they
operate on same files
val matchingRowsPlanBuilder = scanRelation => Filter(cond, scanRelation)
- val scanPlan = buildDynamicFilterScanPlan(spark, r.table, r.output,
mergeBuilder, cond, matchingRowsPlanBuilder)
+ val scanPlan = buildDynamicFilterScanPlan(spark, r, r.output,
mergeBuilder, cond, matchingRowsPlanBuilder)
val underlyingScanPlan = scanPlan match {
case DynamicFileFilter(plan, _, _) => plan.clone()
case _ => scanPlan.clone()
@@ -85,7 +89,7 @@ case class RewriteUpdate(spark: SparkSession) extends
Rule[LogicalPlan] with Rew
val mergeBuilder = r.table.asMergeable.newMergeBuilder("update",
writeInfo)
val matchingRowsPlanBuilder = scanRelation => Filter(cond, scanRelation)
- val scanPlan = buildDynamicFilterScanPlan(spark, r.table, r.output,
mergeBuilder, cond, matchingRowsPlanBuilder)
+ val scanPlan = buildDynamicFilterScanPlan(spark, r, r.output,
mergeBuilder, cond, matchingRowsPlanBuilder)
val updateProjection = buildUpdateProjection(r, scanPlan, assignments,
cond)
@@ -110,10 +114,10 @@ case class RewriteUpdate(spark: SparkSession) extends
Rule[LogicalPlan] with Rew
if (attr.semanticEquals(assignedExpr)) {
attr
} else if (cond == Literal.TrueLiteral) {
- Alias(assignedExpr, attr.name)()
+ createAlias(assignedExpr, attr.name)
} else {
val updatedExpr = If(cond, assignedExpr, attr)
- Alias(updatedExpr, attr.name)()
+ createAlias(updatedExpr, attr.name)
}
}
diff --git
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
index f11e30f..d301b75 100644
---
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
+++
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
@@ -25,6 +25,8 @@ import org.antlr.v4.runtime.atn.PredictionMode
import org.antlr.v4.runtime.misc.Interval
import org.antlr.v4.runtime.misc.ParseCancellationException
import org.antlr.v4.runtime.tree.TerminalNodeImpl
+import org.apache.iceberg.common.DynConstructors
+import org.apache.iceberg.spark.Spark3VersionUtil
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -41,7 +43,9 @@ import org.apache.spark.sql.types.StructType
class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends
ParserInterface {
- private lazy val substitutor = new VariableSubstitution(SQLConf.get)
+ import IcebergSparkSqlExtensionsParser._
+
+ private lazy val substitutor = substitutorCtor.newInstance(SQLConf.get)
private lazy val astBuilder = new IcebergSqlExtensionsAstBuilder(delegate)
/**
@@ -54,9 +58,7 @@ class IcebergSparkSqlExtensionsParser(delegate:
ParserInterface) extends ParserI
/**
* Parse a string to a raw DataType without CHAR/VARCHAR replacement.
*/
- override def parseRawDataType(sqlText: String): DataType = {
- delegate.parseRawDataType(sqlText)
- }
+ def parseRawDataType(sqlText: String): DataType = throw new
UnsupportedOperationException()
/**
* Parse a string to an Expression.
@@ -161,6 +163,14 @@ class IcebergSparkSqlExtensionsParser(delegate:
ParserInterface) extends ParserI
}
}
+object IcebergSparkSqlExtensionsParser {
+ private val substitutorCtor: DynConstructors.Ctor[VariableSubstitution] =
+ DynConstructors.builder()
+ .impl(classOf[VariableSubstitution])
+ .impl(classOf[VariableSubstitution], classOf[SQLConf])
+ .build()
+}
+
/* Copied from Apache Spark's to avoid dependency on Spark Internals */
class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream {
override def consume(): Unit = wrapped.consume
diff --git
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ReplaceData.scala
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ReplaceData.scala
index 2535e70..16fd559 100644
---
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ReplaceData.scala
+++
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ReplaceData.scala
@@ -25,4 +25,12 @@ import org.apache.spark.sql.connector.write.BatchWrite
case class ReplaceData(
table: NamedRelation,
write: BatchWrite,
- query: LogicalPlan) extends V2WriteCommand
+ query: LogicalPlan) extends V2WriteCommand {
+
+ def isByName: Boolean = false
+
+ def withNewQuery(newQuery: LogicalPlan): ReplaceData = copy(query = newQuery)
+
+ def withNewTable(newTable: NamedRelation): ReplaceData = copy(table =
newTable)
+
+}
diff --git
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/DistributionAndOrderingUtils.scala
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/DistributionAndOrderingUtils.scala
index ad46730..bcef977 100644
---
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/DistributionAndOrderingUtils.scala
+++
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/DistributionAndOrderingUtils.scala
@@ -19,6 +19,8 @@
package org.apache.spark.sql.catalyst.utils
+import org.apache.iceberg.common.DynConstructors
+import org.apache.iceberg.spark.Spark3VersionUtil
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.analysis.Resolver
@@ -58,6 +60,18 @@ import org.apache.spark.sql.types.IntegerType
object DistributionAndOrderingUtils {
+ private val repartitionByExpressionCtor:
DynConstructors.Ctor[RepartitionByExpression] =
+ DynConstructors.builder()
+ .impl(classOf[RepartitionByExpression],
+ classOf[Seq[catalyst.expressions.Expression]],
+ classOf[LogicalPlan],
+ classOf[Option[Int]])
+ .impl(classOf[RepartitionByExpression],
+ classOf[Seq[catalyst.expressions.Expression]],
+ classOf[LogicalPlan],
+ Integer.TYPE)
+ .build()
+
def prepareQuery(
requiredDistribution: Distribution,
requiredOrdering: Seq[SortOrder],
@@ -80,7 +94,11 @@ object DistributionAndOrderingUtils {
// the conversion to catalyst expressions above produces SortOrder
expressions
// for OrderedDistribution and generic expressions for
ClusteredDistribution
// this allows RepartitionByExpression to pick either range or hash
partitioning
- RepartitionByExpression(distribution, query, numShufflePartitions)
+ if (Spark3VersionUtil.isSpark30) {
+ repartitionByExpressionCtor.newInstance(distribution.toSeq, query,
Integer.valueOf(numShufflePartitions))
+ } else {
+ repartitionByExpressionCtor.newInstance(distribution.toSeq, query,
Some(numShufflePartitions))
+ }
} else {
query
}
@@ -98,6 +116,37 @@ object DistributionAndOrderingUtils {
queryWithDistributionAndOrdering
}
+ private val sortOrderCtor:
DynConstructors.Ctor[catalyst.expressions.SortOrder] =
+ DynConstructors.builder()
+ .impl(classOf[catalyst.expressions.SortOrder],
+ classOf[catalyst.expressions.Expression],
+ classOf[catalyst.expressions.SortDirection],
+ classOf[catalyst.expressions.NullOrdering],
+ classOf[Seq[catalyst.expressions.Expression]])
+ .impl(classOf[catalyst.expressions.SortOrder],
+ classOf[catalyst.expressions.Expression],
+ classOf[catalyst.expressions.SortDirection],
+ classOf[catalyst.expressions.NullOrdering],
+ classOf[Set[catalyst.expressions.Expression]])
+ .build()
+
+ def createSortOrder(
+ child: catalyst.expressions.Expression,
+ direction: catalyst.expressions.SortDirection):
catalyst.expressions.SortOrder = {
+ createSortOrder(child, direction, direction.defaultNullOrdering)
+ }
+
+ def createSortOrder(
+ child: catalyst.expressions.Expression,
+ direction: catalyst.expressions.SortDirection,
+ nullOrdering: catalyst.expressions.NullOrdering):
catalyst.expressions.SortOrder = {
+ if (Spark3VersionUtil.isSpark30) {
+ sortOrderCtor.newInstance(child, direction, nullOrdering, Set.empty)
+ } else {
+ sortOrderCtor.newInstance(child, direction, nullOrdering, Seq.empty)
+ }
+ }
+
private def toCatalyst(
expr: Expression,
query: LogicalPlan,
@@ -118,7 +167,7 @@ object DistributionAndOrderingUtils {
expr match {
case s: SortOrder =>
val catalystChild = toCatalyst(s.expression(), query, resolver)
- catalyst.expressions.SortOrder(catalystChild, toCatalyst(s.direction),
toCatalyst(s.nullOrdering), Set.empty)
+ createSortOrder(catalystChild, toCatalyst(s.direction),
toCatalyst(s.nullOrdering))
case it: IdentityTransform =>
resolve(it.ref.fieldNames)
case BucketTransform(numBuckets, ref) =>
diff --git
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/RewriteRowLevelOperationHelper.scala
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/RewriteRowLevelOperationHelper.scala
index 19a950a..910e9ce 100644
---
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/RewriteRowLevelOperationHelper.scala
+++
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/RewriteRowLevelOperationHelper.scala
@@ -19,7 +19,9 @@
package org.apache.spark.sql.catalyst.utils
import java.util.UUID
+import org.apache.iceberg.common.DynConstructors
import org.apache.iceberg.spark.Spark3Util
+import org.apache.iceberg.spark.Spark3VersionUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.SparkSession
@@ -31,8 +33,10 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.AttributeSet
import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.ExprId
import org.apache.spark.sql.catalyst.expressions.GreaterThan
import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.expressions.aggregate.Complete
@@ -48,6 +52,7 @@ import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.iceberg.distributions.OrderedDistribution
import org.apache.spark.sql.connector.iceberg.read.SupportsFileFilter
import org.apache.spark.sql.connector.iceberg.write.MergeBuilder
+import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.LogicalWriteInfo
import org.apache.spark.sql.connector.write.LogicalWriteInfoImpl
@@ -59,6 +64,7 @@ import
org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implici
import org.apache.spark.sql.execution.datasources.v2.PushDownUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources
+import org.apache.spark.sql.types.Metadata
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -69,7 +75,7 @@ trait RewriteRowLevelOperationHelper extends PredicateHelper
with Logging {
import ExtendedDataSourceV2Implicits.ScanBuilderHelper
protected def spark: SparkSession
- protected lazy val conf: SQLConf = spark.sessionState.conf
+ def conf: SQLConf
protected lazy val resolver: Resolver = conf.resolver
protected def buildSimpleScanPlan(
@@ -83,14 +89,14 @@ trait RewriteRowLevelOperationHelper extends
PredicateHelper with Logging {
val scan = scanBuilder.asIceberg.withMetadataColumns(FILE_NAME_COL,
ROW_POS_COL).build()
val outputAttrs = toOutputAttrs(scan.readSchema(), relation.output)
val predicates = extractFilters(cond,
relation.output).reduceLeftOption(And)
-    val scanRelation = DataSourceV2ScanRelation(relation.table, scan, outputAttrs)
+    val scanRelation = createScanRelation(relation, scan, outputAttrs)
predicates.map(Filter(_, scanRelation)).getOrElse(scanRelation)
}
protected def buildDynamicFilterScanPlan(
spark: SparkSession,
- table: Table,
+ relation: DataSourceV2Relation,
tableAttrs: Seq[AttributeReference],
mergeBuilder: MergeBuilder,
cond: Expression,
@@ -103,7 +109,7 @@ trait RewriteRowLevelOperationHelper extends
PredicateHelper with Logging {
val scan = scanBuilder.asIceberg.withMetadataColumns(FILE_NAME_COL,
ROW_POS_COL).build()
val outputAttrs = toOutputAttrs(scan.readSchema(), tableAttrs)
- val scanRelation = DataSourceV2ScanRelation(table, scan, outputAttrs)
+ val scanRelation = createScanRelation(relation, scan, outputAttrs)
scan match {
case filterable: SupportsFileFilter if runCardinalityCheck =>
@@ -171,11 +177,11 @@ trait RewriteRowLevelOperationHelper extends
PredicateHelper with Logging {
prunedTargetPlan: LogicalPlan): LogicalPlan = {
val fileAttr = findOutputAttr(prunedTargetPlan.output, FILE_NAME_COL)
val rowPosAttr = findOutputAttr(prunedTargetPlan.output, ROW_POS_COL)
-    val accumulatorExpr = Alias(AccumulateFiles(filesAccumulator, fileAttr), AFFECTED_FILES_ACC_ALIAS_NAME)()
+    val accumulatorExpr = createAlias(AccumulateFiles(filesAccumulator, fileAttr), AFFECTED_FILES_ACC_ALIAS_NAME)
val projectList = Seq(fileAttr, rowPosAttr, accumulatorExpr)
val projectPlan = Project(projectList, prunedTargetPlan)
val affectedFilesAttr = findOutputAttr(projectPlan.output,
AFFECTED_FILES_ACC_ALIAS_NAME)
-    val aggSumCol = Alias(AggregateExpression(Sum(affectedFilesAttr), Complete, false), SUM_ROW_ID_ALIAS_NAME)()
+    val aggSumCol = createAlias(AggregateExpression(Sum(affectedFilesAttr), Complete, false), SUM_ROW_ID_ALIAS_NAME)
// Group by the rows by row id while collecting the files that need to be
over written via accumulator.
val aggPlan = Aggregate(Seq(fileAttr, rowPosAttr), Seq(aggSumCol),
projectPlan)
val sumAttr = findOutputAttr(aggPlan.output, SUM_ROW_ID_ALIAS_NAME)
@@ -229,4 +235,48 @@ object RewriteRowLevelOperationHelper {
private final val AFFECTED_FILES_ACC_NAME =
"internal.metrics.merge.affectedFiles"
private final val AFFECTED_FILES_ACC_ALIAS_NAME = "_affectedFiles_"
private final val SUM_ROW_ID_ALIAS_NAME = "_sum_"
+
+  private val scanRelationCtor: DynConstructors.Ctor[DataSourceV2ScanRelation] =
+    DynConstructors.builder()
+      .impl(classOf[DataSourceV2ScanRelation],
+        classOf[DataSourceV2Relation],
+        classOf[Scan],
+        classOf[Seq[AttributeReference]])
+      .impl(classOf[DataSourceV2ScanRelation],
+        classOf[Table],
+        classOf[Scan],
+        classOf[Seq[AttributeReference]])
+      .build()
+
+ def createScanRelation(
+ relation: DataSourceV2Relation,
+ scan: Scan,
+ outputAttrs: Seq[AttributeReference]): DataSourceV2ScanRelation = {
+ if (Spark3VersionUtil.isSpark30) {
+ scanRelationCtor.newInstance(relation.table, scan, outputAttrs)
+ } else {
+ scanRelationCtor.newInstance(relation, scan, outputAttrs)
+ }
+ }
+
+ private val aliasCtor: DynConstructors.Ctor[Alias] =
+ DynConstructors.builder()
+ .impl(classOf[Alias],
+ classOf[Expression],
+ classOf[String],
+ classOf[ExprId],
+ classOf[Seq[String]],
+ classOf[Option[Metadata]],
+ classOf[Seq[String]])
+ .impl(classOf[Alias],
+ classOf[Expression],
+ classOf[String],
+ classOf[ExprId],
+ classOf[Seq[String]],
+ classOf[Option[Metadata]])
+ .build()
+
+  def createAlias(child: Expression, name: String): Alias = {
+    aliasCtor.newInstance(child, name, NamedExpression.newExprId, Seq.empty, None, Seq.empty)
+  }
}
diff --git
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ReplaceData.scala
b/spark3/src/main/java/org/apache/iceberg/spark/Spark3VersionUtil.java
similarity index 69%
copy from
spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ReplaceData.scala
copy to spark3/src/main/java/org/apache/iceberg/spark/Spark3VersionUtil.java
index 2535e70..984c66d 100644
---
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ReplaceData.scala
+++ b/spark3/src/main/java/org/apache/iceberg/spark/Spark3VersionUtil.java
@@ -17,12 +17,20 @@
* under the License.
*/
-package org.apache.spark.sql.catalyst.plans.logical
+package org.apache.iceberg.spark;
-import org.apache.spark.sql.catalyst.analysis.NamedRelation
-import org.apache.spark.sql.connector.write.BatchWrite
+import org.apache.spark.package$;
-case class ReplaceData(
- table: NamedRelation,
- write: BatchWrite,
- query: LogicalPlan) extends V2WriteCommand
+public class Spark3VersionUtil {
+
+ private Spark3VersionUtil() {
+ }
+
+ public static boolean isSpark30() {
+ return package$.MODULE$.SPARK_VERSION_SHORT().startsWith("3.0");
+ }
+
+ public static boolean isSpark31() {
+ return package$.MODULE$.SPARK_VERSION_SHORT().startsWith("3.1");
+ }
+}
diff --git
a/spark3/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
b/spark3/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index ba1e6b0..88803f0 100644
---
a/spark3/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++
b/spark3/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -22,8 +22,8 @@ package org.apache.iceberg.spark.procedures;
import org.apache.iceberg.actions.Actions;
import org.apache.iceberg.actions.ExpireSnapshots;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.iceberg.util.DateTimeUtil;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
@@ -77,7 +77,7 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
@Override
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
-    Long olderThanMillis = args.isNullAt(1) ? null : DateTimeUtils.toMillis(args.getLong(1));
+    Long olderThanMillis = args.isNullAt(1) ? null : DateTimeUtil.microsToMillis(args.getLong(1));
Integer retainLastNum = args.isNullAt(2) ? null : args.getInt(2);
return modifyIcebergTable(tableIdent, table -> {
diff --git
a/spark3/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
b/spark3/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
index eea4951..f1dcf31 100644
---
a/spark3/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
+++
b/spark3/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
@@ -24,8 +24,8 @@ import org.apache.iceberg.actions.Actions;
import org.apache.iceberg.actions.RemoveOrphanFiles;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.iceberg.util.DateTimeUtil;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
@@ -79,7 +79,7 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure
{
@Override
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
-    Long olderThanMillis = args.isNullAt(1) ? null : DateTimeUtils.toMillis(args.getLong(1));
+    Long olderThanMillis = args.isNullAt(1) ? null : DateTimeUtil.microsToMillis(args.getLong(1));
String location = args.isNullAt(2) ? null : args.getString(2);
boolean dryRun = args.isNullAt(3) ? false : args.getBoolean(3);
diff --git
a/spark3/src/main/java/org/apache/iceberg/spark/procedures/RollbackToTimestampProcedure.java
b/spark3/src/main/java/org/apache/iceberg/spark/procedures/RollbackToTimestampProcedure.java
index 3e38c3e..94e8949 100644
---
a/spark3/src/main/java/org/apache/iceberg/spark/procedures/RollbackToTimestampProcedure.java
+++
b/spark3/src/main/java/org/apache/iceberg/spark/procedures/RollbackToTimestampProcedure.java
@@ -21,8 +21,8 @@ package org.apache.iceberg.spark.procedures;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.iceberg.util.DateTimeUtil;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
@@ -77,7 +77,7 @@ class RollbackToTimestampProcedure extends BaseProcedure {
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
     // timestamps in Spark have microsecond precision so this conversion is lossy
- long timestampMillis = DateTimeUtils.toMillis(args.getLong(1));
+ long timestampMillis = DateTimeUtil.microsToMillis(args.getLong(1));
return modifyIcebergTable(tableIdent, table -> {
Snapshot previousSnapshot = table.currentSnapshot();
diff --git
a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java
b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java
index 43b3bf9..eb852fb 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java
@@ -24,9 +24,11 @@ import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.Spark3VersionUtil;
import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.junit.After;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Test;
public class TestDeleteFrom extends SparkCatalogTestBase {
@@ -41,6 +43,10 @@ public class TestDeleteFrom extends SparkCatalogTestBase {
@Test
public void testDeleteFromUnpartitionedTable() {
+ // This test fails in Spark 3.1. `canDeleteWhere` was added to
`SupportsDelete` in Spark 3.1,
+ // but logic to rewrite the query if `canDeleteWhere` returns false was
left to be implemented
+ // later.
+ Assume.assumeTrue(Spark3VersionUtil.isSpark30());
// set the shuffle partitions to 1 to force the write to use a single task
and produce 1 file
String originalParallelism =
spark.conf().get("spark.sql.shuffle.partitions");
spark.conf().set("spark.sql.shuffle.partitions", "1");
@@ -68,6 +74,10 @@ public class TestDeleteFrom extends SparkCatalogTestBase {
@Test
public void testDeleteFromPartitionedTable() {
+ // This test fails in Spark 3.1. `canDeleteWhere` was added to
`SupportsDelete` in Spark 3.1,
+ // but logic to rewrite the query if `canDeleteWhere` returns false was
left to be implemented
+ // later.
+ Assume.assumeTrue(Spark3VersionUtil.isSpark30());
// set the shuffle partitions to 1 to force the write to use a single task
and produce 1 file per partition
String originalParallelism =
spark.conf().get("spark.sql.shuffle.partitions");
spark.conf().set("spark.sql.shuffle.partitions", "1");
diff --git a/versions.props b/versions.props
index 5e890f4..1b2c5f4 100644
--- a/versions.props
+++ b/versions.props
@@ -9,7 +9,6 @@ org.apache.orc:* = 1.6.8
org.apache.parquet:* = 1.12.0
org.apache.spark:spark-hive_2.11 = 2.4.7
org.apache.spark:spark-avro_2.11 = 2.4.7
-org.apache.spark:spark-hive_2.12 = 3.0.1
org.apache.pig:pig = 0.14.0
com.fasterxml.jackson.*:* = 2.11.4
com.google.guava:guava = 28.0-jre