This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0eb72780ee [Feat][Connector-v2][Iceberg]support filter conditions in 
iceberg source (#9095)
0eb72780ee is described below

commit 0eb72780ee9a14eea0fcda0262b161efa04861c9
Author: litiliu <[email protected]>
AuthorDate: Mon Apr 21 10:16:05 2025 +0800

    [Feat][Connector-v2][Iceberg]support filter conditions in iceberg source 
(#9095)
    
    Co-authored-by: litiliu <[email protected]>
---
 docs/en/connector-v2/source/Iceberg.md             |  40 +++---
 .../seatunnel/iceberg/catalog/IcebergCatalog.java  |  67 +++++++---
 .../iceberg/config/IcebergSourceConfig.java        |   6 +-
 .../iceberg/config/IcebergSourceOptions.java       |   3 +
 .../iceberg/config/SourceTableConfig.java          |   3 +-
 .../source/enumerator/scan/IcebergScanContext.java |  21 +++-
 .../seatunnel/iceberg/utils/ExpressionUtils.java   |  46 +++++++
 .../iceberg/utils/ExpressionUtilsTest.java         | 140 +++++++++++++++++++++
 .../e2e/connector/iceberg/IcebergSourceIT.java     |  51 ++++++++
 .../resources/iceberg/filter_iceberg_source.conf   |  82 ++++++++++++
 .../iceberg/filter_iceberg_source_tables.conf      |  76 +++++++++++
 11 files changed, 496 insertions(+), 39 deletions(-)

diff --git a/docs/en/connector-v2/source/Iceberg.md 
b/docs/en/connector-v2/source/Iceberg.md
index f19c6077db..c620c5e95a 100644
--- a/docs/en/connector-v2/source/Iceberg.md
+++ b/docs/en/connector-v2/source/Iceberg.md
@@ -73,25 +73,27 @@ libfb303-xxx.jar
 
 ## Source Options
 
-| Name                     | Type    | Required | Default              | 
Description                                                                     
                                                                                
                                                                                
                                                                                
                                                                                
                    [...]
-|--------------------------|---------|----------|----------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
-| catalog_name             | string  | yes      | -                    | 
User-specified catalog name.                                                    
                                                                                
                                                                                
                                                                                
                                                                                
                    [...]
-| namespace                | string  | yes      | -                    | The 
iceberg database name in the backend catalog.                                   
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
-| table                    | string  | no       | -                    | The 
iceberg table name in the backend catalog.                                      
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
-| table_list               | string  | no       | -                    | The 
iceberg table list in the backend catalog.                                      
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
-| iceberg.catalog.config   | map     | yes      | -                    | 
Specify the properties for initializing the Iceberg catalog, which can be 
referenced in this 
file:https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/CatalogProperties.java
                                                                                
                                                                                
                                                          [...]
-| hadoop.config            | map     | no       | -                    | 
Properties passed through to the Hadoop configuration                           
                                                                                
                                                                                
                                                                                
                                                                                
                    [...]
-| iceberg.hadoop-conf-path | string  | no       | -                    | The 
specified loading paths for the 'core-site.xml', 'hdfs-site.xml', 
'hive-site.xml' files.                                                          
                                                                                
                                                                                
                                                                                
                              [...]
-| schema                   | config  | no       | -                    | Use 
projection to select data columns and columns order.                            
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
-| case_sensitive           | boolean | no       | false                | If 
data columns where selected via schema [config], controls whether the match to 
the schema will be done with case sensitivity.                                  
                                                                                
                                                                                
                                                                                
                  [...]
-| start_snapshot_timestamp | long    | no       | -                    | 
Instructs this scan to look for changes starting from  the most recent snapshot 
for the table as of the timestamp. <br/>timestamp – the timestamp in millis 
since the Unix epoch                                                            
                                                                                
                                                                                
                        [...]
-| start_snapshot_id        | long    | no       | -                    | 
Instructs this scan to look for changes starting from a particular snapshot 
(exclusive).                                                                    
                                                                                
                                                                                
                                                                                
                        [...]
-| end_snapshot_id          | long    | no       | -                    | 
Instructs this scan to look for changes up to a particular snapshot 
(inclusive).                                                                    
                                                                                
                                                                                
                                                                                
                                [...]
-| use_snapshot_id          | long    | no       | -                    | 
Instructs this scan to look for use the given snapshot ID.                      
                                                                                
                                                                                
                                                                                
                                                                                
                    [...]
-| use_snapshot_timestamp   | long    | no       | -                    | 
Instructs this scan to look for use the most recent snapshot as of the given 
time in milliseconds. timestamp – the timestamp in millis since the Unix epoch  
                                                                                
                                                                                
                                                                                
                       [...]
+| Name                     | Type    | Required | Default              | 
Description                                                                     
                                                                                
                                                                                
                                                                                
                                                                                
                    [...]
+|--------------------------|---------|----------|----------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| catalog_name             | string  | yes      | -                    | 
User-specified catalog name.                                                    
                                                                                
                                                                                
                                                                                
                                                                                
                    [...]
+| namespace                | string  | yes      | -                    | The 
iceberg database name in the backend catalog.                                   
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
+| table                    | string  | no       | -                    | The 
iceberg table name in the backend catalog.                                      
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
+| table_list               | string  | no       | -                    | The 
iceberg table list in the backend catalog.                                      
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
+| iceberg.catalog.config   | map     | yes      | -                    | 
Specify the properties for initializing the Iceberg catalog, which can be 
referenced in this 
file:https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/CatalogProperties.java
                                                                                
                                                                                
                                                          [...]
+| hadoop.config            | map     | no       | -                    | 
Properties passed through to the Hadoop configuration                           
                                                                                
                                                                                
                                                                                
                                                                                
                    [...]
+| iceberg.hadoop-conf-path | string  | no       | -                    | The 
specified loading paths for the 'core-site.xml', 'hdfs-site.xml', 
'hive-site.xml' files.                                                          
                                                                                
                                                                                
                                                                                
                              [...]
+| schema                   | config  | no       | -                    | Use 
projection to select data columns and columns order.                            
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
+| case_sensitive           | boolean | no       | false                | If 
data columns were selected via schema [config], controls whether the match to 
the schema will be done with case sensitivity.                                  
                                                                                
                                                                                
                                                                                
                  [...]
+| start_snapshot_timestamp | long    | no       | -                    | 
Instructs this scan to look for changes starting from  the most recent snapshot 
for the table as of the timestamp. <br/>timestamp – the timestamp in millis 
since the Unix epoch                                                            
                                                                                
                                                                                
                        [...]
+| start_snapshot_id        | long    | no       | -                    | 
Instructs this scan to look for changes starting from a particular snapshot 
(exclusive).                                                                    
                                                                                
                                                                                
                                                                                
                        [...]
+| end_snapshot_id          | long    | no       | -                    | 
Instructs this scan to look for changes up to a particular snapshot 
(inclusive).                                                                    
                                                                                
                                                                                
                                                                                
                                [...]
+| use_snapshot_id          | long    | no       | -                    | 
Instructs this scan to use the given snapshot ID.                               
                                                                                
                                                                                
                                                                                
                                                                                
                    [...]
+| use_snapshot_timestamp   | long    | no       | -                    | 
Instructs this scan to use the most recent snapshot as of the given 
time in milliseconds. timestamp – the timestamp in millis since the Unix epoch  
                                                                                
                                                                                
                                                                                
                       [...]
 | stream_scan_strategy     | enum    | no       | FROM_LATEST_SNAPSHOT | 
Starting strategy for stream mode execution, Default to use 
`FROM_LATEST_SNAPSHOT` if don’t specify any value,The optional values 
are:<br/>TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to 
the incremental mode.<br/>FROM_LATEST_SNAPSHOT: Start incremental mode from the 
latest snapshot inclusive.<br/>FROM_EARLIEST_SNAPSHOT: Start incremental mode 
from the earliest snapshot inclusive.<br/>FROM_SNAPSHO [...]
-| increment.scan-interval  | long    | no       | 2000                 | The 
interval of increment scan(mills)                                               
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
-| common-options           |         | no       | -                    | 
Source plugin common parameters, please refer to [Source Common 
Options](../source-common-options.md) for details.                              
                                                                                
                                                                                
                                                                                
                                    [...]
+| increment.scan-interval  | long    | no       | 2000                 | The 
interval of increment scan(mills)                                               
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
+| common-options           |         | no       | -                    | 
Source plugin common parameters, please refer to [Source Common 
Options](../source-common-options.md) for details.                              
                                                                                
                                                                                
                                                                                
                                    [...]
+| query                    | string  | no       | -                    | The 
SELECT statement used to select the iceberg data. It mustn't contain the table 
name, and aliases are not supported. For example: `select * from table where f1 > 100`, 
`select fn from table where f1 > 100`. Support for the LIKE syntax is currently 
limited: the LIKE pattern must not start with `%`. A supported example is: 
`select f1 from t where f2 like 'tom%'`                                         
                             [...]
+
 
 ## Task Example
 
@@ -112,6 +114,7 @@ source {
     }
     namespace = "database1"
     table = "source"
+    query = "select fn from table where f1 > 100"
     plugin_output = "iceberg"
   }
 }
