This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 6c1577267f [Paimon]support projection for paimon source (#6343)
6c1577267f is described below
commit 6c1577267ff828f4cf024d4edba6cf6ec09cd073
Author: TaoZex <[email protected]>
AuthorDate: Fri Jun 14 22:02:31 2024 +0800
[Paimon]support projection for paimon source (#6343)
---
docs/en/connector-v2/source/Paimon.md | 3 +-
.../seatunnel/paimon/catalog/PaimonCatalog.java | 32 +++++++
.../seatunnel/paimon/config/PaimonConfig.java | 6 --
.../seatunnel/paimon/source/PaimonSource.java | 47 ++++++---
.../paimon/source/PaimonSourceReader.java | 9 +-
.../paimon/source/PaimonSourceSplitEnumerator.java | 20 +++-
.../converter/SqlToPaimonPredicateConverter.java | 105 +++++++++++++++------
.../seatunnel/paimon/utils/RowTypeConverter.java | 16 +++-
...rterTest.java => SqlToPaimonConverterTest.java} | 57 +++++++++--
.../paimon/utils/RowTypeConverterTest.java | 16 +++-
.../seatunnel/e2e/connector/paimon/PaimonIT.java | 3 +
...ssert.conf => paimon_projection_to_assert.conf} | 16 ++--
.../src/test/resources/paimon_to_assert.conf | 6 ++
13 files changed, 266 insertions(+), 70 deletions(-)
diff --git a/docs/en/connector-v2/source/Paimon.md
b/docs/en/connector-v2/source/Paimon.md
index e50ee0df9e..32155abde0 100644
--- a/docs/en/connector-v2/source/Paimon.md
+++ b/docs/en/connector-v2/source/Paimon.md
@@ -104,7 +104,7 @@ source {
warehouse = "/tmp/paimon"
database = "full_type"
table = "st_test"
- query = "select * from st_test where c_boolean= 'true' and c_tinyint > 116
and c_smallint = 15987 or c_decimal='2924137191386439303744.39292213'"
+ query = "select c_boolean, c_tinyint from st_test where c_boolean= 'true'
and c_tinyint > 116 and c_smallint = 15987 or
c_decimal='2924137191386439303744.39292213'"
}
}
```
@@ -161,4 +161,5 @@ source {
### next version
- Add Paimon Source Connector
+- Support projection for Paimon Source
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
index 1e40090805..2c9fcd6f82 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
@@ -45,6 +45,10 @@ import lombok.extern.slf4j.Slf4j;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
@Slf4j
public class PaimonCatalog implements Catalog, PaimonTable {
@@ -124,6 +128,16 @@ public class PaimonCatalog implements Catalog, PaimonTable
{
}
}
+ public CatalogTable getTableWithProjection(TablePath tablePath, int[]
projectionIndex)
+ throws CatalogException, TableNotExistException {
+ try {
+ FileStoreTable paimonFileStoreTableTable = (FileStoreTable)
getPaimonTable(tablePath);
+ return toCatalogTable(paimonFileStoreTableTable, tablePath,
projectionIndex);
+ } catch (Exception e) {
+ throw new TableNotExistException(this.catalogName, tablePath);
+ }
+ }
+
@Override
public Table getPaimonTable(TablePath tablePath)
throws CatalogException, TableNotExistException {
@@ -181,8 +195,26 @@ public class PaimonCatalog implements Catalog, PaimonTable
{
private CatalogTable toCatalogTable(
FileStoreTable paimonFileStoreTableTable, TablePath tablePath) {
+ return toCatalogTable(paimonFileStoreTableTable, tablePath, null);
+ }
+
+ private CatalogTable toCatalogTable(
+ FileStoreTable paimonFileStoreTableTable, TablePath tablePath,
int[] projectionIndex) {
org.apache.paimon.schema.TableSchema schema =
paimonFileStoreTableTable.schema();
List<DataField> dataFields = schema.fields();
+ if (!Objects.isNull(projectionIndex)) {
+ Map<Integer, DataField> indexMap =
+ IntStream.range(0, dataFields.size())
+ .boxed()
+ .collect(Collectors.toMap(i -> i,
dataFields::get));
+
+ dataFields =
+ java.util.Arrays.stream(projectionIndex)
+ .distinct()
+ .filter(indexMap::containsKey)
+ .mapToObj(indexMap::get)
+ .collect(Collectors.toList());
+ }
TableSchema.Builder builder = TableSchema.builder();
dataFields.forEach(
dataField -> {
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
index caa5f1e72c..d394455d2f 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
@@ -86,12 +86,6 @@ public class PaimonConfig implements Serializable {
.noDefaultValue()
.withDescription("The table you intend to access");
- public static final Option<List<String>> READ_COLUMNS =
- Options.key("read_columns")
- .listType()
- .noDefaultValue()
- .withDescription("The read columns of the flink table
store");
-
@Deprecated
public static final Option<String> HDFS_SITE_PATH =
Options.key("hdfs_site_path")
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
index 358026b5d3..d0a0c4a793 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
@@ -29,12 +29,20 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter;
+import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.Table;
+import org.apache.paimon.types.RowType;
+
+import net.sf.jsqlparser.statement.select.PlainSelect;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
+
+import static
org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter.convertSqlSelectToPaimonProjectionIndex;
+import static
org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter.convertToPlainSelect;
/** Paimon connector source class. */
public class PaimonSource
@@ -52,6 +60,8 @@ public class PaimonSource
private Predicate predicate;
+ private int[] projectionIndex;
+
private CatalogTable catalogTable;
public PaimonSource(ReadonlyConfig readonlyConfig, PaimonCatalog
paimonCatalog) {
@@ -61,12 +71,22 @@ public class PaimonSource
TablePath.of(paimonSourceConfig.getNamespace(),
paimonSourceConfig.getTable());
this.catalogTable = paimonCatalog.getTable(tablePath);
this.paimonTable = paimonCatalog.getPaimonTable(tablePath);
- this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
- // TODO: We can use this to realize the column projection feature later
+
String filterSql = readonlyConfig.get(PaimonSourceConfig.QUERY_SQL);
- this.predicate =
- SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
- this.paimonTable.rowType(), filterSql);
+ PlainSelect plainSelect = convertToPlainSelect(filterSql);
+ RowType paimonRowType = this.paimonTable.rowType();
+ String[] filedNames = paimonRowType.getFieldNames().toArray(new
String[0]);
+ if (!Objects.isNull(plainSelect)) {
+ this.projectionIndex =
convertSqlSelectToPaimonProjectionIndex(filedNames, plainSelect);
+ if (!Objects.isNull(projectionIndex)) {
+ this.catalogTable =
+ paimonCatalog.getTableWithProjection(tablePath,
projectionIndex);
+ }
+ this.predicate =
+
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
+ paimonRowType, plainSelect);
+ }
+ seaTunnelRowType = RowTypeConverter.convert(paimonRowType,
projectionIndex);
}
@Override
@@ -75,26 +95,27 @@ public class PaimonSource
}
@Override
- public Boundedness getBoundedness() {
- return Boundedness.BOUNDED;
+ public List<CatalogTable> getProducedCatalogTables() {
+ return Collections.singletonList(catalogTable);
}
@Override
- public List<CatalogTable> getProducedCatalogTables() {
- return Collections.singletonList(catalogTable);
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
}
@Override
public SourceReader<SeaTunnelRow, PaimonSourceSplit> createReader(
SourceReader.Context readerContext) throws Exception {
-
- return new PaimonSourceReader(readerContext, paimonTable,
seaTunnelRowType, predicate);
+ return new PaimonSourceReader(
+ readerContext, paimonTable, seaTunnelRowType, predicate,
projectionIndex);
}
@Override
public SourceSplitEnumerator<PaimonSourceSplit, PaimonSourceState>
createEnumerator(
SourceSplitEnumerator.Context<PaimonSourceSplit>
enumeratorContext) throws Exception {
- return new PaimonSourceSplitEnumerator(enumeratorContext, paimonTable,
predicate);
+ return new PaimonSourceSplitEnumerator(
+ enumeratorContext, paimonTable, predicate, projectionIndex);
}
@Override
@@ -103,6 +124,6 @@ public class PaimonSource
PaimonSourceState checkpointState)
throws Exception {
return new PaimonSourceSplitEnumerator(
- enumeratorContext, paimonTable, checkpointState, predicate);
+ enumeratorContext, paimonTable, checkpointState, predicate,
projectionIndex);
}
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
index 6cd1d87c63..3cfa5ee8b9 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
@@ -49,13 +49,19 @@ public class PaimonSourceReader implements
SourceReader<SeaTunnelRow, PaimonSour
private final SeaTunnelRowType seaTunnelRowType;
private volatile boolean noMoreSplit;
private final Predicate predicate;
+ private int[] projection;
public PaimonSourceReader(
- Context context, Table table, SeaTunnelRowType seaTunnelRowType,
Predicate predicate) {
+ Context context,
+ Table table,
+ SeaTunnelRowType seaTunnelRowType,
+ Predicate predicate,
+ int[] projection) {
this.context = context;
this.table = table;
this.seaTunnelRowType = seaTunnelRowType;
this.predicate = predicate;
+ this.projection = projection;
}
@Override
@@ -76,6 +82,7 @@ public class PaimonSourceReader implements
SourceReader<SeaTunnelRow, PaimonSour
// read logic
try (final RecordReader<InternalRow> reader =
table.newReadBuilder()
+ .withProjection(projection)
.withFilter(predicate)
.newRead()
.executeFilter()
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java
index f6c76dc895..7b0f14c3ab 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java
@@ -51,23 +51,31 @@ public class PaimonSourceSplitEnumerator
private final Predicate predicate;
+ private int[] projection;
+
public PaimonSourceSplitEnumerator(
- Context<PaimonSourceSplit> context, Table table, Predicate
predicate) {
+ Context<PaimonSourceSplit> context,
+ Table table,
+ Predicate predicate,
+ int[] projection) {
this.context = context;
this.table = table;
this.assignedSplit = new HashSet<>();
this.predicate = predicate;
+ this.projection = projection;
}
public PaimonSourceSplitEnumerator(
Context<PaimonSourceSplit> context,
Table table,
PaimonSourceState sourceState,
- Predicate predicate) {
+ Predicate predicate,
+ int[] projection) {
this.context = context;
this.table = table;
this.assignedSplit = sourceState.getAssignedSplits();
this.predicate = predicate;
+ this.projection = projection;
}
@Override
@@ -154,9 +162,13 @@ public class PaimonSourceSplitEnumerator
/** Get all splits of table */
private Set<PaimonSourceSplit> getTableSplits() {
final Set<PaimonSourceSplit> tableSplits = new HashSet<>();
- // TODO Support columns projection
final List<Split> splits =
-
table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
+ table.newReadBuilder()
+ .withProjection(projection)
+ .withFilter(predicate)
+ .newScan()
+ .plan()
+ .splits();
splits.forEach(split -> tableSplits.add(new PaimonSourceSplit(split)));
return tableSplits;
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
index 7b076ba558..212bfd6e8b 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
@@ -51,48 +51,97 @@ import
net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.schema.Column;
import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.select.AllColumns;
import net.sf.jsqlparser.statement.select.PlainSelect;
import net.sf.jsqlparser.statement.select.Select;
import net.sf.jsqlparser.statement.select.SelectBody;
+import net.sf.jsqlparser.statement.select.SelectExpressionItem;
+import net.sf.jsqlparser.statement.select.SelectItem;
import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.stream.IntStream;
public class SqlToPaimonPredicateConverter {
- public static Predicate convertSqlWhereToPaimonPredicate(RowType rowType,
String query) {
+ public static PlainSelect convertToPlainSelect(String query) {
+ if (StringUtils.isBlank(query)) {
+ return null;
+ }
+ Statement statement = null;
try {
- if (StringUtils.isBlank(query)) {
- return null;
- }
- Statement statement = CCJSqlParserUtil.parse(query);
- // Confirm that the SQL statement is a Select statement
- if (!(statement instanceof Select)) {
- throw new IllegalArgumentException("Only SELECT statements are
supported.");
- }
- Select select = (Select) statement;
- SelectBody selectBody = select.getSelectBody();
- if (!(selectBody instanceof PlainSelect)) {
- throw new IllegalArgumentException("Only simple SELECT
statements are supported.");
- }
- PlainSelect plainSelect = (PlainSelect) selectBody;
- if (plainSelect.getHaving() != null
- || plainSelect.getGroupBy() != null
- || plainSelect.getOrderByElements() != null
- || plainSelect.getLimit() != null) {
- throw new IllegalArgumentException(
- "Only SELECT statements with WHERE clause are
supported. The Having, Group By, Order By, Limit clauses are currently
unsupported.");
- }
- Expression whereExpression = plainSelect.getWhere();
- if (Objects.isNull(whereExpression)) {
+ statement = CCJSqlParserUtil.parse(query);
+ } catch (JSQLParserException e) {
+ throw new IllegalArgumentException("Error parsing SQL.", e);
+ }
+ // Confirm that the SQL statement is a Select statement
+ if (!(statement instanceof Select)) {
+ throw new IllegalArgumentException("Only SELECT statements are
supported.");
+ }
+ Select select = (Select) statement;
+ SelectBody selectBody = select.getSelectBody();
+ if (!(selectBody instanceof PlainSelect)) {
+ throw new IllegalArgumentException("Only simple SELECT statements
are supported.");
+ }
+ PlainSelect plainSelect = (PlainSelect) selectBody;
+ if (plainSelect.getHaving() != null
+ || plainSelect.getGroupBy() != null
+ || plainSelect.getOrderByElements() != null
+ || plainSelect.getLimit() != null) {
+ throw new IllegalArgumentException(
+ "Only SELECT statements with WHERE clause are supported.
The Having, Group By, Order By, Limit clauses are currently unsupported.");
+ }
+ return plainSelect;
+ }
+
+ public static int[] convertSqlSelectToPaimonProjectionIndex(
+ String[] fieldNames, PlainSelect plainSelect) {
+ int[] projectionIndex = null;
+ List<SelectItem> selectItems = plainSelect.getSelectItems();
+
+ List<String> columnNames = new ArrayList<>();
+ for (SelectItem selectItem : selectItems) {
+ if (selectItem instanceof AllColumns) {
return null;
+ } else if (selectItem instanceof SelectExpressionItem) {
+ SelectExpressionItem selectExpressionItem =
(SelectExpressionItem) selectItem;
+ String columnName =
selectExpressionItem.getExpression().toString();
+ columnNames.add(columnName);
+ } else {
+ throw new IllegalArgumentException("Error encountered parsing
query fields.");
}
- PredicateBuilder builder = new PredicateBuilder(rowType);
- return parseExpressionToPredicate(builder, rowType,
whereExpression);
- } catch (JSQLParserException e) {
- throw new IllegalArgumentException("Error parsing SQL WHERE
clause", e);
}
+
+ String[] columnNamesArray = columnNames.toArray(new String[0]);
+ projectionIndex =
+ IntStream.range(0, columnNamesArray.length)
+ .map(
+ i -> {
+ String fieldName = columnNamesArray[i];
+ int index =
Arrays.asList(fieldNames).indexOf(fieldName);
+ if (index == -1) {
+ throw new IllegalArgumentException(
+ "column " + fieldName + " does
not exist.");
+ }
+ return index;
+ })
+ .toArray();
+
+ return projectionIndex;
+ }
+
+ public static Predicate convertSqlWhereToPaimonPredicate(
+ RowType rowType, PlainSelect plainSelect) {
+ Expression whereExpression = plainSelect.getWhere();
+ if (Objects.isNull(whereExpression)) {
+ return null;
+ }
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+ return parseExpressionToPredicate(builder, rowType, whereExpression);
}
private static Predicate parseExpressionToPredicate(
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
index b250fd21e9..150c8a138a 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
@@ -56,6 +56,7 @@ import org.apache.paimon.types.VarCharType;
import lombok.extern.slf4j.Slf4j;
+import java.util.Arrays;
import java.util.List;
import java.util.Objects;
@@ -73,12 +74,25 @@ public class RowTypeConverter {
* @param rowType Paimon row type
* @return SeaTunnel row type {@link SeaTunnelRowType}
*/
- public static SeaTunnelRowType convert(RowType rowType) {
+ public static SeaTunnelRowType convert(RowType rowType, int[]
projectionIndex) {
String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
SeaTunnelDataType<?>[] dataTypes =
rowType.getFields().stream()
.map(field ->
field.type().accept(PaimonToSeaTunnelTypeVisitor.INSTANCE))
.toArray(SeaTunnelDataType<?>[]::new);
+ if (projectionIndex != null) {
+ String[] projectionFieldNames =
+ Arrays.stream(projectionIndex)
+ .filter(index -> index >= 0 && index <
fieldNames.length)
+ .mapToObj(index -> fieldNames[index])
+ .toArray(String[]::new);
+ SeaTunnelDataType<?>[] projectionDataTypes =
+ Arrays.stream(projectionIndex)
+ .filter(index -> index >= 0 && index <
fieldNames.length)
+ .mapToObj(index -> dataTypes[index])
+ .toArray(SeaTunnelDataType<?>[]::new);
+ return new SeaTunnelRowType(projectionFieldNames,
projectionDataTypes);
+ }
return new SeaTunnelRowType(fieldNames, dataTypes);
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverterTest.java
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java
similarity index 77%
rename from
seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverterTest.java
rename to
seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java
index d04b76c09b..d8e819b505 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverterTest.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java
@@ -41,18 +41,25 @@ import org.apache.paimon.utils.DateTimeUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import net.sf.jsqlparser.statement.select.PlainSelect;
+
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Arrays;
+import static
org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter.convertToPlainSelect;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
-public class SqlToPaimonPredicateConverterTest {
+public class SqlToPaimonConverterTest {
private RowType rowType;
+ private String[] fieldNames;
+
@BeforeEach
public void setUp() {
rowType =
@@ -71,6 +78,8 @@ public class SqlToPaimonPredicateConverterTest {
new DataField(10, "double_col", new
DoubleType()),
new DataField(11, "date_col", new DateType()),
new DataField(12, "timestamp_col", new
TimestampType())));
+
+ fieldNames = rowType.getFieldNames().toArray(new String[0]);
}
@Test
@@ -90,8 +99,10 @@ public class SqlToPaimonPredicateConverterTest {
+ "date_col = '2022-01-01' AND "
+ "timestamp_col = '2022-01-01T12:00:00.123'";
+ PlainSelect plainSelect = convertToPlainSelect(query);
Predicate predicate =
-
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(rowType, query);
+ SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
+ rowType, plainSelect);
assertNotNull(predicate);
@@ -123,8 +134,10 @@ public class SqlToPaimonPredicateConverterTest {
public void testConvertSqlWhereToPaimonPredicateWithIsNull() {
String query = "SELECT * FROM table WHERE char_col IS NULL";
+ PlainSelect plainSelect = convertToPlainSelect(query);
Predicate predicate =
-
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(rowType, query);
+ SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
+ rowType, plainSelect);
assertNotNull(predicate);
@@ -138,8 +151,10 @@ public class SqlToPaimonPredicateConverterTest {
public void testConvertSqlWhereToPaimonPredicateWithIsNotNull() {
String query = "SELECT * FROM table WHERE char_col IS NOT NULL";
+ PlainSelect plainSelect = convertToPlainSelect(query);
Predicate predicate =
-
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(rowType, query);
+ SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
+ rowType, plainSelect);
assertNotNull(predicate);
@@ -153,8 +168,10 @@ public class SqlToPaimonPredicateConverterTest {
public void testConvertSqlWhereToPaimonPredicateWithAnd() {
String query = "SELECT * FROM table WHERE int_col > 3 AND double_col <
6.6";
+ PlainSelect plainSelect = convertToPlainSelect(query);
Predicate predicate =
-
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(rowType, query);
+ SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
+ rowType, plainSelect);
assertNotNull(predicate);
@@ -169,8 +186,10 @@ public class SqlToPaimonPredicateConverterTest {
public void testConvertSqlWhereToPaimonPredicateWithOr() {
String query = "SELECT * FROM table WHERE int_col > 3 OR double_col <
6.6";
+ PlainSelect plainSelect = convertToPlainSelect(query);
Predicate predicate =
-
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(rowType, query);
+ SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
+ rowType, plainSelect);
assertNotNull(predicate);
@@ -180,4 +199,30 @@ public class SqlToPaimonPredicateConverterTest {
assertEquals(expectedPredicate.toString(), predicate.toString());
}
+
+ @Test
+ public void testConvertSqlSelectToPaimonProjectionArrayWithALL() {
+ String query = "SELECT * FROM table WHERE int_col > 3 OR double_col <
6.6";
+
+ PlainSelect plainSelect = convertToPlainSelect(query);
+ int[] projectionIndex =
+
SqlToPaimonPredicateConverter.convertSqlSelectToPaimonProjectionIndex(
+ fieldNames, plainSelect);
+
+ assertNull(projectionIndex);
+ }
+
+ @Test
+ public void testConvertSqlSelectToPaimonProjectionArrayWithStar() {
+ String query =
+ "SELECT decimal_col, int_col, char_col, timestamp_col,
boolean_col FROM table WHERE int_col > 3 OR double_col < 6.6";
+
+ PlainSelect plainSelect = convertToPlainSelect(query);
+ int[] projectionIndex =
+
SqlToPaimonPredicateConverter.convertSqlSelectToPaimonProjectionIndex(
+ fieldNames, plainSelect);
+
+ int[] expectedProjectionIndex = {4, 7, 0, 12, 2};
+ assertArrayEquals(projectionIndex, expectedProjectionIndex);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
index e075efde15..580bdfde87 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
@@ -47,6 +47,7 @@ public class RowTypeConverterTest {
private SeaTunnelRowType seaTunnelRowType;
+ private SeaTunnelRowType seaTunnelProjectionRowType;
private RowType rowType;
private BasicTypeDefine<DataType> typeDefine;
@@ -128,6 +129,12 @@ public class RowTypeConverterTest {
new MapType<>(BasicType.STRING_TYPE,
BasicType.STRING_TYPE),
ArrayType.STRING_ARRAY_TYPE
});
+
+ seaTunnelProjectionRowType =
+ new SeaTunnelRowType(
+ new String[] {"c_string", "c_int"},
+ new SeaTunnelDataType<?>[] {BasicType.STRING_TYPE,
BasicType.INT_TYPE});
+
rowType =
DataTypes.ROW(
new DataField(0, "c_tinyint", DataTypes.TINYINT()),
@@ -188,10 +195,17 @@ public class RowTypeConverterTest {
@Test
public void paimonRowTypeToSeaTunnel() {
- SeaTunnelRowType convert = RowTypeConverter.convert(rowType);
+ SeaTunnelRowType convert = RowTypeConverter.convert(rowType, null);
Assertions.assertEquals(convert, seaTunnelRowType);
}
+ @Test
+ public void paimonToSeaTunnelWithProjection() {
+ int[] projection = {7, 2};
+ SeaTunnelRowType convert = RowTypeConverter.convert(rowType,
projection);
+ Assertions.assertEquals(convert, seaTunnelProjectionRowType);
+ }
+
@Test
public void seaTunnelToPaimon() {
RowType convert = RowTypeConverter.reconvert(seaTunnelRowType,
tableSchema);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
index c8b5ed80ee..a24a375c31 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
@@ -55,5 +55,8 @@ public class PaimonIT extends TestSuiteBase {
Assertions.assertEquals(0, textWriteResult.getExitCode());
Container.ExecResult readResult =
container.executeJob("/paimon_to_assert.conf");
Assertions.assertEquals(0, readResult.getExitCode());
+ Container.ExecResult readProjectionResult =
+ container.executeJob("/paimon_projection_to_assert.conf");
+ Assertions.assertEquals(0, readProjectionResult.getExitCode());
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_projection_to_assert.conf
similarity index 90%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_projection_to_assert.conf
index b9ecb4283f..6b67aa70c6 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_projection_to_assert.conf
@@ -33,6 +33,7 @@ source {
database = "default"
table = "st_test"
result_table_name = paimon_source
+ query = "select c_string, c_boolean from st_test where c_string is not
null"
}
}
@@ -40,6 +41,12 @@ sink {
Assert {
source_table_name = paimon_source
rules {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 100000
+ }
+ ],
row_rules = [
{
rule_type = MAX_ROW
@@ -64,15 +71,6 @@ sink {
rule_type = NOT_NULL
}
]
- },
- {
- field_name = c_double
- field_type = double
- field_value = [
- {
- rule_type = NOT_NULL
- }
- ]
}
]
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
index b9ecb4283f..f4f6cbcaf2 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
@@ -40,6 +40,12 @@ sink {
Assert {
source_table_name = paimon_source
rules {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 100000
+ }
+ ],
row_rules = [
{
rule_type = MAX_ROW