aokolnychyi commented on a change in pull request #1126:
URL: https://github.com/apache/iceberg/pull/1126#discussion_r443082387
##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -0,0 +1,649 @@
+  private static List<DataFile> listParquetPartition(
+      Map<String, String> partitionPath,
+      String partitionUri,
+      PartitionSpec spec,
+      Configuration conf,
+      MetricsConfig metricsSpec) throws IOException {
+
+    Path partition = new Path(partitionUri);
+    FileSystem fs = partition.getFileSystem(conf);
+
+    return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
+        .filter(FileStatus::isFile)
+        .map(stat -> {
+          final Metrics metrics;

Review comment:
   nit: final
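The nit isn't spelled out beyond the single word, but if it is read as a request to drop the explicit `final` modifier on the local (the variable is assigned exactly once on every path, so it is effectively final either way), the change is mechanical. A minimal, self-contained sketch of the pattern under that assumption; the names below are stand-ins, not the PR's types:

```java
import java.io.IOException;

public class FinalNitExample {

  // Stand-in for a footer-reading call such as ParquetUtil.footerMetrics(...); illustrative only.
  private static long readRecordCount(String path) throws IOException {
    return 42L;
  }

  private static long recordCountFor(String path) {
    // Mirrors the flagged pattern: the local is assigned exactly once on every
    // code path (the catch block always rethrows), so it is effectively final
    // and the explicit `final` modifier can be dropped without changing behavior.
    long recordCount;
    try {
      recordCount = readRecordCount(path);
    } catch (IOException e) {
      throw new RuntimeException("Unable to read footer metrics for: " + path, e);
    }
    return recordCount;
  }

  public static void main(String[] args) {
    System.out.println(recordCountFor("part-00000.parquet"));
  }
}
```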
##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
##########
@@ -0,0 +1,649 @@
+  private static Expression resolveAttrs(SparkSession spark, String table, Expression expr) {
+    final Function2<String, String, Object> resolver = spark.sessionState().analyzer().resolver();

Review comment:
   nit: finals
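For context, the class added in this PR exposes static helpers such as partitionDF(SparkSession, String) and partitionDFByFilter(SparkSession, String, String), which return a DataFrame with partition, uri, and format columns. A minimal usage sketch, assuming a Spark session with Hive catalog support and an existing partitioned table; the table name and filter expression are placeholders, not taken from the PR:

```java
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PartitionDFExample {
  public static void main(String[] args) throws Exception {
    // Assumes a Hive-backed catalog and an existing partitioned table;
    // "db.sample_table" is a placeholder name, not part of the PR.
    SparkSession spark = SparkSession.builder()
        .appName("partition-df-example")
        .enableHiveSupport()
        .getOrCreate();

    // One row per partition: partition key (e.g. a=1/b=2), location URI, and format.
    Dataset<Row> partitions = SparkTableUtil.partitionDF(spark, "db.sample_table");
    partitions.show(false);

    // Same, restricted to partitions matching a filter on the partition columns.
    Dataset<Row> filtered = SparkTableUtil.partitionDFByFilter(spark, "db.sample_table", "a = 1");
    filtered.show(false);

    spark.stop();
  }
}
```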