eric-maynard commented on code in PR #1862:
URL: https://github.com/apache/polaris/pull/1862#discussion_r2243622061


##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java:
##########
@@ -87,10 +95,80 @@ public static Table loadSparkTable(GenericTable genericTable) {
         tableProperties.put(TABLE_PATH_KEY, properties.get(TableCatalog.PROP_LOCATION));
       }
     }
+    return tableProperties;
+  }
+
+  /**
+   * Load a Spark table using DataSourceV2.
+   *
+   * @return V2Table if DataSourceV2 is available for the table format. For Delta tables, it
+   *     returns DeltaTableV2.
+   */
+  public static Table loadV2SparkTable(GenericTable genericTable) {
+    SparkSession sparkSession = SparkSession.active();
+    TableProvider provider =
+        DataSource.lookupDataSourceV2(genericTable.getFormat(), sparkSession.sessionState().conf())
+            .get();
+    Map<String, String> tableProperties = normalizeTablePropertiesForLoadSparkTable(genericTable);
     return DataSourceV2Utils.getTableFromProvider(
         provider, new CaseInsensitiveStringMap(tableProperties), scala.Option.empty());
   }
 
+  /**
+   * Return a Spark V1Table for formats that do not use DataSourceV2. Currently, this is used for
+   * Hudi tables.
+   */
+  public static Table loadV1SparkTable(
+      GenericTable genericTable, Identifier identifier, String catalogName) {
+    Map<String, String> tableProperties = normalizeTablePropertiesForLoadSparkTable(genericTable);
+
+    // Need full identifier in order to construct CatalogTable
+    String namespacePath = String.join(".", identifier.namespace());
+    TableIdentifier tableIdentifier =
+        new TableIdentifier(
+            identifier.name(), Option.apply(namespacePath), Option.apply(catalogName));
+
+    scala.collection.immutable.Map<String, String> scalaOptions =
+        (scala.collection.immutable.Map<String, String>)
+            scala.collection.immutable.Map$.MODULE$.apply(
+                scala.collection.JavaConverters.mapAsScalaMap(tableProperties).toSeq());
+
+    org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat storage =
+        DataSource.buildStorageFormatFromOptions(scalaOptions);
+
+    // Currently the Polaris generic table does not contain any schema information, partition
+    // columns, stats, etc. For now we will just fill in the parameters we have from the catalog,
+    // and let the underlying client resolve the rest within its catalog implementation.
+    org.apache.spark.sql.types.StructType emptySchema = new org.apache.spark.sql.types.StructType();
+    scala.collection.immutable.Seq<String> emptyStringSeq =
+        scala.collection.JavaConverters.asScalaBuffer(new java.util.ArrayList<String>()).toList();
+    CatalogTable catalogTable =
+        new CatalogTable(
+            tableIdentifier,
+            CatalogTableType.EXTERNAL(),
+            storage,
+            emptySchema,
+            Option.apply(genericTable.getFormat()),
+            emptyStringSeq,
+            scala.Option.empty(),
+            genericTable.getProperties().get("owner"),

Review Comment:
   The ask is that, ideally, we can re-use the existing constant (which, based on your comment, appears to come from [here](https://github.com/apache/spark/blob/dc8fba647ac1042fd83a8ecbeadb45687c784b5b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java#L89)). Barring that, yes, please make a new constant.
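   For illustration, a sketch of what re-using the Spark constant could look like at the call site in question (this assumes `org.apache.spark.sql.connector.catalog.TableCatalog` is on the classpath, where `PROP_OWNER` is the reserved `"owner"` table-property key):

   ```java
   // Sketch: replace the hard-coded "owner" literal with Spark's reserved-property constant
   genericTable.getProperties().get(TableCatalog.PROP_OWNER)
   ```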



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@polaris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
