This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 295e42f  [FLINK-9688] [table] Add ATAN2 SQL function support
295e42f is described below

commit 295e42f3eeea4558fd46cc0f0890126dbb5a50c5
Author: snuyanzin <[email protected]>
AuthorDate: Thu Jun 28 10:05:33 2018 +0300

    [FLINK-9688] [table] Add ATAN2 SQL function support
    
    This closes #6223.
---
 docs/dev/table/sql.md                              | 11 ++++++
 docs/dev/table/tableApi.md                         | 22 +++++++++++
 .../flink/table/api/scala/expressionDsl.scala      | 18 +++++++++
 .../flink/table/codegen/calls/BuiltInMethods.scala | 19 +++++++++
 .../table/codegen/calls/FunctionGenerator.scala    | 12 ++++++
 .../flink/table/expressions/InputTypeSpec.scala    |  8 +++-
 .../flink/table/expressions/mathExpressions.scala  | 20 ++++++++++
 .../table/runtime/functions/ScalarFunctions.scala  |  6 +++
 .../flink/table/validate/FunctionCatalog.scala     |  2 +
 .../table/expressions/ScalarFunctionsTest.scala    | 45 ++++++++++++++++++++++
 .../table/expressions/SqlExpressionTest.scala      |  1 +
 11 files changed, 162 insertions(+), 2 deletions(-)

diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 366e3fd..94fd59b 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -1514,6 +1514,17 @@ COT(numeric)
     <tr>
       <td>
         {% highlight text %}
+ATAN2(numeric, numeric)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Calculates the arc tangent of a given coordinate.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight text %}
 ASIN(numeric)
 {% endhighlight %}
       </td>
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 6e202f1..9abe17f 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -2182,6 +2182,17 @@ NUMERIC.asin()
     <tr>
       <td>
         {% highlight java %}
+atan2(NUMERIC, NUMERIC)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Calculates the arc tangent of a given coordinate.</p>
+      </td>
+    </tr>
+    
+    <tr>
+      <td>
+        {% highlight java %}
 NUMERIC.acos()
 {% endhighlight %}
       </td>
@@ -3746,6 +3757,17 @@ NUMERIC.cot()
     <tr>
       <td>
         {% highlight scala %}
