This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 0eb72780ee [Feat][Connector-v2][Iceberg]support filter conditions in iceberg source (#9095)
0eb72780ee is described below
commit 0eb72780ee9a14eea0fcda0262b161efa04861c9
Author: litiliu <[email protected]>
AuthorDate: Mon Apr 21 10:16:05 2025 +0800
[Feat][Connector-v2][Iceberg]support filter conditions in iceberg source (#9095)
Co-authored-by: litiliu <[email protected]>
---
docs/en/connector-v2/source/Iceberg.md | 40 +++---
.../seatunnel/iceberg/catalog/IcebergCatalog.java | 67 +++++++---
.../iceberg/config/IcebergSourceConfig.java | 6 +-
.../iceberg/config/IcebergSourceOptions.java | 3 +
.../iceberg/config/SourceTableConfig.java | 3 +-
.../source/enumerator/scan/IcebergScanContext.java | 21 +++-
.../seatunnel/iceberg/utils/ExpressionUtils.java | 46 +++++++
.../iceberg/utils/ExpressionUtilsTest.java | 140 +++++++++++++++++++++
.../e2e/connector/iceberg/IcebergSourceIT.java | 51 ++++++++
.../resources/iceberg/filter_iceberg_source.conf | 82 ++++++++++++
.../iceberg/filter_iceberg_source_tables.conf | 76 +++++++++++
11 files changed, 496 insertions(+), 39 deletions(-)
diff --git a/docs/en/connector-v2/source/Iceberg.md b/docs/en/connector-v2/source/Iceberg.md
index f19c6077db..c620c5e95a 100644
--- a/docs/en/connector-v2/source/Iceberg.md
+++ b/docs/en/connector-v2/source/Iceberg.md
@@ -73,25 +73,27 @@ libfb303-xxx.jar
## Source Options
-| Name                     | Type    | Required | Default              | Description [...]
-|--------------------------|---------|----------|----------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ [...]
-| catalog_name             | string  | yes      | -                    | User-specified catalog name. [...]
-| namespace                | string  | yes      | -                    | The iceberg database name in the backend catalog. [...]
-| table                    | string  | no       | -                    | The iceberg table name in the backend catalog. [...]
-| table_list               | string  | no       | -                    | The iceberg table list in the backend catalog. [...]
-| iceberg.catalog.config   | map     | yes      | -                    | Specify the properties for initializing the Iceberg catalog, which can be referenced in this file:https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/CatalogProperties.java [...]
-| hadoop.config            | map     | no       | -                    | Properties passed through to the Hadoop configuration [...]
-| iceberg.hadoop-conf-path | string  | no       | -                    | The specified loading paths for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files. [...]
-| schema                   | config  | no       | -                    | Use projection to select data columns and columns order. [...]
-| case_sensitive           | boolean | no       | false                | If data columns where selected via schema [config], controls whether the match to the schema will be done with case sensitivity. [...]
-| start_snapshot_timestamp | long    | no       | -                    | Instructs this scan to look for changes starting from the most recent snapshot for the table as of the timestamp. <br/>timestamp – the timestamp in millis since the Unix epoch [...]
-| start_snapshot_id        | long    | no       | -                    | Instructs this scan to look for changes starting from a particular snapshot (exclusive). [...]
-| end_snapshot_id          | long    | no       | -                    | Instructs this scan to look for changes up to a particular snapshot (inclusive). [...]
-| use_snapshot_id          | long    | no       | -                    | Instructs this scan to look for use the given snapshot ID. [...]
-| use_snapshot_timestamp   | long    | no       | -                    | Instructs this scan to look for use the most recent snapshot as of the given time in milliseconds. timestamp – the timestamp in millis since the Unix epoch [...]
+| Name                     | Type    | Required | Default              | Description [...]
+|--------------------------|---------|----------|----------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ [...]
+| catalog_name             | string  | yes      | -                    | User-specified catalog name. [...]
+| namespace                | string  | yes      | -                    | The iceberg database name in the backend catalog. [...]
+| table                    | string  | no       | -                    | The iceberg table name in the backend catalog. [...]
+| table_list               | string  | no       | -                    | The iceberg table list in the backend catalog. [...]
+| iceberg.catalog.config   | map     | yes      | -                    | Specify the properties for initializing the Iceberg catalog, which can be referenced in this file: https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/CatalogProperties.java [...]
+| hadoop.config            | map     | no       | -                    | Properties passed through to the Hadoop configuration [...]
+| iceberg.hadoop-conf-path | string  | no       | -                    | The specified loading paths for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files. [...]
+| schema                   | config  | no       | -                    | Use projection to select data columns and columns order. [...]
+| case_sensitive           | boolean | no       | false                | If data columns were selected via schema [config], controls whether the match to the schema will be done with case sensitivity. [...]
+| start_snapshot_timestamp | long    | no       | -                    | Instructs this scan to look for changes starting from the most recent snapshot for the table as of the timestamp. <br/>timestamp – the timestamp in millis since the Unix epoch [...]
+| start_snapshot_id        | long    | no       | -                    | Instructs this scan to look for changes starting from a particular snapshot (exclusive). [...]
+| end_snapshot_id          | long    | no       | -                    | Instructs this scan to look for changes up to a particular snapshot (inclusive). [...]
+| use_snapshot_id          | long    | no       | -                    | Instructs this scan to use the given snapshot ID. [...]
+| use_snapshot_timestamp   | long    | no       | -                    | Instructs this scan to use the most recent snapshot as of the given time in milliseconds. timestamp – the timestamp in millis since the Unix epoch [...]
| stream_scan_strategy     | enum    | no       | FROM_LATEST_SNAPSHOT | Starting strategy for stream mode execution, Default to use `FROM_LATEST_SNAPSHOT` if don’t specify any value,The optional values are:<br/>TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the incremental mode.<br/>FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive.<br/>FROM_EARLIEST_SNAPSHOT: Start incremental mode from the earliest snapshot inclusive.<br/>FROM_SNAPSHO [...]
-| increment.scan-interval  | long    | no       | 2000                 | The interval of increment scan(mills) [...]
-| common-options           |         | no       | -                    | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. [...]
+| increment.scan-interval  | long    | no       | 2000                 | The interval of increment scan (millis) [...]
+| common-options           |         | no       | -                    | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. [...]
+| query                    | String  | no       | -                    | The SELECT DML used to query the iceberg data. It must not contain the real table name, and aliases are not supported. For example: `select * from table where f1 > 100`, `select fn from table where f1 > 100`. Support for the LIKE syntax is currently limited: the pattern must not start with `%`. A supported form is: `select f1 from t where f2 like 'tom%'` [...]
+
## Task Example
@@ -112,6 +114,7 @@ source {
}
namespace = "database1"
table = "source"
+ query = "select fn from table where f1 > 100"
plugin_output = "iceberg"
}
}
@@ -143,6 +146,7 @@ source {
},
{
        table = "table_2"
+ query = "select fn from table where f1 > 100"
}
]
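The `query` option shown in the examples above both projects columns and filters rows. As a rough, hypothetical sketch of the projection half (the connector itself parses the statement with JSqlParser; `QuerySketch` and its naive string splitting are illustration only):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

// Toy model of the `query` option's column projection. The real connector
// uses JSqlParser; this sketch only splits the select list to show intent.
public class QuerySketch {
    public static List<String> selectColumns(String query) {
        String lower = query.toLowerCase(Locale.ROOT);
        int selectEnd = lower.indexOf("select") + "select".length();
        int fromStart = lower.indexOf(" from ");
        return Arrays.stream(query.substring(selectEnd, fromStart).split(","))
                .map(String::trim)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // A `*` projection keeps every column; named columns keep only those.
        System.out.println(selectColumns("select * from table where f1 > 100"));   // [*]
        System.out.println(selectColumns("select f1, f2 from table where f1 > 100")); // [f1, f2]
    }
}
```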
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
index aa29a65829..9c096088fa 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
@@ -34,9 +34,12 @@ import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergCatalogLoader;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCommonConfig;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceTableConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.utils.ExpressionUtils;
import org.apache.seatunnel.connectors.seatunnel.iceberg.utils.SchemaUtils;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
@@ -46,6 +49,7 @@ import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.types.Types;
import lombok.extern.slf4j.Slf4j;
@@ -57,6 +61,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
@@ -261,7 +266,7 @@ public class IcebergCatalog implements Catalog {
TableIdentifier icebergTableIdentifier =
toIcebergTableIdentifier(tablePath);
catalog.loadTable(icebergTableIdentifier)
.newDelete()
-                .deleteFromRowFilter(org.apache.iceberg.expressions.Expressions.alwaysTrue())
+ .deleteFromRowFilter(Expressions.alwaysTrue())
.commit();
log.info("Truncated table at path: {}", tablePath);
}
@@ -269,22 +274,34 @@ public class IcebergCatalog implements Catalog {
    public CatalogTable toCatalogTable(Table icebergTable, TablePath tablePath) {
Schema schema = icebergTable.schema();
List<Types.NestedField> columns = schema.columns();
+ List<String> selectColumns = getSelectColumns(tablePath);
TableSchema.Builder builder = TableSchema.builder();
- columns.forEach(
- nestedField -> {
- String name = nestedField.name();
- SeaTunnelDataType<?> seaTunnelType =
-                            SchemaUtils.toSeaTunnelType(name, nestedField.type());
- PhysicalColumn physicalColumn =
- PhysicalColumn.of(
- name,
- seaTunnelType,
- (Long) null,
- nestedField.isOptional(),
- null,
- nestedField.doc());
- builder.column(physicalColumn);
- });
+ columns.stream()
+ .filter(
+ col -> {
+ if (CollectionUtils.isNotEmpty(selectColumns)) {
+ if ("*".equals(selectColumns.get(0))) {
+ return true;
+ }
+ return selectColumns.contains(col.name());
+ }
+ return true;
+ })
+ .forEach(
+ nestedField -> {
+ String name = nestedField.name();
+ SeaTunnelDataType<?> seaTunnelType =
+                            SchemaUtils.toSeaTunnelType(name, nestedField.type());
+ PhysicalColumn physicalColumn =
+ PhysicalColumn.of(
+ name,
+ seaTunnelType,
+ (Long) null,
+ nestedField.isOptional(),
+ null,
+ nestedField.doc());
+ builder.column(physicalColumn);
+ });
Optional.ofNullable(schema.identifierFieldNames())
.filter(names -> !names.isEmpty())
.map(
@@ -312,6 +329,24 @@ public class IcebergCatalog implements Catalog {
catalogName);
}
+    private List<String> getSelectColumns(TablePath tablePath) {
+        if (Objects.nonNull(readonlyConfig.get(IcebergSourceOptions.KEY_TABLE))) {
+            return ExpressionUtils.parseSelectColumns(
+                    readonlyConfig.get(IcebergSourceOptions.QUERY));
+        } else {
+            List<SourceTableConfig> tableConfigs =
+                    readonlyConfig.get(IcebergSourceOptions.KEY_TABLE_LIST);
+            if (Objects.nonNull(tableConfigs)) {
+                for (SourceTableConfig config : tableConfigs) {
+                    if (config.getTable().equals(tablePath.getTableName())) {
+                        return ExpressionUtils.parseSelectColumns(config.getQuery());
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
@Override
public PreviewResult previewAction(
            ActionType actionType, TablePath tablePath, Optional<CatalogTable> catalogTable) {
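The column filter added to `toCatalogTable` above follows a simple retention rule, sketched here as a standalone helper (class and method names are hypothetical, not part of the connector):

```java
import java.util.List;

// Sketch of the column-retention rule from the toCatalogTable() change:
// a null/empty projection or a leading "*" keeps every column; otherwise
// only the named columns survive into the catalog table schema.
public class ProjectionRule {
    public static boolean keep(List<String> selectColumns, String columnName) {
        if (selectColumns == null || selectColumns.isEmpty()) {
            return true; // no query configured: keep all columns
        }
        if ("*".equals(selectColumns.get(0))) {
            return true; // SELECT * keeps all columns
        }
        return selectColumns.contains(columnName);
    }
}
```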
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/IcebergSourceConfig.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/IcebergSourceConfig.java
index 47495b082b..2ad3f528b9 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/IcebergSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/IcebergSourceConfig.java
@@ -41,7 +41,7 @@ public class IcebergSourceConfig extends IcebergCommonConfig {
this.incrementScanInterval =
readonlyConfig.get(IcebergSourceOptions.KEY_INCREMENT_SCAN_INTERVAL);
if (this.getTable() != null) {
- SourceTableConfig tableConfig =
+ SourceTableConfig.SourceTableConfigBuilder builder =
SourceTableConfig.builder()
.namespace(this.getNamespace())
.table(this.getTable())
@@ -60,7 +60,9 @@ public class IcebergSourceConfig extends IcebergCommonConfig {
.streamScanStrategy(
readonlyConfig.get(
IcebergSourceOptions.KEY_STREAM_SCAN_STRATEGY))
- .build();
+                            .query(readonlyConfig.get(IcebergSourceOptions.QUERY));
+
+ SourceTableConfig tableConfig = builder.build();
this.tableList = Collections.singletonList(tableConfig);
} else {
this.tableList =
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/IcebergSourceOptions.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/IcebergSourceOptions.java
index 755d1f6a3d..a5b501b2ba 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/IcebergSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/IcebergSourceOptions.java
@@ -76,4 +76,7 @@ public class IcebergSourceOptions extends IcebergCommonOptions {
.longType()
.defaultValue(2000L)
.withDescription(" the interval of increment scan(mills)");
+
+ public static final Option<String> QUERY =
+            Options.key("query").stringType().noDefaultValue().withDescription("the select sql");
}
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SourceTableConfig.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SourceTableConfig.java
index bf22649302..0e8d8e7b1a 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SourceTableConfig.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SourceTableConfig.java
@@ -24,7 +24,6 @@ import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan.
import org.apache.seatunnel.connectors.seatunnel.iceberg.utils.SchemaUtils;
import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.expressions.Expression;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -50,7 +49,7 @@ public class SourceTableConfig implements Serializable {
private Long useSnapshotTimestamp;
private IcebergStreamScanStrategy streamScanStrategy =
KEY_STREAM_SCAN_STRATEGY.defaultValue();
- private Expression filter;
+ private String query;
private Long splitSize;
private Integer splitLookback;
private Long splitOpenFileCost;
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanContext.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanContext.java
index 6c6bea685e..acda43579a 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanContext.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanContext.java
@@ -20,17 +20,23 @@ package org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceTableConfig;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.utils.ExpressionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import net.sf.jsqlparser.JSQLParserException;
@Getter
@Builder(toBuilder = true)
@ToString
+@Slf4j
public class IcebergScanContext {
private final TablePath tablePath;
@@ -73,13 +79,26 @@ public class IcebergScanContext {
.useSnapshotTimestamp(tableConfig.getUseSnapshotTimestamp())
.caseSensitive(sourceConfig.isCaseSensitive())
.schema(schema)
- .filter(tableConfig.getFilter())
+ .filter(getFilter(tableConfig.getQuery()))
.splitSize(tableConfig.getSplitSize())
.splitLookback(tableConfig.getSplitLookback())
.splitOpenFileCost(tableConfig.getSplitOpenFileCost())
.build();
}
+    private static Expression getFilter(String selectStr) {
+        if (StringUtils.isNotBlank(selectStr)) {
+            try {
+                Expression expression =
+                        ExpressionUtils.parseWhereClauseToIcebergExpression(selectStr);
+                return expression;
+            } catch (JSQLParserException e) {
+                log.error("Failed to parse where clause to iceberg expression", e);
+            }
+        }
+        return Expressions.alwaysTrue();
+    }
+
+
public static IcebergScanContext streamScanContext(
            IcebergSourceConfig sourceConfig, SourceTableConfig tableConfig, Schema schema) {
return scanContext(sourceConfig, tableConfig, schema)
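The `getFilter` helper above degrades gracefully: an absent or unparsable WHERE clause yields an always-true filter rather than a failed job. A minimal standalone sketch of that fallback pattern (`FilterFallback` is hypothetical; the `parse` function stands in for `ExpressionUtils.parseWhereClauseToIcebergExpression`):

```java
import java.util.function.Function;

// Parse-or-default fallback: mirror of getFilter() in IcebergScanContext.
// If parsing the query's WHERE clause fails, scan everything instead of failing.
public class FilterFallback {
    public static String filterFor(String query, Function<String, String> parse) {
        if (query == null || query.trim().isEmpty()) {
            return "alwaysTrue"; // no query configured: scan everything
        }
        try {
            return parse.apply(query);
        } catch (RuntimeException e) {
            return "alwaysTrue"; // unparsable WHERE clause: log and scan everything
        }
    }

    public static void main(String[] args) {
        System.out.println(filterFor("select * from t where f1 > 1", q -> "gt(f1, 1)"));
        System.out.println(filterFor(null, q -> "unused"));
    }
}
```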
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/ExpressionUtils.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/ExpressionUtils.java
index 7f9b6a1027..e862c06b76 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/ExpressionUtils.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/ExpressionUtils.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.iceberg.utils;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.types.Types;
@@ -26,6 +28,7 @@ import lombok.SneakyThrows;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.expression.DoubleValue;
import net.sf.jsqlparser.expression.LongValue;
+import net.sf.jsqlparser.expression.NotExpression;
import net.sf.jsqlparser.expression.Parenthesis;
import net.sf.jsqlparser.expression.StringValue;
import net.sf.jsqlparser.expression.operators.conditional.AndExpression;
@@ -37,6 +40,7 @@ import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
import net.sf.jsqlparser.expression.operators.relational.InExpression;
import net.sf.jsqlparser.expression.operators.relational.IsBooleanExpression;
import net.sf.jsqlparser.expression.operators.relational.IsNullExpression;
+import net.sf.jsqlparser.expression.operators.relational.LikeExpression;
import net.sf.jsqlparser.expression.operators.relational.MinorThan;
import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
@@ -44,6 +48,7 @@ import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.schema.Column;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.delete.Delete;
+import net.sf.jsqlparser.statement.select.PlainSelect;
import java.math.BigDecimal;
import java.time.LocalDate;
@@ -51,6 +56,7 @@ import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
+import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -66,6 +72,31 @@ public class ExpressionUtils {
.append(ISO_LOCAL_TIME)
.toFormatter();
+    public static List<String> parseSelectColumns(String selectQuery) {
+        if (StringUtils.isNotBlank(selectQuery)) {
+            try {
+                Statement statement = CCJSqlParserUtil.parse(selectQuery);
+                PlainSelect select = (PlainSelect) statement;
+                if (CollectionUtils.isNotEmpty(select.getSelectItems())) {
+                    return select.getSelectItems().stream()
+                            .map(selectItem -> selectItem.toString())
+                            .collect(Collectors.toList());
+                }
+            } catch (JSQLParserException e) {
+                throw new RuntimeException("Failed to parse select columns: " + e.getMessage());
+            }
+        }
+        return new ArrayList<>();
+    }
+
+    public static Expression parseWhereClauseToIcebergExpression(String selectQuery)
+            throws JSQLParserException {
+        // use the JsqlParser to parse the where clause
+        Statement statement = CCJSqlParserUtil.parse(selectQuery);
+        PlainSelect select = (PlainSelect) statement;
+        return convert(select.getWhere(), null);
+    }
+
+
    public static Expression convertDeleteSQL(String sql) throws JSQLParserException {
Statement statement = CCJSqlParserUtil.parse(sql);
Delete delete = (Delete) statement;
@@ -118,6 +149,10 @@ public class ExpressionUtils {
schema.findField(column.getColumnName()));
return Expressions.notEqual(column.getColumnName(), value);
}
+ if (condition instanceof NotExpression) {
+ NotExpression expr = (NotExpression) condition;
+ return Expressions.not(convert(expr.getExpression(), null));
+ }
if (condition instanceof GreaterThan) {
GreaterThan greaterThan = (GreaterThan) condition;
Column column = (Column) greaterThan.getLeftExpression();
@@ -199,6 +234,17 @@ public class ExpressionUtils {
}
            return Expressions.equal(column.getColumnName(), booleanExpression.isTrue());
}
+        if (condition instanceof LikeExpression) {
+            LikeExpression expr = (LikeExpression) condition;
+            String columnName = ((Column) expr.getLeftExpression()).getColumnName();
+            String value = ((StringValue) expr.getRightExpression()).getValue();
+            LikeExpression.KeyWord keyWord = expr.getLikeKeyWord();
+            if (keyWord == LikeExpression.KeyWord.LIKE) {
+                return Expressions.startsWith(columnName, value);
+            } else {
+                throw new UnsupportedOperationException("Unsupported like keyword: " + keyWord);
+            }
+        }
throw new UnsupportedOperationException(
"Unsupported condition: " + condition.getClass().getName());
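The LIKE branch above maps a trailing-wildcard pattern onto Iceberg's `startsWith`, which is why the docs restrict LIKE to patterns that do not begin with `%`. This hypothetical standalone sketch (`LikeRule` is illustration only, not the connector's code) shows the prefix-only semantics being assumed:

```java
// Prefix-only LIKE semantics: `col LIKE 'tom%'` can be expressed as a
// startsWith on "tom", but a leading '%' has no startsWith equivalent.
public class LikeRule {
    public static boolean prefixMatch(String value, String pattern) {
        if (pattern.startsWith("%")) {
            // mirrors the documented limitation: leading wildcard unsupported
            throw new UnsupportedOperationException("leading % is not supported");
        }
        String prefix = pattern.endsWith("%")
                ? pattern.substring(0, pattern.length() - 1)
                : pattern;
        return value.startsWith(prefix);
    }
}
```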
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/ExpressionUtilsTest.java b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/ExpressionUtilsTest.java
index a2b8658390..39f3728b94 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/ExpressionUtilsTest.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/ExpressionUtilsTest.java
@@ -30,6 +30,10 @@ import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.delete.Delete;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
public class ExpressionUtilsTest {
@Test
@@ -128,4 +132,140 @@ public class ExpressionUtilsTest {
.toString(),
expression.toString());
}
+
+    @Test
+    public void testSimpleConditions() throws Exception {
+        // test integer comparison
+        String whereClause1 = "SELECT * FROM t WHERE age = 30";
+        Expression expr1 = ExpressionUtils.parseWhereClauseToIcebergExpression(whereClause1);
+        assertEquals(Expressions.equal("age", 30).toString(), expr1.toString());
+
+        // test string comparison
+        String whereClause2 = "SELECT * FROM t WHERE name = 'John'";
+        Expression expr2 = ExpressionUtils.parseWhereClauseToIcebergExpression(whereClause2);
+        assertEquals(Expressions.equal("name", "John").toString(), expr2.toString());
+
+        // test float comparison
+        String whereClause3 = "SELECT * FROM t WHERE salary > 50000.5";
+        Expression expr3 = ExpressionUtils.parseWhereClauseToIcebergExpression(whereClause3);
+        assertEquals(Expressions.greaterThan("salary", 50000.5).toString(), expr3.toString());
+
+        // test boolean comparison
+        String whereClause4 = "SELECT * FROM t WHERE is_active is true";
+        Expression expr4 = ExpressionUtils.parseWhereClauseToIcebergExpression(whereClause4);
+        assertEquals(Expressions.equal("is_active", true).toString(), expr4.toString());
+    }
+
+    @Test
+    public void testLogicalCombinations() throws Exception {
+        // test AND
+        String whereClause1 = "SELECT * FROM t WHERE age > 30 AND name = 'John'";
+        Expression expr1 = ExpressionUtils.parseWhereClauseToIcebergExpression(whereClause1);
+        assertEquals(
+                Expressions.and(
+                                Expressions.greaterThan("age", 30),
+                                Expressions.equal("name", "John"))
+                        .toString(),
+                expr1.toString());
+
+        // OR
+        String whereClause2 = "SELECT * FROM t WHERE salary < 50000 OR is_active is true";
+        Expression expr2 = ExpressionUtils.parseWhereClauseToIcebergExpression(whereClause2);
+        assertEquals(
+                Expressions.or(
+                                Expressions.lessThan("salary", 50000),
+                                Expressions.equal("is_active", true))
+                        .toString(),
+                expr2.toString());
+
+        // test combination of AND and OR
+        String whereClause3 =
+                "SELECT * FROM t WHERE (age > 30 AND name = 'John') OR salary < 50000";
+        Expression expr3 = ExpressionUtils.parseWhereClauseToIcebergExpression(whereClause3);
+        assertEquals(
+                Expressions.or(
+                                Expressions.and(
+                                        Expressions.greaterThan("age", 30),
+                                        Expressions.equal("name", "John")),
+                                Expressions.lessThan("salary", 50000))
+                        .toString(),
+                expr3.toString());
+    }
+
+    @Test
+    public void testComplexNestedExpressions() throws Exception {
+        // test nested AND and OR
+        String whereClause1 =
+                "SELECT * FROM t WHERE ((age > 30 AND name = 'John') OR salary < 50000) AND is_active is true";
+        Expression expr1 = ExpressionUtils.parseWhereClauseToIcebergExpression(whereClause1);
+        assertEquals(
+                Expressions.and(
+                                Expressions.or(
+                                        Expressions.and(
+                                                Expressions.greaterThan("age", 30),
+                                                Expressions.equal("name", "John")),
+                                        Expressions.lessThan("salary", 50000)),
+                                Expressions.equal("is_active", true))
+                        .toString(),
+                expr1.toString());
+
+        // test nested AND and OR with multiple levels
+        String whereClause2 =
+                "SELECT * FROM t WHERE age > 30 AND (name = 'John' OR (salary < 50000 AND is_active is true))";
+        Expression expr2 = ExpressionUtils.parseWhereClauseToIcebergExpression(whereClause2);
+        assertEquals(
+                Expressions.and(
+                                Expressions.greaterThan("age", 30),
+                                Expressions.or(
+                                        Expressions.equal("name", "John"),
+                                        Expressions.and(
+                                                Expressions.lessThan("salary", 50000),
+                                                Expressions.equal("is_active", true))))
+                        .toString(),
+                expr2.toString());
+    }
+
+    @Test
+    public void testSpecialScenarios() throws Exception {
+        // IS NULL
+        String whereClause1 = "SELECT * FROM t WHERE name IS NULL";
+        Expression expr1 = ExpressionUtils.parseWhereClauseToIcebergExpression(whereClause1);
+        assertEquals(Expressions.isNull("name").toString(), expr1.toString());
+
+        // IS NOT NULL
+        String whereClause2 = "SELECT * FROM t WHERE name IS NOT NULL";
+        Expression expr2 = ExpressionUtils.parseWhereClauseToIcebergExpression(whereClause2);
+        assertEquals(Expressions.notNull("name").toString(), expr2.toString());
+
+        // NOT
+        String whereClause3 = "SELECT * FROM t WHERE NOT (age > 30)";
+        Expression expr3 = ExpressionUtils.parseWhereClauseToIcebergExpression(whereClause3);
+        assertEquals(
+                Expressions.not(Expressions.greaterThan("age", 30)).toString(), expr3.toString());
+
+        // IN
+        String whereClause4 = "SELECT * FROM t WHERE age IN (30, 40, 50)";
+        Expression expr4 = ExpressionUtils.parseWhereClauseToIcebergExpression(whereClause4);
+        assertEquals(Expressions.in("age", new Object[] {30, 40, 50}).toString(), expr4.toString());
+
+        // start with
+        String whereClause5 = "SELECT * FROM t WHERE name LIKE 'John%'";
+        Expression expr5 = ExpressionUtils.parseWhereClauseToIcebergExpression(whereClause5);
+        assertEquals(Expressions.startsWith("name", "John%").toString(), expr5.toString());
+    }
+
+ @Test
+ void parseSelectColumns() {
+ String sql = "SELECT id, name, age FROM test.a";
+ List<String> columns = ExpressionUtils.parseSelectColumns(sql);
+ assertEquals(3, columns.size());
+ assertEquals("id", columns.get(0));
+ assertEquals("name", columns.get(1));
+ assertEquals("age", columns.get(2));
+
+ sql = "SELECT * FROM test.a";
+ columns = ExpressionUtils.parseSelectColumns(sql);
+ assertEquals(1, columns.size());
+ assertEquals("*", columns.get(0));
+ }
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSourceIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSourceIT.java
index 196ab168f5..476a42b20e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSourceIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSourceIT.java
@@ -48,6 +48,7 @@ import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
@@ -59,6 +60,11 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
@@ -130,6 +136,35 @@ public class IcebergSourceIT extends TestSuiteBase implements TestResource {
@Override
public void tearDown() throws Exception {}
+ @AfterEach
+ public void clean() {
+ // clean the catalog dir
+ Path catalogPath = Paths.get(CATALOG_DIR);
+ if (java.nio.file.Files.exists(catalogPath)) {
+ try {
+ java.nio.file.Files.walkFileTree(
+ catalogPath,
+ new SimpleFileVisitor<Path>() {
+ @Override
+                            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
+ throws IOException {
+ java.nio.file.Files.delete(file);
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override
+                            public FileVisitResult postVisitDirectory(Path dir, IOException exc)
+ throws IOException {
+ java.nio.file.Files.delete(dir);
+ return FileVisitResult.CONTINUE;
+ }
+ });
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
@TestTemplate
public void testIcebergSource(TestContainer container)
throws IOException, InterruptedException {
@@ -137,6 +172,22 @@ public class IcebergSourceIT extends TestSuiteBase implements TestResource {
    Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
}
+ @TestTemplate
+ public void testFilterIcebergSourceSingleTable(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob("/iceberg/filter_iceberg_source.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ }
+
+ @TestTemplate
+ public void testFilterIcebergSourceTables(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+                container.executeJob("/iceberg/filter_iceberg_source_tables.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ }
+
private void initializeIcebergTable() {
Map<String, Object> configs = new HashMap<>();
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/filter_iceberg_source.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/filter_iceberg_source.conf
new file mode 100644
index 0000000000..8a6339cbff
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/filter_iceberg_source.conf
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+ parallelism = 2
+ job.mode = "BATCH"
+}
+
+source {
+ Iceberg {
+ catalog_name = "seatunnel"
+ iceberg.catalog.config={
+ "type"="hadoop"
+ "warehouse"="file:///tmp/seatunnel/iceberg/hadoop/"
+ }
+ namespace = "database1"
+ table = "source"
+ plugin_output = "iceberg"
+ query = "select f1, f2 from t where f1 = 10"
+ }
+}
+
+transform {
+}
+
+sink {
+ Assert {
+ plugin_input = "iceberg"
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 1
+ }
+ {
+ rule_type = MIN_ROW
+ rule_value = 1
+ }
+ ]
+ field_rules = [
+ {
+ field_name = f1
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 10
+ }
+ ]
+ },
+ {
+ field_name = f2
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = true
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/filter_iceberg_source_tables.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/filter_iceberg_source_tables.conf
new file mode 100644
index 0000000000..47995d3c25
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/filter_iceberg_source_tables.conf
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+ parallelism = 2
+ job.mode = "BATCH"
+}
+
+source {
+ Iceberg {
+ catalog_name = "seatunnel"
+ iceberg.catalog.config={
+ "type"="hadoop"
+ "warehouse"="file:///tmp/seatunnel/iceberg/hadoop/"
+ }
+ namespace = "database1"
+ table_list = [
+ {
+ table = "source"
+ query = "select f1, f16 from t where f1 = 10"
+ }
+ ]
+ plugin_output = "iceberg"
+ }
+}
+
+transform {
+}
+
+sink {
+ Assert {
+ plugin_input = "iceberg"
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 1
+ }
+ {
+ rule_type = MIN_ROW
+ rule_value = 1
+ }
+ ]
+ field_rules = [
+ {
+ field_name = f1
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 10
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
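
Note for readers skimming the patch: the new `query` option lets the Iceberg source accept a SQL-style filter (e.g. `select f1, f2 from t where f1 = 10`), which the new `ExpressionUtils` class translates into an Iceberg filter expression for scan pushdown. The toy sketch below is not the connector's actual parser; it is a plain-Java illustration (no Iceberg dependency, all names hypothetical) of pulling a single equality predicate out of such a query string:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FilterSketch {
    // Hypothetical holder for one parsed predicate: column, operator, literal.
    record Predicate(String column, String op, String literal) {}

    // Extracts a single "col = value" condition from a query's WHERE clause.
    // Toy illustration only; the real connector delegates to ExpressionUtils.
    static Predicate parseEquality(String query) {
        Pattern p = Pattern.compile("where\\s+(\\w+)\\s*(=)\\s*(\\S+)", Pattern.CASE_INSENSITIVE);
        Matcher m = p.matcher(query);
        if (!m.find()) {
            throw new IllegalArgumentException("no equality predicate found in: " + query);
        }
        return new Predicate(m.group(1), m.group(2), m.group(3));
    }

    public static void main(String[] args) {
        // Matches the filter used by the new e2e config files.
        Predicate pred = parseEquality("select f1, f2 from t where f1 = 10");
        System.out.println(pred.column() + " " + pred.op() + " " + pred.literal());
    }
}
```

In the real implementation such a predicate would map onto Iceberg's expression API rather than a record like this; the e2e tests above only verify the end-to-end effect (exactly one matching row reaches the Assert sink).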