This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a2ec17669 Spark 3.1, 3.2: Backport #5860 to support Java 8 time 
classes (#6190)
5a2ec17669 is described below

commit 5a2ec17669723102a8201d255d34dec942abed5d
Author: Prashant Singh <[email protected]>
AuthorDate: Sun Dec 18 15:59:16 2022 -0800

    Spark 3.1, 3.2: Backport #5860 to support Java 8 time classes (#6190)
---
 .../extensions/TestRewriteManifestsProcedure.java  | 92 +++++++++++++++++++++
 .../apache/iceberg/spark/SparkValueConverter.java  | 22 ++++-
 .../extensions/TestRewriteManifestsProcedure.java  | 95 ++++++++++++++++++++++
 .../apache/iceberg/spark/SparkValueConverter.java  | 22 ++++-
 4 files changed, 227 insertions(+), 4 deletions(-)

diff --git 
a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
 
b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index 7c5ec1f5cf..5f6bed7d37 100644
--- 
a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ 
b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -20,13 +20,18 @@ package org.apache.iceberg.spark.extensions;
 
 import static 
org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED;
 
+import java.sql.Date;
+import java.sql.Timestamp;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -73,6 +78,93 @@ public class TestRewriteManifestsProcedure extends 
SparkExtensionsTestBase {
         "Must have 4 manifests", 4, 
table.currentSnapshot().allManifests(table.io()).size());
   }
 
