lincoln-lil commented on code in PR #20482:
URL: https://github.com/apache/flink/pull/20482#discussion_r939814164
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java:
##########
@@ -144,75 +146,92 @@ private LookupJoinUtil() {
// no instantiation
}
+ /** AsyncLookupOptions includes async related options. */
+ public static class AsyncLookupOptions {
+ public final int asyncBufferCapacity;
+ public final long asyncTimeout;
+ public final ExecutionConfigOptions.AsyncOutputMode asyncOutputMode;
+
+ public AsyncLookupOptions(
+ int asyncBufferCapacity,
+ long asyncTimeout,
+ ExecutionConfigOptions.AsyncOutputMode asyncOutputMode) {
+ this.asyncBufferCapacity = asyncBufferCapacity;
+ this.asyncTimeout = asyncTimeout;
+ this.asyncOutputMode = asyncOutputMode;
+ }
+ }
+
/** Gets lookup keys sorted by index in ascending order. */
public static int[] getOrderedLookupKeys(Collection<Integer>
allLookupKeys) {
List<Integer> lookupKeyIndicesInOrder = new ArrayList<>(allLookupKeys);
lookupKeyIndicesInOrder.sort(Integer::compareTo);
return
lookupKeyIndicesInOrder.stream().mapToInt(Integer::intValue).toArray();
}
- /** Gets LookupFunction from temporal table according to the given lookup
keys. */
- public static UserDefinedFunction getLookupFunction(
- RelOptTable temporalTable, Collection<Integer> lookupKeys) {
- return getLookupFunction(temporalTable, lookupKeys, false);
- }
-
/**
- * Gets LookupFunction from temporal table according to the given lookup
keys. If specifies
- * requireSyncLookup, then only sync function will be created or raise an
error if not
- * implemented.
+ * Gets LookupFunction from temporal table according to the given lookup
keys with preference.
+ *
+ * @return the UserDefinedFunction by preferable lookup mode, if require
*/
public static UserDefinedFunction getLookupFunction(
- RelOptTable temporalTable, Collection<Integer> lookupKeys, boolean
requireSyncLookup) {
+ RelOptTable temporalTable,
+ Collection<Integer> lookupKeys,
+ boolean require,
+ boolean async) {
+ UserDefinedFunction syncLookupFunctions = null;
+ UserDefinedFunction asyncLookupFunctions = null;
int[] lookupKeyIndicesInOrder = getOrderedLookupKeys(lookupKeys);
-
if (temporalTable instanceof TableSourceTable) {
// TODO: support nested lookup keys in the future,
// currently we only support top-level lookup keys
int[][] indices =
IntStream.of(lookupKeyIndicesInOrder)
.mapToObj(i -> new int[] {i})
.toArray(int[][]::new);
+
LookupTableSource tableSource =
(LookupTableSource) ((TableSourceTable)
temporalTable).tableSource();
LookupRuntimeProviderContext providerContext =
new LookupRuntimeProviderContext(indices);
LookupTableSource.LookupRuntimeProvider provider =
tableSource.getLookupRuntimeProvider(providerContext);
- if (requireSyncLookup && !(provider instanceof
TableFunctionProvider)) {
- throw new TableException(
- String.format(
- "Require a synchronous TableFunction due to
planner's requirement but no TableFunctionProvider "
- + "found in TableSourceTable: %s,
please check the code to ensure a proper TableFunctionProvider is specified.",
- temporalTable.getQualifiedName()));
- }
if (provider instanceof LookupFunctionProvider) {
if (provider instanceof PartialCachingLookupProvider) {
PartialCachingLookupProvider partialCachingLookupProvider =
(PartialCachingLookupProvider) provider;
- return new CachingLookupFunction(
- partialCachingLookupProvider.getCache(),
-
partialCachingLookupProvider.createLookupFunction());
+ syncLookupFunctions =
+ new CachingLookupFunction(
+ partialCachingLookupProvider.getCache(),
+
partialCachingLookupProvider.createLookupFunction());
+ } else {
+ syncLookupFunctions =
+ ((LookupFunctionProvider)
provider).createLookupFunction();
}
- return ((LookupFunctionProvider)
provider).createLookupFunction();
- } else if (provider instanceof AsyncLookupFunctionProvider) {
+ }
+ if (provider instanceof AsyncLookupFunctionProvider) {
if (provider instanceof PartialCachingAsyncLookupProvider) {
PartialCachingAsyncLookupProvider
partialCachingLookupProvider =
(PartialCachingAsyncLookupProvider) provider;
- return new CachingAsyncLookupFunction(
- partialCachingLookupProvider.getCache(),
-
partialCachingLookupProvider.createAsyncLookupFunction());
+ asyncLookupFunctions =
+ new CachingAsyncLookupFunction(
Review Comment:
This function should be refactored to create nested delegators around the
user's lookup function:
[CachingLookupFunction]
|
[RetryableLookupFunctionDelegator]
|
userLookupFunc
I'll update this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]