JingsongLi commented on a change in pull request #13729:
URL: https://github.com/apache/flink/pull/13729#discussion_r517750633



##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HivePartitionUtils;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.filesystem.FileSystemLookupFunction;
+import org.apache.flink.table.filesystem.PartitionFetcher;
+import org.apache.flink.table.filesystem.PartitionReader;
+import org.apache.flink.table.filesystem.PartitionTimeExtractor;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toMills;
+import static 
org.apache.flink.table.filesystem.FileSystemOptions.LOOKUP_JOIN_CACHE_TTL;
+import static 
org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS;
+import static 
org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND;
+import static 
org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
+import static 
org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET;
+import static 
org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_MONITOR_INTERVAL;
+import static 
org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
+import static 
org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_ORDER;
+import static 
org.apache.flink.table.utils.PartitionPathUtils.extractPartitionValues;
+
+/**
+ * Hive Table Source that has lookup ability.
+ *
+ * <p>The Hive table source has both lookup and continuous-read abilities. When it 
acts as a continuous-read source,
+ * it does not have the lookup ability but can be a temporal table just like 
other stream sources.
+ * When it acts as a bounded table, it has the lookup ability.
+ *
+ * <p>A common use case is to use a Hive dimension table and always look up the 
latest partition data. In this case the
+ * Hive table source is a continuous-read source, but currently we implement 
it with a LookupFunction, because currently
+ * TableSource cannot tell the downstream when the latest partition has been 
read completely. This is a temporary
+ * workaround and will be re-implemented in the future.
+ */
+public class HiveLookupTableSource extends HiveTableSource implements 
LookupTableSource {
+
+       private static final Duration DEFAULT_LOOKUP_MONITOR_INTERVAL = 
Duration.ofHours(1L);
+       private final Configuration configuration;
+       private Duration hiveTableReloadInterval;
+
+       public HiveLookupTableSource(
+                       JobConf jobConf,
+                       ReadableConfig flinkConf,
+                       ObjectPath tablePath,
+                       CatalogTable catalogTable) {
+               super(jobConf, flinkConf, tablePath, catalogTable);
+               this.configuration = new Configuration();
+               catalogTable.getOptions().forEach(configuration::setString);
+               validateLookupConfigurations();
+       }
+
+       @Override
+       public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
+               return 
TableFunctionProvider.of(getLookupFunction(context.getKeys()));
+       }
+
+       @VisibleForTesting
+       TableFunction<RowData> getLookupFunction(int[][] keys) {
+               int[] keyIndices = new int[keys.length];
+               int i = 0;
+               for (int[] key : keys) {
+                       if (key.length > 1) {
+                               throw new UnsupportedOperationException("Hive 
lookup can not support nested key now.");
+                       }
+                       keyIndices[i] = key[0];
+                       i++;
+               }
+               return getLookupFunction(keyIndices);
+       }
+
+       private void validateLookupConfigurations() {
+               String partitionInclude = 
configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE);
+               if (isStreamingSource()) {
+                       Preconditions.checkArgument(
+                                       
!configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET),
+                                       String.format(
+                                                       "The '%s' is not 
supported when set '%s' to 'latest'",
+                                                       
STREAMING_SOURCE_CONSUME_START_OFFSET.key(),
+                                                       
STREAMING_SOURCE_PARTITION_INCLUDE.key()));
+
+                       Duration monitorInterval = 
configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL) == null
+                                       ? DEFAULT_LOOKUP_MONITOR_INTERVAL
+                                       : 
configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
+                       Preconditions.checkArgument(
+                                       monitorInterval.toMillis() >= 
DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+                                       String.format(
+                                                       "Currently the value of 
'%s' is required bigger or equal to default value '%s' " +
+                                                                       "when 
set '%s' to 'latest', but actual is '%s'",
+                                                       
STREAMING_SOURCE_MONITOR_INTERVAL.key(),
+                                                       
DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+                                                       
STREAMING_SOURCE_PARTITION_INCLUDE.key(),
+                                                       
monitorInterval.toMillis())
+                       );
+
+                       hiveTableReloadInterval = monitorInterval;
+               } else {
+                       Preconditions.checkArgument(
+                                       "all".equals(partitionInclude),
+                                       String.format("The only supported %s 
for lookup is '%s' in batch source," +
+                                                       " but actual is '%s'", 
STREAMING_SOURCE_PARTITION_INCLUDE.key(), "all", partitionInclude));
+
+                       hiveTableReloadInterval = 
configuration.get(LOOKUP_JOIN_CACHE_TTL);
+               }
+       }
+
+       private TableFunction<RowData> getLookupFunction(int[] keys) {
+
+               final String defaultPartitionName = 
jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+                               
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
+
+               PartitionFetcher.Context<HiveTablePartition> fetcherContext = 
new HiveTablePartitionFetcherContext(
+                               tablePath,
+                               hiveShim,
+                               new JobConfWrapper(jobConf),
+                               catalogTable.getPartitionKeys(),
+                               getProducedTableSchema().getFieldDataTypes(),
+                               getProducedTableSchema().getFieldNames(),
+                               configuration,
+                               defaultPartitionName);
+
+               PartitionFetcher<HiveTablePartition> partitionFetcher;
+               if (catalogTable.getPartitionKeys().isEmpty()) {
+                       // non-partitioned table, the fetcher fetches the 
partition which represents the given table.
+                       partitionFetcher = context -> {
+                               List<HiveTablePartition> partValueList = new 
ArrayList<>();
+                               
context.getNonPartitionedTablePartition().ifPresent(partValueList::add);
+                               return partValueList;
+                       };
+               } else if (isStreamingSource()) {
+                       // streaming-read partitioned table, the fetcher 
fetches the latest partition of the given table.
+                       partitionFetcher = context -> {
+                               List<HiveTablePartition> partValueList = new 
ArrayList<>();
+                               List<Tuple2<List<String>, Comparable>> 
allPartValueToTime = context.getPartValueWithComparableObjList();
+                               // fetch latest partitions for partitioned table
+                               if (allPartValueToTime.size() > 0) {
+                                       //sort in desc order
+                                       allPartValueToTime.sort((o1, o2) -> 
o2.f1.compareTo(o1.f1));
+                                       Tuple2<List<String>, Comparable> 
maxPartition = allPartValueToTime.get(0);
+                                       
context.getPartition(maxPartition.f0).ifPresent(partValueList::add);
+                               } else {
+                                       throw new IllegalArgumentException(
+                                                       String.format("At least 
one partition is required when set '%s' to 'latest' in temporal join," +
+                                                                               
        " but actual partition number is '%s'",
+                                                                       
STREAMING_SOURCE_PARTITION_INCLUDE.key(), allPartValueToTime.size()));
+                               }
+                               return partValueList;
+                       };
+               } else {
+                       // bounded-read partitioned table, the fetcher fetches 
all partitions of the given filesystem table.
+                       partitionFetcher = context -> {
+                               List<HiveTablePartition> partValueList = new 
ArrayList<>();
+                               List<Tuple2<List<String>, Comparable>> 
allPartValueToTime = context.getPartValueWithComparableObjList();
+                               for (Tuple2<List<String>, Comparable> 
partValueToTime : allPartValueToTime) {
+                                       
context.getPartition(partValueToTime.f0).ifPresent(partValueList::add);
+                               }
+                               return partValueList;
+                       };
+               }
+
+               PartitionReader<HiveTablePartition, RowData> partitionReader = 
new HiveInputFormatPartitionReader(
+                               jobConf,
+                               hiveVersion,
+                               tablePath,
+                               getProducedTableSchema().getFieldDataTypes(),
+                               getProducedTableSchema().getFieldNames(),
+                               catalogTable.getPartitionKeys(),
+                               projectedFields,
+                               
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
+
+               return new FileSystemLookupFunction<>(
+                               partitionFetcher,
+                               fetcherContext,
+                               partitionReader,
+                               (RowType) 
getProducedTableSchema().toRowDataType().getLogicalType(),
+                               keys,
+                               hiveTableReloadInterval);
+       }
+
+       /**
+        * PartitionFetcher.Context for {@link HiveTablePartition}.
+        */
+       static class HiveTablePartitionFetcherContext implements 
PartitionFetcher.Context<HiveTablePartition> {
+
+               private static final long serialVersionUID = 1L;
+               private final ObjectPath tablePath;
+               private final HiveShim hiveShim;
+               private final JobConfWrapper confWrapper;
+               private final List<String> partitionKeys;
+               private final DataType[] fieldTypes;
+               private final String[] fieldNames;
+               private final Configuration configuration;
+               private final String defaultPartitionName;
+
+               private transient IMetaStoreClient metaStoreClient;
+               private transient StorageDescriptor tableSd;
+               private transient Properties tableProps;
+               private transient PartitionTimeExtractor extractor;
+               private transient ConsumeOrder consumeOrder;
+               private transient Path tableLocation;
+               private transient Table table;
+               private transient FileSystem fs;
+
+               public HiveTablePartitionFetcherContext(
+                               ObjectPath tablePath,
+                               HiveShim hiveShim,
+                               JobConfWrapper confWrapper,
+                               List<String> partitionKeys,
+                               DataType[] fieldTypes,
+                               String[] fieldNames,
+                               Configuration configuration,
+                               String defaultPartitionName) {
+                       this.tablePath = tablePath;
+                       this.hiveShim = hiveShim;
+                       this.confWrapper = confWrapper;
+                       this.partitionKeys = partitionKeys;
+                       this.fieldTypes = fieldTypes;
+                       this.fieldNames = fieldNames;
+                       this.configuration = configuration;
+                       this.defaultPartitionName = defaultPartitionName;
+               }
+
+               @Override
+               public void open() throws Exception {
+                       metaStoreClient = hiveShim.getHiveMetastoreClient(new 
HiveConf(confWrapper.conf(), HiveConf.class));
+                       table = 
metaStoreClient.getTable(tablePath.getDatabaseName(), 
tablePath.getObjectName());
+                       tableSd = table.getSd();
+                       tableProps = 
HiveReflectionUtils.getTableMetadata(hiveShim, table);
+
+                       String consumeOrderStr = 
configuration.get(STREAMING_SOURCE_PARTITION_ORDER);
+                       consumeOrder = 
ConsumeOrder.getConsumeOrder(consumeOrderStr);
+                       String extractorKind = 
configuration.get(PARTITION_TIME_EXTRACTOR_KIND);
+                       String extractorClass = 
configuration.get(PARTITION_TIME_EXTRACTOR_CLASS);
+                       String extractorPattern = 
configuration.get(PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN);
+
+                       extractor = PartitionTimeExtractor.create(
+                                       
Thread.currentThread().getContextClassLoader(),
+                                       extractorKind,
+                                       extractorClass,
+                                       extractorPattern);
+                       tableLocation = new Path(table.getSd().getLocation());
+                       fs = tableLocation.getFileSystem(confWrapper.conf());
+               }
+
+               @Override
+               public Optional<HiveTablePartition> getPartition(List<String> 
partValues) throws Exception {
+                       if (partitionKeys.isEmpty()) {
+                               return Optional.empty();
+                       }
+                       try {
+                               Partition partition = 
metaStoreClient.getPartition(
+                                               tablePath.getDatabaseName(),
+                                               tablePath.getObjectName(),
+                                               partValues);
+                               HiveTablePartition hiveTablePartition = 
HivePartitionUtils.toHiveTablePartition(
+                                               partitionKeys,
+                                               fieldNames,
+                                               fieldTypes,
+                                               hiveShim,
+                                               tableProps,
+                                               defaultPartitionName,
+                                               partition);
+                               return Optional.of(hiveTablePartition);
+                       } catch (NoSuchObjectException e) {
+                               return Optional.empty();
+                       }
+               }
+
+               @Override
+               public Optional<HiveTablePartition> 
getNonPartitionedTablePartition() {
+                       if (partitionKeys.isEmpty()) {
+                               return Optional.of(new 
HiveTablePartition(tableSd, tableProps));
+                       }
+                       return Optional.empty();
+               }
+
+               @Override
+               public List<Tuple2<List<String>, Comparable>> 
getPartValueWithComparableObjList() throws Exception {
+                       List<Tuple2<List<String>, Comparable>> partValueList = 
new ArrayList<>();
+                       switch (consumeOrder) {
+                               case PARTITION_NAME_ORDER:
+                                       List<String> partitionNames = 
metaStoreClient.listPartitionNames(
+                                                       
tablePath.getDatabaseName(),
+                                                       
tablePath.getObjectName(),
+                                                       Short.MAX_VALUE);
+                                       for (String partitionName : 
partitionNames) {
+                                               List<String> partValues = 
extractPartitionValues(new org.apache.flink.core.fs.Path(partitionName));
+                                               Comparable comparable = 
partValues.toString();
+                                               partValueList.add(new 
Tuple2<>(partValues, comparable));
+                                       }
+                                       break;
+                               case CREATE_TIME_ORDER:
+                                       FileStatus[] statuses = 
HivePartitionUtils.getFileStatusRecurse(tableLocation, partitionKeys.size(), 
fs);
+                                       for (FileStatus status : statuses) {
+                                               List<String> partValues = 
extractPartitionValues(
+                                                               new 
org.apache.flink.core.fs.Path(status.getPath().toString()));
+                                               Comparable comparable = 
TimestampData.fromTimestamp(new Timestamp(status.getModificationTime()))
+                                                               
.getMillisecond();
+                                               partValueList.add(new 
Tuple2<>(partValues, comparable));
+                                       }
+                                       break;
+                               case PARTITION_TIME_ORDER:
+                                       List<Partition> partitions = 
metaStoreClient.listPartitions(

Review comment:
       Use `listPartitionNames` here instead of `listPartitions`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to