eric-maynard commented on code in PR #1862:
URL: https://github.com/apache/polaris/pull/1862#discussion_r2240213615


##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java:
##########
@@ -87,10 +95,80 @@ public static Table loadSparkTable(GenericTable 
genericTable) {
         tableProperties.put(TABLE_PATH_KEY, 
properties.get(TableCatalog.PROP_LOCATION));
       }
     }
+    return tableProperties;
+  }
+
+  /**
+   * Load spark table using DataSourceV2.
+   *
+   * @return V2Table if DataSourceV2 is available for the table format. For 
delta table, it returns
+   *     DeltaTableV2.
+   */
+  public static Table loadV2SparkTable(GenericTable genericTable) {
+    SparkSession sparkSession = SparkSession.active();
+    TableProvider provider =
+        DataSource.lookupDataSourceV2(genericTable.getFormat(), 
sparkSession.sessionState().conf())
+            .get();
+    Map<String, String> tableProperties = 
normalizeTablePropertiesForLoadSparkTable(genericTable);
     return DataSourceV2Utils.getTableFromProvider(
         provider, new CaseInsensitiveStringMap(tableProperties), 
scala.Option.empty());
   }
 
+  /**
+   * Return a Spark V1Table for formats that do not use DataSourceV2. Currently, this is being used
+   * for Hudi tables.
+   */
+  public static Table loadV1SparkTable(
+      GenericTable genericTable, Identifier identifier, String catalogName) {
+    Map<String, String> tableProperties = 
normalizeTablePropertiesForLoadSparkTable(genericTable);
+
+    // Need full identifier in order to construct CatalogTable
+    String namespacePath = String.join(".", identifier.namespace());
+    TableIdentifier tableIdentifier =
+        new TableIdentifier(
+            identifier.name(), Option.apply(namespacePath), 
Option.apply(catalogName));
+
+    scala.collection.immutable.Map<String, String> scalaOptions =
+        (scala.collection.immutable.Map<String, String>)
+            scala.collection.immutable.Map$.MODULE$.apply(
+                
scala.collection.JavaConverters.mapAsScalaMap(tableProperties).toSeq());
+
+    org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat storage =
+        DataSource.buildStorageFormatFromOptions(scalaOptions);
+
+    // Currently Polaris generic table does not contain any schema information, partition columns,
+    // stats, etc. For now we will just fill in the parameters we have from the catalog, and let
+    // the underlying client resolve the rest within its catalog implementation.
+    org.apache.spark.sql.types.StructType emptySchema = new 
org.apache.spark.sql.types.StructType();
+    scala.collection.immutable.Seq<String> emptyStringSeq =
+        scala.collection.JavaConverters.asScalaBuffer(new 
java.util.ArrayList<String>()).toList();
+    CatalogTable catalogTable =
+        new CatalogTable(
+            tableIdentifier,
+            CatalogTableType.EXTERNAL(),
+            storage,
+            emptySchema,
+            Option.apply(genericTable.getFormat()),
+            emptyStringSeq,
+            scala.Option.empty(),
+            genericTable.getProperties().get("owner"),

Review Comment:
   This property currently isn't defined anywhere. Are you somehow setting it 
on writes?
   
   If so, this should be a constant somewhere. If not, you should remove this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@polaris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to