This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 34dd69d19bd [Feature][external catalog/lakesoul] LakeSoul Catalog
support Filter Pushdown & Cdc data handling & S3 data access (#37979)
34dd69d19bd is described below
commit 34dd69d19bd8ad5b474b3346e31281668c620c9a
Author: Ceng <[email protected]>
AuthorDate: Tue Jul 30 10:21:28 2024 +0800
[Feature][external catalog/lakesoul] LakeSoul Catalog support Filter
Pushdown & Cdc data handling & S3 data access (#37979)
## Proposed changes
Issue Number: close #37978
---------
Signed-off-by: zenghua <[email protected]>
---
fe/be-java-extensions/lakesoul-scanner/pom.xml | 22 +-
.../apache/doris/lakesoul/LakeSoulJniScanner.java | 82 +++-
.../org/apache/doris/lakesoul/LakeSoulUtils.java | 30 +-
.../apache/doris/lakesoul/arrow/ArrowUtils.java | 8 +-
.../lakesoul/arrow/LakeSoulArrowJniScanner.java | 44 +-
fe/fe-core/pom.xml | 44 +-
.../lakesoul/LakeSoulExternalCatalog.java | 22 +-
.../datasource/lakesoul/LakeSoulExternalTable.java | 22 +-
.../doris/datasource/lakesoul/LakeSoulUtils.java | 535 +++++++++++++++++++++
.../lakesoul/source/LakeSoulScanNode.java | 158 +++++-
.../doris/nereids/rules/analysis/BindRelation.java | 1 +
.../datasource/lakesoul/LakeSoulPredicateTest.java | 280 +++++++++++
fe/pom.xml | 4 +-
regression-test/conf/regression-conf.groovy | 11 +
.../lakesoul/test_lakesoul_filter.out | 8 +
.../pipeline/external/conf/regression-conf.groovy | 4 +
.../lakesoul/test_lakesoul_catalog.groovy | 23 +-
.../lakesoul/test_lakesoul_filter.groovy | 58 +++
.../lakesoul/test_external_table_lakesoul.groovy | 24 +-
19 files changed, 1269 insertions(+), 111 deletions(-)
diff --git a/fe/be-java-extensions/lakesoul-scanner/pom.xml
b/fe/be-java-extensions/lakesoul-scanner/pom.xml
index cbbb473483e..24d7efc7614 100644
--- a/fe/be-java-extensions/lakesoul-scanner/pom.xml
+++ b/fe/be-java-extensions/lakesoul-scanner/pom.xml
@@ -47,21 +47,6 @@ under the License.
</exclusion>
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.arrow</groupId>
- <artifactId>arrow-vector</artifactId>
- <version>${arrow.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.arrow</groupId>
- <artifactId>arrow-memory-unsafe</artifactId>
- <version>${arrow.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.arrow</groupId>
- <artifactId>arrow-c-data</artifactId>
- <version>${arrow.version}</version>
- </dependency>
<!-- scala deps -->
<dependency>
@@ -85,7 +70,8 @@ under the License.
<dependency>
<groupId>com.dmetasoul</groupId>
<artifactId>lakesoul-io-java</artifactId>
- <version>2.5.4</version>
+ <version>${lakesoul.version}</version>
+ <classifier>shaded</classifier>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
@@ -99,10 +85,6 @@ under the License.
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
- <exclusion>
- <groupId>org.apache.arrow</groupId>
- <artifactId>*</artifactId>
- </exclusion>
<exclusion>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
diff --git
a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java
b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java
index 3dfbff756db..a7ac785d1fb 100644
---
a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java
+++
b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java
@@ -17,25 +17,30 @@
package org.apache.doris.lakesoul;
-import org.apache.doris.common.jni.vec.ScanPredicate;
import org.apache.doris.lakesoul.arrow.LakeSoulArrowJniScanner;
-import org.apache.doris.lakesoul.parquet.ParquetFilter;
import com.dmetasoul.lakesoul.LakeSoulArrowReader;
import com.dmetasoul.lakesoul.lakesoul.io.NativeIOReader;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.types.pojo.Field;
-import org.apache.arrow.vector.types.pojo.Schema;
+import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
+import com.lakesoul.shaded.com.fasterxml.jackson.core.type.TypeReference;
+import com.lakesoul.shaded.com.fasterxml.jackson.databind.ObjectMapper;
+import com.lakesoul.shaded.org.apache.arrow.vector.VectorSchemaRoot;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Schema;
+import io.substrait.proto.Plan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashSet;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class LakeSoulJniScanner extends LakeSoulArrowJniScanner {
+ private static final Logger LOG =
LoggerFactory.getLogger(LakeSoulJniScanner.class);
private final Map<String, String> params;
@@ -60,6 +65,9 @@ public class LakeSoulJniScanner extends
LakeSoulArrowJniScanner {
withAllocator(nativeIOReader.getAllocator());
nativeIOReader.setBatchSize(batchSize);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("opening LakeSoulJniScanner with params={}", params);
+ }
// add files
for (String file :
params.get(LakeSoulUtils.FILE_NAMES).split(LakeSoulUtils.LIST_DELIM)) {
nativeIOReader.addFile(file);
@@ -72,19 +80,39 @@ public class LakeSoulJniScanner extends
LakeSoulArrowJniScanner {
Arrays.stream(primaryKeys.split(LakeSoulUtils.LIST_DELIM)).collect(Collectors.toList()));
}
- Schema schema = Schema.fromJSON(params.get(LakeSoulUtils.SCHEMA_JSON));
+ String options = params.getOrDefault(LakeSoulUtils.OPTIONS, "{}");
+ Map<String, String> optionsMap = new ObjectMapper().readValue(
+ options, new TypeReference<Map<String, String>>() {}
+ );
+ String base64Predicate =
optionsMap.get(LakeSoulUtils.SUBSTRAIT_PREDICATE);
+ if (base64Predicate != null) {
+ Plan predicate = SubstraitUtil.decodeBase64String(base64Predicate);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("push predicate={}", predicate);
+ }
+ nativeIOReader.addFilterProto(predicate);
+ }
+
+ for (String key : LakeSoulUtils.OBJECT_STORE_OPTIONS) {
+ String value = optionsMap.get(key);
+ if (key != null) {
+ nativeIOReader.setObjectStoreOption(key, value);
+ }
+ }
+
+ Schema tableSchema =
Schema.fromJSON(params.get(LakeSoulUtils.SCHEMA_JSON));
String[] requiredFieldNames =
params.get(LakeSoulUtils.REQUIRED_FIELDS).split(LakeSoulUtils.LIST_DELIM);
List<Field> requiredFields = new ArrayList<>();
for (String fieldName : requiredFieldNames) {
- requiredFields.add(schema.findField(fieldName));
+ requiredFields.add(tableSchema.findField(fieldName));
}
requiredSchema = new Schema(requiredFields);
nativeIOReader.setSchema(requiredSchema);
- HashSet<String> partitionColumn = new HashSet<>();
+ List<Field> partitionFields = new ArrayList<>();
for (String partitionKV :
params.getOrDefault(LakeSoulUtils.PARTITION_DESC, "")
.split(LakeSoulUtils.LIST_DELIM)) {
if (partitionKV.isEmpty()) {
@@ -94,17 +122,15 @@ public class LakeSoulJniScanner extends
LakeSoulArrowJniScanner {
if (kv.length != 2) {
throw new IllegalArgumentException("Invalid partition column =
" + partitionKV);
}
- partitionColumn.add(kv[0]);
+ nativeIOReader.setDefaultColumnValue(kv[0], kv[1]);
+ partitionFields.add(tableSchema.findField(kv[0]));
+ }
+ if (!partitionFields.isEmpty()) {
+ nativeIOReader.setPartitionSchema(new Schema(partitionFields));
}
initTableInfo(params);
- for (ScanPredicate predicate : predicates) {
- if (!partitionColumn.contains(predicate.columName)) {
-
nativeIOReader.addFilter(ParquetFilter.toParquetFilter(predicate).toString());
- }
- }
-
nativeIOReader.initializeReader();
lakesoulArrowReader = new LakeSoulArrowReader(nativeIOReader,
awaitTimeout);
}
@@ -156,4 +182,28 @@ public class LakeSoulJniScanner extends
LakeSoulArrowJniScanner {
currentBatch.close();
}
}
+
+    /**
+     * Ad-hoc local debugging entry point: builds a scanner over a single Parquet file of a three-column
+     * region table, opens it and prints the result of the first {@code getNext()} call.
+     *
+     * <p>NOTE(review): the default file URI is specific to one developer's machine; pass another file URI
+     * as {@code args[0]}. Consider moving this harness into a test instead of production code.
+     *
+     * @param args optional; {@code args[0]} overrides the Parquet file URI to read
+     * @throws IOException if the scanner fails to open, read or close
+     */
+    public static void main(String[] args) throws IOException {
+        String filePath = args.length > 0
+                ? args[0]
+                : "file:/Users/ceng/Documents/GitHub/LakeSoul/rust/lakesoul-datafusion/"
+                        + "default/region/part-RzmUvDFtYV8ceb3J_0000.parquet";
+        HashMap<String, String> params = new HashMap<>();
+        params.put(LakeSoulUtils.REQUIRED_FIELDS, "r_regionkey;r_name;r_comment");
+        params.put(LakeSoulUtils.PRIMARY_KEYS, "r_regionkey;r_name");
+        params.put("query_id", "e9d075a6500a4cac-b94630fd4b30c171");
+        params.put(LakeSoulUtils.FILE_NAMES, filePath);
+        params.put(LakeSoulUtils.OPTIONS, "{}");
+        params.put(LakeSoulUtils.SCHEMA_JSON,
+                "{\"fields\":["
+                + "{\"name\":\"r_regionkey\",\"type\":{\"name\":\"int\",\"isSigned\":true,\"bitWidth\":64},"
+                + "\"nullable\":false,\"children\":[]},"
+                + "{\"name\":\"r_name\",\"type\":{\"name\":\"utf8\"},\"nullable\":false,\"children\":[]},"
+                + "{\"name\":\"r_comment\",\"type\":{\"name\":\"utf8\"},\"nullable\":false,\"children\":[]}"
+                + "],"
+                + "\"metadata\":null}");
+        params.put(LakeSoulUtils.PARTITION_DESC, "");
+        LakeSoulJniScanner scanner = new LakeSoulJniScanner(1024, params);
+        try {
+            scanner.open();
+            System.out.println(scanner.getNext());
+        } finally {
+            // Close even if open()/getNext() throws so whatever the scanner acquired is released.
+            scanner.close();
+        }
+    }
}
diff --git
a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java
b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java
index 6c7f88f3ab3..ca07a81d0da 100644
---
a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java
+++
b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java
@@ -17,13 +17,29 @@
package org.apache.doris.lakesoul;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Parameter keys and delimiters shared by the LakeSoul JNI scanner.
+ */
public class LakeSoulUtils {
- public static String FILE_NAMES = "file_paths";
- public static String PRIMARY_KEYS = "primary_keys";
- public static String SCHEMA_JSON = "table_schema";
- public static String PARTITION_DESC = "partition_descs";
- public static String REQUIRED_FIELDS = "required_fields";
+    // Keys looked up in the params map handed to LakeSoulJniScanner.
+ public static final String FILE_NAMES = "file_paths";
+ public static final String PRIMARY_KEYS = "primary_keys";
+ public static final String SCHEMA_JSON = "table_schema";
+ public static final String PARTITION_DESC = "partition_descs";
+ public static final String REQUIRED_FIELDS = "required_fields";
+    // Value is a JSON object of string options (parsed into a Map<String, String> by the scanner).
+ public static final String OPTIONS = "options";
+    // Key inside the OPTIONS map; its value is a base64-encoded Substrait plan pushed to the reader.
+ public static final String SUBSTRAIT_PREDICATE = "substrait_predicate";
+    // Separator for list-valued params: file paths, primary keys, required fields, partition descriptors.
+ public static final String LIST_DELIM = ";";
+    // Presumably separates column name and value inside one PARTITION_DESC entry — confirm at the split site.
+ public static final String PARTITIONS_KV_DELIM = "=";
+
+    // S3A-style object store option keys, read from the OPTIONS map.
+ public static final String FS_S3A_ACCESS_KEY = "fs.s3a.access.key";
+ public static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key";
+ public static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint";
+ public static final String FS_S3A_PATH_STYLE_ACCESS =
"fs.s3a.path.style.access";
- public static String LIST_DELIM = ";";
- public static String PARTITIONS_KV_DELIM = "=";
+    // Option keys LakeSoulJniScanner copies into NativeIOReader.setObjectStoreOption.
+    // NOTE(review): fe-core's LakeSoulUtils also defines "fs.s3a.endpoint.region", which is not listed
+    // here and therefore never reaches the native reader — confirm whether that is intended.
+ public static final List<String> OBJECT_STORE_OPTIONS = Arrays.asList(
+ FS_S3A_ACCESS_KEY,
+ FS_S3A_SECRET_KEY,
+ FS_S3A_ENDPOINT,
+ FS_S3A_PATH_STYLE_ACCESS
+ );
}
diff --git
a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java
b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java
index 3ad28ba783a..94ac32935e8 100644
---
a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java
+++
b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java
@@ -20,10 +20,10 @@ package org.apache.doris.lakesoul.arrow;
import org.apache.doris.common.jni.utils.OffHeap;
import org.apache.doris.common.jni.utils.TypeNativeBytes;
-import org.apache.arrow.memory.ArrowBuf;
-import org.apache.arrow.util.Preconditions;
-import org.apache.arrow.vector.types.pojo.ArrowType;
-import org.apache.arrow.vector.types.pojo.Field;
+import com.lakesoul.shaded.org.apache.arrow.memory.ArrowBuf;
+import com.lakesoul.shaded.org.apache.arrow.util.Preconditions;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.ArrowType;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field;
import java.time.LocalDate;
import java.time.LocalDateTime;
diff --git
a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java
b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java
index 320d653a20a..3c73c2f1ab4 100644
---
a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java
+++
b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java
@@ -23,28 +23,28 @@ import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ScanPredicate;
import org.apache.doris.common.jni.vec.VectorTable;
-import org.apache.arrow.memory.ArrowBuf;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.BitVector;
-import org.apache.arrow.vector.DateDayVector;
-import org.apache.arrow.vector.DecimalVector;
-import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.TimeStampMicroTZVector;
-import org.apache.arrow.vector.TimeStampMicroVector;
-import org.apache.arrow.vector.TimeStampMilliTZVector;
-import org.apache.arrow.vector.TimeStampMilliVector;
-import org.apache.arrow.vector.TimeStampNanoTZVector;
-import org.apache.arrow.vector.TimeStampNanoVector;
-import org.apache.arrow.vector.TimeStampSecTZVector;
-import org.apache.arrow.vector.TimeStampSecVector;
-import org.apache.arrow.vector.TimeStampVector;
-import org.apache.arrow.vector.ValueVector;
-import org.apache.arrow.vector.VarCharVector;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.complex.ListVector;
-import org.apache.arrow.vector.complex.StructVector;
-import org.apache.arrow.vector.types.pojo.Field;
-import org.apache.arrow.vector.types.pojo.Schema;
+import com.lakesoul.shaded.org.apache.arrow.memory.ArrowBuf;
+import com.lakesoul.shaded.org.apache.arrow.memory.BufferAllocator;
+import com.lakesoul.shaded.org.apache.arrow.vector.BitVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.DateDayVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.DecimalVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.FieldVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMicroTZVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMicroVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMilliTZVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMilliVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampNanoTZVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampNanoVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampSecTZVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampSecVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.ValueVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.VarCharVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.VectorSchemaRoot;
+import com.lakesoul.shaded.org.apache.arrow.vector.complex.ListVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.complex.StructVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Schema;
import org.apache.log4j.Logger;
import java.io.IOException;
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index bac2346185d..8021f2a3b18 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -568,7 +568,7 @@ under the License.
<dependency>
<groupId>com.dmetasoul</groupId>
<artifactId>lakesoul-common</artifactId>
- <version>2.5.4</version>
+ <version>${lakesoul.version}</version>
<classifier>shaded</classifier>
<exclusions>
<exclusion>
@@ -577,6 +577,46 @@ under the License.
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>com.dmetasoul</groupId>
+ <artifactId>lakesoul-io-java</artifactId>
+ <version>${lakesoul.version}</version>
+ <classifier>shaded</classifier>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr4-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
@@ -1220,4 +1260,4 @@ under the License.
</extension>
</extensions>
</build>
-</project>
\ No newline at end of file
+</project>
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java
index dd8342ad660..e813ac2fc97 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java
@@ -17,6 +17,7 @@
package org.apache.doris.datasource.lakesoul;
+
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
@@ -25,6 +26,7 @@ import org.apache.doris.datasource.property.PropertyConverter;
import com.dmetasoul.lakesoul.meta.DBManager;
import com.dmetasoul.lakesoul.meta.DBUtil;
+import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
import com.google.common.collect.Lists;
@@ -33,7 +35,7 @@ import java.util.Map;
public class LakeSoulExternalCatalog extends ExternalCatalog {
- private DBManager dbManager;
+ private DBManager lakesoulMetadataManager;
private final Map<String, String> props;
@@ -48,13 +50,13 @@ public class LakeSoulExternalCatalog extends
ExternalCatalog {
+    /**
+     * Lists LakeSoul namespaces, which this catalog exposes as databases.
+     *
+     * <p>Calls {@code initLocalObjectsImpl()} directly instead of {@code makeSureInitialized()} so the
+     * metadata manager exists. NOTE(review): presumably this avoids re-entering catalog initialization —
+     * confirm.
+     */
@Override
protected List<String> listDatabaseNames() {
initLocalObjectsImpl();
- return dbManager.listNamespaces();
+ return lakesoulMetadataManager.listNamespaces();
}
@Override
public List<String> listTableNames(SessionContext ctx, String dbName) {
makeSureInitialized();
- List<TableInfo> tifs = dbManager.getTableInfosByNamespace(dbName);
+ List<TableInfo> tifs =
lakesoulMetadataManager.getTableInfosByNamespace(dbName);
List<String> tableNames = Lists.newArrayList();
for (TableInfo item : tifs) {
tableNames.add(item.getTableName());
@@ -65,14 +67,13 @@ public class LakeSoulExternalCatalog extends
ExternalCatalog {
+    /**
+     * Returns whether a LakeSoul table named {@code tblName} exists in namespace {@code dbName}.
+     *
+     * <p>A {@code null} lookup result means the table does not exist.
+     */
@Override
public boolean tableExist(SessionContext ctx, String dbName, String
tblName) {
makeSureInitialized();
- TableInfo tableInfo = dbManager.getTableInfoByNameAndNamespace(dbName,
tblName);
-
+        // Argument order is (tableName, namespace), the same order getLakeSoulTable() uses.
+        TableInfo tableInfo = lakesoulMetadataManager.getTableInfoByNameAndNamespace(tblName, dbName);
return null != tableInfo;
}
@Override
protected void initLocalObjectsImpl() {
- if (dbManager == null) {
+ if (lakesoulMetadataManager == null) {
if (props != null) {
if (props.containsKey(DBUtil.urlKey)) {
System.setProperty(DBUtil.urlKey,
props.get(DBUtil.urlKey));
@@ -84,13 +85,18 @@ public class LakeSoulExternalCatalog extends
ExternalCatalog {
System.setProperty(DBUtil.passwordKey,
props.get(DBUtil.passwordKey));
}
}
- dbManager = new DBManager();
+ lakesoulMetadataManager = new DBManager();
}
}
+    /**
+     * Looks up LakeSoul table metadata for table {@code tblName} in namespace {@code dbName}.
+     *
+     * @return the table info from the metadata manager; NOTE(review): presumably {@code null} when the
+     *     table does not exist (tableExist treats a null result that way) — confirm
+     */
public TableInfo getLakeSoulTable(String dbName, String tblName) {
makeSureInitialized();
- return dbManager.getTableInfoByNameAndNamespace(tblName, dbName);
+ return lakesoulMetadataManager.getTableInfoByNameAndNamespace(tblName,
dbName);
+ }
+
+    /**
+     * Returns the partition metadata that the metadata manager's {@code getAllPartitionInfo} reports for
+     * the LakeSoul table with id {@code tableId}.
+     */
+ public List<PartitionInfo> listPartitionInfo(String tableId) {
+ makeSureInitialized();
+ return lakesoulMetadataManager.getAllPartitionInfo(tableId);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java
index 46e8d1db47c..a5cf3478ae8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java
@@ -25,17 +25,23 @@ import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.statistics.AnalysisInfo;
+import org.apache.doris.statistics.BaseAnalysisTask;
+import org.apache.doris.statistics.ExternalAnalysisTask;
import org.apache.doris.thrift.TLakeSoulTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import com.dmetasoul.lakesoul.meta.DBUtil;
+import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
import com.google.common.collect.Lists;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.HashMap;
@@ -45,11 +51,20 @@ import java.util.Optional;
import java.util.stream.Collectors;
public class LakeSoulExternalTable extends ExternalTable {
-
+ private static final Logger LOG =
LogManager.getLogger(LakeSoulExternalTable.class);
public static final int LAKESOUL_TIMESTAMP_SCALE_MS = 6;
+ public final String tableId;
+
+    /**
+     * Creates the table object and caches the LakeSoul table id later used by {@code listPartitionInfo()}.
+     *
+     * <p>NOTE(review): {@code getLakeSoulTableInfo()} runs inside the constructor. If it resolves the table
+     * through the catalog's metadata manager, every construction performs a metadata lookup, and a
+     * {@code null} result (missing table) would surface here as a NullPointerException — confirm.
+     */
public LakeSoulExternalTable(long id, String name, String dbName,
LakeSoulExternalCatalog catalog) {
super(id, name, catalog, dbName, TableType.LAKESOUl_EXTERNAL_TABLE);
+ tableId = getLakeSoulTableInfo().getTableId();
+ }
+
+    /**
+     * Creates a generic {@link ExternalAnalysisTask} for statistics collection on this table, after making
+     * sure the table is initialized.
+     */
+ @Override
+ public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
+ makeSureInitialized();
+ return new ExternalAnalysisTask(info);
}
private Type arrowFiledToDorisType(Field field) {
@@ -150,6 +165,7 @@ public class LakeSoulExternalTable extends ExternalTable {
String tableSchema = tableInfo.getTableSchema();
DBUtil.TablePartitionKeys partitionKeys =
DBUtil.parseTableInfoPartitions(tableInfo.getPartitions());
Schema schema;
+ LOG.info("tableSchema={}", tableSchema);
try {
schema = Schema.fromJSON(tableSchema);
} catch (IOException e) {
@@ -174,6 +190,10 @@ public class LakeSoulExternalTable extends ExternalTable {
return ((LakeSoulExternalCatalog) catalog).getLakeSoulTable(dbName,
name);
}
+    /** Returns this table's partition metadata by asking the catalog with the cached {@code tableId}. */
+ public List<PartitionInfo> listPartitionInfo() {
+ return ((LakeSoulExternalCatalog) catalog).listPartitionInfo(tableId);
+ }
+
+    /** Returns the table's storage path; the table info is fetched from the catalog on every call. */
public String tablePath() {
return ((LakeSoulExternalCatalog) catalog).getLakeSoulTable(dbName,
name).getTablePath();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulUtils.java
new file mode 100644
index 00000000000..8f7cf83dbfc
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulUtils.java
@@ -0,0 +1,535 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.lakesoul;
+
+import org.apache.doris.analysis.BoolLiteral;
+import org.apache.doris.analysis.CastExpr;
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.analysis.DateLiteral;
+import org.apache.doris.analysis.DecimalLiteral;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FloatLiteral;
+import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.analysis.InPredicate;
+import org.apache.doris.analysis.IntLiteral;
+import org.apache.doris.analysis.IsNullPredicate;
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.analysis.NullLiteral;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StringLiteral;
+import org.apache.doris.analysis.Subquery;
+import org.apache.doris.planner.ColumnBound;
+import org.apache.doris.planner.ColumnRange;
+import org.apache.doris.thrift.TExprOpcode;
+
+import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
+import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Schema;
+import io.substrait.expression.Expression;
+import io.substrait.extension.DefaultExtensionCatalog;
+import io.substrait.type.Type;
+import io.substrait.type.TypeCreator;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+
+public class LakeSoulUtils {
+
+    // Scan parameter keys. Values match the constants of the same names in the lakesoul-scanner module's
+    // org.apache.doris.lakesoul.LakeSoulUtils.
+    public static final String FILE_NAMES = "file_paths";
+    public static final String PRIMARY_KEYS = "primary_keys";
+    public static final String SCHEMA_JSON = "table_schema";
+    public static final String PARTITION_DESC = "partition_descs";
+    public static final String REQUIRED_FIELDS = "required_fields";
+    public static final String OPTIONS = "options";
+    public static final String SUBSTRAIT_PREDICATE = "substrait_predicate";
+    // Table property naming the CDC change column (read by getPushPredicate).
+    public static final String CDC_COLUMN = "lakesoul_cdc_change_column";
+    public static final String LIST_DELIM = ";";
+    public static final String PARTITIONS_KV_DELIM = "=";
+
+    // S3A-style object store option keys.
+    public static final String FS_S3A_ACCESS_KEY = "fs.s3a.access.key";
+    public static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key";
+    public static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint";
+    public static final String FS_S3A_REGION = "fs.s3a.endpoint.region";
+    public static final String FS_S3A_PATH_STYLE_ACCESS = "fs.s3a.path.style.access";
+
+    /** The Unix epoch (1970-01-01T00:00Z) at UTC offset. */
+    private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0L).atOffset(ZoneOffset.UTC);
+    /** The calendar date of {@link #EPOCH}. */
+    private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
+    /**
+     * Prunes {@code allPartitionInfo} with the column ranges Doris derived for partition columns.
+     *
+     * <p>Every field of {@code partitionArrowSchema} that has an entry in {@code columnNameToRange} is
+     * translated by {@code columnRangeToSubstraitFilter}. The non-null results are AND-ed together, and the
+     * combined filter is encoded and handed to {@link SubstraitUtil#applyPartitionFilters}.
+     * NOTE(review): when no partition column has a usable range the combined expression is {@code null};
+     * this relies on SubstraitUtil treating a null expression/plan as "no filter" — confirm.
+     *
+     * @param allPartitionInfo candidate partitions to prune
+     * @param tableName table name embedded in the generated Substrait plan
+     * @param partitionArrowSchema schema of the partition columns
+     * @param columnNameToRange column ranges keyed by column name
+     * @return the partitions kept by {@link SubstraitUtil#applyPartitionFilters}
+     * @throws IOException propagated from the Substrait conversion helpers
+     */
+    public static List<PartitionInfo> applyPartitionFilters(
+            List<PartitionInfo> allPartitionInfo,
+            String tableName,
+            Schema partitionArrowSchema,
+            Map<String, ColumnRange> columnNameToRange
+    ) throws IOException {
+        Expression combined = null;
+        for (Field partitionField : partitionArrowSchema.getFields()) {
+            ColumnRange range = columnNameToRange.get(partitionField.getName());
+            if (range == null) {
+                continue;
+            }
+            Expression rangeFilter = columnRangeToSubstraitFilter(partitionField, range);
+            if (rangeFilter == null) {
+                continue;
+            }
+            combined = combined == null ? rangeFilter : SubstraitUtil.and(combined, rangeFilter);
+        }
+        io.substrait.proto.Plan partitionFilter = SubstraitUtil.substraitExprToProto(combined, tableName);
+        return SubstraitUtil.applyPartitionFilters(allPartitionInfo, partitionArrowSchema, partitionFilter);
+    }
+
+    /**
+     * Translates one Doris {@link ColumnRange} into a Substrait boolean expression over {@code columnField}.
+     *
+     * <p>Each range of the column's range set becomes a bound check via {@code rangeToSubstraitFilter}, and
+     * the checks are OR-ed together. In three cases this conservatively returns
+     * {@link SubstraitUtil#CONST_TRUE}, meaning no pruning on this column:
+     * <ul>
+     *   <li>the column carries a conjunctive IS NULL;</li>
+     *   <li>there is no range set;</li>
+     *   <li>the range set is empty.</li>
+     * </ul>
+     *
+     * @param columnField Arrow field of the filtered column
+     * @param columnRange the range Doris derived for that column
+     * @return the filter expression; {@code null} only if every per-range conversion returned {@code null}
+     * @throws IOException propagated from the Substrait conversion helpers
+     */
+    public static Expression columnRangeToSubstraitFilter(
+            Field columnField,
+            ColumnRange columnRange
+    ) throws IOException {
+        Optional<RangeSet<ColumnBound>> rangeSetOpt = columnRange.getRangeSet();
+        if (columnRange.hasConjunctiveIsNull() || !rangeSetOpt.isPresent()) {
+            return SubstraitUtil.CONST_TRUE;
+        }
+        RangeSet<ColumnBound> rangeSet = rangeSetOpt.get();
+        if (rangeSet.isEmpty()) {
+            // NOTE(review): an empty RangeSet contains no values, so presumably no row can match. TRUE keeps
+            // every partition, which is safe but prunes nothing; confirm before switching to CONST_FALSE.
+            return SubstraitUtil.CONST_TRUE;
+        }
+        // The ranges are alternatives, so they are combined with OR.
+        Expression disjunction = null;
+        for (Range<ColumnBound> range : rangeSet.asRanges()) {
+            Expression rangeFilter = rangeToSubstraitFilter(columnField, range);
+            if (rangeFilter != null) {
+                disjunction = disjunction == null ? rangeFilter : SubstraitUtil.or(disjunction, rangeFilter);
+            }
+        }
+        return disjunction;
+    }
+
+    /**
+     * Translates one Guava {@link Range} whose endpoints are {@link ColumnBound}s into a Substrait predicate
+     * on {@code columnField}, built as {@code and(upperCheck, lowerCheck)}.
+     *
+     * <p>An absent bound contributes {@link SubstraitUtil#CONST_TRUE}. An unbounded range ({@code Range.all()})
+     * returns CONST_TRUE directly. Open bounds map to {@code lt}/{@code gt}, closed bounds to
+     * {@code lte}/{@code gte}.
+     *
+     * @param columnField Arrow field of the filtered column
+     * @param range a range whose endpoints are {@link ColumnBound}s
+     * @return the predicate; never {@code null}
+     * @throws IOException propagated from the Substrait conversion helpers
+     */
+    public static Expression rangeToSubstraitFilter(Field columnField, Range range) throws IOException {
+        if (!range.hasLowerBound() && !range.hasUpperBound()) {
+            // Range.all()
+            return SubstraitUtil.CONST_TRUE;
+        }
+        Expression upper = SubstraitUtil.CONST_TRUE;
+        if (range.hasUpperBound()) {
+            String func = range.upperBoundType() == BoundType.OPEN ? "lt:any_any" : "lte:any_any";
+            upper = boundToSubstraitFilter(columnField, (ColumnBound) range.upperEndpoint(), func);
+        }
+        Expression lower = SubstraitUtil.CONST_TRUE;
+        if (range.hasLowerBound()) {
+            String func = range.lowerBoundType() == BoundType.OPEN ? "gt:any_any" : "gte:any_any";
+            lower = boundToSubstraitFilter(columnField, (ColumnBound) range.lowerEndpoint(), func);
+        }
+        return SubstraitUtil.and(upper, lower);
+    }
+
+    /**
+     * Builds {@code func(column, literal(bound))} using the Substrait comparison function extension.
+     */
+    private static Expression boundToSubstraitFilter(Field columnField, ColumnBound bound, String func)
+            throws IOException {
+        Expression column = SubstraitUtil.arrowFieldToSubstraitField(columnField);
+        Expression literal = SubstraitUtil.anyToSubstraitLiteral(
+                SubstraitUtil.arrowFieldToSubstraitType(columnField),
+                bound.getValue().getRealValue());
+        return SubstraitUtil.makeBinary(
+                column,
+                literal,
+                DefaultExtensionCatalog.FUNCTIONS_COMPARISON,
+                func,
+                TypeCreator.NULLABLE.BOOLEAN
+        );
+    }
+
+    /**
+     * Builds the Substrait filter plan pushed down to the LakeSoul native reader.
+     *
+     * <p>The plan is a left-deep AND, in this order:
+     * <ul>
+     *   <li>the CDC merge-on-read filter on the column named by the {@code CDC_COLUMN} table property,
+     *       included only when that property is set and {@code incRead} is false;</li>
+     *   <li>every conjunct that is not entirely a partition-column predicate and that
+     *       {@code convertToSubstraitExpr} can translate. Partition-only conjuncts are presumably
+     *       handled by partition pruning instead.</li>
+     * </ul>
+     *
+     * @return the encoded plan, or {@code null} when there is nothing to push down
+     * @throws IOException propagated from the Substrait conversion helpers
+     */
+    public static io.substrait.proto.Plan getPushPredicate(
+            List<Expr> conjuncts,
+            String tableName,
+            Schema tableSchema,
+            Schema partitionArrowSchema,
+            Map<String, String> properties,
+            boolean incRead
+    ) throws IOException {
+        Set<String> partitionColumnNames = partitionArrowSchema.getFields().stream()
+                .map(Field::getName)
+                .collect(Collectors.toSet());
+
+        String cdcColumnName = properties.get(CDC_COLUMN);
+        Expression pushed = (cdcColumnName == null || incRead)
+                ? null
+                : SubstraitUtil.cdcColumnMergeOnReadFilter(tableSchema.findField(cdcColumnName));
+
+        for (Expr conjunct : conjuncts) {
+            if (isAllPartitionPredicate(conjunct, partitionColumnNames)) {
+                continue;
+            }
+            Expression translated = convertToSubstraitExpr(conjunct, tableSchema);
+            if (translated == null) {
+                continue;
+            }
+            pushed = pushed == null ? translated : SubstraitUtil.and(pushed, translated);
+        }
+
+        return pushed == null ? null : SubstraitUtil.substraitExprToProto(pushed, tableName);
+    }
+
+    /**
+     * Returns whether {@code predicate} only constrains partition columns. For compound predicates, both
+     * children must qualify (checked recursively).
+     *
+     * <p>A non-compound predicate qualifies only in the shape {@code <slot> op <literal>}:
+     * <ul>
+     *   <li>child 0 is a column reference, optionally wrapped in a cast;</li>
+     *   <li>child 1 is a literal, optionally wrapped in a cast;</li>
+     *   <li>the column is in {@code partitionColumns}.</li>
+     * </ul>
+     * Any other shape, e.g. {@code <literal> op <slot>}, is reported as not partition-only. Children after
+     * index 1 are not inspected.
+     *
+     * @return {@code false} for a {@code null} predicate
+     */
+    public static boolean isAllPartitionPredicate(Expr predicate, Set<String> partitionColumns) {
+        if (predicate == null) {
+            return false;
+        }
+        if (predicate instanceof CompoundPredicate) {
+            return isAllPartitionPredicate(predicate.getChild(0), partitionColumns)
+                    && isAllPartitionPredicate(predicate.getChild(1), partitionColumns);
+        }
+        // This assumes the column is child 0 and the literal is child 1; nothing reorders them.
+        SlotRef column = convertDorisExprToSlotRef(predicate.getChild(0));
+        LiteralExpr literal = convertDorisExprToLiteralExpr(predicate.getChild(1));
+        boolean isSlotLiteralPair = column != null && literal != null;
+        return isSlotLiteralPair && partitionColumns.contains(column.getColumnName());
+    }
+
+ public static SlotRef convertDorisExprToSlotRef(Expr expr) {
+ SlotRef slotRef = null;
+ if (expr instanceof SlotRef) {
+ slotRef = (SlotRef) expr;
+ } else if (expr instanceof CastExpr) {
+ if (expr.getChild(0) instanceof SlotRef) {
+ slotRef = (SlotRef) expr.getChild(0);
+ }
+ }
+ return slotRef;
+ }
+
+ public static LiteralExpr convertDorisExprToLiteralExpr(Expr expr) {
+ LiteralExpr literalExpr = null;
+ if (expr instanceof LiteralExpr) {
+ literalExpr = (LiteralExpr) expr;
+ } else if (expr instanceof CastExpr) {
+ if (expr.getChild(0) instanceof LiteralExpr) {
+ literalExpr = (LiteralExpr) expr.getChild(0);
+ }
+ }
+ return literalExpr;
+ }
+
+ public static Expression convertToSubstraitExpr(Expr predicate, Schema
tableSchema) throws IOException {
+ if (predicate == null) {
+ return null;
+ }
+ if (predicate instanceof BoolLiteral) {
+ BoolLiteral boolLiteral = (BoolLiteral) predicate;
+ boolean value = boolLiteral.getValue();
+ if (value) {
+ return SubstraitUtil.CONST_TRUE;
+ } else {
+ return SubstraitUtil.CONST_FALSE;
+ }
+ }
+ if (predicate instanceof CompoundPredicate) {
+ CompoundPredicate compoundPredicate = (CompoundPredicate)
predicate;
+ switch (compoundPredicate.getOp()) {
+ case AND: {
+ Expression left =
convertToSubstraitExpr(compoundPredicate.getChild(0), tableSchema);
+ Expression right =
convertToSubstraitExpr(compoundPredicate.getChild(1), tableSchema);
+ if (left != null && right != null) {
+ return SubstraitUtil.and(left, right);
+ } else if (left != null) {
+ return left;
+ } else {
+ return right;
+ }
+ }
+ case OR: {
+ Expression left =
convertToSubstraitExpr(compoundPredicate.getChild(0), tableSchema);
+ Expression right =
convertToSubstraitExpr(compoundPredicate.getChild(1), tableSchema);
+ if (left != null && right != null) {
+ return SubstraitUtil.or(left, right);
+ }
+ return null;
+ }
+ case NOT: {
+ Expression child =
convertToSubstraitExpr(compoundPredicate.getChild(0), tableSchema);
+ if (child != null) {
+ return SubstraitUtil.not(child);
+ }
+ return null;
+ }
+ default:
+ return null;
+ }
+ } else if (predicate instanceof InPredicate) {
+ InPredicate inExpr = (InPredicate) predicate;
+ if (inExpr.contains(Subquery.class)) {
+ return null;
+ }
+ SlotRef slotRef = convertDorisExprToSlotRef(inExpr.getChild(0));
+ if (slotRef == null) {
+ return null;
+ }
+ String colName = slotRef.getColumnName();
+ Field field = tableSchema.findField(colName);
+ Expression fieldRef =
SubstraitUtil.arrowFieldToSubstraitField(field);
+
+ colName = field.getName();
+ Type type = field.getType().accept(
+ new
SubstraitUtil.ArrowTypeToSubstraitTypeConverter(field.isNullable())
+ );
+ List<Expression.Literal> valueList = new ArrayList<>();
+ for (int i = 1; i < inExpr.getChildren().size(); ++i) {
+ if (!(inExpr.getChild(i) instanceof LiteralExpr)) {
+ return null;
+ }
+ LiteralExpr literalExpr = (LiteralExpr) inExpr.getChild(i);
+ Object value = extractDorisLiteral(type, literalExpr);
+ if (value == null) {
+ return null;
+ }
+ valueList.add(SubstraitUtil.anyToSubstraitLiteral(type,
value));
+ }
+ if (inExpr.isNotIn()) {
+ // not in
+ return SubstraitUtil.notIn(fieldRef, valueList);
+ } else {
+ // in
+ return SubstraitUtil.in(fieldRef, valueList);
+ }
+ }
+ return convertBinaryExpr(predicate, tableSchema);
+ }
+
+    /**
+     * Converts a single comparison-style predicate into Substrait.
+     *
+     * <p>Supported shapes:
+     * <ul>
+     *   <li>{@code col <op> literal} with the column first; either operand may be wrapped in a cast;</li>
+     *   <li>{@code col <=> NULL};</li>
+     *   <li>a LIKE function call whose pattern does not start with '%' but ends with '%';</li>
+     *   <li>{@code col IS [NOT] NULL} on a bare column.</li>
+     * </ul>
+     *
+     * @return the Substrait expression, or {@code null} when the predicate cannot be pushed down
+     */
+    private static Expression convertBinaryExpr(Expr dorisExpr, Schema tableSchema) throws IOException {
+        if (dorisExpr instanceof IsNullPredicate) {
+            // IS [NOT] NULL has no literal operand. It must be handled before the literal extraction below,
+            // which would otherwise always reject it.
+            return convertIsNullPredicate((IsNullPredicate) dorisExpr, tableSchema);
+        }
+        TExprOpcode opcode = dorisExpr.getOpcode();
+        // Make sure the col slot is always first
+        SlotRef slotRef = convertDorisExprToSlotRef(dorisExpr.getChild(0));
+        LiteralExpr literalExpr = convertDorisExprToLiteralExpr(dorisExpr.getChild(1));
+        if (slotRef == null || literalExpr == null) {
+            return null;
+        }
+        Field field = tableSchema.findField(slotRef.getColumnName());
+        Expression fieldRef = SubstraitUtil.arrowFieldToSubstraitField(field);
+        Type type = field.getType().accept(
+                new SubstraitUtil.ArrowTypeToSubstraitTypeConverter(field.isNullable()));
+        Object value = extractDorisLiteral(type, literalExpr);
+        if (value == null) {
+            // `col <=> NULL` is the same as `col IS NULL`; any other unconvertible literal is not pushed.
+            if (opcode == TExprOpcode.EQ_FOR_NULL && literalExpr instanceof NullLiteral) {
+                return SubstraitUtil.makeUnary(
+                        fieldRef,
+                        DefaultExtensionCatalog.FUNCTIONS_COMPARISON,
+                        "is_null:any",
+                        TypeCreator.NULLABLE.BOOLEAN);
+            }
+            return null;
+        }
+        Expression literal = SubstraitUtil.anyToSubstraitLiteral(type, value);
+
+        String namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+        String func;
+        switch (opcode) {
+            case EQ:
+            case EQ_FOR_NULL:
+                // With a non-null literal, `col <=> v` keeps exactly the rows where `col = v`.
+                // The unary is_null function cannot express this two-operand comparison.
+                func = "equal:any_any";
+                break;
+            case NE:
+                func = "not_equal:any_any";
+                break;
+            case GE:
+                func = "gte:any_any";
+                break;
+            case GT:
+                func = "gt:any_any";
+                break;
+            case LE:
+                func = "lte:any_any";
+                break;
+            case LT:
+                func = "lt:any_any";
+                break;
+            case INVALID_OPCODE: {
+                // Only patterns that do not start with '%' and do end with '%' are pushed down.
+                String pattern = literalExpr.getStringValue();
+                boolean isPrefixLike = dorisExpr instanceof FunctionCallExpr
+                        && "like".equalsIgnoreCase(dorisExpr.getExprName())
+                        && !pattern.startsWith("%")
+                        && pattern.endsWith("%");
+                if (!isPrefixLike) {
+                    return null;
+                }
+                // NOTE(review): "like:bool" is kept from the original; confirm the signature the LakeSoul
+                // Substrait consumer expects for the string LIKE function.
+                namespace = DefaultExtensionCatalog.FUNCTIONS_STRING;
+                func = "like:bool";
+                break;
+            }
+            default:
+                return null;
+        }
+        return SubstraitUtil.makeBinary(fieldRef, literal, namespace, func, TypeCreator.NULLABLE.BOOLEAN);
+    }
+
+    /**
+     * Converts {@code col IS [NOT] NULL} into a unary Substrait comparison.
+     *
+     * <p>Only a bare column is accepted. A cast can turn a non-null value into NULL (e.g. a failed
+     * string-to-number cast), so {@code CAST(col) IS NULL} is not equivalent to {@code col IS NULL}.
+     *
+     * @return the Substrait expression, or {@code null} if the operand is not a bare column
+     */
+    private static Expression convertIsNullPredicate(IsNullPredicate isNullPredicate, Schema tableSchema)
+            throws IOException {
+        if (!(isNullPredicate.getChild(0) instanceof SlotRef)) {
+            return null;
+        }
+        SlotRef slotRef = (SlotRef) isNullPredicate.getChild(0);
+        Field field = tableSchema.findField(slotRef.getColumnName());
+        Expression fieldRef = SubstraitUtil.arrowFieldToSubstraitField(field);
+        String func = isNullPredicate.isNotNull() ? "is_not_null:any" : "is_null:any";
+        return SubstraitUtil.makeUnary(
+                fieldRef,
+                DefaultExtensionCatalog.FUNCTIONS_COMPARISON,
+                func,
+                TypeCreator.NULLABLE.BOOLEAN);
+    }
+
+    /**
+     * Converts a Doris literal into the Java value passed to {@code SubstraitUtil.anyToSubstraitLiteral}
+     * for the Substrait column type {@code type}.
+     *
+     * <p>Values for date columns are day counts since {@code EPOCH_DAY}. Values for timestamp columns are
+     * microsecond counts since {@code EPOCH}. This applies to both string and date literals.
+     *
+     * @param type the Substrait type of the column being compared
+     * @param expr the Doris literal
+     * @return the converted value, or {@code null} if this literal/type combination is not pushed down
+     */
+    public static Object extractDorisLiteral(Type type, LiteralExpr expr) {
+        if (expr instanceof BoolLiteral) {
+            if (type instanceof Type.Bool) {
+                return ((BoolLiteral) expr).getValue();
+            }
+            if (type instanceof Type.Str) {
+                return expr.getStringValue();
+            }
+        } else if (expr instanceof DateLiteral) {
+            DateLiteral dateLiteral = (DateLiteral) expr;
+            if (type instanceof Type.Date) {
+                // A literal with a time part is not compared against a DATE column.
+                if (dateLiteral.getType().isDatetimeV2() || dateLiteral.getType().isDatetime()) {
+                    return null;
+                }
+                // DateLiteral.getLongValue() is a packed yyyyMMddHHmmss number, not an epoch offset,
+                // so the day count is computed from the date fields instead.
+                return dateLiteralToEpochDay(dateLiteral);
+            }
+            if (type instanceof Type.TimestampTZ || type instanceof Type.Timestamp) {
+                return dateLiteralToEpochMicros(dateLiteral);
+            }
+            if (type instanceof Type.Str) {
+                return expr.getStringValue();
+            }
+        } else if (expr instanceof DecimalLiteral) {
+            DecimalLiteral decimalLiteral = (DecimalLiteral) expr;
+            if (type instanceof Type.Decimal) {
+                return decimalLiteral.getValue();
+            } else if (type instanceof Type.FP64) {
+                return decimalLiteral.getDoubleValue();
+            }
+            if (type instanceof Type.Str) {
+                return expr.getStringValue();
+            }
+        } else if (expr instanceof FloatLiteral) {
+            FloatLiteral floatLiteral = (FloatLiteral) expr;
+            if (floatLiteral.getType() == org.apache.doris.catalog.Type.FLOAT) {
+                return type instanceof Type.FP32
+                        || type instanceof Type.FP64
+                        || type instanceof Type.Decimal ? floatLiteral.getValue() : null;
+            } else {
+                return type instanceof Type.FP64
+                        || type instanceof Type.Decimal ? floatLiteral.getValue() : null;
+            }
+        } else if (expr instanceof IntLiteral) {
+            if (type instanceof Type.I8
+                    || type instanceof Type.I16
+                    || type instanceof Type.I32
+                    || type instanceof Type.I64
+                    || type instanceof Type.FP32
+                    || type instanceof Type.FP64
+                    || type instanceof Type.Decimal
+                    || type instanceof Type.Date
+            ) {
+                return expr.getRealValue();
+            }
+            // Only 64-bit integer literals are treated as raw time/timestamp values.
+            if (!expr.getType().isInteger32Type()) {
+                if (type instanceof Type.Time || type instanceof Type.Timestamp || type instanceof Type.TimestampTZ) {
+                    return expr.getLongValue();
+                }
+            }
+        } else if (expr instanceof StringLiteral) {
+            String value = expr.getStringValue();
+            if (type instanceof Type.Str) {
+                return value;
+            }
+            if (type instanceof Type.Date) {
+                try {
+                    return (int) ChronoUnit.DAYS.between(
+                            EPOCH_DAY,
+                            LocalDate.parse(value, DateTimeFormatter.ISO_LOCAL_DATE));
+                } catch (DateTimeParseException e) {
+                    return null;
+                }
+            }
+            if (type instanceof Type.Timestamp || type instanceof Type.TimestampTZ) {
+                try {
+                    return ChronoUnit.MICROS.between(
+                            EPOCH,
+                            OffsetDateTime.parse(value, DateTimeFormatter.ISO_DATE_TIME));
+                } catch (DateTimeParseException e) {
+                    return null;
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Returns the number of days between {@code EPOCH_DAY} and the date fields of {@code literal}.
+     * Returns {@code null} if the fields do not form a valid date.
+     */
+    private static Integer dateLiteralToEpochDay(DateLiteral literal) {
+        try {
+            LocalDate date = LocalDate.of((int) literal.getYear(), (int) literal.getMonth(), (int) literal.getDay());
+            return (int) ChronoUnit.DAYS.between(EPOCH_DAY, date);
+        } catch (java.time.DateTimeException e) {
+            return null;
+        }
+    }
+
+    /**
+     * Returns the number of microseconds between {@code EPOCH} and the date-time fields of {@code literal}.
+     * Returns {@code null} if the fields do not form a valid date-time.
+     *
+     * <p>NOTE(review): the zone-less Doris value is interpreted as UTC. Confirm this matches how LakeSoul
+     * stores timestamp columns and how the session time zone should apply.
+     */
+    private static Long dateLiteralToEpochMicros(DateLiteral literal) {
+        try {
+            OffsetDateTime dateTime = OffsetDateTime.of(
+                    (int) literal.getYear(), (int) literal.getMonth(), (int) literal.getDay(),
+                    (int) literal.getHour(), (int) literal.getMinute(), (int) literal.getSecond(),
+                    (int) literal.getMicrosecond() * 1000,
+                    java.time.ZoneOffset.UTC);
+            return ChronoUnit.MICROS.between(EPOCH, dateTime);
+        } catch (java.time.DateTimeException e) {
+            return null;
+        }
+    }
+
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
index 1779aeaca10..fd36bfd52bd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
@@ -17,14 +17,19 @@
package org.apache.doris.datasource.lakesoul.source;
+
+
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LocationPath;
+import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.datasource.lakesoul.LakeSoulExternalTable;
+import org.apache.doris.datasource.lakesoul.LakeSoulUtils;
+import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
@@ -34,15 +39,26 @@ import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TLakeSoulFileDesc;
import org.apache.doris.thrift.TTableFormatFileDesc;
+import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
import com.dmetasoul.lakesoul.meta.DBUtil;
import com.dmetasoul.lakesoul.meta.DataFileInfo;
import com.dmetasoul.lakesoul.meta.DataOperation;
import com.dmetasoul.lakesoul.meta.LakeSoulOptions;
+import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
import com.google.common.collect.Lists;
import com.lakesoul.shaded.com.alibaba.fastjson.JSON;
import com.lakesoul.shaded.com.alibaba.fastjson.JSONObject;
+import com.lakesoul.shaded.com.fasterxml.jackson.core.type.TypeReference;
+import com.lakesoul.shaded.com.fasterxml.jackson.databind.ObjectMapper;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Schema;
+import io.substrait.proto.Plan;
+import lombok.SneakyThrows;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
@@ -52,19 +68,54 @@ import java.util.stream.Collectors;
public class LakeSoulScanNode extends FileQueryScanNode {
- protected final LakeSoulExternalTable lakeSoulExternalTable;
+ private static final Logger LOG =
LogManager.getLogger(LakeSoulScanNode.class);
+ protected LakeSoulExternalTable lakeSoulExternalTable;
+
+ String tableName;
+
+ String location;
- protected final TableInfo table;
+ String partitions;
+
+ Schema tableArrowSchema;
+
+ Schema partitionArrowSchema;
+ private Map<String, String> tableProperties;
+
+ String readType;
+    /**
+     * Creates the scan node. Only the base-class state is set here; the LakeSoul table metadata is loaded
+     * in doInitialize().
+     */
public LakeSoulScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv) {
super(id, desc, "planNodeName", StatisticalType.LAKESOUL_SCAN_NODE,
needCheckColumnPriv);
+ }
+
+    /**
+     * Loads the LakeSoul table metadata used by this node. This covers the table path, name, partition
+     * spec, table properties, the table's Arrow schema, and the Arrow schema of its range-partition columns.
+     *
+     * @throws UserException wrapping any IOException raised while reading the properties or schema
+     */
+ @Override
+ protected void doInitialize() throws UserException {
+ super.doInitialize();
lakeSoulExternalTable = (LakeSoulExternalTable) desc.getTable();
- table = lakeSoulExternalTable.getLakeSoulTableInfo();
+ TableInfo tableInfo = lakeSoulExternalTable.getLakeSoulTableInfo();
+ location = tableInfo.getTablePath();
+ tableName = tableInfo.getTableName();
+ partitions = tableInfo.getPartitions();
+        // Always a full read here; setLakeSoulParams derives its incremental-read flag from this value.
+ readType = LakeSoulOptions.ReadType$.MODULE$.FULL_READ();
+ try {
+            // Table properties are a JSON object, read as a flat string-to-string map.
+ tableProperties = new ObjectMapper().readValue(
+ tableInfo.getProperties(),
+ new TypeReference<Map<String, String>>() {}
+ );
+ tableArrowSchema = Schema.fromJSON(tableInfo.getTableSchema());
+            // Each range-partition key is resolved to its field in the table schema.
+ List<Field> partitionFields =
+ DBUtil.parseTableInfoPartitions(partitions)
+ .rangeKeys
+ .stream()
+
.map(tableArrowSchema::findField).collect(Collectors.toList());
+ partitionArrowSchema = new Schema(partitionFields);
+ } catch (IOException e) {
+ throw new UserException(e);
+ }
}
+    /** Determines the file type from the table location loaded in doInitialize(). */
@Override
protected TFileType getLocationType() throws UserException {
- String location = table.getTablePath();
return getLocationType(location);
}
@@ -81,12 +132,12 @@ public class LakeSoulScanNode extends FileQueryScanNode {
+    /** Returns the range-partition key names parsed from the table's partition spec. */
@Override
protected List<String> getPathPartitionKeys() throws UserException {
- return new
ArrayList<>(DBUtil.parseTableInfoPartitions(table.getPartitions()).rangeKeys);
+ return new
ArrayList<>(DBUtil.parseTableInfoPartitions(partitions).rangeKeys);
}
@Override
protected TableIf getTargetTable() throws UserException {
- return lakeSoulExternalTable;
+ return desc.getTable();
}
@Override
@@ -94,13 +145,21 @@ public class LakeSoulScanNode extends FileQueryScanNode {
return lakeSoulExternalTable.getHadoopProperties();
}
+    /**
+     * Attaches LakeSoul-specific parameters to {@code rangeDesc} when {@code split} is a LakeSoulSplit.
+     * Other split types are left untouched. {@code @SneakyThrows} lets the checked IOException declared by
+     * setLakeSoulParams escape this override, whose signature does not declare it.
+     */
+ @SneakyThrows
@Override
protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}", rangeDesc);
+ }
if (split instanceof LakeSoulSplit) {
setLakeSoulParams(rangeDesc, (LakeSoulSplit) split);
}
}
+ public ExternalCatalog getCatalog() {
+ return lakeSoulExternalTable.getCatalog();
+ }
+
public static boolean isExistHashPartition(TableInfo tif) {
JSONObject tableProperties = JSON.parseObject(tif.getProperties());
if (tableProperties.containsKey(LakeSoulOptions.HASH_BUCKET_NUM())
@@ -111,13 +170,47 @@ public class LakeSoulScanNode extends FileQueryScanNode {
}
}
- public void setLakeSoulParams(TFileRangeDesc rangeDesc, LakeSoulSplit
lakeSoulSplit) {
+ public void setLakeSoulParams(TFileRangeDesc rangeDesc, LakeSoulSplit
lakeSoulSplit) throws IOException {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(lakeSoulSplit.getTableFormatType().value());
TLakeSoulFileDesc fileDesc = new TLakeSoulFileDesc();
fileDesc.setFilePaths(lakeSoulSplit.getPaths());
fileDesc.setPrimaryKeys(lakeSoulSplit.getPrimaryKeys());
fileDesc.setTableSchema(lakeSoulSplit.getTableSchema());
+
+
+ JSONObject options = new JSONObject();
+ Plan predicate = LakeSoulUtils.getPushPredicate(
+ conjuncts,
+ tableName,
+ tableArrowSchema,
+ partitionArrowSchema,
+ tableProperties,
+
readType.equals(LakeSoulOptions.ReadType$.MODULE$.INCREMENTAL_READ()));
+ if (predicate != null) {
+ options.put(LakeSoulUtils.SUBSTRAIT_PREDICATE,
SubstraitUtil.encodeBase64String(predicate));
+ }
+ Map<String, String> catalogProps = getCatalog().getProperties();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}", catalogProps);
+ }
+
+ if (catalogProps.get(S3Properties.Env.ENDPOINT) != null) {
+ options.put(LakeSoulUtils.FS_S3A_ENDPOINT,
catalogProps.get(S3Properties.Env.ENDPOINT));
+ options.put(LakeSoulUtils.FS_S3A_PATH_STYLE_ACCESS, "true");
+ if (catalogProps.get(S3Properties.Env.ACCESS_KEY) != null) {
+ options.put(LakeSoulUtils.FS_S3A_ACCESS_KEY,
catalogProps.get(S3Properties.Env.ACCESS_KEY));
+ }
+ if (catalogProps.get(S3Properties.Env.SECRET_KEY) != null) {
+ options.put(LakeSoulUtils.FS_S3A_SECRET_KEY,
catalogProps.get(S3Properties.Env.SECRET_KEY));
+ }
+ if (catalogProps.get(S3Properties.Env.REGION) != null) {
+ options.put(LakeSoulUtils.FS_S3A_REGION,
catalogProps.get(S3Properties.Env.REGION));
+ }
+ }
+
+ fileDesc.setOptions(JSON.toJSONString(options));
+
fileDesc.setPartitionDescs(lakeSoulSplit.getPartitionDesc()
.entrySet().stream().map(entry ->
String.format("%s=%s", entry.getKey(),
entry.getValue())).collect(Collectors.toList()));
@@ -126,24 +219,51 @@ public class LakeSoulScanNode extends FileQueryScanNode {
}
public List<Split> getSplits() throws UserException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getSplits with columnFilters={}", columnFilters);
+ LOG.debug("getSplits with columnNameToRange={}",
columnNameToRange);
+ LOG.debug("getSplits with conjuncts={}", conjuncts);
+ }
+
+ List<PartitionInfo> allPartitionInfo =
lakeSoulExternalTable.listPartitionInfo();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("allPartitionInfo={}", allPartitionInfo);
+ }
+ List<PartitionInfo> filteredPartitionInfo = allPartitionInfo;
+ try {
+ filteredPartitionInfo =
+ LakeSoulUtils.applyPartitionFilters(
+ allPartitionInfo,
+ tableName,
+ partitionArrowSchema,
+ columnNameToRange
+ );
+ } catch (IOException e) {
+ throw new UserException(e);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("filteredPartitionInfo={}", filteredPartitionInfo);
+ }
+ DataFileInfo[] dataFileInfos =
DataOperation.getTableDataInfo(filteredPartitionInfo);
+
List<Split> splits = new ArrayList<>();
Map<String, Map<Integer, List<String>>> splitByRangeAndHashPartition =
new LinkedHashMap<>();
- TableInfo tif = table;
- DataFileInfo[] dfinfos =
DataOperation.getTableDataInfo(table.getTableId());
- for (DataFileInfo pif : dfinfos) {
- if (isExistHashPartition(tif) && pif.file_bucket_id() != -1) {
-
splitByRangeAndHashPartition.computeIfAbsent(pif.range_partitions(), k -> new
LinkedHashMap<>())
- .computeIfAbsent(pif.file_bucket_id(), v -> new
ArrayList<>())
- .add(pif.path());
+ TableInfo tableInfo = lakeSoulExternalTable.getLakeSoulTableInfo();
+
+ for (DataFileInfo fileInfo : dataFileInfos) {
+ if (isExistHashPartition(tableInfo) && fileInfo.file_bucket_id()
!= -1) {
+
splitByRangeAndHashPartition.computeIfAbsent(fileInfo.range_partitions(), k ->
new LinkedHashMap<>())
+ .computeIfAbsent(fileInfo.file_bucket_id(), v -> new
ArrayList<>())
+ .add(fileInfo.path());
} else {
-
splitByRangeAndHashPartition.computeIfAbsent(pif.range_partitions(), k -> new
LinkedHashMap<>())
+
splitByRangeAndHashPartition.computeIfAbsent(fileInfo.range_partitions(), k ->
new LinkedHashMap<>())
.computeIfAbsent(-1, v -> new ArrayList<>())
- .add(pif.path());
+ .add(fileInfo.path());
}
}
List<String> pkKeys = null;
- if (!table.getPartitions().equals(";")) {
- pkKeys =
Lists.newArrayList(table.getPartitions().split(";")[1].split(","));
+ if (!tableInfo.getPartitions().equals(";")) {
+ pkKeys =
Lists.newArrayList(tableInfo.getPartitions().split(";")[1].split(","));
}
for (Map.Entry<String, Map<Integer, List<String>>> entry :
splitByRangeAndHashPartition.entrySet()) {
@@ -161,7 +281,7 @@ public class LakeSoulScanNode extends FileQueryScanNode {
split.getValue(),
pkKeys,
rangeDesc,
- table.getTableSchema(),
+ tableInfo.getTableSchema(),
0, 0, 0,
new String[0], null);
lakeSoulSplit.setTableFormatType(TableFormatType.LAKESOUL);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
index 82945fb6963..64178846abf 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
@@ -283,6 +283,7 @@ public class BindRelation extends OneAnalysisRuleFactory {
case PAIMON_EXTERNAL_TABLE:
case MAX_COMPUTE_EXTERNAL_TABLE:
case TRINO_CONNECTOR_EXTERNAL_TABLE:
+ case LAKESOUl_EXTERNAL_TABLE:
return new
LogicalFileScan(unboundRelation.getRelationId(), (ExternalTable) table,
qualifierWithoutTableName,
unboundRelation.getTableSample(),
unboundRelation.getTableSnapshot());
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/lakesoul/LakeSoulPredicateTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/lakesoul/LakeSoulPredicateTest.java
new file mode 100644
index 00000000000..aebd74f5e02
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/lakesoul/LakeSoulPredicateTest.java
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.lakesoul;
+
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.BoolLiteral;
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.analysis.CompoundPredicate.Operator;
+import org.apache.doris.analysis.DateLiteral;
+import org.apache.doris.analysis.DecimalLiteral;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FloatLiteral;
+import org.apache.doris.analysis.InPredicate;
+import org.apache.doris.analysis.IntLiteral;
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StringLiteral;
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+
+import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.DateUnit;
+import
com.lakesoul.shaded.org.apache.arrow.vector.types.FloatingPointPrecision;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.TimeUnit;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.ArrowType;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.FieldType;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Schema;
+import io.substrait.expression.Expression;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class LakeSoulPredicateTest {
+
+ public static Schema schema;
+
+ @BeforeClass
+ public static void before() throws AnalysisException, IOException {
+ schema = new Schema(
+ Arrays.asList(
+ new Field("c_int", FieldType.nullable(new
ArrowType.Int(32, true)), null),
+ new Field("c_long", FieldType.nullable(new
ArrowType.Int(64, true)), null),
+ new Field("c_bool", FieldType.nullable(new
ArrowType.Bool()), null),
+ new Field("c_float", FieldType.nullable(new
ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)), null),
+ new Field("c_double", FieldType.nullable(new
ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), null),
+ new Field("c_dec", FieldType.nullable(new
ArrowType.Decimal(20, 10)), null),
+ new Field("c_date", FieldType.nullable(new
ArrowType.Date(DateUnit.DAY)), null),
+ new Field("c_ts", FieldType.nullable(new
ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC")), null),
+ new Field("c_str", FieldType.nullable(new
ArrowType.Utf8()), null)
+ ));
+ }
+
+ @Test
+ public void testBinaryPredicate() throws AnalysisException, IOException {
+ List<LiteralExpr> literalList = new ArrayList<LiteralExpr>() {{
+ add(new BoolLiteral(true));
+ add(new DateLiteral("2023-01-02", Type.DATEV2));
+ add(new DateLiteral("2024-01-02 12:34:56.123456",
Type.DATETIMEV2));
+ add(new DecimalLiteral(new BigDecimal("1.23")));
+ add(new FloatLiteral(1.23, Type.FLOAT));
+ add(new FloatLiteral(3.456, Type.DOUBLE));
+ add(new IntLiteral(1, Type.TINYINT));
+ add(new IntLiteral(1, Type.SMALLINT));
+ add(new IntLiteral(1, Type.INT));
+ add(new IntLiteral(1, Type.BIGINT));
+ add(new StringLiteral("abc"));
+ add(new StringLiteral("2023-01-02"));
+ add(new StringLiteral("2023-01-02 01:02:03.456789"));
+ }};
+
+ List<SlotRef> slotRefs = new ArrayList<SlotRef>() {{
+ add(new SlotRef(new TableName(), "c_int"));
+ add(new SlotRef(new TableName(), "c_long"));
+ add(new SlotRef(new TableName(), "c_bool"));
+ add(new SlotRef(new TableName(), "c_float"));
+ add(new SlotRef(new TableName(), "c_double"));
+ add(new SlotRef(new TableName(), "c_dec"));
+ add(new SlotRef(new TableName(), "c_date"));
+ add(new SlotRef(new TableName(), "c_ts"));
+ add(new SlotRef(new TableName(), "c_str"));
+ }};
+
+ // true indicates support for pushdown
+ Boolean[][] expects = new Boolean[][] {
+ { // int
+ false, false, false, false, false, false, true, true,
true, true, false, false, false
+ },
+ { // long
+ false, false, false, false, false, false, true, true,
true, true, false, false, false
+ },
+ { // boolean
+ true, false, false, false, false, false, false, false,
false, false, false, false, false
+ },
+ { // float
+ false, false, false, false, true, false, true, true,
true, true, false, false, false
+ },
+ { // double
+ false, false, false, true, true, true, true, true,
true, true, false, false, false
+ },
+ { // decimal
+ false, false, false, true, true, true, true, true,
true, true, false, false, false
+ },
+ { // date
+ false, true, false, false, false, false, true, true,
true, true, false, true, false
+ },
+ { // timestamp
+ false, true, true, false, false, false, false, false,
false, true, false, false, false
+ },
+ { // string
+ true, true, true, true, false, false, false, false,
false, false, true, true, true
+ }
+ };
+
+ ArrayListMultimap<Boolean, Expr> validPredicateMap =
ArrayListMultimap.create();
+
+ // binary predicate
+ for (int i = 0; i < slotRefs.size(); i++) {
+ final int loc = i;
+ List<Boolean> ret = literalList.stream().map(literal -> {
+ BinaryPredicate expr = new
BinaryPredicate(BinaryPredicate.Operator.EQ, slotRefs.get(loc), literal);
+ Expression expression = null;
+ try {
+ expression = LakeSoulUtils.convertToSubstraitExpr(expr,
schema);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ validPredicateMap.put(expression != null, expr);
+ return expression != null;
+ }).collect(Collectors.toList());
+ Assert.assertArrayEquals(expects[i], ret.toArray());
+ }
+
+ // in predicate
+ for (int i = 0; i < slotRefs.size(); i++) {
+ final int loc = i;
+ List<Boolean> ret = literalList.stream().map(literal -> {
+ InPredicate expr = new InPredicate(slotRefs.get(loc),
Lists.newArrayList(literal), false);
+ Expression expression = null;
+ try {
+ expression = LakeSoulUtils.convertToSubstraitExpr(expr,
schema);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ validPredicateMap.put(expression != null, expr);
+ return expression != null;
+ }).collect(Collectors.toList());
+ Assert.assertArrayEquals(expects[i], ret.toArray());
+ }
+
+ // not in predicate
+ for (int i = 0; i < slotRefs.size(); i++) {
+ final int loc = i;
+ List<Boolean> ret = literalList.stream().map(literal -> {
+ InPredicate expr = new InPredicate(slotRefs.get(loc),
Lists.newArrayList(literal), true);
+ Expression expression = null;
+ try {
+ expression = LakeSoulUtils.convertToSubstraitExpr(expr,
schema);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ validPredicateMap.put(expression != null, expr);
+ return expression != null;
+ }).collect(Collectors.toList());
+ Assert.assertArrayEquals(expects[i], ret.toArray());
+ }
+
+ // bool literal
+
+ Expression trueExpr = LakeSoulUtils.convertToSubstraitExpr(new
BoolLiteral(true), schema);
+ Expression falseExpr = LakeSoulUtils.convertToSubstraitExpr(new
BoolLiteral(false), schema);
+ Assert.assertEquals(SubstraitUtil.CONST_TRUE, trueExpr);
+ Assert.assertEquals(SubstraitUtil.CONST_FALSE, falseExpr);
+ validPredicateMap.put(true, new BoolLiteral(true));
+ validPredicateMap.put(true, new BoolLiteral(false));
+
+ List<Expr> validExprs = validPredicateMap.get(true);
+ List<Expr> invalidExprs = validPredicateMap.get(false);
+ // OR predicate
+ // both valid
+ for (int i = 0; i < validExprs.size(); i++) {
+ for (int j = 0; j < validExprs.size(); j++) {
+ CompoundPredicate orPredicate = new
CompoundPredicate(Operator.OR,
+ validExprs.get(i), validExprs.get(j));
+ Expression expression =
LakeSoulUtils.convertToSubstraitExpr(orPredicate, schema);
+ Assert.assertNotNull("pred: " + orPredicate.toSql(),
expression);
+ }
+ }
+ // both invalid
+ for (int i = 0; i < invalidExprs.size(); i++) {
+ for (int j = 0; j < invalidExprs.size(); j++) {
+ CompoundPredicate orPredicate = new
CompoundPredicate(Operator.OR,
+ invalidExprs.get(i), invalidExprs.get(j));
+ Expression expression =
LakeSoulUtils.convertToSubstraitExpr(orPredicate, schema);
+ Assert.assertNull("pred: " + orPredicate.toSql(), expression);
+ }
+ }
+ // valid or invalid
+ for (int i = 0; i < validExprs.size(); i++) {
+ for (int j = 0; j < invalidExprs.size(); j++) {
+ CompoundPredicate orPredicate = new
CompoundPredicate(Operator.OR,
+ validExprs.get(i), invalidExprs.get(j));
+ Expression expression =
LakeSoulUtils.convertToSubstraitExpr(orPredicate, schema);
+ Assert.assertNull("pred: " + orPredicate.toSql(), expression);
+ }
+ }
+
+ // AND predicate
+ // both valid
+ for (int i = 0; i < validExprs.size(); i++) {
+ for (int j = 0; j < validExprs.size(); j++) {
+ CompoundPredicate andPredicate = new
CompoundPredicate(Operator.AND,
+ validExprs.get(i), validExprs.get(j));
+ Expression expression =
LakeSoulUtils.convertToSubstraitExpr(andPredicate, schema);
+ Assert.assertNotNull("pred: " + andPredicate.toSql(),
expression);
+ }
+ }
+ // both invalid
+ for (int i = 0; i < invalidExprs.size(); i++) {
+ for (int j = 0; j < invalidExprs.size(); j++) {
+ CompoundPredicate andPredicate = new
CompoundPredicate(Operator.AND,
+ invalidExprs.get(i), invalidExprs.get(j));
+ Expression expression =
LakeSoulUtils.convertToSubstraitExpr(andPredicate, schema);
+ Assert.assertNull("pred: " + andPredicate.toSql(), expression);
+ }
+ }
+ // valid and invalid
+ for (int i = 0; i < validExprs.size(); i++) {
+ for (int j = 0; j < invalidExprs.size(); j++) {
+ CompoundPredicate andPredicate = new
CompoundPredicate(Operator.AND,
+ validExprs.get(i), invalidExprs.get(j));
+ Expression expression =
LakeSoulUtils.convertToSubstraitExpr(andPredicate, schema);
+ Assert.assertNotNull("pred: " + andPredicate.toSql(),
expression);
+
Assert.assertEquals(SubstraitUtil.substraitExprToProto(LakeSoulUtils.convertToSubstraitExpr(validExprs.get(i),
schema), "table"),
+ SubstraitUtil.substraitExprToProto(expression,
"table"));
+ }
+ }
+
+ // NOT predicate
+ // valid
+ for (int i = 0; i < validExprs.size(); i++) {
+ CompoundPredicate notPredicate = new
CompoundPredicate(Operator.NOT,
+ validExprs.get(i), null);
+ Expression expression =
LakeSoulUtils.convertToSubstraitExpr(notPredicate, schema);
+ Assert.assertNotNull("pred: " + notPredicate.toSql(), expression);
+ }
+ // invalid
+ for (int i = 0; i < invalidExprs.size(); i++) {
+ CompoundPredicate notPredicate = new
CompoundPredicate(Operator.NOT,
+ invalidExprs.get(i), null);
+ Expression expression =
LakeSoulUtils.convertToSubstraitExpr(notPredicate, schema);
+ Assert.assertNull("pred: " + notPredicate.toSql(), expression);
+ }
+ }
+}
diff --git a/fe/pom.xml b/fe/pom.xml
index 0c08bb327ae..3d43297fe21 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -317,6 +317,8 @@ under the License.
<hudi.version>0.14.1</hudi.version>
<presto.hadoop.version>2.7.4-11</presto.hadoop.version>
<presto.hive.version>3.0.0-8</presto.hive.version>
+ <!-- lakesoul -->
+ <lakesoul.version>2.6.1</lakesoul.version>
<parquet.version>1.13.1</parquet.version>
<commons-collections.version>3.2.2</commons-collections.version>
@@ -1831,4 +1833,4 @@ under the License.
</snapshots>
</repository>
</repositories>
-</project>
\ No newline at end of file
+</project>
diff --git a/regression-test/conf/regression-conf.groovy
b/regression-test/conf/regression-conf.groovy
index 08d03632c37..d4591bf1782 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -228,3 +228,14 @@ enableTrinoConnectorTest = false
enableKerberosTest=false
kerberosHmsPort=9883
kerberosHdfsPort=8820
+
+
+// LakeSoul catalog test config
+enableLakesoulTest = false
+lakesoulPGUser="*******"
+lakesoulPGPwd="*******"
+lakesoulPGUrl="*******"
+lakesoulMinioAK="*******"
+lakesoulMinioSK="*******"
+lakesoulMinioEndpoint="*******"
+
diff --git
a/regression-test/data/external_table_p0/lakesoul/test_lakesoul_filter.out
b/regression-test/data/external_table_p0/lakesoul/test_lakesoul_filter.out
new file mode 100644
index 00000000000..cb1899326bd
--- /dev/null
+++ b/regression-test/data/external_table_p0/lakesoul/test_lakesoul_filter.out
@@ -0,0 +1,8 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !lakesoul --
+0 AFRICA lar deposits. blithely final packages cajole. regular waters
are final requests. regular accounts are according to
+2 ASIA ges. thinly even pinto beans ca
+3 EUROPE ly final courts cajole furiously final excuse
+1 AMERICA hs use ironic, even requests. s
+4 MIDDLE EAST uickly special accounts cajole carefully blithely close
requests. carefully final asymptotes haggle furiousl
+
diff --git a/regression-test/pipeline/external/conf/regression-conf.groovy
b/regression-test/pipeline/external/conf/regression-conf.groovy
index 93965b84219..31e2c2629ce 100644
--- a/regression-test/pipeline/external/conf/regression-conf.groovy
+++ b/regression-test/pipeline/external/conf/regression-conf.groovy
@@ -160,3 +160,7 @@ enableTrinoConnectorTest = true
enableKerberosTest = true
kerberosHmsPort=9883
kerberosHdfsPort=8820
+
+
+// LakeSoul catalog test config
+enableLakesoulTest = true
diff --git
a/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_catalog.groovy
b/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_catalog.groovy
index e0b8a924c30..ffd95d93097 100644
---
a/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_catalog.groovy
+++
b/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_catalog.groovy
@@ -16,20 +16,33 @@
// under the License.
suite("test_lakesoul_catalog",
"p0,external,doris,external_docker,external_docker_doris") {
- def enabled = false;
+ String enabled = context.config.otherConfigs.get("enableLakesoulTest")
// open it when docker image is ready to run in regression test
- if (enabled) {
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
String catalog_name = "lakesoul"
String db_name = "default"
+ String pg_user = context.config.otherConfigs.get("lakesoulPGUser")
+ String pg_pwd = context.config.otherConfigs.get("lakesoulPGPwd")
+ String pg_url = context.config.otherConfigs.get("lakesoulPGUrl")
+ String minio_ak = context.config.otherConfigs.get("lakesoulMinioAK")
+ String minio_sk = context.config.otherConfigs.get("lakesoulMinioSK")
+ String minio_endpoint =
context.config.otherConfigs.get("lakesoulMinioEndpoint")
sql """drop catalog if exists ${catalog_name}"""
- sql """
- create catalog lakesoul properties
('type'='lakesoul','lakesoul.pg.username'='lakesoul_test','lakesoul.pg.password'='lakesoul_test','lakesoul.pg.url'='jdbc:postgresql://127.0.0.1:5432/lakesoul_test?stringtype=unspecified');"""
+ sql """create catalog lakesoul properties (
+ 'type'='lakesoul',
+ 'lakesoul.pg.username'='${pg_user}',
+ 'lakesoul.pg.password'='${pg_pwd}',
+ 'lakesoul.pg.url'='${pg_url}',
+ 'minio.endpoint'='${minio_endpoint}',
+ 'minio.access_key'='${minio_ak}',
+ 'minio.secret_key'='${minio_sk}'
+ );"""
// analyze
sql """use `${catalog_name}`.`${db_name}`"""
- sq """show tables;"""
+ sql """show tables;"""
// select
sql """select * from nation;"""
diff --git
a/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_filter.groovy
b/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_filter.groovy
new file mode 100644
index 00000000000..799e8ba61bb
--- /dev/null
+++
b/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_filter.groovy
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_lakesoul_filter",
"p0,external,doris,external_docker,external_docker_doris") {
+ String enabled = context.config.otherConfigs.get("enableLakesoulTest")
+ // Gated by enableLakesoulTest in the regression config; the suite does nothing unless it is "true".
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String catalog_name = "lakesoul"
+ String db_name = "default"
+ String pg_user = context.config.otherConfigs.get("lakesoulPGUser")
+ String pg_pwd = context.config.otherConfigs.get("lakesoulPGPwd")
+ String pg_url = context.config.otherConfigs.get("lakesoulPGUrl")
+ String minio_ak = context.config.otherConfigs.get("lakesoulMinioAK")
+ String minio_sk = context.config.otherConfigs.get("lakesoulMinioSK")
+ String minio_endpoint =
context.config.otherConfigs.get("lakesoulMinioEndpoint")
+ // Recreate the catalog from the PG metadata and MinIO settings read above.
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """create catalog lakesoul properties (
+ 'type'='lakesoul',
+ 'lakesoul.pg.username'='${pg_user}',
+ 'lakesoul.pg.password'='${pg_pwd}',
+ 'lakesoul.pg.url'='${pg_url}',
+ 'minio.endpoint'='${minio_endpoint}',
+ 'minio.access_key'='${minio_ak}',
+ 'minio.secret_key'='${minio_sk}'
+ );"""
+
+ // Switch to the test database inside the new catalog.
+ sql """use `${catalog_name}`.`${db_name}`"""
+
+ sql """show tables;"""
+ // Unfiltered scans. NOTE(review): every query here uses sql(), not qt_*, so nothing is compared against the "lakesoul" block in test_lakesoul_filter.out; confirm whether qt_lakesoul was intended.
+ sql """select * from region;"""
+
+ sql """select * from nation;"""
+ // Filtered scans on nation: an OR, an AND, and a single equality predicate.
+ sql """select * from nation where n_regionkey = 0 or n_nationkey >
14;"""
+
+ sql """select * from nation where n_regionkey = 0 and n_nationkey >
0;"""
+
+ sql """select * from nation where n_regionkey = 0;"""
+ }
+}
+
diff --git
a/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy
b/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy
index 9369a28e8fe..bb85dc687d7 100644
---
a/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy
+++
b/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy
@@ -34,13 +34,25 @@ suite("test_external_table_lakesoul",
"p2,external,lakesoul,external_remote,exte
if (enabled != null && enabled.equalsIgnoreCase("true")) {
- String catalog_name = "lakesoul"
- String db_name = "default"
+ String catalog_name = "lakesoul"
+ String db_name = "default"
+ String pg_user = context.config.otherConfigs.get("lakesoulPGUser")
+ String pg_pwd = context.config.otherConfigs.get("lakesoulPGPwd")
+ String pg_url = context.config.otherConfigs.get("lakesoulPGUrl")
+ String minio_ak = context.config.otherConfigs.get("lakesoulMinioAK")
+ String minio_sk = context.config.otherConfigs.get("lakesoulMinioSK")
+ String minio_endpoint =
context.config.otherConfigs.get("lakesoulMinioEndpoint")
- sql """drop catalog if exists ${catalog_name}"""
- sql """
- create catalog lakesoul properties
('type'='lakesoul','lakesoul.pg.username'='lakesoul','lakesoul.pg.url'='jdbc:postgresql://127.0.0.1:5433/lakesoul_test?stringtype=unspecified');
- """
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """create catalog lakesoul properties (
+ 'type'='lakesoul',
+ 'lakesoul.pg.username'='${pg_user}',
+ 'lakesoul.pg.password'='${pg_pwd}',
+ 'lakesoul.pg.url'='${pg_url}',
+ 'minio.endpoint'='${minio_endpoint}',
+ 'minio.access_key'='${minio_ak}',
+ 'minio.secret_key'='${minio_sk}'
+ );"""
// analyze
sql """use `${catalog_name}`.`${db_name}`"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]