[ 
https://issues.apache.org/jira/browse/BEAM-6133?focusedWorklogId=170868&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-170868
 ]

ASF GitHub Bot logged work on BEAM-6133:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Nov/18 23:28
            Start Date: 29/Nov/18 23:28
    Worklog Time Spent: 10m 
      Work Description: apilloud closed pull request #7141: [BEAM-6133] [SQL] 
Add support for TableMacro UDF
URL: https://github.com/apache/beam/pull/7141
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
index fd2eddf171df..da7def010c8b 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl;
 
+import com.google.common.collect.ImmutableMap;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -40,17 +41,21 @@
 import org.apache.calcite.schema.TranslatableTable;
 
 /** Adapter from {@link BeamSqlTable} to a calcite Table. */
-class BeamCalciteTable extends AbstractQueryableTable
+public class BeamCalciteTable extends AbstractQueryableTable
     implements ModifiableTable, TranslatableTable {
   private final BeamSqlTable beamTable;
   private final Map<String, String> pipelineOptions;
 
-  public BeamCalciteTable(BeamSqlTable beamTable, Map<String, String> 
pipelineOptions) {
+  BeamCalciteTable(BeamSqlTable beamTable, Map<String, String> 
pipelineOptions) {
     super(Object[].class);
     this.beamTable = beamTable;
     this.pipelineOptions = pipelineOptions;
   }
 
+  public static BeamCalciteTable of(BeamSqlTable table) {
+    return new BeamCalciteTable(table, ImmutableMap.of());
+  }
+
   @Override
   public RelDataType getRowType(RelDataTypeFactory typeFactory) {
     return CalciteUtils.toCalciteRowType(this.beamTable.getSchema(), 
typeFactory);
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/ScalarFunctionImpl.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/ScalarFunctionImpl.java
new file mode 100644
index 000000000000..ceed8ebd8da2
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/ScalarFunctionImpl.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import static org.apache.calcite.util.Static.RESOURCE;
+
+import com.google.common.collect.ImmutableMultimap;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.calcite.adapter.enumerable.CallImplementor;
+import org.apache.calcite.adapter.enumerable.NullPolicy;
+import org.apache.calcite.adapter.enumerable.ReflectiveCallNotNullImplementor;
+import org.apache.calcite.adapter.enumerable.RexImpTable;
+import org.apache.calcite.linq4j.function.SemiStrict;
+import org.apache.calcite.linq4j.function.Strict;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.ImplementableFunction;
+import org.apache.calcite.schema.ScalarFunction;
+import org.apache.calcite.sql.SqlOperatorBinding;
+
+/**
+ * Beam-customized version from {@link 
org.apache.calcite.schema.impl.ScalarFunctionImpl}, to
+ * address BEAM-5921.
+ */
+public class ScalarFunctionImpl extends UdfImplReflectiveFunctionBase
+    implements ScalarFunction, ImplementableFunction {
+
+  private final CallImplementor implementor;
+
+  /** Private constructor. */
+  private ScalarFunctionImpl(Method method, CallImplementor implementor) {
+    super(method);
+    this.implementor = implementor;
+  }
+
+  /** Creates {@link org.apache.calcite.schema.Function} for each method in a 
given class. */
+  public static ImmutableMultimap<String, Function> createAll(Class<?> clazz) {
+    final ImmutableMultimap.Builder<String, Function> builder = 
ImmutableMultimap.builder();
+    for (Method method : clazz.getMethods()) {
+      if (method.getDeclaringClass() == Object.class) {
+        continue;
+      }
+      if (!Modifier.isStatic(method.getModifiers()) && 
!classHasPublicZeroArgsConstructor(clazz)) {
+        continue;
+      }
+      final Function function = create(method);
+      builder.put(method.getName(), function);
+    }
+    return builder.build();
+  }
+
+  /**
+   * Creates {@link org.apache.calcite.schema.Function} from given class.
+   *
+   * <p>If a method of the given name is not found or it does not suit, 
returns {@code null}.
+   *
+   * @param clazz class that is used to implement the function
+   * @param methodName Method name (typically "eval")
+   * @return created {@link ScalarFunction} or null
+   */
+  public static Function create(Class<?> clazz, String methodName) {
+    final Method method = findMethod(clazz, methodName);
+    if (method == null) {
+      return null;
+    }
+    return create(method);
+  }
+
+  /**
+   * Creates {@link org.apache.calcite.schema.Function} from given method. 
When {@code eval} method
+   * does not suit, {@code null} is returned.
+   *
+   * @param method method that is used to implement the function
+   * @return created {@link Function} or null
+   */
+  public static Function create(Method method) {
+    if (!Modifier.isStatic(method.getModifiers())) {
+      Class clazz = method.getDeclaringClass();
+      if (!classHasPublicZeroArgsConstructor(clazz)) {
+        throw RESOURCE.requireDefaultConstructor(clazz.getName()).ex();
+      }
+    }
+    if (method.getExceptionTypes().length != 0) {
+      throw new RuntimeException(method.getName() + " must not throw checked 
exception");
+    }
+
+    CallImplementor implementor = createImplementor(method);
+    return new ScalarFunctionImpl(method, implementor);
+  }
+
+  @Override
+  public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
+    return CalciteUtils.sqlTypeWithAutoCast(typeFactory, 
method.getReturnType());
+  }
+
+  @Override
+  public CallImplementor getImplementor() {
+    return implementor;
+  }
+
+  private static CallImplementor createImplementor(final Method method) {
+    final NullPolicy nullPolicy = getNullPolicy(method);
+    return RexImpTable.createImplementor(
+        new ReflectiveCallNotNullImplementor(method), nullPolicy, false);
+  }
+
+  private static NullPolicy getNullPolicy(Method m) {
+    if (m.getAnnotation(Strict.class) != null) {
+      return NullPolicy.STRICT;
+    } else if (m.getAnnotation(SemiStrict.class) != null) {
+      return NullPolicy.SEMI_STRICT;
+    } else if (m.getDeclaringClass().getAnnotation(Strict.class) != null) {
+      return NullPolicy.STRICT;
+    } else if (m.getDeclaringClass().getAnnotation(SemiStrict.class) != null) {
+      return NullPolicy.SEMI_STRICT;
+    } else {
+      return NullPolicy.NONE;
+    }
+  }
+
+  public RelDataType getReturnType(RelDataTypeFactory typeFactory, 
SqlOperatorBinding opBinding) {
+    // Strict and semi-strict functions can return null even if their Java
+    // functions return a primitive type. Because when one of their arguments
+    // is null, they won't even be called.
+    final RelDataType returnType = getReturnType(typeFactory);
+    switch (getNullPolicy(method)) {
+      case STRICT:
+        for (RelDataType type : opBinding.collectOperandTypes()) {
+          if (type.isNullable()) {
+            return typeFactory.createTypeWithNullability(returnType, true);
+          }
+        }
+        break;
+      case SEMI_STRICT:
+        return typeFactory.createTypeWithNullability(returnType, true);
+      default:
+        break;
+    }
+    return returnType;
+  }
+
+  /**
+   * Verifies if given class has public constructor with zero arguments.
+   *
+   * @param clazz class to verify
+   * @return true if given class has public constructor with zero arguments
+   */
+  static boolean classHasPublicZeroArgsConstructor(Class<?> clazz) {
+    for (Constructor<?> constructor : clazz.getConstructors()) {
+      if (constructor.getParameterTypes().length == 0
+          && Modifier.isPublic(constructor.getModifiers())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /*
+   * Finds a method in a given class by name.
+   * @param clazz class to search method in
+   * @param name name of the method to find
+   * @return the first method with matching name or null when no method found
+   */
+  static Method findMethod(Class<?> clazz, String name) {
+    for (Method method : clazz.getMethods()) {
+      if (method.getName().equals(name) && !method.isBridge()) {
+        return method;
+      }
+    }
+    return null;
+  }
+}
+
+// End ScalarFunctionImpl.java
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImpl.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImpl.java
index 015349521c9e..ba5848e54e5d 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImpl.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImpl.java
@@ -17,64 +17,26 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl;
 
-import static org.apache.calcite.util.Static.RESOURCE;
-
-import com.google.common.collect.ImmutableMultimap;
-import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.calcite.adapter.enumerable.CallImplementor;
-import org.apache.calcite.adapter.enumerable.NullPolicy;
-import org.apache.calcite.adapter.enumerable.ReflectiveCallNotNullImplementor;
-import org.apache.calcite.adapter.enumerable.RexImpTable;
-import org.apache.calcite.linq4j.function.SemiStrict;
-import org.apache.calcite.linq4j.function.Strict;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.schema.ImplementableFunction;
-import org.apache.calcite.schema.ScalarFunction;
-import org.apache.calcite.schema.impl.ScalarFunctionImpl;
-import org.apache.calcite.sql.SqlOperatorBinding;
-
-/** Beam-customized version from {@link ScalarFunctionImpl}, to address 
BEAM-5921. */
-public class UdfImpl extends UdfImplReflectiveFunctionBase
-    implements ScalarFunction, ImplementableFunction {
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.schema.impl.TableMacroImpl;
 
-  private final CallImplementor implementor;
-
-  /** Private constructor. */
-  private UdfImpl(Method method, CallImplementor implementor) {
-    super(method);
-    this.implementor = implementor;
-  }
+/** Beam-customized facade behind {@link Function} to address BEAM-5921. */
+class UdfImpl {
 
-  /** Creates {@link org.apache.calcite.schema.ScalarFunction} for each method 
in a given class. */
-  public static ImmutableMultimap<String, ScalarFunction> createAll(Class<?> 
clazz) {
-    final ImmutableMultimap.Builder<String, ScalarFunction> builder = 
ImmutableMultimap.builder();
-    for (Method method : clazz.getMethods()) {
-      if (method.getDeclaringClass() == Object.class) {
-        continue;
-      }
-      if (!Modifier.isStatic(method.getModifiers()) && 
!classHasPublicZeroArgsConstructor(clazz)) {
-        continue;
-      }
-      final ScalarFunction function = create(method);
-      builder.put(method.getName(), function);
-    }
-    return builder.build();
-  }
+  private UdfImpl() {}
 
   /**
-   * Creates {@link org.apache.calcite.schema.ScalarFunction} from given class.
+   * Creates {@link org.apache.calcite.schema.Function} from given class.
    *
    * <p>If a method of the given name is not found or it does not suit, 
returns {@code null}.
    *
    * @param clazz class that is used to implement the function
    * @param methodName Method name (typically "eval")
-   * @return created {@link ScalarFunction} or null
+   * @return created {@link Function} or null
    */
-  public static ScalarFunction create(Class<?> clazz, String methodName) {
+  public static Function create(Class<?> clazz, String methodName) {
     final Method method = findMethod(clazz, methodName);
     if (method == null) {
       return null;
@@ -83,93 +45,19 @@ public static ScalarFunction create(Class<?> clazz, String 
methodName) {
   }
 
   /**
-   * Creates {@link org.apache.calcite.schema.ScalarFunction} from given 
method. When {@code eval}
-   * method does not suit, {@code null} is returned.
+   * Creates {@link org.apache.calcite.schema.Function} from given method.
    *
    * @param method method that is used to implement the function
-   * @return created {@link ScalarFunction} or null
+   * @return created {@link Function} or null
    */
-  public static ScalarFunction create(Method method) {
-    if (!Modifier.isStatic(method.getModifiers())) {
-      Class clazz = method.getDeclaringClass();
-      if (!classHasPublicZeroArgsConstructor(clazz)) {
-        throw RESOURCE.requireDefaultConstructor(clazz.getName()).ex();
-      }
-    }
-    if (method.getExceptionTypes().length != 0) {
-      throw new RuntimeException(method.getName() + " must not throw checked 
exception");
-    }
-    CallImplementor implementor = createImplementor(method);
-    return new UdfImpl(method, implementor);
-  }
-
-  @Override
-  public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
-    return CalciteUtils.sqlTypeWithAutoCast(typeFactory, 
method.getReturnType());
-  }
-
-  @Override
-  public CallImplementor getImplementor() {
-    return implementor;
-  }
-
-  private static CallImplementor createImplementor(final Method method) {
-    final NullPolicy nullPolicy = getNullPolicy(method);
-    return RexImpTable.createImplementor(
-        new ReflectiveCallNotNullImplementor(method), nullPolicy, false);
-  }
-
-  private static NullPolicy getNullPolicy(Method m) {
-    if (m.getAnnotation(Strict.class) != null) {
-      return NullPolicy.STRICT;
-    } else if (m.getAnnotation(SemiStrict.class) != null) {
-      return NullPolicy.SEMI_STRICT;
-    } else if (m.getDeclaringClass().getAnnotation(Strict.class) != null) {
-      return NullPolicy.STRICT;
-    } else if (m.getDeclaringClass().getAnnotation(SemiStrict.class) != null) {
-      return NullPolicy.SEMI_STRICT;
+  public static Function create(Method method) {
+    if (TranslatableTable.class.isAssignableFrom(method.getReturnType())) {
+      return TableMacroImpl.create(method);
     } else {
-      return NullPolicy.NONE;
+      return ScalarFunctionImpl.create(method);
     }
   }
 
-  public RelDataType getReturnType(RelDataTypeFactory typeFactory, 
SqlOperatorBinding opBinding) {
-    // Strict and semi-strict functions can return null even if their Java
-    // functions return a primitive type. Because when one of their arguments
-    // is null, they won't even be called.
-    final RelDataType returnType = getReturnType(typeFactory);
-    switch (getNullPolicy(method)) {
-      case STRICT:
-        for (RelDataType type : opBinding.collectOperandTypes()) {
-          if (type.isNullable()) {
-            return typeFactory.createTypeWithNullability(returnType, true);
-          }
-        }
-        break;
-      case SEMI_STRICT:
-        return typeFactory.createTypeWithNullability(returnType, true);
-      default:
-        break;
-    }
-    return returnType;
-  }
-
-  /**
-   * Verifies if given class has public constructor with zero arguments.
-   *
-   * @param clazz class to verify
-   * @return true if given class has public constructor with zero arguments
-   */
-  static boolean classHasPublicZeroArgsConstructor(Class<?> clazz) {
-    for (Constructor<?> constructor : clazz.getConstructors()) {
-      if (constructor.getParameterTypes().length == 0
-          && Modifier.isPublic(constructor.getModifiers())) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   /*
    * Finds a method in a given class by name.
    * @param clazz class to search method in
@@ -185,5 +73,3 @@ static Method findMethod(Class<?> clazz, String name) {
     return null;
   }
 }
-
-// End ScalarFunctionImpl.java
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index ebff981711c3..87f706bedc1a 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -22,7 +22,7 @@
 import java.util.List;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.extensions.sql.impl.UdfImpl;
+import org.apache.beam.sdk.extensions.sql.impl.ScalarFunctionImpl;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCaseExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCastExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCorrelVariableExpression;
@@ -503,7 +503,7 @@ private static BeamSqlExpression 
getBeamSqlExpression(RexNode rexNode) {
           // handle UDF
           if (((RexCall) rexNode).getOperator() instanceof 
SqlUserDefinedFunction) {
             SqlUserDefinedFunction udf = (SqlUserDefinedFunction) ((RexCall) 
rexNode).getOperator();
-            UdfImpl fn = (UdfImpl) udf.getFunction();
+            ScalarFunctionImpl fn = (ScalarFunctionImpl) udf.getFunction();
             ret =
                 new BeamSqlUdfExpression(
                     fn.method, subExps, ((RexCall) 
rexNode).type.getSqlTypeName());
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
index e78c1136a31e..4eea5539115d 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
@@ -24,8 +24,11 @@
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteTable;
 import org.apache.beam.sdk.extensions.sql.impl.ParseException;
 import org.apache.beam.sdk.extensions.sql.meta.provider.UdfUdafProvider;
+import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
@@ -35,6 +38,7 @@
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.calcite.linq4j.function.Parameter;
+import org.apache.calcite.schema.TranslatableTable;
 import org.joda.time.Instant;
 import org.junit.Test;
 
@@ -169,6 +173,25 @@ public void testUdf() throws Exception {
     pipeline.run().waitUntilFinish();
   }
 
+  /** test {@link org.apache.calcite.schema.TableMacro} UDF. */
+  @Test
+  public void testTableMacroUdf() throws Exception {
+    String sql1 = "SELECT * FROM table(range_udf(0, 3))";
+
+    Schema schema = Schema.of(Schema.Field.of("f0", Schema.FieldType.INT32));
+
+    PCollection<Row> rows =
+        pipeline.apply(SqlTransform.query(sql1).registerUdf("range_udf", 
RangeUdf.class));
+
+    PAssert.that(rows)
+        .containsInAnyOrder(
+            Row.withSchema(schema).addValue(0).build(),
+            Row.withSchema(schema).addValue(1).build(),
+            Row.withSchema(schema).addValue(2).build());
+
+    pipeline.run();
+  }
+
   /** test auto-provider UDF/UDAF. */
   @Test
   public void testAutoUdfUdaf() throws Exception {
@@ -320,4 +343,13 @@ public static Instant eval(Instant time) {
       return new Instant(time.getMillis() - 24 * 3600 * 1000L);
     }
   }
+
+  /** UDF to test support for {@link org.apache.calcite.schema.TableMacro}. */
+  public static final class RangeUdf implements BeamSqlUdf {
+    public static TranslatableTable eval(int startInclusive, int endExclusive) 
{
+      Schema schema = Schema.of(Schema.Field.of("f0", Schema.FieldType.INT32));
+      Object[] values = IntStream.range(startInclusive, 
endExclusive).boxed().toArray();
+      return BeamCalciteTable.of(new TestBoundedTable(schema).addRows(values));
+    }
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 170868)
    Time Spent: 2h 20m  (was: 2h 10m)

> [SQL] Add support for TableMacro UDF
> ------------------------------------
>
>                 Key: BEAM-6133
>                 URL: https://issues.apache.org/jira/browse/BEAM-6133
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>            Reporter: Gleb Kanterov
>            Assignee: Gleb Kanterov
>            Priority: Major
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> Now we support only ScalarFunction UDFs. In Calcite, there are other kinds of 
> UDFs. With TableMacro UDFs users can connect external data sources in a 
> similar way as in TableProvider, but without specifying a schema, or 
> enumerating a list of existing tables in advance. 
> An example use case is connecting external metadata service and querying 
> range of partitions.
> {code}
> SELECT COUNT(*) FROM table(my_udf('dataset', start = '2017-01-01', end = 
> '2018-01-01'))
> {code}
> Where the implementation of `my_udf` will connect to this service, get file 
> locations for a range of partitions, and translate to PTransform reading it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to