This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5be9ab3c8a28b64d7039568adc43b0abb32c3f6e
Author: Jark Wu <[email protected]>
AuthorDate: Mon Jun 10 20:43:39 2019 +0800

    [FLINK-12708][table] Introduce LookupableTableSource and make blink planner support it
---
 .../flink/table/functions/AsyncTableFunction.java  |  62 +++---------
 .../flink/table/sources/LookupableTableSource.java |  12 ++-
 .../apache/flink/table/sources/LookupConfig.java   | 111 ---------------------
 .../table/codegen/LookupJoinCodeGenerator.scala    |   5 +-
 .../table/plan/nodes/common/CommonLookupJoin.scala |  39 +++-----
 .../plan/stream/sql/join/LookupJoinTest.scala      |  54 +++++-----
 .../utils/InMemoryLookupableTableSource.scala      |  37 ++-----
 .../apache/flink/table/api/TableConfigOptions.java |  14 +++
 .../join/lookup/DelegatingResultFuture.java        |  55 ++++++++++
 9 files changed, 151 insertions(+), 238 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java
similarity index 57%
rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java
index 0f609a5..ddb1e61 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.table.functions;
 
-import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.functions.async.ResultFuture;
-import org.apache.flink.table.api.ValidationException;
+
+import java.util.concurrent.CompletableFuture;
 
 /**
 * Base class for a user-defined asynchronous table function (UDTF). This is similar to
@@ -35,30 +34,26 @@ import org.apache.flink.table.api.ValidationException;
 * method. An evaluation method must be declared publicly, not static and named "eval".
 * Evaluation methods can also be overloaded by implementing multiple methods named "eval".
  *
- * <p>The first parameter of evaluation method must be {@link ResultFuture}, and the others are user
- * defined input parameters like the "eval" method of {@link TableFunction}.
+ * <p>The first parameter of the evaluation method must be {@link CompletableFuture}, and the others are
+ * user-defined input parameters like the "eval" method of {@link TableFunction}. The generic type of
+ * {@link CompletableFuture} must be {@link java.util.Collection} to collect multiple possible result
+ * values.
  *
 * <p>For each "eval", an async io operation can be triggered, and once it has been done,
- * the result can be collected by calling {@link ResultFuture#complete}. For each async
+ * the result can be collected by calling {@link CompletableFuture#complete}. For each async
 * operation, its context is stored in the operator immediately after invoking "eval",
 * avoiding blocking for each stream input as long as the internal buffer is not full.
  *
- * <p>{@link ResultFuture} can be passed into callbacks or futures to collect the result data.
+ * <p>{@link CompletableFuture} can be passed into callbacks or futures to collect the result data.
 * An error can also be propagated to the async IO operator by
- * {@link ResultFuture#completeExceptionally(Throwable)}.
+ * {@link CompletableFuture#completeExceptionally(Throwable)}.
  *
 * <p>User-defined functions must have a default constructor and must be instantiable during runtime.
  *
 * <p>By default the result type of an evaluation method is determined by Flink's type extraction
- * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more
- * complex, custom, or composite types. In these cases {@link TypeInformation} of the result type
- * can be manually defined by overriding {@link #getResultType}.
- *
- * <p>Internally, the Table/SQL API code generation works with primitive values as much as possible.
- * If a user-defined table function should not introduce much overhead during runtime, it is
- * recommended to declare parameters and result types as primitive types instead of their boxed
- * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
+ * facilities. Currently, only {@link org.apache.flink.types.Row} and {@code BaseRow} are supported as
+ * the result type. More complex, custom types will be supported in the future.
  *
  * <p>Example:
  *
@@ -67,7 +62,7 @@ import org.apache.flink.table.api.ValidationException;
  *   public class HBaseAsyncTableFunction extends AsyncTableFunction<String> {
  *
  *     // implement an "eval" method with as many parameters as you want
- *     public void eval(ResultFuture<String> result, String rowkey) {
+ *     public void eval(CompletableFuture<Collection<String>> result, String rowkey) {
  *       Get get = new Get(Bytes.toBytes(rowkey));
  *       ListenableFuture<Result> future = hbase.asyncGet(get);
  *       Futures.addCallback(future, new FutureCallback<Result>() {
@@ -85,11 +80,12 @@ import org.apache.flink.table.api.ValidationException;
  *   }
  * }
  *
- * <p>NOTE: the {@link AsyncTableFunction} is can not used as UDTF currently. It only used in
- * temporal table join as a async lookup function
+ * <p>NOTE: the {@link AsyncTableFunction} cannot be used as a UDTF currently. It can only be used
+ * in a temporal table join as an async lookup function.
  *
  * @param <T> The type of the output row
  */
+@Experimental
 public abstract class AsyncTableFunction<T> extends UserDefinedFunction {
 
        /**
@@ -105,30 +101,4 @@ public abstract class AsyncTableFunction<T> extends UserDefinedFunction {
        public TypeInformation<T> getResultType() {
                return null;
        }
-
-       /**
-        * Returns {@link TypeInformation} about the operands of the evaluation method with a given
-        * signature.
-        *
-        * <p>In order to perform operand type inference in SQL (especially when NULL is used) it might be
-        * necessary to determine the parameter {@link TypeInformation} of an evaluation method.
-        * By default Flink's type extraction facilities are used for this but might be wrong for
-        * more complex, custom, or composite types.
-        *
-        * @param signature signature of the method the operand types need to be determined
-        * @return {@link TypeInformation} of operand types
-        */
-       public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
-               final TypeInformation<?>[] types = new TypeInformation<?>[signature.length];
-               for (int i = 0; i < signature.length; i++) {
-                       try {
-                               types[i] = TypeExtractor.getForClass(signature[i]);
-                       } catch (InvalidTypesException e) {
-                               throw new ValidationException(
-                                       "Parameter types of table function " + this.getClass().getCanonicalName() +
-                                               " cannot be automatically determined. Please provide type information manually.");
-                       }
-               }
-               return types;
-       }
 }
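
[Editor's note] A minimal sketch of the new eval contract described in the javadoc above. This class is not part of the commit; the class name and the in-memory map are made up for illustration, and only the AsyncTableFunction / CompletableFuture signature comes from the patch.

import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.types.Row;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class InMemoryAsyncLookupFunction extends AsyncTableFunction<Row> {

	// hypothetical backing store standing in for an external system such as HBase
	private final Map<Integer, Row> cache = new ConcurrentHashMap<>();

	// the first parameter is a CompletableFuture<Collection<T>>, the remaining ones are the lookup keys
	public void eval(CompletableFuture<Collection<Row>> future, Integer id) {
		CompletableFuture.supplyAsync(() -> {
			Row row = cache.get(id);
			return row == null ? Collections.<Row>emptyList() : Collections.singletonList(row);
		}).whenComplete((rows, error) -> {
			if (error != null) {
				// failures are propagated to the async I/O operator
				future.completeExceptionally(error);
			} else {
				future.complete(rows);
			}
		});
	}
}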
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/LookupableTableSource.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LookupableTableSource.java
similarity index 83%
rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/LookupableTableSource.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LookupableTableSource.java
index 2f38e63..45e19de 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/LookupableTableSource.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LookupableTableSource.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.sources;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.table.functions.AsyncTableFunction;
 import org.apache.flink.table.functions.TableFunction;
 
@@ -29,7 +29,7 @@ import org.apache.flink.table.functions.TableFunction;
  *
  * @param <T> type of the result
  */
-@PublicEvolving
+@Experimental
 public interface LookupableTableSource<T> extends TableSource<T> {
 
        /**
@@ -45,7 +45,11 @@ public interface LookupableTableSource<T> extends TableSource<T> {
        AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupKeys);
 
        /**
-        * Defines the lookup behavior in the config. Such as whether to use async lookup.
+        * Returns true if async lookup is enabled.
+        *
+        * <p>The lookup function returned by {@link #getAsyncLookupFunction(String[])} will be
+        * used if this method returns true. Otherwise, the lookup function returned by
+        * {@link #getLookupFunction(String[])} will be used.
         */
-       LookupConfig getLookupConfig();
+       boolean isAsyncEnabled();
 }
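
[Editor's note] A hedged sketch, not in the commit, of how a source could implement the revised interface. The class name is hypothetical and it is kept abstract so the unrelated TableSource methods (schema, return type) can be left out; the two returned functions are trivial placeholders for illustration only (a real implementation would use dedicated top-level classes).

import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.sources.LookupableTableSource;
import org.apache.flink.types.Row;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

public abstract class DimensionTableSource implements LookupableTableSource<Row> {

	private final boolean async;

	protected DimensionTableSource(boolean async) {
		this.async = async;
	}

	@Override
	public TableFunction<Row> getLookupFunction(String[] lookupKeys) {
		// placeholder sync lookup function that emits nothing
		return new TableFunction<Row>() {
			public void eval(Object... keys) {
			}
		};
	}

	@Override
	public AsyncTableFunction<Row> getAsyncLookupFunction(String[] lookupKeys) {
		// placeholder async lookup function that completes immediately with no rows
		return new AsyncTableFunction<Row>() {
			public void eval(CompletableFuture<Collection<Row>> future, Object... keys) {
				future.complete(Collections.emptyList());
			}
		};
	}

	@Override
	public boolean isAsyncEnabled() {
		// the planner only uses getAsyncLookupFunction when this returns true
		return async;
	}
}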
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/LookupConfig.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/LookupConfig.java
deleted file mode 100644
index adb84a3..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/LookupConfig.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources;
-
-/**
- * The {@link LookupConfig} is used to configure some behavior when lookup a table.
- *
- * @see LookupableTableSource#getLookupConfig()
- */
-public class LookupConfig {
-
-       public static final LookupConfig DEFAULT = LookupConfig.builder().build();
-
-       private static final boolean DEFAULT_ASYNC_ENABLED = false;
-       private static final long DEFAULT_ASYNC_TIMEOUT_MS = 180_000;
-       private static final int DEFAULT_ASYNC_BUFFER_CAPACITY = 100;
-
-       private final boolean asyncEnabled;
-       private final long asyncTimeoutMs;
-       private final int asyncBufferCapacity;
-
-       private LookupConfig(boolean asyncEnabled, long asyncTimeoutMs, int asyncBufferCapacity) {
-               this.asyncEnabled = asyncEnabled;
-               this.asyncTimeoutMs = asyncTimeoutMs;
-               this.asyncBufferCapacity = asyncBufferCapacity;
-       }
-
-       /**
-        * Returns true if async lookup is enabled.
-        */
-       public boolean isAsyncEnabled() {
-               return asyncEnabled;
-       }
-
-       /**
-        * Returns async timeout millisecond for the asynchronous operation to complete.
-        */
-       public long getAsyncTimeoutMs() {
-               return asyncTimeoutMs;
-       }
-
-       /**
-        * Returns the max number of async i/o operation that can be triggered.
-        */
-       public int getAsyncBufferCapacity() {
-               return asyncBufferCapacity;
-       }
-
-       /**
-        * Returns a new builder that builds a {@link LookupConfig}.
-        *
-        * <p>For example:
-        *
-        * <pre>
-        *     LookupConfig.builder()
-        *       .setAsyncEnabled(true)
-        *       .setAsyncBufferCapacity(1000)
-        *       .setAsyncTimeoutMs(30000)
-        *       .build();
-        * </pre>
-        */
-       public static Builder builder() {
-               return new Builder();
-       }
-
-       /**
-        * A builder used to build a new {@link LookupConfig}.
-        */
-       public static class Builder {
-
-               private boolean asyncEnabled = DEFAULT_ASYNC_ENABLED;
-               private long asyncTimeoutMs = DEFAULT_ASYNC_TIMEOUT_MS;
-               private int asyncBufferCapacity = DEFAULT_ASYNC_BUFFER_CAPACITY;
-
-               public Builder setAsyncEnabled(boolean asyncEnabled) {
-                       this.asyncEnabled = asyncEnabled;
-                       return this;
-               }
-
-               public Builder setAsyncTimeoutMs(long timeoutMs) {
-                       this.asyncTimeoutMs = timeoutMs;
-                       return this;
-               }
-
-               public Builder setAsyncBufferCapacity(int bufferCapacity) {
-                       this.asyncBufferCapacity = bufferCapacity;
-                       return this;
-               }
-
-               public LookupConfig build() {
-                       return new LookupConfig(asyncEnabled, asyncTimeoutMs, asyncBufferCapacity);
-               }
-
-       }
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/LookupJoinCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/LookupJoinCodeGenerator.scala
index 7ffe5a8..eb9fff7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/LookupJoinCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/LookupJoinCodeGenerator.scala
@@ -33,6 +33,7 @@ import org.apache.flink.table.functions.{AsyncTableFunction, TableFunction}
 import org.apache.flink.table.generated.{GeneratedCollector, GeneratedFunction, GeneratedResultFuture}
 import org.apache.flink.table.plan.util.LookupJoinUtil.{ConstantLookupKey, FieldRefLookupKey, LookupKey}
 import org.apache.flink.table.runtime.collector.{TableFunctionCollector, TableFunctionResultFuture}
+import org.apache.flink.table.runtime.join.lookup.DelegatingResultFuture
 import org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
 import org.apache.flink.table.types.logical.{LogicalType, RowType}
 import org.apache.flink.table.types.utils.TypeConversions
@@ -127,11 +128,13 @@ object LookupJoinCodeGenerator {
       fieldCopy = true) // always copy input field because of async buffer
 
     val lookupFunctionTerm = ctx.addReusableFunction(asyncLookupFunction)
+    val DELEGATE = className[DelegatingResultFuture[_]]
 
     val body =
       s"""
          |$prepareCode
-         |$lookupFunctionTerm.eval($DEFAULT_COLLECTOR_TERM, $parameters);
+         |$DELEGATE delegates = new $DELEGATE($DEFAULT_COLLECTOR_TERM);
+         |$lookupFunctionTerm.eval(delegates.getCompletableFuture(), $parameters);
       """.stripMargin
 
     FunctionCodeGenerator.generateFunction(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonLookupJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonLookupJoin.scala
index 5f5037c..a09c85b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonLookupJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonLookupJoin.scala
@@ -21,11 +21,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo, TypeExtractor}
 import org.apache.flink.streaming.api.datastream.AsyncDataStream.OutputMode
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.functions.async.ResultFuture
 import org.apache.flink.streaming.api.operators.ProcessOperator
 import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator
 import org.apache.flink.streaming.api.transformations.{OneInputTransformation, StreamTransformation}
-import org.apache.flink.table.api.{TableConfig, TableException, TableSchema}
+import org.apache.flink.table.api.{TableConfig, TableConfigOptions, TableException, TableSchema}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.LookupJoinCodeGenerator._
 import org.apache.flink.table.codegen.{CodeGeneratorContext, LookupJoinCodeGenerator}
@@ -37,7 +36,7 @@ import org.apache.flink.table.plan.util.LookupJoinUtil._
 import org.apache.flink.table.plan.util.{JoinTypeUtil, RelExplainUtil}
 import org.apache.flink.table.runtime.join.lookup.{AsyncLookupJoinRunner, AsyncLookupJoinWithCalcRunner, LookupJoinRunner, LookupJoinWithCalcRunner}
 import org.apache.flink.table.sources.TableIndex.IndexType
-import org.apache.flink.table.sources.{LookupConfig, LookupableTableSource, TableIndex, TableSource}
+import org.apache.flink.table.sources.{LookupableTableSource, TableIndex, TableSource}
 import org.apache.flink.table.types.ClassLogicalTypeConverter
 import org.apache.flink.table.types.ClassLogicalTypeConverter.getInternalClassForType
 import org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
@@ -47,7 +46,6 @@ import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyIn
 import org.apache.flink.table.typeutils.BaseRowTypeInfo
 import org.apache.flink.types.Row
 
-import com.google.common.primitives.Primitives
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
 import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
@@ -59,7 +57,10 @@ import org.apache.calcite.sql.validate.SqlValidatorUtil
 import org.apache.calcite.tools.RelBuilder
 import org.apache.calcite.util.mapping.IntPair
 
+import com.google.common.primitives.Primitives
+
 import java.util.Collections
+import java.util.concurrent.CompletableFuture
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -129,7 +130,7 @@ abstract class CommonLookupJoin(
       case None => tableFieldNames
     }
     val resultFieldNames = getRowType.getFieldNames.asScala.toArray
-    val lookupConfig = getLookupConfig(tableSource.asInstanceOf[LookupableTableSource[_]])
+    val lookupableSource = tableSource.asInstanceOf[LookupableTableSource[_]]
     val whereString = calcOnTemporalTable match {
      case Some(calc) => RelExplainUtil.conditionToString(calc, getExpressionString)
       case None => "N/A"
@@ -138,7 +139,7 @@ abstract class CommonLookupJoin(
     super.explainTerms(pw)
       .item("table", tableSource.explainSource())
       .item("joinType", JoinTypeUtil.getFlinkJoinType(joinType))
-      .item("async", lookupConfig.isAsyncEnabled)
+      .item("async", lookupableSource.isAsyncEnabled)
       .item("on", joinOnToString(inputFieldNames, rightFieldNames, joinInfo))
       .itemIf("where", whereString, calcOnTemporalTable.isDefined)
       .itemIf("joinCondition",
@@ -191,12 +192,13 @@ abstract class CommonLookupJoin(
       allLookupKeys)
 
    val lookupableTableSource = tableSource.asInstanceOf[LookupableTableSource[_]]
-    val lookupConfig = getLookupConfig(lookupableTableSource)
     val leftOuterJoin = joinType == JoinRelType.LEFT
 
-    val operator = if (lookupConfig.isAsyncEnabled) {
-      val asyncBufferCapacity= lookupConfig.getAsyncBufferCapacity
-      val asyncTimeout = lookupConfig.getAsyncTimeoutMs
+    val operator = if (lookupableTableSource.isAsyncEnabled) {
+      val asyncBufferCapacity= config.getConf
+        .getInteger(TableConfigOptions.SQL_EXEC_LOOKUP_ASYNC_BUFFER_CAPACITY)
+      val asyncTimeout = config.getConf
+        .getLong(TableConfigOptions.SQL_EXEC_LOOKUP_ASYNC_TIMEOUT_MS)
 
       val asyncLookupFunction = lookupableTableSource
         .getAsyncLookupFunction(lookupFieldNamesInOrder)
@@ -212,10 +214,9 @@ abstract class CommonLookupJoin(
         producedTypeInfo,
         udtfResultType,
         extractedResultTypeInfo)
-      val parameters =
-        Array(new TypeInformationAnyType[ResultFuture[_]](
-          new GenericTypeInfo[ResultFuture[_]](classOf[ResultFuture[_]]))) ++
-            lookupFieldTypesInOrder
+      val futureType = new TypeInformationAnyType(
+        new GenericTypeInfo(classOf[CompletableFuture[_]]))
+      val parameters = Array(futureType) ++ lookupFieldTypesInOrder
       checkEvalMethodSignature(
         asyncLookupFunction,
         parameters,
@@ -253,7 +254,7 @@ abstract class CommonLookupJoin(
           producedTypeInfo,
           BaseRowTypeInfo.of(rightRowType),
           leftOuterJoin,
-          lookupConfig.getAsyncBufferCapacity)
+          asyncBufferCapacity)
       } else {
        // right type is the same as table source row type, because no calc after temporal table
         val rightRowType = tableSourceRowType
@@ -356,14 +357,6 @@ abstract class CommonLookupJoin(
       inputTransformation.getParallelism)
   }
 
-  def getLookupConfig(tableSource: LookupableTableSource[_]): LookupConfig = {
-    if (tableSource.getLookupConfig != null) {
-      tableSource.getLookupConfig
-    } else {
-      LookupConfig.DEFAULT
-    }
-  }
-
  private def rowTypeEquals(expected: TypeInformation[_], actual: TypeInformation[_]): Boolean = {
    // check internal and external type, cause we will auto convert external class to internal
     // class (eg: Row => BaseRow).
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
index 9e7ce7b..ffd0c4d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
@@ -19,8 +19,6 @@
 package org.apache.flink.table.plan.stream.sql.join
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.functions.async.ResultFuture
 import org.apache.flink.table.api._
 import org.apache.flink.table.dataformat.{BaseRow, BinaryString}
@@ -31,11 +29,12 @@ import org.apache.flink.table.types.logical.{IntType, TimestampType, VarCharType
 import org.apache.flink.table.typeutils.BaseRowTypeInfo
 import org.apache.flink.table.util.{StreamTableTestUtil, TableTestBase}
 import org.apache.flink.types.Row
-
 import org.junit.Assert.{assertTrue, fail}
 import org.junit.Test
 
 import _root_.java.util
+import _root_.java.util.concurrent.CompletableFuture
+import _root_.java.util.{Collection => JCollection}
 import _root_.java.lang.{Long => JLong}
 import _root_.java.sql.Timestamp
 
@@ -161,7 +160,7 @@ class LookupJoinTest extends TableTestBase with Serializable {
     expectExceptionThrown(
       "SELECT * FROM T AS T JOIN temporalTable7 " +
         "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id AND T.b = D.name 
AND T.ts = D.ts",
-      "Expected: 
eval(org.apache.flink.streaming.api.functions.async.ResultFuture, " +
+      "Expected: eval(java.util.concurrent.CompletableFuture, " +
         "java.lang.Integer, org.apache.flink.table.dataformat.BinaryString, 
java.lang.Long) \n" +
         "Actual: eval(java.lang.Integer, 
org.apache.flink.table.dataformat.BinaryString, " +
         "java.sql.Timestamp)",
@@ -173,9 +172,9 @@ class LookupJoinTest extends TableTestBase with Serializable {
    expectExceptionThrown(
      "SELECT * FROM T AS T JOIN temporalTable8 " +
        "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id AND T.b = D.name AND T.ts = D.ts",
-        "Expected: eval(org.apache.flink.streaming.api.functions.async.ResultFuture, " +
+        "Expected: eval(java.util.concurrent.CompletableFuture, " +
        "java.lang.Integer, org.apache.flink.table.dataformat.BinaryString, java.lang.Long) \n" +
-        "Actual: eval(org.apache.flink.streaming.api.functions.async.ResultFuture, " +
+        "Actual: eval(java.util.concurrent.CompletableFuture, " +
         "java.lang.Integer, java.lang.String, java.sql.Timestamp)",
       classOf[TableException]
     )
@@ -185,6 +184,18 @@ class LookupJoinTest extends TableTestBase with Serializable {
     verifyTranslationSuccess("SELECT * FROM T AS T JOIN temporalTable9 " +
       "FOR SYSTEM_TIME AS OF T.proctime AS D " +
       "ON T.a = D.id AND T.b = D.name AND T.ts = D.ts")
+
+    val temporalTable10 = new TestInvalidTemporalTable(new InvalidAsyncTableFunctionEvalSignature3)
+    streamUtil.tableEnv.registerTableSource("temporalTable10", temporalTable10)
+    expectExceptionThrown(
+      "SELECT * FROM T AS T JOIN temporalTable10 " +
+        "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id AND T.b = D.name AND T.ts = D.ts",
+      "Expected: eval(java.util.concurrent.CompletableFuture, " +
+        "java.lang.Integer, org.apache.flink.table.dataformat.BinaryString, java.lang.Long) \n" +
+        "Actual: eval(org.apache.flink.streaming.api.functions.async.ResultFuture, " +
+        "java.lang.Integer, org.apache.flink.table.dataformat.BinaryString, java.lang.Long)",
+      classOf[TableException]
+    )
   }
 
   @Test
@@ -358,7 +369,7 @@ class TestTemporalTable
       "this method should never be called.")
   }
 
-  override def getLookupConfig: LookupConfig = LookupConfig.DEFAULT
+  override def isAsyncEnabled: Boolean = false
 
   override def getReturnType: TypeInformation[BaseRow] = {
     new BaseRowTypeInfo(
@@ -385,8 +396,7 @@ class TestInvalidTemporalTable private(
     async: Boolean,
     fetcher: TableFunction[_],
     asyncFetcher: AsyncTableFunction[_])
-  extends StreamTableSource[BaseRow]
-  with LookupableTableSource[BaseRow]
+  extends LookupableTableSource[BaseRow]
   with DefinedIndexes {
 
   val fieldNames: Array[String] = Array("id", "name", "age", "ts")
@@ -418,11 +428,7 @@ class TestInvalidTemporalTable private(
     asyncFetcher.asInstanceOf[AsyncTableFunction[BaseRow]]
   }
 
-  override def getLookupConfig: LookupConfig = {
-    LookupConfig.builder()
-        .setAsyncEnabled(async)
-        .build()
-  }
+  override def isAsyncEnabled: Boolean = async
 
   override def getIndexes: util.Collection[TableIndex] = {
     util.Collections.singleton(TableIndex.builder()
@@ -430,12 +436,6 @@ class TestInvalidTemporalTable private(
       .indexedColumns("id", "name", "ts")
       .build())
   }
-
-  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[BaseRow] = {
-    throw new UnsupportedOperationException("This TableSource is only used for unit test, " +
-      "this method should never be called.")
-  }
-
 }
 
 class InvalidTableFunctionResultType extends TableFunction[String] {
@@ -472,17 +472,25 @@ class InvalidAsyncTableFunctionEvalSignature1 extends AsyncTableFunction[BaseRow
 }
 
 class InvalidAsyncTableFunctionEvalSignature2 extends AsyncTableFunction[BaseRow] {
-  def eval(resultFuture: ResultFuture[BaseRow], a: Integer, b: String,  c: Timestamp): Unit = {
+  def eval(resultFuture: CompletableFuture[JCollection[BaseRow]],
+    a: Integer, b: String,  c: Timestamp): Unit = {
+  }
+}
+
+class InvalidAsyncTableFunctionEvalSignature3 extends AsyncTableFunction[BaseRow] {
+  def eval(resultFuture: ResultFuture[BaseRow],
+    a: Integer, b: BinaryString,  c: JLong): Unit = {
   }
 }
 
 class ValidAsyncTableFunction extends AsyncTableFunction[BaseRow] {
   @varargs
-  def eval(resultFuture: ResultFuture[BaseRow], objs: AnyRef*): Unit = {
+  def eval(resultFuture: CompletableFuture[JCollection[BaseRow]], objs: AnyRef*): Unit = {
   }
 }
 
 class ValidAsyncTableFunction2 extends AsyncTableFunction[BaseRow] {
-  def eval(resultFuture: ResultFuture[BaseRow], a: Integer, b: BinaryString, c: JLong): Unit = {
+  def eval(resultFuture: CompletableFuture[JCollection[BaseRow]],
+    a: Integer, b: BinaryString, c: JLong): Unit = {
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala
index 1abcf2e..29a116b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala
@@ -21,9 +21,6 @@ package org.apache.flink.table.runtime.utils
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.functions.async.ResultFuture
 import org.apache.flink.table.api.TableSchema
 import org.apache.flink.table.functions.{AsyncTableFunction, FunctionContext, TableFunction}
 import org.apache.flink.table.runtime.utils.InMemoryLookupableTableSource.{InMemoryAsyncLookupFunction, InMemoryLookupFunction}
@@ -51,9 +48,8 @@ class InMemoryLookupableTableSource(
     data: List[Row],
     primaryKey: Option[Array[String]],
     tableIndexes: Array[TableIndex],
-    lookupConfig: LookupConfig)
+    asyncEnabled: Boolean)
   extends LookupableTableSource[Row]
-  with StreamTableSource[Row]
   with DefinedPrimaryKey
   with DefinedIndexes {
 
@@ -104,7 +100,7 @@ class InMemoryLookupableTableSource(
     map.toMap
   }
 
-  override def getLookupConfig: LookupConfig = lookupConfig
+  override def isAsyncEnabled: Boolean = asyncEnabled
 
  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames)
 
@@ -120,9 +116,6 @@ class InMemoryLookupableTableSource(
   @VisibleForTesting
   def getResourceCounter: Int = resourceCounter.get()
 
-  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
-    throw new UnsupportedOperationException("This should never be called.")
-  }
 }
 
 object InMemoryLookupableTableSource {
@@ -181,7 +174,7 @@ object InMemoryLookupableTableSource {
     private val tableIndexes = new mutable.ArrayBuffer[TableIndex]()
     private var primaryKey: Option[Array[String]] = None
     private var data: List[Product] = _
-    private val lookupConfigBuilder: LookupConfig.Builder = LookupConfig.builder()
+    private var asyncEnabled: Boolean = false
 
     /**
       * Sets table data for the table source.
@@ -255,23 +248,7 @@ object InMemoryLookupableTableSource {
       * Enables async lookup for the table source
       */
     def enableAsync(): Builder = {
-      lookupConfigBuilder.setAsyncEnabled(true)
-      this
-    }
-
-    /**
-      * Sets async buffer capacity.
-      */
-    def asyncBufferCapacity(capacity: Int): Builder = {
-      lookupConfigBuilder.setAsyncBufferCapacity(capacity)
-      this
-    }
-
-    /**
-      * Sets async time out milli-second.
-      */
-    def asyncTimeoutMs(ms: Long): Builder = {
-      lookupConfigBuilder.setAsyncTimeoutMs(ms)
+      asyncEnabled = true
       this
     }
 
@@ -294,7 +271,7 @@ object InMemoryLookupableTableSource {
         rowData,
         primaryKey,
         tableIndexes.toArray,
-        lookupConfigBuilder.build()
+        asyncEnabled
       )
     }
   }
@@ -343,7 +320,7 @@ object InMemoryLookupableTableSource {
     }
 
     @varargs
-    def eval(resultFuture: ResultFuture[Row], inputs: AnyRef*): Unit = {
+    def eval(resultFuture: CompletableFuture[util.Collection[Row]], inputs: AnyRef*): Unit = {
      CompletableFuture
        .supplyAsync(new CollectionSupplier(data, Row.of(inputs: _*)), executor)
         .thenAccept(new CollectionConsumer(resultFuture))
@@ -369,7 +346,7 @@ object InMemoryLookupableTableSource {
       }
     }
 
-    private class CollectionConsumer(resultFuture: ResultFuture[Row])
+    private class CollectionConsumer(resultFuture: CompletableFuture[util.Collection[Row]])
         extends Consumer[util.Collection[Row]] {
 
       override def accept(results: util.Collection[Row]): Unit = {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
index a693637..06ef82a 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
@@ -182,6 +182,20 @@ public class TableConfigOptions {
                                        .withDescription("Cache size of every topn task.");

        // ------------------------------------------------------------------------
+       //  Async Lookup Options
+       // ------------------------------------------------------------------------
+
+       public static final ConfigOption<Integer> SQL_EXEC_LOOKUP_ASYNC_BUFFER_CAPACITY =
+               key("sql.exec.lookup.async.buffer-capacity")
+               .defaultValue(100)
+               .withDescription("The max number of async i/o operations that the async lookup join can trigger.");
+
+       public static final ConfigOption<Long> SQL_EXEC_LOOKUP_ASYNC_TIMEOUT_MS =
+               key("sql.exec.lookup.async.timeout-ms")
+               .defaultValue(180_000L)
+               .withDescription("The async timeout in milliseconds for the asynchronous operation to complete.");
+
+       // ------------------------------------------------------------------------
        //  MiniBatch Options
        // ------------------------------------------------------------------------

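[Editor's note] A small usage sketch, not part of the commit, showing how the two options above can be set and read through the Configuration / ConfigOption API that the hunk relies on; the class and variable names are made up.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableConfigOptions;

public class AsyncLookupOptionsExample {
	public static void main(String[] args) {
		Configuration conf = new Configuration();

		// override the defaults (100 in-flight operations, 180 000 ms timeout)
		conf.setInteger(TableConfigOptions.SQL_EXEC_LOOKUP_ASYNC_BUFFER_CAPACITY, 1000);
		conf.setLong(TableConfigOptions.SQL_EXEC_LOOKUP_ASYNC_TIMEOUT_MS, 30_000L);

		// CommonLookupJoin reads the values back the same way when it builds the async operator
		int capacity = conf.getInteger(TableConfigOptions.SQL_EXEC_LOOKUP_ASYNC_BUFFER_CAPACITY);
		long timeout = conf.getLong(TableConfigOptions.SQL_EXEC_LOOKUP_ASYNC_TIMEOUT_MS);
		System.out.println("capacity=" + capacity + ", timeout=" + timeout);
	}
}
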
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/lookup/DelegatingResultFuture.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/lookup/DelegatingResultFuture.java
new file mode 100644
index 0000000..9f24aeb
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/lookup/DelegatingResultFuture.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join.lookup;
+
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+
+/**
+ * Delegates actions of {@link java.util.concurrent.CompletableFuture} to {@link ResultFuture}.
+ * This is used as a bridge between {@link org.apache.flink.table.functions.AsyncTableFunction} and
+ * {@link org.apache.flink.streaming.api.functions.async.AsyncFunction}.
+ */
+public class DelegatingResultFuture<OUT> implements BiConsumer<Collection<OUT>, Throwable> {
+
+       private final ResultFuture<OUT> delegatedResultFuture;
+       private final CompletableFuture<Collection<OUT>> completableFuture;
+
+       public DelegatingResultFuture(ResultFuture<OUT> delegatedResultFuture) {
+               this.delegatedResultFuture = delegatedResultFuture;
+               this.completableFuture = new CompletableFuture<>();
+               this.completableFuture.whenComplete(this);
+       }
+
+       @Override
+       public void accept(Collection<OUT> outs, Throwable throwable) {
+               if (throwable != null) {
+                       delegatedResultFuture.completeExceptionally(throwable);
+               } else {
+                       delegatedResultFuture.complete(outs);
+               }
+       }
+
+       public CompletableFuture<Collection<OUT>> getCompletableFuture() {
+               return completableFuture;
+       }
+}
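
[Editor's note] A hand-written illustration, not part of the commit, of the bridge that the generated code in LookupJoinCodeGenerator.scala sets up around each eval call; the class and method here are made up, only DelegatingResultFuture and ResultFuture come from the patch.

import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.table.runtime.join.lookup.DelegatingResultFuture;
import org.apache.flink.types.Row;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

public class GeneratedFetcherSketch {

	// mirrors the generated body:
	//   DELEGATE delegates = new DELEGATE(DEFAULT_COLLECTOR_TERM);
	//   lookupFunction.eval(delegates.getCompletableFuture(), parameters);
	public void fetch(ResultFuture<Row> resultFuture) {
		DelegatingResultFuture<Row> delegates = new DelegatingResultFuture<>(resultFuture);
		CompletableFuture<Collection<Row>> future = delegates.getCompletableFuture();

		// a user AsyncTableFunction would receive 'future' as its first eval argument;
		// completing it (or failing it) is forwarded to the runtime ResultFuture
		future.complete(Collections.<Row>emptyList());
	}
}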
