MonkeyCanCode commented on code in PR #4612:
URL: https://github.com/apache/polaris/pull/4612#discussion_r3357988240


##########
plugins/spark/v4.0/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.utils;
+
+import com.google.common.collect.Maps;
+import java.lang.reflect.Field;
+import java.util.Map;
+import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.rest.RESTSessionCatalog;
+import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.polaris.spark.rest.GenericTable;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.execution.datasources.DataSource;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.immutable.Map$;
+import scala.collection.mutable.Builder;
+
+public class PolarisCatalogUtils {
+
+  public static final String TABLE_PROVIDER_KEY = "provider";
+  public static final String TABLE_PATH_KEY = "path";
+
+  /** Check whether the table provider is iceberg. */
+  public static boolean useIceberg(String provider) {
+    return provider == null || "iceberg".equalsIgnoreCase(provider);
+  }
+
+  /** Check whether the table provider is delta. */
+  public static boolean useDelta(String provider) {
+    return "delta".equalsIgnoreCase(provider);
+  }
+
+  public static boolean useHudi(String provider) {
+    return "hudi".equalsIgnoreCase(provider);
+  }
+
+  /** Check whether the table provider is paimon. */
+  public static boolean usePaimon(String provider) {
+    return "paimon".equalsIgnoreCase(provider);
+  }
+
+  /**
+   * For tables whose location is managed by Spark Session Catalog, there will 
be no location or
+   * path in the properties.
+   */
+  public static boolean isTableWithSparkManagedLocation(Map<String, String> 
properties) {
+    boolean hasLocationClause = 
properties.containsKey(TableCatalog.PROP_LOCATION);
+    boolean hasPathClause = properties.containsKey(TABLE_PATH_KEY);
+    return !hasLocationClause && !hasPathClause;
+  }
+
+  /**
+   * Normalize table properties for loading Spark tables by ensuring the 
TABLE_PATH_KEY is properly
+   * set. DataSourceV2 requires the path property on table loading.
+   */
+  private static Map<String, String> normalizeTablePropertiesForLoadSparkTable(
+      GenericTable genericTable) {
+    Map<String, String> properties = genericTable.properties();
+    boolean hasLocationClause = properties.get(TableCatalog.PROP_LOCATION) != 
null;
+    boolean hasPathClause = properties.get(TABLE_PATH_KEY) != null;
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.putAll(properties);
+    if (!hasPathClause) {
+      // DataSourceV2 requires the path property on table loading. However, 
spark today
+      // doesn't create the corresponding path property if the path keyword is 
not
+      // provided by user when location is provided. Here, we duplicate the 
location
+      // property as path to make sure the table can be loaded.
+      if (genericTable.baseLocation() != null && 
!genericTable.baseLocation().isEmpty()) {
+        tableProperties.put(TABLE_PATH_KEY, genericTable.baseLocation());
+      } else if (hasLocationClause) {
+        tableProperties.put(TABLE_PATH_KEY, 
properties.get(TableCatalog.PROP_LOCATION));
+      }
+    }
+    return tableProperties;
+  }
+
+  /**
+   * Load spark table using DataSourceV2.
+   *
+   * @return V2Table if DataSourceV2 is available for the table format. For 
delta table, it returns
+   *     DeltaTableV2.
+   */
+  public static Table loadV2SparkTable(GenericTable genericTable) {
+    SparkSession sparkSession = SparkSession.active();
+    TableProvider provider =
+        DataSource.lookupDataSourceV2(genericTable.format(), 
sparkSession.sessionState().conf())
+            .get();
+    Map<String, String> tableProperties = 
normalizeTablePropertiesForLoadSparkTable(genericTable);
+    return DataSourceV2Utils.getTableFromProvider(
+        provider, new CaseInsensitiveStringMap(tableProperties), 
scala.Option.empty());
+  }
+
+  /**
+   * Return a Spark V1Table for formats that do not use DataSourceV2. 
Currently, this is being used
+   * for Hudi tables
+   */
+  public static Table loadV1SparkTable(
+      GenericTable genericTable, Identifier identifier, String catalogName) {
+    Map<String, String> tableProperties = 
normalizeTablePropertiesForLoadSparkTable(genericTable);
+
+    // Need full identifier in order to construct CatalogTable
+    String namespacePath = String.join(".", identifier.namespace());
+    TableIdentifier tableIdentifier =
+        new TableIdentifier(
+            identifier.name(), Option.apply(namespacePath), 
Option.apply(catalogName));
+
+    Builder<Tuple2<String, String>, scala.collection.immutable.Map<String, 
String>> mb =
+        Map$.MODULE$.newBuilder();
+    tableProperties.forEach((k, v) -> mb.$plus$eq(Tuple2.apply(k, v)));
+    scala.collection.immutable.Map<String, String> scalaOptions = mb.result();
+
+    org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat storage =
+        DataSource.buildStorageFormatFromOptions(scalaOptions);
+
+    // Currently Polaris generic table does not contain any schema 
information, partition columns,
+    // stats, etc
+    // for now we will fill the parameters we have from polaris catalog, and 
let underlying client
+    // resolve the rest within its catalog implementation
+    org.apache.spark.sql.types.StructType emptySchema = new 
org.apache.spark.sql.types.StructType();
+    @SuppressWarnings("deprecation")
+    scala.collection.immutable.Seq<String> emptyStringSeq =
+        scala.collection.JavaConverters.asScalaBuffer(new 
java.util.ArrayList<String>()).toList();
+    CatalogTable catalogTable =
+        new CatalogTable(
+            tableIdentifier,
+            CatalogTableType.EXTERNAL(),
+            storage,
+            emptySchema,
+            Option.apply(genericTable.format()),
+            emptyStringSeq,
+            scala.Option.empty(),
+            genericTable.properties().getOrDefault(TableCatalog.PROP_OWNER, 
""),
+            System.currentTimeMillis(),
+            -1L,
+            "",
+            scalaOptions,
+            scala.Option.empty(),

Review Comment:
   All good @dimas-b. Yes, one extra column here. Same responded above. Any 
classes with diff, I currently have them not in the common due to using 
`sourceSets`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to