twalthr commented on a change in pull request #18858:
URL: https://github.com/apache/flink/pull/18858#discussion_r812637334



##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
##########
@@ -106,78 +118,108 @@ public RexNodeJsonDeserializer() {
     @Override
     public RexNode deserialize(JsonParser jsonParser, DeserializationContext 
ctx)
             throws IOException {
-        JsonNode jsonNode = jsonParser.readValueAsTree();
-        return deserialize(jsonNode, jsonParser.getCodec(), ctx);
+        final JsonNode jsonNode = jsonParser.readValueAsTree();
+        final SerdeContext serdeContext = SerdeContext.get(ctx);
+        return deserialize(jsonNode, serdeContext);
     }
 
-    private RexNode deserialize(JsonNode jsonNode, ObjectCodec codec, 
DeserializationContext ctx)
+    private static RexNode deserialize(JsonNode jsonNode, SerdeContext 
serdeContext)
             throws IOException {
-        String kind = jsonNode.get(FIELD_NAME_KIND).asText().toUpperCase();
+        final String kind = jsonNode.required(FIELD_NAME_KIND).asText();
         switch (kind) {
-            case SQL_KIND_INPUT_REF:
-                return deserializeInputRef(jsonNode, codec, ctx);
-            case SQL_KIND_LITERAL:
-                return deserializeLiteral(jsonNode, codec, ctx);
-            case SQL_KIND_FIELD_ACCESS:
-                return deserializeFieldAccess(jsonNode, codec, ctx);
-            case SQL_KIND_CORREL_VARIABLE:
-                return deserializeCorrelVariable(jsonNode, codec, ctx);
-            case SQL_KIND_REX_CALL:
-                return deserializeCall(jsonNode, codec, ctx);
-            case SQL_KIND_PATTERN_INPUT_REF:
-                return deserializePatternInputRef(jsonNode, codec, ctx);
+            case KIND_INPUT_REF:
+                return deserializeInputRef(jsonNode, serdeContext);
+            case KIND_LITERAL:
+                return deserializeLiteral(jsonNode, serdeContext);
+            case KIND_FIELD_ACCESS:
+                return deserializeFieldAccess(jsonNode, serdeContext);
+            case KIND_CORREL_VARIABLE:
+                return deserializeCorrelVariable(jsonNode, serdeContext);
+            case KIND_PATTERN_INPUT_REF:
+                return deserializePatternFieldRef(jsonNode, serdeContext);
+            case KIND_CALL:
+                return deserializeCall(jsonNode, serdeContext);
             default:
                 throw new TableException("Cannot convert to RexNode: " + 
jsonNode.toPrettyString());
         }
     }
 
-    private RexNode deserializeInputRef(
-            JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx) 
throws IOException {
-        int inputIndex = jsonNode.get(FIELD_NAME_INPUT_INDEX).intValue();
-        JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
-        RelDataType fieldType = ctx.readValue(typeNode.traverse(codec), 
RelDataType.class);
-        return SerdeContext.get(ctx).getRexBuilder().makeInputRef(fieldType, 
inputIndex);
+    private static RexNode deserializeInputRef(JsonNode jsonNode, SerdeContext 
serdeContext) {
+        final int inputIndex = 
jsonNode.required(FIELD_NAME_INPUT_INDEX).intValue();
+        final JsonNode logicalTypeNode = jsonNode.required(FIELD_NAME_TYPE);
+        final RelDataType fieldType =
+                RelDataTypeJsonDeserializer.deserialize(logicalTypeNode, 
serdeContext);
+        return serdeContext.getRexBuilder().makeInputRef(fieldType, 
inputIndex);
     }
 
-    private RexNode deserializePatternInputRef(
-            JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx) 
throws IOException {
-        int inputIndex = jsonNode.get(FIELD_NAME_INPUT_INDEX).intValue();
-        String alpha = jsonNode.get(FIELD_NAME_ALPHA).asText();
-        JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
-        RelDataType fieldType = ctx.readValue(typeNode.traverse(codec), 
RelDataType.class);
-        return SerdeContext.get(ctx)
-                .getRexBuilder()
-                .makePatternFieldRef(alpha, fieldType, inputIndex);
-    }
-
-    private RexNode deserializeLiteral(
-            JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx) 
throws IOException {
-        RexBuilder rexBuilder = SerdeContext.get(ctx).getRexBuilder();
-        JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
-        RelDataType literalType = ctx.readValue(typeNode.traverse(codec), 
RelDataType.class);
+    private static RexNode deserializeLiteral(JsonNode jsonNode, SerdeContext 
serdeContext) {
+        final JsonNode logicalTypeNode = jsonNode.get(FIELD_NAME_TYPE);
+        final RelDataType relDataType =
+                RelDataTypeJsonDeserializer.deserialize(logicalTypeNode, 
serdeContext);
         if (jsonNode.has(FIELD_NAME_SARG)) {
-            Sarg<?> sarg =
-                    toSarg(jsonNode.get(FIELD_NAME_SARG), 
literalType.getSqlTypeName(), codec, ctx);
-            return rexBuilder.makeSearchArgumentLiteral(sarg, literalType);
+            return deserializeSarg(jsonNode.required(FIELD_NAME_SARG), 
relDataType, serdeContext);
         } else if (jsonNode.has(FIELD_NAME_VALUE)) {
-            JsonNode literalNode = jsonNode.get(FIELD_NAME_VALUE);
-            if (literalNode.isNull()) {
-                return rexBuilder.makeNullLiteral(literalType);
+            final Object value =
+                    deserializeLiteralValue(jsonNode, 
relDataType.getSqlTypeName(), serdeContext);
+            if (value == null) {
+                return 
serdeContext.getRexBuilder().makeNullLiteral(relDataType);
             }
-            Object literal = toLiteralValue(jsonNode, 
literalType.getSqlTypeName(), codec, ctx);
-            return rexBuilder.makeLiteral(literal, literalType, true);
+            return serdeContext.getRexBuilder().makeLiteral(value, 
relDataType, true);
         } else {
             throw new TableException("Unknown literal: " + 
jsonNode.toPrettyString());
         }
     }
 
-    private Object toLiteralValue(
-            JsonNode literalNode,
-            SqlTypeName sqlTypeName,
-            ObjectCodec codec,
-            DeserializationContext ctx)
-            throws IOException {
-        JsonNode valueNode = literalNode.get(FIELD_NAME_VALUE);
+    @SuppressWarnings({"UnstableApiUsage", "rawtypes", "unchecked"})
+    private static RexNode deserializeSarg(
+            JsonNode sargNode, RelDataType relDataType, SerdeContext 
serdeContext) {
+        final RexBuilder rexBuilder = serdeContext.getRexBuilder();
+        final ArrayNode rangesNode = (ArrayNode) 
sargNode.required(FIELD_NAME_RANGES);
+        final Builder builder = builder();
+        for (JsonNode rangeNode : rangesNode) {
+            Range range = all();
+            if (rangeNode.has(FIELD_NAME_BOUND_LOWER)) {
+                final JsonNode lowerNode = 
rangeNode.required(FIELD_NAME_BOUND_LOWER);
+                final Comparable<?> boundValue =
+                        (Comparable<?>)
+                                deserializeLiteralValue(
+                                        lowerNode, 
relDataType.getSqlTypeName(), serdeContext);
+                assert boundValue != null;
+                final BoundType boundType =
+                        serializableToCalcite(
+                                BoundType.class,
+                                
lowerNode.required(FIELD_NAME_BOUND_TYPE).asText());
+                final Range<?> r =
+                        boundType == BoundType.OPEN ? greaterThan(boundValue) 
: atLeast(boundValue);
+                range = range.intersection(r);
+            }
+            if (rangeNode.has(FIELD_NAME_BOUND_UPPER)) {
+                final JsonNode upperNode = 
rangeNode.required(FIELD_NAME_BOUND_UPPER);
+                final Comparable<?> boundValue =
+                        (Comparable<?>)
+                                deserializeLiteralValue(
+                                        upperNode, 
relDataType.getSqlTypeName(), serdeContext);
+                assert boundValue != null;
+                final BoundType boundType =
+                        serializableToCalcite(
+                                BoundType.class,
+                                
upperNode.required(FIELD_NAME_BOUND_TYPE).asText());
+                final Range<?> r =
+                        boundType == BoundType.OPEN ? lessThan(boundValue) : 
atMost(boundValue);
+                range = range.intersection(r);
+            }
+            if (range.hasUpperBound() || range.hasLowerBound()) {
+                builder.add(range);
+            }
+        }
+        final boolean containsNull = 
sargNode.required(FIELD_NAME_CONTAINS_NULL).booleanValue();
+        return rexBuilder.makeSearchArgumentLiteral(
+                Sarg.of(containsNull, builder.build()), relDataType);
+    }
+
+    private static Object deserializeLiteralValue(
+            JsonNode literalNode, SqlTypeName sqlTypeName, SerdeContext 
serdeContext) {

Review comment:
       For symbols we need to access further fields.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to