This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c37a97143 Core, Spark: Fix migrate table in case of partitioned table with partition containing a special character (#7744)
0c37a97143 is described below

commit 0c37a971430807a329adff81ebc7bbe6ee0020bf
Author: Wing Yew Poon <[email protected]>
AuthorDate: Fri Jun 30 13:17:01 2023 -0700

    Core, Spark: Fix migrate table in case of partitioned table with partition containing a special character (#7744)
---
 .../main/java/org/apache/iceberg/DataFiles.java    | 30 ++++++++++++
 .../apache/iceberg/data/TableMigrationUtil.java    | 54 ++++++++++++++++------
 .../extensions/TestMigrateTableProcedure.java      | 17 +++++++
 .../extensions/TestMigrateTableProcedure.java      | 17 +++++++
 .../extensions/TestMigrateTableProcedure.java      | 19 +++++++-
 .../extensions/TestMigrateTableProcedure.java      | 19 +++++++-
 6 files changed, 139 insertions(+), 17 deletions(-)
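
For context: before this change, TableMigrationUtil rebuilt a Hive-style partition string ("col1=val1/col2=val2") from the partition values and handed it to DataFiles.Builder#withPartitionPath, which parses the string back into per-field values by splitting on '/'. A partition value that itself contains '/' (such as the string '2023/05/30' used in the new tests) therefore produced extra path segments and the wrong partition data. The commit instead passes the raw values as a list via the new DataFiles.Builder#withPartitionValues. The snippet below is a minimal, self-contained sketch, not part of the commit (the class and variable names are made up), showing why the '/'-joined round trip is lossy:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    // Hypothetical illustration: a partition value containing '/' cannot be recovered
    // once names and values are joined into a single '/'-delimited partition string.
    public class PartitionPathRoundTrip {
      public static void main(String[] args) {
        List<String> names = Arrays.asList("data", "dt");
        List<String> values = Arrays.asList("2023/05/30", "2023-05-30");

        // Old approach (mirrors the removed code): join into "data=2023/05/30/dt=2023-05-30".
        String joined =
            IntStream.range(0, names.size())
                .mapToObj(i -> String.format("%s=%s", names.get(i), values.get(i)))
                .collect(Collectors.joining("/"));

        // Splitting the string back on '/' yields 4 segments instead of 2, so the two
        // original partition values can no longer be recovered from the joined form.
        System.out.println(joined);                   // data=2023/05/30/dt=2023-05-30
        System.out.println(joined.split("/").length); // 4

        // New approach in this commit: keep the values as a List<String> and pass them to
        // DataFiles.Builder#withPartitionValues, avoiding the lossy string round trip.
      }
    }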

diff --git a/core/src/main/java/org/apache/iceberg/DataFiles.java b/core/src/main/java/org/apache/iceberg/DataFiles.java
index 95b2891c98..a6ea80d366 100644
--- a/core/src/main/java/org/apache/iceberg/DataFiles.java
+++ b/core/src/main/java/org/apache/iceberg/DataFiles.java
@@ -90,6 +90,26 @@ public class DataFiles {
     return data;
   }
 
+  static PartitionData fillFromValues(
+      PartitionSpec spec, List<String> partitionValues, PartitionData reuse) {
+    PartitionData data = reuse;
+    if (data == null) {
+      data = newPartitionData(spec);
+    }
+
+    Preconditions.checkArgument(
+        partitionValues.size() == spec.fields().size(),
+        "Invalid partition data, expecting %s fields, found %s",
+        spec.fields().size(),
+        partitionValues.size());
+
+    for (int i = 0; i < partitionValues.size(); i += 1) {
+      data.set(i, Conversions.fromPartitionString(data.getType(i), partitionValues.get(i)));
+    }
+
+    return data;
+  }
+
   public static PartitionData data(PartitionSpec spec, String partitionPath) {
     return fillFromPath(spec, partitionPath, null);
   }
@@ -250,6 +270,16 @@ public class DataFiles {
       return this;
     }
 
+    public Builder withPartitionValues(List<String> partitionValues) {
+      Preconditions.checkArgument(
+          isPartitioned ^ partitionValues.isEmpty(),
+          "Table must be partitioned or partition values must be empty");
+      if (!partitionValues.isEmpty()) {
+        this.partitionData = fillFromValues(spec, partitionValues, partitionData);
+      }
+      return this;
+    }
+
     public Builder withMetrics(Metrics metrics) {
       // check for null to avoid NPE when unboxing
       this.recordCount = metrics.recordCount() == null ? -1 : metrics.recordCount();
diff --git a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
index aa1885a31e..0fb290f947 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
@@ -58,12 +58,12 @@ public class TableMigrationUtil {
    * Returns the data files in a partition by listing the partition location.
    *
    * <p>For Parquet and ORC partitions, this will read metrics from the file footer. For Avro
-   * partitions, metrics are set to null.
+   * partitions, metrics other than row count are set to null.
    *
-   * <p>Note: certain metrics, like NaN counts, that are only supported by iceberg file writers but
+   * <p>Note: certain metrics, like NaN counts, that are only supported by Iceberg file writers but
    * not file footers, will not be populated.
    *
-   * @param partition partition key, e.g., "a=1/b=2"
+   * @param partition map of column names to column values for the partition
    * @param uri partition location URI
    * @param format partition format, avro, parquet or orc
    * @param spec a partition spec
@@ -83,8 +83,28 @@ public class TableMigrationUtil {
     return listPartition(partition, uri, format, spec, conf, metricsConfig, mapping, 1);
   }
 
+  /**
+   * Returns the data files in a partition by listing the partition location. Metrics are read from
+   * the files and the file reading is done in parallel by a specified number of threads.
+   *
+   * <p>For Parquet and ORC partitions, this will read metrics from the file footer. For Avro
+   * partitions, metrics other than row count are set to null.
+   *
+   * <p>Note: certain metrics, like NaN counts, that are only supported by Iceberg file writers but
+   * not file footers, will not be populated.
+   *
+   * @param partition map of column names to column values for the partition
+   * @param partitionUri partition location URI
+   * @param format partition format, avro, parquet or orc
+   * @param spec a partition spec
+   * @param conf a Hadoop conf
+   * @param metricsSpec a metrics conf
+   * @param mapping a name mapping
+   * @param parallelism number of threads to use for file reading
+   * @return a List of DataFile
+   */
   public static List<DataFile> listPartition(
-      Map<String, String> partitionPath,
+      Map<String, String> partition,
       String partitionUri,
       String format,
       PartitionSpec spec,
@@ -94,16 +114,16 @@ public class TableMigrationUtil {
       int parallelism) {
     ExecutorService service = null;
     try {
-      String partitionKey =
+      List<String> partitionValues =
           spec.fields().stream()
               .map(PartitionField::name)
-              .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
-              .collect(Collectors.joining("/"));
+              .map(partition::get)
+              .collect(Collectors.toList());
 
-      Path partition = new Path(partitionUri);
-      FileSystem fs = partition.getFileSystem(conf);
+      Path partitionDir = new Path(partitionUri);
+      FileSystem fs = partitionDir.getFileSystem(conf);
       List<FileStatus> fileStatus =
-          Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
+          Arrays.stream(fs.listStatus(partitionDir, HIDDEN_PATH_FILTER))
               .filter(FileStatus::isFile)
               .collect(Collectors.toList());
       DataFile[] datafiles = new DataFile[fileStatus.size()];
@@ -120,7 +140,7 @@ public class TableMigrationUtil {
             index -> {
               Metrics metrics = getAvroMetrics(fileStatus.get(index).getPath(), conf);
               datafiles[index] =
-                  buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "avro");
+                  buildDataFile(fileStatus.get(index), partitionValues, spec, metrics, "avro");
             });
       } else if (format.contains("parquet")) {
         task.run(
@@ -128,7 +148,7 @@ public class TableMigrationUtil {
               Metrics metrics =
                   getParquetMetrics(fileStatus.get(index).getPath(), conf, metricsSpec, mapping);
               datafiles[index] =
-                  buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "parquet");
+                  buildDataFile(fileStatus.get(index), partitionValues, spec, metrics, "parquet");
             });
       } else if (format.contains("orc")) {
         task.run(
@@ -136,7 +156,7 @@ public class TableMigrationUtil {
               Metrics metrics =
                   getOrcMetrics(fileStatus.get(index).getPath(), conf, metricsSpec, mapping);
               datafiles[index] =
-                  buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "orc");
+                  buildDataFile(fileStatus.get(index), partitionValues, spec, metrics, "orc");
             });
       } else {
         throw new UnsupportedOperationException("Unknown partition format: " + format);
@@ -181,13 +201,17 @@ public class TableMigrationUtil {
   }
 
   private static DataFile buildDataFile(
-      FileStatus stat, String partitionKey, PartitionSpec spec, Metrics metrics, String format) {
+      FileStatus stat,
+      List<String> partitionValues,
+      PartitionSpec spec,
+      Metrics metrics,
+      String format) {
     return DataFiles.builder(spec)
         .withPath(stat.getPath().toString())
         .withFormat(format)
         .withFileSizeInBytes(stat.getLen())
         .withMetrics(metrics)
-        .withPartitionPath(partitionKey)
+        .withPartitionValues(partitionValues)
         .build();
   }
 
diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index 83126b700e..919a513133 100644
--- a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -184,4 +184,21 @@ public class TestMigrateTableProcedure extends SparkExtensionsTestBase {
         "Cannot handle an empty identifier",
         () -> sql("CALL %s.system.migrate('')", catalogName));
   }
+
+  @Test
+  public void testMigratePartitionWithSpecialCharacter() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string, dt date) USING 
parquet "
+            + "PARTITIONED BY (data, dt) LOCATION '%s'",
+        tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, '2023/05/30', date '2023-05-30')", 
tableName);
+    Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row(1L, "2023/05/30", java.sql.Date.valueOf("2023-05-30"))),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
 }
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index 83126b700e..919a513133 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -184,4 +184,21 @@ public class TestMigrateTableProcedure extends SparkExtensionsTestBase {
         "Cannot handle an empty identifier",
         () -> sql("CALL %s.system.migrate('')", catalogName));
   }
+
+  @Test
+  public void testMigratePartitionWithSpecialCharacter() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string, dt date) USING 
parquet "
+            + "PARTITIONED BY (data, dt) LOCATION '%s'",
+        tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, '2023/05/30', date '2023-05-30')", 
tableName);
+    Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row(1L, "2023/05/30", java.sql.Date.valueOf("2023-05-30"))),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
 }
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index 8b2950b74f..49fee09408 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -103,7 +103,7 @@ public class TestMigrateTableProcedure extends SparkExtensionsTestBase {
         ImmutableList.of(row(1L, "a"), row(1L, "a")),
         sql("SELECT * FROM %s ORDER BY id", tableName));
 
-    sql("DROP TABLE IF EXISTS  %s", tableName + "_BACKUP_");
+    sql("DROP TABLE IF EXISTS %s", tableName + "_BACKUP_");
   }
 
   @Test
@@ -184,4 +184,21 @@ public class TestMigrateTableProcedure extends SparkExtensionsTestBase {
         "Cannot handle an empty identifier",
         () -> sql("CALL %s.system.migrate('')", catalogName));
   }
+
+  @Test
+  public void testMigratePartitionWithSpecialCharacter() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string, dt date) USING 
parquet "
+            + "PARTITIONED BY (data, dt) LOCATION '%s'",
+        tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, '2023/05/30', date '2023-05-30')", 
tableName);
+    Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row(1L, "2023/05/30", java.sql.Date.valueOf("2023-05-30"))),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
 }
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index 8b2950b74f..49fee09408 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -103,7 +103,7 @@ public class TestMigrateTableProcedure extends SparkExtensionsTestBase {
         ImmutableList.of(row(1L, "a"), row(1L, "a")),
         sql("SELECT * FROM %s ORDER BY id", tableName));
 
-    sql("DROP TABLE IF EXISTS  %s", tableName + "_BACKUP_");
+    sql("DROP TABLE IF EXISTS %s", tableName + "_BACKUP_");
   }
 
   @Test
@@ -184,4 +184,21 @@ public class TestMigrateTableProcedure extends SparkExtensionsTestBase {
         "Cannot handle an empty identifier",
         () -> sql("CALL %s.system.migrate('')", catalogName));
   }
+
+  @Test
+  public void testMigratePartitionWithSpecialCharacter() throws IOException {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string, dt date) USING 
parquet "
+            + "PARTITIONED BY (data, dt) LOCATION '%s'",
+        tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, '2023/05/30', date '2023-05-30')", 
tableName);
+    Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row(1L, "2023/05/30", java.sql.Date.valueOf("2023-05-30"))),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
 }
