kbendick commented on a change in pull request #2082:
URL: https://github.com/apache/iceberg/pull/2082#discussion_r556265710



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -655,10 +685,54 @@ public void alterPartitionColumnStatistics(ObjectPath 
tablePath, CatalogPartitio
     return Lists.newArrayList(set);
   }
 
-  @Override
-  public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec)
-      throws CatalogException {
-    throw new UnsupportedOperationException();
+  private String getValue(Schema schema, String name, int index, StructLike 
structLike) {
+    Type type = schema.findType(name);
+    if (type instanceof Types.DateType) {
+      return DateTimeUtil.dateFromDays(structLike.get(index, 
Integer.class)).toString();
+    } else if (type instanceof Types.TimeType) {
+      return DateTimeUtil.timeFromMicros(structLike.get(index, 
Long.class)).toString();
+    } else if (type instanceof Types.TimestampType) {
+      return DateTimeUtil.timestampFromMicros(structLike.get(index, 
Long.class)).toString();
+    } else {
+      return String.valueOf(structLike.get(index, Object.class));
+    }
+  }
+
+  private org.apache.iceberg.expressions.Expression getPartitionFilter(Schema 
schema,
+                                                                       
CatalogPartitionSpec partitionSpec) {
+    Map<String, String> partitions = partitionSpec.getPartitionSpec();
+
+    org.apache.iceberg.expressions.Expression filter = 
Expressions.alwaysTrue();
+    for (Map.Entry<String, String> entry : partitions.entrySet()) {
+      String name = entry.getKey();
+      String value = entry.getValue();
+      Type type = schema.findType(entry.getKey());

Review comment:
       Nit: `entry.getKey()` is already named here as `name` so consider using 
that instead.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -655,10 +685,54 @@ public void alterPartitionColumnStatistics(ObjectPath 
tablePath, CatalogPartitio
     return Lists.newArrayList(set);
   }
 
-  @Override
-  public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec)
-      throws CatalogException {
-    throw new UnsupportedOperationException();
+  private String getValue(Schema schema, String name, int index, StructLike 
structLike) {
+    Type type = schema.findType(name);
+    if (type instanceof Types.DateType) {
+      return DateTimeUtil.dateFromDays(structLike.get(index, 
Integer.class)).toString();
+    } else if (type instanceof Types.TimeType) {
+      return DateTimeUtil.timeFromMicros(structLike.get(index, 
Long.class)).toString();
+    } else if (type instanceof Types.TimestampType) {
+      return DateTimeUtil.timestampFromMicros(structLike.get(index, 
Long.class)).toString();
+    } else {
+      return String.valueOf(structLike.get(index, Object.class));
+    }
+  }
+
+  private org.apache.iceberg.expressions.Expression getPartitionFilter(Schema 
schema,
+                                                                       
CatalogPartitionSpec partitionSpec) {
+    Map<String, String> partitions = partitionSpec.getPartitionSpec();
+
+    org.apache.iceberg.expressions.Expression filter = 
Expressions.alwaysTrue();
+    for (Map.Entry<String, String> entry : partitions.entrySet()) {
+      String name = entry.getKey();
+      String value = entry.getValue();
+      Type type = schema.findType(entry.getKey());
+      // Long,Map,Struct,List type are not supported by flink,so we do not add 
them  here.
+      if (type instanceof Types.IntegerType) {
+        filter = Expressions.and(filter, Expressions.equal(name, 
Integer.valueOf(value)));
+      } else if (type instanceof Types.StringType) {
+        filter = Expressions.and(filter, Expressions.equal(name, value));
+      } else if (type instanceof Types.DoubleType) {
+        filter = Expressions.and(filter, Expressions.equal(name, 
Double.valueOf(value)));
+      } else if (type instanceof Types.FloatType) {
+        filter = Expressions.and(filter, Expressions.equal(name, 
Float.valueOf(value)));
+      } else if (type instanceof Types.DateType) {
+        filter =
+            Expressions.and(filter, Expressions.equal(name, 
DateTimeUtil.daysFromDate(LocalDate.parse(value))));
+      } else if (type instanceof Types.TimeType) {
+        filter =
+            Expressions.and(filter, Expressions.equal(name, 
DateTimeUtil.microsFromTime(LocalTime.parse(value))));
+      } else if (type instanceof Types.TimestampType) {
+        filter = Expressions
+            .and(filter, Expressions.equal(name, 
DateTimeUtil.microsFromTimestamp(LocalDateTime.parse(value))));
+      } else if (type instanceof Types.BooleanType) {
+        filter = Expressions.and(filter, Expressions.equal(name, 
Boolean.valueOf(value)));
+      } else if (type instanceof Types.DecimalType) {
+        filter = Expressions.and(filter, Expressions.equal(name, new 
BigDecimal(value)));
+      }
+    }

Review comment:
       Perhaps this could be made simpler by using a `Map<class ? extends Type, 
Function<String, PrimitiveType>>` and then handling the non-PrimitiveType 
values that require special handling using if statements? Inspiration could be 
found in this file, for both the map as well as a way to handle 
`convertLiteral`: 
https://github.com/apache/iceberg/blob/04e73deb7d68e3c4011101384f725abb1aae6236/spark3/src/main/java/org/apache/iceberg/spark/SparkFilters.java




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to