YannByron commented on a change in pull request #4270:
URL: https://github.com/apache/hudi/pull/4270#discussion_r767130598
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala
##########
@@ -18,17 +18,25 @@
package org.apache.spark.sql.avro
import org.apache.avro.Schema
+
import org.apache.spark.sql.types.DataType
/**
* This is to be compatible with the type returned by Spark 3.1
* and other spark versions for AvroDeserializer
*/
-case class HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType)
-  extends AvroDeserializer(rootAvroType, rootCatalystType) {
+case class HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
Review comment:
SPARK-34404
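For readers following the thread, a minimal self-contained sketch of the approach (the full diff appears further down): the constructor is picked by reflection so the same source builds against Spark 3.1 and 3.2. The `"EXCEPTION"` string passed to the Spark 3.2 constructor is assumed here to be the datetime rebase mode.
```scala
package org.apache.spark.sql.avro

import org.apache.avro.Schema
import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.types.DataType

// Sketch only. AvroDeserializer is package-private, so this lives in
// org.apache.spark.sql.avro, just like the Hudi class above.
object HoodieAvroDeserializerSketch {
  def create(avroType: Schema, catalystType: DataType): AvroDeserializer = {
    if (SPARK_VERSION.startsWith("3.2")) {
      // Spark 3.2 adds a String argument (assumed: the datetime rebase mode).
      classOf[AvroDeserializer]
        .getConstructor(classOf[Schema], classOf[DataType], classOf[String])
        .newInstance(avroType, catalystType, "EXCEPTION")
    } else {
      classOf[AvroDeserializer]
        .getConstructor(classOf[Schema], classOf[DataType])
        .newInstance(avroType, catalystType)
    }
  }
}
```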
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Compaction.scala
##########
@@ -22,17 +22,37 @@ import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.Compactio
case class CompactionTable(table: LogicalPlan, operation: CompactionOperation, instantTimestamp: Option[Long])
extends Command {
override def children: Seq[LogicalPlan] = Seq(table)
+
+ def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionTable = {
Review comment:
SPARK-34989
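For context, here is a minimal sketch of the pattern used above, with a hypothetical single-child command. Declaring `withNewChildrenInternal` without the `override` keyword satisfies the abstract member introduced by SPARK-34989 in Spark 3.2 and is simply an unused extra method on Spark 3.1.
```scala
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}

// Hypothetical command with one child: copy with the new child,
// mirroring what CompactionTable does in this PR.
case class ExampleTableCommand(table: LogicalPlan) extends Command {
  override def children: Seq[LogicalPlan] = Seq(table)

  def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): ExampleTableCommand =
    copy(table = newChildren.head)
}
```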
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -244,13 +253,19 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
case DeleteAction(condition) =>
val resolvedCondition = condition.map(resolveExpressionFrom(resolvedSource)(_))
DeleteAction(resolvedCondition)
+ case action: MergeAction =>
Review comment:
SPARK-34962
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -244,13 +253,19 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
case DeleteAction(condition) =>
val resolvedCondition = condition.map(resolveExpressionFrom(resolvedSource)(_))
DeleteAction(resolvedCondition)
+ case action: MergeAction =>
+ // For Spark 3.2, it's UpdateStarAction
+ UpdateAction(action.condition, Seq.empty)
}
// Resolve the notMatchedActions
val resolvedNotMatchedActions = notMatchedActions.map {
case InsertAction(condition, assignments) =>
val (resolvedCondition, resolvedAssignments) = resolveConditionAssignments(condition, assignments)
InsertAction(resolvedCondition, resolvedAssignments)
+ case action: MergeAction =>
+ // For Spark 3.2, it's InsertStarAction
Review comment:
ditto
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -424,9 +439,9 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
case AlterTableChangeColumnCommand(tableName, columnName, newColumn)
if isHoodieTable(tableName, sparkSession) =>
AlterHoodieTableChangeColumnCommand(tableName, columnName, newColumn)
- case ShowPartitionsCommand(tableName, specOpt)
- if isHoodieTable(tableName, sparkSession) =>
- ShowHoodieTablePartitionsCommand(tableName, specOpt)
+ case s: ShowPartitionsCommand
Review comment:
SPARK-34238
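A minimal sketch of the idea: matching on the class instead of the extractor keeps the rule source-compatible when SPARK-34238 changes the constructor arity, and fields that exist in both versions (assumed here: `tableName`) can still be read by name.
```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.ShowPartitionsCommand

// Sketch only: no constructor pattern, so a new or reordered field
// in ShowPartitionsCommand does not break compilation.
object ShowPartitionsMatchSketch {
  def describe(plan: LogicalPlan): String = plan match {
    case s: ShowPartitionsCommand => s"SHOW PARTITIONS on ${s.tableName}"
    case _                        => "some other plan"
  }
}
```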
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
##########
@@ -84,12 +89,11 @@ case class AlterHoodieTableAddColumnsCommand(
sparkSession.catalog.refreshTable(table.identifier.unquotedString)
SchemaUtils.checkColumnNameDuplication(
- newSqlSchema.map(_.name),
+ newSqlDataSchema.map(_.name),
"in the table definition of " + table.identifier,
conf.caseSensitiveAnalysis)
- DDLUtils.checkDataColNames(table, colsToAdd.map(_.name))
Review comment:
The definition of `checkDataColNames` has been changed by SPARK-36201, and this method is meaningless for Hudi tables.
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
##########
@@ -203,7 +207,11 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
sourceExpression match {
case attr: AttributeReference if sourceColumnName.find(resolver(_, attr.name)).get.equals(targetColumnName) => true
- case Cast(attr: AttributeReference, _, _) if sourceColumnName.find(resolver(_, attr.name)).get.equals(targetColumnName) => true
+ case cast: Cast =>
Review comment:
SPARK-35857
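Same trick as elsewhere in this PR, sketched below: match on the `Cast` class and read its `child` field rather than destructuring, so the constructor argument added by SPARK-35857 does not break compilation.
```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, Expression}

// Sketch only: `case Cast(attr, _, _)` has a different arity on Spark 3.2,
// while `cast.child` works on both 3.1 and 3.2.
object CastMatchSketch {
  def castSourceAttribute(expr: Expression): Option[AttributeReference] = expr match {
    case cast: Cast =>
      cast.child match {
        case attr: AttributeReference => Some(attr)
        case _                        => None
      }
    case _ => None
  }
}
```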
##########
File path:
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
##########
@@ -293,28 +293,26 @@ class TestHoodieSparkSqlWriter {
*/
@Test
def testDisableAndEnableMetaFields(): Unit = {
- try {
- testBulkInsertWithSortMode(BulkInsertSortMode.NONE, populateMetaFields = false)
- //create a new table
- val fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4")
- .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
- .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
- .updated(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), BulkInsertSortMode.NONE.name())
- .updated(HoodieTableConfig.POPULATE_META_FIELDS.key(), "true")
+ testBulkInsertWithSortMode(BulkInsertSortMode.NONE, populateMetaFields = false)
Review comment:
Resolves a compiler warning during the build.
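For illustration only, assuming the warning in question is Scala's "A try without a catch or finally is equivalent to putting its body in a block" (the removed wrappers above have neither):
```scala
object TryWarningSketch {
  private def runTest(): Unit = println("running test")

  def warns(): Unit = {
    try {        // a try with no catch/finally triggers the compiler warning
      runTest()
    }
  }

  def clean(): Unit = {
    runTest()    // same behavior, no warning
  }
}
```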
##########
File path:
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
##########
@@ -711,51 +709,49 @@ class TestHoodieSparkSqlWriter {
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "",
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test")
- try {
- val df = spark.range(0, 1000).toDF("keyid")
- .withColumn("col3", expr("keyid"))
- .withColumn("age", lit(1))
- .withColumn("p", lit(2))
-
- df.write.format("hudi")
- .options(options)
- .option(DataSourceWriteOptions.OPERATION.key, "insert")
- .option("hoodie.insert.shuffle.parallelism", "4")
- .mode(SaveMode.Overwrite).save(tempBasePath)
-
- df.write.format("hudi")
- .options(options)
- .option(DataSourceWriteOptions.OPERATION.key, "insert_overwrite_table")
- .option("hoodie.insert.shuffle.parallelism", "4")
- .mode(SaveMode.Append).save(tempBasePath)
-
- val currentCommits = spark.read.format("hudi").load(tempBasePath).select("_hoodie_commit_time").take(1).map(_.getString(0))
- val incrementalKeyIdNum = spark.read.format("hudi")
- .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "0000")
- .option(DataSourceReadOptions.END_INSTANTTIME.key, currentCommits(0))
- .load(tempBasePath).select("keyid").orderBy("keyid").count
- assert(incrementalKeyIdNum == 1000)
-
- df.write.mode(SaveMode.Overwrite).save(baseBootStrapPath)
- spark.emptyDataFrame.write.format("hudi")
- .options(options)
- .option(HoodieBootstrapConfig.BASE_PATH.key, baseBootStrapPath)
- .option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getCanonicalName)
- .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
- .option(HoodieBootstrapConfig.PARALLELISM_VALUE.key, "4")
- .mode(SaveMode.Overwrite).save(tempBasePath)
- df.write.format("hudi").options(options)
- .option(DataSourceWriteOptions.OPERATION.key, "insert_overwrite_table")
- .option("hoodie.insert.shuffle.parallelism", "4").mode(SaveMode.Append).save(tempBasePath)
- val currentCommitsBootstrap = spark.read.format("hudi").load(tempBasePath).select("_hoodie_commit_time").take(1).map(_.getString(0))
- val incrementalKeyIdNumBootstrap = spark.read.format("hudi")
- .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "0000")
- .option(DataSourceReadOptions.END_INSTANTTIME.key, currentCommitsBootstrap(0))
- .load(tempBasePath).select("keyid").orderBy("keyid").count
- assert(incrementalKeyIdNumBootstrap == 1000)
- }
+ val df = spark.range(0, 1000).toDF("keyid")
Review comment:
ditto
##########
File path:
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala
##########
@@ -59,14 +59,18 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll {
}
override protected def test(testName: String, testTags: Tag*)(testFun: => Any /* Assertion */)(implicit pos: source.Position): Unit = {
- try super.test(testName, testTags: _*)(try testFun finally {
- val catalog = spark.sessionState.catalog
- catalog.listDatabases().foreach{db =>
- catalog.listTables(db).foreach {table =>
- catalog.dropTable(table, true, true)
+ super.test(testName, testTags: _*)(
Review comment:
ditto
##########
File path:
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala
##########
@@ -16,24 +16,255 @@
*/
package org.apache.spark.sql.execution.datasources
-import java.util.TimeZone
+
+import java.lang.{Double => JDouble, Long => JLong}
+import java.math.{BigDecimal => JBigDecimal}
+import java.time.ZoneId
+import java.util.{Locale, TimeZone}
import org.apache.hadoop.fs.Path
+
+import org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH
+import org.apache.hudi.spark3.internal.ReflectUtil
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.unescapePathName
+import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
-import org.apache.spark.sql.execution.datasources.PartitioningUtils.{PartitionValues, timestampPartitionPattern}
+import org.apache.spark.sql.execution.datasources.PartitioningUtils.timestampPartitionPattern
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+import scala.util.control.NonFatal
class Spark3ParsePartitionUtil(conf: SQLConf) extends SparkParsePartitionUtil {
- override def parsePartition(path: Path, typeInference: Boolean,
- basePaths: Set[Path], userSpecifiedDataTypes: Map[String, DataType],
- timeZone: TimeZone): Option[PartitionValues] = {
- val dateFormatter = DateFormatter(timeZone.toZoneId)
+ override def parsePartition(
Review comment:
The definition of `PartitionValues` has been changed by SPARK-34314. We copy some code from `PartitioningUtils` in Spark 3.2 here to keep compatibility between 3.1 and 3.2.
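Purely as an illustration (this is not the copied code): the `PartitioningUtils` logic being brought in boils down to splitting a relative partition path into column/value pairs, unescaping each value, and then inferring or casting its type; only that last, type-handling step differs between Spark 3.1 and 3.2.
```scala
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.unescapePathName

// Sketch of the first, version-independent step: "year=2021/month=12"
// becomes Seq(("year", "2021"), ("month", "12")).
object PartitionPathSketch {
  def splitSegments(relativePath: String): Seq[(String, String)] =
    relativePath
      .split("/")
      .toSeq
      .filter(_.contains("="))
      .map { segment =>
        val Array(column, rawValue) = segment.split("=", 2)
        column -> unescapePathName(rawValue)
      }
}
```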
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
##########
@@ -84,12 +89,11 @@ case class AlterHoodieTableAddColumnsCommand(
sparkSession.catalog.refreshTable(table.identifier.unquotedString)
SchemaUtils.checkColumnNameDuplication(
- newSqlSchema.map(_.name),
+ newSqlDataSchema.map(_.name),
"in the table definition of " + table.identifier,
conf.caseSensitiveAnalysis)
- DDLUtils.checkDataColNames(table, colsToAdd.map(_.name))
Review comment:
The definition of `checkDataColNames` has been changed by SPARK-36201, and this method is meaningless for Hudi tables.
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -180,11 +181,19 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
.map { case (targetAttr, sourceAttr) => Assignment(targetAttr, sourceAttr) }
}
} else {
- assignments.map(assignment => {
+ // For Spark 3.2, InsertStarAction/UpdateStarAction's assignments will contain the meta fields.
Review comment:
yep.
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -244,13 +253,19 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
case DeleteAction(condition) =>
val resolvedCondition = condition.map(resolveExpressionFrom(resolvedSource)(_))
DeleteAction(resolvedCondition)
+ case action: MergeAction =>
+ // For Spark 3.2, it's UpdateStarAction
+ UpdateAction(action.condition, Seq.empty)
Review comment:
`action` is an instance of `UpdateStarAction`. Here we convert the `UpdateStarAction` into an `UpdateAction`.
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala
##########
@@ -18,17 +18,25 @@
package org.apache.spark.sql.avro
import org.apache.avro.Schema
+
import org.apache.spark.sql.types.DataType
/**
* This is to be compatible with the type returned by Spark 3.1
* and other spark versions for AvroDeserializer
*/
-case class HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType)
-  extends AvroDeserializer(rootAvroType, rootCatalystType) {
+case class HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
+
+ private val avroDeserializer = if (org.apache.spark.SPARK_VERSION.startsWith("3.2")) {
+ val constructor = classOf[AvroDeserializer].getConstructor(classOf[Schema], classOf[DataType], classOf[String])
+ constructor.newInstance(rootAvroType, rootCatalystType, "EXCEPTION")
+ } else {
+ val constructor = classOf[AvroDeserializer].getConstructor(classOf[Schema], classOf[DataType])
+ constructor.newInstance(rootAvroType, rootCatalystType)
+ }
def deserializeData(data: Any): Any = {
- super.deserialize(data) match {
+ avroDeserializer.deserialize(data) match {
case Some(r) => r // Spark 3.1's return type is Option, so we unwrap the data.
Review comment:
ok
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Compaction.scala
##########
@@ -22,17 +22,37 @@ import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.Compactio
case class CompactionTable(table: LogicalPlan, operation: CompactionOperation, instantTimestamp: Option[Long])
extends Command {
override def children: Seq[LogicalPlan] = Seq(table)
+
+ def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionTable = {
+ copy(table = newChildren.head)
+ }
}
case class CompactionPath(path: String, operation: CompactionOperation, instantTimestamp: Option[Long])
- extends Command
+ extends Command {
+ override def children: Seq[LogicalPlan] = Seq.empty
+
+ def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionPath = {
+ this
+ }
+}
case class CompactionShowOnTable(table: LogicalPlan, limit: Int = 20)
extends Command {
override def children: Seq[LogicalPlan] = Seq(table)
+
+ def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionShowOnTable = {
+ copy(table = newChildren.head)
Review comment:
As for the other classes that extend `RunnableCommand`: `CompactionShowOnPath` and `CompactionPath` have no children, so `withNewChildrenInternal` just returns the original object. In Spark 3.2, every class that extends `RunnableCommand` also extends `LeafRunnableCommand` (except the view-related ones), and `LeafRunnableCommand` overrides `withNewChildrenInternal` to return `this`.
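For completeness, a minimal sketch of the leaf case with a hypothetical command, mirroring what Spark 3.2's `LeafRunnableCommand` does:
```scala
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}

// No children, so withNewChildrenInternal simply returns this,
// just like CompactionPath and CompactionShowOnPath in this PR.
case class ExamplePathCommand(path: String) extends Command {
  override def children: Seq[LogicalPlan] = Seq.empty

  def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): ExamplePathCommand = this
}
```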
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]