[ 
https://issues.apache.org/jira/browse/BEAM-4622?focusedWorklogId=124018&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-124018
 ]

ASF GitHub Bot logged work on BEAM-4622:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/Jul/18 08:57
            Start Date: 17/Jul/18 08:57
    Worklog Time Spent: 10m 
      Work Description: aromanenko-dev closed pull request #5912: [BEAM-4622] 
Makes required to call Beam SQL expressions validation
URL: https://github.com/apache/beam/pull/5912
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index de667f0a1e7..228a0df07aa 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -147,6 +147,17 @@ public BeamSqlFnExecutor(RexProgram program) {
    * represent each {@link SqlOperator} with a corresponding {@link 
BeamSqlExpression}.
    */
   static BeamSqlExpression buildExpression(RexNode rexNode) {
+    BeamSqlExpression ret = getBeamSqlExpression(rexNode);
+
+    if (!ret.accept()) {
+      throw new IllegalStateException(
+          ret.getClass().getSimpleName() + " does not accept the operands.(" + 
rexNode + ")");
+    }
+
+    return ret;
+  }
+
+  private static BeamSqlExpression getBeamSqlExpression(RexNode rexNode) {
     BeamSqlExpression ret;
     if (rexNode instanceof RexLiteral) {
       RexLiteral node = (RexLiteral) rexNode;
@@ -156,11 +167,11 @@ static BeamSqlExpression buildExpression(RexNode rexNode) 
{
       if (SqlTypeName.CHAR_TYPES.contains(type) && node.getValue() instanceof 
NlsString) {
         // NlsString is not serializable, we need to convert
         // it to string explicitly.
-        return BeamSqlPrimitive.of(type, ((NlsString) value).getValue());
+        ret = BeamSqlPrimitive.of(type, ((NlsString) value).getValue());
       } else if (isDateNode(type, value)) {
         // does this actually make sense?
         // Calcite actually treat Calendar as the java type of Date Literal
-        return BeamSqlPrimitive.of(type, new DateTime(((Calendar) 
value).getTimeInMillis()));
+        ret = BeamSqlPrimitive.of(type, new DateTime(((Calendar) 
value).getTimeInMillis()));
       } else {
         // node.getTypeName().getSqlTypeName() and node.getSqlTypeName() can 
be different
         // e.g. sql: "select 1"
@@ -207,7 +218,7 @@ static BeamSqlExpression buildExpression(RexNode rexNode) {
           }
         }
 
-        return BeamSqlPrimitive.of(realType, realValue);
+        ret = BeamSqlPrimitive.of(realType, realValue);
       }
     } else if (rexNode instanceof RexInputRef) {
       RexInputRef node = (RexInputRef) rexNode;
@@ -394,13 +405,15 @@ static BeamSqlExpression buildExpression(RexNode rexNode) 
{
 
           // date functions
         case "Reinterpret":
-          return new BeamSqlReinterpretExpression(subExps, 
node.type.getSqlTypeName());
+          ret = new BeamSqlReinterpretExpression(subExps, 
node.type.getSqlTypeName());
+          break;
         case "CEIL":
           if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
-            return new BeamSqlCeilExpression(subExps);
+            ret = new BeamSqlCeilExpression(subExps);
           } else {
-            return new BeamSqlOperatorExpression(DateOperators.DATETIME_CEIL, 
subExps);
+            ret = new BeamSqlOperatorExpression(DateOperators.DATETIME_CEIL, 
subExps);
           }
+          break;
 
         case "FLOOR":
           if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
@@ -412,53 +425,67 @@ static BeamSqlExpression buildExpression(RexNode rexNode) 
{
 
         case "EXTRACT_DATE":
         case "EXTRACT":
-          return new BeamSqlOperatorExpression(DateOperators.EXTRACT, subExps);
+          ret = new BeamSqlOperatorExpression(DateOperators.EXTRACT, subExps);
+          break;
 
         case "LOCALTIME":
         case "CURRENT_TIME":
-          return new BeamSqlCurrentTimeExpression(subExps);
+          ret = new BeamSqlCurrentTimeExpression(subExps);
+          break;
 
         case "CURRENT_TIMESTAMP":
         case "LOCALTIMESTAMP":
-          return new BeamSqlCurrentTimestampExpression(subExps);
+          ret = new BeamSqlCurrentTimestampExpression(subExps);
+          break;
 
         case "CURRENT_DATE":
-          return new BeamSqlCurrentDateExpression();
+          ret = new BeamSqlCurrentDateExpression();
+          break;
 
         case "DATETIME_PLUS":
-          return new BeamSqlDatetimePlusExpression(subExps);
+          ret = new BeamSqlDatetimePlusExpression(subExps);
+          break;
 
           // array functions
         case "ARRAY":
-          return new BeamSqlArrayExpression(subExps);
+          ret = new BeamSqlArrayExpression(subExps);
+          break;
           // map functions
         case "MAP":
-          return new BeamSqlMapExpression(subExps);
+          ret = new BeamSqlMapExpression(subExps);
+          break;
 
         case "ITEM":
           switch (subExps.get(0).getOutputType()) {
             case MAP:
-              return new BeamSqlMapItemExpression(subExps, 
node.type.getSqlTypeName());
+              ret = new BeamSqlMapItemExpression(subExps, 
node.type.getSqlTypeName());
+              break;
             case ARRAY:
-              return new BeamSqlArrayItemExpression(subExps, 
node.type.getSqlTypeName());
+              ret = new BeamSqlArrayItemExpression(subExps, 
node.type.getSqlTypeName());
+              break;
             default:
               throw new UnsupportedOperationException(
                   "Operator: " + opName + " is not supported yet");
           }
+          break;
 
           // collections functions
         case "ELEMENT":
-          return new BeamSqlSingleElementExpression(subExps, 
node.type.getSqlTypeName());
+          ret = new BeamSqlSingleElementExpression(subExps, 
node.type.getSqlTypeName());
+          break;
 
         case "CARDINALITY":
-          return new BeamSqlCardinalityExpression(subExps, 
node.type.getSqlTypeName());
+          ret = new BeamSqlCardinalityExpression(subExps, 
node.type.getSqlTypeName());
+          break;
 
         case "DOT":
-          return new BeamSqlDotExpression(subExps, node.type.getSqlTypeName());
+          ret = new BeamSqlDotExpression(subExps, node.type.getSqlTypeName());
+          break;
 
           // DEFAULT keyword for UDF with optional parameter
         case "DEFAULT":
-          return new BeamSqlDefaultExpression();
+          ret = new BeamSqlDefaultExpression();
+          break;
 
         case "CASE":
           ret = new BeamSqlCaseExpression(subExps);
@@ -506,14 +533,6 @@ static BeamSqlExpression buildExpression(RexNode rexNode) {
       throw new UnsupportedOperationException(
           String.format("%s is not supported yet", 
rexNode.getClass().toString()));
     }
-
-    // TODO: https://issues.apache.org/jira/browse/BEAM-4622
-    // Many paths above do not reach this validation
-    if (!ret.accept()) {
-      throw new IllegalStateException(
-          ret.getClass().getSimpleName() + " does not accept the operands.(" + 
rexNode + ")");
-    }
-
     return ret;
   }
 
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/map/BeamSqlMapExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/map/BeamSqlMapExpression.java
index e1ceb3c1640..f76ecacfd8d 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/map/BeamSqlMapExpression.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/map/BeamSqlMapExpression.java
@@ -36,7 +36,15 @@ public BeamSqlMapExpression(List<BeamSqlExpression> 
operands) {
 
   @Override
   public boolean accept() {
-    return 
operands.stream().map(BeamSqlExpression::getOutputType).distinct().count() == 1;
+    int distinctCount = 2;
+    if (operands.size() < 2) {
+      return false;
+    }
+    if 
(operands.get(0).getOutputType().equals(operands.get(1).getOutputType())) {
+      distinctCount = 1;
+    }
+    return 
operands.stream().map(BeamSqlExpression::getOutputType).distinct().count()
+        == distinctCount;
   }
 
   @Override
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/BeamSqlReinterpretExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/BeamSqlReinterpretExpression.java
index 461a776cb5b..9679c8d13e3 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/BeamSqlReinterpretExpression.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/reinterpret/BeamSqlReinterpretExpression.java
@@ -24,6 +24,7 @@
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -47,7 +48,18 @@ public BeamSqlReinterpretExpression(List<BeamSqlExpression> 
operands, SqlTypeNam
 
   @Override
   public boolean accept() {
-    return getOperands().size() == 1 && REINTERPRETER.canConvert(opType(0), 
SqlTypeName.BIGINT);
+    if (getOperands().size() != 1) {
+      return false;
+    }
+
+    // Interval types will be already converted into BIGINT after evaluation.
+    SqlTypeFamily opTypeFamily = opType(0).getFamily();
+    if (opTypeFamily.equals(SqlTypeFamily.INTERVAL_DAY_TIME)
+        || opTypeFamily.equals(SqlTypeFamily.INTERVAL_YEAR_MONTH)) {
+      return true;
+    }
+
+    return REINTERPRETER.canConvert(opType(0), SqlTypeName.BIGINT);
   }
 
   @Override
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlMapTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlMapTest.java
index a49718ce52a..7017ffaa066 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlMapTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlMapTest.java
@@ -91,6 +91,29 @@ public void testSelectMapField() {
     pipeline.run();
   }
 
+  @Test
+  public void testSelectMapFieldKeyValueSameType() {
+    PCollection<Row> input = pCollectionOf2Elements();
+
+    Schema resultType =
+        Schema.builder()
+            .addInt32Field("f_int")
+            .addMapField("f_intStringMap", Schema.FieldType.STRING, 
Schema.FieldType.STRING)
+            .build();
+
+    PCollection<Row> result =
+        input.apply(
+            "sqlQuery",
+            SqlTransform.query("SELECT 42, MAP['aa', '1'] as `f_map` FROM 
PCOLLECTION"));
+
+    PAssert.that(result)
+        .containsInAnyOrder(
+            Row.withSchema(resultType).addValues(42, ImmutableMap.of("aa", 
"1")).build(),
+            Row.withSchema(resultType).addValues(42, ImmutableMap.of("aa", 
"1")).build());
+
+    pipeline.run();
+  }
+
   @Test
   public void testAccessMapElement() {
     PCollection<Row> input = pCollectionOf2Elements();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 124018)
    Time Spent: 1h 10m  (was: 1h)

> Many Beam SQL expressions never have their validation called
> ------------------------------------------------------------
>
>                 Key: BEAM-4622
>                 URL: https://issues.apache.org/jira/browse/BEAM-4622
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-sql
>            Reporter: Kenneth Knowles
>            Assignee: Alexey Romanenko
>            Priority: Major
>              Labels: easyfix, newbie, starter
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> In {{BeamSqlFnExecutor}} there is a pattern where first the returned 
> expression is assigned to a variable {{ret}} and then after a giant switch 
> statement the validation is invoked. But there are many code paths that just 
> call {{return}} and skip validation. This should be refactored so it is 
> impossible to short-circuit by accident like this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to