zhangjun0x01 commented on a change in pull request #2082:
URL: https://github.com/apache/iceberg/pull/2082#discussion_r556322659
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -655,10 +685,54 @@ public void alterPartitionColumnStatistics(ObjectPath
tablePath, CatalogPartitio
return Lists.newArrayList(set);
}
- @Override
- public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath,
CatalogPartitionSpec partitionSpec)
- throws CatalogException {
- throw new UnsupportedOperationException();
+ private String getValue(Schema schema, String name, int index, StructLike
structLike) {
+ Type type = schema.findType(name);
+ if (type instanceof Types.DateType) {
+ return DateTimeUtil.dateFromDays(structLike.get(index,
Integer.class)).toString();
+ } else if (type instanceof Types.TimeType) {
+ return DateTimeUtil.timeFromMicros(structLike.get(index,
Long.class)).toString();
+ } else if (type instanceof Types.TimestampType) {
+ return DateTimeUtil.timestampFromMicros(structLike.get(index,
Long.class)).toString();
+ } else {
+ return String.valueOf(structLike.get(index, Object.class));
+ }
+ }
+
+ private org.apache.iceberg.expressions.Expression getPartitionFilter(Schema
schema,
+
CatalogPartitionSpec partitionSpec) {
+ Map<String, String> partitions = partitionSpec.getPartitionSpec();
+
+ org.apache.iceberg.expressions.Expression filter =
Expressions.alwaysTrue();
+ for (Map.Entry<String, String> entry : partitions.entrySet()) {
+ String name = entry.getKey();
+ String value = entry.getValue();
+ Type type = schema.findType(entry.getKey());
+ // Long,Map,Struct,List type are not supported by flink,so we do not add
them here.
+ if (type instanceof Types.IntegerType) {
+ filter = Expressions.and(filter, Expressions.equal(name,
Integer.valueOf(value)));
+ } else if (type instanceof Types.StringType) {
+ filter = Expressions.and(filter, Expressions.equal(name, value));
+ } else if (type instanceof Types.DoubleType) {
+ filter = Expressions.and(filter, Expressions.equal(name,
Double.valueOf(value)));
+ } else if (type instanceof Types.FloatType) {
+ filter = Expressions.and(filter, Expressions.equal(name,
Float.valueOf(value)));
+ } else if (type instanceof Types.DateType) {
+ filter =
+ Expressions.and(filter, Expressions.equal(name,
DateTimeUtil.daysFromDate(LocalDate.parse(value))));
+ } else if (type instanceof Types.TimeType) {
+ filter =
+ Expressions.and(filter, Expressions.equal(name,
DateTimeUtil.microsFromTime(LocalTime.parse(value))));
+ } else if (type instanceof Types.TimestampType) {
+ filter = Expressions
+ .and(filter, Expressions.equal(name,
DateTimeUtil.microsFromTimestamp(LocalDateTime.parse(value))));
+ } else if (type instanceof Types.BooleanType) {
+ filter = Expressions.and(filter, Expressions.equal(name,
Boolean.valueOf(value)));
+ } else if (type instanceof Types.DecimalType) {
+ filter = Expressions.and(filter, Expressions.equal(name, new
BigDecimal(value)));
+ }
+ }
Review comment:
> Also, to make it simpler, you could pull the `filter =
Expressions.and(filter,...)` value out and then just have a function that
returns the converted value. This way we don't need to repeat the
`Expressions.and` portions so many times.
Yes, I updated it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]