openinx commented on a change in pull request #3271: URL: https://github.com/apache/iceberg/pull/3271#discussion_r740047693
########## File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java ########## @@ -684,12 +713,139 @@ public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitio @Override public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws TableNotExistException, TableNotPartitionedException, CatalogException, PartitionSpecInvalidException { + Preconditions.checkNotNull(tablePath, "Table path cannot be null"); + Preconditions.checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null"); + + Table table = loadIcebergTable(tablePath); + ensurePartitionedTable(tablePath, table); + Optional<PartitionSpec> matchingPartitionSpec = getMatchingPartitionSpec(partitionSpec, table.specs()); + if (!matchingPartitionSpec.isPresent()) { + throw new PartitionSpecInvalidException(getName(), + table.specs().values().stream() + .map(spec -> spec.fields().stream() + .map(PartitionField::name) + .collect(Collectors.joining(",", "{", "}"))) + .collect(Collectors.toList()), + tablePath, partitionSpec); + } + + Types.StructType structType = matchingPartitionSpec.get().partitionType(); + Expression partitionFilter = Expressions.alwaysTrue(); + for (Map.Entry<String, String> specEntry : partitionSpec.getPartitionSpec().entrySet()) { + String specName = specEntry.getKey(); + String specValue = specEntry.getValue(); + Type type = structType.fieldType(specName); + if (!FILTERS.containsKey(type.getClass())) { + throw new CatalogException(String.format("Failed to list partitions of table %s. 
Occur unsupported type %s", + tablePath, type)); + } + Expression filter = FILTERS.get(type.getClass()).apply(specName, specValue); + partitionFilter = Expressions.and(partitionFilter, filter); + } + + return getPartitions(table, tablePath, partitionFilter); + } + + private List<CatalogPartitionSpec> getPartitions(Table table, ObjectPath tablePath, Expression filter) throws CatalogException { - throw new UnsupportedOperationException(); + TableScan scan = table.newScan().filter(filter); + Map<Integer, PartitionSpec> specs = table.specs(); + Set<Map<String, Object>> setMap = Sets.newHashSet(); + try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) { + for (DataFile dataFile : CloseableIterable.transform(tasks, FileScanTask::file)) { + Map<String, Object> map = Maps.newHashMap(); + StructLike structLike = dataFile.partition(); + PartitionSpec spec = specs.get(dataFile.specId()); + for (int i = 0; i < structLike.size(); i++) { + String name = spec.fields().get(i).name(); + map.put(name, getValue(spec.schema().findType(name), i, structLike)); + } + + setMap.add(map); + } + } catch (IOException e) { + throw new CatalogException(String.format("Failed to list partitions of table %s", tablePath), e); + } + + List<Map<String, Object>> list = Lists.newArrayList(setMap); + sortPartitions(list); + List<CatalogPartitionSpec> partitionSpecs = Lists.newArrayListWithCapacity(list.size()); + for (Map<String, Object> partitionMap : list) { + Map<String, String> partitions = partitionMap.entrySet().stream().collect(Collectors.toMap( + Map.Entry::getKey, + e -> String.valueOf(e.getValue()) + )); + partitionSpecs.add(new CatalogPartitionSpec(partitions)); + } + + return partitionSpecs; + } + + /** + * Sort the partition map by the value + * + * @param list the partition list + */ + @SuppressWarnings("unchecked") + private void sortPartitions(List<Map<String, Object>> list) { + list.sort((map1, map2) -> { + for (Map.Entry<String, Object> entry : map1.entrySet()) { + String key 
= entry.getKey(); + Object value1 = entry.getValue(); + Object value2 = map2.get(key); + if (value1 instanceof Comparable && value2 instanceof Comparable && !value1.equals(value2)) { + return ((Comparable<Object>) value1).compareTo(value2); + } + } + + return 0; + }); + } + + private Object getValue(Type type, int index, StructLike structLike) { + if (type instanceof Types.DateType) { + return DateTimeUtil.dateFromDays(structLike.get(index, Integer.class)); + } else if (type instanceof Types.TimeType) { + return DateTimeUtil.timeFromMicros(structLike.get(index, Long.class)); + } else if (type instanceof Types.TimestampType) { + return DateTimeUtil.timestampFromMicros(structLike.get(index, Long.class)); + } else if (type instanceof Types.BinaryType) { + return new String(structLike.get(index, ByteBuffer.class).array()); Review comment: Encoding the binary into a String? Should we escape the byte[]? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org