legendtkl commented on code in PR #1434:
URL: https://github.com/apache/incubator-paimon/pull/1434#discussion_r1243074547
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java:
##########
@@ -112,17 +132,95 @@ public RowLevelUpdateInfo applyRowLevelUpdate(
@Override
public RowLevelDeleteInfo applyRowLevelDelete(
@Nullable RowLevelModificationScanContext
rowLevelModificationScanContext) {
+ if (supportDelete()) {
+ return new RowLevelDeleteInfo() {};
+ }
+ // actually, this will not be executed now.
+ throw new UnsupportedOperationException(
+ String.format("table '%s' cannot support delete.",
table.getClass().getName()));
+ }
+
+ /*
+ supported filters:
+ 1. where primary key = x
+ 2. where primary key = x or key = y
+ 3. where primary key in (x, y, z)
+ 4. where partition key = x
+ */
+ @Override
+ public boolean applyDeleteFilters(List<ResolvedExpression> list) {
+ if (supportDelete()) {
+ if (list.size() == 0) {
+ return false;
+ }
+
+ predicates = new ArrayList<>();
+ RowType rowType =
LogicalTypeConversion.toLogicalType(table.rowType());
+ for (ResolvedExpression filter : list) {
+ Optional<Predicate> predicate =
PredicateConverter.convert(rowType, filter);
+ if (predicate.isPresent()
+ && shouldPushdownDeleteFilter(predicate.get(),
list.size())) {
+ predicates.add(predicate.get());
+ } else {
+ // convert failed, leave it to flink
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public Optional<Long> executeDeletion() {
+ // delete partition
+ if (predicates.size() == 1
+ && predicates.get(0) instanceof LeafPredicate
+ && table.partitionKeys()
+ .contains(((LeafPredicate)
predicates.get(0)).fieldName())) {
+ String tablePath = table.options().get(TABLE_PATH_KEY);
+ if (tablePath == null) {
+ throw new RuntimeException(
+ String.format("Cannot find path from options of table
%s.", table.name()));
+ }
+
+ LeafPredicate leaf = (LeafPredicate) predicates.get(0);
+ String[] args =
+ new String[] {
+ "--path",
+ tablePath,
+ "--partition",
+ String.format("%s=%s", leaf.fieldName(),
leaf.literals().get(0))
+ };
+
+ Optional<Action> action = DropPartitionAction.create(args);
+ try {
+ action.get().run();
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("DropPartitionAction %s failed.",
Arrays.toString(args)), e);
+ }
Review Comment:
> I mean you can:
>
> 1. get `FileStoreCommit` from table;
> 2. call `FileStoreCommit#dropPartitions` directly.
@yuzelin got it. Resolved.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]