wombatu-kun commented on code in PR #16619:
URL: https://github.com/apache/iceberg/pull/16619#discussion_r3335275005


##########
parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java:
##########
@@ -378,4 +387,109 @@ private Pair<File, Long> generateFile(
             records.toArray(new GenericData.Record[] {}));
     return Pair.of(file, size);
   }
+
+  @Test
+  public void timestampNanoFilterRespectsNanoseconds() throws IOException {
+    // Predicate pushdown on timestamp_ns must filter at full nanosecond 
resolution. The five rows
+    // differ only by sub-microsecond nanoseconds, so a micros-truncating push 
down could not
+    // separate id 2 (250 ns) from id 3 (750 ns) and would return the wrong 
rows.
+    Schema schema =
+        new Schema(
+            required(1, "id", Types.LongType.get()),
+            required(2, "ts", Types.TimestampNanoType.withoutZone()));
+
+    Record template = org.apache.iceberg.data.GenericRecord.create(schema);
+    List<Record> records =
+        Lists.newArrayList(
+            template.copy(
+                ImmutableMap.of(
+                    "id", 1L, "ts", 
LocalDateTime.parse("2024-01-01T00:00:00.000000000"))),
+            template.copy(
+                ImmutableMap.of(
+                    "id", 2L, "ts", 
LocalDateTime.parse("2024-01-01T00:00:00.000000250"))),
+            template.copy(
+                ImmutableMap.of(
+                    "id", 3L, "ts", 
LocalDateTime.parse("2024-01-01T00:00:00.000000750"))),
+            template.copy(
+                ImmutableMap.of(
+                    "id", 4L, "ts", 
LocalDateTime.parse("2024-01-01T00:00:00.000001500"))),
+            template.copy(
+                ImmutableMap.of(
+                    "id", 5L, "ts", 
LocalDateTime.parse("2024-01-01T00:00:00.000003000"))));
+
+    File file = writeNanoRecords(schema, records, "ts-nano");
+
+    // Boundary at 500 ns: only ids 3 (750 ns), 4 (1500 ns), 5 (3000 ns) 
qualify.
+    List<Long> ids =
+        filterIds(schema, file, greaterThanOrEqual("ts", 
"2024-01-01T00:00:00.000000500"));
+    assertThat(ids).containsExactlyInAnyOrder(3L, 4L, 5L);
+  }
+
+  @Test
+  public void timestamptzNanoFilterAcrossTimezones() throws IOException {
+    // Each row is written in a different zone offset; instants are 
0/500/750/1500/3000 ns past the
+    // same UTC second. id2 lands exactly on the filter boundary but in 
+05:00, so a strict
+    // greaterThan must exclude it by instant, not by wall-clock (its 05:00 vs 
the boundary's
+    // 04:00).
+    Schema schema =
+        new Schema(
+            required(1, "id", Types.LongType.get()),
+            required(2, "ts", Types.TimestampNanoType.withZone()));
+
+    Record template = org.apache.iceberg.data.GenericRecord.create(schema);
+    List<Record> records =
+        Lists.newArrayList(
+            template.copy(
+                ImmutableMap.of(
+                    "id", 1L, "ts", 
OffsetDateTime.parse("2024-01-01T00:00:00.000000000+00:00"))),
+            template.copy(
+                ImmutableMap.of(
+                    "id", 2L, "ts", 
OffsetDateTime.parse("2024-01-01T05:00:00.000000500+05:00"))),
+            template.copy(
+                ImmutableMap.of(
+                    "id", 3L, "ts", 
OffsetDateTime.parse("2023-12-31T16:00:00.000000750-08:00"))),
+            template.copy(
+                ImmutableMap.of(
+                    "id", 4L, "ts", 
OffsetDateTime.parse("2024-01-01T05:30:00.000001500+05:30"))),
+            template.copy(
+                ImmutableMap.of(
+                    "id", 5L, "ts", 
OffsetDateTime.parse("2024-01-01T00:00:00.000003000+00:00"))));
+
+    File file = writeNanoRecords(schema, records, "ts-tz-nano");
+
+    // Boundary == 2024-01-01T00:00:00.000000500Z, expressed in +04:00. id2 is 
that same instant
+    // (written in +05:00); a strict greaterThan excludes it, leaving ids 
3/4/5.
+    List<Long> ids =
+        filterIds(schema, file, greaterThan("ts", 
"2024-01-01T04:00:00.000000500+04:00"));
+    assertThat(ids).containsExactlyInAnyOrder(3L, 4L, 5L);
+  }
+
+  private File writeNanoRecords(Schema schema, List<Record> records, String 
prefix)
+      throws IOException {
+    File file = temp.resolve(prefix + ".parquet").toFile();
+    assertThat(file.delete() || !file.exists()).isTrue();
+    try (FileAppender<Record> appender =
+        Parquet.write(Files.localOutput(file))
+            .schema(schema)
+            .createWriterFunc(GenericParquetWriter::create)
+            .build()) {
+      for (Record record : records) {
+        appender.add(record);
+      }
+    }
+    return file;
+  }
+
+  private List<Long> filterIds(Schema schema, File file, Expression filter) 
throws IOException {
+    List<Long> ids = Lists.newArrayList();
+    // callInit() drives the parquet-mr ReadSupport path, the only read path 
that runs
+    // ParquetFilters.
+    try (CloseableIterable<GenericData.Record> reader =
+        
Parquet.read(localInput(file)).project(schema).callInit().filter(filter).build())
 {
+      for (GenericData.Record record : reader) {
+        ids.add((Long) record.get("id"));
+      }
+    }

Review Comment:
   Done 095ffc34c



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to