This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new ed0959257c MR: Fix using Date type as partition field (#10210)
ed0959257c is described below

commit ed0959257cba02f378f7097d81cecaaaef9fa43f
Author: lurnagao-dahua <[email protected]>
AuthorDate: Tue May 7 21:31:22 2024 +0800

    MR: Fix using Date type as partition field (#10210)
---
 .../iceberg/mr/hive/HiveIcebergRecordWriter.java   |  5 ++-
 .../TestHiveIcebergStorageHandlerWithEngine.java   | 51 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 1 deletion(-)

diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
index 793b9c5e64..f87d79b553 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.InternalRecordWrapper;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.FileIO;
@@ -50,6 +51,7 @@ class HiveIcebergRecordWriter extends PartitionedFanoutWriter<Record>
   // The current key is reused at every write to avoid unnecessary object creation
   private final PartitionKey currentKey;
   private final FileIO io;
+  private final InternalRecordWrapper wrapper;
 
   // <TaskAttemptId, <TABLE_NAME, HiveIcebergRecordWriter>> map to store the active writers
   // Stored in concurrent map, since some executor engines can share containers
@@ -77,13 +79,14 @@ class HiveIcebergRecordWriter extends PartitionedFanoutWriter<Record>
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
     this.io = io;
     this.currentKey = new PartitionKey(spec, schema);
+    this.wrapper = new InternalRecordWrapper(schema.asStruct());
     writers.putIfAbsent(taskAttemptID, Maps.newConcurrentMap());
     writers.get(taskAttemptID).put(tableName, this);
   }
 
   @Override
   protected PartitionKey partition(Record row) {
-    currentKey.partition(row);
+    currentKey.partition(wrapper.wrap(row));
     return currentKey;
   }
 
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index c8e91de9b8..b080f4bd49 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -25,6 +25,8 @@ import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.IOException;
 import java.nio.file.Path;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -1206,6 +1208,55 @@ public class TestHiveIcebergStorageHandlerWithEngine {
         0);
   }
 
+  @TestTemplate
+  public void testWriteWithDatePartition() {
+    assumeThat(executionEngine).as("Tez write is not implemented yet").isEqualTo("mr");
+
+    Schema dateSchema =
+        new Schema(
+            optional(1, "id", Types.LongType.get()),
+            optional(2, "part_field", Types.DateType.get()));
+
+    PartitionSpec spec = PartitionSpec.builderFor(dateSchema).identity("part_field").build();
+    List<Record> records =
+        TestHelper.RecordsBuilder.newInstance(dateSchema)
+            .add(1L, LocalDate.of(2023, 1, 21))
+            .add(2L, LocalDate.of(2023, 1, 22))
+            .add(3L, LocalDate.of(2022, 1, 21))
+            .build();
+    testTables.createTable(shell, "part_test", dateSchema, spec, FileFormat.PARQUET, records);
+    List<Object[]> result = shell.executeStatement("SELECT * from part_test order by id");
+
+    assertThat(result).hasSameSizeAs(records);
+    assertThat(result.get(0)[1]).isEqualTo("2023-01-21");
+    assertThat(result.get(1)[1]).isEqualTo("2023-01-22");
+    assertThat(result.get(2)[1]).isEqualTo("2022-01-21");
+  }
+
+  @TestTemplate
+  public void testWriteWithTimestampPartition() throws IOException {
+    assumeThat(executionEngine).as("Tez write is not implemented yet").isEqualTo("mr");
+
+    Schema dateSchema =
+        new Schema(
+            optional(1, "id", Types.LongType.get()),
+            optional(2, "part_field", Types.TimestampType.withoutZone()));
+    PartitionSpec spec = PartitionSpec.builderFor(dateSchema).identity("part_field").build();
+    List<Record> records =
+        TestHelper.RecordsBuilder.newInstance(dateSchema)
+            .add(1L, LocalDateTime.of(2023, 1, 21, 21, 10, 10, 100000000))
+            .add(2L, LocalDateTime.of(2023, 1, 21, 22, 10, 10, 200000000))
+            .add(3L, LocalDateTime.of(2023, 1, 22, 21, 10, 10, 300000000))
+            .build();
+    testTables.createTable(shell, "part_test", dateSchema, spec, FileFormat.PARQUET, records);
+    List<Object[]> result = shell.executeStatement("SELECT * from part_test order by id");
+
+    assertThat(result).hasSameSizeAs(records);
+    assertThat(result.get(0)[1]).isEqualTo("2023-01-21 21:10:10.1");
+    assertThat(result.get(1)[1]).isEqualTo("2023-01-21 22:10:10.2");
+    assertThat(result.get(2)[1]).isEqualTo("2023-01-22 21:10:10.3");
+  }
+
   /**
    * Checks if the certain type is an unsupported vectorized types in Hive 3.1.2
    *

Reply via email to