This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bfe65565d80 [feature](paimon)support native reader (#29339)
bfe65565d80 is described below
commit bfe65565d80c8dd3bf8d41a8e3656cfaadaa665a
Author: wuwenchi <[email protected]>
AuthorDate: Thu Jan 4 14:31:48 2024 +0800
[feature](paimon)support native reader (#29339)
Support native reader for Paimon.
Upgrade Paimon from 0.5 to 0.6: apache/doris-shade#32
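As context for the diff below, here is a minimal sketch (not part of this commit) of the eligibility rule it implements: a Paimon split is handed to Doris's native ORC/Parquet reader only when the table's file format is orc or parquet and the DataSplit converts to raw files; otherwise it stays on the JNI reader. Only the Paimon 0.6 APIs that appear in the patch (DataSplit#convertToRawFiles, RawFile) are assumed; the class and method names here are illustrative.

    import java.util.List;
    import java.util.Optional;

    import org.apache.paimon.table.source.DataSplit;
    import org.apache.paimon.table.source.RawFile;
    import org.apache.paimon.table.source.Split;

    // Illustrative helper, not part of this commit.
    public class NativeReadProbe {
        // True when the split can bypass the JNI reader and be read natively.
        static boolean canReadNatively(Split split, String fileFormat) {
            boolean nativeFormat = "orc".equalsIgnoreCase(fileFormat)
                    || "parquet".equalsIgnoreCase(fileFormat);
            if (!nativeFormat || !(split instanceof DataSplit)) {
                return false;
            }
            // convertToRawFiles() is empty when the files cannot be read as-is
            // (e.g. merging is still required), so the JNI reader is kept.
            Optional<List<RawFile>> rawFiles = ((DataSplit) split).convertToRawFiles();
            return rawFiles.isPresent();
        }
    }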
---
be/src/vec/exec/scan/vfile_scanner.cpp | 17 +++++--
.../property/constants/PaimonProperties.java | 1 +
.../planner/external/paimon/PaimonScanNode.java | 59 ++++++++++++++++++++--
.../planner/external/paimon/PaimonSource.java | 5 ++
.../doris/planner/external/paimon/PaimonSplit.java | 26 +++++++++-
fe/pom.xml | 4 +-
gensrc/thrift/PlanNodes.thrift | 1 +
7 files changed, 102 insertions(+), 11 deletions(-)
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 6002f4eea67..c70295e1313 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -736,11 +736,22 @@ Status VFileScanner::_get_next_reader() {
// JNI reader can only push down column value range
    bool push_down_predicates =
            !_is_load && _params->format_type != TFileFormatType::FORMAT_JNI;
-   if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params &&
-       range.table_format_params.table_format_type == "hudi") {
-       if (range.table_format_params.hudi_params.delta_logs.empty()) {
+   if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params) {
+       if (range.table_format_params.table_format_type == "hudi" &&
+           range.table_format_params.hudi_params.delta_logs.empty()) {
            // fall back to native reader if there is no log file
            format_type = TFileFormatType::FORMAT_PARQUET;
+       } else if (range.table_format_params.table_format_type == "paimon" &&
+                  !range.table_format_params.paimon_params.__isset.paimon_split) {
+           // use native reader
+           auto format = range.table_format_params.paimon_params.file_format;
+           if (format == "orc") {
+               format_type = TFileFormatType::FORMAT_ORC;
+           } else if (format == "parquet") {
+               format_type = TFileFormatType::FORMAT_PARQUET;
+           } else {
+               return Status::InternalError("Not supported paimon file format: {}", format);
+           }
        }
    }
    bool need_to_get_parsed_schema = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
index 06d205ff221..318e2bac30a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
@@ -25,6 +25,7 @@ import java.util.Map;
public class PaimonProperties {
public static final String WAREHOUSE = "warehouse";
+ public static final String FILE_FORMAT = "file.format";
public static final String PAIMON_PREFIX = "paimon";
public static final String PAIMON_CATALOG_TYPE = "metastore";
public static final String HIVE_METASTORE_URIS = "uri";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
index 1e23d2f781d..cf822901992 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
@@ -26,6 +26,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
@@ -41,15 +42,20 @@ import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TTableFormatFileDesc;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Path;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.RawFile;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.utils.InstantiationUtil;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -102,7 +108,11 @@ public class PaimonScanNode extends FileQueryScanNode {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(paimonSplit.getTableFormatType().value());
TPaimonFileDesc fileDesc = new TPaimonFileDesc();
- fileDesc.setPaimonSplit(encodeObjectToString(paimonSplit.getSplit()));
+ org.apache.paimon.table.source.Split split = paimonSplit.getSplit();
+ if (split != null) {
+ fileDesc.setPaimonSplit(encodeObjectToString(split));
+ }
+ fileDesc.setFileFormat(source.getFileFormat());
fileDesc.setPaimonPredicate(encodeObjectToString(predicates));
fileDesc.setPaimonColumnNames(source.getDesc().getSlots().stream().map(slot -> slot.getColumn().getName())
        .collect(Collectors.joining(",")));
@@ -127,13 +137,52 @@ public class PaimonScanNode extends FileQueryScanNode {
        List<org.apache.paimon.table.source.Split> paimonSplits = readBuilder.withFilter(predicates)
                .withProjection(projected)
                .newScan().plan().splits();
+       boolean supportNative = supportNativeReader();
        for (org.apache.paimon.table.source.Split split : paimonSplits) {
-           PaimonSplit paimonSplit = new PaimonSplit(split);
-           splits.add(paimonSplit);
+           if (supportNative && split instanceof DataSplit) {
+               DataSplit dataSplit = (DataSplit) split;
+               Optional<List<RawFile>> optRowFiles = dataSplit.convertToRawFiles();
+               if (optRowFiles.isPresent()) {
+                   List<RawFile> rawFiles = optRowFiles.get();
+                   for (RawFile file : rawFiles) {
+                       LocationPath locationPath = new LocationPath(file.path(), source.getCatalog().getProperties());
+                       Path finalDataFilePath = locationPath.toScanRangeLocation();
+                       try {
+                           splits.addAll(
+                                   splitFile(
+                                           finalDataFilePath,
+                                           0,
+                                           null,
+                                           file.length(),
+                                           -1,
+                                           true,
+                                           null,
+                                           PaimonSplit.PaimonSplitCreator.DEFAULT));
+                       } catch (IOException e) {
+                           throw new UserException("Paimon error to split file: " + e.getMessage(), e);
+                       }
+                   }
+               } else {
+                   splits.add(new PaimonSplit(split));
+               }
+           } else {
+               splits.add(new PaimonSplit(split));
+           }
        }
        return splits;
    }
+   private boolean supportNativeReader() {
+       String fileFormat = source.getFileFormat().toLowerCase();
+       switch (fileFormat) {
+           case "orc":
+           case "parquet":
+               return true;
+           default:
+               return false;
+       }
+   }
+
//When calling 'setPaimonParams' and 'getSplits', the column trimming has not been performed yet,
// Therefore, paimon_column_names is temporarily reset here
@Override
@@ -157,8 +206,8 @@ public class PaimonScanNode extends FileQueryScanNode {
@Override
public TFileType getLocationType(String location) throws DdlException, MetaNotFoundException {
-   //todo: no use
-   return TFileType.FILE_S3;
+   return Optional.ofNullable(LocationPath.getTFileType(location)).orElseThrow(() ->
+           new DdlException("Unknown file location " + location + " for paimon table "));
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java
index 2f55e30c086..fa838350c82 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.PaimonExternalTable;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.property.constants.PaimonProperties;
import org.apache.doris.planner.ColumnRange;
import org.apache.doris.thrift.TFileAttributes;
@@ -61,4 +62,8 @@ public class PaimonSource {
public ExternalCatalog getCatalog() {
return paimonExtTable.getCatalog();
}
+
+   public String getFileFormat() {
+       return originTable.options().getOrDefault(PaimonProperties.FILE_FORMAT, "orc");
+   }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java
index 8ecf539db91..13263fb5842 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java
@@ -18,21 +18,30 @@
package org.apache.doris.planner.external.paimon;
import org.apache.doris.planner.external.FileSplit;
+import org.apache.doris.planner.external.SplitCreator;
import org.apache.doris.planner.external.TableFormatType;
import org.apache.hadoop.fs.Path;
import org.apache.paimon.table.source.Split;
+import java.util.List;
+
public class PaimonSplit extends FileSplit {
private Split split;
private TableFormatType tableFormatType;
public PaimonSplit(Split split) {
- super(new Path("dummyPath"), 0, 0, 0, null, null);
+ super(new Path("hdfs://dummyPath"), 0, 0, 0, null, null);
this.split = split;
this.tableFormatType = TableFormatType.PAIMON;
}
+   public PaimonSplit(Path file, long start, long length, long fileLength, String[] hosts,
+           List<String> partitionList) {
+       super(file, start, length, fileLength, hosts, partitionList);
+       this.tableFormatType = TableFormatType.PAIMON;
+   }
+
public Split getSplit() {
return split;
}
@@ -49,4 +58,19 @@ public class PaimonSplit extends FileSplit {
this.tableFormatType = tableFormatType;
}
+   public static class PaimonSplitCreator implements SplitCreator {
+
+       static final PaimonSplitCreator DEFAULT = new PaimonSplitCreator();
+
+       @Override
+       public org.apache.doris.spi.Split create(Path path,
+               long start,
+               long length,
+               long fileLength,
+               long modificationTime,
+               String[] hosts,
+               List<String> partitionValues) {
+           return new PaimonSplit(path, start, length, fileLength, hosts, partitionValues);
+       }
+   }
}
diff --git a/fe/pom.xml b/fe/pom.xml
index ef0747931cf..dbb691ddf2d 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -221,7 +221,7 @@ under the License.
<doris.home>${fe.dir}/../</doris.home>
<revision>1.2-SNAPSHOT</revision>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <doris.hive.catalog.shade.version>1.0.2</doris.hive.catalog.shade.version>
+ <doris.hive.catalog.shade.version>1.0.3</doris.hive.catalog.shade.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<!--plugin parameters-->
@@ -343,7 +343,7 @@ under the License.
<!--todo waiting release-->
<quartz.version>2.3.2</quartz.version>
<!-- paimon -->
- <paimon.version>0.5.0-incubating</paimon.version>
+ <paimon.version>0.6.0-incubating</paimon.version>
<disruptor.version>3.4.4</disruptor.version>
<trino.parser.version>395</trino.parser.version>
<!-- arrow flight sql -->
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 0e35f6ebc59..23da0f23ebb 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -311,6 +311,7 @@ struct TPaimonFileDesc {
8: optional i64 db_id
9: optional i64 tbl_id
10: optional i64 last_update_time
+ 11: optional string file_format
}
struct TMaxComputeFileDesc {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]