apilloud commented on a change in pull request #13930:
URL: https://github.com/apache/beam/pull/13930#discussion_r608963665
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
##########
@@ -468,66 +440,174 @@ private static Expression value(
final Expression expression = list.append(list.newName("current"),
input);
- FieldType fromType = schema.getField(index).getType();
- Class convertTo = null;
- if (storageType == Object.class) {
- convertTo = Object.class;
- } else if (fromType.getTypeName().isLogicalType()) {
- convertTo =
LOGICAL_TYPE_TO_BASE_TYPE_MAP.get(fromType.getLogicalType().getIdentifier());
- } else {
- convertTo = TYPE_CONVERSION_MAP.get(fromType.getTypeName());
- }
- if (convertTo == null) {
- throw new UnsupportedOperationException("Unable to get " +
fromType.getTypeName());
+ FieldType fieldType = schema.getField(index).getType();
+ Expression value;
+ switch (fieldType.getTypeName()) {
+ case BYTE:
+ value = Expressions.call(expression, "getByte",
Expressions.constant(index));
+ break;
+ case INT16:
+ value = Expressions.call(expression, "getInt16",
Expressions.constant(index));
+ break;
+ case INT32:
+ value = Expressions.call(expression, "getInt32",
Expressions.constant(index));
+ break;
+ case INT64:
+ value = Expressions.call(expression, "getInt64",
Expressions.constant(index));
+ break;
+ case DECIMAL:
+ value = Expressions.call(expression, "getDecimal",
Expressions.constant(index));
+ break;
+ case FLOAT:
+ value = Expressions.call(expression, "getFloat",
Expressions.constant(index));
+ break;
+ case DOUBLE:
+ value = Expressions.call(expression, "getDouble",
Expressions.constant(index));
+ break;
+ case STRING:
+ value = Expressions.call(expression, "getString",
Expressions.constant(index));
+ break;
+ case DATETIME:
+ value = Expressions.call(expression, "getDateTime",
Expressions.constant(index));
+ break;
+ case BOOLEAN:
+ value = Expressions.call(expression, "getBoolean",
Expressions.constant(index));
+ break;
+ case BYTES:
+ value = Expressions.call(expression, "getBytes",
Expressions.constant(index));
+ break;
+ case ARRAY:
+ value = Expressions.call(expression, "getArray",
Expressions.constant(index));
+ if (storageType == Object.class
+ &&
TypeName.ROW.equals(fieldType.getCollectionElementType().getTypeName())) {
+ // Workaround for missing row output support
Review comment:
This is the one special case that remains, it requires rewriting the
output code to support rows. That is the next PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]