alexeykudinkin commented on code in PR #6264:
URL: https://github.com/apache/hudi/pull/6264#discussion_r939153771


##########
hudi-spark-datasource/hudi-spark/pom.xml:
##########
@@ -180,6 +180,14 @@
   </build>
 
   <dependencies>
+    <!-- H2 database for TestSpark3Catalog -->

Review Comment:
   No need to refer to the specific test here — describe the dependency's purpose generically instead.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3Catalog.scala:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import java.sql.{Connection, DriverManager}
+import java.util.Properties
+
+import org.apache.hudi.HoodieSparkUtils
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+import java.io.File
+
+class TestSpark3Catalog extends HoodieSparkSqlTestBase {
+
+  val tempDir: File = Utils.createTempDir()
+  val url = 
s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"
+
+  override def sparkConf(): SparkConf = {
+    val sparkConf = super.sparkConf()
+    if (HoodieSparkUtils.gteqSpark3_1) {
+      sparkConf
+        .set("spark.sql.catalog.h2", 
"org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
+        .set("spark.sql.catalog.h2.url", url)
+        .set("spark.sql.catalog.h2.driver", "org.h2.Driver")
+    }
+    sparkConf
+  }
+
+  private def withConnection[T](f: Connection => T): T = {
+    val conn = DriverManager.getConnection(url, new Properties())
+    try {
+      f(conn)
+    } finally {
+      conn.close()
+    }
+  }
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    Utils.classForName("org.h2.Driver")
+    withConnection { conn =>
+      conn.prepareStatement("""CREATE SCHEMA "test"""").executeUpdate()
+      conn.prepareStatement(
+        """CREATE TABLE "test"."people" (id INTEGER NOT NULL, country TEXT(32) 
NOT NULL)""")
+        .executeUpdate()
+      conn.prepareStatement(
+        """INSERT INTO "test"."people" VALUES (1, 'US')""")
+        .executeUpdate()
+    }
+  }
+
+  override def afterAll(): Unit = {
+    Utils.deleteRecursively(tempDir)
+    super.afterAll()
+  }
+
+  test("Test Read And Write Cross Multi Spark Catalog") {
+    if (HoodieSparkUtils.gteqSpark3_1) {
+      checkAnswer("SHOW TABLES IN h2.test")(Seq("test", "people", false))

Review Comment:
   Is the expected sequence correct? Please double-check it against the column order of `SHOW TABLES` output (namespace, tableName, isTemporary).



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3Catalog.scala:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import java.sql.{Connection, DriverManager}
+import java.util.Properties
+
+import org.apache.hudi.HoodieSparkUtils
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+import java.io.File
+
+class TestSpark3Catalog extends HoodieSparkSqlTestBase {
+
+  val tempDir: File = Utils.createTempDir()
+  val url = 
s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"
+
+  override def sparkConf(): SparkConf = {
+    val sparkConf = super.sparkConf()
+    if (HoodieSparkUtils.gteqSpark3_1) {

Review Comment:
   Do we really need this conditional here?



##########
hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala:
##########
@@ -18,23 +18,35 @@
 package org.apache.spark.sql
 
 import org.apache.hudi.spark3.internal.ReflectUtil
-import org.apache.spark.sql.catalyst.analysis.{TableOutputResolver, 
UnresolvedRelation}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, 
TableOutputResolver, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Like}
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, 
JoinHint, LogicalPlan}
 import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
-import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, 
LookupCatalog, V1Table, V2TableWithV1Fallback}
 import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
 import org.apache.spark.sql.execution.{ExtendedMode, SimpleMode}
 import org.apache.spark.sql.internal.SQLConf
 
-abstract class HoodieSpark3CatalystPlanUtils extends HoodieCatalystPlansUtils {
+abstract class HoodieSpark3CatalystPlanUtils extends HoodieCatalystPlansUtils
+  with LookupCatalog with Logging {
+
+  override protected val catalogManager: CatalogManager = {

Review Comment:
   Why do we need this? We should not be duplicating components available w/in 
`Analyzer`



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -473,9 +473,9 @@ case class HoodieResolveReferences(sparkSession: 
SparkSession) extends Rule[Logi
           "version expression is not supported for time travel")
       }
 
-      val tableIdentifier = 
sparkAdapter.getCatalystPlanUtils.toTableIdentifier(plan)
-      if (sparkAdapter.isHoodieTable(tableIdentifier, sparkSession)) {
-        val hoodieCatalogTable = HoodieCatalogTable(sparkSession, 
tableIdentifier)
+      val catalogTable = sparkAdapter.getCatalystPlanUtils.resolve(plan)
+      if (catalogTable.isDefined && 
sparkAdapter.isHoodieTable(catalogTable.get)) {

Review Comment:
   This would read better with pattern matching.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to