This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 719af36f138 [Cherry-Pick][Fix](lakesoul) Fix CVEs and some filter pushdown issues (#39044) (#40812)
719af36f138 is described below
commit 719af36f1384892ae25de362c83ffac162569e60
Author: Ceng <[email protected]>
AuthorDate: Mon Sep 23 14:09:36 2024 +0800
[Cherry-Pick][Fix](lakesoul) Fix CVEs and some filter pushdown issues (#39044) (#40812)
bp #39044
Signed-off-by: dmetasoul01 <[email protected]>
Co-authored-by: Xu Chen <[email protected]>
Co-authored-by: dmetasoul01 <[email protected]>
---
fe/be-java-extensions/lakesoul-scanner/pom.xml | 74 +--
.../apache/doris/lakesoul/LakeSoulJniScanner.java | 61 ++-
.../org/apache/doris/lakesoul/LakeSoulUtils.java | 30 +-
.../apache/doris/lakesoul/arrow/ArrowUtils.java | 24 +-
.../lakesoul/arrow/LakeSoulArrowJniScanner.java | 58 ++-
.../doris/lakesoul/parquet/ParquetFilter.java | 288 -----------
fe/fe-core/pom.xml | 69 ++-
.../lakesoul/LakeSoulExternalCatalog.java | 48 +-
.../datasource/lakesoul/LakeSoulExternalTable.java | 26 +-
.../doris/datasource/lakesoul/LakeSoulUtils.java | 526 +++++++++++++++++++++
.../lakesoul/source/LakeSoulScanNode.java | 170 ++++++-
.../datasource/lakesoul/LakeSoulPredicateTest.java | 280 +++++++++++
fe/pom.xml | 4 +-
.../lakesoul/test_lakesoul_filter.groovy | 74 +++
.../lakesoul/test_external_table_lakesoul.groovy | 24 +-
15 files changed, 1265 insertions(+), 491 deletions(-)
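At a high level, this patch drops the ParquetFilter-based pushdown in the BE JNI scanner and instead ships a Substrait plan built on the FE. A minimal sketch of the new hand-off, using only calls that appear in the diffs below (the FE-side encodeBase64String helper is an assumed counterpart of the decodeBase64String call used by the scanner):

    // FE side (sketch): fold pushable conjuncts into a Substrait plan and
    // hand it to the scanner through the base64-encoded "options" entry.
    io.substrait.proto.Plan plan = LakeSoulUtils.getPushPredicate(
            conjuncts, tableName, tableSchema, partitionArrowSchema, properties, false);
    // encodeBase64String is assumed to mirror decodeBase64String below
    options.put(LakeSoulUtils.SUBSTRAIT_PREDICATE, SubstraitUtil.encodeBase64String(plan));

    // BE side (as in LakeSoulJniScanner.open below): decode the plan and
    // push it into the native reader instead of per-column Parquet filters.
    Plan predicate = SubstraitUtil.decodeBase64String(base64Predicate);
    nativeIOReader.addFilterProto(predicate);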
diff --git a/fe/be-java-extensions/lakesoul-scanner/pom.xml b/fe/be-java-extensions/lakesoul-scanner/pom.xml
index cbbb473483e..4a1ae6b2e8a 100644
--- a/fe/be-java-extensions/lakesoul-scanner/pom.xml
+++ b/fe/be-java-extensions/lakesoul-scanner/pom.xml
@@ -47,87 +47,19 @@ under the License.
</exclusion>
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.arrow</groupId>
- <artifactId>arrow-vector</artifactId>
- <version>${arrow.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.arrow</groupId>
- <artifactId>arrow-memory-unsafe</artifactId>
- <version>${arrow.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.arrow</groupId>
- <artifactId>arrow-c-data</artifactId>
- <version>${arrow.version}</version>
- </dependency>
-
- <!-- scala deps -->
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>${scala.version}</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-compiler</artifactId>
- <version>${scala.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-reflect</artifactId>
- <version>${scala.version}</version>
- </dependency>
<dependency>
<groupId>com.dmetasoul</groupId>
<artifactId>lakesoul-io-java</artifactId>
- <version>2.5.4</version>
+ <version>${lakesoul.version}</version>
+ <classifier>shaded</classifier>
<exclusions>
<exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.arrow</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.antlr</groupId>
- <artifactId>antlr4-runtime</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>jsr305</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.spark</groupId>
+ <groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
-
- <dependency>
- <groupId>org.apache.parquet</groupId>
- <artifactId>parquet-column</artifactId>
- <version>1.12.2</version>
- </dependency>
-
</dependencies>
<build>
diff --git
a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java
b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java
index 3dfbff756db..bedef57d3b7 100644
---
a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java
+++
b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java
@@ -17,25 +17,30 @@
package org.apache.doris.lakesoul;
-import org.apache.doris.common.jni.vec.ScanPredicate;
import org.apache.doris.lakesoul.arrow.LakeSoulArrowJniScanner;
-import org.apache.doris.lakesoul.parquet.ParquetFilter;
import com.dmetasoul.lakesoul.LakeSoulArrowReader;
import com.dmetasoul.lakesoul.lakesoul.io.NativeIOReader;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.types.pojo.Field;
-import org.apache.arrow.vector.types.pojo.Schema;
+import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
+import com.lakesoul.shaded.com.fasterxml.jackson.core.type.TypeReference;
+import com.lakesoul.shaded.com.fasterxml.jackson.databind.ObjectMapper;
+import com.lakesoul.shaded.io.substrait.proto.Plan;
+import com.lakesoul.shaded.org.apache.arrow.vector.VectorSchemaRoot;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Schema;
+import com.lakesoul.shaded.org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class LakeSoulJniScanner extends LakeSoulArrowJniScanner {
+    private static final Logger LOG = LoggerFactory.getLogger(LakeSoulJniScanner.class);
private final Map<String, String> params;
@@ -60,6 +65,8 @@ public class LakeSoulJniScanner extends LakeSoulArrowJniScanner {
withAllocator(nativeIOReader.getAllocator());
nativeIOReader.setBatchSize(batchSize);
+ LOG.info("opening LakeSoulJniScanner with params={}", params);
+
// add files
        for (String file : params.get(LakeSoulUtils.FILE_NAMES).split(LakeSoulUtils.LIST_DELIM)) {
nativeIOReader.addFile(file);
@@ -72,19 +79,43 @@ public class LakeSoulJniScanner extends LakeSoulArrowJniScanner {
Arrays.stream(primaryKeys.split(LakeSoulUtils.LIST_DELIM)).collect(Collectors.toList()));
}
- Schema schema = Schema.fromJSON(params.get(LakeSoulUtils.SCHEMA_JSON));
+ String options = params.getOrDefault(LakeSoulUtils.OPTIONS, "{}");
+ Map<String, String> optionsMap = new ObjectMapper().readValue(
+ options, new TypeReference<Map<String, String>>() {}
+ );
+        String base64Predicate = optionsMap.get(LakeSoulUtils.SUBSTRAIT_PREDICATE);
+ if (base64Predicate != null) {
+ Plan predicate = SubstraitUtil.decodeBase64String(base64Predicate);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("push predicate={}", predicate);
+ }
+ nativeIOReader.addFilterProto(predicate);
+ }
+
+ for (String key : LakeSoulUtils.OBJECT_STORE_OPTIONS) {
+ String value = optionsMap.get(key);
+            if (value != null) {
+ nativeIOReader.setObjectStoreOption(key, value);
+ }
+ }
+
+        Schema tableSchema = Schema.fromJSON(params.get(LakeSoulUtils.SCHEMA_JSON));
        String[] requiredFieldNames = params.get(LakeSoulUtils.REQUIRED_FIELDS).split(LakeSoulUtils.LIST_DELIM);
List<Field> requiredFields = new ArrayList<>();
for (String fieldName : requiredFieldNames) {
- requiredFields.add(schema.findField(fieldName));
+ String name = fieldName;
+ if (StringUtils.isEmpty(name)) {
+ continue;
+ }
+ requiredFields.add(tableSchema.findField(fieldName));
}
requiredSchema = new Schema(requiredFields);
nativeIOReader.setSchema(requiredSchema);
- HashSet<String> partitionColumn = new HashSet<>();
+ List<Field> partitionFields = new ArrayList<>();
        for (String partitionKV : params.getOrDefault(LakeSoulUtils.PARTITION_DESC, "")
                .split(LakeSoulUtils.LIST_DELIM)) {
if (partitionKV.isEmpty()) {
@@ -94,17 +125,15 @@ public class LakeSoulJniScanner extends LakeSoulArrowJniScanner {
if (kv.length != 2) {
                throw new IllegalArgumentException("Invalid partition column = " + partitionKV);
}
- partitionColumn.add(kv[0]);
+ nativeIOReader.setDefaultColumnValue(kv[0], kv[1]);
+ partitionFields.add(tableSchema.findField(kv[0]));
+ }
+ if (!partitionFields.isEmpty()) {
+ nativeIOReader.setPartitionSchema(new Schema(partitionFields));
}
initTableInfo(params);
- for (ScanPredicate predicate : predicates) {
- if (!partitionColumn.contains(predicate.columName)) {
-                nativeIOReader.addFilter(ParquetFilter.toParquetFilter(predicate).toString());
- }
- }
-
nativeIOReader.initializeReader();
        lakesoulArrowReader = new LakeSoulArrowReader(nativeIOReader, awaitTimeout);
}
diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java
index 6c7f88f3ab3..ca07a81d0da 100644
--- a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java
+++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java
@@ -17,13 +17,29 @@
package org.apache.doris.lakesoul;
+import java.util.Arrays;
+import java.util.List;
+
public class LakeSoulUtils {
- public static String FILE_NAMES = "file_paths";
- public static String PRIMARY_KEYS = "primary_keys";
- public static String SCHEMA_JSON = "table_schema";
- public static String PARTITION_DESC = "partition_descs";
- public static String REQUIRED_FIELDS = "required_fields";
+ public static final String FILE_NAMES = "file_paths";
+ public static final String PRIMARY_KEYS = "primary_keys";
+ public static final String SCHEMA_JSON = "table_schema";
+ public static final String PARTITION_DESC = "partition_descs";
+ public static final String REQUIRED_FIELDS = "required_fields";
+ public static final String OPTIONS = "options";
+ public static final String SUBSTRAIT_PREDICATE = "substrait_predicate";
+ public static final String LIST_DELIM = ";";
+ public static final String PARTITIONS_KV_DELIM = "=";
+
+ public static final String FS_S3A_ACCESS_KEY = "fs.s3a.access.key";
+ public static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key";
+ public static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint";
+    public static final String FS_S3A_PATH_STYLE_ACCESS = "fs.s3a.path.style.access";
- public static String LIST_DELIM = ";";
- public static String PARTITIONS_KV_DELIM = "=";
+ public static final List<String> OBJECT_STORE_OPTIONS = Arrays.asList(
+ FS_S3A_ACCESS_KEY,
+ FS_S3A_SECRET_KEY,
+ FS_S3A_ENDPOINT,
+ FS_S3A_PATH_STYLE_ACCESS
+ );
}
diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java
index 3ad28ba783a..a3f9bc4788d 100644
--- a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java
+++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java
@@ -20,10 +20,10 @@ package org.apache.doris.lakesoul.arrow;
import org.apache.doris.common.jni.utils.OffHeap;
import org.apache.doris.common.jni.utils.TypeNativeBytes;
-import org.apache.arrow.memory.ArrowBuf;
-import org.apache.arrow.util.Preconditions;
-import org.apache.arrow.vector.types.pojo.ArrowType;
-import org.apache.arrow.vector.types.pojo.Field;
+import com.lakesoul.shaded.org.apache.arrow.memory.ArrowBuf;
+import com.lakesoul.shaded.org.apache.arrow.util.Preconditions;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.ArrowType;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -40,7 +40,7 @@ public class ArrowUtils {
            LocalDateTime v = LocalDateTime.ofEpochSecond(epochSec, 0, ZoneOffset.UTC);
            OffHeap.putLong(null, address + offset,
                    TypeNativeBytes.convertToDateTimeV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth(), v.getHour(),
-                            v.getMinute(), v.getSecond(), v.getNano() / 1000));
+                    v.getMinute(), v.getSecond(), v.getNano() / 1000));
offset += 8;
}
@@ -58,7 +58,7 @@ public class ArrowUtils {
            LocalDateTime v = LocalDateTime.ofEpochSecond(epochSec, (int) nanoSec, ZoneOffset.UTC);
            OffHeap.putLong(null, address + offset,
                    TypeNativeBytes.convertToDateTimeV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth(), v.getHour(),
-                            v.getMinute(), v.getSecond(), v.getNano() / 1000));
+                    v.getMinute(), v.getSecond(), v.getNano() / 1000));
offset += 8;
}
@@ -76,7 +76,7 @@ public class ArrowUtils {
            LocalDateTime v = LocalDateTime.ofEpochSecond(epochSec, (int) nanoSec, ZoneOffset.UTC);
            OffHeap.putLong(null, address + offset,
                    TypeNativeBytes.convertToDateTimeV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth(), v.getHour(),
-                            v.getMinute(), v.getSecond(), v.getNano() / 1000));
+                    v.getMinute(), v.getSecond(), v.getNano() / 1000));
offset += 8;
}
@@ -94,7 +94,7 @@ public class ArrowUtils {
            LocalDateTime v = LocalDateTime.ofEpochSecond(epochSec, (int) nanoSec, ZoneOffset.UTC);
            OffHeap.putLong(null, address + offset,
                    TypeNativeBytes.convertToDateTimeV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth(), v.getHour(),
-                            v.getMinute(), v.getSecond(), v.getNano() / 1000));
+                    v.getMinute(), v.getSecond(), v.getNano() / 1000));
offset += 8;
}
@@ -215,11 +215,9 @@ public class ArrowUtils {
return hiveType.toString();
}
-    private static class ArrowTypeToHiveTypeConverter
-            implements ArrowType.ArrowTypeVisitor<String> {
+    private static class ArrowTypeToHiveTypeConverter implements ArrowType.ArrowTypeVisitor<String> {
-        private static final ArrowTypeToHiveTypeConverter INSTANCE =
-                new ArrowTypeToHiveTypeConverter();
+        private static final ArrowTypeToHiveTypeConverter INSTANCE = new ArrowTypeToHiveTypeConverter();
@Override
public String visit(ArrowType.Null type) {
@@ -359,6 +357,4 @@ public class ArrowUtils {
return "unsupported";
}
}
-
-
}
diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java
index 320d653a20a..b8fb8e92bd4 100644
--- a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java
+++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java
@@ -23,28 +23,28 @@ import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ScanPredicate;
import org.apache.doris.common.jni.vec.VectorTable;
-import org.apache.arrow.memory.ArrowBuf;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.BitVector;
-import org.apache.arrow.vector.DateDayVector;
-import org.apache.arrow.vector.DecimalVector;
-import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.TimeStampMicroTZVector;
-import org.apache.arrow.vector.TimeStampMicroVector;
-import org.apache.arrow.vector.TimeStampMilliTZVector;
-import org.apache.arrow.vector.TimeStampMilliVector;
-import org.apache.arrow.vector.TimeStampNanoTZVector;
-import org.apache.arrow.vector.TimeStampNanoVector;
-import org.apache.arrow.vector.TimeStampSecTZVector;
-import org.apache.arrow.vector.TimeStampSecVector;
-import org.apache.arrow.vector.TimeStampVector;
-import org.apache.arrow.vector.ValueVector;
-import org.apache.arrow.vector.VarCharVector;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.complex.ListVector;
-import org.apache.arrow.vector.complex.StructVector;
-import org.apache.arrow.vector.types.pojo.Field;
-import org.apache.arrow.vector.types.pojo.Schema;
+import com.lakesoul.shaded.org.apache.arrow.memory.ArrowBuf;
+import com.lakesoul.shaded.org.apache.arrow.memory.BufferAllocator;
+import com.lakesoul.shaded.org.apache.arrow.vector.BitVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.DateDayVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.DecimalVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.FieldVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMicroTZVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMicroVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMilliTZVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMilliVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampNanoTZVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampNanoVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampSecTZVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampSecVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.ValueVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.VarCharVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.VectorSchemaRoot;
+import com.lakesoul.shaded.org.apache.arrow.vector.complex.ListVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.complex.StructVector;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field;
+import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Schema;
import org.apache.log4j.Logger;
import java.io.IOException;
@@ -101,7 +101,7 @@ public class LakeSoulArrowJniScanner extends JniScanner {
String[] requiredFields = new String[fields.size()];
for (int i = 0; i < fields.size(); i++) {
columnTypes[i] =
-                    ColumnType.parseType(fields.get(i).getName(), ArrowUtils.hiveTypeFromArrowField(fields.get(i)));
+                ColumnType.parseType(fields.get(i).getName(), ArrowUtils.hiveTypeFromArrowField(fields.get(i)));
requiredFields[i] = fields.get(i).getName();
}
predicates = new ScanPredicate[0];
@@ -116,8 +116,8 @@ public class LakeSoulArrowJniScanner extends JniScanner {
String[] requiredFields = new String[fields.size()];
for (int i = 0; i < fields.size(); i++) {
columnTypes[i] =
- ColumnType.parseType(fields.get(i).getName(),
- ArrowUtils.hiveTypeFromArrowField(fields.get(i)));
+ ColumnType.parseType(fields.get(i).getName(),
+ ArrowUtils.hiveTypeFromArrowField(fields.get(i)));
requiredFields[i] = fields.get(i).getName();
}
@@ -142,10 +142,8 @@ public class LakeSoulArrowJniScanner extends JniScanner {
    private Integer fillMetaAddressVector(int batchSize, ColumnType columnType, long metaAddress, Integer offset,
                                          ValueVector valueVector) {
// nullMap
-        long
-                validityBuffer =
-                ArrowUtils.loadValidityBuffer(valueVector.getValidityBuffer(), batchSize,
-                        valueVector.getField().isNullable());
+        long validityBuffer = ArrowUtils.loadValidityBuffer(valueVector.getValidityBuffer(), batchSize,
+                valueVector.getField().isNullable());
extraOffHeap.add(validityBuffer);
OffHeap.putLong(null, metaAddress + (offset++) * 8, validityBuffer);
@@ -172,7 +170,7 @@ public class LakeSoulArrowJniScanner extends JniScanner {
continue;
}
                offset = fillMetaAddressVector(batchSize, columnType.getChildTypes().get(i), metaAddress, offset,
- childrenVector);
+ childrenVector);
}
} else if (columnType.isStringType()) {
diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/parquet/ParquetFilter.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/parquet/ParquetFilter.java
deleted file mode 100644
index 7d2820acd79..00000000000
--- a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/parquet/ParquetFilter.java
+++ /dev/null
@@ -1,288 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.lakesoul.parquet;
-
-import org.apache.doris.common.jni.vec.ColumnType;
-import org.apache.doris.common.jni.vec.ScanPredicate;
-
-import org.apache.parquet.filter2.predicate.FilterApi;
-import org.apache.parquet.filter2.predicate.FilterPredicate;
-import org.apache.parquet.io.api.Binary;
-
-public class ParquetFilter {
-
- public static FilterPredicate toParquetFilter(ScanPredicate predicate) {
- ScanPredicate.FilterOp filterOp = predicate.op;
- switch (filterOp) {
- case FILTER_IN:
- return convertIn(predicate);
- case FILTER_NOT_IN:
- return convertNotIn(predicate);
- case FILTER_LESS:
- return convertLess(predicate);
- case FILTER_LARGER:
- return convertLarger(predicate);
- case FILTER_LESS_OR_EQUAL:
- return convertLessOrEqual(predicate);
- case FILTER_LARGER_OR_EQUAL:
- return convertLargerOrEqual(predicate);
- default:
- break;
- }
- throw new RuntimeException("Unsupported ScanPredicate" +
ScanPredicate.dump(new ScanPredicate[] {predicate}));
- }
-
-    private static FilterPredicate convertNotIn(ScanPredicate predicate) {
-        String colName = predicate.columName;
-        ColumnType.Type colType = predicate.type;
-        ScanPredicate.PredicateValue[] predicateValues = predicate.predicateValues();
-        FilterPredicate resultPredicate = null;
-        for (ScanPredicate.PredicateValue predicateValue : predicateValues) {
-            if (resultPredicate == null) {
-                resultPredicate = makeNotEquals(colName, colType, predicateValue);
-            } else {
-                resultPredicate = FilterApi.and(resultPredicate, makeNotEquals(colName, colType, predicateValue));
-            }
-        }
-        return resultPredicate;
-    }
-
-    private static FilterPredicate convertIn(ScanPredicate predicate) {
-        String colName = predicate.columName;
-        ColumnType.Type colType = predicate.type;
-        ScanPredicate.PredicateValue[] predicateValues = predicate.predicateValues();
-        FilterPredicate resultPredicate = null;
-        for (ScanPredicate.PredicateValue predicateValue : predicateValues) {
-            if (resultPredicate == null) {
-                resultPredicate = makeEquals(colName, colType, predicateValue);
-            } else {
-                resultPredicate = FilterApi.or(resultPredicate, makeEquals(colName, colType, predicateValue));
-            }
-        }
-        return resultPredicate;
-    }
-
-    private static FilterPredicate convertLarger(ScanPredicate predicate) {
-        String colName = predicate.columName;
-        ColumnType.Type colType = predicate.type;
-        ScanPredicate.PredicateValue predicateValue = predicate.predicateValues()[0];
-        return makeLarger(colName, colType, predicateValue);
-    }
-
-    private static FilterPredicate convertLargerOrEqual(ScanPredicate predicate) {
-        String colName = predicate.columName;
-        ColumnType.Type colType = predicate.type;
-        ScanPredicate.PredicateValue predicateValue = predicate.predicateValues()[0];
-        return makeLargerOrEqual(colName, colType, predicateValue);
-    }
-
-    private static FilterPredicate convertLess(ScanPredicate predicate) {
-        String colName = predicate.columName;
-        ColumnType.Type colType = predicate.type;
-        ScanPredicate.PredicateValue predicateValue = predicate.predicateValues()[0];
-        return makeLess(colName, colType, predicateValue);
-    }
-
-    private static FilterPredicate convertLessOrEqual(ScanPredicate predicate) {
-        String colName = predicate.columName;
-        ColumnType.Type colType = predicate.type;
-        ScanPredicate.PredicateValue predicateValue = predicate.predicateValues()[0];
-        return makeLessOrEqual(colName, colType, predicateValue);
-    }
-
-    private static FilterPredicate makeNotEquals(String colName, ColumnType.Type type,
-                                                 ScanPredicate.PredicateValue value) {
-        switch (type) {
-            case BOOLEAN:
-                return FilterApi.notEq(FilterApi.booleanColumn(colName), value.getBoolean());
-            case TINYINT:
-                return FilterApi.notEq(FilterApi.intColumn(colName), (int) value.getByte());
-            case SMALLINT:
-                return FilterApi.notEq(FilterApi.intColumn(colName), (int) value.getShort());
-            case INT:
-                return FilterApi.notEq(FilterApi.intColumn(colName), value.getInt());
-            case BIGINT:
-                return FilterApi.notEq(FilterApi.longColumn(colName), value.getLong());
-            case FLOAT:
-                return FilterApi.notEq(FilterApi.floatColumn(colName), value.getFloat());
-            case DOUBLE:
-                return FilterApi.notEq(FilterApi.doubleColumn(colName), value.getDouble());
-            case CHAR:
-            case VARCHAR:
-            case STRING:
-                return FilterApi.notEq(FilterApi.binaryColumn(colName), Binary.fromString(value.getString()));
-            case BINARY:
-                return FilterApi.notEq(FilterApi.binaryColumn(colName), Binary.fromConstantByteArray(value.getBytes()));
-            case ARRAY:
-            case MAP:
-            case STRUCT:
-            default:
-                throw new RuntimeException("Unsupported push_down_filter type value: " + type);
-        }
-    }
-
-
-    private static FilterPredicate makeEquals(String colName, ColumnType.Type type,
-                                              ScanPredicate.PredicateValue value) {
-        switch (type) {
-            case BOOLEAN:
-                return FilterApi.eq(FilterApi.booleanColumn(colName), value.getBoolean());
-            case TINYINT:
-                return FilterApi.eq(FilterApi.intColumn(colName), (int) value.getByte());
-            case SMALLINT:
-                return FilterApi.eq(FilterApi.intColumn(colName), (int) value.getShort());
-            case INT:
-                return FilterApi.eq(FilterApi.intColumn(colName), value.getInt());
-            case BIGINT:
-                return FilterApi.eq(FilterApi.longColumn(colName), value.getLong());
-            case FLOAT:
-                return FilterApi.eq(FilterApi.floatColumn(colName), value.getFloat());
-            case DOUBLE:
-                return FilterApi.eq(FilterApi.doubleColumn(colName), value.getDouble());
-            case CHAR:
-            case VARCHAR:
-            case STRING:
-                return FilterApi.eq(FilterApi.binaryColumn(colName), Binary.fromString(value.getString()));
-            case BINARY:
-                return FilterApi.eq(FilterApi.binaryColumn(colName), Binary.fromConstantByteArray(value.getBytes()));
-            case ARRAY:
-            case MAP:
-            case STRUCT:
-            default:
-                throw new RuntimeException("Unsupported push_down_filter type value: " + type);
-        }
-    }
-
-    private static FilterPredicate makeLarger(String colName, ColumnType.Type type,
-                                              ScanPredicate.PredicateValue value) {
-        switch (type) {
-            case TINYINT:
-                return FilterApi.gt(FilterApi.intColumn(colName), (int) value.getByte());
-            case SMALLINT:
-                return FilterApi.gt(FilterApi.intColumn(colName), (int) value.getShort());
-            case INT:
-                return FilterApi.gt(FilterApi.intColumn(colName), value.getInt());
-            case BIGINT:
-                return FilterApi.gt(FilterApi.longColumn(colName), value.getLong());
-            case FLOAT:
-                return FilterApi.gt(FilterApi.floatColumn(colName), value.getFloat());
-            case DOUBLE:
-                return FilterApi.gt(FilterApi.doubleColumn(colName), value.getDouble());
-            case CHAR:
-            case VARCHAR:
-            case STRING:
-                return FilterApi.gt(FilterApi.binaryColumn(colName), Binary.fromString(value.getString()));
-            case BINARY:
-                return FilterApi.gt(FilterApi.binaryColumn(colName), Binary.fromConstantByteArray(value.getBytes()));
-            case ARRAY:
-            case MAP:
-            case STRUCT:
-            default:
-                throw new RuntimeException("Unsupported push_down_filter type value: " + type);
-        }
-
-    }
-
-    private static FilterPredicate makeLargerOrEqual(String colName, ColumnType.Type type,
-                                                     ScanPredicate.PredicateValue value) {
-        switch (type) {
-            case TINYINT:
-                return FilterApi.gtEq(FilterApi.intColumn(colName), (int) value.getByte());
-            case SMALLINT:
-                return FilterApi.gtEq(FilterApi.intColumn(colName), (int) value.getShort());
-            case INT:
-                return FilterApi.gtEq(FilterApi.intColumn(colName), value.getInt());
-            case BIGINT:
-                return FilterApi.gtEq(FilterApi.longColumn(colName), value.getLong());
-            case FLOAT:
-                return FilterApi.gtEq(FilterApi.floatColumn(colName), value.getFloat());
-            case DOUBLE:
-                return FilterApi.gtEq(FilterApi.doubleColumn(colName), value.getDouble());
-            case CHAR:
-            case VARCHAR:
-            case STRING:
-                return FilterApi.gtEq(FilterApi.binaryColumn(colName), Binary.fromString(value.getString()));
-            case BINARY:
-                return FilterApi.gtEq(FilterApi.binaryColumn(colName), Binary.fromConstantByteArray(value.getBytes()));
-            case ARRAY:
-            case MAP:
-            case STRUCT:
-            default:
-                throw new RuntimeException("Unsupported push_down_filter type value: " + type);
-        }
-
-    }
-
-    private static FilterPredicate makeLess(String colName, ColumnType.Type type, ScanPredicate.PredicateValue value) {
-        switch (type) {
-            case TINYINT:
-                return FilterApi.lt(FilterApi.intColumn(colName), (int) value.getByte());
-            case SMALLINT:
-                return FilterApi.lt(FilterApi.intColumn(colName), (int) value.getShort());
-            case INT:
-                return FilterApi.lt(FilterApi.intColumn(colName), value.getInt());
-            case BIGINT:
-                return FilterApi.lt(FilterApi.longColumn(colName), value.getLong());
-            case FLOAT:
-                return FilterApi.lt(FilterApi.floatColumn(colName), value.getFloat());
-            case DOUBLE:
-                return FilterApi.lt(FilterApi.doubleColumn(colName), value.getDouble());
-            case CHAR:
-            case VARCHAR:
-            case STRING:
-                return FilterApi.lt(FilterApi.binaryColumn(colName), Binary.fromString(value.getString()));
-            case BINARY:
-                return FilterApi.lt(FilterApi.binaryColumn(colName), Binary.fromConstantByteArray(value.getBytes()));
-            case ARRAY:
-            case MAP:
-            case STRUCT:
-            default:
-                throw new RuntimeException("Unsupported push_down_filter type value: " + type);
-        }
-
-    }
-
-    private static FilterPredicate makeLessOrEqual(String colName, ColumnType.Type type,
-                                                   ScanPredicate.PredicateValue value) {
-        switch (type) {
-            case TINYINT:
-                return FilterApi.ltEq(FilterApi.intColumn(colName), (int) value.getByte());
-            case SMALLINT:
-                return FilterApi.ltEq(FilterApi.intColumn(colName), (int) value.getShort());
-            case INT:
-                return FilterApi.ltEq(FilterApi.intColumn(colName), value.getInt());
-            case BIGINT:
-                return FilterApi.ltEq(FilterApi.longColumn(colName), value.getLong());
-            case FLOAT:
-                return FilterApi.ltEq(FilterApi.floatColumn(colName), value.getFloat());
-            case DOUBLE:
-                return FilterApi.ltEq(FilterApi.doubleColumn(colName), value.getDouble());
-            case CHAR:
-            case VARCHAR:
-            case STRING:
-                return FilterApi.ltEq(FilterApi.binaryColumn(colName), Binary.fromString(value.getString()));
-            case BINARY:
-                return FilterApi.ltEq(FilterApi.binaryColumn(colName), Binary.fromConstantByteArray(value.getBytes()));
-            case ARRAY:
-            case MAP:
-            case STRUCT:
-            default:
-                throw new RuntimeException("Unsupported push_down_filter type value: " + type);
-        }
-    }
-}
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index bac2346185d..92e576fb2e1 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -565,18 +565,77 @@ under the License.
<artifactId>hadoop-auth</artifactId>
</dependency>
+ <!-- lakesoul -->
<dependency>
<groupId>com.dmetasoul</groupId>
- <artifactId>lakesoul-common</artifactId>
- <version>2.5.4</version>
- <classifier>shaded</classifier>
+ <artifactId>lakesoul-io-java</artifactId>
+ <version>${lakesoul.version}</version>
<exclusions>
<exclusion>
- <groupId>*</groupId>
+ <groupId>io.netty</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-vector</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-format</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
<artifactId>*</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.json4s</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.ow2.asm</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ </exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ <version>42.7.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ <scope>compile</scope>
+ </dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
@@ -1220,4 +1279,4 @@ under the License.
</extension>
</extensions>
</build>
-</project>
\ No newline at end of file
+</project>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java
index dd8342ad660..b27b4c6fc0d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java
@@ -25,15 +25,19 @@ import org.apache.doris.datasource.property.PropertyConverter;
import com.dmetasoul.lakesoul.meta.DBManager;
import com.dmetasoul.lakesoul.meta.DBUtil;
+import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
-import com.google.common.collect.Lists;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
public class LakeSoulExternalCatalog extends ExternalCatalog {
- private DBManager dbManager;
+    private static final Logger LOG = LogManager.getLogger(LakeSoulExternalCatalog.class);
+
+ private DBManager lakesoulMetadataManager;
private final Map<String, String> props;
@@ -48,49 +52,47 @@ public class LakeSoulExternalCatalog extends ExternalCatalog {
@Override
protected List<String> listDatabaseNames() {
initLocalObjectsImpl();
- return dbManager.listNamespaces();
+ return lakesoulMetadataManager.listNamespaces();
}
@Override
public List<String> listTableNames(SessionContext ctx, String dbName) {
makeSureInitialized();
- List<TableInfo> tifs = dbManager.getTableInfosByNamespace(dbName);
- List<String> tableNames = Lists.newArrayList();
- for (TableInfo item : tifs) {
- tableNames.add(item.getTableName());
- }
+        List<String> tableNames = lakesoulMetadataManager.listTableNamesByNamespace(dbName);
return tableNames;
}
@Override
    public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
makeSureInitialized();
-        TableInfo tableInfo = dbManager.getTableInfoByNameAndNamespace(dbName, tblName);
-
+        TableInfo tableInfo = lakesoulMetadataManager.getTableInfoByNameAndNamespace(tblName, dbName);
return null != tableInfo;
}
@Override
protected void initLocalObjectsImpl() {
- if (dbManager == null) {
- if (props != null) {
- if (props.containsKey(DBUtil.urlKey)) {
-                    System.setProperty(DBUtil.urlKey, props.get(DBUtil.urlKey));
- }
- if (props.containsKey(DBUtil.usernameKey)) {
-                    System.setProperty(DBUtil.usernameKey, props.get(DBUtil.usernameKey));
- }
- if (props.containsKey(DBUtil.passwordKey)) {
-                    System.setProperty(DBUtil.passwordKey, props.get(DBUtil.passwordKey));
- }
+ if (props != null) {
+ if (props.containsKey(DBUtil.urlKey)) {
+ System.setProperty(DBUtil.urlKey, props.get(DBUtil.urlKey));
+ }
+ if (props.containsKey(DBUtil.usernameKey)) {
+                System.setProperty(DBUtil.usernameKey, props.get(DBUtil.usernameKey));
+ }
+ if (props.containsKey(DBUtil.passwordKey)) {
+                System.setProperty(DBUtil.passwordKey, props.get(DBUtil.passwordKey));
}
- dbManager = new DBManager();
}
+ lakesoulMetadataManager = new DBManager();
}
public TableInfo getLakeSoulTable(String dbName, String tblName) {
makeSureInitialized();
- return dbManager.getTableInfoByNameAndNamespace(tblName, dbName);
+        return lakesoulMetadataManager.getTableInfoByNameAndNamespace(tblName, dbName);
+ }
+
+ public List<PartitionInfo> listPartitionInfo(String tableId) {
+ makeSureInitialized();
+ return lakesoulMetadataManager.getAllPartitionInfo(tableId);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java
index 46e8d1db47c..9dd2f4811e9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java
@@ -25,17 +25,23 @@ import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.statistics.AnalysisInfo;
+import org.apache.doris.statistics.BaseAnalysisTask;
+import org.apache.doris.statistics.ExternalAnalysisTask;
import org.apache.doris.thrift.TLakeSoulTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import com.dmetasoul.lakesoul.meta.DBUtil;
+import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
import com.google.common.collect.Lists;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.HashMap;
@@ -45,11 +51,24 @@ import java.util.Optional;
import java.util.stream.Collectors;
public class LakeSoulExternalTable extends ExternalTable {
-
+    private static final Logger LOG = LogManager.getLogger(LakeSoulExternalTable.class);
public static final int LAKESOUL_TIMESTAMP_SCALE_MS = 6;
+ public final String tableId;
+
    public LakeSoulExternalTable(long id, String name, String dbName, LakeSoulExternalCatalog catalog) {
super(id, name, catalog, dbName, TableType.LAKESOUl_EXTERNAL_TABLE);
+ TableInfo tableInfo = getLakeSoulTableInfo();
+ if (tableInfo == null) {
+            throw new RuntimeException(String.format("LakeSoul table %s.%s does not exist", dbName, name));
+ }
+ tableId = tableInfo.getTableId();
+ }
+
+ @Override
+ public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
+ makeSureInitialized();
+ return new ExternalAnalysisTask(info);
}
private Type arrowFiledToDorisType(Field field) {
@@ -150,6 +169,7 @@ public class LakeSoulExternalTable extends ExternalTable {
String tableSchema = tableInfo.getTableSchema();
        DBUtil.TablePartitionKeys partitionKeys = DBUtil.parseTableInfoPartitions(tableInfo.getPartitions());
Schema schema;
+ LOG.info("tableSchema={}", tableSchema);
try {
schema = Schema.fromJSON(tableSchema);
} catch (IOException e) {
@@ -174,6 +194,10 @@ public class LakeSoulExternalTable extends ExternalTable {
        return ((LakeSoulExternalCatalog) catalog).getLakeSoulTable(dbName, name);
}
+ public List<PartitionInfo> listPartitionInfo() {
+ return ((LakeSoulExternalCatalog) catalog).listPartitionInfo(tableId);
+ }
+
public String tablePath() {
        return ((LakeSoulExternalCatalog) catalog).getLakeSoulTable(dbName, name).getTablePath();
}
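Taken together with the catalog change above, partition pruning now happens on the FE. A hedged sketch of how the new pieces compose, using the method and parameter names from the surrounding diffs (applyPartitionFilters is defined in the new LakeSoulUtils class that follows):

    // Sketch: fetch all partition metadata for the table, then let the
    // Substrait-based filter prune partitions before splits are planned.
    List<PartitionInfo> allPartitions = lakeSoulExternalTable.listPartitionInfo();
    List<PartitionInfo> prunedPartitions = LakeSoulUtils.applyPartitionFilters(
            allPartitions, tableName, partitionArrowSchema, columnNameToRange);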
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulUtils.java
new file mode 100644
index 00000000000..fba74d4f978
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulUtils.java
@@ -0,0 +1,526 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.lakesoul;
+
+import org.apache.doris.analysis.BoolLiteral;
+import org.apache.doris.analysis.CastExpr;
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.analysis.DateLiteral;
+import org.apache.doris.analysis.DecimalLiteral;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FloatLiteral;
+import org.apache.doris.analysis.InPredicate;
+import org.apache.doris.analysis.IntLiteral;
+import org.apache.doris.analysis.IsNullPredicate;
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.analysis.NullLiteral;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StringLiteral;
+import org.apache.doris.analysis.Subquery;
+import org.apache.doris.planner.ColumnBound;
+import org.apache.doris.planner.ColumnRange;
+import org.apache.doris.thrift.TExprOpcode;
+
+import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
+import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import io.substrait.expression.Expression;
+import io.substrait.extension.DefaultExtensionCatalog;
+import io.substrait.type.Type;
+import io.substrait.type.TypeCreator;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class LakeSoulUtils {
+
+ public static final String FILE_NAMES = "file_paths";
+ public static final String PRIMARY_KEYS = "primary_keys";
+ public static final String SCHEMA_JSON = "table_schema";
+ public static final String PARTITION_DESC = "partition_descs";
+ public static final String REQUIRED_FIELDS = "required_fields";
+ public static final String OPTIONS = "options";
+ public static final String SUBSTRAIT_PREDICATE = "substrait_predicate";
+ public static final String CDC_COLUMN = "lakesoul_cdc_change_column";
+ public static final String LIST_DELIM = ";";
+ public static final String PARTITIONS_KV_DELIM = "=";
+ public static final String FS_S3A_ACCESS_KEY = "fs.s3a.access.key";
+ public static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key";
+ public static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint";
+ public static final String FS_S3A_REGION = "fs.s3a.endpoint.region";
+    public static final String FS_S3A_PATH_STYLE_ACCESS = "fs.s3a.path.style.access";
+
+ private static final OffsetDateTime EPOCH;
+ private static final LocalDate EPOCH_DAY;
+
+ static {
+ EPOCH = Instant.ofEpochSecond(0L).atOffset(ZoneOffset.UTC);
+ EPOCH_DAY = EPOCH.toLocalDate();
+ }
+
+ public static List<PartitionInfo> applyPartitionFilters(
+ List<PartitionInfo> allPartitionInfo,
+ String tableName,
+ Schema partitionArrowSchema,
+ Map<String, ColumnRange> columnNameToRange
+ ) throws IOException {
+
+ Expression conjunctionFilter = null;
+ for (Field field : partitionArrowSchema.getFields()) {
+ ColumnRange columnRange = columnNameToRange.get(field.getName());
+ if (columnRange != null) {
+                Expression expr = columnRangeToSubstraitFilter(field, columnRange);
+ if (expr != null) {
+ if (conjunctionFilter == null) {
+ conjunctionFilter = expr;
+ } else {
+                        conjunctionFilter = SubstraitUtil.and(conjunctionFilter, expr);
+ }
+ }
+ }
+ }
+ return SubstraitUtil.applyPartitionFilters(
+ allPartitionInfo,
+ partitionArrowSchema,
+ SubstraitUtil.substraitExprToProto(conjunctionFilter, tableName)
+ );
+ }
+
+ public static Expression columnRangeToSubstraitFilter(
+ Field columnField,
+ ColumnRange columnRange
+ ) throws IOException {
+        Optional<RangeSet<ColumnBound>> rangeSetOpt = columnRange.getRangeSet();
+ if (columnRange.hasConjunctiveIsNull() || !rangeSetOpt.isPresent()) {
+ return SubstraitUtil.CONST_TRUE;
+ } else {
+ RangeSet<ColumnBound> rangeSet = rangeSetOpt.get();
+ if (rangeSet.isEmpty()) {
+ return SubstraitUtil.CONST_TRUE;
+ } else {
+ Expression conjunctionFilter = null;
+ for (Range range : rangeSet.asRanges()) {
+                    Expression expr = rangeToSubstraitFilter(columnField, range);
+ if (expr != null) {
+ if (conjunctionFilter == null) {
+ conjunctionFilter = expr;
+ } else {
+                            conjunctionFilter = SubstraitUtil.or(conjunctionFilter, expr);
+ }
+ }
+ }
+ return conjunctionFilter;
+ }
+ }
+ }
+
+    public static Expression rangeToSubstraitFilter(Field columnField, Range range) throws IOException {
+ if (!range.hasLowerBound() && !range.hasUpperBound()) {
+ // Range.all()
+ return SubstraitUtil.CONST_TRUE;
+ } else {
+ Expression upper = SubstraitUtil.CONST_TRUE;
+ if (range.hasUpperBound()) {
+                String func = range.upperBoundType() == BoundType.OPEN ? "lt:any_any" : "lte:any_any";
+                Expression left = SubstraitUtil.arrowFieldToSubstraitField(columnField);
+ Expression right = SubstraitUtil.anyToSubstraitLiteral(
+ SubstraitUtil.arrowFieldToSubstraitType(columnField),
+                        ((ColumnBound) range.upperEndpoint()).getValue().getRealValue());
+ upper = SubstraitUtil.makeBinary(
+ left,
+ right,
+ DefaultExtensionCatalog.FUNCTIONS_COMPARISON,
+ func,
+ TypeCreator.NULLABLE.BOOLEAN
+ );
+ }
+ Expression lower = SubstraitUtil.CONST_TRUE;
+ if (range.hasLowerBound()) {
+                String func = range.lowerBoundType() == BoundType.OPEN ? "gt:any_any" : "gte:any_any";
+                Expression left = SubstraitUtil.arrowFieldToSubstraitField(columnField);
+ Expression right = SubstraitUtil.anyToSubstraitLiteral(
+ SubstraitUtil.arrowFieldToSubstraitType(columnField),
+                        ((ColumnBound) range.lowerEndpoint()).getValue().getRealValue());
+ lower = SubstraitUtil.makeBinary(
+ left,
+ right,
+ DefaultExtensionCatalog.FUNCTIONS_COMPARISON,
+ func,
+ TypeCreator.NULLABLE.BOOLEAN
+ );
+ }
+ return SubstraitUtil.and(upper, lower);
+ }
+ }
+
+ public static io.substrait.proto.Plan getPushPredicate(
+ List<Expr> conjuncts,
+ String tableName,
+ Schema tableSchema,
+ Schema partitionArrowSchema,
+ Map<String, String> properties,
+ boolean incRead
+ ) throws IOException {
+
+ Set<String> partitionColumn =
+ partitionArrowSchema
+ .getFields()
+ .stream()
+ .map(Field::getName)
+ .collect(Collectors.toSet());
+ Expression conjunctionFilter = null;
+ String cdcColumn = properties.get(CDC_COLUMN);
+ if (cdcColumn != null && !incRead) {
+            conjunctionFilter = SubstraitUtil.cdcColumnMergeOnReadFilter(tableSchema.findField(cdcColumn));
+ }
+ for (Expr expr : conjuncts) {
+ if (!isAllPartitionPredicate(expr, partitionColumn)) {
+                Expression predicate = convertToSubstraitExpr(expr, tableSchema);
+ if (predicate != null) {
+ if (conjunctionFilter == null) {
+ conjunctionFilter = predicate;
+ } else {
+                        conjunctionFilter = SubstraitUtil.and(conjunctionFilter, predicate);
+ }
+ }
+ }
+ }
+ if (conjunctionFilter == null) {
+ return null;
+ }
+        return SubstraitUtil.substraitExprToProto(conjunctionFilter, tableName);
+ }
+
+    public static boolean isAllPartitionPredicate(Expr predicate, Set<String> partitionColumns) {
+ if (predicate == null) {
+ return false;
+ }
+ if (predicate instanceof CompoundPredicate) {
+            CompoundPredicate compoundPredicate = (CompoundPredicate) predicate;
+            return isAllPartitionPredicate(compoundPredicate.getChild(0), partitionColumns)
+                    && isAllPartitionPredicate(compoundPredicate.getChild(1), partitionColumns);
+ }
+ // Make sure the col slot is always first
+ SlotRef slotRef = convertDorisExprToSlotRef(predicate.getChild(0));
+        LiteralExpr literalExpr = convertDorisExprToLiteralExpr(predicate.getChild(1));
+ if (slotRef == null || literalExpr == null) {
+ return false;
+ }
+ String colName = slotRef.getColumnName();
+ return partitionColumns.contains(colName);
+
+ }
+
+ public static SlotRef convertDorisExprToSlotRef(Expr expr) {
+ SlotRef slotRef = null;
+ if (expr instanceof SlotRef) {
+ slotRef = (SlotRef) expr;
+ } else if (expr instanceof CastExpr) {
+ if (expr.getChild(0) instanceof SlotRef) {
+ slotRef = (SlotRef) expr.getChild(0);
+ }
+ }
+ return slotRef;
+ }
+
+ public static LiteralExpr convertDorisExprToLiteralExpr(Expr expr) {
+ LiteralExpr literalExpr = null;
+ if (expr instanceof LiteralExpr) {
+ literalExpr = (LiteralExpr) expr;
+ } else if (expr instanceof CastExpr) {
+ if (expr.getChild(0) instanceof LiteralExpr) {
+ literalExpr = (LiteralExpr) expr.getChild(0);
+ }
+ }
+ return literalExpr;
+ }
+
+    public static Expression convertToSubstraitExpr(Expr predicate, Schema tableSchema) throws IOException {
+ if (predicate == null) {
+ return null;
+ }
+ if (predicate instanceof BoolLiteral) {
+ BoolLiteral boolLiteral = (BoolLiteral) predicate;
+ boolean value = boolLiteral.getValue();
+ if (value) {
+ return SubstraitUtil.CONST_TRUE;
+ } else {
+ return SubstraitUtil.CONST_FALSE;
+ }
+ }
+ if (predicate instanceof CompoundPredicate) {
+            CompoundPredicate compoundPredicate = (CompoundPredicate) predicate;
+ switch (compoundPredicate.getOp()) {
+ case AND: {
+                    Expression left = convertToSubstraitExpr(compoundPredicate.getChild(0), tableSchema);
+                    Expression right = convertToSubstraitExpr(compoundPredicate.getChild(1), tableSchema);
+ if (left != null && right != null) {
+ return SubstraitUtil.and(left, right);
+ } else if (left != null) {
+ return left;
+ } else {
+ return right;
+ }
+ }
+ case OR: {
+                    Expression left = convertToSubstraitExpr(compoundPredicate.getChild(0), tableSchema);
+                    Expression right = convertToSubstraitExpr(compoundPredicate.getChild(1), tableSchema);
+ if (left != null && right != null) {
+ return SubstraitUtil.or(left, right);
+ }
+ return null;
+ }
+ case NOT: {
+                    Expression child = convertToSubstraitExpr(compoundPredicate.getChild(0), tableSchema);
+ if (child != null) {
+ return SubstraitUtil.not(child);
+ }
+ return null;
+ }
+ default:
+ return null;
+ }
+ } else if (predicate instanceof InPredicate) {
+ InPredicate inExpr = (InPredicate) predicate;
+ if (inExpr.contains(Subquery.class)) {
+ return null;
+ }
+ SlotRef slotRef = convertDorisExprToSlotRef(inExpr.getChild(0));
+ if (slotRef == null) {
+ return null;
+ }
+ String colName = slotRef.getColumnName();
+ Field field = tableSchema.findField(colName);
+            Expression fieldRef = SubstraitUtil.arrowFieldToSubstraitField(field);
+
+ colName = field.getName();
+ Type type = field.getType().accept(
+                    new SubstraitUtil.ArrowTypeToSubstraitTypeConverter(field.isNullable())
+ );
+ List<Expression.Literal> valueList = new ArrayList<>();
+ for (int i = 1; i < inExpr.getChildren().size(); ++i) {
+ if (!(inExpr.getChild(i) instanceof LiteralExpr)) {
+ return null;
+ }
+ LiteralExpr literalExpr = (LiteralExpr) inExpr.getChild(i);
+ Object value = extractDorisLiteral(type, literalExpr);
+ if (value == null) {
+ return null;
+ }
+                valueList.add(SubstraitUtil.anyToSubstraitLiteral(type, value));
+ }
+ if (inExpr.isNotIn()) {
+ // not in
+ return SubstraitUtil.notIn(fieldRef, valueList);
+ } else {
+ // in
+ return SubstraitUtil.in(fieldRef, valueList);
+ }
+ }
+ return convertBinaryExpr(predicate, tableSchema);
+ }
+
+    private static Expression convertBinaryExpr(Expr dorisExpr, Schema tableSchema) throws IOException {
+ TExprOpcode opcode = dorisExpr.getOpcode();
+ // Make sure the col slot is always first
+ SlotRef slotRef = convertDorisExprToSlotRef(dorisExpr.getChild(0));
+        LiteralExpr literalExpr = convertDorisExprToLiteralExpr(dorisExpr.getChild(1));
+ if (slotRef == null || literalExpr == null) {
+ return null;
+ }
+ String colName = slotRef.getColumnName();
+ Field field = tableSchema.findField(colName);
+ Expression fieldRef = SubstraitUtil.arrowFieldToSubstraitField(field);
+
+ Type type = field.getType().accept(
+                new SubstraitUtil.ArrowTypeToSubstraitTypeConverter(field.isNullable())
+ );
+ Object value = extractDorisLiteral(type, literalExpr);
+ if (value == null) {
+            if (opcode == TExprOpcode.EQ_FOR_NULL && literalExpr instanceof NullLiteral) {
+ return SubstraitUtil.makeUnary(
+ fieldRef,
+ DefaultExtensionCatalog.FUNCTIONS_COMPARISON,
+ "is_null:any",
+ TypeCreator.NULLABLE.BOOLEAN);
+ } else {
+ return null;
+ }
+ }
+ Expression literal = SubstraitUtil.anyToSubstraitLiteral(
+ type,
+ value
+ );
+
+ String namespace;
+ String func;
+ switch (opcode) {
+ case EQ:
+ namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+ func = "equal:any_any";
+ break;
+ case EQ_FOR_NULL:
+ namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+ func = "is_null:any";
+ break;
+ case NE:
+ namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+ func = "not_equal:any_any";
+ break;
+ case GE:
+ namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+ func = "gte:any_any";
+ break;
+ case GT:
+ namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+ func = "gt:any_any";
+ break;
+ case LE:
+ namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+ func = "lte:any_any";
+ break;
+ case LT:
+ namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+ func = "lt:any_any";
+ break;
+ case INVALID_OPCODE:
+ if (dorisExpr instanceof IsNullPredicate) {
+ if (((IsNullPredicate) dorisExpr).isNotNull()) {
+                        namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+ func = "is_not_null:any";
+ } else {
+                        namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON;
+ func = "is_null:any";
+ }
+ break;
+ }
+ return null;
+ default:
+ return null;
+ }
+        return SubstraitUtil.makeBinary(fieldRef, literal, namespace, func, TypeCreator.NULLABLE.BOOLEAN);
+ }
+
+ public static Object extractDorisLiteral(Type type, LiteralExpr expr) {
+
+ if (expr instanceof BoolLiteral) {
+ if (type instanceof Type.Bool) {
+ return ((BoolLiteral) expr).getValue();
+ }
+ if (type instanceof Type.Str) {
+ return expr.getStringValue();
+ }
+ } else if (expr instanceof DateLiteral) {
+ DateLiteral dateLiteral = (DateLiteral) expr;
+ if (type instanceof Type.Date) {
+                if (dateLiteral.getType().isDatetimeV2() || dateLiteral.getType().isDatetime()) {
+ return null;
+ }
+ return (int) LocalDate.of((int) dateLiteral.getYear(),
+ (int) dateLiteral.getMonth(),
+ (int) dateLiteral.getDay()).toEpochDay();
+ }
+            if (type instanceof Type.TimestampTZ || type instanceof Type.Timestamp) {
+ return dateLiteral.getLongValue();
+ }
+ if (type instanceof Type.Str) {
+ return expr.getStringValue();
+ }
+ } else if (expr instanceof DecimalLiteral) {
+ DecimalLiteral decimalLiteral = (DecimalLiteral) expr;
+ if (type instanceof Type.Decimal) {
+ return decimalLiteral.getValue();
+ } else if (type instanceof Type.FP64) {
+ return decimalLiteral.getDoubleValue();
+ }
+ if (type instanceof Type.Str) {
+ return expr.getStringValue();
+ }
+ } else if (expr instanceof FloatLiteral) {
+ FloatLiteral floatLiteral = (FloatLiteral) expr;
+
+            if (floatLiteral.getType() == org.apache.doris.catalog.Type.FLOAT) {
+ return type instanceof Type.FP32
+ || type instanceof Type.FP64
+                        || type instanceof Type.Decimal ? ((FloatLiteral) expr).getValue() : null;
+ } else {
+ return type instanceof Type.FP64
+                        || type instanceof Type.Decimal ? ((FloatLiteral) expr).getValue() : null;
+ }
+ } else if (expr instanceof IntLiteral) {
+ if (type instanceof Type.I8
+ || type instanceof Type.I16
+ || type instanceof Type.I32
+ || type instanceof Type.I64
+ || type instanceof Type.FP32
+ || type instanceof Type.FP64
+ || type instanceof Type.Decimal
+ || type instanceof Type.Date
+ ) {
+ return expr.getRealValue();
+ }
+ if (!expr.getType().isInteger32Type()) {
+                if (type instanceof Type.Time || type instanceof Type.Timestamp || type instanceof Type.TimestampTZ) {
+ return expr.getLongValue();
+ }
+ }
+
+ } else if (expr instanceof StringLiteral) {
+ String value = expr.getStringValue();
+ if (type instanceof Type.Str) {
+ return value;
+ }
+ if (type instanceof Type.Date) {
+ try {
+ return (int) ChronoUnit.DAYS.between(
+ EPOCH_DAY,
+                            LocalDate.parse(value, DateTimeFormatter.ISO_LOCAL_DATE));
+ } catch (DateTimeParseException e) {
+ return null;
+ }
+ }
+ if (type instanceof Type.Timestamp || type instanceof
Type.TimestampTZ) {
+ try {
+ return ChronoUnit.MICROS.between(
+ EPOCH,
+                            OffsetDateTime.parse(value, DateTimeFormatter.ISO_DATE_TIME));
+ } catch (DateTimeParseException e) {
+ return null;
+ }
+ }
+ }
+ return null;
+ }
+
+}
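
The String branches of extractDorisLiteral above come down to two conversions: an ISO date string becomes days since 1970-01-01 (the encoding Substrait's Date type expects), and an ISO timestamp string becomes microseconds since epoch (matching a microsecond-precision Arrow timestamp). Below is a self-contained sketch of just those two conversions, assuming the same EPOCH/EPOCH_DAY anchors the class defines; the class and method names here are illustrative, not part of the patch:

    import java.time.Instant;
    import java.time.LocalDate;
    import java.time.OffsetDateTime;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;
    import java.time.format.DateTimeParseException;
    import java.time.temporal.ChronoUnit;

    public class LiteralConversionSketch {
        private static final OffsetDateTime EPOCH = OffsetDateTime.ofInstant(Instant.EPOCH, ZoneOffset.UTC);
        private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

        // ISO date string -> days since 1970-01-01; unparseable literals yield null and are not pushed down.
        static Integer stringToEpochDays(String value) {
            try {
                return (int) ChronoUnit.DAYS.between(EPOCH_DAY, LocalDate.parse(value, DateTimeFormatter.ISO_LOCAL_DATE));
            } catch (DateTimeParseException e) {
                return null;
            }
        }

        // ISO timestamp string (note the 'T' separator and explicit offset) -> microseconds since epoch.
        static Long stringToEpochMicros(String value) {
            try {
                return ChronoUnit.MICROS.between(EPOCH, OffsetDateTime.parse(value, DateTimeFormatter.ISO_DATE_TIME));
            } catch (DateTimeParseException e) {
                return null;
            }
        }

        public static void main(String[] args) {
            System.out.println(stringToEpochDays("2023-01-02"));                    // 19359
            System.out.println(stringToEpochMicros("2023-01-02T01:02:03.456789Z")); // micros since epoch
            System.out.println(stringToEpochDays("not-a-date"));                    // null
        }
    }

This also explains why space-separated datetime strings are not pushed down for timestamp columns in the test matrix further below: they fail the ISO parse and fall through to null.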
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
index 1779aeaca10..dd15ec310de 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
@@ -22,9 +22,13 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LocationPath;
+import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.datasource.lakesoul.LakeSoulExternalTable;
+import org.apache.doris.datasource.lakesoul.LakeSoulUtils;
+import org.apache.doris.datasource.property.constants.MinioProperties;
+import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
@@ -34,15 +38,26 @@ import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TLakeSoulFileDesc;
import org.apache.doris.thrift.TTableFormatFileDesc;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
import com.dmetasoul.lakesoul.meta.DBUtil;
import com.dmetasoul.lakesoul.meta.DataFileInfo;
import com.dmetasoul.lakesoul.meta.DataOperation;
import com.dmetasoul.lakesoul.meta.LakeSoulOptions;
+import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
-import com.lakesoul.shaded.com.alibaba.fastjson.JSON;
-import com.lakesoul.shaded.com.alibaba.fastjson.JSONObject;
+import io.substrait.proto.Plan;
+import lombok.SneakyThrows;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
@@ -52,19 +67,55 @@ import java.util.stream.Collectors;
public class LakeSoulScanNode extends FileQueryScanNode {
- protected final LakeSoulExternalTable lakeSoulExternalTable;
+    private static final Logger LOG = LogManager.getLogger(LakeSoulScanNode.class);
- protected final TableInfo table;
+ protected LakeSoulExternalTable lakeSoulExternalTable;
+
+ String tableName;
+
+ String location;
+
+ String partitions;
+
+ Schema tableArrowSchema;
+
+ Schema partitionArrowSchema;
+ private Map<String, String> tableProperties;
+
+ String readType;
     public LakeSoulScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
         super(id, desc, "planNodeName", StatisticalType.LAKESOUL_SCAN_NODE, needCheckColumnPriv);
+ }
+
+ @Override
+ protected void doInitialize() throws UserException {
+ super.doInitialize();
lakeSoulExternalTable = (LakeSoulExternalTable) desc.getTable();
- table = lakeSoulExternalTable.getLakeSoulTableInfo();
+ TableInfo tableInfo = lakeSoulExternalTable.getLakeSoulTableInfo();
+ location = tableInfo.getTablePath();
+ tableName = tableInfo.getTableName();
+ partitions = tableInfo.getPartitions();
+ readType = LakeSoulOptions.ReadType$.MODULE$.FULL_READ();
+ try {
+ tableProperties = new ObjectMapper().readValue(
+ tableInfo.getProperties(),
+ new TypeReference<Map<String, String>>() {}
+ );
+ tableArrowSchema = Schema.fromJSON(tableInfo.getTableSchema());
+ List<Field> partitionFields =
+ DBUtil.parseTableInfoPartitions(partitions)
+ .rangeKeys
+ .stream()
+                        .map(tableArrowSchema::findField).collect(Collectors.toList());
+ partitionArrowSchema = new Schema(partitionFields);
+ } catch (IOException e) {
+ throw new UserException(e);
+ }
}
@Override
protected TFileType getLocationType() throws UserException {
- String location = table.getTablePath();
return getLocationType(location);
}
@@ -81,12 +132,12 @@ public class LakeSoulScanNode extends FileQueryScanNode {
@Override
protected List<String> getPathPartitionKeys() throws UserException {
-        return new ArrayList<>(DBUtil.parseTableInfoPartitions(table.getPartitions()).rangeKeys);
+        return new ArrayList<>(DBUtil.parseTableInfoPartitions(partitions).rangeKeys);
}
@Override
protected TableIf getTargetTable() throws UserException {
- return lakeSoulExternalTable;
+ return desc.getTable();
}
@Override
@@ -94,13 +145,21 @@ public class LakeSoulScanNode extends FileQueryScanNode {
return lakeSoulExternalTable.getHadoopProperties();
}
+ @SneakyThrows
@Override
protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}", rangeDesc);
+ }
if (split instanceof LakeSoulSplit) {
setLakeSoulParams(rangeDesc, (LakeSoulSplit) split);
}
}
+ public ExternalCatalog getCatalog() {
+ return lakeSoulExternalTable.getCatalog();
+ }
+
public static boolean isExistHashPartition(TableInfo tif) {
JSONObject tableProperties = JSON.parseObject(tif.getProperties());
if (tableProperties.containsKey(LakeSoulOptions.HASH_BUCKET_NUM())
@@ -111,13 +170,53 @@ public class LakeSoulScanNode extends FileQueryScanNode {
}
}
-    public void setLakeSoulParams(TFileRangeDesc rangeDesc, LakeSoulSplit lakeSoulSplit) {
+    public void setLakeSoulParams(TFileRangeDesc rangeDesc, LakeSoulSplit lakeSoulSplit) throws IOException {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(lakeSoulSplit.getTableFormatType().value());
TLakeSoulFileDesc fileDesc = new TLakeSoulFileDesc();
fileDesc.setFilePaths(lakeSoulSplit.getPaths());
fileDesc.setPrimaryKeys(lakeSoulSplit.getPrimaryKeys());
fileDesc.setTableSchema(lakeSoulSplit.getTableSchema());
+
+
+ JSONObject options = new JSONObject();
+ Plan predicate = LakeSoulUtils.getPushPredicate(
+ conjuncts,
+ tableName,
+ tableArrowSchema,
+ partitionArrowSchema,
+ tableProperties,
+                readType.equals(LakeSoulOptions.ReadType$.MODULE$.INCREMENTAL_READ()));
+ if (predicate != null) {
+            options.put(LakeSoulUtils.SUBSTRAIT_PREDICATE, SubstraitUtil.encodeBase64String(predicate));
+ }
+ Map<String, String> catalogProps = getCatalog().getProperties();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}", catalogProps);
+ }
+
+        if (catalogProps.get(S3Properties.Env.ENDPOINT) != null) {
+            options.put(LakeSoulUtils.FS_S3A_ENDPOINT, catalogProps.get(S3Properties.Env.ENDPOINT));
+            if (catalogProps.containsKey(MinioProperties.ENDPOINT)) {
+                // Use path-style access for MinIO
+                options.put(LakeSoulUtils.FS_S3A_PATH_STYLE_ACCESS, "true");
+            } else {
+                // Use virtual-hosted-style access for all other S3-compatible storage services
+                options.put(LakeSoulUtils.FS_S3A_PATH_STYLE_ACCESS, "false");
+            }
+            if (catalogProps.get(S3Properties.Env.ACCESS_KEY) != null) {
+                options.put(LakeSoulUtils.FS_S3A_ACCESS_KEY, catalogProps.get(S3Properties.Env.ACCESS_KEY));
+            }
+            if (catalogProps.get(S3Properties.Env.SECRET_KEY) != null) {
+                options.put(LakeSoulUtils.FS_S3A_SECRET_KEY, catalogProps.get(S3Properties.Env.SECRET_KEY));
+            }
+            if (catalogProps.get(S3Properties.Env.REGION) != null) {
+                options.put(LakeSoulUtils.FS_S3A_REGION, catalogProps.get(S3Properties.Env.REGION));
+            }
+        }
+
+ fileDesc.setOptions(JSON.toJSONString(options));
+
fileDesc.setPartitionDescs(lakeSoulSplit.getPartitionDesc()
                .entrySet().stream().map(entry ->
                        String.format("%s=%s", entry.getKey(), entry.getValue())).collect(Collectors.toList()));
@@ -126,24 +225,51 @@ public class LakeSoulScanNode extends FileQueryScanNode {
}
public List<Split> getSplits() throws UserException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getSplits with columnFilters={}", columnFilters);
+            LOG.debug("getSplits with columnNameToRange={}", columnNameToRange);
+ LOG.debug("getSplits with conjuncts={}", conjuncts);
+ }
+
+        List<PartitionInfo> allPartitionInfo = lakeSoulExternalTable.listPartitionInfo();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("allPartitionInfo={}", allPartitionInfo);
+ }
+ List<PartitionInfo> filteredPartitionInfo = allPartitionInfo;
+ try {
+ filteredPartitionInfo =
+ LakeSoulUtils.applyPartitionFilters(
+ allPartitionInfo,
+ tableName,
+ partitionArrowSchema,
+ columnNameToRange
+ );
+ } catch (IOException e) {
+ throw new UserException(e);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("filteredPartitionInfo={}", filteredPartitionInfo);
+ }
+        DataFileInfo[] dataFileInfos = DataOperation.getTableDataInfo(filteredPartitionInfo);
+
List<Split> splits = new ArrayList<>();
        Map<String, Map<Integer, List<String>>> splitByRangeAndHashPartition = new LinkedHashMap<>();
- TableInfo tif = table;
-        DataFileInfo[] dfinfos = DataOperation.getTableDataInfo(table.getTableId());
- for (DataFileInfo pif : dfinfos) {
- if (isExistHashPartition(tif) && pif.file_bucket_id() != -1) {
-                splitByRangeAndHashPartition.computeIfAbsent(pif.range_partitions(), k -> new LinkedHashMap<>())
-                    .computeIfAbsent(pif.file_bucket_id(), v -> new ArrayList<>())
- .add(pif.path());
+ TableInfo tableInfo = lakeSoulExternalTable.getLakeSoulTableInfo();
+
+ for (DataFileInfo fileInfo : dataFileInfos) {
+            if (isExistHashPartition(tableInfo) && fileInfo.file_bucket_id() != -1) {
+                splitByRangeAndHashPartition.computeIfAbsent(fileInfo.range_partitions(), k -> new LinkedHashMap<>())
+                    .computeIfAbsent(fileInfo.file_bucket_id(), v -> new ArrayList<>())
+ .add(fileInfo.path());
} else {
-                splitByRangeAndHashPartition.computeIfAbsent(pif.range_partitions(), k -> new LinkedHashMap<>())
+                splitByRangeAndHashPartition.computeIfAbsent(fileInfo.range_partitions(), k -> new LinkedHashMap<>())
.computeIfAbsent(-1, v -> new ArrayList<>())
- .add(pif.path());
+ .add(fileInfo.path());
}
}
List<String> pkKeys = null;
-        if (!table.getPartitions().equals(";")) {
-            pkKeys = Lists.newArrayList(table.getPartitions().split(";")[1].split(","));
+        if (!tableInfo.getPartitions().equals(";")) {
+            pkKeys = Lists.newArrayList(tableInfo.getPartitions().split(";")[1].split(","));
}
        for (Map.Entry<String, Map<Integer, List<String>>> entry : splitByRangeAndHashPartition.entrySet()) {
@@ -161,7 +287,7 @@ public class LakeSoulScanNode extends FileQueryScanNode {
split.getValue(),
pkKeys,
rangeDesc,
- table.getTableSchema(),
+ tableInfo.getTableSchema(),
0, 0, 0,
new String[0], null);
lakeSoulSplit.setTableFormatType(TableFormatType.LAKESOUL);
@@ -169,8 +295,6 @@ public class LakeSoulScanNode extends FileQueryScanNode {
}
}
return splits;
-
}
-
}
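
The rewritten getSplits() above groups data files by range-partition description first and hash bucket id second, then turns each (range, bucket) group into one LakeSoulSplit. Here is a stripped-down sketch of that two-level grouping, with a simplified FileInfo stand-in for LakeSoul's DataFileInfo (all names are illustrative, not the patch's code):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    class SplitGroupingSketch {
        // Simplified stand-in for com.dmetasoul.lakesoul.meta.DataFileInfo.
        static class FileInfo {
            final String rangePartitions;
            final int fileBucketId;
            final String path;

            FileInfo(String rangePartitions, int fileBucketId, String path) {
                this.rangePartitions = rangePartitions;
                this.fileBucketId = fileBucketId;
                this.path = path;
            }
        }

        // Group file paths by range partition, then by hash bucket; non-bucketed files land in bucket -1.
        static Map<String, Map<Integer, List<String>>> group(List<FileInfo> files, boolean hashPartitioned) {
            Map<String, Map<Integer, List<String>>> byRangeAndBucket = new LinkedHashMap<>();
            for (FileInfo f : files) {
                int bucket = (hashPartitioned && f.fileBucketId != -1) ? f.fileBucketId : -1;
                byRangeAndBucket.computeIfAbsent(f.rangePartitions, k -> new LinkedHashMap<>())
                        .computeIfAbsent(bucket, v -> new ArrayList<>())
                        .add(f.path);
            }
            return byRangeAndBucket; // each (range, bucket) entry becomes one split
        }

        public static void main(String[] args) {
            List<FileInfo> files = Arrays.asList(
                    new FileInfo("date=2024-01-01", 0, "s3://bucket/t/f0.parquet"),
                    new FileInfo("date=2024-01-01", 1, "s3://bucket/t/f1.parquet"),
                    new FileInfo("date=2024-01-02", -1, "s3://bucket/t/f2.parquet"));
            System.out.println(group(files, true)); // two buckets for 01-01, one -1 group for 01-02
        }
    }
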
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/lakesoul/LakeSoulPredicateTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/lakesoul/LakeSoulPredicateTest.java
new file mode 100644
index 00000000000..016819a382b
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/lakesoul/LakeSoulPredicateTest.java
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.lakesoul;
+
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.BoolLiteral;
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.analysis.CompoundPredicate.Operator;
+import org.apache.doris.analysis.DateLiteral;
+import org.apache.doris.analysis.DecimalLiteral;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FloatLiteral;
+import org.apache.doris.analysis.InPredicate;
+import org.apache.doris.analysis.IntLiteral;
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StringLiteral;
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+
+import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
+import io.substrait.expression.Expression;
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class LakeSoulPredicateTest {
+
+ public static Schema schema;
+
+ @BeforeClass
+ public static void before() throws AnalysisException, IOException {
+        schema = new Schema(
+                Arrays.asList(
+                        new Field("c_int", FieldType.nullable(new ArrowType.Int(32, true)), null),
+                        new Field("c_long", FieldType.nullable(new ArrowType.Int(64, true)), null),
+                        new Field("c_bool", FieldType.nullable(new ArrowType.Bool()), null),
+                        new Field("c_float", FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)), null),
+                        new Field("c_double", FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), null),
+                        new Field("c_dec", FieldType.nullable(new ArrowType.Decimal(20, 10)), null),
+                        new Field("c_date", FieldType.nullable(new ArrowType.Date(DateUnit.DAY)), null),
+                        new Field("c_ts", FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC")), null),
+                        new Field("c_str", FieldType.nullable(new ArrowType.Utf8()), null)
+                ));
+ }
+
+ @Test
+ public void testBinaryPredicate() throws AnalysisException, IOException {
+ List<LiteralExpr> literalList = new ArrayList<LiteralExpr>() {{
+ add(new BoolLiteral(true));
+ add(new DateLiteral("2023-01-02", Type.DATEV2));
+            add(new DateLiteral("2024-01-02 12:34:56.123456", Type.DATETIMEV2));
+ add(new DecimalLiteral(new BigDecimal("1.23")));
+ add(new FloatLiteral(1.23, Type.FLOAT));
+ add(new FloatLiteral(3.456, Type.DOUBLE));
+ add(new IntLiteral(1, Type.TINYINT));
+ add(new IntLiteral(1, Type.SMALLINT));
+ add(new IntLiteral(1, Type.INT));
+ add(new IntLiteral(1, Type.BIGINT));
+ add(new StringLiteral("abc"));
+ add(new StringLiteral("2023-01-02"));
+ add(new StringLiteral("2023-01-02 01:02:03.456789"));
+ }};
+
+ List<SlotRef> slotRefs = new ArrayList<SlotRef>() {{
+ add(new SlotRef(new TableName(), "c_int"));
+ add(new SlotRef(new TableName(), "c_long"));
+ add(new SlotRef(new TableName(), "c_bool"));
+ add(new SlotRef(new TableName(), "c_float"));
+ add(new SlotRef(new TableName(), "c_double"));
+ add(new SlotRef(new TableName(), "c_dec"));
+ add(new SlotRef(new TableName(), "c_date"));
+ add(new SlotRef(new TableName(), "c_ts"));
+ add(new SlotRef(new TableName(), "c_str"));
+ }};
+
+ // true indicates support for pushdown
+        Boolean[][] expects = new Boolean[][] {
+            { // int
+                false, false, false, false, false, false, true, true, true, true, false, false, false
+            },
+            { // long
+                false, false, false, false, false, false, true, true, true, true, false, false, false
+            },
+            { // boolean
+                true, false, false, false, false, false, false, false, false, false, false, false, false
+            },
+            { // float
+                false, false, false, false, true, false, true, true, true, true, false, false, false
+            },
+            { // double
+                false, false, false, true, true, true, true, true, true, true, false, false, false
+            },
+            { // decimal
+                false, false, false, true, true, true, true, true, true, true, false, false, false
+            },
+            { // date
+                false, true, false, false, false, false, true, true, true, true, false, true, false
+            },
+            { // timestamp
+                false, true, true, false, false, false, false, false, false, true, false, false, false
+            },
+            { // string
+                true, true, true, true, false, false, false, false, false, false, true, true, true
+            }
+        };
+
+        ArrayListMultimap<Boolean, Expr> validPredicateMap = ArrayListMultimap.create();
+
+ // binary predicate
+ for (int i = 0; i < slotRefs.size(); i++) {
+ final int loc = i;
+ List<Boolean> ret = literalList.stream().map(literal -> {
+                BinaryPredicate expr = new BinaryPredicate(BinaryPredicate.Operator.EQ, slotRefs.get(loc), literal);
+                Expression expression = null;
+                try {
+                    expression = LakeSoulUtils.convertToSubstraitExpr(expr, schema);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ validPredicateMap.put(expression != null, expr);
+ return expression != null;
+ }).collect(Collectors.toList());
+ Assert.assertArrayEquals(expects[i], ret.toArray());
+ }
+
+ // in predicate
+ for (int i = 0; i < slotRefs.size(); i++) {
+ final int loc = i;
+ List<Boolean> ret = literalList.stream().map(literal -> {
+                InPredicate expr = new InPredicate(slotRefs.get(loc), Lists.newArrayList(literal), false);
+                Expression expression = null;
+                try {
+                    expression = LakeSoulUtils.convertToSubstraitExpr(expr, schema);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ validPredicateMap.put(expression != null, expr);
+ return expression != null;
+ }).collect(Collectors.toList());
+ Assert.assertArrayEquals(expects[i], ret.toArray());
+ }
+
+ // not in predicate
+ for (int i = 0; i < slotRefs.size(); i++) {
+ final int loc = i;
+ List<Boolean> ret = literalList.stream().map(literal -> {
+                InPredicate expr = new InPredicate(slotRefs.get(loc), Lists.newArrayList(literal), true);
+                Expression expression = null;
+                try {
+                    expression = LakeSoulUtils.convertToSubstraitExpr(expr, schema);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ validPredicateMap.put(expression != null, expr);
+ return expression != null;
+ }).collect(Collectors.toList());
+ Assert.assertArrayEquals(expects[i], ret.toArray());
+ }
+
+ // bool literal
+
+        Expression trueExpr = LakeSoulUtils.convertToSubstraitExpr(new BoolLiteral(true), schema);
+        Expression falseExpr = LakeSoulUtils.convertToSubstraitExpr(new BoolLiteral(false), schema);
+ Assert.assertEquals(SubstraitUtil.CONST_TRUE, trueExpr);
+ Assert.assertEquals(SubstraitUtil.CONST_FALSE, falseExpr);
+ validPredicateMap.put(true, new BoolLiteral(true));
+ validPredicateMap.put(true, new BoolLiteral(false));
+
+ List<Expr> validExprs = validPredicateMap.get(true);
+ List<Expr> invalidExprs = validPredicateMap.get(false);
+ // OR predicate
+ // both valid
+ for (int i = 0; i < validExprs.size(); i++) {
+ for (int j = 0; j < validExprs.size(); j++) {
+                CompoundPredicate orPredicate = new CompoundPredicate(Operator.OR,
+                        validExprs.get(i), validExprs.get(j));
+                Expression expression = LakeSoulUtils.convertToSubstraitExpr(orPredicate, schema);
+                Assert.assertNotNull("pred: " + orPredicate.toSql(), expression);
+ }
+ }
+ // both invalid
+ for (int i = 0; i < invalidExprs.size(); i++) {
+ for (int j = 0; j < invalidExprs.size(); j++) {
+                CompoundPredicate orPredicate = new CompoundPredicate(Operator.OR,
+                        invalidExprs.get(i), invalidExprs.get(j));
+                Expression expression = LakeSoulUtils.convertToSubstraitExpr(orPredicate, schema);
+ Assert.assertNull("pred: " + orPredicate.toSql(), expression);
+ }
+ }
+ // valid or invalid
+ for (int i = 0; i < validExprs.size(); i++) {
+ for (int j = 0; j < invalidExprs.size(); j++) {
+                CompoundPredicate orPredicate = new CompoundPredicate(Operator.OR,
+                        validExprs.get(i), invalidExprs.get(j));
+                Expression expression = LakeSoulUtils.convertToSubstraitExpr(orPredicate, schema);
+ Assert.assertNull("pred: " + orPredicate.toSql(), expression);
+ }
+ }
+
+ // AND predicate
+ // both valid
+ for (int i = 0; i < validExprs.size(); i++) {
+ for (int j = 0; j < validExprs.size(); j++) {
+                CompoundPredicate andPredicate = new CompoundPredicate(Operator.AND,
+                        validExprs.get(i), validExprs.get(j));
+                Expression expression = LakeSoulUtils.convertToSubstraitExpr(andPredicate, schema);
+                Assert.assertNotNull("pred: " + andPredicate.toSql(), expression);
+ }
+ }
+ // both invalid
+ for (int i = 0; i < invalidExprs.size(); i++) {
+ for (int j = 0; j < invalidExprs.size(); j++) {
+                CompoundPredicate andPredicate = new CompoundPredicate(Operator.AND,
+                        invalidExprs.get(i), invalidExprs.get(j));
+                Expression expression = LakeSoulUtils.convertToSubstraitExpr(andPredicate, schema);
+ Assert.assertNull("pred: " + andPredicate.toSql(), expression);
+ }
+ }
+ // valid and invalid
+ for (int i = 0; i < validExprs.size(); i++) {
+ for (int j = 0; j < invalidExprs.size(); j++) {
+                CompoundPredicate andPredicate = new CompoundPredicate(Operator.AND,
+                        validExprs.get(i), invalidExprs.get(j));
+                Expression expression = LakeSoulUtils.convertToSubstraitExpr(andPredicate, schema);
+                Assert.assertNotNull("pred: " + andPredicate.toSql(), expression);
+                Assert.assertEquals(
+                        SubstraitUtil.substraitExprToProto(LakeSoulUtils.convertToSubstraitExpr(validExprs.get(i), schema), "table"),
+                        SubstraitUtil.substraitExprToProto(expression, "table"));
+ }
+ }
+
+ // NOT predicate
+ // valid
+ for (int i = 0; i < validExprs.size(); i++) {
+            CompoundPredicate notPredicate = new CompoundPredicate(Operator.NOT,
+                    validExprs.get(i), null);
+            Expression expression = LakeSoulUtils.convertToSubstraitExpr(notPredicate, schema);
+ Assert.assertNotNull("pred: " + notPredicate.toSql(), expression);
+ }
+ // invalid
+ for (int i = 0; i < invalidExprs.size(); i++) {
+            CompoundPredicate notPredicate = new CompoundPredicate(Operator.NOT,
+                    invalidExprs.get(i), null);
+            Expression expression = LakeSoulUtils.convertToSubstraitExpr(notPredicate, schema);
+ Assert.assertNull("pred: " + notPredicate.toSql(), expression);
+ }
+ }
+}
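
The compound-predicate assertions above pin down an asymmetry worth spelling out: under AND, a conjunct that cannot be converted is simply dropped, so the pushed filter is a safe relaxation (asserted equal to the convertible side alone), while under OR a single unconvertible side invalidates the whole predicate. A generic sketch of that combining rule, where E stands in for io.substrait.expression.Expression and null marks 'not convertible'; this helper is illustrative, not the patch's actual code:

    import java.util.function.BinaryOperator;

    class PushdownCombineSketch {
        // AND may be relaxed: dropping a conjunct only lets more rows through the pushed filter.
        static <E> E combineAnd(E left, E right, BinaryOperator<E> and) {
            if (left == null) {
                return right;
            }
            if (right == null) {
                return left;
            }
            return and.apply(left, right);
        }

        // OR may not be relaxed: dropping a disjunct could filter out matching rows, so give up entirely.
        static <E> E combineOr(E left, E right, BinaryOperator<E> or) {
            if (left == null || right == null) {
                return null;
            }
            return or.apply(left, right);
        }

        public static void main(String[] args) {
            BinaryOperator<String> and = (l, r) -> "and(" + l + ", " + r + ")";
            BinaryOperator<String> or = (l, r) -> "or(" + l + ", " + r + ")";
            System.out.println(combineAnd("a > 1", null, and)); // a > 1 : only the convertible side is pushed
            System.out.println(combineOr("a > 1", null, or));   // null  : nothing is pushed
        }
    }
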
diff --git a/fe/pom.xml b/fe/pom.xml
index 42ab14bd3dc..b679f466f07 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -317,6 +317,8 @@ under the License.
<hudi.version>0.14.1</hudi.version>
<presto.hadoop.version>2.7.4-11</presto.hadoop.version>
<presto.hive.version>3.0.0-8</presto.hive.version>
+ <!-- lakesoul -->
+ <lakesoul.version>2.6.2</lakesoul.version>
<parquet.version>1.13.1</parquet.version>
<commons-collections.version>3.2.2</commons-collections.version>
@@ -1824,4 +1826,4 @@ under the License.
</snapshots>
</repository>
</repositories>
-</project>
\ No newline at end of file
+</project>
diff --git a/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_filter.groovy b/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_filter.groovy
new file mode 100644
index 00000000000..01c5d981c49
--- /dev/null
+++ b/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_filter.groovy
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_lakesoul_filter", "p0,external,doris,external_docker,external_docker_doris") {
+ String enabled = context.config.otherConfigs.get("enableLakesoulTest")
+ // open it when docker image is ready to run in regression test
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String catalog_name = "lakesoul"
+ String db_name = "default"
+ String pg_user = context.config.otherConfigs.get("lakesoulPGUser")
+ String pg_pwd = context.config.otherConfigs.get("lakesoulPGPwd")
+ String pg_url = context.config.otherConfigs.get("lakesoulPGUrl")
+ String minio_ak = context.config.otherConfigs.get("lakesoulMinioAK")
+ String minio_sk = context.config.otherConfigs.get("lakesoulMinioSK")
+        String minio_endpoint = context.config.otherConfigs.get("lakesoulMinioEndpoint")
+
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """create catalog lakesoul properties (
+ 'type'='lakesoul',
+ 'lakesoul.pg.username'='${pg_user}',
+ 'lakesoul.pg.password'='${pg_pwd}',
+ 'lakesoul.pg.url'='${pg_url}',
+ 'minio.endpoint'='${minio_endpoint}',
+ 'minio.access_key'='${minio_ak}',
+ 'minio.secret_key'='${minio_sk}'
+ );"""
+
+ // analyze
+ sql """use `${catalog_name}`.`${db_name}`"""
+
+ sql """show tables;"""
+ // select
+ sql """select * from region;"""
+
+ sql """select * from nation;"""
+
+        sql """select * from nation where n_regionkey = 0 or n_nationkey > 14;"""
+
+        sql """select * from nation where n_regionkey = 0 and n_nationkey > 0;"""
+
+ sql """select * from nation where n_regionkey = 0;"""
+
+        // aggregate-only query with no filter
+ sql """select count(*) from customer;"""
+
+ // filter by non-partition column
+ sql """select count(*) from customer where c_mktsegment='BUILDING';"""
+
+ // filter by partition column
+ sql """select count(*) from customer where c_nationkey=19;"""
+
+ // filter by both partition and non-partition column
+        sql """select count(*) from customer where c_mktsegment='BUILDING' and c_nationkey=19;"""
+
+        sql """select * from lineitem where l_shipdate <= DATE '1992-12-01' limit 10;"""
+
+        sql """select count(*) from part where p_type like 'MEDIUM POLISHED%';"""
+ }
+}
+
diff --git a/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy b/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy
index 9369a28e8fe..6a1f83a5e24 100644
--- a/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy
+++ b/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy
@@ -43,18 +43,18 @@ suite("test_external_table_lakesoul", "p2,external,lakesoul,external_remote,exte
"""
// analyze
- sql """use `${catalog_name}`.`${db_name}`"""
-
- sql q1
- sql q2
- sql q3
- sql q4
- sql q5
- sql q6
- sql q7
- sql q8
- sql q9
- sql q11
+ sql """use `${catalog_name}`.`${db_name}`"""
+
+ sql q1
+ sql q2
+ sql q3
+ sql q4
+ sql q5
+ sql q6
+ sql q7
+ sql q8
+ sql q9
+ sql q11
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]