JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r517186451



##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HivePartitionUtils;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.filesystem.FileSystemAllPartitionFetcher;
+import org.apache.flink.table.filesystem.FileSystemLatestPartitionFetcher;
+import org.apache.flink.table.filesystem.FileSystemNonPartitionedTableFetcher;
+import org.apache.flink.table.filesystem.FilesystemLookupFunction;
+import org.apache.flink.table.filesystem.PartitionFetcher;
+import org.apache.flink.table.filesystem.PartitionReader;
+import org.apache.flink.table.filesystem.PartitionTimeExtractor;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toMills;
+import static 
org.apache.flink.table.filesystem.FileSystemOptions.LOOKUP_JOIN_CACHE_TTL;
+import static 
org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS;
+import static 
org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND;
+import static 
org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
+import static 
org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET;
+import static 
org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_MONITOR_INTERVAL;
+import static 
org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
+import static 
org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_ORDER;
+import static 
org.apache.flink.table.utils.PartitionPathUtils.extractPartitionValues;
+
+/**
+ * Hive Table Source that has lookup ability.
+ *
+ * <p>Hive Table source has both lookup and continuous read ability, when it 
acts as continuous read source
+ * it does not have the lookup ability but can be a temporal table just like 
other stream sources.
+ * When it acts as bounded table, it has the lookup ability.
+ *
+ * <p>A common use case is to use a Hive dimension table and always look up the 
latest partition data. In this case
+ * the Hive table source is a continuous read source, but currently we implement 
it with a LookupFunction, because currently
+ * a TableSource cannot tell the downstream when the latest partition has been 
read completely. This is a temporary
+ * workaround and will be re-implemented in the future.
+ */
+public class HiveLookupTableSource extends HiveTableSource implements 
LookupTableSource {
+
+       private static final Duration DEFAULT_LOOKUP_MONITOR_INTERVAL = 
Duration.ofHours(1L);
+       private final Configuration configuration;
+       private Duration hiveTableCacheTTL;
+
+       public HiveLookupTableSource(
+                       JobConf jobConf,
+                       ReadableConfig flinkConf,
+                       ObjectPath tablePath,
+                       CatalogTable catalogTable) {
+               super(jobConf, flinkConf, tablePath, catalogTable);
+               this.configuration = new Configuration();
+               catalogTable.getOptions().forEach(configuration::setString);
+               validateLookupConfigurations();
+       }
+
+       @Override
+       public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
+               return 
TableFunctionProvider.of(getLookupFunction(context.getKeys()));
+       }
+
+       @VisibleForTesting
+       TableFunction<RowData> getLookupFunction(int[][] keys) {
+               List<String> keyNames = new ArrayList<>();
+               TableSchema schema = getTableSchema();
+               for (int[] key : keys) {
+                       if (key.length > 1) {
+                               throw new UnsupportedOperationException("Hive 
lookup can not support nested key now.");
+                       }
+                       keyNames.add(schema.getFieldName(key[0]).get());
+               }
+               return getLookupFunction(keyNames.toArray(new String[0]));
+       }
+
+       private void validateLookupConfigurations() {
+               String partitionInclude = 
configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE);
+               if (isStreamingSource()) {
+                       Preconditions.checkArgument(
+                                       
!configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET),
+                                       String.format(
+                                                       "The '%s' is not 
supported when set '%s' to 'latest'",
+                                                       
STREAMING_SOURCE_CONSUME_START_OFFSET.key(),
+                                                       
STREAMING_SOURCE_PARTITION_INCLUDE.key()));
+
+                       Duration monitorInterval = 
configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
+                       if 
(monitorInterval.equals(STREAMING_SOURCE_MONITOR_INTERVAL.defaultValue())) {
+                               monitorInterval = 
DEFAULT_LOOKUP_MONITOR_INTERVAL;
+                       }
+                       Preconditions.checkArgument(
+                                       monitorInterval.toMillis() >= 
DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+                                       String.format(
+                                                       "Currently the value of 
'%s' is required bigger or equal to default value '%s' " +
+                                                                       "when 
set '%s' to 'latest', but actual is '%s'",
+                                                       
STREAMING_SOURCE_MONITOR_INTERVAL.key(),
+                                                       
DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+                                                       
STREAMING_SOURCE_PARTITION_INCLUDE.key(),
+                                                       
monitorInterval.toMillis())
+                       );
+
+                       hiveTableCacheTTL = monitorInterval;
+               } else {
+                       Preconditions.checkArgument(
+                                       "all".equals(partitionInclude),
+                                       String.format("The only supported %s 
for lookup is '%s' in batch source," +
+                                                       " but actual is '%s'", 
STREAMING_SOURCE_PARTITION_INCLUDE.key(), "all", partitionInclude));
+
+                       hiveTableCacheTTL = 
configuration.get(LOOKUP_JOIN_CACHE_TTL);
+               }
+       }
+
+       private TableFunction<RowData> getLookupFunction(String[] keys) {
+
+               final String defaultPartitionName = 
jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+                               
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
+
+               PartitionFetcher.Context context = new 
HiveTablePartitionFetcherContext(
+                               tablePath,
+                               hiveShim,
+                               new JobConfWrapper(jobConf),
+                               catalogTable.getPartitionKeys(),
+                               getProducedTableSchema().getFieldDataTypes(),
+                               getProducedTableSchema().getFieldNames(),
+                               configuration,
+                               defaultPartitionName);
+
+               PartitionFetcher<HiveTablePartition> partitionFetcher;
+               if (catalogTable.getPartitionKeys().isEmpty()) {
+                       // non-partitioned table
+                       partitionFetcher = new 
FileSystemNonPartitionedTableFetcher(context);
+
+               } else if (isStreamingSource()) {
+                       // streaming-read partitioned table
+                       partitionFetcher = new 
FileSystemLatestPartitionFetcher(context);
+
+               } else {
+                       // bounded-read partitioned table
+                       partitionFetcher = new 
FileSystemAllPartitionFetcher(context);
+               }
+
+               PartitionReader<HiveTablePartition, RowData> partitionReader = 
new HiveInputFormatPartitionReader(
+                               jobConf,
+                               hiveVersion,
+                               tablePath,
+                               getProducedTableSchema().getFieldDataTypes(),
+                               getProducedTableSchema().getFieldNames(),
+                               catalogTable.getPartitionKeys(),
+                               projectedFields,
+                               
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
+
+               return new FilesystemLookupFunction<>(
+                               partitionFetcher,
+                               partitionReader,
+                               getProducedTableSchema().getFieldDataTypes(),
+                               getProducedTableSchema().getFieldNames(),
+                               keys,
+                               hiveTableCacheTTL);
+       }
+
+       /**
+        * PartitionFetcher.Context for {@link HiveTablePartition}.
+        */
+       static class HiveTablePartitionFetcherContext implements 
PartitionFetcher.Context<HiveTablePartition> {
+
+               private static final long serialVersionUID = 1L;
+               private final ObjectPath tablePath;
+               private final HiveShim hiveShim;
+               private final JobConfWrapper confWrapper;
+               private final List<String> partitionKeys;
+               private final DataType[] fieldTypes;
+               private final String[] fieldNames;
+               private final Configuration configuration;
+               private final String defaultPartitionName;
+
+               private transient IMetaStoreClient metaStoreClient;
+               private transient StorageDescriptor tableSd;
+               private transient Properties tableProps;
+               private transient PartitionTimeExtractor extractor;
+               private transient ConsumeOrder consumeOrder;
+               private transient Path tableLocation;
+               private transient Table table;
+               private transient FileSystem fs;
+
+               public HiveTablePartitionFetcherContext(
+                               ObjectPath tablePath,
+                               HiveShim hiveShim,
+                               JobConfWrapper confWrapper,
+                               List<String> partitionKeys,
+                               DataType[] fieldTypes,
+                               String[] fieldNames,
+                               Configuration configuration,
+                               String defaultPartitionName) {
+                       this.tablePath = tablePath;
+                       this.hiveShim = hiveShim;
+                       this.confWrapper = confWrapper;
+                       this.partitionKeys = partitionKeys;
+                       this.fieldTypes = fieldTypes;
+                       this.fieldNames = fieldNames;
+                       this.configuration = configuration;
+                       this.defaultPartitionName = defaultPartitionName;
+               }
+
+               @Override
+               public void initialize() throws Exception {
+                       metaStoreClient = hiveShim.getHiveMetastoreClient(new 
HiveConf(confWrapper.conf(), HiveConf.class));
+                       table = 
metaStoreClient.getTable(tablePath.getDatabaseName(), 
tablePath.getObjectName());
+                       tableSd = table.getSd();
+                       tableProps = 
HiveReflectionUtils.getTableMetadata(hiveShim, table);
+
+                       String consumeOrderStr = 
configuration.get(STREAMING_SOURCE_PARTITION_ORDER);
+                       consumeOrder = 
ConsumeOrder.getConsumeOrder(consumeOrderStr);
+                       String extractorKind = 
configuration.get(PARTITION_TIME_EXTRACTOR_KIND);
+                       String extractorClass = 
configuration.get(PARTITION_TIME_EXTRACTOR_CLASS);
+                       String extractorPattern = 
configuration.get(PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN);
+
+                       extractor = PartitionTimeExtractor.create(
+                                       
Thread.currentThread().getContextClassLoader(),
+                                       extractorKind,
+                                       extractorClass,
+                                       extractorPattern);
+                       tableLocation = new Path(table.getSd().getLocation());
+                       fs = tableLocation.getFileSystem(confWrapper.conf());
+               }
+
+               @Override
+               public Optional<HiveTablePartition> getPartition(List<String> 
partValues) throws Exception {
+                       if (partitionKeys.isEmpty()) {
+                               return Optional.empty();
+                       }
+                       try {
+                               Partition partition = 
metaStoreClient.getPartition(
+                                               tablePath.getDatabaseName(),
+                                               tablePath.getObjectName(),
+                                               partValues);
+                               HiveTablePartition hiveTablePartition = 
HivePartitionUtils.toHiveTablePartition(
+                                               partitionKeys,
+                                               fieldNames,
+                                               fieldTypes,
+                                               hiveShim,
+                                               tableProps,
+                                               defaultPartitionName, 
partition);
+                               return Optional.of(hiveTablePartition);
+                       } catch (NoSuchObjectException e) {
+                               return Optional.empty();
+                       }
+               }
+
+               @Override
+               public Optional<HiveTablePartition> 
getNonPartitionedTablePartition() {
+                       if (partitionKeys.isEmpty()) {
+                               return Optional.of(new 
HiveTablePartition(tableSd, tableProps));
+                       }
+                       return Optional.empty();
+               }
+
+               @Override
+               public List<Tuple2<List<String>, Long>> 
getAllPartValueToTimeList() {
+                       FileStatus[] statuses = 
HivePartitionUtils.getFileStatusRecurse(tableLocation, partitionKeys.size(), 
fs);
+                       List<Tuple2<List<String>, Long>> partValueList = new 
ArrayList<>();
+                       for (FileStatus status : statuses) {
+                               List<String> partValues = 
extractPartitionValues(
+                                               new 
org.apache.flink.core.fs.Path(status.getPath().toString()));
+                               long timestamp = extractTimestamp(
+                                               partitionKeys,
+                                               partValues,
+                                               // to UTC millisecond.
+                                               () -> 
TimestampData.fromTimestamp(
+                                                               new 
Timestamp(status.getModificationTime())).getMillisecond());
+                               partValueList.add(new Tuple2<>(partValues, 
timestamp));
+                       }
+
+                       return partValueList;
+               }
+
+               @Override
+               public long extractTimestamp(

Review comment:
       extractTimestamp is an inner method




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to