gustavoatt commented on a change in pull request #1184:
URL: https://github.com/apache/iceberg/pull/1184#discussion_r460131668
##########
File path: spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
##########
@@ -67,4 +76,41 @@ protected void writeAndValidate(Schema schema) throws IOException {
     Assert.assertFalse("Should not have extra rows", rows.hasNext());
   }
 }
+
+  protected List<InternalRow> rowsFromFile(InputFile inputFile, Schema schema) throws IOException {
+    try (CloseableIterable<InternalRow> reader =
+        Parquet.read(inputFile)
+            .project(schema)
+            .createReaderFunc(type -> SparkParquetReaders.buildReader(schema, type))
+            .build()) {
+      return Lists.newArrayList(reader);
+    }
+  }
+
+  @Test
+  public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
+    final Schema schema = new Schema(required(1, "ts", Types.TimestampType.asSparkInt96()));
+    final StructType sparkSchema = SparkSchemaUtil.convert(schema);
+    final Path parquetFile = Paths.get(temp.getRoot().getAbsolutePath(), "parquet_int96.parquet");
Review comment:
I initially tried it that way, but the writer fails because the file already exists.
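In case it helps, here is a minimal sketch of the failure mode and a common workaround, assuming the suggestion was to create the file through the class's `temp` rule (JUnit's `TemporaryFolder`); the file name is only illustrative:

```java
// TemporaryFolder.newFile() creates the file on disk, so a Parquet writer that
// does not overwrite existing files fails on it. Deleting the empty placeholder
// before writing is a common workaround in tests like this.
File testFile = temp.newFile("parquet_int96.parquet");
Assert.assertTrue("Should delete the placeholder file", testFile.delete());
```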
##########
File path: api/src/main/java/org/apache/iceberg/types/Types.java
##########
@@ -219,16 +220,30 @@ public static TimestampType withoutZone() {
     return INSTANCE_WITHOUT_ZONE;
   }
+  /**
+   * @return Timestamp type (with timezone) represented as INT96. This is only added for compatibility reasons
+   *     and can only be written using Spark's ParquetWriteSupport. Writing this type should be avoided.
+   */
Review comment:
Agreed. I found a way to get the tests running without adding a new type: I had to create an implementation of `ParquetWriter.Builder` that uses Spark's `ParquetWriteSupport` and Iceberg's `ParquetWriteAdapter` to avoid creating a `SparkSession`.
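For reference, a rough sketch of what that builder can look like is below. This is not the exact code from the PR; the class name `NativeSparkWriterBuilder` and the config-map plumbing are illustrative, and the `ParquetWriteAdapter` wrapping is omitted:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;

// Sketch: a parquet-mr writer builder backed by Spark's ParquetWriteSupport,
// so the test can write Spark-native INT96 timestamps without a SparkSession.
class NativeSparkWriterBuilder
    extends ParquetWriter.Builder<InternalRow, NativeSparkWriterBuilder> {
  private final Map<String, String> config = new HashMap<>();

  NativeSparkWriterBuilder(org.apache.hadoop.fs.Path path) {
    super(path);
  }

  // collect Spark SQL properties (schema JSON, timestamp type, ...) for the write support
  NativeSparkWriterBuilder set(String property, String value) {
    config.put(property, value);
    return self();
  }

  @Override
  protected NativeSparkWriterBuilder self() {
    return this;
  }

  @Override
  protected WriteSupport<InternalRow> getWriteSupport(Configuration configuration) {
    // Spark's ParquetWriteSupport reads its schema and options from the Hadoop configuration
    config.forEach(configuration::set);
    return new ParquetWriteSupport();
  }
}
```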
##########
File path: spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
##########
@@ -67,4 +76,41 @@ protected void writeAndValidate(Schema schema) throws IOException {
     Assert.assertFalse("Should not have extra rows", rows.hasNext());
   }
 }
+
+  protected List<InternalRow> rowsFromFile(InputFile inputFile, Schema schema) throws IOException {
+    try (CloseableIterable<InternalRow> reader =
+        Parquet.read(inputFile)
+            .project(schema)
+            .createReaderFunc(type -> SparkParquetReaders.buildReader(schema, type))
+            .build()) {
+      return Lists.newArrayList(reader);
+    }
+  }
+
+  @Test
+  public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
+    final Schema schema = new Schema(required(1, "ts", Types.TimestampType.asSparkInt96()));
+    final StructType sparkSchema = SparkSchemaUtil.convert(schema);
+    final Path parquetFile = Paths.get(temp.getRoot().getAbsolutePath(), "parquet_int96.parquet");
+    final List<InternalRow> rows = Lists.newArrayList(RandomData.generateSpark(schema, 10, 0L));
Review comment:
Done. Removed these final modifiers.
##########
File path: spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
##########
@@ -67,4 +76,41 @@ protected void writeAndValidate(Schema schema) throws IOException {
     Assert.assertFalse("Should not have extra rows", rows.hasNext());
   }
 }
+
+  protected List<InternalRow> rowsFromFile(InputFile inputFile, Schema schema) throws IOException {
+    try (CloseableIterable<InternalRow> reader =
+        Parquet.read(inputFile)
+            .project(schema)
+            .createReaderFunc(type -> SparkParquetReaders.buildReader(schema, type))
+            .build()) {
+      return Lists.newArrayList(reader);
+    }
+  }
+
+  @Test
+  public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
+    final Schema schema = new Schema(required(1, "ts", Types.TimestampType.asSparkInt96()));
+    final StructType sparkSchema = SparkSchemaUtil.convert(schema);
+    final Path parquetFile = Paths.get(temp.getRoot().getAbsolutePath(), "parquet_int96.parquet");
+    final List<InternalRow> rows = Lists.newArrayList(RandomData.generateSpark(schema, 10, 0L));
+
+    try (FileAppender<InternalRow> writer =
+        Parquet.write(Files.localOutput(parquetFile.toString()))
+            .writeSupport(new org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport())
+            .set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json())
+            .set("org.apache.spark.legacyDateTime", "false")
+            .set("spark.sql.parquet.int96AsTimestamp", "true")
+            .set("spark.sql.parquet.writeLegacyFormat", "false")
+            .set("spark.sql.parquet.outputTimestampType", "INT96")
+            .schema(schema)
Review comment:
I'm not sure I fully understand this comment, but I did change my approach here: while still writing `InternalRow`, I removed most of these properties and kept only the relevant ones to make sure that Spark writes these timestamps as INT96.
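For context, a rough sketch of what the trimmed-down write can look like, assuming the `NativeSparkWriterBuilder` sketched earlier in this thread and treating this set of properties as the minimal one needed for INT96 output (property names taken from the diff above, not verified as exhaustive):

```java
// Write the generated rows through Spark's ParquetWriteSupport, keeping only the
// settings that control how timestamps are encoded (INT96 instead of INT64).
try (ParquetWriter<InternalRow> writer =
    new NativeSparkWriterBuilder(new org.apache.hadoop.fs.Path(parquetFile.toUri()))
        .set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json())
        .set("spark.sql.parquet.writeLegacyFormat", "false")
        .set("spark.sql.parquet.outputTimestampType", "INT96")
        .build()) {
  for (InternalRow row : rows) {
    writer.write(row);
  }
}
```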
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]