leonardBang commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r518800038
##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader;
+import org.apache.flink.connectors.hive.read.HivePartitionFetcherContextBase;
+import org.apache.flink.connectors.hive.util.HivePartitionUtils;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.filesystem.FileSystemLookupFunction;
+import org.apache.flink.table.filesystem.PartitionFetcher;
+import org.apache.flink.table.filesystem.PartitionReader;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.table.filesystem.FileSystemOptions.LOOKUP_JOIN_CACHE_TTL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_MONITOR_INTERVAL;
+import static org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
+
+/**
+ * Hive table source that has lookup ability.
+ *
+ * <p>The Hive table source has both lookup and continuous read ability: when it acts as a
+ * continuous read source it does not have the lookup ability, but it can serve as a temporal
+ * table just like other stream sources. When it acts as a bounded table, it has the lookup
+ * ability.
+ *
+ * <p>A common use case is using the Hive table as a dimension table and always looking up the
+ * data of the latest partition. In this case the Hive table source is a continuous read source,
+ * but we currently implement it via a LookupFunction, because a TableSource cannot tell the
+ * downstream operators when the latest partition has been fully read.
+ * This is a temporary workaround and will be re-implemented in the future.
+ */
+public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
+
+	private static final Duration DEFAULT_LOOKUP_MONITOR_INTERVAL = Duration.ofHours(1L);
+	private final Configuration configuration;
+	private Duration hiveTableReloadInterval;
+
+	public HiveLookupTableSource(
+			JobConf jobConf,
+			ReadableConfig flinkConf,
+			ObjectPath tablePath,
+			CatalogTable catalogTable) {
+		super(jobConf, flinkConf, tablePath, catalogTable);
+		this.configuration = new Configuration();
+		catalogTable.getOptions().forEach(configuration::setString);
+		validateLookupConfigurations();
+	}
+
+	@Override
+	public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+		return TableFunctionProvider.of(getLookupFunction(context.getKeys()));
+	}
+
+	@VisibleForTesting
+	TableFunction<RowData> getLookupFunction(int[][] keys) {
+		// Flatten the lookup keys; nested (multi-level) keys are not supported yet.
+		int[] keyIndices = new int[keys.length];
+		int i = 0;
+		for (int[] key : keys) {
+			if (key.length > 1) {
+				throw new UnsupportedOperationException("Hive lookup does not support nested keys yet.");
+			}
+			keyIndices[i] = key[0];
+			i++;
+		}
+		return getLookupFunction(keyIndices);
+	}
+
+	private void validateLookupConfigurations() {
+		String partitionInclude = configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE);
+		if (isStreamingSource()) {
+			Preconditions.checkArgument(
+				!configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET),
+				String.format(
+					"'%s' is not supported when '%s' is set to 'latest'",
+					STREAMING_SOURCE_CONSUME_START_OFFSET.key(),
+					STREAMING_SOURCE_PARTITION_INCLUDE.key()));
+
+			Duration monitorInterval = configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL) == null
+					? DEFAULT_LOOKUP_MONITOR_INTERVAL
+					: configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
+			Preconditions.checkArgument(
+				monitorInterval.toMillis() >= DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+				String.format(
+					"Currently the value of '%s' must be greater than or equal to the default value '%s' " +
						"when '%s' is set to 'latest', but the actual value is '%s'",
+					STREAMING_SOURCE_MONITOR_INTERVAL.key(),
+					DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+					STREAMING_SOURCE_PARTITION_INCLUDE.key(),
+					monitorInterval.toMillis()));
+
+			hiveTableReloadInterval = monitorInterval;
+		} else {
+			Preconditions.checkArgument(

Review comment:
   Yes, because we think using partition projection to load specific partitions is the better fit for batch analysis, and, semantically, a batch table does not have a "latest" partition while a streaming Hive table does. For batch tables, users can still use `LOOKUP_JOIN_CACHE_TTL` to set the reload interval.
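   For readers following the thread, here is a minimal sketch of how the streaming "latest partition" lookup mode described in the Javadoc and checked in `validateLookupConfigurations()` would look from the user side. The table names (`orders`, `dim_table`), their columns, and the `proc_time` attribute are invented for illustration; the option keys are the ones this class imports from `FileSystemOptions` (`streaming-source.enable`, `streaming-source.partition.include`, `streaming-source.monitor-interval`), and enabling `table.dynamic-table-options.enabled` for the `OPTIONS` hint is an assumption about the surrounding Flink configuration, not something this PR changes.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveLatestPartitionLookupSketch {
	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

		// Register a Hive catalog so Hive tables are visible to the planner.
		// Catalog name, default database, and conf dir are placeholders.
		tEnv.registerCatalog("myhive", new HiveCatalog("myhive", "default", "/opt/hive-conf"));
		tEnv.useCatalog("myhive");

		// OPTIONS hints are disabled by default, so turn them on for the per-query options below.
		tEnv.getConfig().getConfiguration()
				.setBoolean("table.dynamic-table-options.enabled", true);

		// Lookup join against the latest Hive partition. The monitor interval must be
		// >= 1 hour, matching DEFAULT_LOOKUP_MONITOR_INTERVAL in validateLookupConfigurations().
		tEnv.executeSql(
				"SELECT o.order_id, d.info " +
				"FROM orders AS o " +
				"JOIN dim_table /*+ OPTIONS(" +
				"'streaming-source.enable' = 'true', " +
				"'streaming-source.partition.include' = 'latest', " +
				"'streaming-source.monitor-interval' = '1 h') */ " +
				"FOR SYSTEM_TIME AS OF o.proc_time AS d " +
				"ON o.dim_id = d.id");
	}
}
```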
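   And for the batch case the reply above refers to: with streaming reads off, the bounded table is cached as a whole and `LOOKUP_JOIN_CACHE_TTL` (key `lookup.join-cache.ttl`) controls how often the cache is reloaded. A short continuation of the sketch above, reusing the same `tEnv` and invented tables:

```java
// Batch (bounded) Hive dimension table: the cache of the bounded table is
// reloaded each time the TTL expires, per the reply above.
tEnv.executeSql(
		"SELECT o.order_id, d.info " +
		"FROM orders AS o " +
		"JOIN dim_table /*+ OPTIONS('lookup.join-cache.ttl' = '12 h') */ " +
		"FOR SYSTEM_TIME AS OF o.proc_time AS d " +
		"ON o.dim_id = d.id");
```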