+atan2(NUMERIC, NUMERIC)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Calculates the arc tangent of a given coordinate.</p>
+      </td>
+    </tr>
+    
+    <tr>
+      <td>
+        {% highlight scala %}
 NUMERIC.asin()
 {% endhighlight %}
       </td>
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 35d2167..8aa5f8a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -1185,12 +1185,30 @@ object randInteger {
   * Returns NULL if any argument is NULL.
   */
 object concat {
+
+  /**
+    * Returns the string that results from concatenating the arguments.
+    * Returns NULL if any argument is NULL.
+    */
   def apply(string: Expression, strings: Expression*): Expression = {
     Concat(Seq(string) ++ strings)
   }
 }
 
 /**
+  * Calculates the arc tangent of a given coordinate.
+  */
+object atan2 {
+
+  /**
+    * Calculates the arc tangent of a given coordinate.
+    */
+  def apply(y: Expression, x: Expression): Expression = {
+    Atan2(y, x)
+  }
+}
+
+/**
   * Returns the string that results from concatenating the arguments and 
separator.
   * Returns NULL If the separator is NULL.
   *
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index 0e0f709..d08334f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -24,6 +24,14 @@ import org.apache.calcite.linq4j.tree.Types
 import org.apache.calcite.runtime.SqlFunctions
 import org.apache.flink.table.runtime.functions.ScalarFunctions
 
+/**
+  * Contains references to built-in functions.
+  *
+  * NOTE: When adding functions here. Check if Calcite provides it in
+  * [[org.apache.calcite.util.BuiltInMethod]]. The function generator supports 
Java's auto casting
+  * so we don't need the full matrix of data types for every function. Only 
[[JBigDecimal]] needs
+  * special handling.
+  */
 object BuiltInMethods {
 
   val LOG = Types.lookupMethod(classOf[ScalarFunctions], "log", 
classOf[Double])
@@ -75,6 +83,17 @@ object BuiltInMethods {
   val ATAN = Types.lookupMethod(classOf[Math], "atan", classOf[Double])
   val ATAN_DEC = Types.lookupMethod(classOf[SqlFunctions], "atan", 
classOf[JBigDecimal])
 
+  val ATAN2_DOUBLE_DOUBLE = Types.lookupMethod(
+    classOf[Math],
+    "atan2",
+    classOf[Double],
+    classOf[Double])
+  val ATAN2_DEC_DEC = Types.lookupMethod(
+    classOf[SqlFunctions],
+    "atan2",
+    classOf[JBigDecimal],
+    classOf[JBigDecimal])
+
   val DEGREES = Types.lookupMethod(classOf[Math], "toDegrees", classOf[Double])
   val DEGREES_DEC = Types.lookupMethod(classOf[SqlFunctions], "degrees", 
classOf[JBigDecimal])
 
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index a5c275a..230e3cf 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -313,6 +313,18 @@ object FunctionGenerator {
     BuiltInMethods.ATAN_DEC)
 
   addSqlFunctionMethod(
+    ATAN2,
+    Seq(DOUBLE_TYPE_INFO, DOUBLE_TYPE_INFO),
+    DOUBLE_TYPE_INFO,
+    BuiltInMethods.ATAN2_DOUBLE_DOUBLE)
+
+  addSqlFunctionMethod(
+    ATAN2,
+    Seq(BIG_DEC_TYPE_INFO, BIG_DEC_TYPE_INFO),
+    DOUBLE_TYPE_INFO,
+    BuiltInMethods.ATAN2_DEC_DEC)
+
+  addSqlFunctionMethod(
     DEGREES,
     Seq(DOUBLE_TYPE_INFO),
     DOUBLE_TYPE_INFO,
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala
index 39eca4e..9b2d20f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala
@@ -24,17 +24,21 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.validate._
 
 /**
-  * Expressions that have specification on its inputs.
+  * Expressions that have strict data type specification on its inputs.
   */
 trait InputTypeSpec extends Expression {
 
   /**
     * Input type specification for each child.
     *
-    * For example, [[Power]] expecting both of the children be of Double Type 
should use:
+    * For example, [[Power]] expecting both of the children be of double type 
should use:
     * {{{
     *   def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: 
DOUBLE_TYPE_INFO :: Nil
     * }}}
+    *
+    * Inputs that don't match the expected type will be safely casted to a 
higher type. Therefore,
+    * use the decimal type with caution as all numeric types would be casted 
to a very
+    * inefficient type.
     */
   private[flink] def expectedTypes: Seq[TypeInformation[_]]
 
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
index 10ba007..cf3efa9 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
@@ -255,6 +255,26 @@ case class Atan(child: Expression) extends UnaryExpression 
{
   }
 }
 
+case class Atan2(y: Expression, x: Expression) extends BinaryExpression {
+
+  override private[flink] def left = y
+
+  override private[flink] def right = x
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def validateInput() = {
+    TypeCheckUtils.assertNumericExpr(y.resultType, "atan2")
+    TypeCheckUtils.assertNumericExpr(x.resultType, "atan2")
+  }
+
+  override def toString: String = s"atan2($left, $right)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+    relBuilder.call(SqlStdOperatorTable.ATAN2, left.toRexNode, right.toRexNode)
+  }
+}
+
 case class Degrees(child: Expression) extends UnaryExpression {
   override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
 
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
index 50e8f9c..15f6096 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
@@ -25,6 +25,12 @@ import org.apache.commons.codec.binary.Base64
 
 /**
   * Built-in scalar runtime functions.
+  *
+  * NOTE: Before you add functions here, check if Calcite provides it in
+  * [[org.apache.calcite.runtime.SqlFunctions]]. Furthermore, make sure to 
implement the function
+  * efficiently. Sometimes it makes sense to create a
+  * [[org.apache.flink.table.codegen.calls.CallGenerator]] instead to avoid 
massive object
+  * creation and reuse instances.
   */
 class ScalarFunctions {}
 
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 8a91340..1e554ca 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -228,6 +228,7 @@ object FunctionCatalog {
     "asin" -> classOf[Asin],
     "acos" -> classOf[Acos],
     "atan" -> classOf[Atan],
+    "atan2" -> classOf[Atan2],
     "degrees" -> classOf[Degrees],
     "radians" -> classOf[Radians],
     "sign" -> classOf[Sign],
@@ -424,6 +425,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable 
{
     SqlStdOperatorTable.ASIN,
     SqlStdOperatorTable.ACOS,
     SqlStdOperatorTable.ATAN,
+    SqlStdOperatorTable.ATAN2,
     SqlStdOperatorTable.DEGREES,
     SqlStdOperatorTable.RADIANS,
     SqlStdOperatorTable.SIGN,
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 4eb2e33..44cc182 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -1161,6 +1161,51 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   }
 
   @Test
+  def testAtan2(): Unit = {
+    testAllApis(
+      atan2('f25, 'f26),
+      "atan2(f25, f26)",
+      "ATAN2(f25, f26)",
+      math.atan2(0.42.toByte, 0.toByte).toString)
+
+    testAllApis(
+      Atan2('f26, 'f25),
+      "atan2(f26, f25)",
+      "ATAN2(f26, f25)",
+      math.atan2(0.toShort, 0.toShort).toString)
+
+    testAllApis(
+      Atan2('f27, 'f27),
+      "atan2(f27, f27)",
+      "ATAN2(f27, f27)",
+      math.atan2(0.toLong, 0.toLong).toString)
+
+    testAllApis(
+      Atan2('f28, 'f28),
+      "atan2(f28, f28)",
+      "ATAN2(f28, f28)",
+      math.atan2(0.45.toFloat, 0.45.toFloat).toString)
+
+    testAllApis(
+      Atan2('f29, 'f29),
+      "atan2(f29, f29)",
+      "ATAN2(f29, f29)",
+      math.atan2(0.46, 0.46).toString)
+
+    testAllApis(
+      Atan2('f30, 'f30),
+      "atan2(f30, f30)",
+      "ATAN2(f30, f30)",
+      math.atan2(1, 1).toString)
+
+    testAllApis(
+      Atan2('f31, 'f31),
+      "atan2(f31, f31)",
+      "ATAN2(f31, f31)",
+      math.atan2(-0.1231231321321321111, -0.1231231321321321111).toString)
+  }
+
+  @Test
   def testDegrees(): Unit = {
     testAllApis(
       'f2.degrees(),
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
index 3d4dab3..aaf7d3b 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
@@ -111,6 +111,7 @@ class SqlExpressionTest extends ExpressionTestBase {
     testSqlApi("ASIN(0.5)", "0.5235987755982989")
     testSqlApi("ACOS(0.5)", "1.0471975511965979")
     testSqlApi("ATAN(0.5)", "0.4636476090008061")
+    testSqlApi("ATAN2(0.5, 0.5)", "0.7853981633974483")
     testSqlApi("DEGREES(0.5)", "28.64788975654116")
     testSqlApi("RADIANS(0.5)", "0.008726646259971648")
     testSqlApi("SIGN(-1.1)", "-1")

Reply via email to