This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 295e42f [FLINK-9688] [table] Add ATAN2 SQL function support
295e42f is described below
commit 295e42f3eeea4558fd46cc0f0890126dbb5a50c5
Author: snuyanzin <[email protected]>
AuthorDate: Thu Jun 28 10:05:33 2018 +0300
[FLINK-9688] [table] Add ATAN2 SQL function support
This closes #6223.
---
docs/dev/table/sql.md | 11 ++++++
docs/dev/table/tableApi.md | 22 +++++++++++
.../flink/table/api/scala/expressionDsl.scala | 18 +++++++++
.../flink/table/codegen/calls/BuiltInMethods.scala | 19 +++++++++
.../table/codegen/calls/FunctionGenerator.scala | 12 ++++++
.../flink/table/expressions/InputTypeSpec.scala | 8 +++-
.../flink/table/expressions/mathExpressions.scala | 20 ++++++++++
.../table/runtime/functions/ScalarFunctions.scala | 6 +++
.../flink/table/validate/FunctionCatalog.scala | 2 +
.../table/expressions/ScalarFunctionsTest.scala | 45 ++++++++++++++++++++++
.../table/expressions/SqlExpressionTest.scala | 1 +
11 files changed, 162 insertions(+), 2 deletions(-)
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 366e3fd..94fd59b 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -1514,6 +1514,17 @@ COT(numeric)
<tr>
<td>
{% highlight text %}
+ATAN2(numeric, numeric)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Calculates the arc tangent of a given coordinate; the first argument is the y-coordinate and the second is the x-coordinate.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight text %}
ASIN(numeric)
{% endhighlight %}
</td>
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 6e202f1..9abe17f 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -2182,6 +2182,17 @@ NUMERIC.asin()
<tr>
<td>
{% highlight java %}
+atan2(NUMERIC, NUMERIC)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Calculates the arc tangent of a given coordinate; the first argument is the y-coordinate and the second is the x-coordinate.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight java %}
NUMERIC.acos()
{% endhighlight %}
</td>
@@ -3746,6 +3757,17 @@ NUMERIC.cot()
<tr>
<td>
{% highlight scala %}
+atan2(NUMERIC, NUMERIC)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Calculates the arc tangent of a given coordinate; the first argument is the y-coordinate and the second is the x-coordinate.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight scala %}
NUMERIC.asin()
{% endhighlight %}
</td>
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 35d2167..8aa5f8a 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -1185,12 +1185,30 @@ object randInteger {
* Returns NULL if any argument is NULL.
*/
object concat {
+
+ /**
+ * Returns the string that results from concatenating the arguments.
+ * Returns NULL if any argument is NULL.
+ */
def apply(string: Expression, strings: Expression*): Expression = {
Concat(Seq(string) ++ strings)
}
}
/**
+ * Calculates the arc tangent of a given coordinate.
+ */
+object atan2 {
+
+ /**
+ * Calculates the arc tangent of a given coordinate.
+ */
+ def apply(y: Expression, x: Expression): Expression = {
+ Atan2(y, x)
+ }
+}
+
+/**
* Returns the string that results from concatenating the arguments and
separator.
* Returns NULL If the separator is NULL.
*
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index 0e0f709..d08334f 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -24,6 +24,14 @@ import org.apache.calcite.linq4j.tree.Types
import org.apache.calcite.runtime.SqlFunctions
import org.apache.flink.table.runtime.functions.ScalarFunctions
+/**
+ * Contains references to built-in functions.
+ *
+ * NOTE: When adding functions here, check whether Calcite already provides them in
+ * [[org.apache.calcite.util.BuiltInMethod]]. The function generator supports
Java's auto casting
+ * so we don't need the full matrix of data types for every function. Only
[[JBigDecimal]] needs
+ * special handling.
+ */
object BuiltInMethods {
val LOG = Types.lookupMethod(classOf[ScalarFunctions], "log",
classOf[Double])
@@ -75,6 +83,17 @@ object BuiltInMethods {
val ATAN = Types.lookupMethod(classOf[Math], "atan", classOf[Double])
val ATAN_DEC = Types.lookupMethod(classOf[SqlFunctions], "atan",
classOf[JBigDecimal])
+ val ATAN2_DOUBLE_DOUBLE = Types.lookupMethod(
+ classOf[Math],
+ "atan2",
+ classOf[Double],
+ classOf[Double])
+ val ATAN2_DEC_DEC = Types.lookupMethod(
+ classOf[SqlFunctions],
+ "atan2",
+ classOf[JBigDecimal],
+ classOf[JBigDecimal])
+
val DEGREES = Types.lookupMethod(classOf[Math], "toDegrees", classOf[Double])
val DEGREES_DEC = Types.lookupMethod(classOf[SqlFunctions], "degrees",
classOf[JBigDecimal])
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index a5c275a..230e3cf 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -313,6 +313,18 @@ object FunctionGenerator {
BuiltInMethods.ATAN_DEC)
addSqlFunctionMethod(
+ ATAN2,
+ Seq(DOUBLE_TYPE_INFO, DOUBLE_TYPE_INFO),
+ DOUBLE_TYPE_INFO,
+ BuiltInMethods.ATAN2_DOUBLE_DOUBLE)
+
+ addSqlFunctionMethod(
+ ATAN2,
+ Seq(BIG_DEC_TYPE_INFO, BIG_DEC_TYPE_INFO),
+ DOUBLE_TYPE_INFO,
+ BuiltInMethods.ATAN2_DEC_DEC)
+
+ addSqlFunctionMethod(
DEGREES,
Seq(DOUBLE_TYPE_INFO),
DOUBLE_TYPE_INFO,
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala
index 39eca4e..9b2d20f 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala
@@ -24,17 +24,21 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.validate._
/**
- * Expressions that have specification on its inputs.
+ * Expressions that have strict data type specification on its inputs.
*/
trait InputTypeSpec extends Expression {
/**
* Input type specification for each child.
*
- * For example, [[Power]] expecting both of the children be of Double Type
should use:
+ * For example, [[Power]] expecting both of the children be of double type
should use:
* {{{
* def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO ::
DOUBLE_TYPE_INFO :: Nil
* }}}
+ *
+ * Inputs that don't match the expected type will be safely cast to a
higher type. Therefore,
+ * use the decimal type with caution, as all numeric types would be cast
to a very
+ * inefficient type.
*/
private[flink] def expectedTypes: Seq[TypeInformation[_]]
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
index 10ba007..cf3efa9 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
@@ -255,6 +255,26 @@ case class Atan(child: Expression) extends UnaryExpression
{
}
}
+case class Atan2(y: Expression, x: Expression) extends BinaryExpression {
+
+ override private[flink] def left = y
+
+ override private[flink] def right = x
+
+ override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+ override private[flink] def validateInput() = {
+ TypeCheckUtils.assertNumericExpr(y.resultType, "atan2")
+ TypeCheckUtils.assertNumericExpr(x.resultType, "atan2")
+ }
+
+ override def toString: String = s"atan2($left, $right)"
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder):
RexNode = {
+ relBuilder.call(SqlStdOperatorTable.ATAN2, left.toRexNode, right.toRexNode)
+ }
+}
+
case class Degrees(child: Expression) extends UnaryExpression {
override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
index 50e8f9c..15f6096 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
@@ -25,6 +25,12 @@ import org.apache.commons.codec.binary.Base64
/**
* Built-in scalar runtime functions.
+ *
+ * NOTE: Before you add functions here, check if Calcite provides it in
+ * [[org.apache.calcite.runtime.SqlFunctions]]. Furthermore, make sure to
implement the function
+ * efficiently. Sometimes it makes sense to create a
+ * [[org.apache.flink.table.codegen.calls.CallGenerator]] instead to avoid
massive object
+ * creation and reuse instances.
*/
class ScalarFunctions {}
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 8a91340..1e554ca 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -228,6 +228,7 @@ object FunctionCatalog {
"asin" -> classOf[Asin],
"acos" -> classOf[Acos],
"atan" -> classOf[Atan],
+ "atan2" -> classOf[Atan2],
"degrees" -> classOf[Degrees],
"radians" -> classOf[Radians],
"sign" -> classOf[Sign],
@@ -424,6 +425,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable
{
SqlStdOperatorTable.ASIN,
SqlStdOperatorTable.ACOS,
SqlStdOperatorTable.ATAN,
+ SqlStdOperatorTable.ATAN2,
SqlStdOperatorTable.DEGREES,
SqlStdOperatorTable.RADIANS,
SqlStdOperatorTable.SIGN,
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 4eb2e33..44cc182 100644
---
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -1161,6 +1161,51 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
}
@Test
+ def testAtan2(): Unit = {
+ testAllApis(
+ atan2('f25, 'f26),
+ "atan2(f25, f26)",
+ "ATAN2(f25, f26)",
+ math.atan2(0.42.toByte, 0.toByte).toString)
+
+ testAllApis(
+ Atan2('f26, 'f25),
+ "atan2(f26, f25)",
+ "ATAN2(f26, f25)",
+ math.atan2(0.toShort, 0.toShort).toString)
+
+ testAllApis(
+ Atan2('f27, 'f27),
+ "atan2(f27, f27)",
+ "ATAN2(f27, f27)",
+ math.atan2(0.toLong, 0.toLong).toString)
+
+ testAllApis(
+ Atan2('f28, 'f28),
+ "atan2(f28, f28)",
+ "ATAN2(f28, f28)",
+ math.atan2(0.45.toFloat, 0.45.toFloat).toString)
+
+ testAllApis(
+ Atan2('f29, 'f29),
+ "atan2(f29, f29)",
+ "ATAN2(f29, f29)",
+ math.atan2(0.46, 0.46).toString)
+
+ testAllApis(
+ Atan2('f30, 'f30),
+ "atan2(f30, f30)",
+ "ATAN2(f30, f30)",
+ math.atan2(1, 1).toString)
+
+ testAllApis(
+ Atan2('f31, 'f31),
+ "atan2(f31, f31)",
+ "ATAN2(f31, f31)",
+ math.atan2(-0.1231231321321321111, -0.1231231321321321111).toString)
+ }
+
+ @Test
def testDegrees(): Unit = {
testAllApis(
'f2.degrees(),
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
index 3d4dab3..aaf7d3b 100644
---
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
@@ -111,6 +111,7 @@ class SqlExpressionTest extends ExpressionTestBase {
testSqlApi("ASIN(0.5)", "0.5235987755982989")
testSqlApi("ACOS(0.5)", "1.0471975511965979")
testSqlApi("ATAN(0.5)", "0.4636476090008061")
+ testSqlApi("ATAN2(0.5, 0.5)", "0.7853981633974483")
testSqlApi("DEGREES(0.5)", "28.64788975654116")
testSqlApi("RADIANS(0.5)", "0.008726646259971648")
testSqlApi("SIGN(-1.1)", "-1")