youngxinler opened a new issue, #6510:
URL: https://github.com/apache/iceberg/issues/6510
### Apache Iceberg version
1.1.0 (latest release)
### Query engine
Other
### Please describe the bug 🐞
When writing data into a partitioned table with the Iceberg Java API, an
error occurs: the value of a timestamp partition field is not handled with
the correct type.
```
Configuration configuration = new Configuration();
// this is a local file catalog
HadoopCatalog hadoopCatalog = new HadoopCatalog(configuration, icebergWareHousePath);
TableIdentifier name = TableIdentifier.of("logging", "logs");
Schema schema = new Schema(
    Types.NestedField.required(1, "level", Types.StringType.get()),
    Types.NestedField.required(2, "event_time", Types.TimestampType.withZone()),
    Types.NestedField.required(3, "message", Types.StringType.get()),
    Types.NestedField.optional(4, "call_stack",
        Types.ListType.ofRequired(5, Types.StringType.get()))
);
PartitionSpec spec = PartitionSpec.builderFor(schema)
    .hour("event_time")
    .identity("level")
    .build();
Table table = hadoopCatalog.createTable(name, schema, spec);
GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema());
int partitionId = 1, taskId = 1;
OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
    .format(FileFormat.PARQUET)
    .build();
final PartitionKey partitionKey = new PartitionKey(table.spec(), table.spec().schema());
// PartitionedFanoutWriter partitions each record and creates the per-partition writers
PartitionedFanoutWriter<Record> partitionedFanoutWriter = new PartitionedFanoutWriter<Record>(
    table.spec(), FileFormat.PARQUET, appenderFactory, outputFileFactory,
    table.io(), TARGET_FILE_SIZE_IN_BYTES) {
  @Override
  protected PartitionKey partition(Record record) {
    partitionKey.partition(record);
    return partitionKey;
  }
};
Random random = new Random();
List<String> levels = Arrays.asList("info", "debug", "error", "warn");
GenericRecord genericRecord = GenericRecord.create(table.schema());
// write 1000 sample records
for (int i = 0; i < 1000; i++) {
  GenericRecord record = genericRecord.copy();
  record.setField("level", levels.get(random.nextInt(levels.size())));
  // record.setField("event_time", System.currentTimeMillis());
  record.setField("event_time", OffsetDateTime.now());
  record.setField("message", "Iceberg is a great table format");
  record.setField("call_stack", Arrays.asList("NullPointerException"));
  partitionedFanoutWriter.write(record);
}
AppendFiles appendFiles = table.newAppend();
// add the data files to the append
Arrays.stream(partitionedFanoutWriter.dataFiles()).forEach(appendFiles::appendFile);
// commit the snapshot
Snapshot newSnapshot = appendFiles.apply();
appendFiles.commit();
```
When I set event_time to a Long (the commented-out line above), the write
fails with:
```
java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.OffsetDateTime
at org.apache.iceberg.data.parquet.BaseParquetWriter$TimestamptzWriter.write(BaseParquetWriter.java:281)
at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:589)
at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:138)
at org.apache.iceberg.io.DataWriter.write(DataWriter.java:71)
at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.write(BaseTaskWriter.java:362)
at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.write(BaseTaskWriter.java:345)
at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.write(BaseTaskWriter.java:277)
at org.apache.iceberg.io.PartitionedFanoutWriter.write(PartitionedFanoutWriter.java:63)
```
When I set event_time to a java.time.OffsetDateTime instead, the write also
fails, this time while computing the partition key:
```
java.lang.IllegalStateException: Not an instance of java.lang.Long: 2023-01-02T11:20:20.746+08:00
at org.apache.iceberg.data.GenericRecord.get(GenericRecord.java:123)
at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:71)
at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:58)
at org.apache.iceberg.PartitionKey.partition(PartitionKey.java:106)
```
The problem does not occur if the table is unpartitioned. I looked at the
internal code of PartitionKey: it appears to read the partition source field
as a Long before applying the transform, while the Parquet writer expects an
OffsetDateTime for the same field. Is that mismatch the cause?
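
To make the suspected mismatch concrete, here is a minimal sketch using only the JDK. It assumes, per the Iceberg table spec, that a timestamptz value is represented internally as microseconds from the Unix epoch and that the hour transform yields whole hours from the epoch. The class and method names are hypothetical and are not Iceberg APIs; this only illustrates the conversion that seems to be missing between the OffsetDateTime held by the record and the Long that the partition code asks for.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

// Hypothetical helper, not part of Iceberg: shows the two representations
// of the same event_time value that the two stack traces disagree about.
class TimestampMicrosSketch {
    private static final OffsetDateTime EPOCH = Instant.EPOCH.atOffset(ZoneOffset.UTC);
    private static final long MICROS_PER_HOUR = 3_600_000_000L;

    // OffsetDateTime (what the generic Parquet writer expects)
    // -> microseconds from epoch (assumed internal form used for partitioning).
    static long microsFromTimestamptz(OffsetDateTime value) {
        return ChronoUnit.MICROS.between(EPOCH, value);
    }

    // Whole hours from epoch, which is what an hour partition transform would produce.
    // floorDiv keeps timestamps before 1970 in the correct (earlier) hour.
    static long hoursFromMicros(long micros) {
        return Math.floorDiv(micros, MICROS_PER_HOUR);
    }

    public static void main(String[] args) {
        // The value from the IllegalStateException message above.
        OffsetDateTime eventTime = OffsetDateTime.parse("2023-01-02T11:20:20.746+08:00");
        long micros = microsFromTimestamptz(eventTime);
        System.out.println("micros from epoch: " + micros);
        System.out.println("hours from epoch:  " + hoursFromMicros(micros));
    }
}
```

Converting by hand like this would satisfy PartitionKey but not the Parquet writer, which is why neither Long nor OffsetDateTime works on its own in the code above. Iceberg's data module has an `InternalRecordWrapper` class that looks intended for this kind of conversion, so wrapping the record before calling `partitionKey.partition(...)` may be what is missing; I have not verified that here.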