This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new ed0959257c MR: Fix using Date type as partition field (#10210)
ed0959257c is described below
commit ed0959257cba02f378f7097d81cecaaaef9fa43f
Author: lurnagao-dahua <[email protected]>
AuthorDate: Tue May 7 21:31:22 2024 +0800
MR: Fix using Date type as partition field (#10210)
---
.../iceberg/mr/hive/HiveIcebergRecordWriter.java | 5 ++-
.../TestHiveIcebergStorageHandlerWithEngine.java | 51 ++++++++++++++++++++++
2 files changed, 55 insertions(+), 1 deletion(-)
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
index 793b9c5e64..f87d79b553 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
@@ -50,6 +51,7 @@ class HiveIcebergRecordWriter extends PartitionedFanoutWriter<Record>
// The current key is reused at every write to avoid unnecessary object creation
private final PartitionKey currentKey;
private final FileIO io;
+ private final InternalRecordWrapper wrapper;
// <TaskAttemptId, <TABLE_NAME, HiveIcebergRecordWriter>> map to store the active writers
// Stored in concurrent map, since some executor engines can share containers
@@ -77,13 +79,14 @@ class HiveIcebergRecordWriter extends PartitionedFanoutWriter<Record>
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
this.io = io;
this.currentKey = new PartitionKey(spec, schema);
+ this.wrapper = new InternalRecordWrapper(schema.asStruct());
writers.putIfAbsent(taskAttemptID, Maps.newConcurrentMap());
writers.get(taskAttemptID).put(tableName, this);
}
@Override
protected PartitionKey partition(Record row) {
- currentKey.partition(row);
+ currentKey.partition(wrapper.wrap(row));
return currentKey;
}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index c8e91de9b8..b080f4bd49 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -25,6 +25,8 @@ import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.IOException;
import java.nio.file.Path;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -1206,6 +1208,55 @@ public class TestHiveIcebergStorageHandlerWithEngine {
0);
}
+ @TestTemplate
+ public void testWriteWithDatePartition() {
+ assumeThat(executionEngine).as("Tez write is not implemented yet").isEqualTo("mr");
+
+ Schema dateSchema =
+ new Schema(
+ optional(1, "id", Types.LongType.get()),
+ optional(2, "part_field", Types.DateType.get()));
+
+ PartitionSpec spec = PartitionSpec.builderFor(dateSchema).identity("part_field").build();
+ List<Record> records =
+ TestHelper.RecordsBuilder.newInstance(dateSchema)
+ .add(1L, LocalDate.of(2023, 1, 21))
+ .add(2L, LocalDate.of(2023, 1, 22))
+ .add(3L, LocalDate.of(2022, 1, 21))
+ .build();
+ testTables.createTable(shell, "part_test", dateSchema, spec, FileFormat.PARQUET, records);
+ List<Object[]> result = shell.executeStatement("SELECT * from part_test order by id");
+
+ assertThat(result).hasSameSizeAs(records);
+ assertThat(result.get(0)[1]).isEqualTo("2023-01-21");
+ assertThat(result.get(1)[1]).isEqualTo("2023-01-22");
+ assertThat(result.get(2)[1]).isEqualTo("2022-01-21");
+ }
+
+ @TestTemplate
+ public void testWriteWithTimestampPartition() throws IOException {
+ assumeThat(executionEngine).as("Tez write is not implemented yet").isEqualTo("mr");
+
+ Schema dateSchema =
+ new Schema(
+ optional(1, "id", Types.LongType.get()),
+ optional(2, "part_field", Types.TimestampType.withoutZone()));
+ PartitionSpec spec = PartitionSpec.builderFor(dateSchema).identity("part_field").build();
+ List<Record> records =
+ TestHelper.RecordsBuilder.newInstance(dateSchema)
+ .add(1L, LocalDateTime.of(2023, 1, 21, 21, 10, 10, 100000000))
+ .add(2L, LocalDateTime.of(2023, 1, 21, 22, 10, 10, 200000000))
+ .add(3L, LocalDateTime.of(2023, 1, 22, 21, 10, 10, 300000000))
+ .build();
+ testTables.createTable(shell, "part_test", dateSchema, spec, FileFormat.PARQUET, records);
+ List<Object[]> result = shell.executeStatement("SELECT * from part_test order by id");
+
+ assertThat(result).hasSameSizeAs(records);
+ assertThat(result.get(0)[1]).isEqualTo("2023-01-21 21:10:10.1");
+ assertThat(result.get(1)[1]).isEqualTo("2023-01-21 22:10:10.2");
+ assertThat(result.get(2)[1]).isEqualTo("2023-01-22 21:10:10.3");
+ }
+
/**
* Checks if the certain type is an unsupported vectorized types in Hive 3.1.2
*