This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 8dd269b Spark: Convert SparkTableUtil from Scala to Java (#1126)
8dd269b is described below
commit 8dd269bd5f1cc0c5260c83c22a1e010a6f92e460
Author: Thiru Paramasivan <[email protected]>
AuthorDate: Mon Jun 29 17:59:41 2020 -0700
Spark: Convert SparkTableUtil from Scala to Java (#1126)
---
baseline.gradle | 1 -
build.gradle | 20 +-
project/scalastyle_config.xml | 150 -----
.../apache/iceberg/spark/SparkExceptionUtil.java | 62 ++
.../org/apache/iceberg/spark/SparkTableUtil.java | 632 +++++++++++++++++++++
.../org/apache/iceberg/spark/SparkTableUtil.scala | 457 ---------------
.../iceberg/spark/source/TestSparkTableUtil.java | 5 +-
.../TestSparkTableUtilWithInMemoryCatalog.java | 5 +-
8 files changed, 699 insertions(+), 633 deletions(-)
diff --git a/baseline.gradle b/baseline.gradle
index e708d9c..3b48c59 100644
--- a/baseline.gradle
+++ b/baseline.gradle
@@ -36,7 +36,6 @@ subprojects {
apply plugin: 'com.palantir.baseline-checkstyle'
apply plugin: 'com.palantir.baseline-error-prone'
}
- apply plugin: 'com.palantir.baseline-scalastyle'
apply plugin: 'com.palantir.baseline-class-uniqueness'
apply plugin: 'com.palantir.baseline-reproducibility'
apply plugin: 'com.palantir.baseline-exact-dependencies'
diff --git a/build.gradle b/build.gradle
index 406c863..52c06dc 100644
--- a/build.gradle
+++ b/build.gradle
@@ -79,8 +79,7 @@ subprojects {
compileClasspath {
// do not exclude Guava so the bundle project can reference classes.
- // the Spark module is also excluded because this breaks the Scala
compiler
- if (project.name != 'iceberg-bundled-guava' && project.name !=
'iceberg-spark' && project.name != 'iceberg-spark2') {
+ if (project.name != 'iceberg-bundled-guava') {
exclude group: 'com.google.guava', module: 'guava'
}
}
@@ -413,16 +412,7 @@ project(':iceberg-arrow') {
}
project(':iceberg-spark') {
- apply plugin: 'scala'
-
configurations.all {
- // this is needed to avoid a problem with dependency locking. it was
- // suggested as a work-around here:
- // https://github.com/gradle/gradle/issues/6750
- if (name.startsWith("incrementalScalaAnalysis")) {
- extendsFrom = []
- }
-
resolutionStrategy {
// Spark 2.4.4 can only use the below datanucleus version, the versions
introduced
// by Hive 2.3.6 will meet lots of unexpected issues, so here force to
use the versions
@@ -472,16 +462,8 @@ project(':iceberg-spark') {
}
project(':iceberg-spark2') {
- apply plugin: 'scala'
configurations.all {
- // this is needed to avoid a problem with dependency locking. it was
- // suggested as a work-around here:
- // https://github.com/gradle/gradle/issues/6750
- if (name.startsWith("incrementalScalaAnalysis")) {
- extendsFrom = []
- }
-
resolutionStrategy {
// Spark 2.4.4 can only use the below datanucleus version, the versions
introduced
// by Hive 2.3.6 will meet lots of unexpected issues, so here force to
use the versions
diff --git a/project/scalastyle_config.xml b/project/scalastyle_config.xml
deleted file mode 100644
index 04333bb..0000000
--- a/project/scalastyle_config.xml
+++ /dev/null
@@ -1,150 +0,0 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<scalastyle commentFilter="enabled">
- <name>Iceberg Scalastyle configuration</name>
- <check level="error" class="org.scalastyle.file.FileTabChecker"
enabled="true"/>
- <check level="error" class="org.scalastyle.file.FileLengthChecker"
enabled="true">
- <parameters>
- <parameter name="maxFileLength"><![CDATA[800]]></parameter>
- </parameters>
- </check>
- <check level="error" class="org.scalastyle.file.HeaderMatchesChecker"
enabled="true">
- <parameters>
- <parameter name="regex">true</parameter>
- <parameter name="header">(?m)^/\*$\n^ \* Licensed to the Apache
Software Foundation \(ASF\) under one$</parameter>
- </parameters>
- </check>
- <check level="error"
class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"/>
- <check level="error"
class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="true"/>
- <check level="error"
class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"/>
- <check level="error" class="org.scalastyle.file.FileLineLengthChecker"
enabled="true">
- <parameters>
- <parameter name="maxLineLength"><![CDATA[120]]></parameter>
- <parameter name="tabSize"><![CDATA[4]]></parameter>
- </parameters>
- </check>
- <check level="error" class="org.scalastyle.scalariform.ClassNamesChecker"
enabled="true">
- <parameters>
- <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
- </parameters>
- </check>
- <check level="error" class="org.scalastyle.scalariform.ObjectNamesChecker"
enabled="true">
- <parameters>
- <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
- </parameters>
- </check>
- <check level="error"
class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true">
- <parameters>
- <parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter>
- </parameters>
- </check>
- <check level="error"
class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="true"/>
- <check level="error"
class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true">
- <parameters>
- <parameter
name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter>
- </parameters>
- </check>
- <check level="error"
class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
- <parameters>
- <parameter name="maxParameters"><![CDATA[8]]></parameter>
- </parameters>
- </check>
- <check level="error" class="org.scalastyle.scalariform.MagicNumberChecker"
enabled="true">
- <parameters>
- <parameter name="ignore"><![CDATA[-1,0,1,2,3]]></parameter>
- </parameters>
- </check>
- <check level="error"
class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker"
enabled="true"/>
- <check level="error"
class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker"
enabled="true"/>
- <check level="error" class="org.scalastyle.scalariform.ReturnChecker"
enabled="false"/>
- <check level="error" class="org.scalastyle.scalariform.NullChecker"
enabled="false"/>
- <check level="error" class="org.scalastyle.scalariform.NoCloneChecker"
enabled="true"/>
- <check level="error" class="org.scalastyle.scalariform.NoFinalizeChecker"
enabled="true"/>
- <check level="error"
class="org.scalastyle.scalariform.CovariantEqualsChecker" enabled="true"/>
- <check level="error"
class="org.scalastyle.scalariform.StructuralTypeChecker" enabled="true"/>
- <check level="error" class="org.scalastyle.file.RegexChecker"
enabled="true">
- <parameters>
- <parameter name="regex"><![CDATA[println]]></parameter>
- </parameters>
- </check>
- <check level="error"
class="org.scalastyle.scalariform.NumberOfTypesChecker" enabled="true">
- <parameters>
- <parameter name="maxTypes"><![CDATA[30]]></parameter>
- </parameters>
- </check>
- <check level="error"
class="org.scalastyle.scalariform.CyclomaticComplexityChecker" enabled="true">
- <parameters>
- <parameter name="maximum"><![CDATA[10]]></parameter>
- </parameters>
- </check>
- <check level="error" class="org.scalastyle.scalariform.UppercaseLChecker"
enabled="true"/>
- <check level="error"
class="org.scalastyle.scalariform.SimplifyBooleanExpressionChecker"
enabled="true"/>
- <check level="error" class="org.scalastyle.scalariform.IfBraceChecker"
enabled="true">
- <parameters>
- <parameter name="singleLineAllowed"><![CDATA[true]]></parameter>
- <parameter name="doubleLineAllowed"><![CDATA[false]]></parameter>
- </parameters>
- </check>
- <check level="error"
class="org.scalastyle.scalariform.MethodLengthChecker" enabled="true">
- <parameters>
- <parameter name="maxLength"><![CDATA[50]]></parameter>
- </parameters>
- </check>
- <check level="error" class="org.scalastyle.scalariform.MethodNamesChecker"
enabled="true">
- <parameters>
- <parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter>
- </parameters>
- </check>
- <check level="error"
class="org.scalastyle.scalariform.NumberOfMethodsInTypeChecker" enabled="false">
- <parameters>
- <parameter name="maxMethods"><![CDATA[30]]></parameter>
- </parameters>
- </check>
- <check level="error"
class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"/>
- <check level="error" class="org.scalastyle.file.NewLineAtEofChecker"
enabled="true"/>
- <check level="error" class="org.scalastyle.file.NoNewLineAtEofChecker"
enabled="false"/>
- <check level="error" class="org.scalastyle.scalariform.WhileChecker"
enabled="false"/>
- <check level="error" class="org.scalastyle.scalariform.VarFieldChecker"
enabled="false"/>
- <check level="error" class="org.scalastyle.scalariform.VarLocalChecker"
enabled="false"/>
- <check level="error" class="org.scalastyle.scalariform.RedundantIfChecker"
enabled="false"/>
- <check level="error" class="org.scalastyle.scalariform.TokenChecker"
enabled="false">
- <parameters>
- <parameter name="regex"><![CDATA[println]]></parameter>
- </parameters>
- </check>
- <check level="error"
class="org.scalastyle.scalariform.DeprecatedJavaChecker" enabled="true"/>
- <check level="error" class="org.scalastyle.scalariform.EmptyClassChecker"
enabled="true"/>
- <check level="error"
class="org.scalastyle.scalariform.ClassTypeParameterChecker" enabled="true">
- <parameters>
- <parameter name="regex"><![CDATA[^[A-Z_]$]]></parameter>
- </parameters>
- </check>
- <check level="error"
class="org.scalastyle.scalariform.UnderscoreImportChecker" enabled="false"/>
- <check level="error"
class="org.scalastyle.scalariform.LowercasePatternMatchChecker" enabled="true"/>
- <check level="error" class="org.scalastyle.scalariform.ImportOrderChecker"
enabled="true">
- <parameters>
- <parameter name="groups">all</parameter>
- <parameter name="group.all">.+</parameter>
- </parameters>
- </check>
- <check level="error"
class="org.scalastyle.scalariform.DisallowSpaceBeforeTokenChecker"
enabled="true">
- <parameters>
- <parameter name="tokens">COMMA</parameter>
- </parameters>
- </check>
-</scalastyle>
diff --git
a/spark/src/main/java/org/apache/iceberg/spark/SparkExceptionUtil.java
b/spark/src/main/java/org/apache/iceberg/spark/SparkExceptionUtil.java
new file mode 100644
index 0000000..88b621b
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkExceptionUtil.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.io.IOException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.spark.sql.AnalysisException;
+
+public class SparkExceptionUtil {
+
+ private SparkExceptionUtil() {
+ }
+
+ /**
+ * Converts checked exceptions to unchecked exceptions.
+ *
+ * @param cause a checked exception object which is to be converted to its
unchecked equivalent.
+ * @param message exception message as a format string
+ * @param args format specifiers
+ * @return unchecked exception.
+ */
+ public static RuntimeException toUncheckedException(Throwable cause, String
message, Object... args) {
+ if (cause instanceof RuntimeException) {
+ return (RuntimeException) cause;
+
+ } else if (cause instanceof
org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException) {
+ return new NoSuchNamespaceException(cause, message, args);
+
+ } else if (cause instanceof
org.apache.spark.sql.catalyst.analysis.NoSuchTableException) {
+ return new NoSuchTableException(cause, message, args);
+
+ } else if (cause instanceof AnalysisException) {
+ return new ValidationException(cause, message, args);
+
+ } else if (cause instanceof IOException) {
+ return new RuntimeIOException((IOException) cause, message, args);
+
+ } else {
+ return new RuntimeException(String.format(message, args), cause);
+ }
+ }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
b/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
new file mode 100644
index 0000000..38b6fa8
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.hadoop.SerializableConfiguration;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition;
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.expressions.NamedExpression;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import scala.Function2;
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+import scala.runtime.AbstractPartialFunction;
+
+import static org.apache.spark.sql.functions.col;
+
+/**
+ * Java version of the original SparkTableUtil.scala
+ *
https://github.com/apache/iceberg/blob/apache-iceberg-0.8.0-incubating/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
+ */
+public class SparkTableUtil {
+
+ private static final PathFilter HIDDEN_PATH_FILTER =
+ p -> !p.getName().startsWith("_") && !p.getName().startsWith(".");
+
+ private SparkTableUtil() {
+ }
+
+ /**
+ * Returns a DataFrame with a row for each partition in the table.
+ *
+ * The DataFrame has 3 columns, partition key (a=1/b=2), partition location,
and format
+ * (avro or parquet).
+ *
+ * @param spark a Spark session
+ * @param table a table name and (optional) database
+ * @return a DataFrame of the table's partitions
+ */
+ public static Dataset<Row> partitionDF(SparkSession spark, String table) {
+ List<SparkPartition> partitions = getPartitions(spark, table);
+ return spark.createDataFrame(partitions,
SparkPartition.class).toDF("partition", "uri", "format");
+ }
+
+ /**
+ * Returns a DataFrame with a row for each partition that matches the
specified 'expression'.
+ *
+ * @param spark a Spark session.
+ * @param table name of the table.
+ * @param expression The expression whose matching partitions are returned.
+ * @return a DataFrame of the table partitions.
+ */
+ public static Dataset<Row> partitionDFByFilter(SparkSession spark, String
table, String expression) {
+ List<SparkPartition> partitions = getPartitionsByFilter(spark, table,
expression);
+ return spark.createDataFrame(partitions,
SparkPartition.class).toDF("partition", "uri", "format");
+ }
+
+ /**
+ * Returns all partitions in the table.
+ *
+ * @param spark a Spark session
+ * @param table a table name and (optional) database
+ * @return all table's partitions
+ */
+ public static List<SparkPartition> getPartitions(SparkSession spark, String
table) {
+ try {
+ TableIdentifier tableIdent =
spark.sessionState().sqlParser().parseTableIdentifier(table);
+ return getPartitions(spark, tableIdent);
+ } catch (ParseException e) {
+ throw SparkExceptionUtil.toUncheckedException(e, "Unable to parse table
identifier: %s", table);
+ }
+ }
+
+ /**
+ * Returns all partitions in the table.
+ *
+ * @param spark a Spark session
+ * @param tableIdent a table identifier
+ * @return all table's partitions
+ */
+ public static List<SparkPartition> getPartitions(SparkSession spark,
TableIdentifier tableIdent) {
+ try {
+ SessionCatalog catalog = spark.sessionState().catalog();
+ CatalogTable catalogTable = catalog.getTableMetadata(tableIdent);
+
+ Seq<CatalogTablePartition> partitions =
catalog.listPartitions(tableIdent, Option.empty());
+
+ return JavaConverters
+ .seqAsJavaListConverter(partitions)
+ .asJava()
+ .stream()
+ .map(catalogPartition -> toSparkPartition(catalogPartition,
catalogTable))
+ .collect(Collectors.toList());
+ } catch (NoSuchDatabaseException e) {
+ throw SparkExceptionUtil.toUncheckedException(e, "Unknown table: %s.
Database not found in catalog.", tableIdent);
+ } catch (NoSuchTableException e) {
+ throw SparkExceptionUtil.toUncheckedException(e, "Unknown table: %s.
Table not found in catalog.", tableIdent);
+ }
+ }
+
+ /**
+ * Returns partitions that match the specified 'predicate'.
+ *
+ * @param spark a Spark session
+ * @param table a table name and (optional) database
+ * @param predicate a predicate on partition columns
+ * @return matching table's partitions
+ */
+ public static List<SparkPartition> getPartitionsByFilter(SparkSession spark,
String table, String predicate) {
+ TableIdentifier tableIdent;
+ try {
+ tableIdent =
spark.sessionState().sqlParser().parseTableIdentifier(table);
+ } catch (ParseException e) {
+ throw SparkExceptionUtil.toUncheckedException(e, "Unable to parse the
table identifier: %s", table);
+ }
+
+ Expression unresolvedPredicateExpr;
+ try {
+ unresolvedPredicateExpr =
spark.sessionState().sqlParser().parseExpression(predicate);
+ } catch (ParseException e) {
+ throw SparkExceptionUtil.toUncheckedException(e, "Unable to parse the
predicate expression: %s", predicate);
+ }
+
+ Expression resolvedPredicateExpr = resolveAttrs(spark, table,
unresolvedPredicateExpr);
+ return getPartitionsByFilter(spark, tableIdent, resolvedPredicateExpr);
+ }
+
+ /**
+ * Returns partitions that match the specified 'predicate'.
+ *
+ * @param spark a Spark session
+ * @param tableIdent a table identifier
+ * @param predicateExpr a predicate expression on partition columns
+ * @return matching table's partitions
+ */
+ public static List<SparkPartition> getPartitionsByFilter(SparkSession spark,
TableIdentifier tableIdent,
+ Expression
predicateExpr) {
+ try {
+ SessionCatalog catalog = spark.sessionState().catalog();
+ CatalogTable catalogTable = catalog.getTableMetadata(tableIdent);
+
+ Expression resolvedPredicateExpr;
+ if (!predicateExpr.resolved()) {
+ resolvedPredicateExpr = resolveAttrs(spark, tableIdent.quotedString(),
predicateExpr);
+ } else {
+ resolvedPredicateExpr = predicateExpr;
+ }
+ Seq<Expression> predicates = JavaConverters
+
.collectionAsScalaIterableConverter(ImmutableList.of(resolvedPredicateExpr))
+ .asScala().toSeq();
+
+ Seq<CatalogTablePartition> partitions =
catalog.listPartitionsByFilter(tableIdent, predicates);
+
+ return JavaConverters
+ .seqAsJavaListConverter(partitions)
+ .asJava()
+ .stream()
+ .map(catalogPartition -> toSparkPartition(catalogPartition,
catalogTable))
+ .collect(Collectors.toList());
+ } catch (NoSuchDatabaseException e) {
+ throw SparkExceptionUtil.toUncheckedException(e, "Unknown table: %s.
Database not found in catalog.", tableIdent);
+ } catch (NoSuchTableException e) {
+ throw SparkExceptionUtil.toUncheckedException(e, "Unknown table: %s.
Table not found in catalog.", tableIdent);
+ }
+ }
+
+ /**
+ * Returns the data files in a partition by listing the partition location.
+ *
+ * For Parquet and ORC partitions, this will read metrics from the file
footer. For Avro partitions,
+ * metrics are set to null.
+ *
+ * @param partition a partition
+ * @param conf a serializable Hadoop conf
+ * @param metricsConfig a metrics conf
+ * @return a List of DataFile
+ */
+ public static List<DataFile> listPartition(SparkPartition partition,
PartitionSpec spec,
+ SerializableConfiguration conf,
MetricsConfig metricsConfig) {
+ return listPartition(partition.values, partition.uri, partition.format,
spec, conf.get(), metricsConfig);
+ }
+
+ /**
+ * Returns the data files in a partition by listing the partition location.
+ *
+ * For Parquet and ORC partitions, this will read metrics from the file
footer. For Avro partitions,
+ * metrics are set to null.
+ *
+ * @param partition partition key, e.g., "a=1/b=2"
+ * @param uri partition location URI
+ * @param format partition format, avro or parquet
+ * @param conf a Hadoop conf
+ * @param metricsConfig a metrics conf
+ * @return a List of DataFile
+ */
+ public static List<DataFile> listPartition(Map<String, String> partition,
String uri, String format,
+ PartitionSpec spec, Configuration
conf, MetricsConfig metricsConfig) {
+ if (format.contains("avro")) {
+ return listAvroPartition(partition, uri, spec, conf);
+ } else if (format.contains("parquet")) {
+ return listParquetPartition(partition, uri, spec, conf, metricsConfig);
+ } else if (format.contains("orc")) {
+ // TODO: use MetricsConfig in listOrcPartition
+ return listOrcPartition(partition, uri, spec, conf);
+ } else {
+ throw new UnsupportedOperationException("Unknown partition format: " +
format);
+ }
+ }
+
+ private static List<DataFile> listAvroPartition(
+ Map<String, String> partitionPath, String partitionUri, PartitionSpec
spec, Configuration conf) {
+ try {
+ Path partition = new Path(partitionUri);
+ FileSystem fs = partition.getFileSystem(conf);
+ return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
+ .filter(FileStatus::isFile)
+ .map(stat -> {
+ Metrics metrics = new Metrics(-1L, null, null, null);
+ String partitionKey = spec.fields().stream()
+ .map(PartitionField::name)
+ .map(name -> String.format("%s=%s", name,
partitionPath.get(name)))
+ .collect(Collectors.joining("/"));
+
+ return DataFiles.builder(spec)
+ .withPath(stat.getPath().toString())
+ .withFormat("avro")
+ .withFileSizeInBytes(stat.getLen())
+ .withMetrics(metrics)
+ .withPartitionPath(partitionKey)
+ .build();
+
+ }).collect(Collectors.toList());
+ } catch (IOException e) {
+ throw SparkExceptionUtil.toUncheckedException(e, "Unable to list files
in partition: %s", partitionUri);
+ }
+ }
+
+ private static List<DataFile> listParquetPartition(Map<String, String>
partitionPath, String partitionUri,
+ PartitionSpec spec,
Configuration conf,
+ MetricsConfig
metricsSpec) {
+ try {
+ Path partition = new Path(partitionUri);
+ FileSystem fs = partition.getFileSystem(conf);
+
+ return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
+ .filter(FileStatus::isFile)
+ .map(stat -> {
+ Metrics metrics;
+ try {
+ metrics =
ParquetUtil.footerMetrics(ParquetFileReader.readFooter(conf, stat),
metricsSpec);
+ } catch (IOException e) {
+ throw SparkExceptionUtil.toUncheckedException(
+ e, "Unable to read the footer of the parquet file: %s",
stat.getPath());
+ }
+ String partitionKey = spec.fields().stream()
+ .map(PartitionField::name)
+ .map(name -> String.format("%s=%s", name,
partitionPath.get(name)))
+ .collect(Collectors.joining("/"));
+
+ return DataFiles.builder(spec)
+ .withPath(stat.getPath().toString())
+ .withFormat("parquet")
+ .withFileSizeInBytes(stat.getLen())
+ .withMetrics(metrics)
+ .withPartitionPath(partitionKey)
+ .build();
+
+ }).collect(Collectors.toList());
+ } catch (IOException e) {
+ throw SparkExceptionUtil.toUncheckedException(e, "Unable to list files
in partition: %s", partitionUri);
+ }
+ }
+
+ private static List<DataFile> listOrcPartition(
+ Map<String, String> partitionPath, String partitionUri, PartitionSpec
spec, Configuration conf) {
+ try {
+ Path partition = new Path(partitionUri);
+ FileSystem fs = partition.getFileSystem(conf);
+
+ return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
+ .filter(FileStatus::isFile)
+ .map(stat -> {
+ Metrics metrics =
OrcMetrics.fromInputFile(HadoopInputFile.fromPath(stat.getPath(), conf));
+ String partitionKey = spec.fields().stream()
+ .map(PartitionField::name)
+ .map(name -> String.format("%s=%s", name,
partitionPath.get(name)))
+ .collect(Collectors.joining("/"));
+
+ return DataFiles.builder(spec)
+ .withPath(stat.getPath().toString())
+ .withFormat("orc")
+ .withFileSizeInBytes(stat.getLen())
+ .withMetrics(metrics)
+ .withPartitionPath(partitionKey)
+ .build();
+
+ }).collect(Collectors.toList());
+ } catch (IOException e) {
+ throw SparkExceptionUtil.toUncheckedException(e, "Unable to list files
in partition: %s", partitionUri);
+ }
+ }
+
+ private static SparkPartition toSparkPartition(CatalogTablePartition
partition, CatalogTable table) {
+ Option<URI> locationUri = partition.storage().locationUri();
+ Option<String> serde = partition.storage().serde();
+
+ Preconditions.checkArgument(locationUri.nonEmpty(), "Partition URI should
be defined");
+ Preconditions.checkArgument(serde.nonEmpty() ||
table.provider().nonEmpty(),
+ "Partition format should be defined");
+
+ String uri = String.valueOf(locationUri.get());
+ String format = serde.nonEmpty() ? serde.get() : table.provider().get();
+
+ Map<String, String> partitionSpec =
JavaConverters.mapAsJavaMapConverter(partition.spec()).asJava();
+ return new SparkPartition(partitionSpec, uri, format);
+ }
+
+ private static Expression resolveAttrs(SparkSession spark, String table,
Expression expr) {
+ Function2<String, String, Object> resolver =
spark.sessionState().analyzer().resolver();
+ LogicalPlan plan = spark.table(table).queryExecution().analyzed();
+ return expr.transform(new AbstractPartialFunction<Expression,
Expression>() {
+ @Override
+ public Expression apply(Expression attr) {
+ UnresolvedAttribute unresolvedAttribute = (UnresolvedAttribute) attr;
+ Option<NamedExpression> namedExpressionOption =
plan.resolve(unresolvedAttribute.nameParts(), resolver);
+ if (namedExpressionOption.isDefined()) {
+ return (Expression) namedExpressionOption.get();
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Could not resolve %s using columns: %s", attr,
plan.output()));
+ }
+ }
+
+ @Override
+ public boolean isDefinedAt(Expression attr) {
+ return attr instanceof UnresolvedAttribute;
+ }
+ });
+ }
+
+ private static Iterator<ManifestFile>
buildManifest(SerializableConfiguration conf, PartitionSpec spec,
+ String basePath,
Iterator<Tuple2<String, DataFile>> fileTuples) {
+ if (fileTuples.hasNext()) {
+ FileIO io = new HadoopFileIO(conf.get());
+ TaskContext ctx = TaskContext.get();
+ String suffix = String.format("stage-%d-task-%d-manifest",
ctx.stageId(), ctx.taskAttemptId());
+ Path location = new Path(basePath, suffix);
+ String outputPath = FileFormat.AVRO.addExtension(location.toString());
+ OutputFile outputFile = io.newOutputFile(outputPath);
+ ManifestWriter<DataFile> writer = ManifestFiles.write(spec, outputFile);
+
+ try (ManifestWriter<DataFile> writerRef = writer) {
+ fileTuples.forEachRemaining(fileTuple -> writerRef.add(fileTuple._2));
+ } catch (IOException e) {
+ throw SparkExceptionUtil.toUncheckedException(e, "Unable to close the
manifest writer: %s", outputPath);
+ }
+
+ ManifestFile manifestFile = writer.toManifestFile();
+ return ImmutableList.of(manifestFile).iterator();
+ } else {
+ return Collections.emptyIterator();
+ }
+ }
+
+ /**
+ * Import files from an existing Spark table to an Iceberg table.
+ *
+ * The import uses the Spark session to get table metadata. It assumes no
+ * operation is going on the original and target table and thus is not
+ * thread-safe.
+ *
+ * @param spark a Spark session
+ * @param sourceTableIdent an identifier of the source Spark table
+ * @param targetTable an Iceberg table where to import the data
+ * @param stagingDir a staging directory to store temporary manifest files
+ */
+ public static void importSparkTable(
+ SparkSession spark, TableIdentifier sourceTableIdent, Table targetTable,
String stagingDir) {
+ SessionCatalog catalog = spark.sessionState().catalog();
+
+ String db = sourceTableIdent.database().nonEmpty() ?
+ sourceTableIdent.database().get() :
+ catalog.getCurrentDatabase();
+ TableIdentifier sourceTableIdentWithDB = new
TableIdentifier(sourceTableIdent.table(), Some.apply(db));
+
+ if (!catalog.tableExists(sourceTableIdentWithDB)) {
+ throw new org.apache.iceberg.exceptions.NoSuchTableException(
+ String.format("Table %s does not exist", sourceTableIdentWithDB));
+ }
+
+ try {
+ PartitionSpec spec = SparkSchemaUtil.specForTable(spark,
sourceTableIdentWithDB.unquotedString());
+
+ if (spec == PartitionSpec.unpartitioned()) {
+ importUnpartitionedSparkTable(spark, sourceTableIdentWithDB,
targetTable);
+ } else {
+ List<SparkPartition> sourceTablePartitions = getPartitions(spark,
sourceTableIdent);
+ importSparkPartitions(spark, sourceTablePartitions, targetTable, spec,
stagingDir);
+ }
+ } catch (AnalysisException e) {
+ throw SparkExceptionUtil.toUncheckedException(
+ e, "Unable to get partition spec for table: %s",
sourceTableIdentWithDB);
+ }
+ }
+
+ private static void importUnpartitionedSparkTable(
+ SparkSession spark, TableIdentifier sourceTableIdent, Table targetTable)
{
+ try {
+ CatalogTable sourceTable =
spark.sessionState().catalog().getTableMetadata(sourceTableIdent);
+ Option<String> format =
+ sourceTable.storage().serde().nonEmpty() ?
sourceTable.storage().serde() : sourceTable.provider();
+ Preconditions.checkArgument(format.nonEmpty(), "Could not determine
table format");
+
+ Map<String, String> partition = Collections.emptyMap();
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Configuration conf = spark.sessionState().newHadoopConf();
+ MetricsConfig metricsConfig =
MetricsConfig.fromProperties(targetTable.properties());
+
+ List<DataFile> files = listPartition(
+ partition, sourceTable.location().toString(), format.get(), spec,
conf, metricsConfig);
+
+ AppendFiles append = targetTable.newAppend();
+ files.forEach(append::appendFile);
+ append.commit();
+ } catch (NoSuchDatabaseException e) {
+ throw SparkExceptionUtil.toUncheckedException(
+ e, "Unknown table: %s. Database not found in catalog.",
sourceTableIdent);
+ } catch (NoSuchTableException e) {
+ throw SparkExceptionUtil.toUncheckedException(
+ e, "Unknown table: %s. Table not found in catalog.",
sourceTableIdent);
+ }
+ }
+
+ /**
+ * Import files from given partitions to an Iceberg table.
+ *
+ * @param spark a Spark session
+ * @param partitions partitions to import
+ * @param targetTable the Iceberg table into which the data is imported
+ * @param spec a partition spec
+ * @param stagingDir a staging directory to store temporary manifest files
+ */
+ public static void importSparkPartitions(
+ SparkSession spark, List<SparkPartition> partitions, Table targetTable,
PartitionSpec spec, String stagingDir) {
+ Configuration conf = spark.sessionState().newHadoopConf();
+ SerializableConfiguration serializableConf = new
SerializableConfiguration(conf);
+ int parallelism = Math.min(partitions.size(),
spark.sessionState().conf().parallelPartitionDiscoveryParallelism());
+ int numShufflePartitions =
spark.sessionState().conf().numShufflePartitions();
+ MetricsConfig metricsConfig =
MetricsConfig.fromProperties(targetTable.properties());
+
+ JavaSparkContext sparkContext =
JavaSparkContext.fromSparkContext(spark.sparkContext());
+ JavaRDD<SparkPartition> partitionRDD =
sparkContext.parallelize(partitions, parallelism);
+
+ Dataset<SparkPartition> partitionDS = spark.createDataset(
+ partitionRDD.rdd(),
+ Encoders.javaSerialization(SparkPartition.class));
+
+ List<ManifestFile> manifests = partitionDS
+ .flatMap((FlatMapFunction<SparkPartition, DataFile>) sparkPartition ->
+ listPartition(sparkPartition, spec, serializableConf,
metricsConfig).iterator(),
+ Encoders.javaSerialization(DataFile.class))
+ .repartition(numShufflePartitions)
+ .map((MapFunction<DataFile, Tuple2<String, DataFile>>) file ->
+ Tuple2.apply(file.path().toString(), file),
+ Encoders.tuple(Encoders.STRING(),
Encoders.javaSerialization(DataFile.class)))
+ .orderBy(col("_1"))
+ .mapPartitions(
+ (MapPartitionsFunction<Tuple2<String, DataFile>, ManifestFile>)
fileTuple ->
+ buildManifest(serializableConf, spec, stagingDir, fileTuple),
+ Encoders.javaSerialization(ManifestFile.class))
+ .collectAsList();
+
+ try {
+ boolean snapshotIdInheritanceEnabled = PropertyUtil.propertyAsBoolean(
+ targetTable.properties(),
+ TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
+ TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
+
+ AppendFiles append = targetTable.newAppend();
+ manifests.forEach(append::appendManifest);
+ append.commit();
+
+ if (!snapshotIdInheritanceEnabled) {
+ // delete original manifests as they were rewritten before the commit
+ deleteManifests(targetTable.io(), manifests);
+ }
+ } catch (Throwable e) {
+ deleteManifests(targetTable.io(), manifests);
+ throw e;
+ }
+ }
+
+ private static void deleteManifests(FileIO io, List<ManifestFile> manifests)
{
+ Tasks.foreach(manifests)
+ .noRetry()
+ .suppressFailureWhenFinished()
+ .run(item -> io.deleteFile(item.path()));
+ }
+
+ /**
+ * Class representing a table partition.
+ */
+ public static class SparkPartition implements Serializable {
+ private final Map<String, String> values;
+ private final String uri;
+ private final String format;
+
+ public SparkPartition(Map<String, String> values, String uri, String
format) {
+ this.values = ImmutableMap.copyOf(values);
+ this.uri = uri;
+ this.format = format;
+ }
+
+ public Map<String, String> getValues() {
+ return values;
+ }
+
+ public String getUri() {
+ return uri;
+ }
+
+ public String getFormat() {
+ return format;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("values", values)
+ .add("uri", uri)
+ .add("format", format)
+ .toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SparkPartition that = (SparkPartition) o;
+ return Objects.equal(values, that.values) &&
+ Objects.equal(uri, that.uri) &&
+ Objects.equal(format, that.format);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(values, uri, format);
+ }
+ }
+}
diff --git
a/spark2/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
b/spark2/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
deleted file mode 100644
index 55ceda2..0000000
--- a/spark2/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
+++ /dev/null
@@ -1,457 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.spark
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{Path, PathFilter}
-import org.apache.iceberg.{DataFile, DataFiles, FileFormat, ManifestFile,
ManifestFiles}
-import org.apache.iceberg.{Metrics, MetricsConfig, PartitionSpec, Table,
TableProperties}
-import org.apache.iceberg.exceptions.NoSuchTableException
-import org.apache.iceberg.hadoop.{HadoopFileIO, HadoopInputFile,
SerializableConfiguration}
-import org.apache.iceberg.orc.OrcMetrics
-import org.apache.iceberg.parquet.ParquetUtil
-import org.apache.iceberg.relocated.com.google.common.collect.Maps
-import org.apache.iceberg.util.PropertyUtil
-import org.apache.parquet.hadoop.ParquetFileReader
-import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable,
CatalogTablePartition}
-import org.apache.spark.sql.catalyst.expressions.Expression
-import scala.collection.JavaConverters._
-import scala.util.Try
-
-object SparkTableUtil {
- /**
- * Returns a DataFrame with a row for each partition in the table.
- *
- * The DataFrame has 3 columns, partition key (a=1/b=2), partition location,
and format
- * (avro or parquet).
- *
- * @param spark a Spark session
- * @param table a table name and (optional) database
- * @return a DataFrame of the table's partitions
- */
- def partitionDF(spark: SparkSession, table: String): DataFrame = {
- import spark.implicits._
-
- val partitions = getPartitions(spark, table)
- partitions.toDF("partition", "uri", "format")
- }
-
- /**
- * Returns a DataFrame with a row for each partition that matches the
specified 'expression'.
- *
- * @param spark a Spark session.
- * @param table name of the table.
- * @param expression The expression whose matching partitions are returned.
- * @return a DataFrame of the table partitions.
- */
- def partitionDFByFilter(spark: SparkSession, table: String, expression:
String): DataFrame = {
- import spark.implicits._
-
- val partitions = getPartitionsByFilter(spark, table, expression)
- partitions.toDF("partition", "uri", "format")
- }
-
- /**
- * Returns all partitions in the table.
- *
- * @param spark a Spark session
- * @param table a table name and (optional) database
- * @return all table's partitions
- */
- def getPartitions(spark: SparkSession, table: String): Seq[SparkPartition] =
{
- val tableIdent = spark.sessionState.sqlParser.parseTableIdentifier(table)
- getPartitions(spark, tableIdent)
- }
-
- /**
- * Returns all partitions in the table.
- *
- * @param spark a Spark session
- * @param tableIdent a table identifier
- * @return all table's partitions
- */
- def getPartitions(spark: SparkSession, tableIdent: TableIdentifier):
Seq[SparkPartition] = {
- val catalog = spark.sessionState.catalog
- val catalogTable = catalog.getTableMetadata(tableIdent)
-
- catalog
- .listPartitions(tableIdent)
- .map(catalogPartition => toSparkPartition(catalogPartition,
catalogTable))
- }
-
- /**
- * Returns partitions that match the specified 'predicate'.
- *
- * @param spark a Spark session
- * @param table a table name and (optional) database
- * @param predicate a predicate on partition columns
- * @return matching table's partitions
- */
- def getPartitionsByFilter(spark: SparkSession, table: String, predicate:
String): Seq[SparkPartition] = {
- val tableIdent = spark.sessionState.sqlParser.parseTableIdentifier(table)
- val unresolvedPredicateExpr =
spark.sessionState.sqlParser.parseExpression(predicate)
- val resolvedPredicateExpr = resolveAttrs(spark, table,
unresolvedPredicateExpr)
- getPartitionsByFilter(spark, tableIdent, resolvedPredicateExpr)
- }
-
- /**
- * Returns partitions that match the specified 'predicate'.
- *
- * @param spark a Spark session
- * @param tableIdent a table identifier
- * @param predicateExpr a predicate expression on partition columns
- * @return matching table's partitions
- */
- def getPartitionsByFilter(
- spark: SparkSession,
- tableIdent: TableIdentifier,
- predicateExpr: Expression): Seq[SparkPartition] = {
-
- val catalog = spark.sessionState.catalog
- val catalogTable = catalog.getTableMetadata(tableIdent)
-
- val resolvedPredicateExpr = if (!predicateExpr.resolved) {
- resolveAttrs(spark, tableIdent.quotedString, predicateExpr)
- } else {
- predicateExpr
- }
-
- catalog
- .listPartitionsByFilter(tableIdent, Seq(resolvedPredicateExpr))
- .map(catalogPartition => toSparkPartition(catalogPartition,
catalogTable))
- }
-
- /**
- * Returns the data files in a partition by listing the partition location.
- *
- * For Parquet and ORC partitions, this will read metrics from the file
footer. For Avro partitions,
- * metrics are set to null.
- *
- * @param partition a partition
- * @param conf a serializable Hadoop conf
- * @param metricsConfig a metrics conf
- * @return a Seq of [[DataFile]]
- */
- def listPartition(
- partition: SparkPartition,
- spec: PartitionSpec,
- conf: SerializableConfiguration,
- metricsConfig: MetricsConfig): Seq[DataFile] = {
-
- listPartition(partition.values, partition.uri, partition.format, spec,
conf.get(), metricsConfig)
- }
-
- /**
- * Returns the data files in a partition by listing the partition location.
- *
- * For Parquet and ORC partitions, this will read metrics from the file
footer. For Avro partitions,
- * metrics are set to null.
- *
- * @param partition partition key, e.g., "a=1/b=2"
- * @param uri partition location URI
- * @param format partition format, avro or parquet
- * @param conf a Hadoop conf
- * @param metricsConfig a metrics conf
- * @return a seq of [[DataFile]]
- */
- def listPartition(
- partition: Map[String, String],
- uri: String,
- format: String,
- spec: PartitionSpec,
- conf: Configuration = new Configuration(),
- metricsConfig: MetricsConfig = MetricsConfig.getDefault): Seq[DataFile]
= {
-
- if (format.contains("avro")) {
- listAvroPartition(partition, uri, spec, conf)
- } else if (format.contains("parquet")) {
- listParquetPartition(partition, uri, spec, conf, metricsConfig)
- } else if (format.contains("orc")) {
- // TODO: use MetricsConfig in listOrcPartition
- listOrcPartition(partition, uri, spec, conf)
- } else {
- throw new UnsupportedOperationException(s"Unknown partition format:
$format")
- }
- }
-
- /**
- * Case class representing a table partition.
- */
- case class SparkPartition(values: Map[String, String], uri: String, format:
String)
-
- private def arrayToMap(arr: Array[Long]): java.util.Map[Integer,
java.lang.Long] = {
- if (arr != null) {
- val map: java.util.Map[Integer, java.lang.Long] = Maps.newHashMap()
- arr.zipWithIndex.foreach {
- case (-1, _) => // skip default values
- case (value, index) => map.put(index, value)
- }
- map
- } else {
- null
- }
- }
-
- private object HiddenPathFilter extends PathFilter {
- override def accept(p: Path): Boolean = {
- !p.getName.startsWith("_") && !p.getName.startsWith(".")
- }
- }
-
- private def listAvroPartition(
- partitionPath: Map[String, String],
- partitionUri: String,
- spec: PartitionSpec,
- conf: Configuration): Seq[DataFile] = {
- val partition = new Path(partitionUri)
- val fs = partition.getFileSystem(conf)
-
- fs.listStatus(partition, HiddenPathFilter).filter(_.isFile).map { stat =>
- val metrics = new Metrics(-1L, arrayToMap(null), arrayToMap(null),
arrayToMap(null))
- val partitionKey = spec.fields.asScala.map(_.name).map { name =>
- s"$name=${partitionPath(name)}"
- }.mkString("/")
-
- DataFiles.builder(spec)
- .withPath(stat.getPath.toString)
- .withFormat("avro")
- .withFileSizeInBytes(stat.getLen)
- .withMetrics(metrics)
- .withPartitionPath(partitionKey)
- .build()
- }
- }
-
- //noinspection ScalaDeprecation
- private def listParquetPartition(
- partitionPath: Map[String, String],
- partitionUri: String,
- spec: PartitionSpec,
- conf: Configuration,
- metricsSpec: MetricsConfig): Seq[DataFile] = {
- val partition = new Path(partitionUri)
- val fs = partition.getFileSystem(conf)
-
- fs.listStatus(partition, HiddenPathFilter).filter(_.isFile).map { stat =>
- val metrics =
ParquetUtil.footerMetrics(ParquetFileReader.readFooter(conf, stat), metricsSpec)
- val partitionKey = spec.fields.asScala.map(_.name).map { name =>
- s"$name=${partitionPath(name)}"
- }.mkString("/")
-
- DataFiles.builder(spec)
- .withPath(stat.getPath.toString)
- .withFormat("parquet")
- .withFileSizeInBytes(stat.getLen)
- .withMetrics(metrics)
- .withPartitionPath(partitionKey)
- .build()
- }
- }
-
- private def listOrcPartition(
- partitionPath: Map[String, String],
- partitionUri: String,
- spec: PartitionSpec,
- conf: Configuration): Seq[DataFile] = {
- val partition = new Path(partitionUri)
- val fs = partition.getFileSystem(conf)
-
- fs.listStatus(partition, HiddenPathFilter).filter(_.isFile).map { stat =>
- val metrics =
OrcMetrics.fromInputFile(HadoopInputFile.fromPath(stat.getPath, conf))
- val partitionKey = spec.fields.asScala.map(_.name).map { name =>
- s"$name=${partitionPath(name)}"
- }.mkString("/")
-
- DataFiles.builder(spec)
- .withPath(stat.getPath.toString)
- .withFormat("orc")
- .withFileSizeInBytes(stat.getLen)
- .withMetrics(metrics)
- .withPartitionPath(partitionKey)
- .build()
- }
- }
-
- private def toSparkPartition(partition: CatalogTablePartition, table:
CatalogTable): SparkPartition = {
- val uri = partition.storage.locationUri.map(String.valueOf(_))
- require(uri.nonEmpty, "Partition URI should be defined")
-
- val format = partition.storage.serde.orElse(table.provider)
- require(format.nonEmpty, "Partition format should be defined")
-
- SparkPartition(partition.spec, uri.get, format.get)
- }
-
- private def resolveAttrs(spark: SparkSession, table: String, expr:
Expression): Expression = {
- val resolver = spark.sessionState.analyzer.resolver
- val plan = spark.table(table).queryExecution.analyzed
- expr.transform {
- case attr: UnresolvedAttribute =>
- plan.resolve(attr.nameParts, resolver) match {
- case Some(resolvedAttr) => resolvedAttr
- case None => throw new IllegalArgumentException(s"Could not resolve
$attr using columns: ${plan.output}")
- }
- }
- }
-
- private def buildManifest(
- conf: SerializableConfiguration,
- spec: PartitionSpec,
- basePath: String): Iterator[DataFile] => Iterator[ManifestFile] = {
files =>
- if (files.hasNext) {
- val io = new HadoopFileIO(conf.get())
- val ctx = TaskContext.get()
- val location = new Path(basePath,
s"stage-${ctx.stageId()}-task-${ctx.taskAttemptId()}-manifest")
- val outputFile =
io.newOutputFile(FileFormat.AVRO.addExtension(location.toString))
- val writer = ManifestFiles.write(spec, outputFile)
- try {
- files.foreach(writer.add)
- } finally {
- writer.close()
- }
-
- val manifestFile = writer.toManifestFile
- Seq(manifestFile).iterator
- } else {
- Seq.empty.iterator
- }
- }
-
- /**
- * Import files from an existing Spark table to an Iceberg table.
- *
- * The import uses the Spark session to get table metadata. It assumes no
- * operation is going on the original and target table and thus is not
- * thread-safe.
- *
- * @param spark a Spark session
- * @param sourceTableIdent an identifier of the source Spark table
- * @param targetTable an Iceberg table where to import the data
- * @param stagingDir a staging directory to store temporary manifest files
- */
- def importSparkTable(
- spark: SparkSession,
- sourceTableIdent: TableIdentifier,
- targetTable: Table,
- stagingDir: String): Unit = {
-
- val catalog = spark.sessionState.catalog
-
- val db = sourceTableIdent.database.getOrElse(catalog.getCurrentDatabase)
- val sourceTableIdentWithDB = sourceTableIdent.copy(database = Some(db))
-
- if (!catalog.tableExists(sourceTableIdentWithDB)) {
- throw new NoSuchTableException(s"Table $sourceTableIdentWithDB does not
exist")
- }
-
- val spec = SparkSchemaUtil.specForTable(spark,
sourceTableIdentWithDB.unquotedString)
-
- if (spec == PartitionSpec.unpartitioned) {
- importUnpartitionedSparkTable(spark, sourceTableIdentWithDB, targetTable)
- } else {
- val sourceTablePartitions = getPartitions(spark, sourceTableIdent)
- importSparkPartitions(spark, sourceTablePartitions, targetTable, spec,
stagingDir)
- }
- }
-
- private def importUnpartitionedSparkTable(
- spark: SparkSession,
- sourceTableIdent: TableIdentifier,
- targetTable: Table): Unit = {
-
- val sourceTable =
spark.sessionState.catalog.getTableMetadata(sourceTableIdent)
- val format = sourceTable.storage.serde.orElse(sourceTable.provider)
- require(format.nonEmpty, "Could not determine table format")
-
- val partition = Map.empty[String, String]
- val spec = PartitionSpec.unpartitioned()
- val conf = spark.sessionState.newHadoopConf()
- val metricsConfig = MetricsConfig.fromProperties(targetTable.properties)
-
- val files = listPartition(partition, sourceTable.location.toString,
format.get, spec, conf, metricsConfig)
-
- val append = targetTable.newAppend()
- files.foreach(append.appendFile)
- append.commit()
- }
-
- /**
- * Import files from given partitions to an Iceberg table.
- *
- * @param spark a Spark session
- * @param partitions partitions to import
- * @param targetTable an Iceberg table where to import the data
- * @param spec a partition spec
- * @param stagingDir a staging directory to store temporary manifest files
- */
- def importSparkPartitions(
- spark: SparkSession,
- partitions: Seq[SparkPartition],
- targetTable: Table,
- spec: PartitionSpec,
- stagingDir: String): Unit = {
-
- implicit val manifestFileEncoder: Encoder[ManifestFile] =
Encoders.javaSerialization[ManifestFile]
- implicit val dataFileEncoder: Encoder[DataFile] =
Encoders.javaSerialization[DataFile]
- implicit val pathDataFileEncoder: Encoder[(String, DataFile)] =
Encoders.tuple(Encoders.STRING, dataFileEncoder)
-
- import spark.implicits._
-
- val conf = spark.sessionState.newHadoopConf()
- val serializableConf = new SerializableConfiguration(conf)
- val parallelism = Math.min(partitions.size,
spark.sessionState.conf.parallelPartitionDiscoveryParallelism)
- val partitionDS = spark.sparkContext.parallelize(partitions,
parallelism).toDS()
- val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
- val metricsConfig = MetricsConfig.fromProperties(targetTable.properties)
-
- val manifests = partitionDS
- .flatMap(partition => listPartition(partition, spec, serializableConf,
metricsConfig))
- .repartition(numShufflePartitions)
- .map(file => (file.path.toString, file))
- .orderBy($"_1")
- .mapPartitions(files => buildManifest(serializableConf, spec,
stagingDir)(files.map(_._2)))
- .collect()
-
- try {
- val snapshotIdInheritanceEnabled = PropertyUtil.propertyAsBoolean(
- targetTable.properties,
- TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
- TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT)
-
- val append = targetTable.newAppend()
- manifests.foreach(manifest => append.appendManifest(manifest))
- append.commit()
-
- if (!snapshotIdInheritanceEnabled) {
- // delete original manifests as they were rewritten before the commit
- manifests.foreach(manifest =>
Try(targetTable.io.deleteFile(manifest.path)))
- }
- } catch {
- case e: Throwable =>
- // always clean up created manifests if the append fails
- manifests.foreach(manifest =>
Try(targetTable.io.deleteFile(manifest.path)))
- throw e;
- }
- }
-}
diff --git
a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
index 37f57d4..b73c036 100644
---
a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
+++
b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
@@ -52,7 +52,6 @@ import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import scala.collection.Seq;
import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
import static org.apache.iceberg.TableProperties.PARQUET_VECTORIZATION_ENABLED;
@@ -132,7 +131,7 @@ public class TestSparkTableUtil extends HiveTableBaseTest {
@Test
public void testPartitionScan() {
- Seq<SparkPartition> partitions = SparkTableUtil.getPartitions(spark,
qualifiedTableName);
+ List<SparkPartition> partitions = SparkTableUtil.getPartitions(spark,
qualifiedTableName);
Assert.assertEquals("There should be 3 partitions", 3, partitions.size());
Dataset<Row> partitionDF = SparkTableUtil.partitionDF(spark,
qualifiedTableName);
@@ -141,7 +140,7 @@ public class TestSparkTableUtil extends HiveTableBaseTest {
@Test
public void testPartitionScanByFilter() {
- Seq<SparkPartition> partitions =
SparkTableUtil.getPartitionsByFilter(spark, qualifiedTableName, "data = 'a'");
+ List<SparkPartition> partitions =
SparkTableUtil.getPartitionsByFilter(spark, qualifiedTableName, "data = 'a'");
Assert.assertEquals("There should be 1 matching partition", 1,
partitions.size());
Dataset<Row> partitionDF = SparkTableUtil.partitionDFByFilter(spark,
qualifiedTableName, "data = 'a'");
diff --git
a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtilWithInMemoryCatalog.java
b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtilWithInMemoryCatalog.java
index f3f7e53..2a94690 100644
---
a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtilWithInMemoryCatalog.java
+++
b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtilWithInMemoryCatalog.java
@@ -46,7 +46,6 @@ import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import scala.collection.Seq;
import static org.apache.iceberg.types.Types.NestedField.optional;
@@ -213,7 +212,7 @@ public class TestSparkTableUtilWithInMemoryCatalog {
.saveAsTable("parquet_table");
File stagingDir = temp.newFolder("staging-dir");
- Seq<SparkPartition> partitions =
SparkTableUtil.getPartitionsByFilter(spark, "parquet_table", "data = 'a'");
+ List<SparkPartition> partitions =
SparkTableUtil.getPartitionsByFilter(spark, "parquet_table", "data = 'a'");
SparkTableUtil.importSparkPartitions(spark, partitions, table,
table.spec(), stagingDir.toString());
List<SimpleRecord> expectedRecords = Lists.newArrayList(new
SimpleRecord(1, "a"));
@@ -258,7 +257,7 @@ public class TestSparkTableUtilWithInMemoryCatalog {
.saveAsTable("parquet_table");
File stagingDir = temp.newFolder("staging-dir");
- Seq<SparkPartition> partitions =
SparkTableUtil.getPartitionsByFilter(spark, "parquet_table", "data = 'a'");
+ List<SparkPartition> partitions =
SparkTableUtil.getPartitionsByFilter(spark, "parquet_table", "data = 'a'");
SparkTableUtil.importSparkPartitions(spark, partitions, table,
table.spec(), stagingDir.toString());
List<SimpleRecord> expectedRecords = Lists.newArrayList(new
SimpleRecord(1, "a"));