deadb0d4 commented on code in PR #32695:
URL: https://github.com/apache/beam/pull/32695#discussion_r1793904304


##########
sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java:
##########
@@ -74,14 +77,23 @@ public PCollection<Row> expand(PBegin input) {
   private static class HCatToRowFn extends DoFn<HCatRecord, Row> {
     private final Schema schema;
 
+    private Object castHDate(Object obj) {
+      if (obj instanceof org.apache.hadoop.hive.common.type.Date) {
+        return new Instant(((org.apache.hadoop.hive.common.type.Date) obj).toEpochMilli());
+      }
+      return obj;
+    }
+
     HCatToRowFn(Schema schema) {
       this.schema = schema;
     }
 
     @ProcessElement
     public void processElement(ProcessContext c) {
       HCatRecord hCatRecord = c.element();
-      c.output(Row.withSchema(schema).addValues(hCatRecord.getAll()).build());
+      List<Object> recordValues =
+          hCatRecord.getAll().stream().map(this::castHDate).collect(Collectors.toList());

Review Comment:
   Hey, thanks for the suggestion! I moved all of this logic into a new, 
separate util, `castTypes`. Please let me know if you only meant the rename 
`s/castHDate/castTypes`, though!
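
   For reference, here is a rough, self-contained sketch of the shape such a util could take. It is not the code in the PR: the class name `CastTypesSketch` and the helper `castValue` are hypothetical, and `java.time.LocalDate` / `java.time.Instant` stand in for `org.apache.hadoop.hive.common.type.Date` and the Joda `Instant` used in the diff, so that the snippet compiles without Hive or Joda on the classpath.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical utility holder; the real PR may place castTypes elsewhere.
final class CastTypesSketch {
  private CastTypesSketch() {}

  // Converts a single value. LocalDate is only a stand-in for the Hive Date
  // type (assumption); every other value, including null, passes through.
  static Object castValue(Object obj) {
    if (obj instanceof LocalDate) {
      return ((LocalDate) obj).atStartOfDay(ZoneOffset.UTC).toInstant();
    }
    return obj;
  }

  // Applies castValue to every field value of a record, preserving order.
  static List<Object> castTypes(List<Object> values) {
    return values.stream().map(CastTypesSketch::castValue).collect(Collectors.toList());
  }
}
```

   With something along these lines, `processElement` would only need to pass `hCatRecord.getAll()` through `castTypes` before building the `Row`, and further type conversions could be added in one place.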



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
