Repository: flink
Updated Branches:
  refs/heads/master 556ea8a71 -> f2ae2414d


[FLINK-8617] [table] Fix code generation bug while accessing Map type

This closes #5438.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2ae2414
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2ae2414
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2ae2414

Branch: refs/heads/master
Commit: f2ae2414d65ab2fbfa1efd60f69e243fbeeba118
Parents: 556ea8a
Author: Xpray <leonxp...@gmail.com>
Authored: Mon Feb 12 00:33:37 2018 +0800
Committer: twalthr <twal...@apache.org>
Committed: Mon Feb 12 10:48:37 2018 +0100

----------------------------------------------------------------------
 .../table/codegen/calls/ScalarOperators.scala   | 16 +++++++----
 .../flink/table/expressions/MapTypeTest.scala   | 28 ++++++++++++++++++++
 .../table/runtime/batch/table/CalcITCase.scala  |  9 +++++--
 3 files changed, 46 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f2ae2414/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index 742ee7d..a261b3d 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -22,13 +22,12 @@ import org.apache.calcite.avatica.util.{DateTimeUtils, 
TimeUnitRange}
 import org.apache.calcite.util.BuiltInMethod
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo._
-import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, 
RowTypeInfo}
 import org.apache.flink.table.codegen.CodeGenUtils._
 import 
org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
 import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, 
GeneratedExpression}
-import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, 
TimeIntervalTypeInfo, TypeCoercion}
 import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, 
TimeIntervalTypeInfo, TypeCoercion}
 
 object ScalarOperators {
 
@@ -1169,9 +1168,9 @@ object ScalarOperators {
       s"""
          |${map.code}
          |${key.code}
-         |boolean $nullTerm = (${map.nullTerm} || ${key.nullTerm});
-         |$resultTypeTerm $resultTerm = $nullTerm ?
+         |$resultTypeTerm $resultTerm = (${map.nullTerm} || ${key.nullTerm}) ?
          |  null : ($resultTypeTerm) ${map.resultTerm}.get(${key.resultTerm});
+         |boolean $nullTerm = $resultTerm == null;
          |""".stripMargin
     } else {
       s"""
@@ -1181,7 +1180,14 @@ object ScalarOperators {
          | ${map.resultTerm}.get(${key.resultTerm});
          |""".stripMargin
     }
-    GeneratedExpression(resultTerm, nullTerm, accessCode, resultType)
+    val unboxing = codeGenerator.generateInputFieldUnboxing(resultType, 
resultTerm)
+
+    unboxing.copy(code =
+      s"""
+         |$accessCode
+         |${unboxing.code}
+         |""".stripMargin
+    )
   }
 
   def generateMapCardinality(

http://git-wip-us.apache.org/repos/asf/flink/blob/f2ae2414/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
index 2ce17e4..0a30eb0 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
@@ -196,6 +196,34 @@ class MapTypeTest extends MapTypeTestBase {
       "f3.cardinality()",
       "CARDINALITY(f3)",
       "2")
+
+    testAllApis(
+      'f2.at("a").isNotNull,
+      "f2.at('a').isNotNull",
+      "f2['a'] IS NOT NULL",
+      "true"
+    )
+
+    testAllApis(
+      'f2.at("a").isNull,
+      "f2.at('a').isNull",
+      "f2['a'] IS NULL",
+      "false"
+    )
+
+    testAllApis(
+      'f2.at("c").isNotNull,
+      "f2.at('c').isNotNull",
+      "f2['c'] IS NOT NULL",
+      "false"
+    )
+
+    testAllApis(
+      'f2.at("c").isNull,
+      "f2.at('c').isNull",
+      "f2['c'] IS NULL",
+      "true"
+    )
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/f2ae2414/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
index 74f0560..1b89229 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
@@ -479,9 +479,14 @@ class CalcITCase(
 
     val table = env.fromElements(rowValue).toTable(tEnv, 'a, 'b, 'c)
 
-    val result = table.select(row('a, 'b, 'c), array(12, 'b), map('a, 'c))
+    val result = table.select(
+      row('a, 'b, 'c),
+      array(12, 'b),
+      map('a, 'c),
+      map('a, 'c).at('a) === 'c
+    )
 
-    val expected = "foo,12,1984-07-12 14:34:24.0,[12, 12],{foo=1984-07-12 
14:34:24.0}"
+    val expected = "foo,12,1984-07-12 14:34:24.0,[12, 12],{foo=1984-07-12 
14:34:24.0},true"
     val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
 

Reply via email to