xuefuz commented on a change in pull request #10401: [FLINK-15020][hive] 
Support timestamp type in hive
URL: https://github.com/apache/flink/pull/10401#discussion_r353372429
 
 

 ##########
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java
 ##########
 @@ -82,12 +86,34 @@ public static Object convertToHiveDate(HiveShim hiveShim, 
String s) throws Flink
                }
        }
 
-       public static Object convertToHiveTimestamp(HiveShim hiveShim, String 
s) throws FlinkHiveUDFException {
+       // converts a Flink timestamp instance to what's expected by Hive
+       public static Object toHiveTimestamp(HiveShim hiveShim, Object 
flinkTimestamp) {
+               Preconditions.checkArgument(flinkTimestamp instanceof Timestamp 
|| flinkTimestamp instanceof LocalDateTime,
+                               String.format("Only support converting %s or %s 
to Hive timestamp, but got %s",
+                                               Timestamp.class.getName(), 
LocalDateTime.class.getName(), flinkTimestamp.getClass().getName()));
+               Class hiveTimestampClz = hiveShim.getTimestampDataTypeClass();
+               if (hiveTimestampClz.equals(Timestamp.class)) {
+                       return flinkTimestamp instanceof Timestamp ? 
flinkTimestamp : Timestamp.valueOf((LocalDateTime) flinkTimestamp);
+               } else {
+                       try {
+                               return invokeMethod(hiveTimestampClz, null, 
"valueOf", new Class[]{String.class}, new Object[]{flinkTimestamp.toString()});
+                       } catch (NoSuchMethodException | 
InvocationTargetException | IllegalAccessException e) {
+                               throw new FlinkHiveException("Failed to convert 
to Hive timestamp", e);
+                       }
+               }
+       }
+
+       // converts a hive timestamp instance to java.sql.Timestamp which is 
expected by DataFormatConverter
+       public static Timestamp toFlinkTimestamp(HiveShim hiveShim, Object 
hiveTimestamp) {
+               if (hiveTimestamp instanceof Timestamp) {
+                       return (Timestamp) hiveTimestamp;
+               }
                try {
-                       Method method = 
hiveShim.getTimestampDataTypeClass().getMethod("valueOf", String.class);
-                       return method.invoke(null, s);
-               } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException e) {
-                       throw new FlinkHiveUDFException("Failed to invoke 
Hive's Timestamp.valueOf()", e);
+                       String hiveTSStr = (String) 
invokeMethod(hiveShim.getTimestampDataTypeClass(), hiveTimestamp,
+                                       "toString", null, null);
+                       return Timestamp.valueOf(hiveTSStr);
 
 Review comment:
   The conversion to/from string might be too slow. We can probably get the 
milliseconds and nanoseconds from java.sql.Timestamp and set them on Hive's 
timestamp.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to