yihua commented on code in PR #5737: URL: https://github.com/apache/hudi/pull/5737#discussion_r891731728
########## hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala: ########## @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util + +/** + * Utility allowing for seamless conversion b/w Java/Scala functional primitives + */ +object JFunction { + + def toScala[T, R](f: java.util.function.Function[T, R]): T => R = + (t: T) => f.apply(t) + + def toJava[T](f: T => Unit): java.util.function.Consumer[T] = + new java.util.function.Consumer[T] { + override def accept(t: T): Unit = f.apply(t) + } + Review Comment: nit: redundant empty line? 
########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala: ########## @@ -128,24 +128,28 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, */ protected lazy val (tableAvroSchema: Schema, internalSchema: InternalSchema) = { val schemaResolver = new TableSchemaResolver(metaClient) - val avroSchema = Try(schemaResolver.getTableAvroSchema) match { - case Success(schema) => schema - case Failure(e) => - logWarning("Failed to fetch schema from the table", e) - // If there is no commit in the table, we can't get the schema - // t/h [[TableSchemaResolver]], fallback to the provided [[userSchema]] instead. - userSchema match { - case Some(s) => convertToAvroSchema(s) - case _ => throw new IllegalArgumentException("User-provided schema is required in case the table is empty") - } + val avroSchema: Schema = schemaSpec.map(convertToAvroSchema).getOrElse { + Try(schemaResolver.getTableAvroSchema) match { + case Success(schema) => schema + case Failure(e) => + logError("Failed to fetch schema from the table", e) + throw new HoodieSchemaException("Failed to fetch schema from the table") + } } - // try to find internalSchema - val internalSchemaFromMeta = try { - schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema) - } catch { - case _: Exception => InternalSchema.getEmptyInternalSchema + + val internalSchema: InternalSchema = if (!isSchemaEvolutionEnabled) { + InternalSchema.getEmptyInternalSchema Review Comment: Does this mean that the user needs to make sure the schema-evolution-related config they provide is consistent with what's in the table (e.g., if the table has an evolved schema while `isSchemaEvolutionEnabled` is derived as false, then the read result may be inconsistent)? Do we need to add docs and release notes for this change in expectations? 
########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala: ########## @@ -39,45 +39,69 @@ import org.apache.spark.sql.{AnalysisException, SparkSession} import java.util import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer object HoodieAnalysis { - def customResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] = - Seq( + type RuleBuilder = SparkSession => Rule[LogicalPlan] + + def customResolutionRules: Seq[RuleBuilder] = { + val rules: ListBuffer[RuleBuilder] = ListBuffer( + // Default rules session => HoodieResolveReferences(session), session => HoodieAnalysis(session) - ) ++ extraResolutionRules() - - def customPostHocResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] = - Seq( - session => HoodiePostAnalysisRule(session) - ) ++ extraPostHocResolutionRules() + ) - def extraResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] = { if (HoodieSparkUtils.gteqSpark3_2) { + val dataSourceV2ToV1FallbackClass = "org.apache.spark.sql.hudi.analysis.HoodieDataSourceV2ToV1Fallback" + val dataSourceV2ToV1Fallback: RuleBuilder = + session => ReflectionUtils.loadClass(dataSourceV2ToV1FallbackClass, session).asInstanceOf[Rule[LogicalPlan]] + val spark3AnalysisClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3Analysis" - val spark3Analysis: SparkSession => Rule[LogicalPlan] = + val spark3Analysis: RuleBuilder = session => ReflectionUtils.loadClass(spark3AnalysisClass, session).asInstanceOf[Rule[LogicalPlan]] - val spark3ResolveReferences = "org.apache.spark.sql.hudi.analysis.HoodieSpark3ResolveReferences" - val spark3References: SparkSession => Rule[LogicalPlan] = - session => ReflectionUtils.loadClass(spark3ResolveReferences, session).asInstanceOf[Rule[LogicalPlan]] + val spark3ResolveReferencesClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3ResolveReferences" + val spark3ResolveReferences: RuleBuilder = + session => 
ReflectionUtils.loadClass(spark3ResolveReferencesClass, session).asInstanceOf[Rule[LogicalPlan]] - Seq(spark3Analysis, spark3References) - } else { - Seq.empty + val spark32ResolveAlterTableCommandsClass = "org.apache.spark.sql.hudi.ResolveHudiAlterTableCommandSpark32" + val spark32ResolveAlterTableCommands: RuleBuilder = + session => ReflectionUtils.loadClass(spark32ResolveAlterTableCommandsClass, session).asInstanceOf[Rule[LogicalPlan]] Review Comment: OK. I now see that the resolve rules are wrapped into a new class. ########## hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/ResolveHudiAlterTableCommand312.scala: ########## @@ -114,8 +116,9 @@ case class ResolveHudiAlterTableCommand312(sparkSession: SparkSession) extends R } } - private def schemaEvolutionEnabled(): Boolean = sparkSession - .sessionState.conf.getConfString(HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key(), "false").toBoolean + private def schemaEvolutionEnabled(): Boolean = + sparkSession.sessionState.conf.getConfString(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key, + HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue.toString).toBoolean Review Comment: nit: see if this utility can be extracted in a follow-up, as I see the same logic appearing in several different classes. ########## hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala: ########## @@ -162,20 +161,15 @@ trait SparkAdapter extends Serializable { isHoodieTable(table) } - def tripAlias(plan: LogicalPlan): LogicalPlan = { plan match { case SubqueryAlias(_, relation: LogicalPlan) => - tripAlias(relation) + unfoldSubqueryAliases(relation) case other => other } } - /** - * Create customresolutionRule to deal with alter command for hudi. - */ - def createResolveHudiAlterTableCommand(sparkSession: SparkSession): Rule[LogicalPlan] Review Comment: Does this affect the `ALTER TABLE` command? 
########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala: ########## @@ -39,45 +39,69 @@ import org.apache.spark.sql.{AnalysisException, SparkSession} import java.util import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer object HoodieAnalysis { - def customResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] = - Seq( + type RuleBuilder = SparkSession => Rule[LogicalPlan] + + def customResolutionRules: Seq[RuleBuilder] = { + val rules: ListBuffer[RuleBuilder] = ListBuffer( + // Default rules session => HoodieResolveReferences(session), session => HoodieAnalysis(session) - ) ++ extraResolutionRules() - - def customPostHocResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] = - Seq( - session => HoodiePostAnalysisRule(session) - ) ++ extraPostHocResolutionRules() + ) - def extraResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] = { if (HoodieSparkUtils.gteqSpark3_2) { + val dataSourceV2ToV1FallbackClass = "org.apache.spark.sql.hudi.analysis.HoodieDataSourceV2ToV1Fallback" + val dataSourceV2ToV1Fallback: RuleBuilder = + session => ReflectionUtils.loadClass(dataSourceV2ToV1FallbackClass, session).asInstanceOf[Rule[LogicalPlan]] + val spark3AnalysisClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3Analysis" - val spark3Analysis: SparkSession => Rule[LogicalPlan] = + val spark3Analysis: RuleBuilder = session => ReflectionUtils.loadClass(spark3AnalysisClass, session).asInstanceOf[Rule[LogicalPlan]] - val spark3ResolveReferences = "org.apache.spark.sql.hudi.analysis.HoodieSpark3ResolveReferences" - val spark3References: SparkSession => Rule[LogicalPlan] = - session => ReflectionUtils.loadClass(spark3ResolveReferences, session).asInstanceOf[Rule[LogicalPlan]] + val spark3ResolveReferencesClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3ResolveReferences" + val spark3ResolveReferences: RuleBuilder = + session => 
ReflectionUtils.loadClass(spark3ResolveReferencesClass, session).asInstanceOf[Rule[LogicalPlan]] - Seq(spark3Analysis, spark3References) - } else { - Seq.empty + val spark32ResolveAlterTableCommandsClass = "org.apache.spark.sql.hudi.ResolveHudiAlterTableCommandSpark32" + val spark32ResolveAlterTableCommands: RuleBuilder = + session => ReflectionUtils.loadClass(spark32ResolveAlterTableCommandsClass, session).asInstanceOf[Rule[LogicalPlan]] + + // NOTE: PLEASE READ CAREFULLY + // + // It's critical for these rules to be applied in this order, so that DataSource V2 to V1 fallback + // is performed prior to other rules being evaluated rules ++= Seq(dataSourceV2ToV1Fallback, spark3Analysis, spark3ResolveReferences, spark32ResolveAlterTableCommands) Review Comment: So `spark3Analysis` and `spark3ResolveReferences` are also for Spark 3.2 only. We should rename these classes later on. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
