[ 
https://issues.apache.org/jira/browse/BEAM-5921?focusedWorklogId=161750&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-161750
 ]

ASF GitHub Bot logged work on BEAM-5921:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Nov/18 21:23
            Start Date: 01/Nov/18 21:23
    Worklog Time Spent: 10m 
      Work Description: XuMingmin closed pull request #6913: [BEAM-5921] [SQL] 
Support Joda types for UDF arguments
URL: https://github.com/apache/beam/pull/6913
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index a850f8aadb1..1760cf78984 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -38,7 +38,6 @@
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.impl.ScalarFunctionImpl;
 import org.apache.calcite.sql.SqlExecutableStatement;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.tools.RelConversionException;
@@ -81,14 +80,14 @@ public static BeamSqlEnv inMemory(TableProvider... 
tableProviders) {
   private void registerBuiltinUdf(Map<String, List<Method>> methods) {
     for (Map.Entry<String, List<Method>> entry : methods.entrySet()) {
       for (Method method : entry.getValue()) {
-        defaultSchema.add(entry.getKey(), ScalarFunctionImpl.create(method));
+        defaultSchema.add(entry.getKey(), UdfImpl.create(method));
       }
     }
   }
 
   /** Register a UDF function which can be used in SQL expression. */
   public void registerUdf(String functionName, Class<?> clazz, String method) {
-    defaultSchema.add(functionName, ScalarFunctionImpl.create(clazz, method));
+    defaultSchema.add(functionName, UdfImpl.create(clazz, method));
   }
 
   /** Register a UDF function which can be used in SQL expression. */
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java
index fd3f472efb0..cbdbbb6aa9c 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java
@@ -23,6 +23,7 @@
 import java.util.List;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.calcite.adapter.enumerable.AggImplementor;
 import org.apache.calcite.rel.type.RelDataType;
@@ -65,8 +66,8 @@ public String getName() {
           @Override
           public RelDataType getType(RelDataTypeFactory typeFactory) {
             ParameterizedType parameterizedType = findCombineFnSuperClass();
-            return typeFactory.createJavaType(
-                (Class) parameterizedType.getActualTypeArguments()[0]);
+            return CalciteUtils.sqlTypeWithAutoCast(
+                typeFactory, parameterizedType.getActualTypeArguments()[0]);
           }
 
           private ParameterizedType findCombineFnSuperClass() {
@@ -100,6 +101,6 @@ public AggImplementor getImplementor(boolean windowContext) 
{
 
   @Override
   public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
-    return typeFactory.createJavaType((Class) 
combineFn.getOutputType().getType());
+    return CalciteUtils.sqlTypeWithAutoCast(typeFactory, 
combineFn.getOutputType().getType());
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImpl.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImpl.java
new file mode 100644
index 00000000000..10375c444cf
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImpl.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import static org.apache.calcite.util.Static.RESOURCE;
+
+import com.google.common.collect.ImmutableMultimap;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.calcite.adapter.enumerable.CallImplementor;
+import org.apache.calcite.adapter.enumerable.NullPolicy;
+import org.apache.calcite.adapter.enumerable.ReflectiveCallNotNullImplementor;
+import org.apache.calcite.adapter.enumerable.RexImpTable;
+import org.apache.calcite.linq4j.function.SemiStrict;
+import org.apache.calcite.linq4j.function.Strict;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.ImplementableFunction;
+import org.apache.calcite.schema.ScalarFunction;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.sql.SqlOperatorBinding;
+
+/** Beam-customized version from {@link ScalarFunctionImpl}, to address 
BEAM-5921. */
+public class UdfImpl extends UdfImplReflectiveFunctionBase
+    implements ScalarFunction, ImplementableFunction {
+
+  private final CallImplementor implementor;
+
+  /** Private constructor. */
+  private UdfImpl(Method method, CallImplementor implementor) {
+    super(method);
+    this.implementor = implementor;
+  }
+
+  /** Creates {@link org.apache.calcite.schema.ScalarFunction} for each method 
in a given class. */
+  public static ImmutableMultimap<String, ScalarFunction> createAll(Class<?> 
clazz) {
+    final ImmutableMultimap.Builder<String, ScalarFunction> builder = 
ImmutableMultimap.builder();
+    for (Method method : clazz.getMethods()) {
+      if (method.getDeclaringClass() == Object.class) {
+        continue;
+      }
+      if (!Modifier.isStatic(method.getModifiers()) && 
!classHasPublicZeroArgsConstructor(clazz)) {
+        continue;
+      }
+      final ScalarFunction function = create(method);
+      builder.put(method.getName(), function);
+    }
+    return builder.build();
+  }
+
+  /**
+   * Creates {@link org.apache.calcite.schema.ScalarFunction} from given class.
+   *
+   * <p>If a method of the given name is not found or it does not suit, 
returns {@code null}.
+   *
+   * @param clazz class that is used to implement the function
+   * @param methodName Method name (typically "eval")
+   * @return created {@link ScalarFunction} or null
+   */
+  public static ScalarFunction create(Class<?> clazz, String methodName) {
+    final Method method = findMethod(clazz, methodName);
+    if (method == null) {
+      return null;
+    }
+    return create(method);
+  }
+
+  /**
+   * Creates {@link org.apache.calcite.schema.ScalarFunction} from given 
method. When {@code eval}
+   * method does not suit, {@code null} is returned.
+   *
+   * @param method method that is used to implement the function
+   * @return created {@link ScalarFunction} or null
+   */
+  public static ScalarFunction create(Method method) {
+    if (!Modifier.isStatic(method.getModifiers())) {
+      Class clazz = method.getDeclaringClass();
+      if (!classHasPublicZeroArgsConstructor(clazz)) {
+        throw RESOURCE.requireDefaultConstructor(clazz.getName()).ex();
+      }
+    }
+    CallImplementor implementor = createImplementor(method);
+    return new UdfImpl(method, implementor);
+  }
+
+  @Override
+  public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
+    return CalciteUtils.sqlTypeWithAutoCast(typeFactory, 
method.getReturnType());
+  }
+
+  @Override
+  public CallImplementor getImplementor() {
+    return implementor;
+  }
+
+  private static CallImplementor createImplementor(final Method method) {
+    final NullPolicy nullPolicy = getNullPolicy(method);
+    return RexImpTable.createImplementor(
+        new ReflectiveCallNotNullImplementor(method), nullPolicy, false);
+  }
+
+  private static NullPolicy getNullPolicy(Method m) {
+    if (m.getAnnotation(Strict.class) != null) {
+      return NullPolicy.STRICT;
+    } else if (m.getAnnotation(SemiStrict.class) != null) {
+      return NullPolicy.SEMI_STRICT;
+    } else if (m.getDeclaringClass().getAnnotation(Strict.class) != null) {
+      return NullPolicy.STRICT;
+    } else if (m.getDeclaringClass().getAnnotation(SemiStrict.class) != null) {
+      return NullPolicy.SEMI_STRICT;
+    } else {
+      return NullPolicy.NONE;
+    }
+  }
+
+  public RelDataType getReturnType(RelDataTypeFactory typeFactory, 
SqlOperatorBinding opBinding) {
+    // Strict and semi-strict functions can return null even if their Java
+    // functions return a primitive type. Because when one of their arguments
+    // is null, they won't even be called.
+    final RelDataType returnType = getReturnType(typeFactory);
+    switch (getNullPolicy(method)) {
+      case STRICT:
+        for (RelDataType type : opBinding.collectOperandTypes()) {
+          if (type.isNullable()) {
+            return typeFactory.createTypeWithNullability(returnType, true);
+          }
+        }
+        break;
+      case SEMI_STRICT:
+        return typeFactory.createTypeWithNullability(returnType, true);
+      default:
+        break;
+    }
+    return returnType;
+  }
+
+  /**
+   * Verifies if given class has public constructor with zero arguments.
+   *
+   * @param clazz class to verify
+   * @return true if given class has public constructor with zero arguments
+   */
+  static boolean classHasPublicZeroArgsConstructor(Class<?> clazz) {
+    for (Constructor<?> constructor : clazz.getConstructors()) {
+      if (constructor.getParameterTypes().length == 0
+          && Modifier.isPublic(constructor.getModifiers())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /*
+   * Finds a method in a given class by name.
+   * @param clazz class to search method in
+   * @param name name of the method to find
+   * @return the first method with matching name or null when no method found
+   */
+  static Method findMethod(Class<?> clazz, String name) {
+    for (Method method : clazz.getMethods()) {
+      if (method.getName().equals(name) && !method.isBridge()) {
+        return method;
+      }
+    }
+    return null;
+  }
+}
+
+// End ScalarFunctionImpl.java
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImplReflectiveFunctionBase.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImplReflectiveFunctionBase.java
new file mode 100644
index 00000000000..13b0a9813ae
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImplReflectiveFunctionBase.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import com.google.common.collect.ImmutableList;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.FunctionParameter;
+import org.apache.calcite.schema.impl.ReflectiveFunctionBase;
+import org.apache.calcite.util.ReflectUtil;
+
+/** Beam-customized version from {@link ReflectiveFunctionBase}, to address 
BEAM-5921. */
+public abstract class UdfImplReflectiveFunctionBase implements Function {
+  /** Method that implements the function. */
+  public final Method method;
+  /** Types of parameter for the function call. */
+  public final List<FunctionParameter> parameters;
+
+  /**
+   * {@code UdfImplReflectiveFunctionBase} constructor.
+   *
+   * @param method method that is used to get type information from
+   */
+  public UdfImplReflectiveFunctionBase(Method method) {
+    this.method = method;
+    this.parameters = builder().addMethodParameters(method).build();
+  }
+
+  /**
+   * Returns the parameters of this function.
+   *
+   * @return Parameters; never null
+   */
+  @Override
+  public List<FunctionParameter> getParameters() {
+    return parameters;
+  }
+
+  /**
+   * Verifies if given class has public constructor with zero arguments.
+   *
+   * @param clazz class to verify
+   * @return true if given class has public constructor with zero arguments
+   */
+  static boolean classHasPublicZeroArgsConstructor(Class<?> clazz) {
+    for (Constructor<?> constructor : clazz.getConstructors()) {
+      if (constructor.getParameterTypes().length == 0
+          && Modifier.isPublic(constructor.getModifiers())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Finds a method in a given class by name.
+   *
+   * @param clazz class to search method in
+   * @param name name of the method to find
+   * @return the first method with matching name or null when no method found
+   */
+  static Method findMethod(Class<?> clazz, String name) {
+    for (Method method : clazz.getMethods()) {
+      if (method.getName().equals(name) && !method.isBridge()) {
+        return method;
+      }
+    }
+    return null;
+  }
+
+  /** Creates a ParameterListBuilder. */
+  public static ParameterListBuilder builder() {
+    return new ParameterListBuilder();
+  }
+
+  /** Helps build lists of {@link 
org.apache.calcite.schema.FunctionParameter}. */
+  public static class ParameterListBuilder {
+    final List<FunctionParameter> builder = new ArrayList<>();
+
+    public ImmutableList<FunctionParameter> build() {
+      return ImmutableList.copyOf(builder);
+    }
+
+    public ParameterListBuilder add(final Class<?> type, final String name) {
+      return add(type, name, false);
+    }
+
+    public ParameterListBuilder add(
+        final Class<?> type, final String name, final boolean optional) {
+      final int ordinal = builder.size();
+      builder.add(
+          new FunctionParameter() {
+            @Override
+            public int getOrdinal() {
+              return ordinal;
+            }
+
+            @Override
+            public String getName() {
+              return name;
+            }
+
+            @Override
+            public RelDataType getType(RelDataTypeFactory typeFactory) {
+              return CalciteUtils.sqlTypeWithAutoCast(typeFactory, type);
+            }
+
+            @Override
+            public boolean isOptional() {
+              return optional;
+            }
+          });
+      return this;
+    }
+
+    public ParameterListBuilder addMethodParameters(Method method) {
+      final Class<?>[] types = method.getParameterTypes();
+      for (int i = 0; i < types.length; i++) {
+        add(
+            types[i],
+            ReflectUtil.getParameterName(method, i),
+            ReflectUtil.isParameterOptional(method, i));
+      }
+      return this;
+    }
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index dec5217fa37..ccda06e0433 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -22,6 +22,7 @@
 import java.util.List;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.sql.impl.UdfImpl;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCaseExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCastExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCorrelVariableExpression;
@@ -102,7 +103,6 @@
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexProgram;
 import org.apache.calcite.runtime.SqlFunctions;
-import org.apache.calcite.schema.impl.ScalarFunctionImpl;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
@@ -505,7 +505,7 @@ private static BeamSqlExpression 
getBeamSqlExpression(RexNode rexNode) {
           // handle UDF
           if (((RexCall) rexNode).getOperator() instanceof 
SqlUserDefinedFunction) {
             SqlUserDefinedFunction udf = (SqlUserDefinedFunction) ((RexCall) 
rexNode).getOperator();
-            ScalarFunctionImpl fn = (ScalarFunctionImpl) udf.getFunction();
+            UdfImpl fn = (UdfImpl) udf.getFunction();
             ret =
                 new BeamSqlUdfExpression(
                     fn.method, subExps, ((RexCall) 
rexNode).type.getSqlTypeName());
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index b6dc88ad397..96ad251b36b 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -20,6 +20,8 @@
 import com.google.common.collect.BiMap;
 import com.google.common.collect.ImmutableBiMap;
 import com.google.common.collect.ImmutableMap;
+import java.lang.reflect.Type;
+import java.util.Date;
 import java.util.Map;
 import java.util.stream.IntStream;
 import org.apache.beam.sdk.schemas.Schema;
@@ -28,6 +30,7 @@
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.ReadableInstant;
 
 /** Utility methods for Calcite related operations. */
 public class CalciteUtils {
@@ -189,4 +192,19 @@ private static RelDataType toRelDataType(
 
     return dataTypeFactory.createTypeWithNullability(type, 
field.getNullable());
   }
+
+  /**
+   * SQL-Java type mapping, with specified Beam rules: <br>
+   * 1. redirect {@link ReadableInstant} to {@link Date} so Calcite can 
recognize it.
+   *
+   * @param rawType
+   * @return
+   */
+  public static RelDataType sqlTypeWithAutoCast(RelDataTypeFactory 
typeFactory, Type rawType) {
+    //For Joda time types, return SQL type for java.util.Date.
+    if (rawType instanceof Class && 
ReadableInstant.class.isAssignableFrom((Class<?>) rawType)) {
+      return typeFactory.createJavaType(Date.class);
+    }
+    return typeFactory.createJavaType((Class) rawType);
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
index 5b23151e795..a4c84ca8dcd 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
@@ -34,6 +34,7 @@
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.calcite.linq4j.function.Parameter;
+import org.joda.time.Instant;
 import org.junit.Test;
 
 /** Tests for UDF/UDAF. */
@@ -64,6 +65,32 @@ public void testUdaf() throws Exception {
     pipeline.run().waitUntilFinish();
   }
 
+  /** Test Joda time UDF/UDAF. */
+  @Test
+  public void testJodaTimeUdfUdaf() throws Exception {
+    Schema resultType = Schema.builder().addDateTimeField("jodatime").build();
+
+    Row row1 =
+        Row.withSchema(resultType).addValues(FORMAT.parseDateTime("2017-01-01 
02:04:03")).build();
+
+    String sql1 = "SELECT MAX_JODA(f_timestamp) as jodatime FROM PCOLLECTION";
+    PCollection<Row> result1 =
+        boundedInput1.apply(
+            "testJodaUdaf", SqlTransform.query(sql1).registerUdaf("MAX_JODA", 
new JodaMax()));
+    PAssert.that(result1).containsInAnyOrder(row1);
+
+    Row row2 =
+        Row.withSchema(resultType).addValues(FORMAT.parseDateTime("2016-12-31 
01:01:03")).build();
+
+    String sql2 = "SELECT PRE_DAY(f_timestamp) as jodatime FROM PCOLLECTION 
WHERE f_int=1";
+    PCollection<Row> result2 =
+        boundedInput1.apply(
+            "testJodaUdf", SqlTransform.query(sql2).registerUdf("PRE_DAY", 
JodaPreviousDay.class));
+    PAssert.that(result2).containsInAnyOrder(row2);
+
+    pipeline.run().waitUntilFinish();
+  }
+
   /** Test that an indirect subclass of a {@link CombineFn} works as a UDAF. 
BEAM-3777 */
   @Test
   public void testUdafMultiLevelDescendent() {
@@ -197,6 +224,33 @@ public Integer extractOutput(Integer accumulator) {
     }
   }
 
+  /** UDAF(CombineFn) to test support of Joda time. */
+  public static class JodaMax extends CombineFn<Instant, Instant, Instant> {
+    @Override
+    public Instant createAccumulator() {
+      return new Instant(0L);
+    }
+
+    @Override
+    public Instant addInput(Instant accumulator, Instant input) {
+      return accumulator.isBefore(input) ? input : accumulator;
+    }
+
+    @Override
+    public Instant mergeAccumulators(Iterable<Instant> accumulators) {
+      Instant v = new Instant(0L);
+      for (Instant accumulator : accumulators) {
+        v = accumulator.isBefore(v) ? v : accumulator;
+      }
+      return v;
+    }
+
+    @Override
+    public Instant extractOutput(Instant accumulator) {
+      return accumulator;
+    }
+  }
+
   /**
    * Non-parameterized CombineFn. Intended to test that non-parameterized 
CombineFns are correctly
    * rejected. The methods just return null, as they should never be called.
@@ -254,4 +308,11 @@ public static String eval(
       return s.substring(0, n == null ? 1 : n);
     }
   }
+
+  /** A UDF to test support of Joda time. */
+  public static final class JodaPreviousDay implements BeamSqlUdf {
+    public static Instant eval(Instant time) {
+      return new Instant(time.getMillis() - 24 * 3600 * 1000L);
+    }
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 161750)
    Time Spent: 3h  (was: 2h 50m)

> [SQL] Support Joda types for UDF arguments
> ------------------------------------------
>
>                 Key: BEAM-5921
>                 URL: https://issues.apache.org/jira/browse/BEAM-5921
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-sql
>            Reporter: Anton Kedin
>            Assignee: Xu Mingmin
>            Priority: Major
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> We call ScalarFunctionImpl.create() to register a UDF with the Calcite 
> schema in BeamSqlEnv. Internally it uses Calcite's internal mapping 
> (JavaToSqlTypeConversionRules) to map Java types to SQL types and create the 
> function signature that gets registered in the schema. The problem is that 
> this logic is not extensible and does not support Joda types (and possibly 
> other types as well).
> We can work around this by constructing our own subclass of Function that 
> gets registered in the schema instead of calling ScalarFunctionImpl.create(). 
> This logic can use our own custom mapping (or fall back to Calcite 
> implementation if needed).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to