kingeasternsun commented on a change in pull request #3987:
URL: https://github.com/apache/iceberg/pull/3987#discussion_r810451687
##########
File path: flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
##########
@@ -72,4 +95,86 @@ protected void writeAndValidate(Schema schema) throws IOException {
     writeAndValidate(RandomGenericData.generateDictionaryEncodableRecords(schema, NUM_RECORDS, 21124), schema);
     writeAndValidate(RandomGenericData.generateFallbackRecords(schema, NUM_RECORDS, 21124, NUM_RECORDS / 20), schema);
   }
+
+  protected List<RowData> rowDatasFromFile(InputFile inputFile, Schema schema) throws IOException {
+    try (CloseableIterable<RowData> reader =
+        Parquet.read(inputFile)
+            .project(schema)
+            .createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, type))
+            .build()) {
+      return Lists.newArrayList(reader);
+    }
+  }
+
+  @Test
+  public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
+    String outputFilePath = String.format("%s/%s", temp.getRoot().getAbsolutePath(), "parquet_int96.parquet");
+    HadoopOutputFile outputFile =
+        HadoopOutputFile.fromPath(
+            new org.apache.hadoop.fs.Path(outputFilePath), new Configuration());
+    Schema schema = new Schema(required(1, "ts", Types.TimestampType.withZone()));
+    StructType sparkSchema =
+        new StructType(
+            new StructField[] {
+                new StructField("ts", DataTypes.TimestampType, true, Metadata.empty())
+            });
+
+    final Random random = new Random(0L);
+    List<InternalRow> rows = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      rows.add(new GenericInternalRow(new Object[] {
+          RandomUtil.generatePrimitive(schema.asStruct().fieldType("ts").asPrimitiveType(), random)}));
+    }
+
+    try (FileAppender<InternalRow> writer =
+        new ParquetWriteAdapter<>(
+            new NativeSparkWriterBuilder(outputFile)
+                .set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json())
+                .set("spark.sql.parquet.writeLegacyFormat", "false")
+                .set("spark.sql.parquet.outputTimestampType", "INT96")
+                .build(),
+            MetricsConfig.getDefault())) {
+      writer.addAll(rows);
+    }
+
+    InputFile parquetInputFile = Files.localInput(outputFilePath);
+    List<RowData> readDataRows = rowDatasFromFile(parquetInputFile, schema);
+    Assert.assertEquals(rows.size(), readDataRows.size());
+    for (int i = 0; i < rows.size(); i += 1) {
+      Assert.assertEquals(rows.get(i).getLong(0), readDataRows.get(i).getLong(0));
+    }
+  }
+
+  /**
+   * Native Spark ParquetWriter.Builder implementation so that we can write timestamps using Spark's native
+   * ParquetWriteSupport.
+   * thanks for the PR https://github.com/apache/iceberg/pull/1184 by @gustavoatt
Review comment:
Ok, I'll fix it
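
For context, the diff excerpt above is cut off before the body of NativeSparkWriterBuilder. A minimal sketch of how such a builder could look, assuming only the standard org.apache.parquet.hadoop.ParquetWriter.Builder contract and Spark's ParquetWriteSupport; the HashMap-based config field and its name are illustrative, not taken from the PR:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.api.WriteSupport;
    import org.apache.parquet.hadoop.util.HadoopOutputFile;
    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;

    // Sketch: delegates row serialization to Spark's native ParquetWriteSupport so the
    // test can produce a file whose timestamps are written the way Spark writes them.
    class NativeSparkWriterBuilder
        extends ParquetWriter.Builder<InternalRow, NativeSparkWriterBuilder> {
      private final Map<String, String> config = new HashMap<>();

      NativeSparkWriterBuilder(HadoopOutputFile path) {
        super(path);
      }

      // Mirrors the .set(...) calls in the test above; each property is copied into the
      // Hadoop Configuration handed to ParquetWriteSupport.
      public NativeSparkWriterBuilder set(String property, String value) {
        this.config.put(property, value);
        return self();
      }

      @Override
      protected NativeSparkWriterBuilder self() {
        return this;
      }

      @Override
      protected WriteSupport<InternalRow> getWriteSupport(Configuration configuration) {
        for (Map.Entry<String, String> entry : config.entrySet()) {
          configuration.set(entry.getKey(), entry.getValue());
        }
        return new ParquetWriteSupport();
      }
    }

With a builder along these lines, the try-with-resources block in the test writes the rows through Spark's own write path (with spark.sql.parquet.outputTimestampType set to INT96), and the Flink reader is then asserted against that file.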
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]