This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c878a7252 [flink] support where sql for compaction (#3403)
c878a7252 is described below

commit c878a7252bf0f47dcd73b26fa3c78caf97dc3c6e
Author: wgcn <[email protected]>
AuthorDate: Mon Jun 24 16:12:31 2024 +0800

    [flink] support where sql for compaction (#3403)
---
 docs/content/flink/procedures.md                   |   6 +-
 .../paimon/flink/procedure/CompactProcedure.java   |  28 +-
 .../ProcedurePositionalArgumentsITCase.java        |   5 +
 .../apache/paimon/flink/action/CompactAction.java  |  71 +++-
 .../paimon/flink/action/CompactActionFactory.java  |   4 +
 .../paimon/flink/action/SortCompactAction.java     |  22 +-
 .../UnawareBucketCompactionTopoBuilder.java        |  24 +-
 .../predicate/SimpleSqlPredicateConvertor.java     | 154 +++++++++
 .../paimon/flink/procedure/CompactProcedure.java   |  10 +-
 .../flink/source/CompactorSourceBuilder.java       |  38 +--
 .../apache/paimon/flink/utils/CalciteModule3.java  | 367 +++++++++++++++++++++
 .../paimon/flink/action/CompactActionITCase.java   |  77 ++++-
 .../predicate/SimpleSqlPredicateConvertorTest.java | 228 +++++++++++++
 .../flink/procedure/CompactProcedureITCase.java    |  26 +-
 .../paimon/flink/sink/CompactorSinkITCase.java     |  16 +-
 .../paimon/flink/source/CompactorSourceITCase.java |   9 +-
 16 files changed, 983 insertions(+), 102 deletions(-)

diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index e87703d09..f6d50cf37 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -67,9 +67,13 @@ All available procedures are listed below.
             <li>order_strategy(optional): 'order' or 'zorder' or 'hilbert' or 
'none'.</li>
             <li>order_by(optional): the columns need to be sort. Left empty if 
'order_strategy' is 'none'.</li>
             <li>options(optional): additional dynamic options of the 
table.</li>
+            <li>where(optional): partition predicate (cannot be used together with "partitions"). Note: since where is a SQL keyword, it must be enclosed in backticks, e.g. `where`.</li>
       </td>
       <td>
-         CALL sys.compact(`table` => 'default.T', partitions => 'p=0', 
order_strategy => 'zorder', order_by => 'a,b', options => 'sink.parallelism=4')
+         -- use partition filter <br/>
+         CALL sys.compact(`table` => 'default.T', partitions => 'p=0', 
order_strategy => 'zorder', order_by => 'a,b', options => 'sink.parallelism=4') 
<br/>
+         -- use partition predicate <br/>
+         CALL sys.compact(`table` => 'default.T', `where` => 'dt>10 and h<20', 
order_strategy => 'zorder', order_by => 'a,b', options => 'sink.parallelism=4')
       </td>
    </tr>
    <tr>
diff --git 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
index 098fa17e9..2fcbc98ba 100644
--- 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
+++ 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
@@ -56,7 +56,7 @@ public class CompactProcedure extends ProcedureBase {
 
     public String[] call(ProcedureContext procedureContext, String tableId, 
String partitions)
             throws Exception {
-        return call(procedureContext, tableId, partitions, "", "", "");
+        return call(procedureContext, tableId, partitions, "", "", "", "");
     }
 
     public String[] call(
@@ -66,7 +66,7 @@ public class CompactProcedure extends ProcedureBase {
             String orderStrategy,
             String orderByColumns)
             throws Exception {
-        return call(procedureContext, tableId, partitions, orderStrategy, 
orderByColumns, "");
+        return call(procedureContext, tableId, partitions, orderStrategy, 
orderByColumns, "", "");
     }
 
     public String[] call(
@@ -77,6 +77,26 @@ public class CompactProcedure extends ProcedureBase {
             String orderByColumns,
             String tableOptions)
             throws Exception {
+        return call(
+                procedureContext,
+                tableId,
+                partitions,
+                orderStrategy,
+                orderByColumns,
+                tableOptions,
+                "");
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext,
+            String tableId,
+            String partitions,
+            String orderStrategy,
+            String orderByColumns,
+            String tableOptions,
+            String whereSql)
+            throws Exception {
+
         String warehouse = catalog.warehouse();
         Map<String, String> catalogOptions = catalog.options();
         Map<String, String> tableConf =
@@ -115,6 +135,10 @@ public class CompactProcedure extends ProcedureBase {
             
action.withPartitions(ParameterUtils.getPartitions(partitions.split(";")));
         }
 
+        if (!StringUtils.isBlank(whereSql)) {
+            action.withWhereSql(whereSql);
+        }
+
         return execute(procedureContext, action, jobName);
     }
 
diff --git 
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
 
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
index 443b60003..f8e987486 100644
--- 
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
+++ 
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
@@ -53,6 +53,11 @@ public class ProcedurePositionalArgumentsITCase extends 
CatalogITCaseBase {
                 .doesNotThrowAnyException();
         assertThatCode(() -> sql("CALL sys.compact('default.T', '', '', '', 
'sink.parallelism=1')"))
                 .doesNotThrowAnyException();
+        assertThatCode(
+                        () ->
+                                sql(
+                                        "CALL sys.compact('default.T', '', '', 
'', 'sink.parallelism=1','pt=1')"))
+                .doesNotThrowAnyException();
     }
 
     @Test
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 32a3a28c6..59ca818c3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -20,9 +20,14 @@ package org.apache.paimon.flink.action;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
+import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
 import org.apache.paimon.flink.sink.CompactorSinkBuilder;
 import org.apache.paimon.flink.source.CompactorSourceBuilder;
+import org.apache.paimon.predicate.PartitionPredicateVisitor;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.ExecutionOptions;
@@ -30,17 +35,25 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.data.RowData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
+
 /** Table compact action for Flink. */
 public class CompactAction extends TableActionBase {
 
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CompactAction.class);
+
     private List<Map<String, String>> partitions;
 
+    private String whereSql;
+
     public CompactAction(String warehouse, String database, String tableName) {
         this(warehouse, database, tableName, Collections.emptyMap(), 
Collections.emptyMap());
     }
@@ -72,8 +85,13 @@ public class CompactAction extends TableActionBase {
         return this;
     }
 
+    public CompactAction withWhereSql(String whereSql) {
+        this.whereSql = whereSql;
+        return this;
+    }
+
     @Override
-    public void build() {
+    public void build() throws Exception {
         ReadableConfig conf = env.getConfiguration();
         boolean isStreaming =
                 conf.get(ExecutionOptions.RUNTIME_MODE) == 
RuntimeExecutionMode.STREAMING;
@@ -94,34 +112,82 @@ public class CompactAction extends TableActionBase {
     }
 
     private void buildForTraditionalCompaction(
-            StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming) {
+            StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming)
+            throws Exception {
         CompactorSourceBuilder sourceBuilder =
                 new CompactorSourceBuilder(identifier.getFullName(), table);
         CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table);
 
-        sourceBuilder.withPartitions(partitions);
+        sourceBuilder.withPartitionPredicate(getPredicate());
         DataStreamSource<RowData> source =
                 sourceBuilder.withEnv(env).withContinuousMode(isStreaming).build();
         sinkBuilder.withInput(source).build();
     }
 
     private void buildForUnawareBucketCompaction(
-            StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming) {
+            StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming)
+            throws Exception {
         UnawareBucketCompactionTopoBuilder unawareBucketCompactionTopoBuilder =
                 new UnawareBucketCompactionTopoBuilder(env, identifier.getFullName(), table);
 
-        unawareBucketCompactionTopoBuilder.withPartitions(partitions);
+        unawareBucketCompactionTopoBuilder.withPartitionPredicate(getPredicate());
         unawareBucketCompactionTopoBuilder.withContinuousMode(isStreaming);
         unawareBucketCompactionTopoBuilder.build();
     }
 
+    /**
+     * Builds the partition filter for this compaction from either {@code partitions} or {@code
+     * whereSql}; setting both fails a {@code Preconditions.checkArgument} check.
+     *
+     * <p>Each entry of {@code partitions} becomes a partition predicate and the entries are OR-ed
+     * together, while {@code whereSql} is converted by {@link SimpleSqlPredicateConvertor}. The
+     * result must only reference partition keys, which is also enforced with {@code
+     * Preconditions.checkArgument}.
+     *
+     * @return the partition predicate, or {@code null} if neither option was set
+     * @throws Exception if {@code whereSql} cannot be parsed or converted
+     */
+    protected Predicate getPredicate() throws Exception {
+        Preconditions.checkArgument(
+                partitions == null || whereSql == null,
+                "partitions and where cannot be used together.");
+        Predicate predicate = null;
+        if (partitions != null) {
+            String defaultPartitionName =
+                    ((FileStoreTable) table).coreOptions().partitionDefaultName();
+            predicate =
+                    PredicateBuilder.or(
+                            partitions.stream()
+                                    .map(
+                                            p ->
+                                                    createPartitionPredicate(
+                                                            p,
+                                                            table.rowType(),
+                                                            defaultPartitionName))
+                                    .toArray(Predicate[]::new));
+        } else if (whereSql != null) {
+            SimpleSqlPredicateConvertor simpleSqlPredicateConvertor =
+                    new SimpleSqlPredicateConvertor(table.rowType());
+            predicate = simpleSqlPredicateConvertor.convertSqlToPredicate(whereSql);
+        }
+
+        // Compaction is filtered by partition only, so the predicate must not reference
+        // any non-partition field.
+        if (predicate != null) {
+            LOGGER.info("the partition predicate of compaction is {}", predicate);
+            PartitionPredicateVisitor partitionPredicateVisitor =
+                    new PartitionPredicateVisitor(table.partitionKeys());
+            Preconditions.checkArgument(
+                    predicate.visit(partitionPredicateVisitor),
+                    "Only parition key can be specialized in compaction action.");
+        }
+
+        return predicate;
+    }
+
     @Override
     public void run() throws Exception {
         build();
         execute("Compact job");
     }
-
-    public List<Map<String, String>> getPartitions() {
-        return partitions;
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
index 22cb6cafa..c65de5c7c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
@@ -32,6 +32,8 @@ public class CompactActionFactory implements ActionFactory {
     private static final String ORDER_STRATEGY = "order_strategy";
     private static final String ORDER_BY = "order_by";
 
+    private static final String WHERE = "where";
+
     @Override
     public String identifier() {
         return IDENTIFIER;
@@ -67,6 +69,12 @@ public class CompactActionFactory implements ActionFactory {
         if (params.has(PARTITION)) {
             List<Map<String, String>> partitions = getPartitions(params);
             action.withPartitions(partitions);
+        }
+
+        // Pass "where" through even when "partition" is also given, so that
+        // CompactAction#getPredicate rejects the combination instead of silently ignoring it.
+        if (params.has(WHERE)) {
+            action.withWhereSql(params.get(WHERE));
         }
 
         return Optional.of(action);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
index 1cebe8bc1..2b12aa7a0 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
@@ -25,8 +25,6 @@ import org.apache.paimon.flink.sorter.TableSortInfo;
 import org.apache.paimon.flink.sorter.TableSorter;
 import org.apache.paimon.flink.sorter.TableSorter.OrderType;
 import org.apache.paimon.flink.source.FlinkSourceBuilder;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 
@@ -44,8 +42,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
-
 /** Compact with sort action. */
 public class SortCompactAction extends CompactAction {
 
@@ -72,7 +68,7 @@ public class SortCompactAction extends CompactAction {
     }
 
     @Override
-    public void build() {
+    public void build() throws Exception {
         // only support batch sort yet
         if (env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
                 != RuntimeExecutionMode.BATCH) {
@@ -96,21 +92,7 @@ public class SortCompactAction extends CompactAction {
                                                 identifier.getObjectName())
                                         .asSummaryString());
 
-        if (getPartitions() != null) {
-            Predicate partitionPredicate =
-                    PredicateBuilder.or(
-                            getPartitions().stream()
-                                    .map(
-                                            p ->
-                                                    createPartitionPredicate(
-                                                            p,
-                                                            table.rowType(),
-                                                            ((FileStoreTable) 
table)
-                                                                    
.coreOptions()
-                                                                    
.partitionDefaultName()))
-                                    .toArray(Predicate[]::new));
-            sourceBuilder.predicate(partitionPredicate);
-        }
+        sourceBuilder.predicate(getPredicate());
 
         String scanParallelism = 
tableConfig.get(FlinkConnectorOptions.SCAN_PARALLELISM.key());
         if (scanParallelism != null) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
index edbca53b2..d10c5c11c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
@@ -24,6 +24,7 @@ import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.flink.sink.UnawareBucketCompactionSink;
 import org.apache.paimon.flink.source.BucketUnawareCompactSource;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -34,11 +35,6 @@ import 
org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 
 import javax.annotation.Nullable;
 
-import java.util.List;
-import java.util.Map;
-
-import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
-
 /**
  * Build for unaware-bucket table flink compaction job.
  *
@@ -53,9 +49,11 @@ public class UnawareBucketCompactionTopoBuilder {
     private final transient StreamExecutionEnvironment env;
     private final String tableIdentifier;
     private final FileStoreTable table;
-    @Nullable private List<Map<String, String>> specifiedPartitions = null;
+
     private boolean isContinuous = false;
 
+    @Nullable private Predicate partitionPredicate;
+
     public UnawareBucketCompactionTopoBuilder(
             StreamExecutionEnvironment env, String tableIdentifier, 
FileStoreTable table) {
         this.env = env;
@@ -67,8 +65,8 @@ public class UnawareBucketCompactionTopoBuilder {
         this.isContinuous = isContinuous;
     }
 
-    public void withPartitions(List<Map<String, String>> partitions) {
-        this.specifiedPartitions = partitions;
+    public void withPartitionPredicate(Predicate predicate) {
+        this.partitionPredicate = predicate;
     }
 
     public void build() {
@@ -93,15 +91,7 @@ public class UnawareBucketCompactionTopoBuilder {
         long scanInterval = 
table.coreOptions().continuousDiscoveryInterval().toMillis();
         BucketUnawareCompactSource source =
                 new BucketUnawareCompactSource(
-                        table,
-                        isContinuous,
-                        scanInterval,
-                        specifiedPartitions != null
-                                ? createPartitionPredicate(
-                                        specifiedPartitions,
-                                        table.rowType(),
-                                        
table.coreOptions().partitionDefaultName())
-                                : null);
+                        table, isContinuous, scanInterval, partitionPredicate);
 
         return BucketUnawareCompactSource.buildSource(env, source, 
isContinuous, tableIdentifier);
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertor.java
new file mode 100644
index 000000000..02c98f1cd
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertor.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.predicate;
+
+import org.apache.paimon.flink.utils.CalciteModule3;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.TypeUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.BiFunction;
+
+/**
+ * Converts a simple SQL boolean expression, such as the {@code where} argument of the compact
+ * action, into a Paimon {@link Predicate} over the given {@link RowType}.
+ *
+ * <p>Calcite is reached through the {@link CalciteModule3} delegates, so every Calcite object is
+ * handled as a plain {@code Object}. Supported constructs are {@code AND}, {@code OR}, {@code
+ * NOT}, comparisons ({@code =, <>, <, <=, >, >=}) between a field and a literal in either order,
+ * {@code field IN (literal, ...)}, {@code IS NULL} and {@code IS NOT NULL}; other operator kinds
+ * are rejected with an {@link UnsupportedOperationException}.
+ */
+public class SimpleSqlPredicateConvertor {
+    private final PredicateBuilder builder;
+    private final RowType rowType;
+
+    private final CalciteModule3 calciteModule3;
+
+    /**
+     * Creates a convertor that resolves field names against {@code type}.
+     *
+     * @param type the row type the produced predicates refer to
+     * @throws Exception if {@link CalciteModule3} cannot be created
+     */
+    public SimpleSqlPredicateConvertor(RowType type) throws Exception {
+        this.rowType = type;
+        this.builder = new PredicateBuilder(type);
+        this.calciteModule3 = new CalciteModule3();
+    }
+
+    /**
+     * Parses {@code whereSql} as a SQL expression, using Calcite's Java lexical policy, and
+     * converts it into a predicate.
+     *
+     * @param whereSql a boolean expression such as {@code dt > 10 and h < 20}
+     * @return the equivalent predicate
+     * @throws Exception if the expression cannot be parsed or is not supported
+     */
+    public Predicate convertSqlToPredicate(String whereSql) throws Exception {
+        Object config =
+                calciteModule3
+                        .configDelegate()
+                        .withLex(
+                                calciteModule3.sqlParserDelegate().config(),
+                                calciteModule3.lexDelegate().java());
+        Object sqlParser = calciteModule3.sqlParserDelegate().create(whereSql, config);
+        Object sqlBasicCall = calciteModule3.sqlParserDelegate().parseExpression(sqlParser);
+        return convert(sqlBasicCall);
+    }
+
+    /**
+     * Recursively converts a parsed Calcite call node into a predicate.
+     *
+     * @param sqlBasicCall a call node produced by the Calcite parser
+     * @return the converted predicate
+     * @throws UnsupportedOperationException if the operator kind is not supported, or if a {@code
+     *     NOT} wraps a predicate that cannot be negated
+     */
+    public Predicate convert(Object sqlBasicCall) throws Exception {
+        Object operator = calciteModule3.sqlBasicCallDelegate().getOperator(sqlBasicCall);
+        Object kind = calciteModule3.sqlOperatorDelegate().getKind(operator);
+
+        if (calciteModule3.sqlOperatorDelegate().instanceOfSqlBinaryOperator(operator)) {
+            List<?> operandList =
+                    calciteModule3.sqlBasicCallDelegate().getOperandList(sqlBasicCall);
+            Object left = operandList.get(0);
+            Object right = operandList.get(1);
+            if (kind == calciteModule3.sqlKindDelegate().or()) {
+                return PredicateBuilder.or(convert(left), convert(right));
+            } else if (kind == calciteModule3.sqlKindDelegate().and()) {
+                return PredicateBuilder.and(convert(left), convert(right));
+            } else if (kind == calciteModule3.sqlKindDelegate().equals()) {
+                return visitBiFunction(left, right, builder::equal, builder::equal);
+            } else if (kind == calciteModule3.sqlKindDelegate().notEquals()) {
+                return visitBiFunction(left, right, builder::notEqual, builder::notEqual);
+            } else if (kind == calciteModule3.sqlKindDelegate().lessThan()) {
+                return visitBiFunction(left, right, builder::lessThan, builder::greaterThan);
+            } else if (kind == calciteModule3.sqlKindDelegate().lessThanOrEqual()) {
+                return visitBiFunction(left, right, builder::lessOrEqual, builder::greaterOrEqual);
+            } else if (kind == calciteModule3.sqlKindDelegate().greaterThan()) {
+                return visitBiFunction(left, right, builder::greaterThan, builder::lessThan);
+            } else if (kind == calciteModule3.sqlKindDelegate().greaterThanOrEqual()) {
+                return visitBiFunction(left, right, builder::greaterOrEqual, builder::lessOrEqual);
+            } else if (kind == calciteModule3.sqlKindDelegate().in()) {
+                int index = getfieldIndex(left.toString());
+                List<?> elementslist = calciteModule3.sqlNodeListDelegate().getList(right);
+
+                List<Object> list = new ArrayList<>();
+                for (Object sqlNode : elementslist) {
+                    list.add(toFieldValue(index, sqlNode));
+                }
+                return builder.in(index, list);
+            }
+        } else if (calciteModule3.sqlOperatorDelegate().instanceOfSqlPostfixOperator(operator)) {
+            Object child =
+                    calciteModule3.sqlBasicCallDelegate().getOperandList(sqlBasicCall).get(0);
+            if (kind == calciteModule3.sqlKindDelegate().isNull()) {
+                String field = String.valueOf(child);
+                return builder.isNull(getfieldIndex(field));
+            } else if (kind == calciteModule3.sqlKindDelegate().isNotNull()) {
+                String field = String.valueOf(child);
+                return builder.isNotNull(getfieldIndex(field));
+            }
+        } else if (calciteModule3.sqlOperatorDelegate().instanceOfSqlPrefixOperator(operator)) {
+            if (kind == calciteModule3.sqlKindDelegate().not()) {
+                Predicate child =
+                        convert(
+                                calciteModule3
+                                        .sqlBasicCallDelegate()
+                                        .getOperandList(sqlBasicCall)
+                                        .get(0));
+                // negate() may be empty; fail with a clear message instead of the
+                // NoSuchElementException an unchecked Optional.get() would throw.
+                return child.negate()
+                        .orElseThrow(
+                                () ->
+                                        new UnsupportedOperationException(
+                                                String.format("%s can not be negated.", child)));
+            }
+        }
+
+        throw new UnsupportedOperationException(String.format("%s not been supported.", kind));
+    }
+
+    /**
+     * Builds a comparison between a field and a literal, accepting the field on either side.
+     *
+     * @param visitLeft builds the predicate for {@code field op literal}
+     * @param visitRight builds the predicate for {@code literal op field}, so callers pass the
+     *     mirrored operator here (for example {@code 5 < a} is built as {@code a > 5})
+     * @throws UnsupportedOperationException unless one operand is an identifier and the other a
+     *     literal
+     */
+    public Predicate visitBiFunction(
+            Object left,
+            Object right,
+            BiFunction<Integer, Object, Predicate> visitLeft,
+            BiFunction<Integer, Object, Predicate> visitRight)
+            throws Exception {
+        if (calciteModule3.sqlIndentifierDelegate().instanceOfSqlIdentifier(left)
+                && calciteModule3.sqlLiteralDelegate().instanceOfSqlLiteral(right)) {
+            int index = getfieldIndex(String.valueOf(left));
+            return visitLeft.apply(index, toFieldValue(index, right));
+        } else if (calciteModule3.sqlIndentifierDelegate().instanceOfSqlIdentifier(right)
+                && calciteModule3.sqlLiteralDelegate().instanceOfSqlLiteral(left)) {
+            int index = getfieldIndex(right.toString());
+            return visitRight.apply(index, toFieldValue(index, left));
+        }
+
+        throw new UnsupportedOperationException(
+                String.format("%s or %s not been supported.", left, right));
+    }
+
+    /**
+     * Returns the position of {@code field} in the row type.
+     *
+     * @throws IllegalArgumentException if the row type has no such field
+     */
+    public int getfieldIndex(String field) {
+        int index = builder.indexOf(field);
+        if (index == -1) {
+            throw new IllegalArgumentException(String.format("Field `%s` not found", field));
+        }
+        return index;
+    }
+
+    /** Converts a Calcite literal node to a value of the type of the field at {@code index}. */
+    private Object toFieldValue(int index, Object literalNode) throws Exception {
+        String value = calciteModule3.sqlLiteralDelegate().toValue(literalNode);
+        DataType type = rowType.getFieldTypes().get(index);
+        return TypeUtils.castFromString(value, type);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
index 01421c8bf..bfe7337f6 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
@@ -51,7 +51,8 @@ public class CompactProcedure extends ProcedureBase {
                         type = @DataTypeHint("STRING"),
                         isOptional = true),
                 @ArgumentHint(name = "order_by", type = 
@DataTypeHint("STRING"), isOptional = true),
-                @ArgumentHint(name = "options", type = 
@DataTypeHint("STRING"), isOptional = true)
+                @ArgumentHint(name = "options", type = 
@DataTypeHint("STRING"), isOptional = true),
+                @ArgumentHint(name = "where", type = @DataTypeHint("STRING"), 
isOptional = true)
             })
     public String[] call(
             ProcedureContext procedureContext,
@@ -59,7 +60,8 @@ public class CompactProcedure extends ProcedureBase {
             String partitions,
             String orderStrategy,
             String orderByColumns,
-            String tableOptions)
+            String tableOptions,
+            String where)
             throws Exception {
         String warehouse = catalog.warehouse();
         Map<String, String> catalogOptions = catalog.options();
@@ -99,6 +101,10 @@ public class CompactProcedure extends ProcedureBase {
             action.withPartitions(getPartitions(partitions.split(";")));
         }
 
+        if (!isBlank(where)) {
+            action.withWhereSql(where);
+        }
+
         return execute(procedureContext, action, jobName);
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index 275f88ad7..ee531a2e2 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -23,7 +23,6 @@ import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.system.BucketsTable;
@@ -38,13 +37,9 @@ import 
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 
 import javax.annotation.Nullable;
 
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
-
 /**
  * Source builder to build a Flink {@link StaticFileStoreSource} or {@link
  * ContinuousFileStoreSource}. This is for dedicated compactor jobs.
@@ -56,7 +51,7 @@ public class CompactorSourceBuilder {
 
     private boolean isContinuous = false;
     private StreamExecutionEnvironment env;
-    @Nullable private List<Map<String, String>> specifiedPartitions = null;
+    @Nullable private Predicate partitionPredicate = null;
 
     public CompactorSourceBuilder(String tableIdentifier, FileStoreTable 
table) {
         this.tableIdentifier = tableIdentifier;
@@ -73,33 +68,7 @@ public class CompactorSourceBuilder {
         return this;
     }
 
-    public CompactorSourceBuilder withPartition(Map<String, String> partition) 
{
-        return withPartitions(Collections.singletonList(partition));
-    }
-
-    public CompactorSourceBuilder withPartitions(List<Map<String, String>> 
partitions) {
-        this.specifiedPartitions = partitions;
-        return this;
-    }
-
     private Source<RowData, ?, ?> buildSource(BucketsTable bucketsTable) {
-        Predicate partitionPredicate = null;
-        if (specifiedPartitions != null) {
-            // This predicate is based on the row type of the original table, 
not bucket table.
-            // Because TableScan in BucketsTable is the same with 
FileStoreTable,
-            // and partition filter is done by scan.
-            partitionPredicate =
-                    PredicateBuilder.or(
-                            specifiedPartitions.stream()
-                                    .map(
-                                            p ->
-                                                    createPartitionPredicate(
-                                                            p,
-                                                            table.rowType(),
-                                                            table.coreOptions()
-                                                                    
.partitionDefaultName()))
-                                    .toArray(Predicate[]::new));
-        }
 
         if (isContinuous) {
             bucketsTable = bucketsTable.copy(streamingCompactOptions());
@@ -164,4 +133,9 @@ public class CompactorSourceBuilder {
             }
         };
     }
+
+    public CompactorSourceBuilder withPartitionPredicate(@Nullable Predicate 
partitionPredicate) {
+        this.partitionPredicate = partitionPredicate;
+        return this;
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/CalciteModule3.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/CalciteModule3.java
new file mode 100644
index 000000000..70be74269
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/CalciteModule3.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.List;
+
+/** Class for load calcite dependency via reflection. */
+public class CalciteModule3 {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CalciteModule3.class);
+    private static final String Flink_PLANNER_MODULE_CLASS =
+            "org.apache.flink.table.planner.loader.PlannerModule";
+    private static final String PLANNER_MODULE_METHOD = "getInstance";
+
+    private static final String SUBMODULE_CLASS_LOADER = 
"submoduleClassLoader";
+
+    private static final ClassLoader submoduleClassLoader;
+    private final SqlNodeListDelegate sqlNodeListDelegate;
+    private final SqlLiteralDelegate sqlLiteralDelegate;
+    private final SqlBasicCallDelegate sqlBasicCallDelegate;
+    private final SqlOperatorDelegate sqlOperatorDelegate;
+    private final SqlKindDelegate sqlKindDelegate;
+    private final SqlParserDelegate sqlParserDelegate;
+    private final LexDelegate lexDelegate;
+    private final ConfigDelegate configDelegate;
+    private final SqlIndentifierDelegate sqlIndentifierDelegate;
+
+    static {
+        boolean calciteFound = false;
+        ClassLoader currentClassLoader = 
Thread.currentThread().getContextClassLoader();
+        try {
+            currentClassLoader.loadClass(SqlParserDelegate.CLASS_NAME);
+            calciteFound = true;
+        } catch (ClassNotFoundException e) {
+        }
+
+        try {
+            if (calciteFound) {
+                submoduleClassLoader = currentClassLoader;
+            } else {
+                submoduleClassLoader = initCalciteClassLoader();
+            }
+        } catch (Exception e) {
+            LOGGER.error(String.format("Load Calcite class Fail: %s", 
e.getMessage()), e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public CalciteModule3() throws ClassNotFoundException {
+        sqlNodeListDelegate = new SqlNodeListDelegate();
+        sqlLiteralDelegate = new SqlLiteralDelegate();
+        sqlBasicCallDelegate = new SqlBasicCallDelegate();
+        sqlOperatorDelegate = new SqlOperatorDelegate();
+        sqlKindDelegate = new SqlKindDelegate();
+        sqlParserDelegate = new SqlParserDelegate();
+        lexDelegate = new LexDelegate();
+        configDelegate = new ConfigDelegate();
+        sqlIndentifierDelegate = new SqlIndentifierDelegate();
+    }
+
+    private static ClassLoader initCalciteClassLoader() throws Exception {
+        Class<?> plannerModuleClass = 
Class.forName(Flink_PLANNER_MODULE_CLASS);
+        Method getInstanceMethod = 
plannerModuleClass.getDeclaredMethod(PLANNER_MODULE_METHOD);
+        getInstanceMethod.setAccessible(true);
+        Object plannerModuleInstance = getInstanceMethod.invoke(null);
+
+        Field submoduleClassLoaderField =
+                plannerModuleClass.getDeclaredField(SUBMODULE_CLASS_LOADER);
+        submoduleClassLoaderField.setAccessible(true);
+        return (ClassLoader) 
submoduleClassLoaderField.get(plannerModuleInstance);
+    }
+
+    private static Object invokeMethod(
+            Class<?> clazz, Object object, String methodName, Class<?>[] 
argsClass, Object[] args)
+            throws Exception {
+        Method method = clazz.getMethod(methodName, argsClass);
+        method.setAccessible(true);
+        return method.invoke(object, args);
+    }
+
+    private static Class<?> loadCalciteClass(String className) throws 
ClassNotFoundException {
+        return Class.forName(className, true, submoduleClassLoader);
+    }
+
+    /** Accessing org.apache.calcite.sql.parser.SqlParser by Reflection. */
+    public static class SqlParserDelegate {
+        private static final String CLASS_NAME = 
"org.apache.calcite.sql.parser.SqlParser";
+        private final Class<?> clazz;
+
+        public SqlParserDelegate() throws ClassNotFoundException {
+            clazz = loadCalciteClass(CLASS_NAME);
+        }
+
+        public Object config() throws Exception {
+            return invokeMethod(clazz, null, "config", new Class[0], new 
Object[0]);
+        }
+
+        public Object create(String whereSql, Object config) throws Exception {
+            return invokeMethod(
+                    clazz,
+                    null,
+                    "create",
+                    new Class[] {String.class, 
loadCalciteClass(ConfigDelegate.CLASS_NAME)},
+                    new Object[] {whereSql, config});
+        }
+
+        public Object parseExpression(Object sqlParser) throws Exception {
+            return invokeMethod(clazz, sqlParser, "parseExpression", new 
Class[0], new Object[0]);
+        }
+    }
+
+    /** Accessing org.apache.calcite.config.Lex by Reflection. */
+    public static class LexDelegate {
+        private static final String CLASS_NAME = 
"org.apache.calcite.config.Lex";
+        private final Class<?> clazz;
+
+        public LexDelegate() throws ClassNotFoundException {
+            this.clazz = loadCalciteClass(CLASS_NAME);
+        }
+
+        public Object java() throws NoSuchFieldException, 
IllegalAccessException {
+            return clazz.getField("JAVA").get(null);
+        }
+    }
+
+    /** Accessing org.apache.calcite.sql.SqlKind by Reflection. */
+    public static class SqlKindDelegate {
+        private static final String CLASS_NAME = 
"org.apache.calcite.sql.SqlKind";
+        private final Class<?> clazz;
+
+        public SqlKindDelegate() throws ClassNotFoundException {
+            clazz = loadCalciteClass(CLASS_NAME);
+        }
+
+        public Object or() throws NoSuchFieldException, IllegalAccessException 
{
+            return clazz.getField("OR").get(null);
+        }
+
+        public Object and() throws NoSuchFieldException, 
IllegalAccessException {
+            return clazz.getField("AND").get(null);
+        }
+
+        public Object equals() throws NoSuchFieldException, 
IllegalAccessException {
+            return clazz.getField("EQUALS").get(null);
+        }
+
+        public Object notEquals() throws NoSuchFieldException, 
IllegalAccessException {
+            return clazz.getField("NOT_EQUALS").get(null);
+        }
+
+        public Object lessThan() throws NoSuchFieldException, 
IllegalAccessException {
+            return clazz.getField("LESS_THAN").get(null);
+        }
+
+        public Object lessThanOrEqual() throws NoSuchFieldException, 
IllegalAccessException {
+            return clazz.getField("LESS_THAN_OR_EQUAL").get(null);
+        }
+
+        public Object greaterThan() throws NoSuchFieldException, 
IllegalAccessException {
+            return clazz.getField("GREATER_THAN").get(null);
+        }
+
+        public Object greaterThanOrEqual() throws NoSuchFieldException, 
IllegalAccessException {
+            return clazz.getField("GREATER_THAN_OR_EQUAL").get(null);
+        }
+
+        public Object in() throws NoSuchFieldException, IllegalAccessException 
{
+            return clazz.getField("IN").get(null);
+        }
+
+        public Object isNull() throws NoSuchFieldException, 
IllegalAccessException {
+            return clazz.getField("IS_NULL").get(null);
+        }
+
+        public Object isNotNull() throws NoSuchFieldException, 
IllegalAccessException {
+            return clazz.getField("IS_NOT_NULL").get(null);
+        }
+
+        public Object not() throws NoSuchFieldException, 
IllegalAccessException {
+            return clazz.getField("NOT").get(null);
+        }
+    }
+
+    /** Accessing org.apache.calcite.sql.parser.SqlParser$Config by 
Reflection. */
+    public static class ConfigDelegate {
+        static final String CLASS_NAME = 
"org.apache.calcite.sql.parser.SqlParser$Config";
+        private final Class<?> clazz;
+
+        public ConfigDelegate() throws ClassNotFoundException {
+            this.clazz = loadCalciteClass(CLASS_NAME);
+        }
+
+        public Class<?> getClazz() {
+            return clazz;
+        }
+
+        public Object withLex(Object config, Object lex) throws Exception {
+            return invokeMethod(
+                    config.getClass(),
+                    config,
+                    "withLex",
+                    new Class[] {loadCalciteClass(LexDelegate.CLASS_NAME)},
+                    new Object[] {lex});
+        }
+    }
+
+    /** Accessing org.apache.calcite.sql.SqlBasicCall by Reflection. */
+    public static class SqlBasicCallDelegate {
+        static final String CLASS_NAME = "org.apache.calcite.sql.SqlBasicCall";
+        private final Class<?> clazz;
+
+        public SqlBasicCallDelegate() throws ClassNotFoundException {
+            this.clazz = loadCalciteClass(CLASS_NAME);
+        }
+
+        public Object getOperator(Object basicCall) throws Exception {
+            return invokeMethod(clazz, basicCall, "getOperator", new Class[0], 
new Object[0]);
+        }
+
+        public List<?> getOperandList(Object basicCall) throws Exception {
+            return (List<?>)
+                    invokeMethod(clazz, basicCall, "getOperandList", new 
Class[0], new Object[0]);
+        }
+    }
+
+    /** Accessing org.apache.calcite.sql.SqlOperator by Reflection. */
+    public static class SqlOperatorDelegate {
+        static final String SQL_OPERATOR = 
"org.apache.calcite.sql.SqlOperator";
+        private final Class<?> sqlOperatorClazz;
+
+        static final String SQL_BINARY_OPERATOR = 
"org.apache.calcite.sql.SqlBinaryOperator";
+        private final Class<?> sqlBinaryOperatorClazz;
+
+        static final String SQL_SQL_POSTFIX_OPERATOR = 
"org.apache.calcite.sql.SqlPostfixOperator";
+        private final Class<?> sqlPostfixOperatorClazz;
+
+        static final String SQL_PREFIX_OPERATOR = 
"org.apache.calcite.sql.SqlPrefixOperator";
+        private final Class<?> sqlPrefixOperatorClazz;
+
+        public SqlOperatorDelegate() throws ClassNotFoundException {
+            this.sqlOperatorClazz = loadCalciteClass(SQL_OPERATOR);
+            this.sqlBinaryOperatorClazz = 
loadCalciteClass(SQL_BINARY_OPERATOR);
+            this.sqlPostfixOperatorClazz = 
loadCalciteClass(SQL_SQL_POSTFIX_OPERATOR);
+            this.sqlPrefixOperatorClazz = 
loadCalciteClass(SQL_PREFIX_OPERATOR);
+        }
+
+        public Object getKind(Object operator) throws Exception {
+            return invokeMethod(sqlOperatorClazz, operator, "getKind", new 
Class[0], new Object[0]);
+        }
+
+        public boolean instanceOfSqlBinaryOperator(Object operator) throws 
Exception {
+            return 
sqlBinaryOperatorClazz.isAssignableFrom(operator.getClass());
+        }
+
+        public boolean instanceOfSqlPostfixOperator(Object operator) throws 
Exception {
+            return 
sqlPostfixOperatorClazz.isAssignableFrom(operator.getClass());
+        }
+
+        public boolean instanceOfSqlPrefixOperator(Object operator) throws 
Exception {
+            return 
sqlPrefixOperatorClazz.isAssignableFrom(operator.getClass());
+        }
+    }
+
+    /** Accessing org.apache.calcite.sql.SqlIdentifier by Reflection. */
+    public static class SqlIndentifierDelegate {
+        private static final String SQL_IDENTIFIER = 
"org.apache.calcite.sql.SqlIdentifier";
+        private final Class<?> identifierClazz;
+
+        public SqlIndentifierDelegate() throws ClassNotFoundException {
+            this.identifierClazz = loadCalciteClass(SQL_IDENTIFIER);
+        }
+
+        public boolean instanceOfSqlIdentifier(Object sqlNode) throws 
Exception {
+            return identifierClazz.isAssignableFrom(sqlNode.getClass());
+        }
+    }
+
+    /** Accessing org.apache.calcite.sql.SqlNodeList by Reflection. */
+    public static class SqlNodeListDelegate {
+        private static final String SQL_NODE_LIST = 
"org.apache.calcite.sql.SqlNodeList";
+        private final Class<?> clazz;
+
+        public SqlNodeListDelegate() throws ClassNotFoundException {
+            this.clazz = loadCalciteClass(SQL_NODE_LIST);
+        }
+
+        public List<?> getList(Object sqlNodeList) throws Exception {
+            return (List<?>)
+                    invokeMethod(clazz, sqlNodeList, "getList", new Class[0], 
new Object[0]);
+        }
+    }
+
+    /** Accessing org.apache.calcite.sql.SqlLiteral by Reflection. */
+    public static class SqlLiteralDelegate {
+        private static final String CLASS_NAME = 
"org.apache.calcite.sql.SqlLiteral";
+        private final Class<?> clazz;
+
+        public SqlLiteralDelegate() throws ClassNotFoundException {
+            this.clazz = loadCalciteClass(CLASS_NAME);
+        }
+
+        public boolean instanceOfSqlLiteral(Object sqlNode) throws Exception {
+            return clazz.isAssignableFrom(sqlNode.getClass());
+        }
+
+        public String toValue(Object sqlNode) throws Exception {
+            return (String)
+                    invokeMethod(clazz, sqlNode, "toValue", new Class[] {}, 
new Object[] {});
+        }
+    }
+
+    public SqlNodeListDelegate sqlNodeListDelegate() {
+        return sqlNodeListDelegate;
+    }
+
+    public SqlLiteralDelegate sqlLiteralDelegate() {
+        return sqlLiteralDelegate;
+    }
+
+    public SqlBasicCallDelegate sqlBasicCallDelegate() {
+        return sqlBasicCallDelegate;
+    }
+
+    public SqlOperatorDelegate sqlOperatorDelegate() {
+        return sqlOperatorDelegate;
+    }
+
+    public SqlKindDelegate sqlKindDelegate() {
+        return sqlKindDelegate;
+    }
+
+    public SqlParserDelegate sqlParserDelegate() {
+        return sqlParserDelegate;
+    }
+
+    public LexDelegate lexDelegate() {
+        return lexDelegate;
+    }
+
+    public ConfigDelegate configDelegate() {
+        return configDelegate;
+    }
+
+    public SqlIndentifierDelegate sqlIndentifierDelegate() {
+        return sqlIndentifierDelegate;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index 0ca7baff9..9094239c9 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -33,16 +33,21 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommonTestUtils;
 import org.apache.paimon.utils.SnapshotManager;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -192,7 +197,8 @@ public class CompactActionITCase extends 
CompactActionITCaseBase {
 
         checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
 
-        runAction(true);
+        // repairing that the ut don't specify the real parition of table
+        runActionForUnawareTable(true);
 
         // first compaction, snapshot will be 3
         checkFileAndRowSize(table, 3L, 30_000L, 1, 6);
@@ -233,7 +239,8 @@ public class CompactActionITCase extends 
CompactActionITCaseBase {
 
         checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
 
-        runAction(false);
+        // repairing that the ut don't specify the real parition of table
+        runActionForUnawareTable(false);
 
         // first compaction, snapshot will be 3.
         checkFileAndRowSize(table, 3L, 0L, 1, 6);
@@ -264,6 +271,29 @@ public class CompactActionITCase extends 
CompactActionITCaseBase {
                 .isEqualTo("6");
     }
 
+    @Test
+    public void testSpecifyNonPartitionField() throws Exception {
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
+        tableOptions.put(CoreOptions.BUCKET.key(), "-1");
+
+        //  compaction specify a non-partion field
+        prepareTable(
+                Collections.singletonList("v"),
+                Arrays.asList(),
+                Collections.emptyList(),
+                tableOptions);
+
+        // base records
+        writeData(
+                rowData(1, 100, 15, BinaryString.fromString("20221208")),
+                rowData(1, 100, 16, BinaryString.fromString("20221208")),
+                rowData(1, 100, 15, BinaryString.fromString("20221209")));
+
+        Assertions.assertThatThrownBy(() -> runAction(false))
+                .hasMessage("Only parition key can be specialized in 
compaction action.");
+    }
+
     private FileStoreTable prepareTable(
             List<String> partitionKeys,
             List<String> primaryKeys,
@@ -290,6 +320,14 @@ public class CompactActionITCase extends 
CompactActionITCaseBase {
     }
 
     private void runAction(boolean isStreaming) throws Exception {
+        runAction(isStreaming, false);
+    }
+
+    private void runActionForUnawareTable(boolean isStreaming) throws 
Exception {
+        runAction(isStreaming, true);
+    }
+
+    private void runAction(boolean isStreaming, boolean unawareBucket) throws 
Exception {
         StreamExecutionEnvironment env;
         if (isStreaming) {
             env = streamExecutionEnvironmentBuilder().streamingMode().build();
@@ -297,20 +335,39 @@ public class CompactActionITCase extends 
CompactActionITCaseBase {
             env = streamExecutionEnvironmentBuilder().batchMode().build();
         }
 
-        CompactAction action =
-                createAction(
-                        CompactAction.class,
+        ArrayList<String> baseArgs =
+                Lists.newArrayList(
                         "compact",
                         "--warehouse",
                         warehouse,
                         "--database",
                         database,
                         "--table",
-                        tableName,
-                        "--partition",
-                        "dt=20221208,hh=15",
-                        "--partition",
-                        "dt=20221209,hh=15");
+                        tableName);
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        if (unawareBucket) {
+            if (true) {
+                baseArgs.addAll(Lists.newArrayList("--where", "k=1"));
+            } else {
+                baseArgs.addAll(Lists.newArrayList("--partition", "k=1"));
+            }
+        } else {
+            if (random.nextBoolean()) {
+                baseArgs.addAll(
+                        Lists.newArrayList(
+                                "--where", "(dt=20221208 and hh=15) or 
(dt=20221209 and hh=15)"));
+            } else {
+                baseArgs.addAll(
+                        Lists.newArrayList(
+                                "--partition",
+                                "dt=20221208,hh=15",
+                                "--partition",
+                                "dt=20221209,hh=15"));
+            }
+        }
+
+        CompactAction action = createAction(CompactAction.class, 
baseArgs.toArray(new String[0]));
+
         action.withStreamExecutionEnvironment(env).build();
         if (isStreaming) {
             env.executeAsync();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertorTest.java
new file mode 100644
index 000000000..55695da03
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertorTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.predicate;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+/** test for {@link SimpleSqlPredicateConvertor} . */
+class SimpleSqlPredicateConvertorTest {
+    RowType rowType;
+    PredicateBuilder predicateBuilder;
+
+    SimpleSqlPredicateConvertor simpleSqlPredicateConvertor;
+
+    @BeforeEach
+    public void init() throws Exception {
+        rowType =
+                RowType.builder()
+                        .field("a", DataTypes.INT())
+                        .field("b", DataTypes.STRING())
+                        .field("c", DataTypes.DATE())
+                        .field("hour", DataTypes.STRING())
+                        .build();
+        predicateBuilder = new PredicateBuilder(rowType);
+        simpleSqlPredicateConvertor = new SimpleSqlPredicateConvertor(rowType);
+    }
+
+    @Test
+    public void testEqual() throws Exception {
+        {
+            //
+            // 
org.apache.calcite.sql.parser.ImmutableSqlParser$Config.withUnquotedCasing(org.apache.calcite.avatica.util.Casing)
+            Predicate predicate = 
simpleSqlPredicateConvertor.convertSqlToPredicate("`hour` ='1'");
+            Assertions.assertThat(predicate)
+                    .isEqualTo(
+                            predicateBuilder.equal(
+                                    predicateBuilder.indexOf("hour"),
+                                    BinaryString.fromString("1")));
+        }
+
+        {
+            Predicate predicate =
+                    simpleSqlPredicateConvertor.convertSqlToPredicate(" 
'2024-05-25' = c");
+            Assertions.assertThat(predicate)
+                    
.isEqualTo(predicateBuilder.equal(predicateBuilder.indexOf("c"), 19868));
+        }
+    }
+
+    @Test
+    public void testNotEqual() throws Exception {
+        {
+            Predicate predicate = 
simpleSqlPredicateConvertor.convertSqlToPredicate("a <>'1'");
+            Assertions.assertThat(predicate)
+                    
.isEqualTo(predicateBuilder.notEqual(predicateBuilder.indexOf("a"), 1));
+        }
+
+        {
+            Predicate predicate =
+                    
simpleSqlPredicateConvertor.convertSqlToPredicate("'2024-05-25' <> c");
+            Assertions.assertThat(predicate)
+                    
.isEqualTo(predicateBuilder.notEqual(predicateBuilder.indexOf("c"), 19868));
+        }
+    }
+
+    @Test
+    public void testLessThan() throws Exception {
+        {
+            Predicate predicate = 
simpleSqlPredicateConvertor.convertSqlToPredicate("a <'1'");
+            Assertions.assertThat(predicate)
+                    
.isEqualTo(predicateBuilder.lessThan(predicateBuilder.indexOf("a"), 1));
+        }
+
+        {
+            Predicate predicate =
+                    
simpleSqlPredicateConvertor.convertSqlToPredicate("'2024-05-25' <c ");
+            Assertions.assertThat(predicate)
+                    
.isEqualTo(predicateBuilder.greaterThan(predicateBuilder.indexOf("c"), 19868));
+        }
+    }
+
+    @Test
+    public void testLessThanOrEqual() throws Exception {
+        {
+            Predicate predicate = 
simpleSqlPredicateConvertor.convertSqlToPredicate("a <='1'");
+            Assertions.assertThat(predicate)
+                    
.isEqualTo(predicateBuilder.lessOrEqual(predicateBuilder.indexOf("a"), 1));
+        }
+
+        {
+            Predicate predicate =
+                    
simpleSqlPredicateConvertor.convertSqlToPredicate("'2024-05-25' <= c");
+            Assertions.assertThat(predicate)
+                    .isEqualTo(
+                            
predicateBuilder.greaterOrEqual(predicateBuilder.indexOf("c"), 19868));
+        }
+    }
+
+    @Test
+    public void testGreatThan() throws Exception {
+        {
+            Predicate predicate = 
simpleSqlPredicateConvertor.convertSqlToPredicate("a >'1'");
+            Assertions.assertThat(predicate)
+                    
.isEqualTo(predicateBuilder.greaterThan(predicateBuilder.indexOf("a"), 1));
+        }
+
+        {
+            Predicate predicate =
+                    
simpleSqlPredicateConvertor.convertSqlToPredicate("'2024-05-25' > c");
+            Assertions.assertThat(predicate)
+                    
.isEqualTo(predicateBuilder.lessThan(predicateBuilder.indexOf("c"), 19868));
+        }
+    }
+
+    @Test
+    public void testGreatEqual() throws Exception {
+        {
+            Predicate predicate = 
simpleSqlPredicateConvertor.convertSqlToPredicate("a >='1'");
+            Assertions.assertThat(predicate)
+                    
.isEqualTo(predicateBuilder.greaterOrEqual(predicateBuilder.indexOf("a"), 1));
+        }
+
+        {
+            Predicate predicate =
+                    simpleSqlPredicateConvertor.convertSqlToPredicate(" 
'2024-05-25' >= c");
+            Assertions.assertThat(predicate)
+                    
.isEqualTo(predicateBuilder.lessOrEqual(predicateBuilder.indexOf("c"), 19868));
+        }
+    }
+
+    @Test
+    public void testIN() throws Exception {
+        Predicate predicate = 
simpleSqlPredicateConvertor.convertSqlToPredicate("a in ('1','2')");
+        List<Object> elements = Lists.newArrayList(1, 2);
+        Assertions.assertThat(predicate)
+                .isEqualTo(predicateBuilder.in(predicateBuilder.indexOf("a"), 
elements));
+    }
+
+    @Test
+    public void testIsNull() throws Exception {
+        Predicate predicate = 
simpleSqlPredicateConvertor.convertSqlToPredicate("a is null ");
+        Assertions.assertThat(predicate)
+                
.isEqualTo(predicateBuilder.isNull(predicateBuilder.indexOf("a")));
+    }
+
+    @Test
+    public void testIsNotNull() throws Exception {
+        Predicate predicate = 
simpleSqlPredicateConvertor.convertSqlToPredicate("a is not  null ");
+        Assertions.assertThat(predicate)
+                
.isEqualTo(predicateBuilder.isNotNull(predicateBuilder.indexOf("a")));
+    }
+
+    @Test
+    public void testAnd() throws Exception {
+        Predicate actual =
+                simpleSqlPredicateConvertor.convertSqlToPredicate("a is not 
null and c is null");
+        Predicate expected =
+                PredicateBuilder.and(
+                        
predicateBuilder.isNotNull(predicateBuilder.indexOf("a")),
+                        
predicateBuilder.isNull(predicateBuilder.indexOf("c")));
+        Assertions.assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    public void testOr() throws Exception {
+        Predicate actual =
+                simpleSqlPredicateConvertor.convertSqlToPredicate("a is not  
null or c is null ");
+        Predicate expected =
+                PredicateBuilder.or(
+                        
predicateBuilder.isNotNull(predicateBuilder.indexOf("a")),
+                        
predicateBuilder.isNull(predicateBuilder.indexOf("c")));
+        Assertions.assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    public void testNOT() throws Exception {
+        Predicate actual = 
simpleSqlPredicateConvertor.convertSqlToPredicate("not (a is null) ");
+        Predicate expected = 
predicateBuilder.isNull(predicateBuilder.indexOf("a")).negate().get();
+        Assertions.assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    public void testFieldNoFound() {
+        Assertions.assertThatThrownBy(
+                        () -> 
simpleSqlPredicateConvertor.convertSqlToPredicate("f =1"))
+                .hasMessage("Field `f` not found");
+    }
+
+    @Test
+    public void testSqlNoSupport() {
+        // function not supported
+        Assertions.assertThatThrownBy(
+                        () ->
+                                
simpleSqlPredicateConvertor.convertSqlToPredicate(
+                                        "substring(f,0,1) =1"))
+                .hasMessage("SUBSTRING(`f` FROM 0 FOR 1) or 1 not been 
supported.");
+        // like not supported
+        Assertions.assertThatThrownBy(
+                        () -> 
simpleSqlPredicateConvertor.convertSqlToPredicate("b like 'x'"))
+                .hasMessage("LIKE not been supported.");
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
index a961d3b83..1227a8e1e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
@@ -67,8 +67,14 @@ public class CompactProcedureITCase extends 
CatalogITCaseBase {
         checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
 
         tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
-        sql(
-                "CALL sys.compact(`table` => 'default.T', partitions => 
'dt=20221208,hh=15;dt=20221209,hh=15')");
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        if (random.nextBoolean()) {
+            sql(
+                    "CALL sys.compact(`table` => 'default.T', partitions => 
'dt=20221208,hh=15;dt=20221209,hh=15')");
+        } else {
+            sql(
+                    "CALL sys.compact(`table` => 'default.T', `where` => 
'(dt=20221208 and hh=15) or (dt=20221209 and hh=15)')");
+        }
 
         checkLatestSnapshot(table, 3, Snapshot.CommitKind.COMPACT);
 
@@ -113,11 +119,19 @@ public class CompactProcedureITCase extends 
CatalogITCaseBase {
         TableScan.Plan plan = scan.plan();
         assertThat(plan.splits()).isEmpty();
 
+        ThreadLocalRandom random = ThreadLocalRandom.current();
         // submit streaming compaction job
-        streamSqlIter(
-                        "CALL sys.compact(`table` => 'default.T', partitions 
=> 'dt=20221208,hh=15;dt=20221209,hh=15', "
-                                + "options => 'scan.parallelism=1')")
-                .close();
+        if (random.nextBoolean()) {
+            streamSqlIter(
+                            "CALL sys.compact(`table` => 'default.T', 
partitions => 'dt=20221208,hh=15;dt=20221209,hh=15', "
+                                    + "options => 'scan.parallelism=1')")
+                    .close();
+        } else {
+            streamSqlIter(
+                            "CALL sys.compact(`table` => 'default.T', `where` 
=> '(dt=20221208 and hh=15) or (dt=20221209 and hh=15)', "
+                                    + "options => 'scan.parallelism=1')")
+                    .close();
+        }
 
         // first full compaction
         assertThat(select.collect(2))
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index f81c253cc..a5f260fb2 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -31,6 +31,7 @@ import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -68,6 +69,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 
+import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -119,11 +121,16 @@ public class CompactorSinkITCase extends AbstractTestBase 
{
         StreamExecutionEnvironment env = 
streamExecutionEnvironmentBuilder().batchMode().build();
         CompactorSourceBuilder sourceBuilder =
                 new CompactorSourceBuilder(tablePath.toString(), table);
+        Predicate predicate =
+                createPartitionPredicate(
+                        getSpecifiedPartitions(),
+                        table.rowType(),
+                        table.coreOptions().partitionDefaultName());
         DataStreamSource<RowData> source =
                 sourceBuilder
                         .withEnv(env)
                         .withContinuousMode(false)
-                        .withPartitions(getSpecifiedPartitions())
+                        .withPartitionPredicate(predicate)
                         .build();
         new CompactorSinkBuilder(table).withInput(source).build();
         env.execute();
@@ -154,11 +161,16 @@ public class CompactorSinkITCase extends AbstractTestBase 
{
                 streamExecutionEnvironmentBuilder().streamingMode().build();
         CompactorSourceBuilder sourceBuilder =
                 new CompactorSourceBuilder(tablePath.toString(), table);
+        Predicate predicate =
+                createPartitionPredicate(
+                        getSpecifiedPartitions(),
+                        table.rowType(),
+                        table.coreOptions().partitionDefaultName());
         DataStreamSource<RowData> source =
                 sourceBuilder
                         .withEnv(env)
                         .withContinuousMode(false)
-                        .withPartitions(getSpecifiedPartitions())
+                        .withPartitionPredicate(predicate)
                         .build();
         Integer sinkParalellism = new Random().nextInt(100) + 1;
         new CompactorSinkBuilder(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
index 3164dd412..f7db0dfcf 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
@@ -27,6 +27,7 @@ import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMetaSerializer;
+import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -58,6 +59,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -261,11 +263,16 @@ public class CompactorSourceITCase extends 
AbstractTestBase {
 
         StreamExecutionEnvironment env =
                 streamExecutionEnvironmentBuilder().streamingMode().build();
+        Predicate partitionPredicate =
+                createPartitionPredicate(
+                        specifiedPartitions,
+                        table.rowType(),
+                        table.coreOptions().partitionDefaultName());
         DataStreamSource<RowData> compactorSource =
                 new CompactorSourceBuilder("test", table)
                         .withContinuousMode(isStreaming)
                         .withEnv(env)
-                        .withPartitions(specifiedPartitions)
+                        .withPartitionPredicate(partitionPredicate)
                         .build();
         CloseableIterator<RowData> it = compactorSource.executeAndCollect();
 

Reply via email to