wombatu-kun commented on code in PR #16647:
URL: https://github.com/apache/iceberg/pull/16647#discussion_r3345602827


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java:
##########
@@ -724,8 +729,84 @@ public void createPartition(
   @Override
   public void dropPartition(
       ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException();
+      throws PartitionNotExistException, CatalogException {
+    Table icebergTable;
+    try {
+      icebergTable = loadIcebergTable(tablePath);
+    } catch (TableNotExistException e) {
+      if (ignoreIfNotExists) {
+        return;
+      }
+      throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e);
+    }
+
+    if (icebergTable.spec().isUnpartitioned()) {
+      if (ignoreIfNotExists) {
+        return;
+      }
+      throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
+    }
+
+    org.apache.iceberg.expressions.Expression filter =
+        buildPartitionRowFilter(icebergTable, partitionSpec);
+
+    if (!ignoreIfNotExists && !partitionHasFiles(icebergTable, filter, tablePath)) {
+      throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
+    }
+
+    icebergTable.newDelete().deleteFromRowFilter(filter).commit();
+  }
+
+  /**
+   * Build a row filter from a Flink partition spec. The filter is the conjunction of {@code col =
+   * value} predicates over the table's partition source columns. {@link
+   * org.apache.iceberg.DeleteFiles#deleteFromRowFilter(org.apache.iceberg.expressions.Expression)}
+   * applies a strict-projection check so that only files whose every row matches are dropped, which
+   * makes this safe across non-identity transforms and partition-spec evolution.
+   */
+  private static org.apache.iceberg.expressions.Expression buildPartitionRowFilter(
+      Table table, CatalogPartitionSpec partitionSpec) {
+    Schema schema = table.schema();
+    Set<String> expectedColumns = expectedPartitionColumns(table.spec(), schema);
+    Map<String, String> values = partitionSpec.getPartitionSpec();
+
+    Preconditions.checkArgument(
+        values.keySet().equals(expectedColumns),
+        "DROP PARTITION requires values for partition columns %s but got %s",
+        expectedColumns,
+        values.keySet());
+
+    org.apache.iceberg.expressions.Expression filter = Expressions.alwaysTrue();
+    for (Map.Entry<String, String> entry : values.entrySet()) {
+      Types.NestedField field = schema.findField(entry.getKey());
+      Object icebergValue = Conversions.fromPartitionString(field.type(), entry.getValue());

Review Comment:
   The added tests only exercise VARCHAR partition columns. The linked issue's 
headline example is `DROP PARTITION (id = 0)` (INT); this code does handle INT 
via `Conversions.fromPartitionString`, but there is no test for INT or any 
other non-string type, so those parse paths are unexercised. Also, 
`fromPartitionString` only covers 
BOOLEAN/INT/LONG/FLOAT/DOUBLE/STRING/UUID/FIXED/BINARY/DECIMAL/DATE; `TIME` 
and the `TIMESTAMP` variants fall through to its `default` branch and throw a 
raw `UnsupportedOperationException` (not a `CatalogException`), so an 
identity-partitioned timestamp/time column is unsupported. Could you add INT 
and DATE tests, and decide whether to support `TIMESTAMP` or reject it with a 
clear message?
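To make the TIMESTAMP decision concrete, here is a dependency-free sketch of a parser that owns the type dispatch. Every branch either parses the value or fails with a message that names the column. `PartitionValueParser` and its `PartitionType` enum are hypothetical stand-ins, not Iceberg's `Conversions.fromPartitionString`. The sketch assumes Iceberg's internal encodings: DATE as days since 1970-01-01, and microsecond `timestamp` (without zone) as microseconds since the epoch. It shows both options: TIMESTAMP is parsed, and TIME is rejected.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoUnit;

/**
 * Hypothetical, dependency-free sketch. It mirrors the shape of a partition-string parser but is
 * NOT Iceberg's Conversions.fromPartitionString; PartitionType is a local stand-in enum.
 */
final class PartitionValueParser {

  /** Local stand-in for the handful of Iceberg type ids this sketch covers. */
  enum PartitionType {
    INT, LONG, STRING, DATE, TIMESTAMP, TIME
  }

  private static final LocalDateTime EPOCH = LocalDateTime.of(1970, 1, 1, 0, 0);

  private PartitionValueParser() {}

  /** Parses a DROP PARTITION value into the form an equality predicate would use. */
  static Object parse(PartitionType type, String raw, String column) {
    if (raw == null) {
      return null; // the caller builds an IS NULL predicate for this
    }
    try {
      switch (type) {
        case INT:
          return Integer.valueOf(raw);
        case LONG:
          return Long.valueOf(raw);
        case STRING:
          return raw;
        case DATE:
          // Assumed encoding: days since 1970-01-01, parsed from ISO text such as 2024-01-01.
          return Math.toIntExact(LocalDate.parse(raw).toEpochDay());
        case TIMESTAMP:
          // Option 1, support it: timestamp without zone, assumed to be microseconds since epoch.
          return ChronoUnit.MICROS.between(EPOCH, LocalDateTime.parse(raw));
        default:
          // Option 2, reject it: name the column and type instead of a bare
          // UnsupportedOperationException.
          throw new IllegalArgumentException(
              "DROP PARTITION does not support partition column '" + column + "' of type " + type);
      }
    } catch (NumberFormatException | DateTimeParseException e) {
      throw new IllegalArgumentException(
          "Cannot parse '" + raw + "' as " + type + " for partition column '" + column + "'", e);
    }
  }
}
```

In `FlinkCatalog`, the rejection would probably surface through `Preconditions.checkArgument` (the same `IllegalArgumentException` the existing column-set check uses) or as a `CatalogException`.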



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java:
##########
@@ -724,8 +729,84 @@ public void createPartition(
   @Override
   public void dropPartition(
       ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException();
+      throws PartitionNotExistException, CatalogException {
+    Table icebergTable;
+    try {
+      icebergTable = loadIcebergTable(tablePath);
+    } catch (TableNotExistException e) {
+      if (ignoreIfNotExists) {
+        return;
+      }
+      throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e);
+    }
+
+    if (icebergTable.spec().isUnpartitioned()) {
+      if (ignoreIfNotExists) {
+        return;
+      }
+      throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
+    }
+
+    org.apache.iceberg.expressions.Expression filter =
+        buildPartitionRowFilter(icebergTable, partitionSpec);
+
+    if (!ignoreIfNotExists && !partitionHasFiles(icebergTable, filter, tablePath)) {
+      throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
+    }
+
+    icebergTable.newDelete().deleteFromRowFilter(filter).commit();
+  }
+
+  /**
+   * Build a row filter from a Flink partition spec. The filter is the conjunction of {@code col =
+   * value} predicates over the table's partition source columns. {@link
+   * org.apache.iceberg.DeleteFiles#deleteFromRowFilter(org.apache.iceberg.expressions.Expression)}
+   * applies a strict-projection check so that only files whose every row matches are dropped, which
+   * makes this safe across non-identity transforms and partition-spec evolution.
+   */
+  private static org.apache.iceberg.expressions.Expression buildPartitionRowFilter(
+      Table table, CatalogPartitionSpec partitionSpec) {
+    Schema schema = table.schema();
+    Set<String> expectedColumns = expectedPartitionColumns(table.spec(), schema);
+    Map<String, String> values = partitionSpec.getPartitionSpec();
+
+    Preconditions.checkArgument(
+        values.keySet().equals(expectedColumns),
+        "DROP PARTITION requires values for partition columns %s but got %s",
+        expectedColumns,
+        values.keySet());
+
+    org.apache.iceberg.expressions.Expression filter = Expressions.alwaysTrue();
+    for (Map.Entry<String, String> entry : values.entrySet()) {
+      Types.NestedField field = schema.findField(entry.getKey());
+      Object icebergValue = Conversions.fromPartitionString(field.type(), entry.getValue());
+      org.apache.iceberg.expressions.Expression predicate =
+          (icebergValue == null)
+              ? Expressions.isNull(entry.getKey())
+              : Expressions.equal(entry.getKey(), icebergValue);
+      filter = Expressions.and(filter, predicate);
+    }
+    return filter;
+  }
+
+  private static Set<String> expectedPartitionColumns(PartitionSpec spec, Schema schema) {
+    Set<String> columns = Sets.newLinkedHashSet();
+    for (PartitionField field : spec.fields()) {
+      if (!field.transform().equals(Transforms.alwaysNull())) {
+        columns.add(schema.findColumnName(field.sourceId()));

Review Comment:
   This keys the expected partition columns by source column name
(`schema.findColumnName(sourceId)`) and parses values with
`fromPartitionString`, whereas the existing `listPartitions` keys by the
partition field name (`spec.fields().get(i).name()`) and emits the raw stored
value (`String.valueOf(structLike.get(...))`). For default identity partitions
on string/int columns the two agree and round-trip, but they diverge in two
cases. The first is a DATE column: `listPartitions` shows the stored epoch-day
int, such as `dt=19723`, while this expects ISO `dt=2024-01-01`. The second is
a renamed identity field (`identity(data) AS d`): `listPartitions` shows `d`,
while this expects `data`. So `SHOW PARTITIONS` output cannot always be pasted
back into `DROP PARTITION`. It is worth aligning the two conventions or
documenting the limitation.
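Here is one possible way to align the conventions, sketched without Iceberg on the classpath. The listing side would render stored DATE values as ISO text. The drop side would accept either the identity partition field name or the source column name. `PartitionSpecRoundTrip`, `renderDate`, `parseDate`, and `normalizeKeys` are hypothetical names. The `identityFieldToSource` map is assumed to hold identity fields only. Non-identity fields such as a bucket carry transformed values that an `=` on the source column cannot express.

```java
import java.time.LocalDate;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch of aligning SHOW PARTITIONS and DROP PARTITION conventions. Not part of
 * FlinkCatalog; names and signatures are illustrative only.
 */
final class PartitionSpecRoundTrip {

  private PartitionSpecRoundTrip() {}

  /** Listing side: render a stored DATE value (days since epoch) as ISO text, not its raw int. */
  static String renderDate(int epochDay) {
    return LocalDate.ofEpochDay(epochDay).toString();
  }

  /** Drop side: parse ISO text back into days since epoch. */
  static int parseDate(String iso) {
    return Math.toIntExact(LocalDate.parse(iso).toEpochDay());
  }

  /**
   * Drop side: accept either an identity partition field name (what listPartitions prints, such as
   * "d") or its source column name (such as "data"), and normalize to the source column name.
   *
   * @param userSpec the key/value pairs from the DROP PARTITION clause
   * @param identityFieldToSource identity partition field name -> source column name
   */
  static Map<String, String> normalizeKeys(
      Map<String, String> userSpec, Map<String, String> identityFieldToSource) {
    Map<String, String> normalized = new LinkedHashMap<>();
    for (Map.Entry<String, String> entry : userSpec.entrySet()) {
      String column = identityFieldToSource.getOrDefault(entry.getKey(), entry.getKey());
      if (normalized.containsKey(column)) {
        // Both "d" and "data" were supplied for the same column.
        throw new IllegalArgumentException(
            "Partition column '" + column + "' was given more than once");
      }
      normalized.put(column, entry.getValue()); // values may be null
    }
    return normalized;
  }
}
```

Documenting the limitation is the cheaper option. Accepting both names widens the syntax `DROP PARTITION` accepts, which may or may not be desirable.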



##########
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java:
##########
@@ -116,4 +118,113 @@ public void testListPartitionsWithPartitionedTable()
     expected.add(partitionSpec2);
     assertThat(list).as("Should produce the expected catalog partition 
specs.").isEqualTo(expected);
   }
+
+  @TestTemplate
+  public void testDropPartitionIdentity() {

Review Comment:
   Good coverage on the error surface. A few cases are still missing:
- a non-string partition value (INT, which is the issue's own example, and DATE);
- a null partition value (the `Expressions.isNull` branch is currently never hit);
- `DROP IF EXISTS` when the table itself does not exist (the
  `TableNotExistException` -> return path);
- `ignoreIfNotExists=true` on an unpartitioned table (only the throwing case is
  covered).
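The missing cases map onto the branches of `dropPartition` as sketched below. This is a simplified, dependency-free model: the `DropPartitionBranches` class and its `Outcome` enum are hypothetical. It leaves out the `Preconditions.checkArgument` column check and the actual commit, and only records which branch each input combination reaches.

```java
/**
 * Hypothetical model of the branch structure of dropPartition in this PR. It does not touch
 * Iceberg or Flink; it only reports which branch a combination of inputs reaches.
 */
final class DropPartitionBranches {

  enum Outcome {
    NO_OP, // returns silently
    THROW_PARTITION_NOT_EXIST, // throws PartitionNotExistException
    COMMIT_DELETE // calls deleteFromRowFilter(filter).commit()
  }

  private DropPartitionBranches() {}

  static Outcome decide(
      boolean tableExists,
      boolean partitioned,
      boolean partitionHasFiles,
      boolean ignoreIfNotExists) {
    if (!tableExists || !partitioned) {
      // A missing table and an unpartitioned table take the same shape in the PR.
      return ignoreIfNotExists ? Outcome.NO_OP : Outcome.THROW_PARTITION_NOT_EXIST;
    }
    if (!ignoreIfNotExists && !partitionHasFiles) {
      return Outcome.THROW_PARTITION_NOT_EXIST;
    }
    // With ignoreIfNotExists=true the existence check is skipped and the delete is committed.
    return Outcome.COMMIT_DELETE;
  }
}
```

In this model, `decide(false, true, false, true)` and `decide(true, false, false, true)` are the two `NO_OP` paths that no test currently reaches. The null-value case sits one level lower, inside the predicate construction, so it needs its own test.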



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java:
##########
@@ -724,8 +729,84 @@ public void createPartition(
   @Override
   public void dropPartition(
       ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException();
+      throws PartitionNotExistException, CatalogException {
+    Table icebergTable;
+    try {
+      icebergTable = loadIcebergTable(tablePath);
+    } catch (TableNotExistException e) {
+      if (ignoreIfNotExists) {
+        return;
+      }
+      throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e);
+    }
+
+    if (icebergTable.spec().isUnpartitioned()) {
+      if (ignoreIfNotExists) {
+        return;
+      }
+      throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
+    }
+
+    org.apache.iceberg.expressions.Expression filter =
+        buildPartitionRowFilter(icebergTable, partitionSpec);
+
+    if (!ignoreIfNotExists && !partitionHasFiles(icebergTable, filter, tablePath)) {
+      throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
+    }
+
+    icebergTable.newDelete().deleteFromRowFilter(filter).commit();

Review Comment:
   Minor: if the table has multiple partition specs (spec evolution), files
written under an older spec that predates the current partition column cannot
be strictly projected, and core throws `ValidationException` here. That
exception is unchecked, so it escapes the declared `throws
PartitionNotExistException, CatalogException` as a raw Iceberg error. The
behavior is correct (it refuses an unsafe delete), but wrapping it in a
`CatalogException` with a clear message would be friendlier to SQL users.
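Here is a sketch of the wrapping, written against local stand-in exception types so it compiles without Iceberg or Flink. In `FlinkCatalog` the real types would be `org.apache.iceberg.exceptions.ValidationException` and `org.apache.flink.table.catalog.exceptions.CatalogException`. The `Runnable` would be `icebergTable.newDelete().deleteFromRowFilter(filter).commit()`. `SafeDropCommit` and `commitDrop` are hypothetical names, and the message wording is only a suggestion.

```java
/**
 * Hypothetical sketch: translate core's refusal to delete partially matching files into a
 * catalog-level error. The nested exception classes are local stand-ins for Iceberg's
 * ValidationException and Flink's CatalogException so this compiles without either library.
 */
final class SafeDropCommit {

  /** Stand-in for org.apache.iceberg.exceptions.ValidationException (unchecked). */
  static final class ValidationException extends RuntimeException {
    ValidationException(String message) {
      super(message);
    }
  }

  /** Stand-in for org.apache.flink.table.catalog.exceptions.CatalogException (unchecked). */
  static final class CatalogException extends RuntimeException {
    CatalogException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  private SafeDropCommit() {}

  /**
   * Runs the delete commit. Only ValidationException is translated; any other RuntimeException
   * propagates unchanged.
   */
  static void commitDrop(Runnable commit, String tablePath, String partitionSpec) {
    try {
      commit.run();
    } catch (ValidationException e) {
      throw new CatalogException(
          String.format(
              "Cannot drop partition %s of table %s: some data files (for example, files written"
                  + " under an older partition spec) may contain rows outside this partition, so"
                  + " they cannot be removed as a whole. Cause: %s",
              partitionSpec, tablePath, e.getMessage()),
          e);
    }
  }
}
```

Catching only `ValidationException` keeps other commit failures on their existing path. The original exception stays attached as the cause, so its details remain available for debugging.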



-- 
This is an automated message from the Apache Git Service.