gh-yzou commented on code in PR #1862:
URL: https://github.com/apache/polaris/pull/1862#discussion_r2226980564


##########
plugins/spark/v3.5/integration/build.gradle.kts:
##########
@@ -60,6 +60,7 @@ dependencies {
     exclude("org.apache.logging.log4j", "log4j-core")
     exclude("org.slf4j", "jul-to-slf4j")
   }
+

Review Comment:
   Let's revert this line; it doesn't seem necessary.



##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java:
##########
@@ -156,15 +159,19 @@ public Table createTable(
         throw new UnsupportedOperationException(
             "Create table without location key is not supported by Polaris. 
Please provide location or path on table creation.");
       }
-
       if (PolarisCatalogUtils.useDelta(provider)) {
         // For delta table, we load the delta catalog to help dealing with the
         // delta log creation.
         TableCatalog deltaCatalog = 
deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
         return deltaCatalog.createTable(ident, schema, transforms, properties);
-      } else {
-        return this.polarisSparkCatalog.createTable(ident, schema, transforms, 
properties);
       }
+      if (PolarisCatalogUtils.useHudi(provider)) {
+        // For creating the hudi table, we load HoodieCatalog
+        // to create the .hoodie folder in cloud storage
+        TableCatalog hudiCatalog = 
hudiHelper.loadHudiCatalog(this.polarisSparkCatalog);
+        return hudiCatalog.createTable(ident, schema, transforms, properties);
+      }
+      return this.polarisSparkCatalog.createTable(ident, schema, transforms, 
properties);

Review Comment:
   @rahil-c I believe that with the change you are making in the Hudi community, you 
don't need the extra polarisSparkCatalog.createTable anymore. 
   
   Let's restructure this as Eric suggested; we can do:
   ```
   if (PolarisCatalogUtils.useDelta(provider)) {
   } else if (PolarisCatalogUtils.useHudi(provider)) {
   } else {
   }
   ```



##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java:
##########
@@ -91,6 +109,85 @@ public static Table loadSparkTable(GenericTable 
genericTable) {
         provider, new CaseInsensitiveStringMap(tableProperties), 
scala.Option.empty());
   }
 