+  @Test
+  public void 
testRewriteLargeManifestsOnDatePartitionedTableWithJava8APIEnabled() {
+    sql(
+        "CREATE TABLE %s (id INTEGER, name STRING, dept STRING, ts DATE) USING 
iceberg PARTITIONED BY (ts)",
+        tableName);
+    try {
+      spark
+          .createDataFrame(
+              ImmutableList.of(
+                  RowFactory.create(1, "John Doe", "hr", 
Date.valueOf("2021-01-01")),
+                  RowFactory.create(2, "Jane Doe", "hr", 
Date.valueOf("2021-01-02")),
+                  RowFactory.create(3, "Matt Doe", "hr", 
Date.valueOf("2021-01-03")),
+                  RowFactory.create(4, "Will Doe", "facilities", 
Date.valueOf("2021-01-04"))),
+              spark.table(tableName).schema())
+          .writeTo(tableName)
+          .append();
+    } catch (NoSuchTableException e) {
+      // not possible as we already created the table above.
+      throw new RuntimeException(e);
+    }
+    withSQLConf(
+        ImmutableMap.of("spark.sql.datetime.java8API.enabled", "true"),
+        () -> {
+          Table table = validationCatalog.loadTable(tableIdent);
+
+          Assert.assertEquals(
+              "Must have 1 manifest", 1, 
table.currentSnapshot().allManifests(table.io()).size());
+
+          sql(
+              "ALTER TABLE %s SET TBLPROPERTIES 
('commit.manifest.target-size-bytes' '1')",
+              tableName);
+
+          List<Object[]> output =
+              sql("CALL %s.system.rewrite_manifests('%s')", catalogName, 
tableIdent);
+          assertEquals("Procedure output must match", ImmutableList.of(row(1, 
4)), output);
+
+          table.refresh();
+
+          Assert.assertEquals(
+              "Must have 4 manifests", 4, 
table.currentSnapshot().allManifests(table.io()).size());
+        });
+  }
+
+  @Test
+  public void 
testRewriteLargeManifestsOnTimestampPartitionedTableWithJava8APIEnabled() {
+    sql(
+        "CREATE TABLE %s (id INTEGER, name STRING, dept STRING, ts TIMESTAMP) 
USING iceberg PARTITIONED BY (ts)",
+        tableName);
+    try {
+      spark
+          .createDataFrame(
+              ImmutableList.of(
+                  RowFactory.create(1, "John Doe", "hr", 
Timestamp.valueOf("2021-01-01 00:00:00")),
+                  RowFactory.create(2, "Jane Doe", "hr", 
Timestamp.valueOf("2021-01-02 00:00:00")),
+                  RowFactory.create(3, "Matt Doe", "hr", 
Timestamp.valueOf("2021-01-03 00:00:00")),
+                  RowFactory.create(
+                      4, "Will Doe", "facilities", 
Timestamp.valueOf("2021-01-04 00:00:00"))),
+              spark.table(tableName).schema())
+          .writeTo(tableName)
+          .append();
+    } catch (NoSuchTableException e) {
+      // not possible as we already created the table above.
+      throw new RuntimeException(e);
+    }
+    withSQLConf(
+        ImmutableMap.of("spark.sql.datetime.java8API.enabled", "true"),
+        () -> {
+          Table table = validationCatalog.loadTable(tableIdent);
+
+          Assert.assertEquals(
+              "Must have 1 manifest", 1, 
table.currentSnapshot().allManifests(table.io()).size());
+
+          sql(
+              "ALTER TABLE %s SET TBLPROPERTIES 
('commit.manifest.target-size-bytes' '1')",
+              tableName);
+
+          List<Object[]> output =
+              sql("CALL %s.system.rewrite_manifests('%s')", catalogName, 
tableIdent);
+          assertEquals("Procedure output must match", ImmutableList.of(row(1, 
4)), output);
+
+          table.refresh();
+
+          Assert.assertEquals(
+              "Must have 4 manifests", 4, 
table.currentSnapshot().allManifests(table.io()).size());
+        });
+  }
+
   @Test
   public void testRewriteSmallManifestsWithSnapshotIdInheritance() {
     sql(
diff --git 
a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java
 
b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java
index 2f1c4d30b3..01d635a105 100644
--- 
a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java
+++ 
b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java
@@ -21,6 +21,8 @@ package org.apache.iceberg.spark;
 import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.Schema;
@@ -70,9 +72,25 @@ public class SparkValueConverter {
         return convertedMap;
 
       case DATE:
-        return DateTimeUtils.fromJavaDate((Date) object);
+        // if spark.sql.datetime.java8API.enabled is set to true, 
java.time.LocalDate
+        // for Spark SQL DATE type is returned otherwise java.sql.Date is returned.
+        if (object instanceof Date) {
+          return DateTimeUtils.fromJavaDate((Date) object);
+        } else if (object instanceof LocalDate) {
+          return DateTimeUtils.localDateToDays((LocalDate) object);
+        } else {
+          throw new UnsupportedOperationException("Not a supported date class: 
" + object);
+        }
       case TIMESTAMP:
-        return DateTimeUtils.fromJavaTimestamp((Timestamp) object);
+        // if spark.sql.datetime.java8API.enabled is set to true, 
java.time.Instant
+        // for Spark SQL TIMESTAMP type is returned otherwise 
java.sql.Timestamp is returned.
+        if (object instanceof Timestamp) {
+          return DateTimeUtils.fromJavaTimestamp((Timestamp) object);
+        } else if (object instanceof Instant) {
+          return DateTimeUtils.instantToMicros((Instant) object);
+        } else {
+          throw new UnsupportedOperationException("Not a supported timestamp 
class: " + object);
+        }
       case BINARY:
         return ByteBuffer.wrap((byte[]) object);
       case INTEGER:
diff --git 
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
 
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index 4578820290..784e5ed1e7 100644
--- 
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ 
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -27,8 +27,11 @@ import java.util.Map;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -75,6 +78,98 @@ public class TestRewriteManifestsProcedure extends 
SparkExtensionsTestBase {
         "Must have 4 manifests", 4, 
table.currentSnapshot().allManifests(table.io()).size());
   }
 
+  @Test
+  public void 
testRewriteLargeManifestsOnDatePartitionedTableWithJava8APIEnabled() {
+    withSQLConf(
+        ImmutableMap.of("spark.sql.datetime.java8API.enabled", "true"),
+        () -> {
+          sql(
+              "CREATE TABLE %s (id INTEGER, name STRING, dept STRING, ts DATE) 
USING iceberg PARTITIONED BY (ts)",
+              tableName);
+          try {
+            spark
+                .createDataFrame(
+                    ImmutableList.of(
+                        RowFactory.create(1, "John Doe", "hr", 
Date.valueOf("2021-01-01")),
+                        RowFactory.create(2, "Jane Doe", "hr", 
Date.valueOf("2021-01-02")),
+                        RowFactory.create(3, "Matt Doe", "hr", 
Date.valueOf("2021-01-03")),
+                        RowFactory.create(4, "Will Doe", "facilities", 
Date.valueOf("2021-01-04"))),
+                    spark.table(tableName).schema())
+                .writeTo(tableName)
+                .append();
+          } catch (NoSuchTableException e) {
+            // not possible as we already created the table above.
+            throw new RuntimeException(e);
+          }
+
+          Table table = validationCatalog.loadTable(tableIdent);
+
+          Assert.assertEquals(
+              "Must have 1 manifest", 1, 
table.currentSnapshot().allManifests(table.io()).size());
+
+          sql(
+              "ALTER TABLE %s SET TBLPROPERTIES 
('commit.manifest.target-size-bytes' '1')",
+              tableName);
+
+          List<Object[]> output =
+              sql("CALL %s.system.rewrite_manifests('%s')", catalogName, 
tableIdent);
+          assertEquals("Procedure output must match", ImmutableList.of(row(1, 
4)), output);
+
+          table.refresh();
+
+          Assert.assertEquals(
+              "Must have 4 manifests", 4, 
table.currentSnapshot().allManifests(table.io()).size());
+        });
+  }
+
+  @Test
+  public void 
testRewriteLargeManifestsOnTimestampPartitionedTableWithJava8APIEnabled() {
+    withSQLConf(
+        ImmutableMap.of("spark.sql.datetime.java8API.enabled", "true"),
+        () -> {
+          sql(
+              "CREATE TABLE %s (id INTEGER, name STRING, dept STRING, ts 
TIMESTAMP) USING iceberg PARTITIONED BY (ts)",
+              tableName);
+          try {
+            spark
+                .createDataFrame(
+                    ImmutableList.of(
+                        RowFactory.create(
+                            1, "John Doe", "hr", Timestamp.valueOf("2021-01-01 
00:00:00")),
+                        RowFactory.create(
+                            2, "Jane Doe", "hr", Timestamp.valueOf("2021-01-02 
00:00:00")),
+                        RowFactory.create(
+                            3, "Matt Doe", "hr", Timestamp.valueOf("2021-01-03 
00:00:00")),
+                        RowFactory.create(
+                            4, "Will Doe", "facilities", 
Timestamp.valueOf("2021-01-04 00:00:00"))),
+                    spark.table(tableName).schema())
+                .writeTo(tableName)
+                .append();
+          } catch (NoSuchTableException e) {
+            // not possible as we already created the table above.
+            throw new RuntimeException(e);
+          }
+
+          Table table = validationCatalog.loadTable(tableIdent);
+
+          Assert.assertEquals(
+              "Must have 1 manifest", 1, 
table.currentSnapshot().allManifests(table.io()).size());
+
+          sql(
+              "ALTER TABLE %s SET TBLPROPERTIES 
('commit.manifest.target-size-bytes' '1')",
+              tableName);
+
+          List<Object[]> output =
+              sql("CALL %s.system.rewrite_manifests('%s')", catalogName, 
tableIdent);
+          assertEquals("Procedure output must match", ImmutableList.of(row(1, 
4)), output);
+
+          table.refresh();
+
+          Assert.assertEquals(
+              "Must have 4 manifests", 4, 
table.currentSnapshot().allManifests(table.io()).size());
+        });
+  }
+
   @Test
   public void testRewriteSmallManifestsWithSnapshotIdInheritance() {
     sql(
diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java
 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java
index 5a5381099c..7d7bc0f675 100644
--- 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java
@@ -21,6 +21,8 @@ package org.apache.iceberg.spark;
 import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.Schema;
@@ -70,9 +72,25 @@ public class SparkValueConverter {
         return convertedMap;
 
       case DATE:
-        return DateTimeUtils.fromJavaDate((Date) object);
+        // if spark.sql.datetime.java8API.enabled is set to true, 
java.time.LocalDate
+        // for Spark SQL DATE type is returned otherwise java.sql.Date is returned.
+        if (object instanceof Date) {
+          return DateTimeUtils.fromJavaDate((Date) object);
+        } else if (object instanceof LocalDate) {
+          return DateTimeUtils.localDateToDays((LocalDate) object);
+        } else {
+          throw new UnsupportedOperationException("Not a supported date class: 
" + object);
+        }
       case TIMESTAMP:
-        return DateTimeUtils.fromJavaTimestamp((Timestamp) object);
+        // if spark.sql.datetime.java8API.enabled is set to true, 
java.time.Instant
+        // for Spark SQL TIMESTAMP type is returned otherwise 
java.sql.Timestamp is returned.
+        if (object instanceof Timestamp) {
+          return DateTimeUtils.fromJavaTimestamp((Timestamp) object);
+        } else if (object instanceof Instant) {
+          return DateTimeUtils.instantToMicros((Instant) object);
+        } else {
+          throw new UnsupportedOperationException("Not a supported timestamp 
class: " + object);
+        }
       case BINARY:
         return ByteBuffer.wrap((byte[]) object);
       case INTEGER:

Reply via email to