This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d97b94d4b72 [FLINK-27384][hive] Fix the modified partitions are missed
in temporal table with create-time mode
d97b94d4b72 is described below
commit d97b94d4b723aaa403f5849ceaa76f59f4dd3b5a
Author: empcl <[email protected]>
AuthorDate: Fri Jul 29 11:12:26 2022 +0800
[FLINK-27384][hive] Fix the modified partitions are missed in temporal
table with create-time mode
This closes #20376.
---
.../hive/read/HivePartitionFetcherContextBase.java | 23 ++++++----------------
1 file changed, 6 insertions(+), 17 deletions(-)
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HivePartitionFetcherContextBase.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HivePartitionFetcherContextBase.java
index 41e7718e029..25a7471343a 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HivePartitionFetcherContextBase.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HivePartitionFetcherContextBase.java
@@ -46,7 +46,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.stream.Collectors;
import static
org.apache.flink.connector.file.table.DefaultPartTimeExtractor.toMills;
import static
org.apache.flink.connector.file.table.FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_CLASS;
@@ -75,8 +74,6 @@ public abstract class HivePartitionFetcherContextBase<P>
implements HivePartitio
protected transient Path tableLocation;
private transient PartitionTimeExtractor extractor;
private transient Table table;
- // remember the map from partition to its create time
- private transient Map<List<String>, Long> partValuesToCreateTime;
public HivePartitionFetcherContextBase(
ObjectPath tablePath,
@@ -119,7 +116,6 @@ public abstract class HivePartitionFetcherContextBase<P>
implements HivePartitio
extractorPattern,
formatterPattern);
tableLocation = new Path(table.getSd().getLocation());
- partValuesToCreateTime = new HashMap<>();
}
@Override
@@ -137,22 +133,18 @@ public abstract class HivePartitionFetcherContextBase<P>
implements HivePartitio
}
break;
case CREATE_TIME:
+ Map<List<String>, Long> partValuesToCreateTime = new
HashMap<>();
partitionNames =
metaStoreClient.listPartitionNames(
tablePath.getDatabaseName(),
tablePath.getObjectName(),
Short.MAX_VALUE);
- List<String> newNames =
- partitionNames.stream()
- .filter(
- n ->
-
!partValuesToCreateTime.containsKey(
-
extractPartitionValues(n)))
- .collect(Collectors.toList());
- List<Partition> newPartitions =
+ List<Partition> partitions =
metaStoreClient.getPartitionsByNames(
- tablePath.getDatabaseName(),
tablePath.getObjectName(), newNames);
- for (Partition partition : newPartitions) {
+ tablePath.getDatabaseName(),
+ tablePath.getObjectName(),
+ partitionNames);
+ for (Partition partition : partitions) {
partValuesToCreateTime.put(
partition.getValues(),
getPartitionCreateTime(partition));
}
@@ -233,9 +225,6 @@ public abstract class HivePartitionFetcherContextBase<P>
implements HivePartitio
@Override
public void close() throws Exception {
- if (partValuesToCreateTime != null) {
- partValuesToCreateTime.clear();
- }
if (this.metaStoreClient != null) {
this.metaStoreClient.close();
}