This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f9ba8ca06 [flink] add SHOW PARTITIONS for paimon (#2146)
f9ba8ca06 is described below

commit f9ba8ca064cbdf9f5507e0d791ab4857375ea5c5
Author: JunZhang <[email protected]>
AuthorDate: Wed Oct 18 15:14:28 2023 +0800

    [flink] add SHOW PARTITIONS for paimon (#2146)
---
 .../java/org/apache/paimon/flink/FlinkCatalog.java | 82 ++++++++++++++++++++--
 .../apache/paimon/flink/CatalogTableITCase.java    | 42 +++++++++++
 2 files changed, 120 insertions(+), 4 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 53916ce89..260b90a5f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -18,8 +18,10 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.flink.procedure.ProcedureUtil;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
@@ -27,6 +29,11 @@ import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.RowDataPartitionComputer;
 import org.apache.paimon.utils.StringUtils;
 
 import org.apache.flink.table.api.TableColumn;
@@ -66,6 +73,7 @@ import 
org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.descriptors.DescriptorProperties;
@@ -75,10 +83,13 @@ import org.apache.flink.table.procedures.Procedure;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
+import javax.annotation.Nullable;
+
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -726,14 +737,77 @@ public class FlinkCatalog extends AbstractCatalog {
 
     @Override
     public final List<CatalogPartitionSpec> listPartitions(ObjectPath 
tablePath)
-            throws CatalogException {
-        return Collections.emptyList();
+            throws TableNotExistException, TableNotPartitionedException, 
CatalogException {
+        return getPartitionSpecs(tablePath, null);
+    }
+
+    /**
+     * Lists the partitions of a table, optionally keeping only those that match a partial spec.
+     *
+     * <p>Partitions are taken from the splits of the table's current scan plan and are returned in
+     * first-seen order without duplicates.
+     *
+     * @param tablePath path of the table whose partitions are listed
+     * @param partitionSpec partial spec whose entries must all equal the partition's values, or
+     *     {@code null} to list every partition
+     * @return the matching partition specs
+     * @throws TableNotExistException if the catalog has no such table
+     * @throws TableNotPartitionedException if the table declares no partition keys
+     */
+    private List<CatalogPartitionSpec> getPartitionSpecs(
+            ObjectPath tablePath, @Nullable CatalogPartitionSpec partitionSpec)
+            throws CatalogException, TableNotPartitionedException, TableNotExistException {
+        Identifier identifier = toIdentifier(tablePath);
+        try {
+            Table table = catalog.getTable(identifier);
+            // NOTE(review): assumes a FileStoreTable; any other table kind fails this cast.
+            FileStoreTable fileStoreTable = (FileStoreTable) table;
+
+            if (fileStoreTable.partitionKeys() == null
+                    || fileStoreTable.partitionKeys().isEmpty()) {
+                throw new TableNotPartitionedException(getName(), tablePath);
+            }
+
+            RowDataPartitionComputer partitionComputer =
+                    FileStorePathFactory.getPartitionComputer(
+                            fileStoreTable.schema().logicalPartitionType(),
+                            new CoreOptions(table.options()).partitionDefaultName());
+
+            // A null spec, or a spec wrapping a null map, means "no filter".
+            Map<String, String> requested =
+                    partitionSpec == null ? null : partitionSpec.getPartitionSpec();
+
+            // Only partitions that appear in at least one split of the plan are reported.
+            List<Split> splits = table.newReadBuilder().newScan().plan().splits();
+            return splits.stream()
+                    .map(split -> ((DataSplit) split).partition())
+                    .map(
+                            (BinaryRow partition) ->
+                                    partitionComputer.generatePartValues(
+                                            Preconditions.checkNotNull(
+                                                    partition,
+                                                    "Partition row data is null. This is unexpected.")))
+                    // Every requested key must be present with exactly the requested value; a
+                    // substring test here would let hh='1' match partition hh=11.
+                    .filter(
+                            (LinkedHashMap<String, String> values) ->
+                                    requested == null
+                                            || values.entrySet().containsAll(requested.entrySet()))
+                    .map(CatalogPartitionSpec::new)
+                    // Several splits can share a partition (presumably one split per bucket), so
+                    // report each partition only once.
+                    .distinct()
+                    .collect(Collectors.toList());
+        } catch (Catalog.TableNotExistException e) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
     }
 
     @Override
     public final List<CatalogPartitionSpec> listPartitions(
-            ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws 
CatalogException {
-        return Collections.emptyList();
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws TableNotExistException, TableNotPartitionedException, 
CatalogException {
+        return getPartitionSpecs(tablePath, partitionSpec);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index bc9e773aa..7a54260f9 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -29,6 +29,7 @@ import org.apache.paimon.types.IntType;
 import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Test;
 
@@ -409,6 +410,47 @@ public class CatalogTableITCase extends CatalogITCaseBase {
                         "Can not set the write-mode to append-only and 
changelog-producer at the same time.");
     }
 
+    @Test
+    public void testShowPartitions() {
+        sql(
+                "CREATE TABLE NoPartitionTable (\n"
+                        + "    user_id BIGINT,\n"
+                        + "    item_id BIGINT,\n"
+                        + "    behavior STRING,\n"
+                        + "    dt STRING,\n"
+                        + "    hh STRING,\n"
+                        + "    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n"
+                        + ")");
+        assertThatThrownBy(() -> sql("SHOW PARTITIONS NoPartitionTable"))
+                .getRootCause()
+                .isInstanceOf(TableNotPartitionedException.class)
+                .hasMessage("Table default.NoPartitionTable in catalog PAIMON 
is not partitioned.");
+
+        sql(
+                "CREATE TABLE PartitionTable (\n"
+                        + "    user_id BIGINT,\n"
+                        + "    item_id BIGINT,\n"
+                        + "    behavior STRING,\n"
+                        + "    dt STRING,\n"
+                        + "    hh STRING,\n"
+                        + "    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (dt, hh)");
+        sql("INSERT INTO PartitionTable select 1,1,'a','2020-01-01','10'");
+        sql("INSERT INTO PartitionTable select 2,2,'b','2020-01-02','11'");
+        sql("INSERT INTO PartitionTable select 3,3,'c','2020-01-03','11'");
+        List<Row> result = sql("SHOW PARTITIONS PartitionTable");
+        assertThat(result.toString())
+                .isEqualTo(
+                        "[+I[dt=2020-01-01/hh=10], +I[dt=2020-01-02/hh=11], 
+I[dt=2020-01-03/hh=11]]");
+
+        result = sql("SHOW PARTITIONS PartitionTable partition (hh='11')");
+        assertThat(result.toString())
+                .isEqualTo("[+I[dt=2020-01-02/hh=11], 
+I[dt=2020-01-03/hh=11]]");
+
+        result = sql("SHOW PARTITIONS PartitionTable partition 
(dt='2020-01-02', hh='11')");
+        assertThat(result.toString()).isEqualTo("[+I[dt=2020-01-02/hh=11]]");
+    }
+
     @Test
     public void testChangelogProducerOnAppendOnlyTable() {
         assertThatThrownBy(

Reply via email to