This is an automated email from the ASF dual-hosted git repository. blue pushed a commit to branch 0.9.x in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit 505ee6108bfcd9c651d60bdc2902878a3252ea79 Author: Russell Spitzer <[email protected]> AuthorDate: Wed Jul 22 15:11:52 2020 -0500 Spark: Fix import for paths that include spaces (#1228) In order to avoid changing the API and SparkSQL compatible types we will fix the whitespace issue by replacing the encoded string representation with a decoded string representation. We use a method identical to Apache Spark, taking the Hadoop Path representation of the URI and getting the string representation from that. --- .../main/java/org/apache/iceberg/hadoop/Util.java | 15 ++++++ .../org/apache/iceberg/spark/SparkTableUtil.java | 5 +- .../iceberg/spark/source/TestSparkTableUtil.java | 63 ++++++++++++++++++++++ 3 files changed, 81 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/hadoop/Util.java b/core/src/main/java/org/apache/iceberg/hadoop/Util.java index 24f3d6c..bdb334b 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/Util.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/Util.java @@ -20,6 +20,7 @@ package org.apache.iceberg.hadoop; import java.io.IOException; +import java.net.URI; import java.util.Arrays; import java.util.Collections; import java.util.Set; @@ -78,4 +79,18 @@ public class Util { return locations.toArray(HadoopInputFile.NO_LOCATION_PREFERENCE); } + + /** + * From Apache Spark + * + * Convert URI to String. + * Since URI.toString does not decode the uri, e.g. change '%25' to '%'. + * Here we create a hadoop Path with the given URI, and rely on Path.toString + * to decode the uri + * @param uri the URI of the path + * @return the String of the path + */ + public static String uriToString(URI uri) { + return new Path(uri).toString(); + } } diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java index 38b6fa8..828ec4f 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java +++ b/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java @@ -49,6 +49,7 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.hadoop.HadoopInputFile; import org.apache.iceberg.hadoop.SerializableConfiguration; +import org.apache.iceberg.hadoop.Util; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.orc.OrcMetrics; @@ -386,7 +387,7 @@ public class SparkTableUtil { Preconditions.checkArgument(serde.nonEmpty() || table.provider().nonEmpty(), "Partition format should be defined"); - String uri = String.valueOf(locationUri.get()); + String uri = Util.uriToString(locationUri.get()); String format = serde.nonEmpty() ? serde.get() : table.provider().get(); Map<String, String> partitionSpec = JavaConverters.mapAsJavaMapConverter(partition.spec()).asJava(); @@ -495,7 +496,7 @@ public class SparkTableUtil { MetricsConfig metricsConfig = MetricsConfig.fromProperties(targetTable.properties()); List<DataFile> files = listPartition( - partition, sourceTable.location().toString(), format.get(), spec, conf, metricsConfig); + partition, Util.uriToString(sourceTable.location()), format.get(), spec, conf, metricsConfig); AppendFiles append = targetTable.newAppend(); files.forEach(append::appendFile); diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java index b73c036..1cc489a 100644 --- a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java +++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java @@ -40,6 +40,7 @@ import org.apache.iceberg.spark.SparkTableUtil; import org.apache.iceberg.spark.SparkTableUtil.SparkPartition; import org.apache.iceberg.types.Types; import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; @@ -297,4 +298,66 @@ public class TestSparkTableUtil extends HiveTableBaseTest { Assert.assertEquals(expected.stream().map(SimpleRecord::getData).collect(Collectors.toList()), actual); } + + @Test + public void testImportPartitionedWithWhitespace() throws Exception { + String partitionCol = "dAtA sPaced"; + String spacedTableName = "whitespacetable"; + String whiteSpaceKey = "some key value"; + + List<SimpleRecord> spacedRecords = Lists.newArrayList(new SimpleRecord(1, whiteSpaceKey)); + + File icebergLocation = temp.newFolder("partitioned_table"); + + spark.createDataFrame(spacedRecords, SimpleRecord.class) + .withColumnRenamed("data", partitionCol) + .write().mode("overwrite").partitionBy(partitionCol).format("parquet") + .saveAsTable(spacedTableName); + + TableIdentifier source = spark.sessionState().sqlParser() + .parseTableIdentifier(spacedTableName); + HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf()); + Table table = tables.create(SparkSchemaUtil.schemaForTable(spark, spacedTableName), + SparkSchemaUtil.specForTable(spark, spacedTableName), + ImmutableMap.of(), + icebergLocation.getCanonicalPath()); + File stagingDir = temp.newFolder("staging-dir"); + SparkTableUtil.importSparkTable(spark, source, table, stagingDir.toString()); + List<SimpleRecord> results = spark.read().format("iceberg").load(icebergLocation.toString()) + .withColumnRenamed(partitionCol, "data") + .as(Encoders.bean(SimpleRecord.class)) + .collectAsList(); + + Assert.assertEquals("Data should match", spacedRecords, results); + } + + @Test + public void testImportUnpartitionedWithWhitespace() throws Exception { + String spacedTableName = "whitespacetable"; + String whiteSpaceKey = "some key value"; + + List<SimpleRecord> spacedRecords = Lists.newArrayList(new SimpleRecord(1, whiteSpaceKey)); + + File whiteSpaceOldLocation = temp.newFolder("white space location"); + File icebergLocation = temp.newFolder("partitioned_table"); + + spark.createDataFrame(spacedRecords, SimpleRecord.class) + .write().mode("overwrite").parquet(whiteSpaceOldLocation.getPath()); + + spark.catalog().createExternalTable(spacedTableName, whiteSpaceOldLocation.getPath()); + + TableIdentifier source = spark.sessionState().sqlParser() + .parseTableIdentifier(spacedTableName); + HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf()); + Table table = tables.create(SparkSchemaUtil.schemaForTable(spark, spacedTableName), + SparkSchemaUtil.specForTable(spark, spacedTableName), + ImmutableMap.of(), + icebergLocation.getCanonicalPath()); + File stagingDir = temp.newFolder("staging-dir"); + SparkTableUtil.importSparkTable(spark, source, table, stagingDir.toString()); + List<SimpleRecord> results = spark.read().format("iceberg").load(icebergLocation.toString()) + .as(Encoders.bean(SimpleRecord.class)).collectAsList(); + + Assert.assertEquals("Data should match", spacedRecords, results); + } }
