ZijieSong946 commented on a change in pull request #12348:
URL: https://github.com/apache/beam/pull/12348#discussion_r465268349



##########
File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
##########
@@ -412,32 +398,141 @@ public void testNullDatetimeFields() {
             .addNullableField("year_with_null", FieldType.INT64)
             .addField("mm", FieldType.INT64)
             .addNullableField("month_with_null", FieldType.INT64)
+            .build();
+
+    PAssert.that(outputRow)
+        .containsInAnyOrder(
+            Row.withSchema(outputRowSchema).addValues(2019L, null, 06L, 
null).build());
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testNullSqlLogicalTypeDateFields() {
+    Schema dateTimeFieldSchema =
+        Schema.builder()
+            .addField("dateTypeField", FieldType.logicalType(SqlTypes.DATE))
+            .addNullableField("nullableDateTypeField", 
FieldType.logicalType(SqlTypes.DATE))
+            .build();
+
+    Row dateRow =
+        Row.withSchema(dateTimeFieldSchema).addValues(LocalDate.of(2019, 6, 
27), null).build();
+
+    PCollection<Row> outputRow =
+        pipeline
+            .apply(Create.of(dateRow))
+            .setRowSchema(dateTimeFieldSchema)
+            .apply(
+                SqlTransform.query(
+                    "select EXTRACT(DAY from dateTypeField) as dd, "
+                        + " EXTRACT(DAY from nullableDateTypeField) as 
day_with_null, "
+                        + " dateTypeField + interval '1' day as 
date_with_day_added, "
+                        + " nullableDateTypeField + interval '1' day as 
day_added_with_null "
+                        + " from PCOLLECTION"));
+
+    Schema outputRowSchema =
+        Schema.builder()
+            .addField("dd", FieldType.INT64)
+            .addNullableField("day_with_null", FieldType.INT64)
+            .addField("date_with_day_added", 
FieldType.logicalType(SqlTypes.DATE))
+            .addNullableField("day_added_with_null", 
FieldType.logicalType(SqlTypes.DATE))
+            .build();
+
+    PAssert.that(outputRow)
+        .containsInAnyOrder(
+            Row.withSchema(outputRowSchema)
+                .addValues(27L, null, LocalDate.of(2019, 6, 28), null)
+                .build());
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testNullSqlLogicalTypeTimeFields() {
+    Schema dateTimeFieldSchema =
+        Schema.builder()
+            .addField("timeTypeField", FieldType.logicalType(SqlTypes.TIME))
+            .addNullableField("nullableTimeTypeField", 
FieldType.logicalType(SqlTypes.TIME))
+            .build();
+
+    Row timeRow =
+        Row.withSchema(dateTimeFieldSchema).addValues(LocalTime.of(1, 0, 0), 
null).build();
+
+    PCollection<Row> outputRow =
+        pipeline
+            .apply(Create.of(timeRow))
+            .setRowSchema(dateTimeFieldSchema)
+            .apply(
+                SqlTransform.query(
+                    "select timeTypeField + interval '1' hour as 
time_with_hour_added, "
+                        + " nullableTimeTypeField + interval '1' hour as 
hour_added_with_null, "
+                        + " timeTypeField - INTERVAL '60' SECOND as 
time_with_seconds_added, "
+                        + " nullableTimeTypeField - INTERVAL '60' SECOND as 
seconds_added_with_null "
+                        + " from PCOLLECTION"));
+
+    Schema outputRowSchema =
+        Schema.builder()
             .addField("time_with_hour_added", 
FieldType.logicalType(SqlTypes.TIME))
             .addNullableField("hour_added_with_null", 
FieldType.logicalType(SqlTypes.TIME))
             .addField("time_with_seconds_added", 
FieldType.logicalType(SqlTypes.TIME))
             .addNullableField("seconds_added_with_null", 
FieldType.logicalType(SqlTypes.TIME))
+            .build();
+
+    PAssert.that(outputRow)
+        .containsInAnyOrder(
+            Row.withSchema(outputRowSchema)
+                .addValues(LocalTime.of(2, 0, 0), null, LocalTime.of(0, 59, 
0), null)
+                .build());
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testSqlLogicalTypeDatetimeFields() {
+    Schema dateTimeFieldSchema =
+        Schema.builder()
+            .addField("dateTimeField", 
FieldType.logicalType(SqlTypes.DATETIME))

Review comment:
       I planned to do that but Calcite do not have 
`NULLABLE_TIMESTAMP_WITH_LOCAL_TZ` type corresponding to 
`TIMESTAMP_WITH_LOCAL_TZ`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to