Repository: beam Updated Branches: refs/heads/DSL_SQL a452b8020 -> bed209e41
proposal for new UDF Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a49e4783 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a49e4783 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a49e4783 Branch: refs/heads/DSL_SQL Commit: a49e47830e5689b9b392d23813626b1cf9636ca6 Parents: 5fea746 Author: mingmxu <[email protected]> Authored: Thu Jul 13 23:22:14 2017 -0700 Committer: mingmxu <[email protected]> Committed: Thu Jul 13 23:22:14 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/dsls/sql/BeamSql.java | 9 +++-- .../org/apache/beam/dsls/sql/BeamSqlEnv.java | 5 ++- .../apache/beam/dsls/sql/schema/BeamSqlUdf.java | 41 ++++++++++++++++++++ .../beam/dsls/sql/BeamSqlDslUdfUdafTest.java | 9 +++-- 4 files changed, 54 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a49e4783/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java index ec3799c..d902f42 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java @@ -23,6 +23,7 @@ import org.apache.beam.dsls.sql.schema.BeamPCollectionTable; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; import org.apache.beam.dsls.sql.schema.BeamSqlUdaf; +import org.apache.beam.dsls.sql.schema.BeamSqlUdf; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; @@ -128,8 +129,8 @@ public class BeamSql { /** * register a UDF function used in this query. 
*/ - public QueryTransform withUdf(String functionName, Class<?> clazz, String methodName){ - getSqlEnv().registerUdf(functionName, clazz, methodName); + public QueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz){ + getSqlEnv().registerUdf(functionName, clazz); return this; } @@ -196,8 +197,8 @@ public class BeamSql { /** * register a UDF function used in this query. */ - public SimpleQueryTransform withUdf(String functionName, Class<?> clazz, String methodName){ - getSqlEnv().registerUdf(functionName, clazz, methodName); + public SimpleQueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz){ + getSqlEnv().registerUdf(functionName, clazz); return this; } http://git-wip-us.apache.org/repos/asf/beam/blob/a49e4783/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java index 61f0355..e8c8c97 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java @@ -23,6 +23,7 @@ import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlUdaf; +import org.apache.beam.dsls.sql.schema.BeamSqlUdf; import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.calcite.DataContext; import org.apache.calcite.linq4j.Enumerable; @@ -55,8 +56,8 @@ public class BeamSqlEnv implements Serializable{ /** * Register a UDF function which can be used in SQL expression. */ - public void registerUdf(String functionName, Class<?> clazz, String methodName) { - schema.add(functionName, ScalarFunctionImpl.create(clazz, methodName)); + public void registerUdf(String functionName, Class<? 
extends BeamSqlUdf> clazz) { + schema.add(functionName, ScalarFunctionImpl.create(clazz, BeamSqlUdf.UDF_METHOD)); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/a49e4783/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdf.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdf.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdf.java new file mode 100644 index 0000000..2066353 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdf.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.schema; + +import java.io.Serializable; + +/** + * Interface to create a UDF in Beam SQL. + * + * <p>A static method {@code eval} is required. Here is an example: + * + * <blockquote><pre> + * public static class MyLeftFunction implements BeamSqlUdf { + * public static String eval( + * @Parameter(name = "s") String s, + * @Parameter(name = "n", optional = true) Integer n) { + * return s.substring(0, n == null ? 
1 : n); + * } + * }</pre></blockquote> + * + * <p>The first parameter is named "s" and is mandatory, + * and the second parameter is named "n" and is optional. + */ +public interface BeamSqlUdf extends Serializable { + String UDF_METHOD = "eval"; +} http://git-wip-us.apache.org/repos/asf/beam/blob/a49e4783/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java index ba3e87e..332a273 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java @@ -23,6 +23,7 @@ import java.util.Iterator; import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlUdaf; +import org.apache.beam.dsls.sql.schema.BeamSqlUdf; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -78,14 +79,14 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase { String sql1 = "SELECT f_int, cubic1(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2"; PCollection<BeamSqlRow> result1 = boundedInput1.apply("testUdf1", - BeamSql.simpleQuery(sql1).withUdf("cubic1", CubicInteger.class, "cubic")); + BeamSql.simpleQuery(sql1).withUdf("cubic1", CubicInteger.class)); PAssert.that(result1).containsInAnyOrder(record); String sql2 = "SELECT f_int, cubic2(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2"; PCollection<BeamSqlRow> result2 = PCollectionTuple.of(new TupleTag<BeamSqlRow>("PCOLLECTION"), boundedInput1) .apply("testUdf2", - BeamSql.query(sql2).withUdf("cubic2", CubicInteger.class, "cubic")); + BeamSql.query(sql2).withUdf("cubic2", CubicInteger.class)); 
PAssert.that(result2).containsInAnyOrder(record); pipeline.run().waitUntilFinish(); @@ -129,8 +130,8 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase { /** * A example UDF for test. */ - public static class CubicInteger{ - public static Integer cubic(Integer input){ + public static class CubicInteger implements BeamSqlUdf{ + public static Integer eval(Integer input){ return input * input * input; } }
