Flyangz commented on a change in pull request #3271:
URL: https://github.com/apache/iceberg/pull/3271#discussion_r740192429



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -684,12 +713,139 @@ public void alterPartitionColumnStatistics(ObjectPath 
tablePath, CatalogPartitio
 
   @Override
   public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec)
+      throws TableNotExistException, TableNotPartitionedException, 
CatalogException, PartitionSpecInvalidException {
+    Preconditions.checkNotNull(tablePath, "Table path cannot be null");
+    Preconditions.checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be 
null");
+
+    Table table = loadIcebergTable(tablePath);
+    ensurePartitionedTable(tablePath, table);
+    Optional<PartitionSpec> matchingPartitionSpec = 
getMatchingPartitionSpec(partitionSpec, table.specs());
+    if (!matchingPartitionSpec.isPresent()) {
+      throw new PartitionSpecInvalidException(getName(),
+          table.specs().values().stream()
+              .map(spec -> spec.fields().stream()
+                  .map(PartitionField::name)
+                  .collect(Collectors.joining(",", "{", "}")))
+              .collect(Collectors.toList()),
+          tablePath, partitionSpec);
+    }
+
+    Types.StructType structType = matchingPartitionSpec.get().partitionType();
+    Expression partitionFilter = Expressions.alwaysTrue();
+    for (Map.Entry<String, String> specEntry : 
partitionSpec.getPartitionSpec().entrySet()) {
+      String specName = specEntry.getKey();
+      String specValue = specEntry.getValue();
+      Type type = structType.fieldType(specName);
+      if (!FILTERS.containsKey(type.getClass())) {
+        throw new CatalogException(String.format("Failed to list partitions of 
table %s. Occur unsupported type %s",
+            tablePath, type));
+      }
+      Expression filter = FILTERS.get(type.getClass()).apply(specName, 
specValue);
+      partitionFilter = Expressions.and(partitionFilter, filter);
+    }
+
+    return getPartitions(table, tablePath, partitionFilter);
+  }
+
+  private List<CatalogPartitionSpec> getPartitions(Table table, ObjectPath 
tablePath, Expression filter)
       throws CatalogException {
-    throw new UnsupportedOperationException();
+    TableScan scan = table.newScan().filter(filter);
+    Map<Integer, PartitionSpec> specs = table.specs();
+    Set<Map<String, Object>> setMap = Sets.newHashSet();
+    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+      for (DataFile dataFile : CloseableIterable.transform(tasks, 
FileScanTask::file)) {
+        Map<String, Object> map = Maps.newHashMap();
+        StructLike structLike = dataFile.partition();
+        PartitionSpec spec = specs.get(dataFile.specId());
+        for (int i = 0; i < structLike.size(); i++) {
+          String name = spec.fields().get(i).name();
+          map.put(name, getValue(spec.schema().findType(name), i, structLike));
+        }
+
+        setMap.add(map);
+      }
+    } catch (IOException e) {
+      throw new CatalogException(String.format("Failed to list partitions of 
table %s", tablePath), e);
+    }
+
+    List<Map<String, Object>> list = Lists.newArrayList(setMap);
+    sortPartitions(list);
+    List<CatalogPartitionSpec> partitionSpecs = 
Lists.newArrayListWithCapacity(list.size());
+    for (Map<String, Object> partitionMap : list) {
+      Map<String, String> partitions = 
partitionMap.entrySet().stream().collect(Collectors.toMap(
+          Map.Entry::getKey,
+          e -> String.valueOf(e.getValue())
+      ));
+      partitionSpecs.add(new CatalogPartitionSpec(partitions));
+    }
+
+    return partitionSpecs;
+  }
+
+  /**
+   * Sort the partition map by the value
+   *
+   * @param list the partition list
+   */
+  @SuppressWarnings("unchecked")
+  private void sortPartitions(List<Map<String, Object>> list) {
+    list.sort((map1, map2) -> {
+      for (Map.Entry<String, Object> entry : map1.entrySet()) {
+        String key = entry.getKey();
+        Object value1 = entry.getValue();
+        Object value2 = map2.get(key);
+        if (value1 instanceof Comparable && value2 instanceof Comparable && 
!value1.equals(value2)) {
+          return ((Comparable<Object>) value1).compareTo(value2);
+        }
+      }
+
+      return 0;
+    });
+  }
+
+  private Object getValue(Type type, int index, StructLike structLike) {
+    if (type instanceof Types.DateType) {
+      return DateTimeUtil.dateFromDays(structLike.get(index, Integer.class));
+    } else if (type instanceof Types.TimeType) {
+      return DateTimeUtil.timeFromMicros(structLike.get(index, Long.class));
+    } else if (type instanceof Types.TimestampType) {
+      return DateTimeUtil.timestampFromMicros(structLike.get(index, 
Long.class));
+    } else if (type instanceof Types.BinaryType) {
+      return new String(structLike.get(index, ByteBuffer.class).array());

Review comment:
       This converts the `ByteBuffer` into a human-readable string in the final return result, instead of something like `[B@2cf93603`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to