This is an automated email from the ASF dual-hosted git repository.

jackye pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new b79a8ffce3 Delta: Fix integration tests and Create DataFile by partition values instead of path (#8398)
b79a8ffce3 is described below

commit b79a8ffce3a95e5e67a4f926ee63d165153e1c54
Author: HonahX <[email protected]>
AuthorDate: Thu Dec 7 11:33:20 2023 -0800

    Delta: Fix integration tests and Create DataFile by partition values instead of path (#8398)
---
 .github/workflows/delta-conversion-ci.yml          |   4 +-
 build.gradle                                       |  17 +--
 .../iceberg/delta/TestSnapshotDeltaLakeTable.java  | 114 +++++++++------------
 .../delta/BaseSnapshotDeltaLakeTableAction.java    |   8 +-
 gradle/libs.versions.toml                          |   4 +-
 5 files changed, 68 insertions(+), 79 deletions(-)

diff --git a/.github/workflows/delta-conversion-ci.yml b/.github/workflows/delta-conversion-ci.yml
index 5261b82176..6fd97e662a 100644
--- a/.github/workflows/delta-conversion-ci.yml
+++ b/.github/workflows/delta-conversion-ci.yml
@@ -74,7 +74,7 @@ jobs:
          key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }}
          restore-keys: ${{ runner.os }}-gradle-
      - run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
-      - run: ./gradlew -DsparkVersions=3.3 -DscalaVersion=2.12 -DhiveVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc
+      - run: ./gradlew -DsparkVersions=3.5 -DscalaVersion=2.12 -DhiveVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc
       - uses: actions/upload-artifact@v3
         if: failure()
         with:
@@ -103,7 +103,7 @@ jobs:
          key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }}
          restore-keys: ${{ runner.os }}-gradle-
      - run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
-      - run: ./gradlew -DsparkVersions=3.3 -DscalaVersion=2.13 -DhiveVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc
+      - run: ./gradlew -DsparkVersions=3.5 -DscalaVersion=2.13 -DhiveVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc
       - uses: actions/upload-artifact@v3
         if: failure()
         with:
diff --git a/build.gradle b/build.gradle
index 94996a41a6..3f76cbea02 100644
--- a/build.gradle
+++ b/build.gradle
@@ -577,11 +577,11 @@ project(':iceberg-delta-lake') {
       exclude group: 'com.google.code.gson', module: 'gson'
     }
 
-    // The newest version of delta-core uses Spark 3.3.*. Since its only for test, we do
+    // The newest version of delta-core uses Spark 3.5.*. Since its only for test, we do
     // not need to include older version of delta-core
-    if (sparkVersions.contains("3.3")) {
-      integrationImplementation "io.delta:delta-core_${scalaVersion}:${libs.versions.delta.core.get()}"
-      integrationImplementation project(path: ":iceberg-spark:iceberg-spark-3.3_${scalaVersion}")
+    if (sparkVersions.contains("3.5")) {
+      integrationImplementation "io.delta:delta-spark_${scalaVersion}:${libs.versions.delta.spark.get()}"
+      integrationImplementation project(path: ":iceberg-spark:iceberg-spark-3.5_${scalaVersion}")
       integrationImplementation(libs.hadoop2.minicluster) {
         exclude group: 'org.apache.avro', module: 'avro'
         // to make sure netty libs only come from project(':iceberg-arrow')
@@ -590,7 +590,7 @@ project(':iceberg-delta-lake') {
       }
       integrationImplementation project(path: ':iceberg-hive-metastore')
      integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
-      integrationImplementation("org.apache.spark:spark-hive_${scalaVersion}:${libs.versions.spark.hive33.get()}") {
+      integrationImplementation("org.apache.spark:spark-hive_${scalaVersion}:${libs.versions.spark.hive35.get()}") {
         exclude group: 'org.apache.avro', module: 'avro'
         exclude group: 'org.apache.arrow'
         exclude group: 'org.apache.parquet'
@@ -602,9 +602,9 @@ project(':iceberg-delta-lake') {
     }
   }
 
-  // The newest version of delta-core uses Spark 3.3.*. The integration test should only be built
-  // if iceberg-spark-3.3 is available
-  if (sparkVersions.contains("3.3")) {
+  // The newest version of delta-core uses Spark 3.5.*. The integration test should only be built
+  // if iceberg-spark-3.5 is available
+  if (sparkVersions.contains("3.5")) {
     sourceSets {
       integration {
         java.srcDir "$projectDir/src/integration/java"
@@ -615,6 +615,7 @@ project(':iceberg-delta-lake') {
     }
 
     task integrationTest(type: Test) {
+      useJUnitPlatform()
       testClassesDirs = sourceSets.integration.output.classesDirs
       classpath = sourceSets.integration.runtimeClasspath
       jvmArgs += project.property('extraJvmArgs')
diff --git a/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java b/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java
index 52966a484e..cebbea65f5 100644
--- a/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java
+++ b/delta-lake/src/integration/java/org/apache/iceberg/delta/TestSnapshotDeltaLakeTable.java
@@ -18,8 +18,10 @@
  */
 package org.apache.iceberg.delta;
 
+import static org.apache.spark.sql.functions.col;
 import static org.apache.spark.sql.functions.current_date;
 import static org.apache.spark.sql.functions.date_add;
+import static org.apache.spark.sql.functions.date_format;
 import static org.apache.spark.sql.functions.expr;
 
 import io.delta.standalone.DeltaLog;
@@ -34,7 +36,6 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.file.Files;
-import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.sql.Timestamp;
 import java.util.Iterator;
@@ -42,7 +43,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.net.URLCodec;
 import org.apache.iceberg.Snapshot;
@@ -62,10 +62,8 @@ import org.apache.spark.sql.delta.catalog.DeltaCatalog;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
 
 public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
   private static final String SNAPSHOT_SOURCE_PROP = "snapshot_source";
@@ -74,31 +72,22 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
   private static final String NAMESPACE = "delta_conversion_test";
   private static final String defaultSparkCatalog = "spark_catalog";
   private static final String icebergCatalogName = "iceberg_hive";
+  private static final Map<String, String> config =
+      ImmutableMap.of(
+          "type", "hive",
+          "default-namespace", "default",
+          "parquet-enabled", "true",
+          "cache-enabled",
+              "false" // Spark will delete tables using v1, leaving the cache out of sync
+          );
   private static Dataset<Row> typeTestDataFrame;
   private static Dataset<Row> nestedDataFrame;
 
-  static Stream<Arguments> parameters() {
-    return Stream.of(
-        Arguments.of(
-            icebergCatalogName,
-            SparkCatalog.class.getName(),
-            ImmutableMap.of(
-                "type",
-                "hive",
-                "default-namespace",
-                "default",
-                "parquet-enabled",
-                "true",
-                "cache-enabled",
-                "false" // Spark will delete tables using v1, leaving the cache out of sync
-                )));
-  }
-
-  @TempDir private Path temp;
+  @TempDir private File tempA;
+  @TempDir private File tempB;
 
-  public TestSnapshotDeltaLakeTable(
-      String catalogName, String implementation, Map<String, String> config) {
-    super(catalogName, implementation, config);
+  public TestSnapshotDeltaLakeTable() {
+    super(icebergCatalogName, SparkCatalog.class.getName(), config);
     spark.conf().set("spark.sql.catalog." + defaultSparkCatalog, DeltaCatalog.class.getName());
   }
 
@@ -115,7 +104,8 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
             .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
             .withColumn("dateCol", date_add(current_date(), 1))
             .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
-            .withColumn("stringCol", expr("CAST(timestampCol AS STRING)"))
+            .withColumn("timestampStrCol", expr("CAST(timestampCol AS STRING)"))
+            .withColumn("stringCol", date_format(col("timestampCol"), "yyyy/M/d"))
             .withColumn("booleanCol", expr("longCol > 5"))
             .withColumn("binaryCol", expr("CAST(longCol AS BINARY)"))
             .withColumn("byteCol", expr("CAST(longCol AS BYTE)"))
@@ -160,11 +150,10 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
     spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", NAMESPACE));
   }
 
-  @ParameterizedTest(name = "Catalog Name {0} - Options {2}")
-  @MethodSource("parameters")
+  @Test
   public void testBasicSnapshotPartitioned() {
     String partitionedIdentifier = destName(defaultSparkCatalog, "partitioned_table");
-    String partitionedLocation = temp.toFile().toURI().toString();
+    String partitionedLocation = tempA.toURI().toString();
 
     writeDeltaTable(nestedDataFrame, partitionedIdentifier, partitionedLocation, "id");
     spark.sql("DELETE FROM " + partitionedIdentifier + " WHERE id=3");
@@ -182,13 +171,12 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
     checkIcebergTableLocation(newTableIdentifier, partitionedLocation);
   }
 
-  @ParameterizedTest(name = "Catalog Name {0} - Options {2}")
-  @MethodSource("parameters")
+  @Test
   public void testBasicSnapshotUnpartitioned() {
     String unpartitionedIdentifier = destName(defaultSparkCatalog, "unpartitioned_table");
-    String unpartitionedLocation = temp.toFile().toURI().toString();
+    String unpartitionedLocation = tempA.toURI().toString();
 
-    writeDeltaTable(nestedDataFrame, unpartitionedIdentifier, unpartitionedLocation, null);
+    writeDeltaTable(nestedDataFrame, unpartitionedIdentifier, unpartitionedLocation);
     spark.sql("DELETE FROM " + unpartitionedIdentifier + " WHERE id=3");
     spark.sql("UPDATE " + unpartitionedIdentifier + " SET id=3 WHERE id=1");
 
@@ -204,12 +192,11 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
     checkIcebergTableLocation(newTableIdentifier, unpartitionedLocation);
   }
 
-  @ParameterizedTest(name = "Catalog Name {0} - Options {2}")
-  @MethodSource("parameters")
+  @Test
   public void testSnapshotWithNewLocation() {
     String partitionedIdentifier = destName(defaultSparkCatalog, "partitioned_table");
-    String partitionedLocation = temp.toFile().toURI().toString();
-    String newIcebergTableLocation = temp.toFile().toURI().toString();
+    String partitionedLocation = tempA.toURI().toString();
+    String newIcebergTableLocation = tempB.toURI().toString();
 
     writeDeltaTable(nestedDataFrame, partitionedIdentifier, partitionedLocation, "id");
     spark.sql("DELETE FROM " + partitionedIdentifier + " WHERE id=3");
@@ -228,13 +215,12 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
     checkIcebergTableLocation(newTableIdentifier, newIcebergTableLocation);
   }
 
-  @ParameterizedTest(name = "Catalog Name {0} - Options {2}")
-  @MethodSource("parameters")
+  @Test
   public void testSnapshotWithAdditionalProperties() {
     String unpartitionedIdentifier = destName(defaultSparkCatalog, "unpartitioned_table");
-    String unpartitionedLocation = temp.toFile().toURI().toString();
+    String unpartitionedLocation = tempA.toURI().toString();
 
-    writeDeltaTable(nestedDataFrame, unpartitionedIdentifier, unpartitionedLocation, null);
+    writeDeltaTable(nestedDataFrame, unpartitionedIdentifier, unpartitionedLocation);
     spark.sql("DELETE FROM " + unpartitionedIdentifier + " WHERE id=3");
     spark.sql("UPDATE " + unpartitionedIdentifier + " SET id=3 WHERE id=1");
 
@@ -267,20 +253,18 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
         unpartitionedLocation);
   }
 
-  @ParameterizedTest(name = "Catalog Name {0} - Options {2}")
-  @MethodSource("parameters")
+  @Test
   public void testSnapshotTableWithExternalDataFiles() {
     String unpartitionedIdentifier = destName(defaultSparkCatalog, "unpartitioned_table");
     String externalDataFilesIdentifier = destName(defaultSparkCatalog, "external_data_files_table");
-    String unpartitionedLocation = temp.toFile().toURI().toString();
-    String externalDataFilesTableLocation = temp.toFile().toURI().toString();
+    String unpartitionedLocation = tempA.toURI().toString();
+    String externalDataFilesTableLocation = tempB.toURI().toString();
 
-    writeDeltaTable(nestedDataFrame, unpartitionedIdentifier, unpartitionedLocation, null);
+    writeDeltaTable(nestedDataFrame, unpartitionedIdentifier, unpartitionedLocation);
     spark.sql("DELETE FROM " + unpartitionedIdentifier + " WHERE id=3");
     spark.sql("UPDATE " + unpartitionedIdentifier + " SET id=3 WHERE id=1");
 
-    writeDeltaTable(
-        nestedDataFrame, externalDataFilesIdentifier, externalDataFilesTableLocation, null);
+    writeDeltaTable(nestedDataFrame, externalDataFilesIdentifier, externalDataFilesTableLocation);
     // Add parquet files to default.external_data_files_table. The newly added parquet files
     // are not at the same location as the table.
     addExternalDatafiles(externalDataFilesTableLocation, unpartitionedLocation);
@@ -297,13 +281,19 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
     checkDataFilePathsIntegrity(newTableIdentifier, externalDataFilesTableLocation);
   }
 
-  @ParameterizedTest(name = "Catalog Name {0} - Options {2}")
-  @MethodSource("parameters")
+  @Test
   public void testSnapshotSupportedTypes() {
     String typeTestIdentifier = destName(defaultSparkCatalog, "type_test_table");
-    String typeTestTableLocation = temp.toFile().toURI().toString();
+    String typeTestTableLocation = tempA.toURI().toString();
 
-    writeDeltaTable(typeTestDataFrame, typeTestIdentifier, typeTestTableLocation, "stringCol");
+    writeDeltaTable(
+        typeTestDataFrame,
+        typeTestIdentifier,
+        typeTestTableLocation,
+        "stringCol",
+        "timestampStrCol",
+        "booleanCol",
+        "longCol");
     String newTableIdentifier = destName(icebergCatalogName, "iceberg_type_test_table");
     SnapshotDeltaLakeTable.Result result =
         DeltaLakeToIcebergMigrationSparkIntegration.snapshotDeltaLakeTable(
@@ -316,13 +306,12 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
     checkIcebergTableProperties(newTableIdentifier, ImmutableMap.of(), typeTestTableLocation);
   }
 
-  @ParameterizedTest(name = "Catalog Name {0} - Options {2}")
-  @MethodSource("parameters")
+  @Test
   public void testSnapshotVacuumTable() throws IOException {
     String vacuumTestIdentifier = destName(defaultSparkCatalog, "vacuum_test_table");
-    String vacuumTestTableLocation = temp.toFile().toURI().toString();
+    String vacuumTestTableLocation = tempA.toURI().toString();
 
-    writeDeltaTable(nestedDataFrame, vacuumTestIdentifier, vacuumTestTableLocation, null);
+    writeDeltaTable(nestedDataFrame, vacuumTestIdentifier, vacuumTestTableLocation);
     Random random = new Random();
     for (int i = 0; i < 13; i++) {
       spark.sql(
@@ -352,11 +341,10 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
     checkIcebergTableLocation(newTableIdentifier, vacuumTestTableLocation);
   }
 
-  @ParameterizedTest(name = "Catalog Name {0} - Options {2}")
-  @MethodSource("parameters")
+  @Test
   public void testSnapshotLogCleanTable() throws IOException {
     String logCleanTestIdentifier = destName(defaultSparkCatalog, "log_clean_test_table");
-    String logCleanTestTableLocation = temp.toFile().toURI().toString();
+    String logCleanTestTableLocation = tempA.toURI().toString();
 
     writeDeltaTable(nestedDataFrame, logCleanTestIdentifier, logCleanTestTableLocation, "id");
     Random random = new Random();
@@ -549,14 +537,14 @@ public class TestSnapshotDeltaLakeTable extends SparkDeltaLakeSnapshotTestBase {
   }
 
   private void writeDeltaTable(
-      Dataset<Row> df, String identifier, String path, String partitionColumn) {
+      Dataset<Row> df, String identifier, String path, String... partitionColumns) {
     spark.sql(String.format("DROP TABLE IF EXISTS %s", identifier));
-    if (partitionColumn != null) {
+    if (partitionColumns.length > 0) {
       df.write()
           .format("delta")
           .mode(SaveMode.Append)
           .option("path", path)
-          .partitionBy(partitionColumn)
+          .partitionBy(partitionColumns)
           .saveAsTable(identifier);
     } else {
       df.write().format("delta").mode(SaveMode.Append).option("path", 
path).saveAsTable(identifier);
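
The test diff above drops the parameterized runner (which only ever supplied a single catalog configuration) in favor of plain JUnit 5 @Test methods with a static config map, @TempDir-injected File locations, and a varargs writeDeltaTable helper, so a call with no partition columns writes an unpartitioned table; the useJUnitPlatform() line added to the integrationTest task in build.gradle appears to be what lets Gradle discover these Jupiter tests. A rough standalone sketch of that shape follows; the SparkSession wiring and table names are illustrative stand-ins, not the project's actual test setup, which comes from SparkDeltaLakeSnapshotTestBase.

    // Sketch only: plain JUnit 5 tests with @TempDir File fields and a varargs
    // partition-column helper, mirroring the shape of the updated test class.
    // The SparkSession config below is the standard Delta Lake setup, assumed here.
    import java.io.File;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.io.TempDir;

    class DeltaWriteSketch {
      static final SparkSession spark =
          SparkSession.builder()
              .master("local[2]")
              .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
              .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
              .getOrCreate();

      @TempDir File tempA; // JUnit 5 injects a fresh temporary directory per test
      @TempDir File tempB;

      @Test
      void writesPartitionedAndUnpartitionedTables() {
        Dataset<Row> df = spark.range(10).withColumnRenamed("id", "longCol");
        writeDeltaTable(df, "default.partitioned_sketch", tempA.toURI().toString(), "longCol");
        writeDeltaTable(df, "default.unpartitioned_sketch", tempB.toURI().toString());
      }

      // Varargs mirrors the diff: zero partition columns means an unpartitioned write.
      private void writeDeltaTable(Dataset<Row> df, String identifier, String path, String... partitionColumns) {
        spark.sql(String.format("DROP TABLE IF EXISTS %s", identifier));
        if (partitionColumns.length > 0) {
          df.write().format("delta").mode(SaveMode.Append).option("path", path)
              .partitionBy(partitionColumns).saveAsTable(identifier);
        } else {
          df.write().format("delta").mode(SaveMode.Append).option("path", path).saveAsTable(identifier);
        }
      }
    }
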
diff --git a/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java b/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java
index eb2414ac82..afe43f6bb1 100644
--- a/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java
+++ b/delta-lake/src/main/java/org/apache/iceberg/delta/BaseSnapshotDeltaLakeTableAction.java
@@ -386,18 +386,18 @@ class BaseSnapshotDeltaLakeTableAction implements SnapshotDeltaLakeTable {
         nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;
     Metrics metrics = getMetricsForFile(file, format, metricsConfig, nameMapping);
 
-    String partition =
+    List<String> partitionValueList =
         spec.fields().stream()
             .map(PartitionField::name)
-            .map(name -> String.format("%s=%s", name, partitionValues.get(name)))
-            .collect(Collectors.joining("/"));
+            .map(partitionValues::get)
+            .collect(Collectors.toList());
 
     return DataFiles.builder(spec)
         .withPath(fullFilePath)
         .withFormat(format)
         .withFileSizeInBytes(fileSize)
         .withMetrics(metrics)
-        .withPartitionPath(partition)
+        .withPartitionValues(partitionValueList)
         .build();
   }
 
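The hunk above is the substantive fix behind the commit title: instead of re-encoding partition data as a name=value/name=value path and letting the builder parse it back, the action now collects the raw Delta partition values in partition-spec field order and passes them to the DataFile builder directly. Path-based encoding can mis-split values that themselves contain characters such as '/', which is what the new "yyyy/M/d"-formatted string partition column in the integration test exercises. A simplified sketch contrasting the two approaches follows; the FileFormat literal and the surrounding variables are stand-ins for what the action actually computes.

    // Hypothetical sketch, not the action's code: old path-based vs new value-based
    // DataFile construction, using the same Iceberg builder calls as the diff above.
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.DataFiles;
    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.Metrics;
    import org.apache.iceberg.PartitionField;
    import org.apache.iceberg.PartitionSpec;

    class DataFileSketch {
      static DataFile buildDataFile(
          PartitionSpec spec,
          Map<String, String> partitionValues, // Delta AddFile partition values, keyed by column name
          String fullFilePath,
          long fileSize,
          Metrics metrics) {

        // Old approach (removed above): join "name=value" pairs and let the builder parse the path.
        // A value such as "2023/12/7" would be split on '/' and read as extra path segments.
        // String partitionPath = spec.fields().stream()
        //     .map(PartitionField::name)
        //     .map(name -> String.format("%s=%s", name, partitionValues.get(name)))
        //     .collect(Collectors.joining("/"));

        // New approach: hand the raw values to the builder in partition-spec field order.
        List<String> partitionValueList =
            spec.fields().stream()
                .map(PartitionField::name)
                .map(partitionValues::get)
                .collect(Collectors.toList());

        return DataFiles.builder(spec)
            .withPath(fullFilePath)
            .withFormat(FileFormat.PARQUET) // stand-in; the action derives the format from the file
            .withFileSizeInBytes(fileSize)
            .withMetrics(metrics)
            .withPartitionValues(partitionValueList)
            .build();
      }
    }
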
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 29d5610f3c..f4f5075409 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -32,7 +32,7 @@ azuresdk-bom = "1.2.18"
 caffeine = "2.9.3"
 calcite = "1.10.0"
 delta-standalone = "0.6.0"
-delta-core = "2.2.0"
+delta-spark = "3.0.0"
 esotericsoftware-kryo = "4.0.2"
 errorprone-annotations = "2.23.0"
 findbugs-jsr305 = "3.0.2"
@@ -169,7 +169,7 @@ snowflake-jdbc = { module = "net.snowflake:snowflake-jdbc", version.ref = "snowf
 # test libraries
 assertj-core = { module = "org.assertj:assertj-core", version.ref = "assertj-core" }
 awaitility = { module = "org.awaitility:awaitility", version.ref = "awaitility" }
-delta-core = { module = "io.delta:delta-core_2.12", version.ref = "delta-core" }
+delta-spark = { module = "io.delta:delta-spark_2.12", version.ref = "delta-spark" }
 esotericsoftware-kryo = { module = "com.esotericsoftware:kryo", version.ref = "esotericsoftware-kryo" }
 flink116-connector-test-utils = { module = "org.apache.flink:flink-connector-test-utils", version.ref = "flink116" }
 flink116-core = { module = "org.apache.flink:flink-core", version.ref = "flink116" }
