This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5be9ab3c8a28b64d7039568adc43b0abb32c3f6e Author: Jark Wu <[email protected]> AuthorDate: Mon Jun 10 20:43:39 2019 +0800 [FLINK-12708][table] Introduce LookupableTableSource and make blink planner support it --- .../flink/table/functions/AsyncTableFunction.java | 62 +++--------- .../flink/table/sources/LookupableTableSource.java | 12 ++- .../apache/flink/table/sources/LookupConfig.java | 111 --------------------- .../table/codegen/LookupJoinCodeGenerator.scala | 5 +- .../table/plan/nodes/common/CommonLookupJoin.scala | 39 +++----- .../plan/stream/sql/join/LookupJoinTest.scala | 54 +++++----- .../utils/InMemoryLookupableTableSource.scala | 37 ++----- .../apache/flink/table/api/TableConfigOptions.java | 14 +++ .../join/lookup/DelegatingResultFuture.java | 55 ++++++++++ 9 files changed, 151 insertions(+), 238 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java similarity index 57% rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java index 0f609a5..ddb1e61 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java @@ -18,11 +18,10 @@ package org.apache.flink.table.functions; -import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.annotation.Experimental; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.streaming.api.functions.async.ResultFuture; -import org.apache.flink.table.api.ValidationException; + 
+import java.util.concurrent.CompletableFuture; /** * Base class for a user-defined asynchronously table function (UDTF). This is similar to @@ -35,30 +34,26 @@ import org.apache.flink.table.api.ValidationException; * method. An evaluation method must be declared publicly, not static and named "eval". * Evaluation methods can also be overloaded by implementing multiple methods named "eval". * - * <p>The first parameter of evaluation method must be {@link ResultFuture}, and the others are user - * defined input parameters like the "eval" method of {@link TableFunction}. + * <p>The first parameter of evaluation method must be {@link CompletableFuture}, and the others are + * user defined input parameters like the "eval" method of {@link TableFunction}. The generic type of + * {@link CompletableFuture} must be {@link java.util.Collection} to collect multiple possible result + * values. * * <p>For each "eval", an async io operation can be triggered, and once it has been done, - * the result can be collected by calling {@link ResultFuture#complete}. For each async + * the result can be collected by calling {@link CompletableFuture#complete}. For each async * operation, its context is stored in the operator immediately after invoking "eval", * avoiding blocking for each stream input as long as the internal buffer is not full. * - * <p>{@link ResultFuture} can be passed into callbacks or futures to collect the result data. + * <p>{@link CompletableFuture} can be passed into callbacks or futures to collect the result data. * An error can also be propagate to the async IO operator by - * {@link ResultFuture#completeExceptionally(Throwable)}. + * {@link CompletableFuture#completeExceptionally(Throwable)}. * * <p>User-defined functions must have a default constructor and must be instantiable during * runtime. * * <p>By default the result type of an evaluation method is determined by Flink's type extraction - * facilities. 
This is sufficient for basic types or simple POJOs but might be wrong for more - * complex, custom, or composite types. In these cases {@link TypeInformation} of the result type - * can be manually defined by overriding {@link #getResultType}. - * - * <p>Internally, the Table/SQL API code generation works with primitive values as much as possible. - * If a user-defined table function should not introduce much overhead during runtime, it is - * recommended to declare parameters and result types as primitive types instead of their boxed - * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long. + * facilities. Currently, only {@link org.apache.flink.types.Row} and {@code BaseRow} are supported + * as the result type. More complex, custom types will be supported in the future. * * <p>Example: * @@ -67,7 +62,7 @@ import org.apache.flink.table.api.ValidationException; * public class HBaseAsyncTableFunction extends AsyncTableFunction<String> { * * // implement an "eval" method with as many parameters as you want - * public void eval(ResultFuture<String> result, String rowkey) { + * public void eval(CompletableFuture<Collection<String>> result, String rowkey) { * Get get = new Get(Bytes.toBytes(rowkey)); * ListenableFuture<Result> future = hbase.asyncGet(get); * Futures.addCallback(future, new FutureCallback<Result>() { @@ -85,11 +80,12 @@ import org.apache.flink.table.api.ValidationException; * } * } * - * <p>NOTE: the {@link AsyncTableFunction} is can not used as UDTF currently. It only used in - * temporal table join as a async lookup function + * <p>NOTE: the {@link AsyncTableFunction} can not be used as UDTF currently. It is only used in + * temporal table join as an async lookup function.
* * @param <T> The type of the output row */ +@Experimental public abstract class AsyncTableFunction<T> extends UserDefinedFunction { /** @@ -105,30 +101,4 @@ public abstract class AsyncTableFunction<T> extends UserDefinedFunction { public TypeInformation<T> getResultType() { return null; } - - /** - * Returns {@link TypeInformation} about the operands of the evaluation method with a given - * signature. - * - * <p>In order to perform operand type inference in SQL (especially when NULL is used) it might be - * necessary to determine the parameter {@link TypeInformation} of an evaluation method. - * By default Flink's type extraction facilities are used for this but might be wrong for - * more complex, custom, or composite types. - * - * @param signature signature of the method the operand types need to be determined - * @return {@link TypeInformation} of operand types - */ - public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) { - final TypeInformation<?>[] types = new TypeInformation<?>[signature.length]; - for (int i = 0; i < signature.length; i++) { - try { - types[i] = TypeExtractor.getForClass(signature[i]); - } catch (InvalidTypesException e) { - throw new ValidationException( - "Parameter types of table function " + this.getClass().getCanonicalName() + - " cannot be automatically determined. 
Please provide type information manually."); - } - } - return types; - } } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/LookupableTableSource.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LookupableTableSource.java similarity index 83% rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/LookupableTableSource.java rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LookupableTableSource.java index 2f38e63..45e19de 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/LookupableTableSource.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LookupableTableSource.java @@ -18,7 +18,7 @@ package org.apache.flink.table.sources; -import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.Experimental; import org.apache.flink.table.functions.AsyncTableFunction; import org.apache.flink.table.functions.TableFunction; @@ -29,7 +29,7 @@ import org.apache.flink.table.functions.TableFunction; * * @param <T> type of the result */ -@PublicEvolving +@Experimental public interface LookupableTableSource<T> extends TableSource<T> { /** @@ -45,7 +45,11 @@ public interface LookupableTableSource<T> extends TableSource<T> { AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupKeys); /** - * Defines the lookup behavior in the config. Such as whether to use async lookup. + * Returns true if async lookup is enabled. + * + * <p>The lookup function returned by {@link #getAsyncLookupFunction(String[])} will be + * used if this method returns true. Otherwise, the lookup function returned by + * {@link #getLookupFunction(String[])} will be used.
*/ - LookupConfig getLookupConfig(); + boolean isAsyncEnabled(); } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/LookupConfig.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/LookupConfig.java deleted file mode 100644 index adb84a3..0000000 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/LookupConfig.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.sources; - -/** - * The {@link LookupConfig} is used to configure some behavior when lookup a table. 
- * - * @see LookupableTableSource#getLookupConfig() - */ -public class LookupConfig { - - public static final LookupConfig DEFAULT = LookupConfig.builder().build(); - - private static final boolean DEFAULT_ASYNC_ENABLED = false; - private static final long DEFAULT_ASYNC_TIMEOUT_MS = 180_000; - private static final int DEFAULT_ASYNC_BUFFER_CAPACITY = 100; - - private final boolean asyncEnabled; - private final long asyncTimeoutMs; - private final int asyncBufferCapacity; - - private LookupConfig(boolean asyncEnabled, long asyncTimeoutMs, int asyncBufferCapacity) { - this.asyncEnabled = asyncEnabled; - this.asyncTimeoutMs = asyncTimeoutMs; - this.asyncBufferCapacity = asyncBufferCapacity; - } - - /** - * Returns true if async lookup is enabled. - */ - public boolean isAsyncEnabled() { - return asyncEnabled; - } - - /** - * Returns async timeout millisecond for the asynchronous operation to complete. - */ - public long getAsyncTimeoutMs() { - return asyncTimeoutMs; - } - - /** - * Returns the max number of async i/o operation that can be triggered. - */ - public int getAsyncBufferCapacity() { - return asyncBufferCapacity; - } - - /** - * Returns a new builder that builds a {@link LookupConfig}. - * - * <p>For example: - * - * <pre> - * LookupConfig.builder() - * .setAsyncEnabled(true) - * .setAsyncBufferCapacity(1000) - * .setAsyncTimeoutMs(30000) - * .build(); - * </pre> - */ - public static Builder builder() { - return new Builder(); - } - - /** - * A builder used to build a new {@link LookupConfig}. 
- */ - public static class Builder { - - private boolean asyncEnabled = DEFAULT_ASYNC_ENABLED; - private long asyncTimeoutMs = DEFAULT_ASYNC_TIMEOUT_MS; - private int asyncBufferCapacity = DEFAULT_ASYNC_BUFFER_CAPACITY; - - public Builder setAsyncEnabled(boolean asyncEnabled) { - this.asyncEnabled = asyncEnabled; - return this; - } - - public Builder setAsyncTimeoutMs(long timeoutMs) { - this.asyncTimeoutMs = timeoutMs; - return this; - } - - public Builder setAsyncBufferCapacity(int bufferCapacity) { - this.asyncBufferCapacity = bufferCapacity; - return this; - } - - public LookupConfig build() { - return new LookupConfig(asyncEnabled, asyncTimeoutMs, asyncBufferCapacity); - } - - } -} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/LookupJoinCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/LookupJoinCodeGenerator.scala index 7ffe5a8..eb9fff7 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/LookupJoinCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/LookupJoinCodeGenerator.scala @@ -33,6 +33,7 @@ import org.apache.flink.table.functions.{AsyncTableFunction, TableFunction} import org.apache.flink.table.generated.{GeneratedCollector, GeneratedFunction, GeneratedResultFuture} import org.apache.flink.table.plan.util.LookupJoinUtil.{ConstantLookupKey, FieldRefLookupKey, LookupKey} import org.apache.flink.table.runtime.collector.{TableFunctionCollector, TableFunctionResultFuture} +import org.apache.flink.table.runtime.join.lookup.DelegatingResultFuture import org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType import org.apache.flink.table.types.logical.{LogicalType, RowType} import org.apache.flink.table.types.utils.TypeConversions @@ -127,11 +128,13 @@ object LookupJoinCodeGenerator { fieldCopy = true) // always copy input 
field because of async buffer val lookupFunctionTerm = ctx.addReusableFunction(asyncLookupFunction) + val DELEGATE = className[DelegatingResultFuture[_]] val body = s""" |$prepareCode - |$lookupFunctionTerm.eval($DEFAULT_COLLECTOR_TERM, $parameters); + |$DELEGATE delegates = new $DELEGATE($DEFAULT_COLLECTOR_TERM); + |$lookupFunctionTerm.eval(delegates.getCompletableFuture(), $parameters); """.stripMargin FunctionCodeGenerator.generateFunction( diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonLookupJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonLookupJoin.scala index 5f5037c..a09c85b 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonLookupJoin.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonLookupJoin.scala @@ -21,11 +21,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo, TypeExtractor} import org.apache.flink.streaming.api.datastream.AsyncDataStream.OutputMode import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import org.apache.flink.streaming.api.functions.async.ResultFuture import org.apache.flink.streaming.api.operators.ProcessOperator import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator import org.apache.flink.streaming.api.transformations.{OneInputTransformation, StreamTransformation} -import org.apache.flink.table.api.{TableConfig, TableException, TableSchema} +import org.apache.flink.table.api.{TableConfig, TableConfigOptions, TableException, TableSchema} import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.LookupJoinCodeGenerator._ import org.apache.flink.table.codegen.{CodeGeneratorContext, LookupJoinCodeGenerator} @@ -37,7 +36,7 @@ 
import org.apache.flink.table.plan.util.LookupJoinUtil._ import org.apache.flink.table.plan.util.{JoinTypeUtil, RelExplainUtil} import org.apache.flink.table.runtime.join.lookup.{AsyncLookupJoinRunner, AsyncLookupJoinWithCalcRunner, LookupJoinRunner, LookupJoinWithCalcRunner} import org.apache.flink.table.sources.TableIndex.IndexType -import org.apache.flink.table.sources.{LookupConfig, LookupableTableSource, TableIndex, TableSource} +import org.apache.flink.table.sources.{LookupableTableSource, TableIndex, TableSource} import org.apache.flink.table.types.ClassLogicalTypeConverter import org.apache.flink.table.types.ClassLogicalTypeConverter.getInternalClassForType import org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType @@ -47,7 +46,6 @@ import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyIn import org.apache.flink.table.typeutils.BaseRowTypeInfo import org.apache.flink.types.Row -import com.google.common.primitives.Primitives import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField} import org.apache.calcite.rel.core.{JoinInfo, JoinRelType} @@ -59,7 +57,10 @@ import org.apache.calcite.sql.validate.SqlValidatorUtil import org.apache.calcite.tools.RelBuilder import org.apache.calcite.util.mapping.IntPair +import com.google.common.primitives.Primitives + import java.util.Collections +import java.util.concurrent.CompletableFuture import scala.collection.JavaConverters._ import scala.collection.mutable @@ -129,7 +130,7 @@ abstract class CommonLookupJoin( case None => tableFieldNames } val resultFieldNames = getRowType.getFieldNames.asScala.toArray - val lookupConfig = getLookupConfig(tableSource.asInstanceOf[LookupableTableSource[_]]) + val lookupableSource = tableSource.asInstanceOf[LookupableTableSource[_]] val whereString = calcOnTemporalTable match { case Some(calc) => RelExplainUtil.conditionToString(calc, getExpressionString) case 
None => "N/A" @@ -138,7 +139,7 @@ abstract class CommonLookupJoin( super.explainTerms(pw) .item("table", tableSource.explainSource()) .item("joinType", JoinTypeUtil.getFlinkJoinType(joinType)) - .item("async", lookupConfig.isAsyncEnabled) + .item("async", lookupableSource.isAsyncEnabled) .item("on", joinOnToString(inputFieldNames, rightFieldNames, joinInfo)) .itemIf("where", whereString, calcOnTemporalTable.isDefined) .itemIf("joinCondition", @@ -191,12 +192,13 @@ abstract class CommonLookupJoin( allLookupKeys) val lookupableTableSource = tableSource.asInstanceOf[LookupableTableSource[_]] - val lookupConfig = getLookupConfig(lookupableTableSource) val leftOuterJoin = joinType == JoinRelType.LEFT - val operator = if (lookupConfig.isAsyncEnabled) { - val asyncBufferCapacity= lookupConfig.getAsyncBufferCapacity - val asyncTimeout = lookupConfig.getAsyncTimeoutMs + val operator = if (lookupableTableSource.isAsyncEnabled) { + val asyncBufferCapacity= config.getConf + .getInteger(TableConfigOptions.SQL_EXEC_LOOKUP_ASYNC_BUFFER_CAPACITY) + val asyncTimeout = config.getConf + .getLong(TableConfigOptions.SQL_EXEC_LOOKUP_ASYNC_TIMEOUT_MS) val asyncLookupFunction = lookupableTableSource .getAsyncLookupFunction(lookupFieldNamesInOrder) @@ -212,10 +214,9 @@ abstract class CommonLookupJoin( producedTypeInfo, udtfResultType, extractedResultTypeInfo) - val parameters = - Array(new TypeInformationAnyType[ResultFuture[_]]( - new GenericTypeInfo[ResultFuture[_]](classOf[ResultFuture[_]]))) ++ - lookupFieldTypesInOrder + val futureType = new TypeInformationAnyType( + new GenericTypeInfo(classOf[CompletableFuture[_]])) + val parameters = Array(futureType) ++ lookupFieldTypesInOrder checkEvalMethodSignature( asyncLookupFunction, parameters, @@ -253,7 +254,7 @@ abstract class CommonLookupJoin( producedTypeInfo, BaseRowTypeInfo.of(rightRowType), leftOuterJoin, - lookupConfig.getAsyncBufferCapacity) + asyncBufferCapacity) } else { // right type is the same as table source row type, because 
no calc after temporal table val rightRowType = tableSourceRowType @@ -356,14 +357,6 @@ abstract class CommonLookupJoin( inputTransformation.getParallelism) } - def getLookupConfig(tableSource: LookupableTableSource[_]): LookupConfig = { - if (tableSource.getLookupConfig != null) { - tableSource.getLookupConfig - } else { - LookupConfig.DEFAULT - } - } - private def rowTypeEquals(expected: TypeInformation[_], actual: TypeInformation[_]): Boolean = { // check internal and external type, cause we will auto convert external class to internal // class (eg: Row => BaseRow). diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala index 9e7ce7b..ffd0c4d 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala @@ -19,8 +19,6 @@ package org.apache.flink.table.plan.stream.sql.join import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.functions.async.ResultFuture import org.apache.flink.table.api._ import org.apache.flink.table.dataformat.{BaseRow, BinaryString} @@ -31,11 +29,12 @@ import org.apache.flink.table.types.logical.{IntType, TimestampType, VarCharType import org.apache.flink.table.typeutils.BaseRowTypeInfo import org.apache.flink.table.util.{StreamTableTestUtil, TableTestBase} import org.apache.flink.types.Row - import org.junit.Assert.{assertTrue, fail} import org.junit.Test import _root_.java.util +import _root_.java.util.concurrent.CompletableFuture +import _root_.java.util.{Collection => 
JCollection} import _root_.java.lang.{Long => JLong} import _root_.java.sql.Timestamp @@ -161,7 +160,7 @@ class LookupJoinTest extends TableTestBase with Serializable { expectExceptionThrown( "SELECT * FROM T AS T JOIN temporalTable7 " + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id AND T.b = D.name AND T.ts = D.ts", - "Expected: eval(org.apache.flink.streaming.api.functions.async.ResultFuture, " + + "Expected: eval(java.util.concurrent.CompletableFuture, " + "java.lang.Integer, org.apache.flink.table.dataformat.BinaryString, java.lang.Long) \n" + "Actual: eval(java.lang.Integer, org.apache.flink.table.dataformat.BinaryString, " + "java.sql.Timestamp)", @@ -173,9 +172,9 @@ class LookupJoinTest extends TableTestBase with Serializable { expectExceptionThrown( "SELECT * FROM T AS T JOIN temporalTable8 " + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id AND T.b = D.name AND T.ts = D.ts", - "Expected: eval(org.apache.flink.streaming.api.functions.async.ResultFuture, " + + "Expected: eval(java.util.concurrent.CompletableFuture, " + "java.lang.Integer, org.apache.flink.table.dataformat.BinaryString, java.lang.Long) \n" + - "Actual: eval(org.apache.flink.streaming.api.functions.async.ResultFuture, " + + "Actual: eval(java.util.concurrent.CompletableFuture, " + "java.lang.Integer, java.lang.String, java.sql.Timestamp)", classOf[TableException] ) @@ -185,6 +184,18 @@ class LookupJoinTest extends TableTestBase with Serializable { verifyTranslationSuccess("SELECT * FROM T AS T JOIN temporalTable9 " + "FOR SYSTEM_TIME AS OF T.proctime AS D " + "ON T.a = D.id AND T.b = D.name AND T.ts = D.ts") + + val temporalTable10 = new TestInvalidTemporalTable(new InvalidAsyncTableFunctionEvalSignature3) + streamUtil.tableEnv.registerTableSource("temporalTable10", temporalTable10) + expectExceptionThrown( + "SELECT * FROM T AS T JOIN temporalTable10 " + + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id AND T.b = D.name AND T.ts = D.ts", + "Expected: 
eval(java.util.concurrent.CompletableFuture, " + + "java.lang.Integer, org.apache.flink.table.dataformat.BinaryString, java.lang.Long) \n" + + "Actual: eval(org.apache.flink.streaming.api.functions.async.ResultFuture, " + + "java.lang.Integer, org.apache.flink.table.dataformat.BinaryString, java.lang.Long)", + classOf[TableException] + ) } @Test @@ -358,7 +369,7 @@ class TestTemporalTable "this method should never be called.") } - override def getLookupConfig: LookupConfig = LookupConfig.DEFAULT + override def isAsyncEnabled: Boolean = false override def getReturnType: TypeInformation[BaseRow] = { new BaseRowTypeInfo( @@ -385,8 +396,7 @@ class TestInvalidTemporalTable private( async: Boolean, fetcher: TableFunction[_], asyncFetcher: AsyncTableFunction[_]) - extends StreamTableSource[BaseRow] - with LookupableTableSource[BaseRow] + extends LookupableTableSource[BaseRow] with DefinedIndexes { val fieldNames: Array[String] = Array("id", "name", "age", "ts") @@ -418,11 +428,7 @@ class TestInvalidTemporalTable private( asyncFetcher.asInstanceOf[AsyncTableFunction[BaseRow]] } - override def getLookupConfig: LookupConfig = { - LookupConfig.builder() - .setAsyncEnabled(async) - .build() - } + override def isAsyncEnabled: Boolean = async override def getIndexes: util.Collection[TableIndex] = { util.Collections.singleton(TableIndex.builder() @@ -430,12 +436,6 @@ class TestInvalidTemporalTable private( .indexedColumns("id", "name", "ts") .build()) } - - override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[BaseRow] = { - throw new UnsupportedOperationException("This TableSource is only used for unit test, " + - "this method should never be called.") - } - } class InvalidTableFunctionResultType extends TableFunction[String] { @@ -472,17 +472,25 @@ class InvalidAsyncTableFunctionEvalSignature1 extends AsyncTableFunction[BaseRow } class InvalidAsyncTableFunctionEvalSignature2 extends AsyncTableFunction[BaseRow] { - def eval(resultFuture: 
ResultFuture[BaseRow], a: Integer, b: String, c: Timestamp): Unit = { + def eval(resultFuture: CompletableFuture[JCollection[BaseRow]], + a: Integer, b: String, c: Timestamp): Unit = { + } +} + +class InvalidAsyncTableFunctionEvalSignature3 extends AsyncTableFunction[BaseRow] { + def eval(resultFuture: ResultFuture[BaseRow], + a: Integer, b: BinaryString, c: JLong): Unit = { } } class ValidAsyncTableFunction extends AsyncTableFunction[BaseRow] { @varargs - def eval(resultFuture: ResultFuture[BaseRow], objs: AnyRef*): Unit = { + def eval(resultFuture: CompletableFuture[JCollection[BaseRow]], objs: AnyRef*): Unit = { } } class ValidAsyncTableFunction2 extends AsyncTableFunction[BaseRow] { - def eval(resultFuture: ResultFuture[BaseRow], a: Integer, b: BinaryString, c: JLong): Unit = { + def eval(resultFuture: CompletableFuture[JCollection[BaseRow]], + a: Integer, b: BinaryString, c: JLong): Unit = { } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala index 1abcf2e..29a116b 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala @@ -21,9 +21,6 @@ package org.apache.flink.table.runtime.utils import org.apache.flink.annotation.VisibleForTesting import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.RowTypeInfo -import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import org.apache.flink.streaming.api.functions.async.ResultFuture import org.apache.flink.table.api.TableSchema import 
org.apache.flink.table.functions.{AsyncTableFunction, FunctionContext, TableFunction} import org.apache.flink.table.runtime.utils.InMemoryLookupableTableSource.{InMemoryAsyncLookupFunction, InMemoryLookupFunction} @@ -51,9 +48,8 @@ class InMemoryLookupableTableSource( data: List[Row], primaryKey: Option[Array[String]], tableIndexes: Array[TableIndex], - lookupConfig: LookupConfig) + asyncEnabled: Boolean) extends LookupableTableSource[Row] - with StreamTableSource[Row] with DefinedPrimaryKey with DefinedIndexes { @@ -104,7 +100,7 @@ class InMemoryLookupableTableSource( map.toMap } - override def getLookupConfig: LookupConfig = lookupConfig + override def isAsyncEnabled: Boolean = asyncEnabled override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames) @@ -120,9 +116,6 @@ class InMemoryLookupableTableSource( @VisibleForTesting def getResourceCounter: Int = resourceCounter.get() - override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = { - throw new UnsupportedOperationException("This should never be called.") - } } object InMemoryLookupableTableSource { @@ -181,7 +174,7 @@ object InMemoryLookupableTableSource { private val tableIndexes = new mutable.ArrayBuffer[TableIndex]() private var primaryKey: Option[Array[String]] = None private var data: List[Product] = _ - private val lookupConfigBuilder: LookupConfig.Builder = LookupConfig.builder() + private var asyncEnabled: Boolean = false /** * Sets table data for the table source. @@ -255,23 +248,7 @@ object InMemoryLookupableTableSource { * Enables async lookup for the table source */ def enableAsync(): Builder = { - lookupConfigBuilder.setAsyncEnabled(true) - this - } - - /** - * Sets async buffer capacity. - */ - def asyncBufferCapacity(capacity: Int): Builder = { - lookupConfigBuilder.setAsyncBufferCapacity(capacity) - this - } - - /** - * Sets async time out milli-second. 
- */ - def asyncTimeoutMs(ms: Long): Builder = { - lookupConfigBuilder.setAsyncTimeoutMs(ms) + asyncEnabled = true this } @@ -294,7 +271,7 @@ object InMemoryLookupableTableSource { rowData, primaryKey, tableIndexes.toArray, - lookupConfigBuilder.build() + asyncEnabled ) } } @@ -343,7 +320,7 @@ object InMemoryLookupableTableSource { } @varargs - def eval(resultFuture: ResultFuture[Row], inputs: AnyRef*): Unit = { + def eval(resultFuture: CompletableFuture[util.Collection[Row]], inputs: AnyRef*): Unit = { CompletableFuture .supplyAsync(new CollectionSupplier(data, Row.of(inputs: _*)), executor) .thenAccept(new CollectionConsumer(resultFuture)) @@ -369,7 +346,7 @@ object InMemoryLookupableTableSource { } } - private class CollectionConsumer(resultFuture: ResultFuture[Row]) + private class CollectionConsumer(resultFuture: CompletableFuture[util.Collection[Row]]) extends Consumer[util.Collection[Row]] { override def accept(results: util.Collection[Row]): Unit = { diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java index a693637..06ef82a 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java @@ -182,6 +182,20 @@ public class TableConfigOptions { .withDescription("Cache size of every topn task."); // ------------------------------------------------------------------------ + // Async Lookup Options + // ------------------------------------------------------------------------ + + public static final ConfigOption<Integer> SQL_EXEC_LOOKUP_ASYNC_BUFFER_CAPACITY = + key("sql.exec.lookup.async.buffer-capacity") + .defaultValue(100) + .withDescription("The max number of async i/o operation that the async lookup join can trigger."); + + public 
static final ConfigOption<Long> SQL_EXEC_LOOKUP_ASYNC_TIMEOUT_MS = + key("sql.exec.lookup.async.timeout-ms") + .defaultValue(180_000L) + .withDescription("The async timeout millisecond for the asynchronous operation to complete."); + + // ------------------------------------------------------------------------ // MiniBatch Options // ------------------------------------------------------------------------ diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/lookup/DelegatingResultFuture.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/lookup/DelegatingResultFuture.java new file mode 100644 index 0000000..9f24aeb --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/lookup/DelegatingResultFuture.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join.lookup; + +import org.apache.flink.streaming.api.functions.async.ResultFuture; + +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; + +/** + * Delegates actions of {@link java.util.concurrent.CompletableFuture} to {@link ResultFuture}. + * This is used as a bridge between {@link org.apache.flink.table.functions.AsyncTableFunction} and + * {@link org.apache.flink.streaming.api.functions.async.AsyncFunction}. + */ +public class DelegatingResultFuture<OUT> implements BiConsumer<Collection<OUT>, Throwable> { + + private final ResultFuture<OUT> delegatedResultFuture; + private final CompletableFuture<Collection<OUT>> completableFuture; + + public DelegatingResultFuture(ResultFuture<OUT> delegatedResultFuture) { + this.delegatedResultFuture = delegatedResultFuture; + this.completableFuture = new CompletableFuture<>(); + this.completableFuture.whenComplete(this); + } + + @Override + public void accept(Collection<OUT> outs, Throwable throwable) { + if (throwable != null) { + delegatedResultFuture.completeExceptionally(throwable); + } else { + delegatedResultFuture.complete(outs); + } + } + + public CompletableFuture<Collection<OUT>> getCompletableFuture() { + return completableFuture; + } +}
