This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.9.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 505ee6108bfcd9c651d60bdc2902878a3252ea79
Author: Russell Spitzer <[email protected]>
AuthorDate: Wed Jul 22 15:11:52 2020 -0500

    Spark: Fix import for paths that include spaces (#1228)
    
    In order to avoid changing the API and the SparkSQL-compatible types, we will 
fix the whitespace issue by
    replacing the encoded string representation with a decoded string 
representation. We use a
    method identical to Apache Spark's, taking the Hadoop Path representation of 
the URI and getting the
    string representation from that.
---
 .../main/java/org/apache/iceberg/hadoop/Util.java  | 15 ++++++
 .../org/apache/iceberg/spark/SparkTableUtil.java   |  5 +-
 .../iceberg/spark/source/TestSparkTableUtil.java   | 63 ++++++++++++++++++++++
 3 files changed, 81 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/hadoop/Util.java 
b/core/src/main/java/org/apache/iceberg/hadoop/Util.java
index 24f3d6c..bdb334b 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/Util.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/Util.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.hadoop;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Set;
@@ -78,4 +79,18 @@ public class Util {
 
     return locations.toArray(HadoopInputFile.NO_LOCATION_PREFERENCE);
   }
+
+  /**
+   * From Apache Spark
+   *
+   * Convert URI to String.
+   * URI.toString does not decode the URI, e.g. it does not change '%25' to '%'.
+   * Here we create a Hadoop Path with the given URI and rely on Path.toString
+   * to decode the URI.
+   * @param uri the URI of the path
+   * @return the String of the path
+   */
+  public static String uriToString(URI uri) {
+    return new Path(uri).toString();
+  }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java 
b/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index 38b6fa8..828ec4f 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -49,6 +49,7 @@ import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.hadoop.SerializableConfiguration;
+import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.orc.OrcMetrics;
@@ -386,7 +387,7 @@ public class SparkTableUtil {
     Preconditions.checkArgument(serde.nonEmpty() || 
table.provider().nonEmpty(),
         "Partition format should be defined");
 
-    String uri = String.valueOf(locationUri.get());
+    String uri = Util.uriToString(locationUri.get());
     String format = serde.nonEmpty() ? serde.get() : table.provider().get();
 
     Map<String, String> partitionSpec = 
JavaConverters.mapAsJavaMapConverter(partition.spec()).asJava();
@@ -495,7 +496,7 @@ public class SparkTableUtil {
       MetricsConfig metricsConfig = 
MetricsConfig.fromProperties(targetTable.properties());
 
       List<DataFile> files = listPartition(
-          partition, sourceTable.location().toString(), format.get(), spec, 
conf, metricsConfig);
+          partition, Util.uriToString(sourceTable.location()), format.get(), 
spec, conf, metricsConfig);
 
       AppendFiles append = targetTable.newAppend();
       files.forEach(append::appendFile);
diff --git 
a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java 
b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
index b73c036..1cc489a 100644
--- 
a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
+++ 
b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
@@ -40,6 +40,7 @@ import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkTableUtil.SparkPartition;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
@@ -297,4 +298,66 @@ public class TestSparkTableUtil extends HiveTableBaseTest {
 
     
Assert.assertEquals(expected.stream().map(SimpleRecord::getData).collect(Collectors.toList()),
 actual);
   }
+
+  @Test
+  public void testImportPartitionedWithWhitespace() throws Exception {
+    String partitionCol = "dAtA sPaced";
+    String spacedTableName = "whitespacetable";
+    String whiteSpaceKey = "some key value";
+
+    List<SimpleRecord> spacedRecords = Lists.newArrayList(new SimpleRecord(1, 
whiteSpaceKey));
+
+    File icebergLocation = temp.newFolder("partitioned_table");
+
+    spark.createDataFrame(spacedRecords, SimpleRecord.class)
+        .withColumnRenamed("data", partitionCol)
+        .write().mode("overwrite").partitionBy(partitionCol).format("parquet")
+        .saveAsTable(spacedTableName);
+
+    TableIdentifier source = spark.sessionState().sqlParser()
+        .parseTableIdentifier(spacedTableName);
+    HadoopTables tables = new 
HadoopTables(spark.sessionState().newHadoopConf());
+    Table table = tables.create(SparkSchemaUtil.schemaForTable(spark, 
spacedTableName),
+        SparkSchemaUtil.specForTable(spark, spacedTableName),
+        ImmutableMap.of(),
+        icebergLocation.getCanonicalPath());
+    File stagingDir = temp.newFolder("staging-dir");
+    SparkTableUtil.importSparkTable(spark, source, table, 
stagingDir.toString());
+    List<SimpleRecord> results = 
spark.read().format("iceberg").load(icebergLocation.toString())
+        .withColumnRenamed(partitionCol, "data")
+        .as(Encoders.bean(SimpleRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Data should match", spacedRecords, results);
+  }
+
+  @Test
+  public void testImportUnpartitionedWithWhitespace() throws Exception {
+    String spacedTableName = "whitespacetable";
+    String whiteSpaceKey = "some key value";
+
+    List<SimpleRecord> spacedRecords = Lists.newArrayList(new SimpleRecord(1, 
whiteSpaceKey));
+
+    File whiteSpaceOldLocation = temp.newFolder("white space location");
+    File icebergLocation = temp.newFolder("partitioned_table");
+
+    spark.createDataFrame(spacedRecords, SimpleRecord.class)
+        .write().mode("overwrite").parquet(whiteSpaceOldLocation.getPath());
+
+    spark.catalog().createExternalTable(spacedTableName, 
whiteSpaceOldLocation.getPath());
+
+    TableIdentifier source = spark.sessionState().sqlParser()
+        .parseTableIdentifier(spacedTableName);
+    HadoopTables tables = new 
HadoopTables(spark.sessionState().newHadoopConf());
+    Table table = tables.create(SparkSchemaUtil.schemaForTable(spark, 
spacedTableName),
+        SparkSchemaUtil.specForTable(spark, spacedTableName),
+        ImmutableMap.of(),
+        icebergLocation.getCanonicalPath());
+    File stagingDir = temp.newFolder("staging-dir");
+    SparkTableUtil.importSparkTable(spark, source, table, 
stagingDir.toString());
+    List<SimpleRecord> results = 
spark.read().format("iceberg").load(icebergLocation.toString())
+        .as(Encoders.bean(SimpleRecord.class)).collectAsList();
+
+    Assert.assertEquals("Data should match", spacedRecords, results);
+  }
 }

Reply via email to