rymurr commented on a change in pull request #1783:
URL: https://github.com/apache/iceberg/pull/1783#discussion_r540133030
##########
File path: site/docs/spark.md
##########
@@ -325,6 +325,21 @@ spark.read
Time travel is not yet supported by Spark's SQL syntax.
+### Table names and paths
+
+Paths and table names can be loaded from the Spark3 dataframe interface. How
paths/tables are loaded depends on how
+the identifier is specified. When using
`spark.read().format("iceberg").path(table)` or `spark.table(table)` the `table`
+variable can take a number of forms as listed below:
+
+* `file:/path/to/table` -> loads a HadoopTable at given path
+* ```catalog.`file:/path/to/table` ``` -> fails. Don't set a catalog for paths
+* ```catalog.namespace.`file:/path/to/table` ``` -> fails. Namespace doesn't
exist for paths
Review comment:
done
##########
File path:
spark3/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -20,21 +20,45 @@
package org.apache.iceberg.spark.source;
import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.hive.HiveCatalog;
-import org.apache.iceberg.hive.HiveCatalogs;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.PathIdentifier;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.catalog.CatalogManager;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.SupportsCatalogOptions;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-public class IcebergSource implements DataSourceRegister, TableProvider {
+/**
+ * The IcebergSource loads/writes tables with format "iceberg". It can load
paths and tables.
+ *
+ * How paths/tables are loaded when using
spark.read().format("iceberg").path(table)
+ *
+ * table = "file:/path/to/table" -> loads a HadoopTable at given path
+ * table = "catalog.`file:/path/to/table`" -> fails. Don't set a catalog for
paths
+ * table = "catalog.namespace.`file:/path/to/table`" -> fails. Namespace
doesn't exist for paths
Review comment:
done
##########
File path:
spark3/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -20,21 +20,45 @@
package org.apache.iceberg.spark.source;
import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.hive.HiveCatalog;
-import org.apache.iceberg.hive.HiveCatalogs;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.PathIdentifier;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.catalog.CatalogManager;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.SupportsCatalogOptions;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-public class IcebergSource implements DataSourceRegister, TableProvider {
+/**
+ * The IcebergSource loads/writes tables with format "iceberg". It can load
paths and tables.
+ *
+ * How paths/tables are loaded when using
spark.read().format("iceberg").path(table)
+ *
+ * table = "file:/path/to/table" -> loads a HadoopTable at given path
+ * table = "catalog.`file:/path/to/table`" -> fails. Don't set a catalog for
paths
+ * table = "catalog.namespace.`file:/path/to/table`" -> fails. Namespace
doesn't exist for paths
+ * table = "tablename" -> loads currentCatalog.currentNamespace.tablename
+ * table = "catalog.tablename" -> load "tablename" from the specified catalog.
+ * table = "namespace.tablename" -> Otherwise load "namespace.tablename" from
current catalog
Review comment:
done
##########
File path:
spark3/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -56,48 +80,70 @@ public boolean supportsExternalMetadata() {
}
@Override
- public SparkTable getTable(StructType schema, Transform[] partitioning,
Map<String, String> options) {
- // Get Iceberg table from options
- Configuration conf = SparkSession.active().sessionState().newHadoopConf();
- Table icebergTable = getTableAndResolveHadoopConfiguration(options, conf);
-
- // Build Spark table based on Iceberg table, and return it
- // Eagerly refresh the table before reading to ensure views containing
this table show up-to-date data
- return new SparkTable(icebergTable, schema, true);
+ public Table getTable(StructType schema, Transform[] partitioning,
Map<String, String> options) {
+ String catalogName = extractCatalog(new CaseInsensitiveStringMap(options));
+ Identifier ident = extractIdentifier(new
CaseInsensitiveStringMap(options));
+ CatalogManager catalogManager =
SparkSession.active().sessionState().catalogManager();
+ CatalogPlugin catalog = catalogManager.catalog(catalogName);
+ try {
+ if (catalog instanceof TableCatalog) {
+ return ((TableCatalog) catalog).loadTable(ident);
+ }
+ } catch (NoSuchTableException e) {
+ // throwing an iceberg NoSuchTableException because the Spark one is
typed and cant be thrown from this interface
+ throw new org.apache.iceberg.exceptions.NoSuchTableException(e, "Cannot
find table for %s.", ident);
+ }
+ // throwing an iceberg NoSuchTableException because the Spark one is typed
and cant be thrown from this interface
Review comment:
done.
##########
File path:
spark3/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -56,48 +80,70 @@ public boolean supportsExternalMetadata() {
}
@Override
- public SparkTable getTable(StructType schema, Transform[] partitioning,
Map<String, String> options) {
- // Get Iceberg table from options
- Configuration conf = SparkSession.active().sessionState().newHadoopConf();
- Table icebergTable = getTableAndResolveHadoopConfiguration(options, conf);
-
- // Build Spark table based on Iceberg table, and return it
- // Eagerly refresh the table before reading to ensure views containing
this table show up-to-date data
- return new SparkTable(icebergTable, schema, true);
+ public Table getTable(StructType schema, Transform[] partitioning,
Map<String, String> options) {
+ String catalogName = extractCatalog(new CaseInsensitiveStringMap(options));
+ Identifier ident = extractIdentifier(new
CaseInsensitiveStringMap(options));
Review comment:
Yup, this wasn't refactored when I removed some code from the extract
methods. Now that they don't do anything, I have cleaned up the `getTable`
method.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]