eric-maynard commented on code in PR #1862:
URL: https://github.com/apache/polaris/pull/1862#discussion_r2243622061
##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java:
##########
@@ -87,10 +95,80 @@ public static Table loadSparkTable(GenericTable genericTable) {
         tableProperties.put(TABLE_PATH_KEY, properties.get(TableCatalog.PROP_LOCATION));
       }
     }
+    return tableProperties;
+  }
+
+  /**
+   * Load spark table using DataSourceV2.
+   *
+   * @return V2Table if DataSourceV2 is available for the table format. For delta table, it returns
+   *     DeltaTableV2.
+   */
+  public static Table loadV2SparkTable(GenericTable genericTable) {
+    SparkSession sparkSession = SparkSession.active();
+    TableProvider provider =
+        DataSource.lookupDataSourceV2(genericTable.getFormat(), sparkSession.sessionState().conf())
+            .get();
+    Map<String, String> tableProperties = normalizeTablePropertiesForLoadSparkTable(genericTable);
     return DataSourceV2Utils.getTableFromProvider(
         provider, new CaseInsensitiveStringMap(tableProperties), scala.Option.empty());
   }
 
+  /**
+   * Return a Spark V1Table for formats that do not use DataSourceV2. Currently, this is being used
+   * for Hudi tables
+   */
+  public static Table loadV1SparkTable(
+      GenericTable genericTable, Identifier identifier, String catalogName) {
+    Map<String, String> tableProperties = normalizeTablePropertiesForLoadSparkTable(genericTable);
+
+    // Need full identifier in order to construct CatalogTable
+    String namespacePath = String.join(".", identifier.namespace());
+    TableIdentifier tableIdentifier =
+        new TableIdentifier(
+            identifier.name(), Option.apply(namespacePath), Option.apply(catalogName));
+
+    scala.collection.immutable.Map<String, String> scalaOptions =
+        (scala.collection.immutable.Map<String, String>)
+            scala.collection.immutable.Map$.MODULE$.apply(
+                scala.collection.JavaConverters.mapAsScalaMap(tableProperties).toSeq());
+
+    org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat storage =
+        DataSource.buildStorageFormatFromOptions(scalaOptions);
+
+    // Currently Polaris generic table does not contain any schema information, partition columns,
+    // stats, etc.
+    // For now we will just fill the parameters we have from the catalog, and let the underlying
+    // client resolve the rest within its catalog implementation
+    org.apache.spark.sql.types.StructType emptySchema = new org.apache.spark.sql.types.StructType();
+    scala.collection.immutable.Seq<String> emptyStringSeq =
+        scala.collection.JavaConverters.asScalaBuffer(new java.util.ArrayList<String>()).toList();
+    CatalogTable catalogTable =
+        new CatalogTable(
+            tableIdentifier,
+            CatalogTableType.EXTERNAL(),
+            storage,
+            emptySchema,
+            Option.apply(genericTable.getFormat()),
+            emptyStringSeq,
+            scala.Option.empty(),
+            genericTable.getProperties().get("owner"),

Review Comment:
   The ask is that, ideally, we can re-use the existing constant (which, based on your comment, looks to be coming from [here](https://github.com/apache/spark/blob/dc8fba647ac1042fd83a8ecbeadb45687c784b5b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java#L89)).
   Barring that, yes, please make a new constant.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@polaris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
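To illustrate the suggestion above: the idea is to look up the owner via Spark's shared `TableCatalog.PROP_OWNER` constant (whose value is `"owner"`) rather than a raw string literal. The sketch below is a minimal, self-contained illustration, not the PR's actual code: the class `OwnerProperty` and method `ownerOf` are hypothetical names, and `PROP_OWNER` is mirrored locally (same `"owner"` value) so the snippet compiles without a Spark dependency; in the PR it would be imported from `org.apache.spark.sql.connector.catalog.TableCatalog`.

```java
import java.util.HashMap;
import java.util.Map;

public class OwnerProperty {
  // Mirrors TableCatalog.PROP_OWNER ("owner") from Spark; in the real code,
  // reference the Spark constant instead of redefining it here.
  static final String PROP_OWNER = "owner";

  // Resolve the table owner from generic-table properties using the shared
  // constant rather than a bare "owner" literal scattered through the code.
  static String ownerOf(Map<String, String> properties) {
    return properties.get(PROP_OWNER);
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("owner", "some-user");
    System.out.println(ownerOf(props)); // prints "some-user"
  }
}
```

Reusing the existing constant (or, failing that, defining a single new one) keeps the property key in one place, so a future rename or typo cannot silently diverge between the read and write sites.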