alexeykudinkin commented on code in PR #6264:
URL: https://github.com/apache/hudi/pull/6264#discussion_r939143695
##########
hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala:
##########
@@ -52,8 +64,51 @@ abstract class HoodieSpark3CatalystPlanUtils extends
HoodieCatalystPlansUtils {
}
}
- override def toTableIdentifier(relation: UnresolvedRelation):
TableIdentifier = {
- relation.multipartIdentifier.asTableIdentifier
+ override def resolve(relation: UnresolvedRelation): Option[CatalogTable] = {
+ val nameParts = relation.multipartIdentifier
+ nameParts match {
+ case CatalogAndIdentifier(catalog, ident) =>
+ CatalogV2Util.loadTable(catalog, ident) match {
+ case Some(table) =>
+ table match {
+ case v1Table: V1Table =>
+ Some(v1Table.v1Table)
+ case withFallback: V2TableWithV1Fallback =>
+ Some(withFallback.v1Table)
+ case _ =>
+ logWarning("It's not a hoodie table: " +
table.getClass.getName)
Review Comment:
Why are we logging `getClass.getName`?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala:
##########
@@ -19,13 +19,16 @@ package org.apache.spark.sql
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.internal.SQLConf
trait HoodieCatalystPlansUtils {
+ protected val spark: SparkSession = SparkSession.active
Review Comment:
Let's avoid adding implicit dependencies; if you want this one to hold a
SparkSession as state, let's provide it explicitly in the constructor
##########
hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala:
##########
@@ -74,12 +129,17 @@ abstract class HoodieSpark3CatalystPlanUtils extends
HoodieCatalystPlansUtils {
}
}
- override def createInsertInto(table: LogicalPlan, partition: Map[String,
Option[String]],
- query: LogicalPlan, overwrite: Boolean,
ifPartitionNotExists: Boolean): LogicalPlan = {
+ override def createInsertInto(
Review Comment:
Please keep formatting consistent
##########
hudi-spark-datasource/hudi-spark/pom.xml:
##########
@@ -180,6 +180,14 @@
</build>
<dependencies>
+ <!-- H2 database for TestSpark3Catalog -->
Review Comment:
What do we need this for?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala:
##########
@@ -54,9 +57,9 @@ trait HoodieCatalystPlansUtils {
def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier
/**
- * Convert a UnresolvedRelation to TableIdentifier.
+ * resolve UnresolvedRelation to CatalogTable.
Review Comment:
Please keep the doc starting with a cap
##########
hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala:
##########
@@ -18,23 +18,35 @@
package org.apache.spark.sql
import org.apache.hudi.spark3.internal.ReflectUtil
-import org.apache.spark.sql.catalyst.analysis.{TableOutputResolver,
UnresolvedRelation}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.analysis.{AnalysisContext,
TableOutputResolver, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Like}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join,
JoinHint, LogicalPlan}
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
-import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util,
LookupCatalog, V1Table, V2TableWithV1Fallback}
import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
import org.apache.spark.sql.execution.{ExtendedMode, SimpleMode}
import org.apache.spark.sql.internal.SQLConf
-abstract class HoodieSpark3CatalystPlanUtils extends HoodieCatalystPlansUtils {
+abstract class HoodieSpark3CatalystPlanUtils extends HoodieCatalystPlansUtils
+ with LookupCatalog with Logging {
+
+ override protected val catalogManager: CatalogManager = {
+ val catalog = spark.sessionState.catalog
+ val v2SessionCatalog = new V2SessionCatalog(catalog)
+ new CatalogManager(v2SessionCatalog, catalog)
+ }
- def resolveOutputColumns(tableName: String,
- expected: Seq[Attribute],
- query: LogicalPlan,
- byName: Boolean,
- conf: SQLConf): LogicalPlan =
+ def resolveOutputColumns(
Review Comment:
Let's keep formatting consistent
##########
hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala:
##########
@@ -52,8 +64,51 @@ abstract class HoodieSpark3CatalystPlanUtils extends
HoodieCatalystPlansUtils {
}
}
- override def toTableIdentifier(relation: UnresolvedRelation):
TableIdentifier = {
- relation.multipartIdentifier.asTableIdentifier
+ override def resolve(relation: UnresolvedRelation): Option[CatalogTable] = {
+ val nameParts = relation.multipartIdentifier
+ nameParts match {
+ case CatalogAndIdentifier(catalog, ident) =>
+ CatalogV2Util.loadTable(catalog, ident) match {
+ case Some(table) =>
+ table match {
+ case v1Table: V1Table =>
+ Some(v1Table.v1Table)
+ case withFallback: V2TableWithV1Fallback =>
+ Some(withFallback.v1Table)
+ case _ =>
+ logWarning("It's not a hoodie table: " +
table.getClass.getName)
+ None
+ }
+ }
+ case _ =>
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
Some(spark.sessionState.catalog.getTableMetadata(nameParts.asTableIdentifier))
+ }
+ }
+
+ protected def expandIdentifier(nameParts: Seq[String]): Seq[String] = {
Review Comment:
This doesn't seem to be used
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]