rdblue commented on a change in pull request #624: Update SparkTableUtil to use
SessionCatalog and proper MetricsConfig
URL: https://github.com/apache/incubator-iceberg/pull/624#discussion_r344419870
##########
File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
##########
@@ -27,86 +27,153 @@ import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.iceberg.{DataFile, DataFiles, FileFormat, ManifestFile,
ManifestWriter}
import org.apache.iceberg.{Metrics, MetricsConfig, PartitionSpec, Table}
import org.apache.iceberg.exceptions.NoSuchTableException
-import org.apache.iceberg.hadoop.{HadoopFileIO, HadoopInputFile, HadoopTables,
SerializableConfiguration}
+import org.apache.iceberg.hadoop.{HadoopFileIO, HadoopInputFile,
SerializableConfiguration}
import org.apache.iceberg.orc.OrcMetrics
import org.apache.iceberg.parquet.ParquetUtil
-import org.apache.iceberg.spark.hacks.Hive
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable,
CatalogTablePartition}
+import org.apache.spark.sql.catalyst.expressions.Expression
import scala.collection.JavaConverters._
+import scala.util.Try
object SparkTableUtil {
/**
- * Returns a DataFrame with a row for each partition in the table.
- *
- * The DataFrame has 3 columns, partition key (a=1/b=2), partition location,
and format
- * (avro or parquet).
+ * Returns all partitions in the table.
*
* @param spark a Spark session
* @param table a table name and (optional) database
- * @return a DataFrame of the table's partitions
+ * @return all of the table's partitions
*/
- def partitionDF(spark: SparkSession, table: String): DataFrame = {
- import spark.implicits._
+ def getPartitions(spark: SparkSession, table: String): Seq[SparkPartition] =
{
+ val tableIdentifier =
spark.sessionState.sqlParser.parseTableIdentifier(table)
+ getPartitions(spark, tableIdentifier)
+ }
- val partitions: Seq[(Map[String, String], Option[String], Option[String])]
=
- Hive.partitions(spark, table).map { p: CatalogTablePartition =>
- (p.spec, p.storage.locationUri.map(String.valueOf(_)), p.storage.serde)
- }
+ /**
+ * Returns all partitions in the table.
+ *
+ * @param spark a Spark session
+ * @param tableIdent a table identifier
+ * @return all of the table's partitions
+ */
+ def getPartitions(spark: SparkSession, tableIdent: TableIdentifier):
Seq[SparkPartition] = {
+ val catalog = spark.sessionState.catalog
+ val catalogTable = catalog.getTableMetadata(tableIdent)
- partitions.toDF("partition", "uri", "format")
+ catalog
+ .listPartitions(tableIdent)
+ .map(catalogPartition => toSparkPartition(catalogPartition,
catalogTable))
}
/**
- * Returns a DataFrame with a row for each partition that matches the
specified 'expression'.
- *
- * @param spark a Spark session.
- * @param table name of the table.
- * @param expression The expression whose matching partitions are returned.
- * @return a DataFrame of the table partitions.
- */
- def partitionDFByFilter(spark: SparkSession, table: String, expression:
String): DataFrame = {
- import spark.implicits._
+ * Returns partitions that match the specified 'predicate'.
+ *
+ * @param spark a Spark session
+ * @param table a table name and (optional) database
+ * @param predicate a predicate on partition columns
+ * @return the table's partitions that match the predicate
+ */
+ def getPartitionsByFilter(spark: SparkSession, table: String, predicate:
String): Seq[SparkPartition] = {
+ val tableIdentifier =
spark.sessionState.sqlParser.parseTableIdentifier(table)
+
+ val unresolvedPredicateExpr =
spark.sessionState.sqlParser.parseExpression(predicate)
+ val resolver = spark.sessionState.analyzer.resolver
+ val plan = spark.table(table).queryExecution.analyzed
+ val resolvedPredicateExpr = unresolvedPredicateExpr.transform {
+ case attr: UnresolvedAttribute =>
+ plan.resolve(attr.nameParts, resolver) match {
+ case Some(resolvedAttr) => resolvedAttr
+ case None => throw new IllegalArgumentException(s"Could not resolve
$attr using columns: ${plan.output}")
+ }
+ }
- val expr = spark.sessionState.sqlParser.parseExpression(expression)
- val partitions: Seq[(Map[String, String], Option[String], Option[String])]
=
- Hive.partitionsByFilter(spark, table, expr).map { p:
CatalogTablePartition =>
- (p.spec, p.storage.locationUri.map(String.valueOf(_)), p.storage.serde)
- }
+ getPartitionsByFilter(spark, tableIdentifier, resolvedPredicateExpr)
+ }
+
+ /**
+ * Returns partitions that match the specified 'predicate'.
+ *
+ * @param spark a Spark session
+ * @param tableIdent a table identifier
+ * @param predicate a predicate on partition columns
+ * @return the table's partitions that match the predicate
+ */
+ def getPartitionsByFilter(
+ spark: SparkSession,
+ tableIdent: TableIdentifier,
+ predicate: Expression): Seq[SparkPartition] = {
+
+ if (!predicate.resolved) {
+ throw new IllegalArgumentException(s"$predicate is not resolved")
+ }
- partitions.toDF("partition", "uri", "format")
+ val catalog = spark.sessionState.catalog
+ val catalogTable = catalog.getTableMetadata(tableIdent)
+
+ catalog
+ .listPartitionsByFilter(tableIdent, Seq(predicate))
+ .map(catalogPartition => toSparkPartition(catalogPartition,
catalogTable))
}
/**
* Returns the data files in a partition by listing the partition location.
*
- * For Parquet partitions, this will read metrics from the file footer. For
Avro partitions,
+ * For Parquet and ORC partitions, this will read metrics from the file
footer. For Avro partitions,
+ * metrics are set to null.
+ *
+ * @param partition a partition
+ * @param conf a serializable Hadoop conf
+ * @param metricsConfig a metrics conf
+ * @return a Seq of [[SparkDataFile]]
+ */
+ def listPartition(
+ partition: SparkPartition,
+ conf: SerializableConfiguration,
+ metricsConfig: MetricsConfig): Seq[SparkDataFile] = {
Review comment:
Why not default this MetricsConfig as well?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]