This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c878a7252 [flink] support where sql for compaction (#3403)
c878a7252 is described below
commit c878a7252bf0f47dcd73b26fa3c78caf97dc3c6e
Author: wgcn <[email protected]>
AuthorDate: Mon Jun 24 16:12:31 2024 +0800
[flink] support where sql for compaction (#3403)
---
docs/content/flink/procedures.md | 6 +-
.../paimon/flink/procedure/CompactProcedure.java | 28 +-
.../ProcedurePositionalArgumentsITCase.java | 5 +
.../apache/paimon/flink/action/CompactAction.java | 71 +++-
.../paimon/flink/action/CompactActionFactory.java | 4 +
.../paimon/flink/action/SortCompactAction.java | 22 +-
.../UnawareBucketCompactionTopoBuilder.java | 24 +-
.../predicate/SimpleSqlPredicateConvertor.java | 154 +++++++++
.../paimon/flink/procedure/CompactProcedure.java | 10 +-
.../flink/source/CompactorSourceBuilder.java | 38 +--
.../apache/paimon/flink/utils/CalciteModule3.java | 367 +++++++++++++++++++++
.../paimon/flink/action/CompactActionITCase.java | 77 ++++-
.../predicate/SimpleSqlPredicateConvertorTest.java | 228 +++++++++++++
.../flink/procedure/CompactProcedureITCase.java | 26 +-
.../paimon/flink/sink/CompactorSinkITCase.java | 16 +-
.../paimon/flink/source/CompactorSourceITCase.java | 9 +-
16 files changed, 983 insertions(+), 102 deletions(-)
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index e87703d09..f6d50cf37 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -67,9 +67,13 @@ All available procedures are listed below.
<li>order_strategy(optional): 'order' or 'zorder' or 'hilbert' or
'none'.</li>
<li>order_by(optional): the columns need to be sort. Left empty if
'order_strategy' is 'none'.</li>
<li>options(optional): additional dynamic options of the
table.</li>
+ <li>where(optional): partition predicate (can't be used together
with "partitions"). Note: as where is a keyword, it needs to be wrapped in a
pair of backticks, like `where`.</li>
</td>
<td>
- CALL sys.compact(`table` => 'default.T', partitions => 'p=0',
order_strategy => 'zorder', order_by => 'a,b', options => 'sink.parallelism=4')
+ -- use partition filter <br/>
+ CALL sys.compact(`table` => 'default.T', partitions => 'p=0',
order_strategy => 'zorder', order_by => 'a,b', options => 'sink.parallelism=4')
<br/>
+ -- use partition predicate <br/>
+ CALL sys.compact(`table` => 'default.T', `where` => 'dt>10 and h<20',
order_strategy => 'zorder', order_by => 'a,b', options => 'sink.parallelism=4')
</td>
</tr>
<tr>
diff --git
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
index 098fa17e9..2fcbc98ba 100644
---
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
+++
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
@@ -56,7 +56,7 @@ public class CompactProcedure extends ProcedureBase {
public String[] call(ProcedureContext procedureContext, String tableId,
String partitions)
throws Exception {
- return call(procedureContext, tableId, partitions, "", "", "");
+ return call(procedureContext, tableId, partitions, "", "", "", "");
}
public String[] call(
@@ -66,7 +66,7 @@ public class CompactProcedure extends ProcedureBase {
String orderStrategy,
String orderByColumns)
throws Exception {
- return call(procedureContext, tableId, partitions, orderStrategy,
orderByColumns, "");
+ return call(procedureContext, tableId, partitions, orderStrategy,
orderByColumns, "", "");
}
public String[] call(
@@ -77,6 +77,26 @@ public class CompactProcedure extends ProcedureBase {
String orderByColumns,
String tableOptions)
throws Exception {
+ return call(
+ procedureContext,
+ tableId,
+ partitions,
+ orderStrategy,
+ orderByColumns,
+ tableOptions,
+ "");
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String tableId,
+ String partitions,
+ String orderStrategy,
+ String orderByColumns,
+ String tableOptions,
+ String whereSql)
+ throws Exception {
+
String warehouse = catalog.warehouse();
Map<String, String> catalogOptions = catalog.options();
Map<String, String> tableConf =
@@ -115,6 +135,10 @@ public class CompactProcedure extends ProcedureBase {
action.withPartitions(ParameterUtils.getPartitions(partitions.split(";")));
}
+ if (!StringUtils.isBlank(whereSql)) {
+ action.withWhereSql(whereSql);
+ }
+
return execute(procedureContext, action, jobName);
}
diff --git
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
index 443b60003..f8e987486 100644
---
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
+++
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
@@ -53,6 +53,11 @@ public class ProcedurePositionalArgumentsITCase extends
CatalogITCaseBase {
.doesNotThrowAnyException();
assertThatCode(() -> sql("CALL sys.compact('default.T', '', '', '',
'sink.parallelism=1')"))
.doesNotThrowAnyException();
+ assertThatCode(
+ () ->
+ sql(
+ "CALL sys.compact('default.T', '', '',
'', 'sink.parallelism=1','pt=1')"))
+ .doesNotThrowAnyException();
}
@Test
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 32a3a28c6..59ca818c3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -20,9 +20,14 @@ package org.apache.paimon.flink.action;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
+import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
import org.apache.paimon.flink.sink.CompactorSinkBuilder;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
+import org.apache.paimon.predicate.PartitionPredicateVisitor;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Preconditions;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
@@ -30,17 +35,25 @@ import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
+
/** Table compact action for Flink. */
public class CompactAction extends TableActionBase {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CompactAction.class);
+
private List<Map<String, String>> partitions;
+ private String whereSql;
+
public CompactAction(String warehouse, String database, String tableName) {
this(warehouse, database, tableName, Collections.emptyMap(),
Collections.emptyMap());
}
@@ -72,8 +85,13 @@ public class CompactAction extends TableActionBase {
return this;
}
+ public CompactAction withWhereSql(String whereSql) {
+ this.whereSql = whereSql;
+ return this;
+ }
+
@Override
- public void build() {
+ public void build() throws Exception {
ReadableConfig conf = env.getConfiguration();
boolean isStreaming =
conf.get(ExecutionOptions.RUNTIME_MODE) ==
RuntimeExecutionMode.STREAMING;
@@ -94,34 +112,69 @@ public class CompactAction extends TableActionBase {
}
private void buildForTraditionalCompaction(
- StreamExecutionEnvironment env, FileStoreTable table, boolean
isStreaming) {
+ StreamExecutionEnvironment env, FileStoreTable table, boolean
isStreaming)
+ throws Exception {
CompactorSourceBuilder sourceBuilder =
new CompactorSourceBuilder(identifier.getFullName(), table);
CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table);
- sourceBuilder.withPartitions(partitions);
+ sourceBuilder.withPartitionPredicate(getPredicate());
DataStreamSource<RowData> source =
sourceBuilder.withEnv(env).withContinuousMode(isStreaming).build();
sinkBuilder.withInput(source).build();
}
private void buildForUnawareBucketCompaction(
- StreamExecutionEnvironment env, FileStoreTable table, boolean
isStreaming) {
+ StreamExecutionEnvironment env, FileStoreTable table, boolean
isStreaming)
+ throws Exception {
UnawareBucketCompactionTopoBuilder unawareBucketCompactionTopoBuilder =
new UnawareBucketCompactionTopoBuilder(env,
identifier.getFullName(), table);
- unawareBucketCompactionTopoBuilder.withPartitions(partitions);
+
unawareBucketCompactionTopoBuilder.withPartitionPredicate(getPredicate());
unawareBucketCompactionTopoBuilder.withContinuousMode(isStreaming);
unawareBucketCompactionTopoBuilder.build();
}
+ protected Predicate getPredicate() throws Exception {
+ Preconditions.checkArgument(
+ partitions == null || whereSql == null,
+ "partitions and where cannot be used together.");
+ Predicate predicate = null;
+ if (partitions != null) {
+ predicate =
+ PredicateBuilder.or(
+ partitions.stream()
+ .map(
+ p ->
+ createPartitionPredicate(
+ p,
+ table.rowType(),
+ ((FileStoreTable)
table)
+
.coreOptions()
+
.partitionDefaultName()))
+ .toArray(Predicate[]::new));
+ } else if (whereSql != null) {
+ SimpleSqlPredicateConvertor simpleSqlPredicateConvertor =
+ new SimpleSqlPredicateConvertor(table.rowType());
+ predicate =
simpleSqlPredicateConvertor.convertSqlToPredicate(whereSql);
+ }
+
+ // Check whether the predicate contains any non-partition key.
+ if (predicate != null) {
+ LOGGER.info("the partition predicate of compaction is {}",
predicate);
+ PartitionPredicateVisitor partitionPredicateVisitor =
+ new PartitionPredicateVisitor(table.partitionKeys());
+ Preconditions.checkArgument(
+ predicate.visit(partitionPredicateVisitor),
+ "Only parition key can be specialized in compaction
action.");
+ }
+
+ return predicate;
+ }
+
@Override
public void run() throws Exception {
build();
execute("Compact job");
}
-
- public List<Map<String, String>> getPartitions() {
- return partitions;
- }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
index 22cb6cafa..c65de5c7c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
@@ -32,6 +32,8 @@ public class CompactActionFactory implements ActionFactory {
private static final String ORDER_STRATEGY = "order_strategy";
private static final String ORDER_BY = "order_by";
+ private static final String WHERE = "where";
+
@Override
public String identifier() {
return IDENTIFIER;
@@ -67,6 +69,8 @@ public class CompactActionFactory implements ActionFactory {
if (params.has(PARTITION)) {
List<Map<String, String>> partitions = getPartitions(params);
action.withPartitions(partitions);
+ } else if (params.has(WHERE)) {
+ action.withWhereSql(params.get(WHERE));
}
return Optional.of(action);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
index 1cebe8bc1..2b12aa7a0 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
@@ -25,8 +25,6 @@ import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.sorter.TableSorter.OrderType;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
@@ -44,8 +42,6 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
-
/** Compact with sort action. */
public class SortCompactAction extends CompactAction {
@@ -72,7 +68,7 @@ public class SortCompactAction extends CompactAction {
}
@Override
- public void build() {
+ public void build() throws Exception {
// only support batch sort yet
if (env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
!= RuntimeExecutionMode.BATCH) {
@@ -96,21 +92,7 @@ public class SortCompactAction extends CompactAction {
identifier.getObjectName())
.asSummaryString());
- if (getPartitions() != null) {
- Predicate partitionPredicate =
- PredicateBuilder.or(
- getPartitions().stream()
- .map(
- p ->
- createPartitionPredicate(
- p,
- table.rowType(),
- ((FileStoreTable)
table)
-
.coreOptions()
-
.partitionDefaultName()))
- .toArray(Predicate[]::new));
- sourceBuilder.predicate(partitionPredicate);
- }
+ sourceBuilder.predicate(getPredicate());
String scanParallelism =
tableConfig.get(FlinkConnectorOptions.SCAN_PARALLELISM.key());
if (scanParallelism != null) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
index edbca53b2..d10c5c11c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
@@ -24,6 +24,7 @@ import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.UnawareBucketCompactionSink;
import org.apache.paimon.flink.source.BucketUnawareCompactSource;
import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -34,11 +35,6 @@ import
org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import javax.annotation.Nullable;
-import java.util.List;
-import java.util.Map;
-
-import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
-
/**
* Build for unaware-bucket table flink compaction job.
*
@@ -53,9 +49,11 @@ public class UnawareBucketCompactionTopoBuilder {
private final transient StreamExecutionEnvironment env;
private final String tableIdentifier;
private final FileStoreTable table;
- @Nullable private List<Map<String, String>> specifiedPartitions = null;
+
private boolean isContinuous = false;
+ @Nullable private Predicate partitionPredicate;
+
public UnawareBucketCompactionTopoBuilder(
StreamExecutionEnvironment env, String tableIdentifier,
FileStoreTable table) {
this.env = env;
@@ -67,8 +65,8 @@ public class UnawareBucketCompactionTopoBuilder {
this.isContinuous = isContinuous;
}
- public void withPartitions(List<Map<String, String>> partitions) {
- this.specifiedPartitions = partitions;
+ public void withPartitionPredicate(Predicate predicate) {
+ this.partitionPredicate = predicate;
}
public void build() {
@@ -93,15 +91,7 @@ public class UnawareBucketCompactionTopoBuilder {
long scanInterval =
table.coreOptions().continuousDiscoveryInterval().toMillis();
BucketUnawareCompactSource source =
new BucketUnawareCompactSource(
- table,
- isContinuous,
- scanInterval,
- specifiedPartitions != null
- ? createPartitionPredicate(
- specifiedPartitions,
- table.rowType(),
-
table.coreOptions().partitionDefaultName())
- : null);
+ table, isContinuous, scanInterval, partitionPredicate);
return BucketUnawareCompactSource.buildSource(env, source,
isContinuous, tableIdentifier);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertor.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertor.java
new file mode 100644
index 000000000..02c98f1cd
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertor.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.predicate;
+
+import org.apache.paimon.flink.utils.CalciteModule3;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.TypeUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.BiFunction;
+
+/** Converts a SQL where expression to a {@link Predicate}. */
+public class SimpleSqlPredicateConvertor {
+ private final PredicateBuilder builder;
+ private final RowType rowType;
+
+ private CalciteModule3 calciteModule3;
+
+ public SimpleSqlPredicateConvertor(RowType type) throws Exception {
+ this.rowType = type;
+ this.builder = new PredicateBuilder(type);
+ calciteModule3 = new CalciteModule3();
+ }
+
+ public Predicate convertSqlToPredicate(String whereSql) throws Exception {
+ Object config =
+ calciteModule3
+ .configDelegate()
+ .withLex(
+ calciteModule3.sqlParserDelegate().config(),
+ calciteModule3.lexDelegate().java());
+ Object sqlParser = calciteModule3.sqlParserDelegate().create(whereSql,
config);
+ Object sqlBasicCall =
calciteModule3.sqlParserDelegate().parseExpression(sqlParser);
+ return convert(sqlBasicCall);
+ }
+
+ public Predicate convert(Object sqlBasicCall) throws Exception {
+ Object operator =
calciteModule3.sqlBasicCallDelegate().getOperator(sqlBasicCall);
+ Object kind = calciteModule3.sqlOperatorDelegate().getKind(operator);
+
+ if
(calciteModule3.sqlOperatorDelegate().instanceOfSqlBinaryOperator(operator)) {
+ List<?> operandList =
+
calciteModule3.sqlBasicCallDelegate().getOperandList(sqlBasicCall);
+ Object left = operandList.get(0);
+ Object right = operandList.get(1);
+ if (kind == calciteModule3.sqlKindDelegate().or()) {
+ return PredicateBuilder.or(convert(left), convert(right));
+ } else if (kind == calciteModule3.sqlKindDelegate().and()) {
+ return PredicateBuilder.and(convert(left), convert(right));
+ } else if (kind == calciteModule3.sqlKindDelegate().equals()) {
+ return visitBiFunction(left, right, builder::equal,
builder::equal);
+ } else if (kind == calciteModule3.sqlKindDelegate().notEquals()) {
+ return visitBiFunction(left, right, builder::notEqual,
builder::notEqual);
+ } else if (kind == calciteModule3.sqlKindDelegate().lessThan()) {
+ return visitBiFunction(left, right, builder::lessThan,
builder::greaterThan);
+ } else if (kind ==
calciteModule3.sqlKindDelegate().lessThanOrEqual()) {
+ return visitBiFunction(left, right, builder::lessOrEqual,
builder::greaterOrEqual);
+ } else if (kind == calciteModule3.sqlKindDelegate().greaterThan())
{
+ return visitBiFunction(left, right, builder::greaterThan,
builder::lessThan);
+ } else if (kind ==
calciteModule3.sqlKindDelegate().greaterThanOrEqual()) {
+ return visitBiFunction(left, right, builder::greaterOrEqual,
builder::lessOrEqual);
+ } else if (kind == calciteModule3.sqlKindDelegate().in()) {
+ int index = getfieldIndex(left.toString());
+ List<?> elementslist =
calciteModule3.sqlNodeListDelegate().getList(right);
+
+ List<Object> list = new ArrayList<>();
+ for (Object sqlNode : elementslist) {
+ Object literal =
+ TypeUtils.castFromString(
+
calciteModule3.sqlLiteralDelegate().toValue(sqlNode),
+ rowType.getFieldTypes().get(index));
+ list.add(literal);
+ }
+ return builder.in(index, list);
+ }
+ } else if
(calciteModule3.sqlOperatorDelegate().instanceOfSqlPostfixOperator(operator)) {
+ Object child =
+
calciteModule3.sqlBasicCallDelegate().getOperandList(sqlBasicCall).get(0);
+ if (kind == calciteModule3.sqlKindDelegate().isNull()) {
+ String field = String.valueOf(child);
+ return builder.isNull(getfieldIndex(field));
+ } else if (kind == calciteModule3.sqlKindDelegate().isNotNull()) {
+ String field = String.valueOf(child);
+ return builder.isNotNull(getfieldIndex(field));
+ }
+ } else if
(calciteModule3.sqlOperatorDelegate().instanceOfSqlPrefixOperator(operator)) {
+ if (kind == calciteModule3.sqlKindDelegate().not()) {
+ return convert(
+ calciteModule3
+ .sqlBasicCallDelegate()
+ .getOperandList(sqlBasicCall)
+ .get(0))
+ .negate()
+ .get();
+ }
+ }
+
+ throw new UnsupportedOperationException(String.format("%s not been
supported.", kind));
+ }
+
+ public Predicate visitBiFunction(
+ Object left,
+ Object right,
+ BiFunction<Integer, Object, Predicate> visitLeft,
+ BiFunction<Integer, Object, Predicate> visitRight)
+ throws Exception {
+ if
(calciteModule3.sqlIndentifierDelegate().instanceOfSqlIdentifier(left)
+ &&
calciteModule3.sqlLiteralDelegate().instanceOfSqlLiteral(right)) {
+ int index = getfieldIndex(String.valueOf(left));
+ String value = calciteModule3.sqlLiteralDelegate().toValue(right);
+ DataType type = rowType.getFieldTypes().get(index);
+ return visitLeft.apply(index, TypeUtils.castFromString(value,
type));
+ } else if
(calciteModule3.sqlIndentifierDelegate().instanceOfSqlIdentifier(right)
+ &&
calciteModule3.sqlLiteralDelegate().instanceOfSqlLiteral(left)) {
+ int index = getfieldIndex(right.toString());
+ return visitRight.apply(
+ index,
+ TypeUtils.castFromString(
+ calciteModule3.sqlLiteralDelegate().toValue(left),
+ rowType.getFieldTypes().get(index)));
+ }
+
+ throw new UnsupportedOperationException(
+ String.format("%s or %s not been supported.", left, right));
+ }
+
+ public int getfieldIndex(String field) {
+ int index = builder.indexOf(field);
+ if (index == -1) {
+ throw new RuntimeException(String.format("Field `%s` not found",
field));
+ }
+ return index;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
index 01421c8bf..bfe7337f6 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
@@ -51,7 +51,8 @@ public class CompactProcedure extends ProcedureBase {
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(name = "order_by", type =
@DataTypeHint("STRING"), isOptional = true),
- @ArgumentHint(name = "options", type =
@DataTypeHint("STRING"), isOptional = true)
+ @ArgumentHint(name = "options", type =
@DataTypeHint("STRING"), isOptional = true),
+ @ArgumentHint(name = "where", type = @DataTypeHint("STRING"),
isOptional = true)
})
public String[] call(
ProcedureContext procedureContext,
@@ -59,7 +60,8 @@ public class CompactProcedure extends ProcedureBase {
String partitions,
String orderStrategy,
String orderByColumns,
- String tableOptions)
+ String tableOptions,
+ String where)
throws Exception {
String warehouse = catalog.warehouse();
Map<String, String> catalogOptions = catalog.options();
@@ -99,6 +101,10 @@ public class CompactProcedure extends ProcedureBase {
action.withPartitions(getPartitions(partitions.split(";")));
}
+ if (!isBlank(where)) {
+ action.withWhereSql(where);
+ }
+
return execute(procedureContext, action, jobName);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index 275f88ad7..ee531a2e2 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -23,7 +23,6 @@ import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.system.BucketsTable;
@@ -38,13 +37,9 @@ import
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import javax.annotation.Nullable;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
-
/**
* Source builder to build a Flink {@link StaticFileStoreSource} or {@link
* ContinuousFileStoreSource}. This is for dedicated compactor jobs.
@@ -56,7 +51,7 @@ public class CompactorSourceBuilder {
private boolean isContinuous = false;
private StreamExecutionEnvironment env;
- @Nullable private List<Map<String, String>> specifiedPartitions = null;
+ @Nullable private Predicate partitionPredicate = null;
public CompactorSourceBuilder(String tableIdentifier, FileStoreTable
table) {
this.tableIdentifier = tableIdentifier;
@@ -73,33 +68,7 @@ public class CompactorSourceBuilder {
return this;
}
- public CompactorSourceBuilder withPartition(Map<String, String> partition)
{
- return withPartitions(Collections.singletonList(partition));
- }
-
- public CompactorSourceBuilder withPartitions(List<Map<String, String>>
partitions) {
- this.specifiedPartitions = partitions;
- return this;
- }
-
private Source<RowData, ?, ?> buildSource(BucketsTable bucketsTable) {
- Predicate partitionPredicate = null;
- if (specifiedPartitions != null) {
- // This predicate is based on the row type of the original table,
not bucket table.
- // Because TableScan in BucketsTable is the same with
FileStoreTable,
- // and partition filter is done by scan.
- partitionPredicate =
- PredicateBuilder.or(
- specifiedPartitions.stream()
- .map(
- p ->
- createPartitionPredicate(
- p,
- table.rowType(),
- table.coreOptions()
-
.partitionDefaultName()))
- .toArray(Predicate[]::new));
- }
if (isContinuous) {
bucketsTable = bucketsTable.copy(streamingCompactOptions());
@@ -164,4 +133,9 @@ public class CompactorSourceBuilder {
}
};
}
+
+ public CompactorSourceBuilder withPartitionPredicate(@Nullable Predicate
partitionPredicate) {
+ this.partitionPredicate = partitionPredicate;
+ return this;
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/CalciteModule3.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/CalciteModule3.java
new file mode 100644
index 000000000..70be74269
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/CalciteModule3.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Reflection-based access to the Calcite SQL parser classes.
+ *
+ * <p>The Calcite classes are resolved through the thread context class loader when it can load
+ * {@code org.apache.calcite.sql.parser.SqlParser}; otherwise through the {@code
+ * submoduleClassLoader} field of Flink's {@code PlannerModule} singleton. Every Calcite object is
+ * handled as a plain {@link Object}, so this class has no compile-time dependency on Calcite.
+ */
+public class CalciteModule3 {
+    private static final Logger LOGGER = LoggerFactory.getLogger(CalciteModule3.class);
+    private static final String FLINK_PLANNER_MODULE_CLASS =
+            "org.apache.flink.table.planner.loader.PlannerModule";
+    private static final String PLANNER_MODULE_METHOD = "getInstance";
+
+    private static final String SUBMODULE_CLASS_LOADER = "submoduleClassLoader";
+
+    /** Shared empty arrays for reflective calls on zero-argument methods. */
+    private static final Class<?>[] NO_PARAM_TYPES = new Class<?>[0];
+
+    private static final Object[] NO_ARGS = new Object[0];
+
+    /** Class loader every Calcite class in this file is loaded from. */
+    private static final ClassLoader submoduleClassLoader;
+
+    private final SqlNodeListDelegate sqlNodeListDelegate;
+    private final SqlLiteralDelegate sqlLiteralDelegate;
+    private final SqlBasicCallDelegate sqlBasicCallDelegate;
+    private final SqlOperatorDelegate sqlOperatorDelegate;
+    private final SqlKindDelegate sqlKindDelegate;
+    private final SqlParserDelegate sqlParserDelegate;
+    private final LexDelegate lexDelegate;
+    private final ConfigDelegate configDelegate;
+    private final SqlIndentifierDelegate sqlIndentifierDelegate;
+
+    static {
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        ClassLoader resolved;
+        try {
+            resolved =
+                    canLoadCalcite(contextClassLoader)
+                            ? contextClassLoader
+                            : initCalciteClassLoader();
+        } catch (Exception e) {
+            LOGGER.error("Load Calcite class Fail: {}", e.getMessage(), e);
+            throw new RuntimeException(e);
+        }
+        submoduleClassLoader = resolved;
+    }
+
+    /**
+     * Creates one delegate per wrapped Calcite class.
+     *
+     * @throws ClassNotFoundException if any of the wrapped Calcite classes cannot be loaded
+     */
+    public CalciteModule3() throws ClassNotFoundException {
+        sqlNodeListDelegate = new SqlNodeListDelegate();
+        sqlLiteralDelegate = new SqlLiteralDelegate();
+        sqlBasicCallDelegate = new SqlBasicCallDelegate();
+        sqlOperatorDelegate = new SqlOperatorDelegate();
+        sqlKindDelegate = new SqlKindDelegate();
+        sqlParserDelegate = new SqlParserDelegate();
+        lexDelegate = new LexDelegate();
+        configDelegate = new ConfigDelegate();
+        sqlIndentifierDelegate = new SqlIndentifierDelegate();
+    }
+
+    /**
+     * Tells whether the given loader can load Calcite's {@code SqlParser}.
+     *
+     * @param classLoader the loader to probe; {@code null} is treated as "cannot load"
+     */
+    private static boolean canLoadCalcite(ClassLoader classLoader) {
+        if (classLoader == null) {
+            return false;
+        }
+        try {
+            classLoader.loadClass(SqlParserDelegate.CLASS_NAME);
+            return true;
+        } catch (ClassNotFoundException ignored) {
+            // Calcite is not visible to this loader; the caller falls back to the loader held by
+            // Flink's PlannerModule.
+            return false;
+        }
+    }
+
+    /** Reads the private {@code submoduleClassLoader} field of Flink's PlannerModule singleton. */
+    private static ClassLoader initCalciteClassLoader() throws Exception {
+        Class<?> plannerModuleClass = Class.forName(FLINK_PLANNER_MODULE_CLASS);
+        Method getInstanceMethod = plannerModuleClass.getDeclaredMethod(PLANNER_MODULE_METHOD);
+        getInstanceMethod.setAccessible(true);
+        Object plannerModuleInstance = getInstanceMethod.invoke(null);
+
+        Field submoduleClassLoaderField =
+                plannerModuleClass.getDeclaredField(SUBMODULE_CLASS_LOADER);
+        submoduleClassLoaderField.setAccessible(true);
+        return (ClassLoader) submoduleClassLoaderField.get(plannerModuleInstance);
+    }
+
+    /**
+     * Looks up a public method on {@code clazz} and invokes it.
+     *
+     * @param object the receiver, or {@code null} for a static method
+     */
+    private static Object invokeMethod(
+            Class<?> clazz, Object object, String methodName, Class<?>[] argsClass, Object[] args)
+            throws Exception {
+        Method method = clazz.getMethod(methodName, argsClass);
+        // The receiver's runtime class may be non-public even though the method itself is public.
+        method.setAccessible(true);
+        return method.invoke(object, args);
+    }
+
+    /** Invokes a zero-argument public method; see {@link #invokeMethod}. */
+    private static Object invokeNoArgMethod(Class<?> clazz, Object object, String methodName)
+            throws Exception {
+        return invokeMethod(clazz, object, methodName, NO_PARAM_TYPES, NO_ARGS);
+    }
+
+    /** Loads and initializes a class through the resolved Calcite class loader. */
+    private static Class<?> loadCalciteClass(String className) throws ClassNotFoundException {
+        return Class.forName(className, true, submoduleClassLoader);
+    }
+
+    /** Accessing org.apache.calcite.sql.parser.SqlParser by Reflection. */
+    public static class SqlParserDelegate {
+        private static final String CLASS_NAME = "org.apache.calcite.sql.parser.SqlParser";
+        private final Class<?> clazz;
+
+        public SqlParserDelegate() throws ClassNotFoundException {
+            clazz = loadCalciteClass(CLASS_NAME);
+        }
+
+        /** Calls the static {@code SqlParser.config()} and returns the resulting config object. */
+        public Object config() throws Exception {
+            return invokeNoArgMethod(clazz, null, "config");
+        }
+
+        /**
+         * Calls the static {@code SqlParser.create(String, SqlParser.Config)}.
+         *
+         * @param whereSql the SQL text handed to the parser
+         * @param config a {@code SqlParser.Config} instance
+         * @return the created parser
+         */
+        public Object create(String whereSql, Object config) throws Exception {
+            return invokeMethod(
+                    clazz,
+                    null,
+                    "create",
+                    new Class<?>[] {String.class, loadCalciteClass(ConfigDelegate.CLASS_NAME)},
+                    new Object[] {whereSql, config});
+        }
+
+        /** Calls {@code parseExpression()} on the given parser and returns the parsed node. */
+        public Object parseExpression(Object sqlParser) throws Exception {
+            return invokeNoArgMethod(clazz, sqlParser, "parseExpression");
+        }
+    }
+
+    /** Accessing org.apache.calcite.config.Lex by Reflection. */
+    public static class LexDelegate {
+        private static final String CLASS_NAME = "org.apache.calcite.config.Lex";
+        private final Class<?> clazz;
+
+        public LexDelegate() throws ClassNotFoundException {
+            this.clazz = loadCalciteClass(CLASS_NAME);
+        }
+
+        /** Returns the value of the static field {@code Lex.JAVA}. */
+        public Object java() throws NoSuchFieldException, IllegalAccessException {
+            return clazz.getField("JAVA").get(null);
+        }
+    }
+
+    /** Accessing org.apache.calcite.sql.SqlKind by Reflection. */
+    public static class SqlKindDelegate {
+        private static final String CLASS_NAME = "org.apache.calcite.sql.SqlKind";
+        private final Class<?> clazz;
+
+        public SqlKindDelegate() throws ClassNotFoundException {
+            clazz = loadCalciteClass(CLASS_NAME);
+        }
+
+        /** Reads the public static field {@code name} of {@code SqlKind}. */
+        private Object constant(String name) throws NoSuchFieldException, IllegalAccessException {
+            return clazz.getField(name).get(null);
+        }
+
+        public Object or() throws NoSuchFieldException, IllegalAccessException {
+            return constant("OR");
+        }
+
+        public Object and() throws NoSuchFieldException, IllegalAccessException {
+            return constant("AND");
+        }
+
+        /** Returns {@code SqlKind.EQUALS}; this is an overload, not {@link Object#equals}. */
+        public Object equals() throws NoSuchFieldException, IllegalAccessException {
+            return constant("EQUALS");
+        }
+
+        public Object notEquals() throws NoSuchFieldException, IllegalAccessException {
+            return constant("NOT_EQUALS");
+        }
+
+        public Object lessThan() throws NoSuchFieldException, IllegalAccessException {
+            return constant("LESS_THAN");
+        }
+
+        public Object lessThanOrEqual() throws NoSuchFieldException, IllegalAccessException {
+            return constant("LESS_THAN_OR_EQUAL");
+        }
+
+        public Object greaterThan() throws NoSuchFieldException, IllegalAccessException {
+            return constant("GREATER_THAN");
+        }
+
+        public Object greaterThanOrEqual() throws NoSuchFieldException, IllegalAccessException {
+            return constant("GREATER_THAN_OR_EQUAL");
+        }
+
+        public Object in() throws NoSuchFieldException, IllegalAccessException {
+            return constant("IN");
+        }
+
+        public Object isNull() throws NoSuchFieldException, IllegalAccessException {
+            return constant("IS_NULL");
+        }
+
+        public Object isNotNull() throws NoSuchFieldException, IllegalAccessException {
+            return constant("IS_NOT_NULL");
+        }
+
+        public Object not() throws NoSuchFieldException, IllegalAccessException {
+            return constant("NOT");
+        }
+    }
+
+    /** Accessing org.apache.calcite.sql.parser.SqlParser$Config by Reflection. */
+    public static class ConfigDelegate {
+        static final String CLASS_NAME = "org.apache.calcite.sql.parser.SqlParser$Config";
+        private final Class<?> clazz;
+
+        public ConfigDelegate() throws ClassNotFoundException {
+            this.clazz = loadCalciteClass(CLASS_NAME);
+        }
+
+        public Class<?> getClazz() {
+            return clazz;
+        }
+
+        /**
+         * Calls {@code withLex(Lex)} on the given config, looked up on the config's runtime class.
+         *
+         * @param config a {@code SqlParser.Config} instance
+         * @param lex a {@code Lex} constant
+         * @return whatever {@code withLex} returns
+         */
+        public Object withLex(Object config, Object lex) throws Exception {
+            return invokeMethod(
+                    config.getClass(),
+                    config,
+                    "withLex",
+                    new Class<?>[] {loadCalciteClass(LexDelegate.CLASS_NAME)},
+                    new Object[] {lex});
+        }
+    }
+
+    /** Accessing org.apache.calcite.sql.SqlBasicCall by Reflection. */
+    public static class SqlBasicCallDelegate {
+        static final String CLASS_NAME = "org.apache.calcite.sql.SqlBasicCall";
+        private final Class<?> clazz;
+
+        public SqlBasicCallDelegate() throws ClassNotFoundException {
+            this.clazz = loadCalciteClass(CLASS_NAME);
+        }
+
+        /** Calls {@code getOperator()} on the given call node. */
+        public Object getOperator(Object basicCall) throws Exception {
+            return invokeNoArgMethod(clazz, basicCall, "getOperator");
+        }
+
+        /** Calls {@code getOperandList()} on the given call node. */
+        public List<?> getOperandList(Object basicCall) throws Exception {
+            return (List<?>) invokeNoArgMethod(clazz, basicCall, "getOperandList");
+        }
+    }
+
+    /** Accessing org.apache.calcite.sql.SqlOperator by Reflection. */
+    public static class SqlOperatorDelegate {
+        static final String SQL_OPERATOR = "org.apache.calcite.sql.SqlOperator";
+        private final Class<?> sqlOperatorClazz;
+
+        static final String SQL_BINARY_OPERATOR = "org.apache.calcite.sql.SqlBinaryOperator";
+        private final Class<?> sqlBinaryOperatorClazz;
+
+        static final String SQL_SQL_POSTFIX_OPERATOR = "org.apache.calcite.sql.SqlPostfixOperator";
+        private final Class<?> sqlPostfixOperatorClazz;
+
+        static final String SQL_PREFIX_OPERATOR = "org.apache.calcite.sql.SqlPrefixOperator";
+        private final Class<?> sqlPrefixOperatorClazz;
+
+        public SqlOperatorDelegate() throws ClassNotFoundException {
+            this.sqlOperatorClazz = loadCalciteClass(SQL_OPERATOR);
+            this.sqlBinaryOperatorClazz = loadCalciteClass(SQL_BINARY_OPERATOR);
+            this.sqlPostfixOperatorClazz = loadCalciteClass(SQL_SQL_POSTFIX_OPERATOR);
+            this.sqlPrefixOperatorClazz = loadCalciteClass(SQL_PREFIX_OPERATOR);
+        }
+
+        /** Calls {@code getKind()} on the given operator. */
+        public Object getKind(Object operator) throws Exception {
+            return invokeNoArgMethod(sqlOperatorClazz, operator, "getKind");
+        }
+
+        /** Returns whether {@code operator} is a {@code SqlBinaryOperator}; false for null. */
+        public boolean instanceOfSqlBinaryOperator(Object operator) throws Exception {
+            return sqlBinaryOperatorClazz.isInstance(operator);
+        }
+
+        /** Returns whether {@code operator} is a {@code SqlPostfixOperator}; false for null. */
+        public boolean instanceOfSqlPostfixOperator(Object operator) throws Exception {
+            return sqlPostfixOperatorClazz.isInstance(operator);
+        }
+
+        /** Returns whether {@code operator} is a {@code SqlPrefixOperator}; false for null. */
+        public boolean instanceOfSqlPrefixOperator(Object operator) throws Exception {
+            return sqlPrefixOperatorClazz.isInstance(operator);
+        }
+    }
+
+    /** Accessing org.apache.calcite.sql.SqlIdentifier by Reflection. */
+    public static class SqlIndentifierDelegate {
+        private static final String SQL_IDENTIFIER = "org.apache.calcite.sql.SqlIdentifier";
+        private final Class<?> identifierClazz;
+
+        public SqlIndentifierDelegate() throws ClassNotFoundException {
+            this.identifierClazz = loadCalciteClass(SQL_IDENTIFIER);
+        }
+
+        /** Returns whether {@code sqlNode} is a {@code SqlIdentifier}; false for null. */
+        public boolean instanceOfSqlIdentifier(Object sqlNode) throws Exception {
+            return identifierClazz.isInstance(sqlNode);
+        }
+    }
+
+    /** Accessing org.apache.calcite.sql.SqlNodeList by Reflection. */
+    public static class SqlNodeListDelegate {
+        private static final String SQL_NODE_LIST = "org.apache.calcite.sql.SqlNodeList";
+        private final Class<?> clazz;
+
+        public SqlNodeListDelegate() throws ClassNotFoundException {
+            this.clazz = loadCalciteClass(SQL_NODE_LIST);
+        }
+
+        /** Calls {@code getList()} on the given node list. */
+        public List<?> getList(Object sqlNodeList) throws Exception {
+            return (List<?>) invokeNoArgMethod(clazz, sqlNodeList, "getList");
+        }
+    }
+
+    /** Accessing org.apache.calcite.sql.SqlLiteral by Reflection. */
+    public static class SqlLiteralDelegate {
+        private static final String CLASS_NAME = "org.apache.calcite.sql.SqlLiteral";
+        private final Class<?> clazz;
+
+        public SqlLiteralDelegate() throws ClassNotFoundException {
+            this.clazz = loadCalciteClass(CLASS_NAME);
+        }
+
+        /** Returns whether {@code sqlNode} is a {@code SqlLiteral}; false for null. */
+        public boolean instanceOfSqlLiteral(Object sqlNode) throws Exception {
+            return clazz.isInstance(sqlNode);
+        }
+
+        /** Calls {@code toValue()} on the given literal and casts the result to a string. */
+        public String toValue(Object sqlNode) throws Exception {
+            return (String) invokeNoArgMethod(clazz, sqlNode, "toValue");
+        }
+    }
+
+    public SqlNodeListDelegate sqlNodeListDelegate() {
+        return sqlNodeListDelegate;
+    }
+
+    public SqlLiteralDelegate sqlLiteralDelegate() {
+        return sqlLiteralDelegate;
+    }
+
+    public SqlBasicCallDelegate sqlBasicCallDelegate() {
+        return sqlBasicCallDelegate;
+    }
+
+    public SqlOperatorDelegate sqlOperatorDelegate() {
+        return sqlOperatorDelegate;
+    }
+
+    public SqlKindDelegate sqlKindDelegate() {
+        return sqlKindDelegate;
+    }
+
+    public SqlParserDelegate sqlParserDelegate() {
+        return sqlParserDelegate;
+    }
+
+    public LexDelegate lexDelegate() {
+        return lexDelegate;
+    }
+
+    public ConfigDelegate configDelegate() {
+        return configDelegate;
+    }
+
+    public SqlIndentifierDelegate sqlIndentifierDelegate() {
+        return sqlIndentifierDelegate;
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index 0ca7baff9..9094239c9 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -33,16 +33,21 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommonTestUtils;
import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
import static org.assertj.core.api.Assertions.assertThat;
@@ -192,7 +197,8 @@ public class CompactActionITCase extends
CompactActionITCaseBase {
checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
- runAction(true);
+ // fix: the unit test previously did not specify the real partition of the table
+ runActionForUnawareTable(true);
// first compaction, snapshot will be 3
checkFileAndRowSize(table, 3L, 30_000L, 1, 6);
@@ -233,7 +239,8 @@ public class CompactActionITCase extends
CompactActionITCaseBase {
checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
- runAction(false);
+ // fix: the unit test previously did not specify the real partition of the table
+ runActionForUnawareTable(false);
// first compaction, snapshot will be 3.
checkFileAndRowSize(table, 3L, 0L, 1, 6);
@@ -264,6 +271,29 @@ public class CompactActionITCase extends
CompactActionITCaseBase {
.isEqualTo("6");
}
+    /**
+     * Compaction must be rejected when the filter passed to the action references columns that are
+     * not partition keys of the table (here the table is partitioned by {@code v} only).
+     */
+    @Test
+    public void testSpecifyNonPartitionField() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.BUCKET.key(), "-1");
+        options.put(CoreOptions.WRITE_ONLY.key(), "true");
+
+        // "v" is the only partition key, so the filter built by runAction targets non-partition
+        // fields.
+        List<String> partitionKeys = Collections.singletonList("v");
+        List<String> primaryKeys = Arrays.asList();
+        prepareTable(partitionKeys, primaryKeys, Collections.emptyList(), options);
+
+        // seed the table with a few rows
+        BinaryString firstDay = BinaryString.fromString("20221208");
+        BinaryString firstDayAgain = BinaryString.fromString("20221208");
+        BinaryString secondDay = BinaryString.fromString("20221209");
+        writeData(
+                rowData(1, 100, 15, firstDay),
+                rowData(1, 100, 16, firstDayAgain),
+                rowData(1, 100, 15, secondDay));
+
+        Assertions.assertThatThrownBy(() -> runAction(false))
+                .hasMessage("Only parition key can be specialized in compaction action.");
+    }
+
private FileStoreTable prepareTable(
List<String> partitionKeys,
List<String> primaryKeys,
@@ -290,6 +320,14 @@ public class CompactActionITCase extends
CompactActionITCaseBase {
}
+ /** Runs the compact action with the dt/hh filter, i.e. with {@code unawareBucket = false}. */
private void runAction(boolean isStreaming) throws Exception {
+ runAction(isStreaming, false);
+ }
+
+ /** Runs the compact action with the {@code k=1} filter, i.e. with {@code unawareBucket = true}. */
+ private void runActionForUnawareTable(boolean isStreaming) throws
Exception {
+ runAction(isStreaming, true);
+ }
+
+ private void runAction(boolean isStreaming, boolean unawareBucket) throws
Exception {
StreamExecutionEnvironment env;
if (isStreaming) {
env = streamExecutionEnvironmentBuilder().streamingMode().build();
@@ -297,20 +335,39 @@ public class CompactActionITCase extends
CompactActionITCaseBase {
env = streamExecutionEnvironmentBuilder().batchMode().build();
}
- CompactAction action =
- createAction(
- CompactAction.class,
+ ArrayList<String> baseArgs =
+ Lists.newArrayList(
"compact",
"--warehouse",
warehouse,
"--database",
database,
"--table",
- tableName,
- "--partition",
- "dt=20221208,hh=15",
- "--partition",
- "dt=20221209,hh=15");
+ tableName);
+        // Pick at random between the new "--where" filter and the pre-existing "--partition"
+        // filter so that both argument styles are exercised across runs.
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        if (unawareBucket) {
+            // NOTE(review): presumably "k" is the partition column of the unaware-bucket tables
+            // used by the callers - confirm against their prepareTable calls.
+            if (random.nextBoolean()) {
+                baseArgs.addAll(Lists.newArrayList("--where", "k=1"));
+            } else {
+                baseArgs.addAll(Lists.newArrayList("--partition", "k=1"));
+            }
+        } else {
+            if (random.nextBoolean()) {
+                baseArgs.addAll(
+                        Lists.newArrayList(
+                                "--where", "(dt=20221208 and hh=15) or (dt=20221209 and hh=15)"));
+            } else {
+                baseArgs.addAll(
+                        Lists.newArrayList(
+                                "--partition",
+                                "dt=20221208,hh=15",
+                                "--partition",
+                                "dt=20221209,hh=15"));
+            }
+        }
+
+ CompactAction action = createAction(CompactAction.class,
baseArgs.toArray(new String[0]));
+
action.withStreamExecutionEnvironment(env).build();
if (isStreaming) {
env.executeAsync();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertorTest.java
new file mode 100644
index 000000000..55695da03
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertorTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.predicate;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+/** Tests for {@link SimpleSqlPredicateConvertor}. */
+class SimpleSqlPredicateConvertorTest {
+    /** Epoch-day value of 2024-05-25, the DATE literal used throughout these tests. */
+    private static final int MAY_25_2024 = 19868;
+
+    RowType rowType;
+    PredicateBuilder predicateBuilder;
+
+    SimpleSqlPredicateConvertor simpleSqlPredicateConvertor;
+
+    @BeforeEach
+    public void init() throws Exception {
+        rowType =
+                RowType.builder()
+                        .field("a", DataTypes.INT())
+                        .field("b", DataTypes.STRING())
+                        .field("c", DataTypes.DATE())
+                        .field("hour", DataTypes.STRING())
+                        .build();
+        predicateBuilder = new PredicateBuilder(rowType);
+        simpleSqlPredicateConvertor = new SimpleSqlPredicateConvertor(rowType);
+    }
+
+    /** Converts a SQL filter expression with the convertor under test. */
+    private Predicate convert(String sql) throws Exception {
+        return simpleSqlPredicateConvertor.convertSqlToPredicate(sql);
+    }
+
+    /** Position of the named field in the test row type. */
+    private int indexOf(String fieldName) {
+        return predicateBuilder.indexOf(fieldName);
+    }
+
+    @Test
+    public void testEqual() throws Exception {
+        // NOTE(review): the original comment here only referenced
+        // org.apache.calcite.sql.parser.ImmutableSqlParser$Config.withUnquotedCasing(
+        // org.apache.calcite.avatica.util.Casing); presumably it explains why `hour` is
+        // back-quoted - confirm.
+        Predicate quotedColumn = convert("`hour` ='1'");
+        Assertions.assertThat(quotedColumn)
+                .isEqualTo(predicateBuilder.equal(indexOf("hour"), BinaryString.fromString("1")));
+
+        // literal on the left-hand side
+        Predicate literalFirst = convert(" '2024-05-25' = c");
+        Assertions.assertThat(literalFirst)
+                .isEqualTo(predicateBuilder.equal(indexOf("c"), MAY_25_2024));
+    }
+
+    @Test
+    public void testNotEqual() throws Exception {
+        Predicate columnFirst = convert("a <>'1'");
+        Assertions.assertThat(columnFirst).isEqualTo(predicateBuilder.notEqual(indexOf("a"), 1));
+
+        Predicate literalFirst = convert("'2024-05-25' <> c");
+        Assertions.assertThat(literalFirst)
+                .isEqualTo(predicateBuilder.notEqual(indexOf("c"), MAY_25_2024));
+    }
+
+    @Test
+    public void testLessThan() throws Exception {
+        Predicate columnFirst = convert("a <'1'");
+        Assertions.assertThat(columnFirst).isEqualTo(predicateBuilder.lessThan(indexOf("a"), 1));
+
+        // "literal < column" is the same as "column > literal"
+        Predicate literalFirst = convert("'2024-05-25' <c ");
+        Assertions.assertThat(literalFirst)
+                .isEqualTo(predicateBuilder.greaterThan(indexOf("c"), MAY_25_2024));
+    }
+
+    @Test
+    public void testLessThanOrEqual() throws Exception {
+        Predicate columnFirst = convert("a <='1'");
+        Assertions.assertThat(columnFirst)
+                .isEqualTo(predicateBuilder.lessOrEqual(indexOf("a"), 1));
+
+        // "literal <= column" is the same as "column >= literal"
+        Predicate literalFirst = convert("'2024-05-25' <= c");
+        Assertions.assertThat(literalFirst)
+                .isEqualTo(predicateBuilder.greaterOrEqual(indexOf("c"), MAY_25_2024));
+    }
+
+    @Test
+    public void testGreatThan() throws Exception {
+        Predicate columnFirst = convert("a >'1'");
+        Assertions.assertThat(columnFirst)
+                .isEqualTo(predicateBuilder.greaterThan(indexOf("a"), 1));
+
+        // "literal > column" is the same as "column < literal"
+        Predicate literalFirst = convert("'2024-05-25' > c");
+        Assertions.assertThat(literalFirst)
+                .isEqualTo(predicateBuilder.lessThan(indexOf("c"), MAY_25_2024));
+    }
+
+    @Test
+    public void testGreatEqual() throws Exception {
+        Predicate columnFirst = convert("a >='1'");
+        Assertions.assertThat(columnFirst)
+                .isEqualTo(predicateBuilder.greaterOrEqual(indexOf("a"), 1));
+
+        // "literal >= column" is the same as "column <= literal"
+        Predicate literalFirst = convert(" '2024-05-25' >= c");
+        Assertions.assertThat(literalFirst)
+                .isEqualTo(predicateBuilder.lessOrEqual(indexOf("c"), MAY_25_2024));
+    }
+
+    @Test
+    public void testIN() throws Exception {
+        Predicate actual = convert("a in ('1','2')");
+        List<Object> expectedValues = Lists.newArrayList(1, 2);
+        Assertions.assertThat(actual).isEqualTo(predicateBuilder.in(indexOf("a"), expectedValues));
+    }
+
+    @Test
+    public void testIsNull() throws Exception {
+        Predicate actual = convert("a is null ");
+        Assertions.assertThat(actual).isEqualTo(predicateBuilder.isNull(indexOf("a")));
+    }
+
+    @Test
+    public void testIsNotNull() throws Exception {
+        Predicate actual = convert("a is not null ");
+        Assertions.assertThat(actual).isEqualTo(predicateBuilder.isNotNull(indexOf("a")));
+    }
+
+    @Test
+    public void testAnd() throws Exception {
+        Predicate actual = convert("a is not null and c is null");
+        Predicate left = predicateBuilder.isNotNull(indexOf("a"));
+        Predicate right = predicateBuilder.isNull(indexOf("c"));
+        Assertions.assertThat(actual).isEqualTo(PredicateBuilder.and(left, right));
+    }
+
+    @Test
+    public void testOr() throws Exception {
+        Predicate actual = convert("a is not null or c is null ");
+        Predicate left = predicateBuilder.isNotNull(indexOf("a"));
+        Predicate right = predicateBuilder.isNull(indexOf("c"));
+        Assertions.assertThat(actual).isEqualTo(PredicateBuilder.or(left, right));
+    }
+
+    @Test
+    public void testNOT() throws Exception {
+        Predicate actual = convert("not (a is null) ");
+        Predicate expected = predicateBuilder.isNull(indexOf("a")).negate().get();
+        Assertions.assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    public void testFieldNoFound() {
+        Assertions.assertThatThrownBy(() -> convert("f =1")).hasMessage("Field `f` not found");
+    }
+
+    @Test
+    public void testSqlNoSupport() {
+        // function calls are rejected
+        Assertions.assertThatThrownBy(() -> convert("substring(f,0,1) =1"))
+                .hasMessage("SUBSTRING(`f` FROM 0 FOR 1) or 1 not been supported.");
+        // LIKE is rejected
+        Assertions.assertThatThrownBy(() -> convert("b like 'x'"))
+                .hasMessage("LIKE not been supported.");
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
index a961d3b83..1227a8e1e 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
@@ -67,8 +67,14 @@ public class CompactProcedureITCase extends
CatalogITCaseBase {
checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
- sql(
- "CALL sys.compact(`table` => 'default.T', partitions =>
'dt=20221208,hh=15;dt=20221209,hh=15')");
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ if (random.nextBoolean()) {
+ sql(
+ "CALL sys.compact(`table` => 'default.T', partitions =>
'dt=20221208,hh=15;dt=20221209,hh=15')");
+ } else {
+ sql(
+ "CALL sys.compact(`table` => 'default.T', `where` =>
'(dt=20221208 and hh=15) or (dt=20221209 and hh=15)')");
+ }
checkLatestSnapshot(table, 3, Snapshot.CommitKind.COMPACT);
@@ -113,11 +119,19 @@ public class CompactProcedureITCase extends
CatalogITCaseBase {
TableScan.Plan plan = scan.plan();
assertThat(plan.splits()).isEmpty();
+ ThreadLocalRandom random = ThreadLocalRandom.current();
// submit streaming compaction job
- streamSqlIter(
- "CALL sys.compact(`table` => 'default.T', partitions
=> 'dt=20221208,hh=15;dt=20221209,hh=15', "
- + "options => 'scan.parallelism=1')")
- .close();
+ if (random.nextBoolean()) {
+ streamSqlIter(
+ "CALL sys.compact(`table` => 'default.T',
partitions => 'dt=20221208,hh=15;dt=20221209,hh=15', "
+ + "options => 'scan.parallelism=1')")
+ .close();
+ } else {
+ streamSqlIter(
+ "CALL sys.compact(`table` => 'default.T', `where`
=> '(dt=20221208 and hh=15) or (dt=20221209 and hh=15)', "
+ + "options => 'scan.parallelism=1')")
+ .close();
+ }
// first full compaction
assertThat(select.collect(2))
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index f81c253cc..a5f260fb2 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -31,6 +31,7 @@ import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -68,6 +69,7 @@ import java.util.Map;
import java.util.Random;
import java.util.UUID;
+import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
import static org.assertj.core.api.Assertions.assertThat;
@@ -119,11 +121,16 @@ public class CompactorSinkITCase extends AbstractTestBase
{
StreamExecutionEnvironment env =
streamExecutionEnvironmentBuilder().batchMode().build();
CompactorSourceBuilder sourceBuilder =
new CompactorSourceBuilder(tablePath.toString(), table);
+ Predicate predicate =
+ createPartitionPredicate(
+ getSpecifiedPartitions(),
+ table.rowType(),
+ table.coreOptions().partitionDefaultName());
DataStreamSource<RowData> source =
sourceBuilder
.withEnv(env)
.withContinuousMode(false)
- .withPartitions(getSpecifiedPartitions())
+ .withPartitionPredicate(predicate)
.build();
new CompactorSinkBuilder(table).withInput(source).build();
env.execute();
@@ -154,11 +161,16 @@ public class CompactorSinkITCase extends AbstractTestBase
{
streamExecutionEnvironmentBuilder().streamingMode().build();
CompactorSourceBuilder sourceBuilder =
new CompactorSourceBuilder(tablePath.toString(), table);
+ Predicate predicate =
+ createPartitionPredicate(
+ getSpecifiedPartitions(),
+ table.rowType(),
+ table.coreOptions().partitionDefaultName());
DataStreamSource<RowData> source =
sourceBuilder
.withEnv(env)
.withContinuousMode(false)
- .withPartitions(getSpecifiedPartitions())
+ .withPartitionPredicate(predicate)
.build();
Integer sinkParalellism = new Random().nextInt(100) + 1;
new CompactorSinkBuilder(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
index 3164dd412..f7db0dfcf 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
@@ -27,6 +27,7 @@ import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMetaSerializer;
+import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -58,6 +59,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
+import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
import static org.assertj.core.api.Assertions.assertThat;
@@ -261,11 +263,16 @@ public class CompactorSourceITCase extends
AbstractTestBase {
StreamExecutionEnvironment env =
streamExecutionEnvironmentBuilder().streamingMode().build();
+ Predicate partitionPredicate =
+ createPartitionPredicate(
+ specifiedPartitions,
+ table.rowType(),
+ table.coreOptions().partitionDefaultName());
DataStreamSource<RowData> compactorSource =
new CompactorSourceBuilder("test", table)
.withContinuousMode(isStreaming)
.withEnv(env)
- .withPartitions(specifiedPartitions)
+ .withPartitionPredicate(partitionPredicate)
.build();
CloseableIterator<RowData> it = compactorSource.executeAndCollect();