@@ -143,6 +146,7 @@ source {
       },
       {
         table = "table_2
+        query = "select fn from table where f1 > 100"
       }
     ]
     
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
index aa29a65829..9c096088fa 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
@@ -34,9 +34,12 @@ import 
org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergCatalogLoader;
 import 
org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCommonConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergSourceOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceTableConfig;
 import org.apache.seatunnel.connectors.seatunnel.iceberg.utils.ExpressionUtils;
 import org.apache.seatunnel.connectors.seatunnel.iceberg.utils.SchemaUtils;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
@@ -46,6 +49,7 @@ import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.types.Types;
 
 import lombok.extern.slf4j.Slf4j;
@@ -57,6 +61,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
@@ -261,7 +266,7 @@ public class IcebergCatalog implements Catalog {
         TableIdentifier icebergTableIdentifier = 
toIcebergTableIdentifier(tablePath);
         catalog.loadTable(icebergTableIdentifier)
                 .newDelete()
-                
.deleteFromRowFilter(org.apache.iceberg.expressions.Expressions.alwaysTrue())
+                .deleteFromRowFilter(Expressions.alwaysTrue())
                 .commit();
         log.info("Truncated table at path: {}", tablePath);
     }
@@ -269,22 +274,34 @@ public class IcebergCatalog implements Catalog {
     public CatalogTable toCatalogTable(Table icebergTable, TablePath 
tablePath) {
         Schema schema = icebergTable.schema();
         List<Types.NestedField> columns = schema.columns();
+        List<String> selectColumns = getSelectColumns(tablePath);
         TableSchema.Builder builder = TableSchema.builder();
-        columns.forEach(
-                nestedField -> {
-                    String name = nestedField.name();
-                    SeaTunnelDataType<?> seaTunnelType =
-                            SchemaUtils.toSeaTunnelType(name, 
nestedField.type());
-                    PhysicalColumn physicalColumn =
-                            PhysicalColumn.of(
-                                    name,
-                                    seaTunnelType,
-                                    (Long) null,
-                                    nestedField.isOptional(),
-                                    null,
-                                    nestedField.doc());
-                    builder.column(physicalColumn);
-                });
+        columns.stream()
+                .filter(
+                        col -> {
+                            if (CollectionUtils.isNotEmpty(selectColumns)) {
+                                if ("*".equals(selectColumns.get(0))) {
+                                    return true;
+                                }
+                                return selectColumns.contains(col.name());
+                            }
+                            return true;
+                        })
+                .forEach(
+                        nestedField -> {
+                            String name = nestedField.name();
+                            SeaTunnelDataType<?> seaTunnelType =
+                                    SchemaUtils.toSeaTunnelType(name, 
nestedField.type());
+                            PhysicalColumn physicalColumn =
+                                    PhysicalColumn.of(
+                                            name,
+                                            seaTunnelType,
+                                            (Long) null,
+                                            nestedField.isOptional(),
+                                            null,
+                                            nestedField.doc());
+                            builder.column(physicalColumn);
+                        });
         Optional.ofNullable(schema.identifierFieldNames())
                 .filter(names -> !names.isEmpty())
                 .map(
@@ -312,6 +329,24 @@ public class IcebergCatalog implements Catalog {
                 catalogName);
     }
 
+    private List<String> getSelectColumns(TablePath tablePath) {
+        if 
(Objects.nonNull(readonlyConfig.get(IcebergSourceOptions.KEY_TABLE))) {
+            return ExpressionUtils.parseSelectColumns(
+                    readonlyConfig.get(IcebergSourceOptions.QUERY));
+        } else {
+            List<SourceTableConfig> tableConfigs =
+                    readonlyConfig.get(IcebergSourceOptions.KEY_TABLE_LIST);
+            if (Objects.nonNull(tableConfigs)) {
+                for (SourceTableConfig config : tableConfigs) {
+                    if (config.getTable().equals(tablePath.getTableName())) {
+                        return 
ExpressionUtils.parseSelectColumns(config.getQuery());
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
     @Override
     public PreviewResult previewAction(
             ActionType actionType, TablePath tablePath, Optional<CatalogTable> 
catalogTable) {
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/IcebergSourceConfig.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/IcebergSourceConfig.java
index 47495b082b..2ad3f528b9 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/IcebergSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/IcebergSourceConfig.java
@@ -41,7 +41,7 @@ public class IcebergSourceConfig extends IcebergCommonConfig {
         this.incrementScanInterval =
                 
readonlyConfig.get(IcebergSourceOptions.KEY_INCREMENT_SCAN_INTERVAL);
         if (this.getTable() != null) {
-            SourceTableConfig tableConfig =
+            SourceTableConfig.SourceTableConfigBuilder builder =
                     SourceTableConfig.builder()
                             .namespace(this.getNamespace())
                             .table(this.getTable())
@@ -60,7 +60,9 @@ public class IcebergSourceConfig extends IcebergCommonConfig {
                             .streamScanStrategy(
                                     readonlyConfig.get(
                                             
IcebergSourceOptions.KEY_STREAM_SCAN_STRATEGY))
-                            .build();
+                            
.query(readonlyConfig.get(IcebergSourceOptions.QUERY));
+
+            SourceTableConfig tableConfig = builder.build();
             this.tableList = Collections.singletonList(tableConfig);
         } else {
             this.tableList =
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/IcebergSourceOptions.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/IcebergSourceOptions.java
index 755d1f6a3d..a5b501b2ba 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/IcebergSourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/IcebergSourceOptions.java
@@ -76,4 +76,7 @@ public class IcebergSourceOptions extends 
IcebergCommonOptions {
                     .longType()
                     .defaultValue(2000L)
                     .withDescription(" the interval of increment scan(mills)");
+
+    public static final Option<String> QUERY =
+            
Options.key("query").stringType().noDefaultValue().withDescription("the select 
sql");
 }
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SourceTableConfig.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SourceTableConfig.java
index bf22649302..0e8d8e7b1a 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SourceTableConfig.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SourceTableConfig.java
@@ -24,7 +24,6 @@ import 
org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan.
 import org.apache.seatunnel.connectors.seatunnel.iceberg.utils.SchemaUtils;
 
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.expressions.Expression;
 
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -50,7 +49,7 @@ public class SourceTableConfig implements Serializable {
     private Long useSnapshotTimestamp;
 
     private IcebergStreamScanStrategy streamScanStrategy = 
KEY_STREAM_SCAN_STRATEGY.defaultValue();
-    private Expression filter;
+    private String query;
     private Long splitSize;
     private Integer splitLookback;
     private Long splitOpenFileCost;
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanContext.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanContext.java
index 6c6bea685e..acda43579a 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanContext.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanContext.java
@@ -20,17 +20,23 @@ package 
org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import 
org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergSourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceTableConfig;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.utils.ExpressionUtils;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 
 import lombok.Builder;
 import lombok.Getter;
 import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import net.sf.jsqlparser.JSQLParserException;
 
 @Getter
 @Builder(toBuilder = true)
 @ToString
+@Slf4j
 public class IcebergScanContext {
 
     private final TablePath tablePath;
@@ -73,13 +79,26 @@ public class IcebergScanContext {
                 .useSnapshotTimestamp(tableConfig.getUseSnapshotTimestamp())
                 .caseSensitive(sourceConfig.isCaseSensitive())
                 .schema(schema)
-                .filter(tableConfig.getFilter())
+                .filter(getFilter(tableConfig.getQuery()))
                 .splitSize(tableConfig.getSplitSize())
                 .splitLookback(tableConfig.getSplitLookback())
                 .splitOpenFileCost(tableConfig.getSplitOpenFileCost())
                 .build();
     }
 
+    /**
+     * Builds the Iceberg scan filter from the query configured for a table.
+     *
+     * <p>A blank query yields {@link Expressions#alwaysTrue()}. A query that cannot be parsed is
+     * rejected instead of being ignored: falling back to {@code alwaysTrue()} would scan the
+     * whole table although the user asked for a subset of it.
+     *
+     * @param selectStr the configured SELECT statement; may be {@code null} or blank
+     * @return the expression derived from the WHERE clause, or {@code alwaysTrue()} when the
+     *     query is blank
+     * @throws IllegalArgumentException if JSqlParser cannot parse {@code selectStr}
+     */
+    private static Expression getFilter(String selectStr) {
+        if (StringUtils.isBlank(selectStr)) {
+            return Expressions.alwaysTrue();
+        }
+        try {
+            return ExpressionUtils.parseWhereClauseToIcebergExpression(selectStr);
+        } catch (JSQLParserException e) {
+            throw new IllegalArgumentException(
+                    "Failed to parse where clause to iceberg expression: " + selectStr, e);
+        }
+    }
+
     public static IcebergScanContext streamScanContext(
             IcebergSourceConfig sourceConfig, SourceTableConfig tableConfig, 
Schema schema) {
         return scanContext(sourceConfig, tableConfig, schema)
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/ExpressionUtils.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/ExpressionUtils.java
index 7f9b6a1027..e862c06b76 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/ExpressionUtils.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/ExpressionUtils.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.iceberg.utils;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.types.Types;
@@ -26,6 +28,7 @@ import lombok.SneakyThrows;
 import net.sf.jsqlparser.JSQLParserException;
 import net.sf.jsqlparser.expression.DoubleValue;
 import net.sf.jsqlparser.expression.LongValue;
+import net.sf.jsqlparser.expression.NotExpression;
 import net.sf.jsqlparser.expression.Parenthesis;
 import net.sf.jsqlparser.expression.StringValue;
 import net.sf.jsqlparser.expression.operators.conditional.AndExpression;
@@ -37,6 +40,7 @@ import 
net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
 import net.sf.jsqlparser.expression.operators.relational.InExpression;
 import net.sf.jsqlparser.expression.operators.relational.IsBooleanExpression;
 import net.sf.jsqlparser.expression.operators.relational.IsNullExpression;
+import net.sf.jsqlparser.expression.operators.relational.LikeExpression;
 import net.sf.jsqlparser.expression.operators.relational.MinorThan;
 import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
 import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
@@ -44,6 +48,7 @@ import net.sf.jsqlparser.parser.CCJSqlParserUtil;
 import net.sf.jsqlparser.schema.Column;
 import net.sf.jsqlparser.statement.Statement;
 import net.sf.jsqlparser.statement.delete.Delete;
+import net.sf.jsqlparser.statement.select.PlainSelect;
 
 import java.math.BigDecimal;
 import java.time.LocalDate;
@@ -51,6 +56,7 @@ import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatterBuilder;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -66,6 +72,31 @@ public class ExpressionUtils {
                     .append(ISO_LOCAL_TIME)
                     .toFormatter();
 
+    /**
+     * Extracts the select-list items of a SELECT statement as strings.
+     *
+     * @param selectQuery the SELECT statement; {@code null} or blank yields an empty list
+     * @return every select item rendered with {@code toString()}, in the order they appear in
+     *     the query; a new empty list when the query is blank or has no select items
+     * @throws RuntimeException if the query cannot be parsed; the parser exception is attached
+     *     as the cause
+     */
+    public static List<String> parseSelectColumns(String selectQuery) {
+        if (StringUtils.isBlank(selectQuery)) {
+            return new ArrayList<>();
+        }
+        try {
+            PlainSelect select = (PlainSelect) CCJSqlParserUtil.parse(selectQuery);
+            if (CollectionUtils.isEmpty(select.getSelectItems())) {
+                return new ArrayList<>();
+            }
+            return select.getSelectItems().stream()
+                    .map(Object::toString)
+                    .collect(Collectors.toList());
+        } catch (JSQLParserException e) {
+            // Chain the cause so the parser's own stack trace is not lost.
+            throw new RuntimeException("Failed to parse select columns: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Converts the WHERE clause of a SELECT statement into an Iceberg filter expression.
+     *
+     * @param selectQuery the SELECT statement to parse
+     * @return the result of {@code convert} applied to the statement's WHERE clause, with no
+     *     schema supplied
+     * @throws JSQLParserException if JSqlParser cannot parse {@code selectQuery}
+     */
+    public static Expression parseWhereClauseToIcebergExpression(String selectQuery)
+            throws JSQLParserException {
+        // Parse with JSqlParser and hand only the WHERE part to the converter.
+        PlainSelect plainSelect = (PlainSelect) CCJSqlParserUtil.parse(selectQuery);
+        return convert(plainSelect.getWhere(), null);
+    }
+
     public static Expression convertDeleteSQL(String sql) throws 
JSQLParserException {
         Statement statement = CCJSqlParserUtil.parse(sql);
         Delete delete = (Delete) statement;
@@ -118,6 +149,10 @@ public class ExpressionUtils {
                                     schema.findField(column.getColumnName()));
             return Expressions.notEqual(column.getColumnName(), value);
         }
+        if (condition instanceof NotExpression) {
+            NotExpression expr = (NotExpression) condition;
+            return Expressions.not(convert(expr.getExpression(), null));
+        }
         if (condition instanceof GreaterThan) {
             GreaterThan greaterThan = (GreaterThan) condition;
             Column column = (Column) greaterThan.getLeftExpression();
@@ -199,6 +234,17 @@ public class ExpressionUtils {
             }
             return Expressions.equal(column.getColumnName(), 
booleanExpression.isTrue());
         }
+        if (condition instanceof LikeExpression) {
+            LikeExpression expr = (LikeExpression) condition;
+            String columnName = ((Column) 
expr.getLeftExpression()).getColumnName();
+            String value = ((StringValue) 
expr.getRightExpression()).getValue();
+            LikeExpression.KeyWord keyWord = expr.getLikeKeyWord();
+            if (keyWord == LikeExpression.KeyWord.LIKE) {
+                return Expressions.startsWith(columnName, value);
+            } else {
+                throw new UnsupportedOperationException("Unsupported like 
keyword: " + keyWord);
+            }
+        }
 
         throw new UnsupportedOperationException(
                 "Unsupported condition: " + condition.getClass().getName());
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/ExpressionUtilsTest.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/ExpressionUtilsTest.java
index a2b8658390..39f3728b94 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/ExpressionUtilsTest.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/ExpressionUtilsTest.java
@@ -30,6 +30,10 @@ import net.sf.jsqlparser.parser.CCJSqlParserUtil;
 import net.sf.jsqlparser.statement.Statement;
 import net.sf.jsqlparser.statement.delete.Delete;
 
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 public class ExpressionUtilsTest {
 
     @Test
@@ -128,4 +132,140 @@ public class ExpressionUtilsTest {
                         .toString(),
                 expression.toString());
     }
+
+    /** Checks single-predicate WHERE clauses with integer, string, decimal and boolean values. */
+    @Test
+    public void testSimpleConditions() throws Exception {
+        // integer equality
+        Expression intEquals =
+                ExpressionUtils.parseWhereClauseToIcebergExpression(
+                        "SELECT * FROM t WHERE  age = 30");
+        assertEquals(Expressions.equal("age", 30).toString(), intEquals.toString());
+
+        // string equality
+        Expression stringEquals =
+                ExpressionUtils.parseWhereClauseToIcebergExpression(
+                        "SELECT * FROM t WHERE name = 'John'");
+        assertEquals(Expressions.equal("name", "John").toString(), stringEquals.toString());
+
+        // decimal greater-than
+        Expression decimalGreater =
+                ExpressionUtils.parseWhereClauseToIcebergExpression(
+                        "SELECT * FROM t WHERE salary > 50000.5");
+        assertEquals(
+                Expressions.greaterThan("salary", 50000.5).toString(), decimalGreater.toString());
+
+        // boolean "is true"
+        Expression booleanIsTrue =
+                ExpressionUtils.parseWhereClauseToIcebergExpression(
+                        "SELECT * FROM t WHERE is_active is true");
+        assertEquals(Expressions.equal("is_active", true).toString(), booleanIsTrue.toString());
+    }
+
+    /** Checks AND, OR, and a parenthesized AND used as the left operand of an OR. */
+    @Test
+    public void testLogicalCombinations() throws Exception {
+        // AND of two predicates
+        Expression conjunction =
+                ExpressionUtils.parseWhereClauseToIcebergExpression(
+                        "SELECT * FROM t WHERE age > 30 AND name = 'John'");
+        Expression expectedConjunction =
+                Expressions.and(
+                        Expressions.greaterThan("age", 30), Expressions.equal("name", "John"));
+        assertEquals(expectedConjunction.toString(), conjunction.toString());
+
+        // OR of two predicates
+        Expression disjunction =
+                ExpressionUtils.parseWhereClauseToIcebergExpression(
+                        "SELECT * FROM t WHERE salary < 50000 OR is_active is true");
+        Expression expectedDisjunction =
+                Expressions.or(
+                        Expressions.lessThan("salary", 50000),
+                        Expressions.equal("is_active", true));
+        assertEquals(expectedDisjunction.toString(), disjunction.toString());
+
+        // parenthesized AND combined with OR
+        Expression mixed =
+                ExpressionUtils.parseWhereClauseToIcebergExpression(
+                        "SELECT * FROM t WHERE (age > 30 AND name = 'John') OR salary < 50000");
+        Expression expectedMixed =
+                Expressions.or(
+                        Expressions.and(
+                                Expressions.greaterThan("age", 30),
+                                Expressions.equal("name", "John")),
+                        Expressions.lessThan("salary", 50000));
+        assertEquals(expectedMixed.toString(), mixed.toString());
+    }
+
+    /** Checks WHERE clauses whose AND/OR operators are nested across several parenthesis levels. */
+    @Test
+    public void testComplexNestedExpressions() throws Exception {
+        // ((a AND b) OR c) AND d
+        Expression leftNested =
+                ExpressionUtils.parseWhereClauseToIcebergExpression(
+                        "SELECT * FROM t WHERE ((age > 30 AND name = 'John') OR salary < 50000) AND is_active is true");
+        Expression expectedLeftNested =
+                Expressions.and(
+                        Expressions.or(
+                                Expressions.and(
+                                        Expressions.greaterThan("age", 30),
+                                        Expressions.equal("name", "John")),
+                                Expressions.lessThan("salary", 50000)),
+                        Expressions.equal("is_active", true));
+        assertEquals(expectedLeftNested.toString(), leftNested.toString());
+
+        // a AND (b OR (c AND d))
+        Expression rightNested =
+                ExpressionUtils.parseWhereClauseToIcebergExpression(
+                        "SELECT * FROM t WHERE age > 30 AND (name = 'John' OR (salary < 50000 AND is_active is true))");
+        Expression expectedRightNested =
+                Expressions.and(
+                        Expressions.greaterThan("age", 30),
+                        Expressions.or(
+                                Expressions.equal("name", "John"),
+                                Expressions.and(
+                                        Expressions.lessThan("salary", 50000),
+                                        Expressions.equal("is_active", true))));
+        assertEquals(expectedRightNested.toString(), rightNested.toString());
+    }
+
+    /** Checks IS NULL, IS NOT NULL, NOT, IN and LIKE predicates. */
+    @Test
+    public void testSpecialScenarios() throws Exception {
+        // IS NULL
+        Expression isNull =
+                ExpressionUtils.parseWhereClauseToIcebergExpression(
+                        "SELECT * FROM t WHERE name IS NULL");
+        assertEquals(Expressions.isNull("name").toString(), isNull.toString());
+
+        // IS NOT NULL
+        Expression notNull =
+                ExpressionUtils.parseWhereClauseToIcebergExpression(
+                        "SELECT * FROM t WHERE name IS NOT NULL");
+        assertEquals(Expressions.notNull("name").toString(), notNull.toString());
+
+        // NOT around a parenthesized comparison
+        Expression negated =
+                ExpressionUtils.parseWhereClauseToIcebergExpression(
+                        "SELECT * FROM t WHERE NOT (age > 30)");
+        assertEquals(
+                Expressions.not(Expressions.greaterThan("age", 30)).toString(),
+                negated.toString());
+
+        // IN list
+        Expression inList =
+                ExpressionUtils.parseWhereClauseToIcebergExpression(
+                        "SELECT * FROM t WHERE age IN (30, 40, 50)");
+        assertEquals(
+                Expressions.in("age", new Object[] {30, 40, 50}).toString(), inList.toString());
+
+        // LIKE mapped to startsWith
+        // NOTE(review): the expected prefix still contains the trailing '%' of the LIKE pattern;
+        // confirm that matching the literal "John%" prefix is the intended behavior.
+        Expression like =
+                ExpressionUtils.parseWhereClauseToIcebergExpression(
+                        "SELECT * FROM t WHERE name LIKE 'John%'");
+        assertEquals(Expressions.startsWith("name", "John%").toString(), like.toString());
+    }
+
+    /** Checks that select items come back in query order and that {@code *} is one item. */
+    @Test
+    void parseSelectColumns() {
+        List<String> namedColumns =
+                ExpressionUtils.parseSelectColumns("SELECT id, name, age FROM test.a");
+        assertEquals(3, namedColumns.size());
+        assertEquals("id", namedColumns.get(0));
+        assertEquals("name", namedColumns.get(1));
+        assertEquals("age", namedColumns.get(2));
+
+        List<String> wildcard = ExpressionUtils.parseSelectColumns("SELECT * FROM test.a");
+        assertEquals(1, wildcard.size());
+        assertEquals("*", wildcard.get(0));
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSourceIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSourceIT.java
index 196ab168f5..476a42b20e 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSourceIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSourceIT.java
@@ -48,6 +48,7 @@ import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.types.Types;
 
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
@@ -59,6 +60,11 @@ import lombok.extern.slf4j.Slf4j;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
@@ -130,6 +136,35 @@ public class IcebergSourceIT extends TestSuiteBase 
implements TestResource {
     @Override
     public void tearDown() throws Exception {}
 
+    /**
+     * Deletes the directory tree at {@code CATALOG_DIR} after each test: every file is removed
+     * as it is visited and every directory once its entries have been visited. Does nothing
+     * when the directory does not exist.
+     *
+     * @throws RuntimeException wrapping the {@link IOException} raised while walking or
+     *     deleting the tree
+     */
+    @AfterEach
+    public void clean() {
+        Path catalogPath = Paths.get(CATALOG_DIR);
+        if (!java.nio.file.Files.exists(catalogPath)) {
+            return;
+        }
+        try {
+            java.nio.file.Files.walkFileTree(
+                    catalogPath,
+                    new SimpleFileVisitor<Path>() {
+                        @Override
+                        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
+                                throws IOException {
+                            java.nio.file.Files.delete(file);
+                            return FileVisitResult.CONTINUE;
+                        }
+
+                        @Override
+                        public FileVisitResult postVisitDirectory(Path dir, IOException exc)
+                                throws IOException {
+                            // A non-null exc means iterating this directory ended early with an
+                            // error; report that error rather than hiding it behind the delete.
+                            if (exc != null) {
+                                throw exc;
+                            }
+                            java.nio.file.Files.delete(dir);
+                            return FileVisitResult.CONTINUE;
+                        }
+                    });
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @TestTemplate
     public void testIcebergSource(TestContainer container)
             throws IOException, InterruptedException {
@@ -137,6 +172,22 @@ public class IcebergSourceIT extends TestSuiteBase 
implements TestResource {
         Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
     }
 
+    /**
+     * Runs the single-table filter job and expects exit code 0; the job's stderr is used as the
+     * assertion message.
+     */
+    @TestTemplate
+    public void testFilterIcebergSourceSingleTable(TestContainer container)
+            throws IOException, InterruptedException {
+        final String jobConfig = "/iceberg/filter_iceberg_source.conf";
+        Container.ExecResult jobResult = container.executeJob(jobConfig);
+        Assertions.assertEquals(0, jobResult.getExitCode(), jobResult.getStderr());
+    }
+
+    /**
+     * Runs the {@code table_list} filter job and expects exit code 0; the job's stderr is used
+     * as the assertion message.
+     */
+    @TestTemplate
+    public void testFilterIcebergSourceTables(TestContainer container)
+            throws IOException, InterruptedException {
+        final String jobConfig = "/iceberg/filter_iceberg_source_tables.conf";
+        Container.ExecResult jobResult = container.executeJob(jobConfig);
+        Assertions.assertEquals(0, jobResult.getExitCode(), jobResult.getStderr());
+    }
+
     private void initializeIcebergTable() {
 
         Map<String, Object> configs = new HashMap<>();
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/filter_iceberg_source.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/filter_iceberg_source.conf
new file mode 100644
index 0000000000..8a6339cbff
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/filter_iceberg_source.conf
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file demonstrates batch reading from an Iceberg source with a filter query in SeaTunnel
+######
+
+env {
+  parallelism = 2
+  job.mode = "BATCH"
+}
+
+source {
+  Iceberg {
+    catalog_name = "seatunnel"
+    iceberg.catalog.config={
+      "type"="hadoop"
+      "warehouse"="file:///tmp/seatunnel/iceberg/hadoop/"
+    }
+    namespace = "database1"
+    table = "source"
+    plugin_output = "iceberg"
+    query = "select f1, f2 from t where f1 = 10"
+  }
+}
+
+transform {
+}
+
+sink {
+  Assert {
+    plugin_input = "iceberg"
+    rules =
+      {
+      row_rules = [
+                  {
+                    rule_type = MAX_ROW
+                    rule_value = 1
+                  }
+                   {
+                    rule_type = MIN_ROW
+                    rule_value = 1
+                  }
+                ]
+        field_rules = [
+          {
+            field_name = f1
+            field_type = bigint
+            field_value = [
+              {
+                rule_type = NOT_NULL
+                equals_to = 10
+              }
+            ]
+          },
+          {
+              field_name = f2
+              field_type = boolean
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                  equals_to = true
+                }
+              ]
+            }
+        ]
+      }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/filter_iceberg_source_tables.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/filter_iceberg_source_tables.conf
new file mode 100644
index 0000000000..47995d3c25
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/filter_iceberg_source_tables.conf
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file demonstrates batch reading from an Iceberg source with a per-table filter query (table_list) in SeaTunnel
+######
+
+env {
+  parallelism = 2
+  job.mode = "BATCH"
+}
+
+source {
+  Iceberg {
+    catalog_name = "seatunnel"
+    iceberg.catalog.config={
+      "type"="hadoop"
+      "warehouse"="file:///tmp/seatunnel/iceberg/hadoop/"
+    }
+    namespace = "database1"
+     table_list = [
+            {
+                table = "source"
+                query = "select f1, f16 from t where f1 = 10"
+            }
+        ]
+    plugin_output = "iceberg"
+  }
+}
+
+transform {
+}
+
+sink {
+  Assert {
+    plugin_input = "iceberg"
+    rules =
+      {
+      row_rules = [
+                  {
+                    rule_type = MAX_ROW
+                    rule_value = 1
+                  }
+                   {
+                    rule_type = MIN_ROW
+                    rule_value = 1
+                  }
+                ]
+        field_rules = [
+          {
+            field_name = f1
+            field_type = bigint
+            field_value = [
+              {
+                rule_type = NOT_NULL
+                equals_to = 10
+              }
+            ]
+          }
+        ]
+      }
+  }
+}

Reply via email to