[ 
https://issues.apache.org/jira/browse/BEAM-5852?focusedWorklogId=161620&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-161620
 ]

ASF GitHub Bot logged work on BEAM-5852:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Nov/18 16:14
            Start Date: 01/Nov/18 16:14
    Worklog Time Spent: 10m 
      Work Description: akedin closed pull request #6898: [BEAM-5852] 
[BEAM-5892] BeamSQL functions
URL: https://github.com/apache/beam/pull/6898
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
index 4822da33a3f..2bbc35a99d2 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
@@ -90,6 +90,8 @@
   public PCollection<Row> expand(PInput input) {
     BeamSqlEnv sqlEnv = BeamSqlEnv.readOnly(PCOLLECTION_NAME, 
toTableMap(input));
 
+    // TODO: validate duplicate functions.
+    sqlEnv.loadBeamBuiltinFunctions();
     registerFunctions(sqlEnv);
     if (autoUdfUdafLoad()) {
       sqlEnv.loadUdfUdafFromProvider();
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index 093961c59d2..a850f8aadb1 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl;
 
+import java.lang.reflect.Method;
+import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -24,6 +26,7 @@
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.udf.BeamBuiltinFunctionProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.UdfUdafProvider;
@@ -75,6 +78,14 @@ public static BeamSqlEnv inMemory(TableProvider... 
tableProviders) {
     return withTableProvider(inMemoryMetaStore);
   }
 
+  private void registerBuiltinUdf(Map<String, List<Method>> methods) {
+    for (Map.Entry<String, List<Method>> entry : methods.entrySet()) {
+      for (Method method : entry.getValue()) {
+        defaultSchema.add(entry.getKey(), ScalarFunctionImpl.create(method));
+      }
+    }
+  }
+
   /** Register a UDF function which can be used in SQL expression. */
   public void registerUdf(String functionName, Class<?> clazz, String method) {
     defaultSchema.add(functionName, ScalarFunctionImpl.create(clazz, method));
@@ -113,6 +124,13 @@ public void loadUdfUdafFromProvider() {
             });
   }
 
+  public void loadBeamBuiltinFunctions() {
+    for (BeamBuiltinFunctionProvider provider :
+        ServiceLoader.load(BeamBuiltinFunctionProvider.class)) {
+      registerBuiltinUdf(provider.getBuiltinMethods());
+    }
+  }
+
   public BeamRelNode parseQuery(String query) throws ParseException {
     try {
       return planner.convertToBeamRel(query);
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamBuiltinFunctionProvider.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamBuiltinFunctionProvider.java
new file mode 100644
index 00000000000..e338ea9cd9d
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamBuiltinFunctionProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.udf;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** BeamBuiltinFunctionClass interface. */
+public abstract class BeamBuiltinFunctionProvider {
+  public Map<String, List<Method>> getBuiltinMethods() {
+    List<Method> methods = Arrays.asList(getClass().getMethods());
+    return methods
+        .stream()
+        .filter(BeamBuiltinFunctionProvider::isUDF)
+        .collect(
+            Collectors.groupingBy(method -> 
method.getDeclaredAnnotation(UDF.class).funcName()));
+  }
+
+  private static boolean isUDF(Method m) {
+    return m.getDeclaredAnnotation(UDF.class) != null;
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinTrigonometricFunctions.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinTrigonometricFunctions.java
new file mode 100644
index 00000000000..83ee0574ef5
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinTrigonometricFunctions.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.udf;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.schemas.Schema;
+
+/** TrigonometricFunctions. */
+@AutoService(BeamBuiltinFunctionProvider.class)
+public class BuiltinTrigonometricFunctions extends BeamBuiltinFunctionProvider 
{
+
+  /**
+   * COSH(X)
+   *
+   * <p>Computes the hyperbolic cosine of X. Generates an error if an overflow 
occurs.
+   */
+  // TODO: handle overflow
+  @UDF(
+    funcName = "COSH",
+    parameterArray = {Schema.TypeName.DOUBLE},
+    returnType = Schema.TypeName.DOUBLE
+  )
+  public Double cosh(Double o) {
+    return Math.cosh(o);
+  }
+
+  /**
+   * SINH(X)
+   *
+   * <p>Computes the hyperbolic sine of X. Generates an error if an overflow 
occurs.
+   */
+  // TODO: handle overflow
+  @UDF(
+    funcName = "SINH",
+    parameterArray = {Schema.TypeName.DOUBLE},
+    returnType = Schema.TypeName.DOUBLE
+  )
+  public Double sinh(Double o) {
+    return Math.sinh(o);
+  }
+
+  /**
+   * TANH(X)
+   *
+   * <p>Computes hyperbolic tangent of X. Does not fail.
+   */
+  @UDF(
+    funcName = "TANH",
+    parameterArray = {Schema.TypeName.DOUBLE},
+    returnType = Schema.TypeName.DOUBLE
+  )
+  public Double tanh(Double o) {
+    return Math.tanh(o);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsInf.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsInf.java
new file mode 100644
index 00000000000..9e13cc85630
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsInf.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.udf;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.schemas.Schema;
+
+/**
+ * IS_INF(X)
+ *
+ * <p>Returns TRUE if the value is positive or negative infinity. Returns NULL 
for NULL inputs.
+ * input: Float, Double
+ *
+ * <p>Output: Boolean
+ */
+@AutoService(BeamBuiltinFunctionProvider.class)
+public class IsInf extends BeamBuiltinFunctionProvider {
+  private static final String SQL_FUNCTION_NAME = "IS_INF";
+
+  @UDF(
+    funcName = SQL_FUNCTION_NAME,
+    parameterArray = {Schema.TypeName.DOUBLE},
+    returnType = Schema.TypeName.BOOLEAN
+  )
+  public Boolean isInf(Double value) {
+    return Double.isInfinite(value);
+  }
+
+  @UDF(
+    funcName = SQL_FUNCTION_NAME,
+    parameterArray = {Schema.TypeName.FLOAT},
+    returnType = Schema.TypeName.BOOLEAN
+  )
+  public Boolean isInf(Float value) {
+    return Float.isInfinite(value);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsNan.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsNan.java
new file mode 100644
index 00000000000..6bc1d31a2a6
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsNan.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.udf;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.schemas.Schema;
+
+/**
+ * IS_NAN(X)
+ *
+ * <p>Returns TRUE if the value is a NaN value. Returns NULL for NULL inputs. 
input: Float, Double
+ *
+ * <p>Output: Boolean
+ */
+@AutoService(BeamBuiltinFunctionProvider.class)
+public class IsNan extends BeamBuiltinFunctionProvider {
+  private static final String SQL_FUNCTION_NAME = "IS_NAN";
+
+  @UDF(
+    funcName = SQL_FUNCTION_NAME,
+    parameterArray = {Schema.TypeName.FLOAT},
+    returnType = Schema.TypeName.BOOLEAN
+  )
+  public Boolean isNan(Float value) {
+    return Float.isNaN(value);
+  }
+
+  @UDF(
+    funcName = SQL_FUNCTION_NAME,
+    parameterArray = {Schema.TypeName.DOUBLE},
+    returnType = Schema.TypeName.BOOLEAN
+  )
+  public Boolean isNan(Double value) {
+    return Double.isNaN(value);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/UDF.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/UDF.java
new file mode 100644
index 00000000000..feaf0572237
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/UDF.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.udf;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import org.apache.beam.sdk.schemas.Schema;
+
+/** Make UserDefinedFunction annotation as package private. */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.METHOD})
+@interface UDF {
+  String funcName();
+
+  Schema.TypeName[] parameterArray() default {};
+
+  Schema.TypeName returnType();
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/package-info.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/package-info.java
new file mode 100644
index 00000000000..d9ca03f274a
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** UDF classes. */
+package org.apache.beam.sdk.extensions.sql.impl.udf;
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
index 9deb0353d60..97fce9247b1 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
@@ -40,7 +40,7 @@
 import org.junit.rules.ExpectedException;
 
 /**
- * prepare input records to test {@link BeamSql}.
+ * prepare input records to test.
  *
  * <p>Note that, any change in these records would impact tests in this 
package.
  */
@@ -51,15 +51,18 @@
   @Rule public ExpectedException exceptions = ExpectedException.none();
 
   static Schema schemaInTableA;
+  static Schema schemaFloatDouble;
   static List<Row> rowsInTableA;
+  static List<Row> rowsOfFloatDouble;
 
   //bounded PCollections
-  PCollection<Row> boundedInput1;
-  PCollection<Row> boundedInput2;
+  protected PCollection<Row> boundedInput1;
+  protected PCollection<Row> boundedInput2;
+  protected PCollection<Row> boundedInputFloatDouble;
 
   //unbounded PCollections
-  PCollection<Row> unboundedInput1;
-  PCollection<Row> unboundedInput2;
+  protected PCollection<Row> unboundedInput1;
+  protected PCollection<Row> unboundedInput2;
 
   @BeforeClass
   public static void prepareClass() throws ParseException {
@@ -124,6 +127,27 @@ public static void prepareClass() throws ParseException {
                 0,
                 new BigDecimal(4))
             .getRows();
+
+    schemaFloatDouble =
+        Schema.builder()
+            .addFloatField("f_float_1")
+            .addDoubleField("f_double_1")
+            .addFloatField("f_float_2")
+            .addDoubleField("f_double_2")
+            .addFloatField("f_float_3")
+            .addDoubleField("f_double_3")
+            .build();
+
+    rowsOfFloatDouble =
+        TestUtils.RowsBuilder.of(schemaFloatDouble)
+            .addRows(
+                Float.POSITIVE_INFINITY,
+                Double.POSITIVE_INFINITY,
+                Float.NEGATIVE_INFINITY,
+                Double.NEGATIVE_INFINITY,
+                Float.NaN,
+                Double.NaN)
+            .getRows();
   }
 
   @Before
@@ -146,6 +170,15 @@ public void preparePCollections() {
                     SerializableFunctions.identity(),
                     SerializableFunctions.identity()));
 
+    boundedInputFloatDouble =
+        pipeline.apply(
+            "boundedInputFloatDouble",
+            Create.of(rowsOfFloatDouble)
+                .withSchema(
+                    schemaFloatDouble,
+                    SerializableFunctions.identity(),
+                    SerializableFunctions.identity()));
+
     unboundedInput1 = prepareUnboundedPCollection1();
     unboundedInput2 = prepareUnboundedPCollection2();
   }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamSqlUdfImplTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamSqlUdfImplTest.java
new file mode 100644
index 00000000000..753cac8ae7b
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamSqlUdfImplTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.udf;
+
+import org.apache.beam.sdk.extensions.sql.BeamSqlDslBase;
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for UDFs. */
+@RunWith(JUnit4.class)
+public class BeamSqlUdfImplTest extends BeamSqlDslBase {
+
+  @Rule public ExpectedException expectedEx = ExpectedException.none();
+
+  @Test
+  public void testIsInf() throws Exception {
+    Schema resultType =
+        Schema.builder()
+            .addBooleanField("field_1")
+            .addBooleanField("field_2")
+            .addBooleanField("field_3")
+            .addBooleanField("field_4")
+            .build();
+    Row resultRow = Row.withSchema(resultType).addValues(true, true, true, 
true).build();
+
+    String sql =
+        "SELECT IS_INF(f_float_1), IS_INF(f_double_1), IS_INF(f_float_2), 
IS_INF(f_double_2) FROM PCOLLECTION";
+    PCollection<Row> result = boundedInputFloatDouble.apply("testUdf", 
SqlTransform.query(sql));
+    PAssert.that(result).containsInAnyOrder(resultRow);
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testIsNan() throws Exception {
+    Schema resultType =
+        Schema.builder()
+            .addBooleanField("field_1")
+            .addBooleanField("field_2")
+            .addBooleanField("field_3")
+            .addBooleanField("field_4")
+            .build();
+    Row resultRow = Row.withSchema(resultType).addValues(false, false, true, 
true).build();
+
+    String sql =
+        "SELECT IS_NAN(f_float_2), IS_NAN(f_double_2), IS_NAN(f_float_3), 
IS_NAN(f_double_3) FROM PCOLLECTION";
+    PCollection<Row> result = boundedInputFloatDouble.apply("testUdf", 
SqlTransform.query(sql));
+    PAssert.that(result).containsInAnyOrder(resultRow);
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testCOSH() throws Exception {
+    Schema resultType = Schema.builder().addNullableField("field", 
Schema.FieldType.DOUBLE).build();
+
+    Row resultRow1 = 
Row.withSchema(resultType).addValues(Math.cosh(1.0)).build();
+    String sql1 = "SELECT COSH(CAST(1.0 as DOUBLE))";
+    PCollection<Row> result1 = boundedInput1.apply("testUdf1", 
SqlTransform.query(sql1));
+    PAssert.that(result1).containsInAnyOrder(resultRow1);
+
+    Row resultRow2 = 
Row.withSchema(resultType).addValues(Math.cosh(710.0)).build();
+    String sql2 = "SELECT COSH(CAST(710.0 as DOUBLE))";
+    PCollection<Row> result2 = boundedInput1.apply("testUdf2", 
SqlTransform.query(sql2));
+    PAssert.that(result2).containsInAnyOrder(resultRow2);
+
+    Row resultRow3 = 
Row.withSchema(resultType).addValues(Math.cosh(-1.0)).build();
+    String sql3 = "SELECT COSH(CAST(-1.0 as DOUBLE))";
+    PCollection<Row> result3 = boundedInput1.apply("testUdf3", 
SqlTransform.query(sql3));
+    PAssert.that(result3).containsInAnyOrder(resultRow3);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testSINH() throws Exception {
+    Schema resultType = Schema.builder().addNullableField("field", 
Schema.FieldType.DOUBLE).build();
+
+    Row resultRow1 = 
Row.withSchema(resultType).addValues(Math.sinh(1.0)).build();
+    String sql1 = "SELECT SINH(CAST(1.0 as DOUBLE))";
+    PCollection<Row> result1 = boundedInput1.apply("testUdf1", 
SqlTransform.query(sql1));
+    PAssert.that(result1).containsInAnyOrder(resultRow1);
+
+    Row resultRow2 = 
Row.withSchema(resultType).addValues(Math.sinh(710.0)).build();
+    String sql2 = "SELECT SINH(CAST(710.0 as DOUBLE))";
+    PCollection<Row> result2 = boundedInput1.apply("testUdf2", 
SqlTransform.query(sql2));
+    PAssert.that(result2).containsInAnyOrder(resultRow2);
+
+    Row resultRow3 = 
Row.withSchema(resultType).addValues(Math.sinh(-1.0)).build();
+    String sql3 = "SELECT SINH(CAST(-1.0 as DOUBLE))";
+    PCollection<Row> result3 = boundedInput1.apply("testUdf3", 
SqlTransform.query(sql3));
+    PAssert.that(result3).containsInAnyOrder(resultRow3);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testTANH() throws Exception {
+    Schema resultType = Schema.builder().addNullableField("field", 
Schema.FieldType.DOUBLE).build();
+
+    Row resultRow1 = 
Row.withSchema(resultType).addValues(Math.tanh(1.0)).build();
+    String sql1 = "SELECT TANH(CAST(1.0 as DOUBLE))";
+    PCollection<Row> result1 = boundedInput1.apply("testUdf1", 
SqlTransform.query(sql1));
+    PAssert.that(result1).containsInAnyOrder(resultRow1);
+
+    Row resultRow2 = 
Row.withSchema(resultType).addValues(Math.tanh(0.0)).build();
+    String sql2 = "SELECT TANH(CAST(0.0 as DOUBLE))";
+    PCollection<Row> result2 = boundedInput1.apply("testUdf2", 
SqlTransform.query(sql2));
+    PAssert.that(result2).containsInAnyOrder(resultRow2);
+
+    Row resultRow3 = 
Row.withSchema(resultType).addValues(Math.tanh(-1.0)).build();
+    String sql3 = "SELECT TANH(CAST(-1.0 as DOUBLE))";
+    PCollection<Row> result3 = boundedInput1.apply("testUdf3", 
SqlTransform.query(sql3));
+    PAssert.that(result3).containsInAnyOrder(resultRow3);
+
+    pipeline.run().waitUntilFinish();
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 161620)
    Time Spent: 4h  (was: 3h 50m)

> Function extension in BeamSQL
> -----------------------------
>
>                 Key: BEAM-5852
>                 URL: https://issues.apache.org/jira/browse/BEAM-5852
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>            Reporter: Rui Wang
>            Assignee: Rui Wang
>            Priority: Major
>          Time Spent: 4h
>  Remaining Estimate: 0h
>
> We could add more functions to BeamSQL (as UDFs) to provide richer 
> functionality than the standard/Calcite functions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to