This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 0c37a97143 Core, Spark: Fix migrate table in case of partitioned table with partition containing a special character (#7744)
0c37a97143 is described below
commit 0c37a971430807a329adff81ebc7bbe6ee0020bf
Author: Wing Yew Poon <[email protected]>
AuthorDate: Fri Jun 30 13:17:01 2023 -0700
Core, Spark: Fix migrate table in case of partitioned table with partition containing a special character (#7744)
---
.../main/java/org/apache/iceberg/DataFiles.java | 30 ++++++++++++
.../apache/iceberg/data/TableMigrationUtil.java | 54 ++++++++++++++++------
.../extensions/TestMigrateTableProcedure.java | 17 +++++++
.../extensions/TestMigrateTableProcedure.java | 17 +++++++
.../extensions/TestMigrateTableProcedure.java | 19 +++++++-
.../extensions/TestMigrateTableProcedure.java | 19 +++++++-
6 files changed, 139 insertions(+), 17 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/DataFiles.java
b/core/src/main/java/org/apache/iceberg/DataFiles.java
index 95b2891c98..a6ea80d366 100644
--- a/core/src/main/java/org/apache/iceberg/DataFiles.java
+++ b/core/src/main/java/org/apache/iceberg/DataFiles.java
@@ -90,6 +90,26 @@ public class DataFiles {
return data;
}
+ static PartitionData fillFromValues(
+ PartitionSpec spec, List<String> partitionValues, PartitionData reuse) {
+ PartitionData data = reuse;
+ if (data == null) {
+ data = newPartitionData(spec);
+ }
+
+ Preconditions.checkArgument(
+ partitionValues.size() == spec.fields().size(),
+ "Invalid partition data, expecting %s fields, found %s",
+ spec.fields().size(),
+ partitionValues.size());
+
+ for (int i = 0; i < partitionValues.size(); i += 1) {
+ data.set(i, Conversions.fromPartitionString(data.getType(i),
partitionValues.get(i)));
+ }
+
+ return data;
+ }
+
public static PartitionData data(PartitionSpec spec, String partitionPath) {
return fillFromPath(spec, partitionPath, null);
}
@@ -250,6 +270,16 @@ public class DataFiles {
return this;
}
+ public Builder withPartitionValues(List<String> partitionValues) {
+ Preconditions.checkArgument(
+ isPartitioned ^ partitionValues.isEmpty(),
+ "Table must be partitioned or partition values must be empty");
+ if (!partitionValues.isEmpty()) {
+ this.partitionData = fillFromValues(spec, partitionValues,
partitionData);
+ }
+ return this;
+ }
+
public Builder withMetrics(Metrics metrics) {
// check for null to avoid NPE when unboxing
this.recordCount = metrics.recordCount() == null ? -1 :
metrics.recordCount();
diff --git a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
index aa1885a31e..0fb290f947 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
@@ -58,12 +58,12 @@ public class TableMigrationUtil {
* Returns the data files in a partition by listing the partition location.
*
* <p>For Parquet and ORC partitions, this will read metrics from the file
footer. For Avro
- * partitions, metrics are set to null.
+ * partitions, metrics other than row count are set to null.
*
- * <p>Note: certain metrics, like NaN counts, that are only supported by
iceberg file writers but
+ * <p>Note: certain metrics, like NaN counts, that are only supported by
Iceberg file writers but
* not file footers, will not be populated.
*
- * @param partition partition key, e.g., "a=1/b=2"
+ * @param partition map of column names to column values for the partition
* @param uri partition location URI
* @param format partition format, avro, parquet or orc
* @param spec a partition spec
@@ -83,8 +83,28 @@ public class TableMigrationUtil {
return listPartition(partition, uri, format, spec, conf, metricsConfig,
mapping, 1);
}
+ /**
+ * Returns the data files in a partition by listing the partition location.
Metrics are read from
+ * the files and the file reading is done in parallel by a specified number
of threads.
+ *
+ * <p>For Parquet and ORC partitions, this will read metrics from the file
footer. For Avro
+ * partitions, metrics other than row count are set to null.
+ *
+ * <p>Note: certain metrics, like NaN counts, that are only supported by
Iceberg file writers but
+ * not file footers, will not be populated.
+ *
+ * @param partition map of column names to column values for the partition
+ * @param partitionUri partition location URI
+ * @param format partition format, avro, parquet or orc
+ * @param spec a partition spec
+ * @param conf a Hadoop conf
+ * @param metricsSpec a metrics conf
+ * @param mapping a name mapping
+ * @param parallelism number of threads to use for file reading
+ * @return a List of DataFile
+ */
public static List<DataFile> listPartition(
- Map<String, String> partitionPath,
+ Map<String, String> partition,
String partitionUri,
String format,
PartitionSpec spec,
@@ -94,16 +114,16 @@ public class TableMigrationUtil {
int parallelism) {
ExecutorService service = null;
try {
- String partitionKey =
+ List<String> partitionValues =
spec.fields().stream()
.map(PartitionField::name)
- .map(name -> String.format("%s=%s", name,
partitionPath.get(name)))
- .collect(Collectors.joining("/"));
+ .map(partition::get)
+ .collect(Collectors.toList());
- Path partition = new Path(partitionUri);
- FileSystem fs = partition.getFileSystem(conf);
+ Path partitionDir = new Path(partitionUri);
+ FileSystem fs = partitionDir.getFileSystem(conf);
List<FileStatus> fileStatus =
- Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
+ Arrays.stream(fs.listStatus(partitionDir, HIDDEN_PATH_FILTER))
.filter(FileStatus::isFile)
.collect(Collectors.toList());
DataFile[] datafiles = new DataFile[fileStatus.size()];
@@ -120,7 +140,7 @@ public class TableMigrationUtil {
index -> {
Metrics metrics =
getAvroMetrics(fileStatus.get(index).getPath(), conf);
datafiles[index] =
- buildDataFile(fileStatus.get(index), partitionKey, spec,
metrics, "avro");
+ buildDataFile(fileStatus.get(index), partitionValues, spec,
metrics, "avro");
});
} else if (format.contains("parquet")) {
task.run(
@@ -128,7 +148,7 @@ public class TableMigrationUtil {
Metrics metrics =
getParquetMetrics(fileStatus.get(index).getPath(), conf,
metricsSpec, mapping);
datafiles[index] =
- buildDataFile(fileStatus.get(index), partitionKey, spec,
metrics, "parquet");
+ buildDataFile(fileStatus.get(index), partitionValues, spec,
metrics, "parquet");
});
} else if (format.contains("orc")) {
task.run(
@@ -136,7 +156,7 @@ public class TableMigrationUtil {
Metrics metrics =
getOrcMetrics(fileStatus.get(index).getPath(), conf,
metricsSpec, mapping);
datafiles[index] =
- buildDataFile(fileStatus.get(index), partitionKey, spec,
metrics, "orc");
+ buildDataFile(fileStatus.get(index), partitionValues, spec,
metrics, "orc");
});
} else {
throw new UnsupportedOperationException("Unknown partition format: " +
format);
@@ -181,13 +201,17 @@ public class TableMigrationUtil {
}
private static DataFile buildDataFile(
- FileStatus stat, String partitionKey, PartitionSpec spec, Metrics
metrics, String format) {
+ FileStatus stat,
+ List<String> partitionValues,
+ PartitionSpec spec,
+ Metrics metrics,
+ String format) {
return DataFiles.builder(spec)
.withPath(stat.getPath().toString())
.withFormat(format)
.withFileSizeInBytes(stat.getLen())
.withMetrics(metrics)
- .withPartitionPath(partitionKey)
+ .withPartitionValues(partitionValues)
.build();
}
diff --git
a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index 83126b700e..919a513133 100644
---
a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++
b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -184,4 +184,21 @@ public class TestMigrateTableProcedure extends
SparkExtensionsTestBase {
"Cannot handle an empty identifier",
() -> sql("CALL %s.system.migrate('')", catalogName));
}
+
+ @Test
+ public void testMigratePartitionWithSpecialCharacter() throws IOException {
+ Assume.assumeTrue(catalogName.equals("spark_catalog"));
+ String location = temp.newFolder().toString();
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string, dt date) USING
parquet "
+ + "PARTITIONED BY (data, dt) LOCATION '%s'",
+ tableName, location);
+ sql("INSERT INTO TABLE %s VALUES (1, '2023/05/30', date '2023-05-30')",
tableName);
+ Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName,
tableName);
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(row(1L, "2023/05/30",
java.sql.Date.valueOf("2023-05-30"))),
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
}
diff --git
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index 83126b700e..919a513133 100644
---
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -184,4 +184,21 @@ public class TestMigrateTableProcedure extends
SparkExtensionsTestBase {
"Cannot handle an empty identifier",
() -> sql("CALL %s.system.migrate('')", catalogName));
}
+
+ @Test
+ public void testMigratePartitionWithSpecialCharacter() throws IOException {
+ Assume.assumeTrue(catalogName.equals("spark_catalog"));
+ String location = temp.newFolder().toString();
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string, dt date) USING
parquet "
+ + "PARTITIONED BY (data, dt) LOCATION '%s'",
+ tableName, location);
+ sql("INSERT INTO TABLE %s VALUES (1, '2023/05/30', date '2023-05-30')",
tableName);
+ Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName,
tableName);
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(row(1L, "2023/05/30",
java.sql.Date.valueOf("2023-05-30"))),
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
}
diff --git
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index 8b2950b74f..49fee09408 100644
---
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -103,7 +103,7 @@ public class TestMigrateTableProcedure extends
SparkExtensionsTestBase {
ImmutableList.of(row(1L, "a"), row(1L, "a")),
sql("SELECT * FROM %s ORDER BY id", tableName));
- sql("DROP TABLE IF EXISTS %s", tableName + "_BACKUP_");
+ sql("DROP TABLE IF EXISTS %s", tableName + "_BACKUP_");
}
@Test
@@ -184,4 +184,21 @@ public class TestMigrateTableProcedure extends
SparkExtensionsTestBase {
"Cannot handle an empty identifier",
() -> sql("CALL %s.system.migrate('')", catalogName));
}
+
+ @Test
+ public void testMigratePartitionWithSpecialCharacter() throws IOException {
+ Assume.assumeTrue(catalogName.equals("spark_catalog"));
+ String location = temp.newFolder().toString();
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string, dt date) USING
parquet "
+ + "PARTITIONED BY (data, dt) LOCATION '%s'",
+ tableName, location);
+ sql("INSERT INTO TABLE %s VALUES (1, '2023/05/30', date '2023-05-30')",
tableName);
+ Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName,
tableName);
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(row(1L, "2023/05/30",
java.sql.Date.valueOf("2023-05-30"))),
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
}
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index 8b2950b74f..49fee09408 100644
---
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -103,7 +103,7 @@ public class TestMigrateTableProcedure extends
SparkExtensionsTestBase {
ImmutableList.of(row(1L, "a"), row(1L, "a")),
sql("SELECT * FROM %s ORDER BY id", tableName));
- sql("DROP TABLE IF EXISTS %s", tableName + "_BACKUP_");
+ sql("DROP TABLE IF EXISTS %s", tableName + "_BACKUP_");
}
@Test
@@ -184,4 +184,21 @@ public class TestMigrateTableProcedure extends
SparkExtensionsTestBase {
"Cannot handle an empty identifier",
() -> sql("CALL %s.system.migrate('')", catalogName));
}
+
+ @Test
+ public void testMigratePartitionWithSpecialCharacter() throws IOException {
+ Assume.assumeTrue(catalogName.equals("spark_catalog"));
+ String location = temp.newFolder().toString();
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string, dt date) USING
parquet "
+ + "PARTITIONED BY (data, dt) LOCATION '%s'",
+ tableName, location);
+ sql("INSERT INTO TABLE %s VALUES (1, '2023/05/30', date '2023-05-30')",
tableName);
+ Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName,
tableName);
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(row(1L, "2023/05/30",
java.sql.Date.valueOf("2023-05-30"))),
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
}