+  /**
+   * Extract catalog name from Spark session configuration. Looks for 
configuration like:
+   * spark.sql.catalog.<CATALOG_NAME>=org.apache.polaris.spark.SparkCatalog
+   */
+  private static String getCatalogName() {
+    SparkSession spark = SparkSession.active();
+    String catalogPrefix = "spark.sql.catalog.";
+    String polarisSparkCatalog = "org.apache.polaris.spark.SparkCatalog";
+
+    scala.collection.Iterator<scala.Tuple2<String, String>> configIterator =
+        spark.conf().getAll().iterator();
+    while (configIterator.hasNext()) {
+      scala.Tuple2<String, String> config = configIterator.next();
+      String key = config._1();
+      String value = config._2();
+
+      if (key.startsWith(catalogPrefix) && polarisSparkCatalog.equals(value)) {
+        return key.substring(catalogPrefix.length());
+      }
+    }
+
+    throw new IllegalStateException(
+        "Could not obtain Polaris catalog identifier."
+            + "Expected following configuration to be set in session: 
spark.sql.catalog.<CATALOG_NAME>=org.apache.polaris.spark.SparkCatalog");
+  }
+
+  public static Table loadV1SparkHudiTable(GenericTable genericTable, 
Identifier identifier) {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.putAll(genericTable.getProperties());
+    tableProperties.put(
+        TABLE_PATH_KEY, 
genericTable.getProperties().get(TableCatalog.PROP_LOCATION));
+
+    // Need full identifier in order to construct CatalogTable correctly for 
Hudi
+    String namespacePath = String.join(".", identifier.namespace());
+    TableIdentifier tableIdentifier =
+        new TableIdentifier(
+            identifier.name(), Option.apply(namespacePath), 
Option.apply(getCatalogName()));
+
+    scala.collection.immutable.Map<String, String> scalaOptions =
+        (scala.collection.immutable.Map<String, String>)
+            scala.collection.immutable.Map$.MODULE$.apply(
+                
scala.collection.JavaConverters.mapAsScalaMap(tableProperties).toSeq());
+
+    org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat storage =
+        DataSource.buildStorageFormatFromOptions(scalaOptions);
+
+    // Currently Polaris generic table does not contain any schema 
information, partition columns,
+    // stats, etc
+    // for now we will just use fill the parameters we have from catalog, and 
let underlying client
+    // resolve the rest within its catalog implementation
+    org.apache.spark.sql.types.StructType emptySchema = new 
org.apache.spark.sql.types.StructType();
+    scala.collection.immutable.Seq<String> emptyStringSeq =
+        scala.collection.JavaConverters.asScalaBuffer(new 
java.util.ArrayList<String>()).toList();
+    CatalogTable catalogTable =
+        new CatalogTable(
+            tableIdentifier,
+            CatalogTableType.EXTERNAL(),
+            storage,
+            emptySchema,
+            Option.apply(genericTable.getProperties().get("provider")),

Review Comment:
   Let's use genericTable.format for the format instead of the provider, since the 
source of truth comes from the catalog service.



##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java:
##########
@@ -91,6 +109,85 @@ public static Table loadSparkTable(GenericTable 
genericTable) {
         provider, new CaseInsensitiveStringMap(tableProperties), 
scala.Option.empty());
   }
 
+  /**
+   * Extract catalog name from Spark session configuration. Looks for 
configuration like:
+   * spark.sql.catalog.<CATALOG_NAME>=org.apache.polaris.spark.SparkCatalog
+   */
+  private static String getCatalogName() {

Review Comment:
   Do you need the catalog name on the client side or the server side? From your 
description, it seems you only need it on the client side, in which case you should be able to 
get the catalog name directly from PolarisSparkCatalog.catalogName; we 
probably don't need this parsing function.



##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java:
##########
@@ -91,6 +109,85 @@ public static Table loadSparkTable(GenericTable 
genericTable) {
         provider, new CaseInsensitiveStringMap(tableProperties), 
scala.Option.empty());
   }
 
+  /**
+   * Extract catalog name from Spark session configuration. Looks for 
configuration like:
+   * spark.sql.catalog.<CATALOG_NAME>=org.apache.polaris.spark.SparkCatalog
+   */
+  private static String getCatalogName() {
+    SparkSession spark = SparkSession.active();
+    String catalogPrefix = "spark.sql.catalog.";
+    String polarisSparkCatalog = "org.apache.polaris.spark.SparkCatalog";
+
+    scala.collection.Iterator<scala.Tuple2<String, String>> configIterator =
+        spark.conf().getAll().iterator();
+    while (configIterator.hasNext()) {
+      scala.Tuple2<String, String> config = configIterator.next();
+      String key = config._1();
+      String value = config._2();
+
+      if (key.startsWith(catalogPrefix) && polarisSparkCatalog.equals(value)) {
+        return key.substring(catalogPrefix.length());
+      }
+    }
+
+    throw new IllegalStateException(
+        "Could not obtain Polaris catalog identifier."
+            + "Expected following configuration to be set in session: 
spark.sql.catalog.<CATALOG_NAME>=org.apache.polaris.spark.SparkCatalog");
+  }
+
+  public static Table loadV1SparkHudiTable(GenericTable genericTable, 
Identifier identifier) {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.putAll(genericTable.getProperties());
+    tableProperties.put(
+        TABLE_PATH_KEY, 
genericTable.getProperties().get(TableCatalog.PROP_LOCATION));
+
+    // Need full identifier in order to construct CatalogTable correctly for 
Hudi
+    String namespacePath = String.join(".", identifier.namespace());
+    TableIdentifier tableIdentifier =
+        new TableIdentifier(
+            identifier.name(), Option.apply(namespacePath), 
Option.apply(getCatalogName()));

Review Comment:
   When I was handling Delta with the V1Spark table, I don't think we actually needed 
this. What happens if we do not put the catalog name here?
   



##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java:
##########
@@ -91,6 +109,85 @@ public static Table loadSparkTable(GenericTable 
genericTable) {
         provider, new CaseInsensitiveStringMap(tableProperties), 
scala.Option.empty());
   }
 
+  /**
+   * Extract catalog name from Spark session configuration. Looks for 
configuration like:
+   * spark.sql.catalog.<CATALOG_NAME>=org.apache.polaris.spark.SparkCatalog
+   */
+  private static String getCatalogName() {
+    SparkSession spark = SparkSession.active();
+    String catalogPrefix = "spark.sql.catalog.";
+    String polarisSparkCatalog = "org.apache.polaris.spark.SparkCatalog";
+
+    scala.collection.Iterator<scala.Tuple2<String, String>> configIterator =
+        spark.conf().getAll().iterator();
+    while (configIterator.hasNext()) {
+      scala.Tuple2<String, String> config = configIterator.next();
+      String key = config._1();
+      String value = config._2();
+
+      if (key.startsWith(catalogPrefix) && polarisSparkCatalog.equals(value)) {
+        return key.substring(catalogPrefix.length());
+      }
+    }
+
+    throw new IllegalStateException(
+        "Could not obtain Polaris catalog identifier."
+            + "Expected following configuration to be set in session: 
spark.sql.catalog.<CATALOG_NAME>=org.apache.polaris.spark.SparkCatalog");
+  }
+
+  public static Table loadV1SparkHudiTable(GenericTable genericTable, 
Identifier identifier) {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.putAll(genericTable.getProperties());
+    tableProperties.put(

Review Comment:
   This part is the same as in loadSparkTable, where we update the table 
properties for loading the Spark table. How about extracting a common function called 
normalizeTablePropertyForLoadSparkTable and putting all the property handling in that 
function?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@polaris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to