luoyuxia commented on code in PR #20422:
URL: https://github.com/apache/flink/pull/20422#discussion_r939763843
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java:
##########
@@ -146,10 +159,14 @@
/** A catalog implementation for Hive. */
public class HiveCatalog extends AbstractCatalog {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HiveCatalog.class);
+
+ public static final String HIVE_DEFAULT_PARTITION =
"__HIVE_DEFAULT_PARTITION__";
Review Comment:
The name of the Hive default partition is configurable in Hive itself. You can
use
`getHiveConf().getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME)`
to get it.
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java:
##########
@@ -1734,6 +1814,152 @@ public CatalogColumnStatistics
getPartitionColumnStatistics(
}
}
+ @Override
+ public List<CatalogColumnStatistics> bulkGetPartitionColumnStatistics(
+ ObjectPath tablePath, List<CatalogPartitionSpec> partitionSpecs)
+ throws PartitionNotExistException, CatalogException {
+
+ checkNotNull(partitionSpecs);
+
+ List<CatalogColumnStatistics> result = new
ArrayList<>(partitionSpecs.size());
+
+ final Tuple3<Table, List<Partition>, List<String>> partitionsTuple3 =
Review Comment:
Splitting this method may improve readability.
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java:
##########
@@ -1699,6 +1716,69 @@ public CatalogTableStatistics getPartitionStatistics(
}
}
+ @Override
+ public List<CatalogTableStatistics> bulkGetPartitionStatistics(
+ ObjectPath tablePath, List<CatalogPartitionSpec> partitionSpecs)
+ throws PartitionNotExistException, CatalogException {
+
+ return getTableAndPartitions(tablePath, partitionSpecs).f1.stream()
+ .map(p -> createCatalogTableStatistics(p.getParameters()))
+ .collect(Collectors.toList());
+ }
+
+ /** @return Partitions and names of partitions. */
+ private Tuple3<Table, List<Partition>, List<String>> getTableAndPartitions(
+ ObjectPath tablePath, List<CatalogPartitionSpec> partitionSpecs)
+ throws PartitionNotExistException, CatalogException {
+
+ List<Partition> partitions = new ArrayList<>(partitionSpecs.size());
+ Tuple2<Table, List<String>> partitionsNamesTuple2 = null;
+
+ try {
+ partitionsNamesTuple2 = getTableAndPartitionsNames(tablePath,
partitionSpecs);
+
+ partitions.addAll(
+ client.getPartitionsByNames(
+ partitionsNamesTuple2.f0.getDbName(),
+ partitionsNamesTuple2.f0.getTableName(),
+ partitionsNamesTuple2.f1));
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format(
+ "Failed to get partition stats of table %s 's
partitions %s",
+ tablePath.getFullName(),
String.valueOf(partitionSpecs)),
Review Comment:
nit:
`String.valueOf` is unnecessary.
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java:
##########
@@ -1734,6 +1814,152 @@ public CatalogColumnStatistics
getPartitionColumnStatistics(
}
}
+ @Override
+ public List<CatalogColumnStatistics> bulkGetPartitionColumnStatistics(
+ ObjectPath tablePath, List<CatalogPartitionSpec> partitionSpecs)
+ throws PartitionNotExistException, CatalogException {
+
+ checkNotNull(partitionSpecs);
+
+ List<CatalogColumnStatistics> result = new
ArrayList<>(partitionSpecs.size());
+
+ final Tuple3<Table, List<Partition>, List<String>> partitionsTuple3 =
+ getTableAndPartitions(tablePath, partitionSpecs);
+
+ try {
+
+ final List<FieldSchema> distinctColumns =
Review Comment:
Is this for getting all columns of the table? If so, I think we can use
`partitionsTuple3.f0.getSd().getCols()`
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java:
##########
@@ -1734,6 +1814,152 @@ public CatalogColumnStatistics
getPartitionColumnStatistics(
}
}
+ @Override
+ public List<CatalogColumnStatistics> bulkGetPartitionColumnStatistics(
+ ObjectPath tablePath, List<CatalogPartitionSpec> partitionSpecs)
+ throws PartitionNotExistException, CatalogException {
+
+ checkNotNull(partitionSpecs);
+
+ List<CatalogColumnStatistics> result = new
ArrayList<>(partitionSpecs.size());
+
+ final Tuple3<Table, List<Partition>, List<String>> partitionsTuple3 =
+ getTableAndPartitions(tablePath, partitionSpecs);
+
+ try {
+
+ final List<FieldSchema> distinctColumns =
+ partitionsTuple3.f1.stream()
+ .map(p -> p.getSd().getCols())
+ .flatMap(Collection::stream)
+ .distinct()
+ .collect(Collectors.toList());
+
+ Map<String, List<ColumnStatisticsObj>> partitionColumnStatistics =
+ client.getPartitionColumnStatistics(
+ partitionsTuple3.f1.get(0).getDbName(),
+ partitionsTuple3.f1.get(0).getTableName(),
+ partitionsTuple3.f2,
+ getFieldNames(distinctColumns));
+
+ for (String partitionName : partitionsTuple3.f2) {
+ List<ColumnStatisticsObj> columnStatisticsObjs =
+ partitionColumnStatistics.get(partitionName);
+ if (columnStatisticsObjs != null &&
!columnStatisticsObjs.isEmpty()) {
+ result.add(
+ new CatalogColumnStatistics(
+ HiveStatsUtil.createCatalogColumnStats(
+ columnStatisticsObjs,
hiveVersion)));
+ } else {
+ result.add(CatalogColumnStatistics.UNKNOWN);
+ }
+ }
+
+ result.add(
Review Comment:
I'm a little confused about this line. Is it for adding column statistics for the
partition columns? If so, why is only one element added?
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java:
##########
@@ -1734,6 +1814,152 @@ public CatalogColumnStatistics
getPartitionColumnStatistics(
}
}
+ @Override
+ public List<CatalogColumnStatistics> bulkGetPartitionColumnStatistics(
+ ObjectPath tablePath, List<CatalogPartitionSpec> partitionSpecs)
+ throws PartitionNotExistException, CatalogException {
+
+ checkNotNull(partitionSpecs);
+
+ List<CatalogColumnStatistics> result = new
ArrayList<>(partitionSpecs.size());
+
+ final Tuple3<Table, List<Partition>, List<String>> partitionsTuple3 =
+ getTableAndPartitions(tablePath, partitionSpecs);
+
+ try {
+
+ final List<FieldSchema> distinctColumns =
+ partitionsTuple3.f1.stream()
+ .map(p -> p.getSd().getCols())
+ .flatMap(Collection::stream)
+ .distinct()
+ .collect(Collectors.toList());
+
+ Map<String, List<ColumnStatisticsObj>> partitionColumnStatistics =
+ client.getPartitionColumnStatistics(
+ partitionsTuple3.f1.get(0).getDbName(),
+ partitionsTuple3.f1.get(0).getTableName(),
+ partitionsTuple3.f2,
+ getFieldNames(distinctColumns));
+
+ for (String partitionName : partitionsTuple3.f2) {
+ List<ColumnStatisticsObj> columnStatisticsObjs =
+ partitionColumnStatistics.get(partitionName);
+ if (columnStatisticsObjs != null &&
!columnStatisticsObjs.isEmpty()) {
+ result.add(
+ new CatalogColumnStatistics(
+ HiveStatsUtil.createCatalogColumnStats(
+ columnStatisticsObjs,
hiveVersion)));
+ } else {
+ result.add(CatalogColumnStatistics.UNKNOWN);
+ }
+ }
+
+ result.add(
+ buildPartitionedByColumnSatistics(
+ tablePath,
+ partitionsTuple3
+ .f0
+ .getPartitionKeys()
+
.get(partitionsTuple3.f0.getPartitionKeys().size() - 1),
+ partitionsTuple3.f2));
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format(
+ "Failed to get table stats of table %s 's
partitions %s",
+ tablePath.getFullName(),
String.valueOf(partitionSpecs)),
+ e);
+ }
+
+ return result;
+ }
+
+ private CatalogColumnStatistics buildPartitionedByColumnSatistics(
+ ObjectPath tablePath, FieldSchema partitionedByColumn,
List<String> partitionNames) {
+
+ Long nullCount = null;
+ Long ndv = HiveStatsUtil.max(0L, Long.valueOf(partitionNames.size()));
+
+ CatalogColumnStatisticDataBuilder<? extends
CatalogColumnStatisticsDataBase> builder = null;
+
+ final CatalogColumnStatisticsDataLong.Builder longBuilder =
+ CatalogColumnStatisticsDataLong.builder();
+ longBuilder.ndv(ndv);
+ final CatalogColumnStatisticsDataDate.Builder dateBuilder =
+ CatalogColumnStatisticsDataDate.builder();
+ dateBuilder.ndv(ndv);
+ final CatalogColumnStatisticsDataDouble.Builder doubleBuilder =
+ CatalogColumnStatisticsDataDouble.builder();
+ doubleBuilder.ndv(ndv);
+ final CatalogColumnStatisticsDataString.Builder stringBuilder =
+ CatalogColumnStatisticsDataString.builder();
+ stringBuilder.ndv(ndv);
+ final CatalogColumnStatisticsDataBoolean.Builder booleanBuilder =
+ CatalogColumnStatisticsDataBoolean.builder();
+ final CatalogColumnStatisticsDataBinary.Builder binaryBuilder =
+ CatalogColumnStatisticsDataBinary.builder();
+
+ for (String partitionName : partitionNames) {
+ String[] partName = partitionName.split(Path.SEPARATOR);
+ String[] partVal = partName[partName.length - 1].split("=");
Review Comment:
Why is the last partName always chosen?
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java:
##########
@@ -1734,6 +1814,152 @@ public CatalogColumnStatistics
getPartitionColumnStatistics(
}
}
+ @Override
+ public List<CatalogColumnStatistics> bulkGetPartitionColumnStatistics(
+ ObjectPath tablePath, List<CatalogPartitionSpec> partitionSpecs)
+ throws PartitionNotExistException, CatalogException {
+
+ checkNotNull(partitionSpecs);
+
+ List<CatalogColumnStatistics> result = new
ArrayList<>(partitionSpecs.size());
+
+ final Tuple3<Table, List<Partition>, List<String>> partitionsTuple3 =
+ getTableAndPartitions(tablePath, partitionSpecs);
+
+ try {
+
+ final List<FieldSchema> distinctColumns =
+ partitionsTuple3.f1.stream()
+ .map(p -> p.getSd().getCols())
+ .flatMap(Collection::stream)
+ .distinct()
+ .collect(Collectors.toList());
+
+ Map<String, List<ColumnStatisticsObj>> partitionColumnStatistics =
+ client.getPartitionColumnStatistics(
+ partitionsTuple3.f1.get(0).getDbName(),
+ partitionsTuple3.f1.get(0).getTableName(),
+ partitionsTuple3.f2,
+ getFieldNames(distinctColumns));
+
+ for (String partitionName : partitionsTuple3.f2) {
+ List<ColumnStatisticsObj> columnStatisticsObjs =
+ partitionColumnStatistics.get(partitionName);
+ if (columnStatisticsObjs != null &&
!columnStatisticsObjs.isEmpty()) {
+ result.add(
+ new CatalogColumnStatistics(
+ HiveStatsUtil.createCatalogColumnStats(
+ columnStatisticsObjs,
hiveVersion)));
+ } else {
+ result.add(CatalogColumnStatistics.UNKNOWN);
+ }
+ }
+
+ result.add(
+ buildPartitionedByColumnSatistics(
+ tablePath,
+ partitionsTuple3
+ .f0
+ .getPartitionKeys()
+
.get(partitionsTuple3.f0.getPartitionKeys().size() - 1),
+ partitionsTuple3.f2));
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format(
+ "Failed to get table stats of table %s 's
partitions %s",
+ tablePath.getFullName(),
String.valueOf(partitionSpecs)),
+ e);
+ }
+
+ return result;
+ }
+
+ private CatalogColumnStatistics buildPartitionedByColumnSatistics(
+ ObjectPath tablePath, FieldSchema partitionedByColumn,
List<String> partitionNames) {
+
+ Long nullCount = null;
+ Long ndv = HiveStatsUtil.max(0L, Long.valueOf(partitionNames.size()));
Review Comment:
`Long.valueOf(partitionNames.size())` can be reduced to `(long)
partitionNames.size()`.
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java:
##########
@@ -1734,6 +1814,152 @@ public CatalogColumnStatistics
getPartitionColumnStatistics(
}
}
+ @Override
+ public List<CatalogColumnStatistics> bulkGetPartitionColumnStatistics(
+ ObjectPath tablePath, List<CatalogPartitionSpec> partitionSpecs)
+ throws PartitionNotExistException, CatalogException {
+
+ checkNotNull(partitionSpecs);
+
+ List<CatalogColumnStatistics> result = new
ArrayList<>(partitionSpecs.size());
+
+ final Tuple3<Table, List<Partition>, List<String>> partitionsTuple3 =
+ getTableAndPartitions(tablePath, partitionSpecs);
+
+ try {
+
+ final List<FieldSchema> distinctColumns =
+ partitionsTuple3.f1.stream()
+ .map(p -> p.getSd().getCols())
+ .flatMap(Collection::stream)
+ .distinct()
+ .collect(Collectors.toList());
+
+ Map<String, List<ColumnStatisticsObj>> partitionColumnStatistics =
+ client.getPartitionColumnStatistics(
+ partitionsTuple3.f1.get(0).getDbName(),
+ partitionsTuple3.f1.get(0).getTableName(),
+ partitionsTuple3.f2,
+ getFieldNames(distinctColumns));
+
+ for (String partitionName : partitionsTuple3.f2) {
+ List<ColumnStatisticsObj> columnStatisticsObjs =
+ partitionColumnStatistics.get(partitionName);
+ if (columnStatisticsObjs != null &&
!columnStatisticsObjs.isEmpty()) {
+ result.add(
+ new CatalogColumnStatistics(
+ HiveStatsUtil.createCatalogColumnStats(
+ columnStatisticsObjs,
hiveVersion)));
+ } else {
+ result.add(CatalogColumnStatistics.UNKNOWN);
+ }
+ }
+
+ result.add(
+ buildPartitionedByColumnSatistics(
+ tablePath,
+ partitionsTuple3
+ .f0
+ .getPartitionKeys()
+
.get(partitionsTuple3.f0.getPartitionKeys().size() - 1),
+ partitionsTuple3.f2));
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format(
+ "Failed to get table stats of table %s 's
partitions %s",
+ tablePath.getFullName(),
String.valueOf(partitionSpecs)),
+ e);
+ }
+
+ return result;
+ }
+
+ private CatalogColumnStatistics buildPartitionedByColumnSatistics(
+ ObjectPath tablePath, FieldSchema partitionedByColumn,
List<String> partitionNames) {
+
+ Long nullCount = null;
+ Long ndv = HiveStatsUtil.max(0L, Long.valueOf(partitionNames.size()));
+
+ CatalogColumnStatisticDataBuilder<? extends
CatalogColumnStatisticsDataBase> builder = null;
+
+ final CatalogColumnStatisticsDataLong.Builder longBuilder =
+ CatalogColumnStatisticsDataLong.builder();
+ longBuilder.ndv(ndv);
+ final CatalogColumnStatisticsDataDate.Builder dateBuilder =
+ CatalogColumnStatisticsDataDate.builder();
+ dateBuilder.ndv(ndv);
+ final CatalogColumnStatisticsDataDouble.Builder doubleBuilder =
+ CatalogColumnStatisticsDataDouble.builder();
+ doubleBuilder.ndv(ndv);
+ final CatalogColumnStatisticsDataString.Builder stringBuilder =
+ CatalogColumnStatisticsDataString.builder();
+ stringBuilder.ndv(ndv);
+ final CatalogColumnStatisticsDataBoolean.Builder booleanBuilder =
+ CatalogColumnStatisticsDataBoolean.builder();
+ final CatalogColumnStatisticsDataBinary.Builder binaryBuilder =
+ CatalogColumnStatisticsDataBinary.builder();
+
+ for (String partitionName : partitionNames) {
+ String[] partName = partitionName.split(Path.SEPARATOR);
+ String[] partVal = partName[partName.length - 1].split("=");
+ String value = partVal[1];
+ if (value.equals(HIVE_DEFAULT_PARTITION)) {
+ try {
+ nullCount =
+ getPartitionStatistics(
+ tablePath,
+ new CatalogPartitionSpec(
+ Collections.singletonMap(
+
partitionedByColumn.getName(),
+
HIVE_DEFAULT_PARTITION)))
+ .getRowCount();
+ } catch (Exception e) {
+ LOG.info(
+ "Fail to get getPartitionColumnStatistics for
partition {}={}",
+ partitionedByColumn.getName(),
+ HIVE_DEFAULT_PARTITION);
+ }
+ } else {
+ final String type = partitionedByColumn.getType();
+ if ("int".equalsIgnoreCase(type)) {
Review Comment:
Avoid using "int" directly. Maybe we can do the check using LogicalType,
like `HiveStatsUtil#getColumnStatisticsData` does.
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBoolean.java:
##########
@@ -54,4 +54,45 @@ public CatalogColumnStatisticsDataBoolean copy() {
return new CatalogColumnStatisticsDataBoolean(
trueCount, falseCount, getNullCount(), new
HashMap<>(getProperties()));
}
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /** {@link CatalogColumnStatisticsDataBoolean} builder static inner class.
*/
+ public static final class Builder
+ implements
CatalogColumnStatisticDataBuilder<CatalogColumnStatisticsDataBoolean> {
+ private Long nullCount;
+ private Map<String, String> properties;
+ private Long trueCount = 0L;
+ private Long falseCount = 0L;
+
+ private Builder() {}
+
+ public Builder nullCount(Long nullCount) {
+ this.nullCount = nullCount;
+ return this;
+ }
+
+ public Builder properties(Map<String, String> properties) {
+ this.properties = properties;
+ return this;
+ }
+
+ public Builder increasTrueCount(Long trueCount) {
Review Comment:
nit:
```suggestion
public Builder increaseTrueCount(Long trueCount) {
```
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java:
##########
@@ -1734,6 +1814,152 @@ public CatalogColumnStatistics
getPartitionColumnStatistics(
}
}
+ @Override
+ public List<CatalogColumnStatistics> bulkGetPartitionColumnStatistics(
+ ObjectPath tablePath, List<CatalogPartitionSpec> partitionSpecs)
+ throws PartitionNotExistException, CatalogException {
+
+ checkNotNull(partitionSpecs);
+
+ List<CatalogColumnStatistics> result = new
ArrayList<>(partitionSpecs.size());
+
+ final Tuple3<Table, List<Partition>, List<String>> partitionsTuple3 =
+ getTableAndPartitions(tablePath, partitionSpecs);
+
+ try {
+
+ final List<FieldSchema> distinctColumns =
Review Comment:
If it's for getting the partition columns, we can use
`partitionsTuple3.f0.getPartitionKeys();`
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/StatisticDataUtils.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.stats;
+
+/** Utils class for catalog column statistics. */
+public class StatisticDataUtils {
+
+ public static Double acculmulateAverage(
Review Comment:
nit:
```suggestion
public static Double accumulateAverage(
```
##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java:
##########
@@ -240,6 +246,84 @@ void testAlterPartitionColumnStatistics() throws Exception
{
catalog.getPartitionColumnStatistics(path1, partitionSpec));
}
+ private ResolvedSchema createSchemaForStatistics() {
+ return new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("first", DataTypes.STRING()),
+ Column.physical("four", DataTypes.BOOLEAN()),
+ Column.physical("five", DataTypes.DOUBLE()),
+ Column.physical("second", DataTypes.DATE()),
+ Column.physical("third", DataTypes.INT())),
+ Collections.emptyList(),
+ null);
+ }
+
+ @Test
+ public void testGetTableAndColumnStatistics() throws Exception {
+
+ // create a test table
+ catalog.createDatabase(db1, createDb(), false);
+ final ResolvedSchema resolvedSchema = createSchemaForStatistics();
+ final CatalogTable origin =
+ CatalogTable.of(
+
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
+ TEST_COMMENT,
+ createPartitionKeys(),
+ getBatchTableProperties());
+ CatalogTable catalogTable = new ResolvedCatalogTable(origin,
resolvedSchema);
+
+ catalog.createTable(path1, catalogTable, false);
+
+ int second = 29;
+ final int partitionFileSize = 20;
+ final String partitionName = "third";
+
+ List<Map<String, String>> partitionSpecs = new ArrayList<>();
+ Map<String, String> partitionSpec = null;
+
+ while (second-- > 10) {
+ int third = 20;
+ while (third-- > 0) {
+ partitionSpec = new HashMap<String, String>();
+ partitionSpec.put("second", "2010-04-" + second + " 00:00:00");
+ partitionSpec.put(partitionName, Integer.toString(third));
+ createPartition(partitionSpec);
+ partitionSpecs.add(partitionSpec);
+ }
+ }
+
+ final List<CatalogPartitionSpec> catalogPartitionSpecList =
+ partitionSpecs.stream()
+ .map(p -> new CatalogPartitionSpec(p))
+ .collect(Collectors.toList());
+
+ final List<CatalogTableStatistics> partitionStatistics =
+ catalog.bulkGetPartitionStatistics(path1,
catalogPartitionSpecList);
+
assertThat(partitionStatistics.size()).isEqualTo(partitionSpecs.size());
+
+ final List<CatalogColumnStatistics> tableColumnStatistics =
+ catalog.bulkGetPartitionColumnStatistics(path1,
catalogPartitionSpecList);
+
+
assertThat(tableColumnStatistics.size()).isEqualTo(catalogPartitionSpecList.size()
+ 1);
+
+ final CatalogColumnStatistics catalogColumnStatistics =
+ tableColumnStatistics.get(tableColumnStatistics.size() - 1);
+ CatalogColumnStatisticsDataLong columnStatisticsDataLong =
+ (CatalogColumnStatisticsDataLong)
+
catalogColumnStatistics.getColumnStatisticsData().get(partitionName);
+
+
assertThat(columnStatisticsDataLong.getNdv()).isEqualTo(catalogPartitionSpecList.size());
+ assertThat(columnStatisticsDataLong.getMin()).isEqualTo(0);
+
assertThat(columnStatisticsDataLong.getMax()).isEqualTo(partitionFileSize - 1);
Review Comment:
+1
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java:
##########
@@ -1699,6 +1716,69 @@ public CatalogTableStatistics getPartitionStatistics(
}
}
+ @Override
+ public List<CatalogTableStatistics> bulkGetPartitionStatistics(
+ ObjectPath tablePath, List<CatalogPartitionSpec> partitionSpecs)
+ throws PartitionNotExistException, CatalogException {
+
+ return getTableAndPartitions(tablePath, partitionSpecs).f1.stream()
+ .map(p -> createCatalogTableStatistics(p.getParameters()))
+ .collect(Collectors.toList());
+ }
+
+ /** @return Partitions and names of partitions. */
+ private Tuple3<Table, List<Partition>, List<String>> getTableAndPartitions(
Review Comment:
+1
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]