This is an automated email from the ASF dual-hosted git repository.
jackye pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new b79a8ffce3 Delta: Fix integration tests and Create DataFile by partition values instead of path (#8398)
b79a8ffce3 is described below
commit b79a8ffce3a95e5e67a4f926ee63d165153e1c54
Author: HonahX <[email protected]>
AuthorDate: Thu Dec 7 11:33:20 2023 -0800
Delta: Fix integration tests and Create DataFile by partition values instead of path (#8398)
---
.github/workflows/delta-conversion-ci.yml | 4 +-
build.gradle | 17 +--
.../iceberg/delta/TestSnapshotDeltaLakeTable.java | 114 +++++++++------------
.../delta/BaseSnapshotDeltaLakeTableAction.java | 8 +-
gradle/libs.versions.toml | 4 +-
5 files changed, 68 insertions(+), 79 deletions(-)
diff --git a/.github/workflows/delta-conversion-ci.yml b/.github/workflows/delta-conversion-ci.yml
index 5261b82176..6fd97e662a 100644
--- a/.github/workflows/delta-conversion-ci.yml
+++ b/.github/workflows/delta-conversion-ci.yml
@@ -74,7 +74,7 @@ jobs:
key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }}
restore-keys: ${{ runner.os }}-gradle-
- run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
- - run: ./gradlew -DsparkVersions=3.3 -DscalaVersion=2.12 -DhiveVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc
+ - run: ./gradlew -DsparkVersions=3.5 -DscalaVersion=2.12 -DhiveVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc
- uses: actions/upload-artifact@v3
if: failure()
with:
@@ -103,7 +103,7 @@ jobs:
key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }}
restore-keys: ${{ runner.os }}-gradle-
- run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
- - run: ./gradlew -DsparkVersions=3.3 -DscalaVersion=2.13 -DhiveVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc
+ - run: ./gradlew -DsparkVersions=3.5 -DscalaVersion=2.13 -DhiveVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc
- uses: actions/upload-artifact@v3
if: failure()
with:
diff --git a/build.gradle b/build.gradle
index 94996a41a6..3f76cbea02 100644
--- a/build.gradle
+++ b/build.gradle
@@ -577,11 +577,11 @@ project(':iceberg-delta-lake') {
exclude group: 'com.google.code.gson', module: 'gson'
}
- // The newest version of delta-core uses Spark 3.3.*. Since its only for test, we do
+ // The newest version of delta-core uses Spark 3.5.*. Since its only for test, we do
// not need to include older version of delta-core
- if (sparkVersions.contains("3.3")) {
- integrationImplementation "io.delta:delta-core_${scalaVersion}:${libs.versions.delta.core.get()}"
- integrationImplementation project(path: ":iceberg-spark:iceberg-spark-3.3_${scalaVersion}")
+ if (sparkVersions.contains("3.5")) {
+ integrationImplementation "io.delta:delta-spark_${scalaVersion}:${libs.versions.delta.spark.get()}"
+ integrationImplementation project(path: ":iceberg-spark:iceberg-spark-3.5_${scalaVersion}")
integrationImplementation(libs.hadoop2.minicluster) {
exclude group: 'org.apache.avro', module: 'avro'
// to make sure netty libs only come from project(':iceberg-arrow')
@@ -590,7 +590,7 @@ project(':iceberg-delta-lake') {
}
integrationImplementation project(path: ':iceberg-hive-metastore')
integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
- integrationImplementation("org.apache.spark:spark-hive_${scalaVersion}:${libs.versions.spark.hive33.get()}") {
+ integrationImplementation("org.apache.spark:spark-hive_${scalaVersion}:${libs.versions.spark.hive35.get()}") {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.apache.arrow'
exclude group: 'org.apache.parquet'
@@ -602,9 +602,9 @@ project(':iceberg-delta-lake') {
}
}
- // The newest version of delta-core uses Spark 3.3.*. The integration test should only be built
- // if iceberg-spark-3.3 is available
- if (sparkVersions.contains("3.3")) {
+ // The newest version of delta-core uses Spark 3.5.*. The integration test should only be built
+ // if iceberg-spark-3.5 is available
+ if (sparkVersions.contains("3.5")) {
sourceSets {
integration {
java.srcDir "$projectDir/src/integration/java"
@@ -615,6 +615,7 @@ project(':iceberg-delta-lake') {
}
task integrationTest(type: Test) {
+ useJUnitPlatform()
testClassesDirs = sourceSets.integration.output.classesDirs
classpath = sourceSets.integration.runtimeClasspath
jvmArgs += project.property('extraJvmArgs')
diff --git a/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java b/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java
index 52966a484e..cebbea65f5 100644
--- a/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java
+++ b/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java
@@ -18,8 +18,10 @@
*/
package org.apache.iceberg.delta;
+import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.current_date;
import static org.apache.spark.sql.functions.date_add;
+import static org.apache.spark.sql.functions.date_format;
import static org.apache.spark.sql.functions.expr;
import io.delta.standalone.DeltaLog;
@@ -34,7 +36,6 @@ import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
-import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Timestamp;
import java.util.Iterator;
@@ -42,7 +43,6 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.net.URLCodec;
import org.apache.iceberg.Snapshot;
@@ -62,10 +62,8 @@ import org.apache.spark.sql.delta.catalog.DeltaCatalog;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
private static final String SNAPSHOT_SOURCE_PROP = "snapshot_source";
@@ -74,31 +72,22 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
private static final String NAMESPACE = "delta_conversion_test";
private static final String defaultSparkCatalog = "spark_catalog";
private static final String icebergCatalogName = "iceberg_hive";
+ private static final Map<String, String> config =
+ ImmutableMap.of(
+ "type", "hive",
+ "default-namespace", "default",
+ "parquet-enabled", "true",
+ "cache-enabled",
+ "false" // Spark will delete tables using v1, leaving the cache
out of sync
+ );
private static Dataset<Row> typeTestDataFrame;
private static Dataset<Row> nestedDataFrame;
- static Stream<Arguments> parameters() {
- return Stream.of(
- Arguments.of(
- icebergCatalogName,
- SparkCatalog.class.getName(),
- ImmutableMap.of(
- "type",
- "hive",
- "default-namespace",
- "default",
- "parquet-enabled",
- "true",
- "cache-enabled",
- "false" // Spark will delete tables using v1, leaving the
cache out of sync
- )));
- }
-
- @TempDir private Path temp;
+ @TempDir private File tempA;
+ @TempDir private File tempB;
- public TestSnapshotDeltaLakeTable(
- String catalogName, String implementation, Map<String, String> config) {
- super(catalogName, implementation, config);
+ public TestSnapshotDeltaLakeTable() {
+ super(icebergCatalogName, SparkCatalog.class.getName(), config);
spark.conf().set("spark.sql.catalog." + defaultSparkCatalog, DeltaCatalog.class.getName());
}
@@ -115,7 +104,8 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
.withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
.withColumn("dateCol", date_add(current_date(), 1))
.withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
- .withColumn("stringCol", expr("CAST(timestampCol AS STRING)"))
+ .withColumn("timestampStrCol", expr("CAST(timestampCol AS
STRING)"))
+ .withColumn("stringCol", date_format(col("timestampCol"),
"yyyy/M/d"))
.withColumn("booleanCol", expr("longCol > 5"))
.withColumn("binaryCol", expr("CAST(longCol AS BINARY)"))
.withColumn("byteCol", expr("CAST(longCol AS BYTE)"))
@@ -160,11 +150,10 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", NAMESPACE));
}
- @ParameterizedTest(name = "Catalog Name {0} - Options {2}")
- @MethodSource("parameters")
+ @Test
public void testBasicSnapshotPartitioned() {
String partitionedIdentifier = destName(defaultSparkCatalog, "partitioned_table");
- String partitionedLocation = temp.toFile().toURI().toString();
+ String partitionedLocation = tempA.toURI().toString();
writeDeltaTable(nestedDataFrame, partitionedIdentifier, partitionedLocation, "id");
spark.sql("DELETE FROM " + partitionedIdentifier + " WHERE id=3");
@@ -182,13 +171,12 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
checkIcebergTableLocation(newTableIdentifier, partitionedLocation);
}
- @ParameterizedTest(name = "Catalog Name {0} - Options {2}")
- @MethodSource("parameters")
+ @Test
public void testBasicSnapshotUnpartitioned() {
String unpartitionedIdentifier = destName(defaultSparkCatalog, "unpartitioned_table");
- String unpartitionedLocation = temp.toFile().toURI().toString();
+ String unpartitionedLocation = tempA.toURI().toString();
- writeDeltaTable(nestedDataFrame, unpartitionedIdentifier, unpartitionedLocation, null);
+ writeDeltaTable(nestedDataFrame, unpartitionedIdentifier, unpartitionedLocation);
spark.sql("DELETE FROM " + unpartitionedIdentifier + " WHERE id=3");
spark.sql("UPDATE " + unpartitionedIdentifier + " SET id=3 WHERE id=1");
@@ -204,12 +192,11 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
checkIcebergTableLocation(newTableIdentifier, unpartitionedLocation);
}
- @ParameterizedTest(name = "Catalog Name {0} - Options {2}")
- @MethodSource("parameters")
+ @Test
public void testSnapshotWithNewLocation() {
String partitionedIdentifier = destName(defaultSparkCatalog, "partitioned_table");
- String partitionedLocation = temp.toFile().toURI().toString();
- String newIcebergTableLocation = temp.toFile().toURI().toString();
+ String partitionedLocation = tempA.toURI().toString();
+ String newIcebergTableLocation = tempB.toURI().toString();
writeDeltaTable(nestedDataFrame, partitionedIdentifier, partitionedLocation, "id");
spark.sql("DELETE FROM " + partitionedIdentifier + " WHERE id=3");
@@ -228,13 +215,12 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
checkIcebergTableLocation(newTableIdentifier, newIcebergTableLocation);
}
- @ParameterizedTest(name = "Catalog Name {0} - Options {2}")
- @MethodSource("parameters")
+ @Test
public void testSnapshotWithAdditionalProperties() {
String unpartitionedIdentifier = destName(defaultSparkCatalog, "unpartitioned_table");
- String unpartitionedLocation = temp.toFile().toURI().toString();
+ String unpartitionedLocation = tempA.toURI().toString();
- writeDeltaTable(nestedDataFrame, unpartitionedIdentifier, unpartitionedLocation, null);
+ writeDeltaTable(nestedDataFrame, unpartitionedIdentifier, unpartitionedLocation);
spark.sql("DELETE FROM " + unpartitionedIdentifier + " WHERE id=3");
spark.sql("UPDATE " + unpartitionedIdentifier + " SET id=3 WHERE id=1");
@@ -267,20 +253,18 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
unpartitionedLocation);
}
- @ParameterizedTest(name = "Catalog Name {0} - Options {2}")
- @MethodSource("parameters")
+ @Test
public void testSnapshotTableWithExternalDataFiles() {
String unpartitionedIdentifier = destName(defaultSparkCatalog, "unpartitioned_table");
String externalDataFilesIdentifier = destName(defaultSparkCatalog, "external_data_files_table");
- String unpartitionedLocation = temp.toFile().toURI().toString();
- String externalDataFilesTableLocation = temp.toFile().toURI().toString();
+ String unpartitionedLocation = tempA.toURI().toString();
+ String externalDataFilesTableLocation = tempB.toURI().toString();
- writeDeltaTable(nestedDataFrame, unpartitionedIdentifier, unpartitionedLocation, null);
+ writeDeltaTable(nestedDataFrame, unpartitionedIdentifier, unpartitionedLocation);
spark.sql("DELETE FROM " + unpartitionedIdentifier + " WHERE id=3");
spark.sql("UPDATE " + unpartitionedIdentifier + " SET id=3 WHERE id=1");
- writeDeltaTable(
- nestedDataFrame, externalDataFilesIdentifier, externalDataFilesTableLocation, null);
+ writeDeltaTable(nestedDataFrame, externalDataFilesIdentifier, externalDataFilesTableLocation);
// Add parquet files to default.external_data_files_table. The newly added parquet files
// are not at the same location as the table.
addExternalDatafiles(externalDataFilesTableLocation, unpartitionedLocation);
@@ -297,13 +281,19 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
checkDataFilePathsIntegrity(newTableIdentifier, externalDataFilesTableLocation);
}
- @ParameterizedTest(name = "Catalog Name {0} - Options {2}")
- @MethodSource("parameters")
+ @Test
public void testSnapshotSupportedTypes() {
String typeTestIdentifier = destName(defaultSparkCatalog, "type_test_table");
- String typeTestTableLocation = temp.toFile().toURI().toString();
+ String typeTestTableLocation = tempA.toURI().toString();
- writeDeltaTable(typeTestDataFrame, typeTestIdentifier, typeTestTableLocation, "stringCol");
+ writeDeltaTable(
+ typeTestDataFrame,
+ typeTestIdentifier,
+ typeTestTableLocation,
+ "stringCol",
+ "timestampStrCol",
+ "booleanCol",
+ "longCol");
String newTableIdentifier = destName(icebergCatalogName, "iceberg_type_test_table");
SnapshotDeltaLakeTable.Result result =
DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable(
@@ -316,13 +306,12 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
checkIcebergTableProperties(newTableIdentifier, ImmutableMap.of(), typeTestTableLocation);
}
- @ParameterizedTest(name = "Catalog Name {0} - Options {2}")
- @MethodSource("parameters")
+ @Test
public void testSnapshotVacuumTable() throws IOException {
String vacuumTestIdentifier = destName(defaultSparkCatalog, "vacuum_test_table");
- String vacuumTestTableLocation = temp.toFile().toURI().toString();
+ String vacuumTestTableLocation = tempA.toURI().toString();
- writeDeltaTable(nestedDataFrame, vacuumTestIdentifier, vacuumTestTableLocation, null);
+ writeDeltaTable(nestedDataFrame, vacuumTestIdentifier, vacuumTestTableLocation);
Random random = new Random();
for (int i = 0; i < 13; i++) {
spark.sql(
@@ -352,11 +341,10 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
checkIcebergTableLocation(newTableIdentifier, vacuumTestTableLocation);
}
- @ParameterizedTest(name = "Catalog Name {0} - Options {2}")
- @MethodSource("parameters")
+ @Test
public void testSnapshotLogCleanTable() throws IOException {
String logCleanTestIdentifier = destName(defaultSparkCatalog, "log_clean_test_table");
- String logCleanTestTableLocation = temp.toFile().toURI().toString();
+ String logCleanTestTableLocation = tempA.toURI().toString();
writeDeltaTable(nestedDataFrame, logCleanTestIdentifier, logCleanTestTableLocation, "id");
Random random = new Random();
@@ -549,14 +537,14 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
}
private void writeDeltaTable(
- Dataset<Row> df, String identifier, String path, String partitionColumn) {
+ Dataset<Row> df, String identifier, String path, String... partitionColumns) {
spark.sql(String.format("DROP TABLE IF EXISTS %s", identifier));
- if (partitionColumn != null) {
+ if (partitionColumns.length > 0) {
df.write()
.format("delta")
.mode(SaveMode.Append)
.option("path", path)
- .partitionBy(partitionColumn)
+ .partitionBy(partitionColumns)
.saveAsTable(identifier);
} else {
df.write().format("delta").mode(SaveMode.Append).option("path",
path).saveAsTable(identifier);
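
[Editor's note on the test changes above: the suite now runs as plain JUnit 5 @Test methods with the catalog config fixed in the no-arg constructor, per-field @TempDir File directories instead of one shared java.nio.file.Path, and a varargs partition-column helper. A minimal, self-contained sketch of the @TempDir pattern (hypothetical class and assertions, not part of this commit):

    import java.io.File;
    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.io.TempDir;

    // Hypothetical illustration: JUnit 5 injects a fresh temporary directory into
    // each annotated field before every test method, so tempA and tempB can serve
    // as two distinct table locations (e.g. source table vs. new Iceberg location).
    class TempDirPatternExample {
      @TempDir File tempA;
      @TempDir File tempB;

      @Test
      void tempDirsAreDistinct() {
        Assertions.assertTrue(tempA.isDirectory());
        Assertions.assertTrue(tempB.isDirectory());
        Assertions.assertNotEquals(tempA.toURI().toString(), tempB.toURI().toString());
      }
    }

Running these plain JUnit 5 tests is what the useJUnitPlatform() addition in the integrationTest Gradle task above enables.]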
diff --git a/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java b/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java
index eb2414ac82..afe43f6bb1 100644
--- a/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java
+++ b/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java
@@ -386,18 +386,18 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;
Metrics metrics = getMetricsForFile(file, format, metricsConfig, nameMapping);
- String partition =
+ List<String> partitionValueList =
spec.fields().stream()
.map(PartitionField::name)
- .map(name -> String.format("%s=%s", name, partitionValues.get(name)))
- .collect(Collectors.joining("/"));
+ .map(partitionValues::get)
+ .collect(Collectors.toList());
return DataFiles.builder(spec)
.withPath(fullFilePath)
.withFormat(format)
.withFileSizeInBytes(fileSize)
.withMetrics(metrics)
- .withPartitionPath(partition)
+ .withPartitionValues(partitionValueList)
.build();
}
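
[Editor's note on the change above: the DataFile is now built with withPartitionValues, which takes the raw partition values in spec-field order, rather than withPartitionPath, which re-parses a joined "name=value/..." string and can trip over partition values containing special characters such as '/' (the new test data deliberately uses a "yyyy/M/d" stringCol). A minimal, self-contained sketch of the builder usage (hypothetical schema, file path, and sizes; metrics omitted):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.DataFiles;
    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    public class PartitionValuesSketch {
      public static void main(String[] args) {
        // Hypothetical two-column schema with a single identity partition on "category".
        Schema schema =
            new Schema(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.required(2, "category", Types.StringType.get()));
        PartitionSpec spec = PartitionSpec.builderFor(schema).identity("category").build();

        // One string per partition field, in spec order; values are converted to the
        // partition types directly, so no "category=..." path segment is parsed or decoded.
        List<String> partitionValues = Arrays.asList("books & media");

        DataFile dataFile =
            DataFiles.builder(spec)
                .withPath("/warehouse/table/data/00000-0.parquet") // hypothetical path
                .withFormat(FileFormat.PARQUET)
                .withFileSizeInBytes(1024L)                        // hypothetical size
                .withRecordCount(100L)                             // hypothetical count
                .withPartitionValues(partitionValues)
                .build();

        System.out.println(dataFile.partition());
      }
    }
]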
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 29d5610f3c..f4f5075409 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -32,7 +32,7 @@ azuresdk-bom = "1.2.18"
caffeine = "2.9.3"
calcite = "1.10.0"
delta-standalone = "0.6.0"
-delta-core = "2.2.0"
+delta-spark = "3.0.0"
esotericsoftware-kryo = "4.0.2"
errorprone-annotations = "2.23.0"
findbugs-jsr305 = "3.0.2"
@@ -169,7 +169,7 @@ snowflake-jdbc = { module = "net.snowflake:snowflake-jdbc", version.ref = "snowf
# test libraries
assertj-core = { module = "org.assertj:assertj-core", version.ref = "assertj-core" }
awaitility = { module = "org.awaitility:awaitility", version.ref = "awaitility" }
-delta-core = { module = "io.delta:delta-core_2.12", version.ref = "delta-core" }
+delta-spark = { module = "io.delta:delta-spark_2.12", version.ref = "delta-spark" }
esotericsoftware-kryo = { module = "com.esotericsoftware:kryo", version.ref = "esotericsoftware-kryo" }
flink116-connector-test-utils = { module = "org.apache.flink:flink-connector-test-utils", version.ref = "flink116" }
flink116-core = { module = "org.apache.flink:flink-core", version.ref = "flink116" }