legendtkl commented on code in PR #1434:
URL: https://github.com/apache/incubator-paimon/pull/1434#discussion_r1242268612
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java:
##########
@@ -112,17 +132,95 @@ public RowLevelUpdateInfo applyRowLevelUpdate(
@Override
public RowLevelDeleteInfo applyRowLevelDelete(
@Nullable RowLevelModificationScanContext
rowLevelModificationScanContext) {
+ if (supportDelete()) {
+ return new RowLevelDeleteInfo() {};
+ }
+ // actually, this will not be executed now.
+ throw new UnsupportedOperationException(
+ String.format("table '%s' cannot support delete.",
table.getClass().getName()));
+ }
+
+ /*
+ supported filters:
+ 1. where primary key = x
+ 2. where primary key = x or key = y
+ 3. where primary key in (x, y, z)
+ 4. where partition key = x
+ */
+ @Override
+ public boolean applyDeleteFilters(List<ResolvedExpression> list) {
+ if (supportDelete()) {
+ if (list.size() == 0) {
+ return false;
+ }
+
+ predicates = new ArrayList<>();
+ RowType rowType =
LogicalTypeConversion.toLogicalType(table.rowType());
+ for (ResolvedExpression filter : list) {
+ Optional<Predicate> predicate =
PredicateConverter.convert(rowType, filter);
+ if (predicate.isPresent()
+ && shouldPushdownDeleteFilter(predicate.get(),
list.size())) {
+ predicates.add(predicate.get());
+ } else {
+ // convert failed, leave it to flink
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public Optional<Long> executeDeletion() {
+ // delete partition
+ if (predicates.size() == 1
+ && predicates.get(0) instanceof LeafPredicate
+ && table.partitionKeys()
+ .contains(((LeafPredicate)
predicates.get(0)).fieldName())) {
+ String tablePath = table.options().get(TABLE_PATH_KEY);
+ if (tablePath == null) {
+ throw new RuntimeException(
+ String.format("Cannot find path from options of table
%s.", table.name()));
+ }
+
+ LeafPredicate leaf = (LeafPredicate) predicates.get(0);
+ String[] args =
+ new String[] {
+ "--path",
+ tablePath,
+ "--partition",
+ String.format("%s=%s", leaf.fieldName(),
leaf.literals().get(0))
+ };
+
+ Optional<Action> action = DropPartitionAction.create(args);
+ try {
+ action.get().run();
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("DropPartitionAction %s failed.",
Arrays.toString(args)), e);
+ }
Review Comment:
Hi @yuzelin, do you mean something like this?
```code
action = DropPartitionAction(xx)
action.run()
```
In that case, we would have to parse the parameters for the
DropPartitionAction constructor ourselves. That might lead to duplicated code
and some complicated cases.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]