This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f9ba8ca06 [flink] add SHOW PARTITIONS for paimon (#2146)
f9ba8ca06 is described below
commit f9ba8ca064cbdf9f5507e0d791ab4857375ea5c5
Author: JunZhang <[email protected]>
AuthorDate: Wed Oct 18 15:14:28 2023 +0800
[flink] add SHOW PARTITIONS for paimon (#2146)
---
.../java/org/apache/paimon/flink/FlinkCatalog.java | 82 ++++++++++++++++++++--
.../apache/paimon/flink/CatalogTableITCase.java | 42 +++++++++++
2 files changed, 120 insertions(+), 4 deletions(-)
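For context, the statement this patch wires up can be exercised through the Flink Table API roughly as follows. This is an illustrative sketch, not part of the commit: the Paimon catalog and table setup are elided, and the table name mirrors the one used in the new test further down.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class ShowPartitionsExample {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Assumes a Paimon catalog is registered and a table named PartitionTable
            // exists, created with PARTITIONED BY (dt, hh).
            tEnv.executeSql("SHOW PARTITIONS PartitionTable").print();
            // A partial partition spec narrows the listing, as exercised in the new test.
            tEnv.executeSql("SHOW PARTITIONS PartitionTable PARTITION (hh = '11')").print();
        }
    }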
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 53916ce89..260b90a5f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -18,8 +18,10 @@
package org.apache.paimon.flink;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.procedure.ProcedureUtil;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
@@ -27,6 +29,11 @@ import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.RowDataPartitionComputer;
import org.apache.paimon.utils.StringUtils;
import org.apache.flink.table.api.TableColumn;
@@ -66,6 +73,7 @@ import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.descriptors.DescriptorProperties;
@@ -75,10 +83,13 @@ import org.apache.flink.table.procedures.Procedure;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import javax.annotation.Nullable;
+
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -726,14 +737,77 @@ public class FlinkCatalog extends AbstractCatalog {
     @Override
     public final List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
-            throws CatalogException {
-        return Collections.emptyList();
+            throws TableNotExistException, TableNotPartitionedException, CatalogException {
+        return getPartitionSpecs(tablePath, null);
+    }
+
+    private List<CatalogPartitionSpec> getPartitionSpecs(
+            ObjectPath tablePath, @Nullable CatalogPartitionSpec partitionSpec)
+            throws CatalogException, TableNotPartitionedException, TableNotExistException {
+        Identifier identifier = toIdentifier(tablePath);
+        try {
+            Table table = catalog.getTable(identifier);
+            FileStoreTable fileStoreTable = (FileStoreTable) table;
+
+            if (fileStoreTable.partitionKeys() == null
+                    || fileStoreTable.partitionKeys().size() == 0) {
+                throw new TableNotPartitionedException(getName(), tablePath);
+            }
+
+            List<Split> splits = table.newReadBuilder().newScan().plan().splits();
+            List<BinaryRow> partitions =
+                    splits.stream()
+                            .map(m -> ((DataSplit) m).partition())
+                            .collect(Collectors.toList());
+            org.apache.paimon.types.RowType partitionRowType =
+                    fileStoreTable.schema().logicalPartitionType();
+
+            RowDataPartitionComputer partitionComputer =
+                    FileStorePathFactory.getPartitionComputer(
+                            partitionRowType,
+                            new CoreOptions(table.options()).partitionDefaultName());
+
+            return partitions.stream()
+                    .map(
+                            m -> {
+                                LinkedHashMap<String, String> partValues =
+                                        partitionComputer.generatePartValues(
+                                                Preconditions.checkNotNull(
+                                                        m,
+                                                        "Partition row data is null. This is unexpected."));
+                                if (partitionSpec != null
+                                        && partitionSpec.getPartitionSpec() != null) {
+                                    boolean match = true;
+                                    for (Map.Entry<String, String> specMapEntry :
+                                            partitionSpec.getPartitionSpec().entrySet()) {
+                                        String key = specMapEntry.getKey();
+                                        match =
+                                                match & partValues.containsKey(key)
+                                                        && partValues
+                                                                .get(key)
+                                                                .contains(specMapEntry.getValue());
+                                    }
+                                    if (match) {
+                                        return new CatalogPartitionSpec(partValues);
+                                    }
+
+                                    return null;
+                                } else {
+                                    return new CatalogPartitionSpec(partValues);
+                                }
+                            })
+                    .filter(Objects::nonNull)
+                    .collect(Collectors.toList());
+        } catch (Catalog.TableNotExistException e) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
     }
 
     @Override
     public final List<CatalogPartitionSpec> listPartitions(
-            ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
-        return Collections.emptyList();
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws TableNotExistException, TableNotPartitionedException, CatalogException {
+        return getPartitionSpecs(tablePath, partitionSpec);
     }
 
     @Override
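The two listPartitions overrides above are the entry points that Flink's SHOW PARTITIONS statement goes through. As a rough sketch only (not part of the patch), calling them directly looks like this; "flinkCatalog", the database and the table name are hypothetical placeholders:

    // Assumes an already-opened org.apache.paimon.flink.FlinkCatalog instance.
    static void listPaimonPartitions(org.apache.paimon.flink.FlinkCatalog flinkCatalog)
            throws Exception {
        org.apache.flink.table.catalog.ObjectPath path =
                new org.apache.flink.table.catalog.ObjectPath("default", "PartitionTable");

        // All partitions, i.e. SHOW PARTITIONS PartitionTable.
        System.out.println(flinkCatalog.listPartitions(path));

        // Partitions matching a partial spec, i.e. SHOW PARTITIONS ... PARTITION (hh = '11').
        java.util.Map<String, String> spec = new java.util.HashMap<>();
        spec.put("hh", "11");
        System.out.println(
                flinkCatalog.listPartitions(
                        path, new org.apache.flink.table.catalog.CatalogPartitionSpec(spec)));
    }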
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index bc9e773aa..7a54260f9 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -29,6 +29,7 @@ import org.apache.paimon.types.IntType;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;
@@ -409,6 +410,47 @@ public class CatalogTableITCase extends CatalogITCaseBase {
"Can not set the write-mode to append-only and
changelog-producer at the same time.");
}
+ @Test
+ public void testShowPartitions() {
+ sql(
+ "CREATE TABLE NoPartitionTable (\n"
+ + " user_id BIGINT,\n"
+ + " item_id BIGINT,\n"
+ + " behavior STRING,\n"
+ + " dt STRING,\n"
+ + " hh STRING,\n"
+ + " PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n"
+ + ")");
+ assertThatThrownBy(() -> sql("SHOW PARTITIONS NoPartitionTable"))
+ .getRootCause()
+ .isInstanceOf(TableNotPartitionedException.class)
+ .hasMessage("Table default.NoPartitionTable in catalog PAIMON
is not partitioned.");
+
+ sql(
+ "CREATE TABLE PartitionTable (\n"
+ + " user_id BIGINT,\n"
+ + " item_id BIGINT,\n"
+ + " behavior STRING,\n"
+ + " dt STRING,\n"
+ + " hh STRING,\n"
+ + " PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n"
+ + ") PARTITIONED BY (dt, hh)");
+ sql("INSERT INTO PartitionTable select 1,1,'a','2020-01-01','10'");
+ sql("INSERT INTO PartitionTable select 2,2,'b','2020-01-02','11'");
+ sql("INSERT INTO PartitionTable select 3,3,'c','2020-01-03','11'");
+ List<Row> result = sql("SHOW PARTITIONS PartitionTable");
+ assertThat(result.toString())
+ .isEqualTo(
+ "[+I[dt=2020-01-01/hh=10], +I[dt=2020-01-02/hh=11],
+I[dt=2020-01-03/hh=11]]");
+
+ result = sql("SHOW PARTITIONS PartitionTable partition (hh='11')");
+ assertThat(result.toString())
+ .isEqualTo("[+I[dt=2020-01-02/hh=11],
+I[dt=2020-01-03/hh=11]]");
+
+ result = sql("SHOW PARTITIONS PartitionTable partition
(dt='2020-01-02', hh='11')");
+ assertThat(result.toString()).isEqualTo("[+I[dt=2020-01-02/hh=11]]");
+ }
+
@Test
public void testChangelogProducerOnAppendOnlyTable() {
assertThatThrownBy(