apilloud commented on a change in pull request #13930:
URL: https://github.com/apache/beam/pull/13930#discussion_r610044553



##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
##########
@@ -468,66 +440,174 @@ private static Expression value(
 
       final Expression expression = list.append(list.newName("current"), 
input);
 
-      FieldType fromType = schema.getField(index).getType();
-      Class convertTo = null;
-      if (storageType == Object.class) {
-        convertTo = Object.class;
-      } else if (fromType.getTypeName().isLogicalType()) {
-        convertTo = 
LOGICAL_TYPE_TO_BASE_TYPE_MAP.get(fromType.getLogicalType().getIdentifier());
-      } else {
-        convertTo = TYPE_CONVERSION_MAP.get(fromType.getTypeName());
-      }
-      if (convertTo == null) {
-        throw new UnsupportedOperationException("Unable to get " + 
fromType.getTypeName());
+      FieldType fieldType = schema.getField(index).getType();
+      Expression value;
+      switch (fieldType.getTypeName()) {
+        case BYTE:
+          value = Expressions.call(expression, "getByte", 
Expressions.constant(index));
+          break;
+        case INT16:
+          value = Expressions.call(expression, "getInt16", 
Expressions.constant(index));
+          break;
+        case INT32:
+          value = Expressions.call(expression, "getInt32", 
Expressions.constant(index));
+          break;
+        case INT64:
+          value = Expressions.call(expression, "getInt64", 
Expressions.constant(index));
+          break;
+        case DECIMAL:
+          value = Expressions.call(expression, "getDecimal", 
Expressions.constant(index));
+          break;
+        case FLOAT:
+          value = Expressions.call(expression, "getFloat", 
Expressions.constant(index));
+          break;
+        case DOUBLE:
+          value = Expressions.call(expression, "getDouble", 
Expressions.constant(index));
+          break;
+        case STRING:
+          value = Expressions.call(expression, "getString", 
Expressions.constant(index));
+          break;
+        case DATETIME:
+          value = Expressions.call(expression, "getDateTime", 
Expressions.constant(index));
+          break;
+        case BOOLEAN:
+          value = Expressions.call(expression, "getBoolean", 
Expressions.constant(index));
+          break;
+        case BYTES:
+          value = Expressions.call(expression, "getBytes", 
Expressions.constant(index));
+          break;
+        case ARRAY:
+          value = Expressions.call(expression, "getArray", 
Expressions.constant(index));
+          if (storageType == Object.class
+              && 
TypeName.ROW.equals(fieldType.getCollectionElementType().getTypeName())) {
+            // Workaround for missing row output support
+            return Expressions.convert_(value, Object.class);
+          }
+          break;
+        case MAP:
+          value = Expressions.call(expression, "getMap", 
Expressions.constant(index));
+          break;
+        case ROW:
+          value = Expressions.call(expression, "getRow", 
Expressions.constant(index));
+          break;
+        case LOGICAL_TYPE:
+          String identifier = fieldType.getLogicalType().getIdentifier();
+          if (CharType.IDENTIFIER.equals(identifier)) {
+            value = Expressions.call(expression, "getString", 
Expressions.constant(index));
+          } else if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) {
+            value = Expressions.call(expression, "getDateTime", 
Expressions.constant(index));
+          } else if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
+            value =
+                Expressions.convert_(
+                    Expressions.call(
+                        expression,
+                        "getLogicalTypeValue",
+                        Expressions.constant(index),
+                        Expressions.constant(LocalDate.class)),
+                    LocalDate.class);
+          } else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
+            value =
+                Expressions.convert_(
+                    Expressions.call(
+                        expression,
+                        "getLogicalTypeValue",
+                        Expressions.constant(index),
+                        Expressions.constant(LocalTime.class)),
+                    LocalTime.class);
+          } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
+            value =
+                Expressions.convert_(
+                    Expressions.call(
+                        expression,
+                        "getLogicalTypeValue",
+                        Expressions.constant(index),
+                        Expressions.constant(LocalDateTime.class)),
+                    LocalDateTime.class);
+          } else {
+            throw new UnsupportedOperationException("Unable to get logical 
type " + identifier);
+          }
+          break;
+        default:
+          throw new UnsupportedOperationException("Unable to get " + 
fieldType.getTypeName());
       }
 
-      Expression value =
-          Expressions.convert_(
-              Expressions.call(
-                  expression,
-                  "getBaseValue",
-                  Expressions.constant(index),
-                  Expressions.constant(convertTo)),
-              convertTo);
-      return (storageType != Object.class) ? value(value, fromType) : value;
+      return value(value, fieldType);
     }
 
-    private static Expression value(Expression value, Schema.FieldType type) {
-      if (type.getTypeName().isLogicalType()) {
-        String logicalId = type.getLogicalType().getIdentifier();
-        if (SqlTypes.TIME.getIdentifier().equals(logicalId)) {
+    private static Expression value(Expression value, FieldType fieldType) {

Review comment:
       Yep, renamed to getBeamField




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to