This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 53291bd040d branch-2.1: [fix](hudi) upgrade hudi to 0.15.0 (#44267)(#44995) (#45041)
53291bd040d is described below
commit 53291bd040d5c23931244af457c33b424fc6211b
Author: Socrates <[email protected]>
AuthorDate: Fri Dec 6 15:18:37 2024 +0800
branch-2.1: [fix](hudi) upgrade hudi to 0.15.0 (#44267)(#44995) (#45041)
cherry-pick pr: #44267 #44995
---
be/src/vec/exec/format/table/hudi_jni_reader.cpp | 15 +-
be/src/vec/exec/format/table/hudi_jni_reader.h | 4 +-
be/src/vec/exec/scan/vfile_scanner.cpp | 16 +-
build.sh | 4 +
fe/be-java-extensions/hadoop-hudi-scanner/pom.xml | 227 +++++++++++++++++
.../apache/doris/hudi/HadoopHudiColumnValue.java | 219 +++++++++++++++++
.../apache/doris/hudi/HadoopHudiJniScanner.java | 271 +++++++++++++++++++++
.../src/main/resources/package.xml | 41 ++++
.../src/main/java/org/apache/doris/hudi/Utils.java | 4 +-
.../org/apache/doris/hudi/BaseSplitReader.scala | 15 +-
fe/be-java-extensions/pom.xml | 1 +
.../apache/doris/datasource/FileQueryScanNode.java | 2 +
.../datasource/hive/HiveMetaStoreClientHelper.java | 5 +-
.../apache/doris/datasource/hudi/HudiUtils.java | 2 +-
.../hudi/source/COWIncrementalRelation.java | 11 +-
.../hudi/source/HudiLocalEngineContext.java | 67 ++---
.../hudi/source/HudiPartitionProcessor.java | 14 +-
.../doris/datasource/hudi/source/HudiScanNode.java | 71 ++++--
.../doris/datasource/hudi/source/HudiSplit.java | 3 +-
.../hudi/source/MORIncrementalRelation.java | 14 +-
.../datasource/paimon/source/PaimonScanNode.java | 39 ++-
.../glue/translator/PhysicalPlanTranslator.java | 2 +-
.../apache/doris/planner/SingleNodePlanner.java | 2 +-
.../java/org/apache/doris/qe/SessionVariable.java | 14 ++
fe/pom.xml | 6 +-
gensrc/thrift/PlanNodes.thrift | 4 +-
.../hudi/test_hudi_incremental.out | 174 +++++++++++++
.../hudi/test_hudi_orc_tables.out | 15 ++
.../hudi/test_hudi_schema_evolution.out | 32 +++
.../external_table_p2/hudi/test_hudi_snapshot.out | Bin 348526 -> 696105 bytes
.../external_table_p2/hudi/test_hudi_timestamp.out | 31 ++-
.../hudi/test_hudi_timetravel.out | 120 +++++++++
.../hudi/test_hudi_catalog.groovy | 2 +-
.../hudi/test_hudi_incremental.groovy | 16 +-
..._catalog.groovy => test_hudi_orc_tables.groovy} | 10 +-
.../hudi/test_hudi_schema_evolution.groovy | 14 +-
.../hudi/test_hudi_snapshot.groovy | 13 +-
.../hudi/test_hudi_timestamp.groovy | 20 +-
.../hudi/test_hudi_timetravel.groovy | 15 +-
39 files changed, 1388 insertions(+), 147 deletions(-)
diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.cpp
b/be/src/vec/exec/format/table/hudi_jni_reader.cpp
index cffa2ce9ac4..1888d2a37c7 100644
--- a/be/src/vec/exec/format/table/hudi_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/hudi_jni_reader.cpp
@@ -18,7 +18,6 @@
#include "hudi_jni_reader.h"
#include <map>
-#include <ostream>
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
@@ -65,7 +64,7 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params,
{"input_format", _hudi_params.input_format}};
// Use compatible hadoop client to read data
- for (auto& kv : _scan_params.properties) {
+ for (const auto& kv : _scan_params.properties) {
if (kv.first.starts_with(HOODIE_CONF_PREFIX)) {
params[kv.first] = kv.second;
} else {
@@ -73,8 +72,16 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params,
}
}
- _jni_connector = std::make_unique<JniConnector>("org/apache/doris/hudi/HudiJniScanner", params,
-         required_fields);
+ // if (_hudi_params.hudi_jni_scanner == "hadoop") {
+ //     _jni_connector = std::make_unique<JniConnector>(
+ //             "org/apache/doris/hudi/HadoopHudiJniScanner", params, required_fields);
+ // }
+ if (_hudi_params.hudi_jni_scanner == "spark") {
+     _jni_connector = std::make_unique<JniConnector>("org/apache/doris/hudi/HudiJniScanner",
+             params, required_fields);
+ } else {
+     DCHECK(false) << "Unsupported hudi jni scanner: " << _hudi_params.hudi_jni_scanner;
+ }
}
Status HudiJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.h
b/be/src/vec/exec/format/table/hudi_jni_reader.h
index e9bb55a69a7..bfa0291a610 100644
--- a/be/src/vec/exec/format/table/hudi_jni_reader.h
+++ b/be/src/vec/exec/format/table/hudi_jni_reader.h
@@ -17,9 +17,7 @@
#pragma once
-#include <stddef.h>
-
-#include <memory>
+#include <cstddef>
#include <string>
#include <unordered_map>
#include <unordered_set>
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp
b/be/src/vec/exec/scan/vfile_scanner.cpp
index acd590ba958..e3899d96982 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -23,11 +23,9 @@
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>
-#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
#include <iterator>
#include <map>
-#include <ostream>
#include <tuple>
#include <utility>
@@ -47,7 +45,6 @@
#include "vec/common/string_ref.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/columns_with_type_and_name.h"
-#include "vec/core/field.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/data_type_nullable.h"
@@ -773,17 +770,16 @@ Status VFileScanner::_get_next_reader() {
// create reader for specific format
Status init_status;
- TFileFormatType::type format_type = _params->format_type;
+ // for compatibility, if format_type is not set in range, use the format type of params
+ TFileFormatType::type format_type =
+         range.__isset.format_type ? range.format_type : _params->format_type;
// JNI reader can only push down column value range
bool push_down_predicates =
        !_is_load && _params->format_type != TFileFormatType::FORMAT_JNI;
+ // for compatibility, this logic is deprecated in 3.1
if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params) {
- if (range.table_format_params.table_format_type == "hudi" &&
-         range.table_format_params.hudi_params.delta_logs.empty()) {
-     // fall back to native reader if there is no log file
-     format_type = TFileFormatType::FORMAT_PARQUET;
- } else if (range.table_format_params.table_format_type == "paimon" &&
-         !range.table_format_params.paimon_params.__isset.paimon_split) {
+ if (range.table_format_params.table_format_type == "paimon" &&
+         !range.table_format_params.paimon_params.__isset.paimon_split) {
        // use native reader
        auto format = range.table_format_params.paimon_params.file_format;
if (format == "orc") {
diff --git a/build.sh b/build.sh
index ada27d6d94f..98317dbf4ae 100755
--- a/build.sh
+++ b/build.sh
@@ -478,6 +478,8 @@ fi
if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 ]]; then
modules+=("fe-common")
modules+=("be-java-extensions/hudi-scanner")
+ # don't compile hadoop-hudi-scanner for 2.1 now
+ # modules+=("be-java-extensions/hadoop-hudi-scanner")
modules+=("be-java-extensions/java-common")
modules+=("be-java-extensions/java-udf")
modules+=("be-java-extensions/jdbc-scanner")
@@ -725,6 +727,8 @@ EOF
extensions_modules=("java-udf")
extensions_modules+=("jdbc-scanner")
extensions_modules+=("hudi-scanner")
+ # don't compile hadoop-hudi-scanner for 2.1 now
+ # extensions_modules+=("hadoop-hudi-scanner")
extensions_modules+=("paimon-scanner")
extensions_modules+=("max-compute-scanner")
extensions_modules+=("avro-scanner")
diff --git a/fe/be-java-extensions/hadoop-hudi-scanner/pom.xml
b/fe/be-java-extensions/hadoop-hudi-scanner/pom.xml
new file mode 100644
index 00000000000..4b80d49de17
--- /dev/null
+++ b/fe/be-java-extensions/hadoop-hudi-scanner/pom.xml
@@ -0,0 +1,227 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <artifactId>be-java-extensions</artifactId>
+ <groupId>org.apache.doris</groupId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hadoop-hudi-scanner</artifactId>
+
+ <properties>
+ <doris.home>${basedir}/../../</doris.home>
+ <fe_ut_parallel>1</fe_ut_parallel>
+ <hudi.version>0.15.0</hudi.version>
+ <avro.version>1.11.3</avro.version>
+ <luben.zstd.jni.version>1.5.4-2</luben.zstd.jni.version>
+ <hive-apache.version>3.1.2-22</hive-apache.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.doris</groupId>
+ <artifactId>java-common</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.hudi/hudi-common -->
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-common</artifactId>
+ <version>${hudi.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.hudi/hudi-io -->
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-io</artifactId>
+ <version>${hudi.version}</version>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.hudi/hudi-hadoop-mr -->
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-hadoop-mr</artifactId>
+ <version>${hudi.version}</version>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.parquet/parquet-hadoop -->
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop-bundle</artifactId>
+ <version>${parquet.version}</version>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.parquet/parquet-avro -->
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-avro</artifactId>
+ <version>${parquet.version}</version>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>${avro.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>io.airlift</groupId>
+ <artifactId>concurrent</artifactId>
+ <version>202</version>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/io.airlift/aircompressor -->
+ <dependency>
+ <groupId>io.airlift</groupId>
+ <artifactId>aircompressor</artifactId>
+ <version>${aircompressor.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.github.luben</groupId>
+ <artifactId>zstd-jni</artifactId>
+ <version>${luben.zstd.jni.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.esotericsoftware</groupId>
+ <artifactId>kryo-shaded</artifactId>
+ <version>4.0.2</version>
+ </dependency>
+
+ <!-- hive -->
+ <dependency>
+ <groupId>io.trino.hive</groupId>
+ <artifactId>hive-apache</artifactId>
+ <version>${hive-apache.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.airlift</groupId>
+ <artifactId>aircompressor</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <finalName>hadoop-hudi-scanner</finalName>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/resources/package.xml</descriptor>
+ </descriptors>
+ <archive>
+ <manifest>
+ <mainClass></mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiColumnValue.java
b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiColumnValue.java
new file mode 100644
index 00000000000..ae0199d07d2
--- /dev/null
+++
b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiColumnValue.java
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.hudi;
+
+import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.jni.vec.ColumnValue;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.io.LongWritable;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Map;
+
+public class HadoopHudiColumnValue implements ColumnValue {
+ private ColumnType dorisType;
+ private ObjectInspector fieldInspector;
+ private Object fieldData;
+ private final ZoneId zoneId;
+
+ public HadoopHudiColumnValue(ZoneId zoneId) {
+ this.zoneId = zoneId;
+ }
+
+ public void setRow(Object record) {
+ this.fieldData = record;
+ }
+
+ public void setField(ColumnType dorisType, ObjectInspector fieldInspector) {
+ this.dorisType = dorisType;
+ this.fieldInspector = fieldInspector;
+ }
+
+ private Object inspectObject() {
+ return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData);
+ }
+
+ @Override
+ public boolean getBoolean() {
+ return (boolean) inspectObject();
+ }
+
+ @Override
+ public short getShort() {
+ return (short) inspectObject();
+ }
+
+ @Override
+ public int getInt() {
+ return (int) inspectObject();
+ }
+
+ @Override
+ public float getFloat() {
+ return (float) inspectObject();
+ }
+
+ @Override
+ public long getLong() {
+ return (long) inspectObject();
+ }
+
+ @Override
+ public double getDouble() {
+ return (double) inspectObject();
+ }
+
+ @Override
+ public String getString() {
+ return inspectObject().toString();
+ }
+
+ @Override
+ public byte[] getBytes() {
+ return (byte[]) inspectObject();
+ }
+
+
+ @Override
+ public byte getByte() {
+ throw new UnsupportedOperationException("Hoodie type does not support tinyint");
+ }
+
+ @Override
+ public BigDecimal getDecimal() {
+ return ((HiveDecimal) inspectObject()).bigDecimalValue();
+ }
+
+ @Override
+ public LocalDate getDate() {
+ return LocalDate.ofEpochDay((((DateObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData))
+         .toEpochDay());
+ }
+
+ @Override
+ public LocalDateTime getDateTime() {
+ if (fieldData instanceof Timestamp) {
+ return ((Timestamp) fieldData).toLocalDateTime();
+ } else if (fieldData instanceof TimestampWritableV2) {
+ return LocalDateTime.ofInstant(Instant.ofEpochSecond((((TimestampObjectInspector) fieldInspector)
+         .getPrimitiveJavaObject(fieldData)).toEpochSecond()), zoneId);
+ } else {
+ long datetime = ((LongWritable) fieldData).get();
+ long seconds;
+ long nanoseconds;
+ if (dorisType.getPrecision() == 3) {
+ seconds = datetime / 1000;
+ nanoseconds = (datetime % 1000) * 1000000;
+ } else if (dorisType.getPrecision() == 6) {
+ seconds = datetime / 1000000;
+ nanoseconds = (datetime % 1000000) * 1000;
+ } else {
+ throw new RuntimeException("Hoodie timestamp only support milliseconds and microseconds, "
+         + "wrong precision = " + dorisType.getPrecision());
+ }
+ return LocalDateTime.ofInstant(Instant.ofEpochSecond(seconds, nanoseconds), zoneId);
+ }
+ }
+
+ @Override
+ public boolean canGetStringAsBytes() {
+ return false;
+ }
+
+ @Override
+ public boolean isNull() {
+ return fieldData == null;
+ }
+
+ @Override
+ public BigInteger getBigInteger() {
+ throw new UnsupportedOperationException("Hoodie type does not support largeint");
+ }
+
+ @Override
+ public byte[] getStringAsBytes() {
+ throw new UnsupportedOperationException("Hoodie type does not support getStringAsBytes");
+ }
+
+ @Override
+ public void unpackArray(List<ColumnValue> values) {
+ ListObjectInspector inspector = (ListObjectInspector) fieldInspector;
+ List<?> items = inspector.getList(fieldData);
+ ObjectInspector itemInspector = inspector.getListElementObjectInspector();
+ for (int i = 0; i < items.size(); i++) {
+ Object item = items.get(i);
+ HadoopHudiColumnValue childValue = new HadoopHudiColumnValue(zoneId);
+ childValue.setRow(item);
+ childValue.setField(dorisType.getChildTypes().get(0), itemInspector);
+ values.add(childValue);
+ }
+ }
+
+ @Override
+ public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
+ MapObjectInspector inspector = (MapObjectInspector) fieldInspector;
+ ObjectInspector keyObjectInspector = inspector.getMapKeyObjectInspector();
+ ObjectInspector valueObjectInspector = inspector.getMapValueObjectInspector();
+ for (Map.Entry kv : inspector.getMap(fieldData).entrySet()) {
+ HadoopHudiColumnValue key = new HadoopHudiColumnValue(zoneId);
+ key.setRow(kv.getKey());
+ key.setField(dorisType.getChildTypes().get(0), keyObjectInspector);
+ keys.add(key);
+
+ HadoopHudiColumnValue value = new HadoopHudiColumnValue(zoneId);
+ value.setRow(kv.getValue());
+ value.setField(dorisType.getChildTypes().get(1), valueObjectInspector);
+ values.add(value);
+ }
+ }
+
+ @Override
+ public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
+ StructObjectInspector inspector = (StructObjectInspector) fieldInspector;
+ List<? extends StructField> fields = inspector.getAllStructFieldRefs();
+ for (int i = 0; i < structFieldIndex.size(); i++) {
+ Integer idx = structFieldIndex.get(i);
+ HadoopHudiColumnValue value = new HadoopHudiColumnValue(zoneId);
+ Object obj = null;
+ if (idx != null) {
+ StructField sf = fields.get(idx);
+ obj = inspector.getStructFieldData(fieldData, sf);
+ }
+ value.setRow(obj);
+ value.setField(dorisType.getChildTypes().get(i), fields.get(i).getFieldObjectInspector());
+ values.add(value);
+ }
+ }
+}
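For reference, the getDateTime() logic above decodes a Hudi LONG-backed timestamp by column precision: precision 3 is treated as epoch milliseconds and precision 6 as epoch microseconds before converting to LocalDateTime in the session time zone. A minimal standalone sketch of that same conversion, using only java.time (names are illustrative, not the Doris ColumnValue API):

    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.ZoneId;

    public class HudiTimestampDecodeSketch {
        // Convert a Hudi long timestamp to LocalDateTime, assuming precision 3 (millis) or 6 (micros).
        static LocalDateTime decode(long value, int precision, ZoneId zoneId) {
            long seconds;
            long nanos;
            if (precision == 3) {
                seconds = value / 1000;
                nanos = (value % 1000) * 1_000_000;
            } else if (precision == 6) {
                seconds = value / 1_000_000;
                nanos = (value % 1_000_000) * 1_000;
            } else {
                throw new IllegalArgumentException("unsupported precision: " + precision);
            }
            return LocalDateTime.ofInstant(Instant.ofEpochSecond(seconds, nanos), zoneId);
        }

        public static void main(String[] args) {
            // Decode an epoch-millisecond value in UTC.
            System.out.println(decode(1733469517123L, 3, ZoneId.of("UTC")));
        }
    }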
diff --git
a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java
b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java
new file mode 100644
index 00000000000..f2b38815a36
--- /dev/null
+++
b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.hudi;
+
+import org.apache.doris.common.classloader.ThreadClassLoaderContext;
+import org.apache.doris.common.jni.JniScanner;
+import org.apache.doris.common.jni.vec.ColumnType;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * HadoopHudiJniScanner is a JniScanner implementation that reads Hudi data using hudi-hadoop-mr.
+ */
+public class HadoopHudiJniScanner extends JniScanner {
+ private static final Logger LOG = LoggerFactory.getLogger(HadoopHudiJniScanner.class);
+
+ private static final String HADOOP_CONF_PREFIX = "hadoop_conf.";
+
+ // Hudi data info
+ private final String basePath;
+ private final String dataFilePath;
+ private final long dataFileLength;
+ private final String[] deltaFilePaths;
+ private final String instantTime;
+ private final String serde;
+ private final String inputFormat;
+
+ // schema info
+ private final String hudiColumnNames;
+ private final String[] hudiColumnTypes;
+ private final String[] requiredFields;
+ private List<Integer> requiredColumnIds;
+ private ColumnType[] requiredTypes;
+
+ // Hadoop info
+ private RecordReader<NullWritable, ArrayWritable> reader;
+ private StructObjectInspector rowInspector;
+ private final ObjectInspector[] fieldInspectors;
+ private final StructField[] structFields;
+ private Deserializer deserializer;
+ private final Map<String, String> fsOptionsProps;
+
+ // scanner info
+ private final HadoopHudiColumnValue columnValue;
+ private final int fetchSize;
+ private final ClassLoader classLoader;
+
+ public HadoopHudiJniScanner(int fetchSize, Map<String, String> params) {
+ this.basePath = params.get("base_path");
+ this.dataFilePath = params.get("data_file_path");
+ this.dataFileLength = Long.parseLong(params.get("data_file_length"));
+ if (Strings.isNullOrEmpty(params.get("delta_file_paths"))) {
+ this.deltaFilePaths = new String[0];
+ } else {
+ this.deltaFilePaths = params.get("delta_file_paths").split(",");
+ }
+ this.instantTime = params.get("instant_time");
+ this.serde = params.get("serde");
+ this.inputFormat = params.get("input_format");
+
+ this.hudiColumnNames = params.get("hudi_column_names");
+ this.hudiColumnTypes = params.get("hudi_column_types").split("#");
+ this.requiredFields = params.get("required_fields").split(",");
+
+ this.fieldInspectors = new ObjectInspector[requiredFields.length];
+ this.structFields = new StructField[requiredFields.length];
+ this.fsOptionsProps = Maps.newHashMap();
+ for (Map.Entry<String, String> entry : params.entrySet()) {
+ if (entry.getKey().startsWith(HADOOP_CONF_PREFIX)) {
+ fsOptionsProps.put(entry.getKey().substring(HADOOP_CONF_PREFIX.length()), entry.getValue());
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("get hudi params {}: {}", entry.getKey(), entry.getValue());
+ }
+ }
+
+ ZoneId zoneId;
+ if (Strings.isNullOrEmpty(params.get("time_zone"))) {
+ zoneId = ZoneId.systemDefault();
+ } else {
+ zoneId = ZoneId.of(params.get("time_zone"));
+ }
+ this.columnValue = new HadoopHudiColumnValue(zoneId);
+ this.fetchSize = fetchSize;
+ this.classLoader = this.getClass().getClassLoader();
+ }
+
+ @Override
+ public void open() throws IOException {
+ try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
+ initRequiredColumnsAndTypes();
+ initTableInfo(requiredTypes, requiredFields, fetchSize);
+ Properties properties = getReaderProperties();
+ initReader(properties);
+ } catch (Exception e) {
+ close();
+ LOG.warn("failed to open hadoop hudi jni scanner", e);
+ throw new IOException("failed to open hadoop hudi jni scanner: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public int getNext() throws IOException {
+ try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
+ NullWritable key = reader.createKey();
+ ArrayWritable value = reader.createValue();
+ int numRows = 0;
+ for (; numRows < fetchSize; numRows++) {
+ if (!reader.next(key, value)) {
+ break;
+ }
+ Object rowData = deserializer.deserialize(value);
+ for (int i = 0; i < fields.length; i++) {
+ Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]);
+ columnValue.setRow(fieldData);
+ // LOG.info("rows: {}, column: {}, col name: {}, col type: {}, inspector: {}",
+ //         numRows, i, types[i].getName(), types[i].getType().name(),
+ //         fieldInspectors[i].getTypeName());
+ columnValue.setField(types[i], fieldInspectors[i]);
+ appendData(i, columnValue);
+ }
+ }
+ return numRows;
+ } catch (Exception e) {
+ close();
+ LOG.warn("failed to get next in hadoop hudi jni scanner", e);
+ throw new IOException("failed to get next in hadoop hudi jni scanner: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
+ if (reader != null) {
+ reader.close();
+ }
+ } catch (IOException e) {
+ LOG.warn("failed to close hadoop hudi jni scanner", e);
+ throw new IOException("failed to close hadoop hudi jni scanner: " + e.getMessage(), e);
+ }
+ }
+
+ private void initRequiredColumnsAndTypes() {
+ String[] splitHudiColumnNames = hudiColumnNames.split(",");
+
+ Map<String, Integer> hudiColNameToIdx =
+         IntStream.range(0, splitHudiColumnNames.length)
+                 .boxed()
+                 .collect(Collectors.toMap(i -> splitHudiColumnNames[i], i -> i));
+
+ Map<String, String> hudiColNameToType =
+         IntStream.range(0, splitHudiColumnNames.length)
+                 .boxed()
+                 .collect(Collectors.toMap(i -> splitHudiColumnNames[i], i -> hudiColumnTypes[i]));
+
+ requiredTypes = Arrays.stream(requiredFields)
+         .map(field -> ColumnType.parseType(field, hudiColNameToType.get(field)))
+         .toArray(ColumnType[]::new);
+
+ requiredColumnIds = Arrays.stream(requiredFields)
+         .mapToInt(hudiColNameToIdx::get)
+         .boxed().collect(Collectors.toList());
+ }
+
+ private Properties getReaderProperties() {
+ Properties properties = new Properties();
+ properties.setProperty("hive.io.file.readcolumn.ids",
Joiner.on(",").join(requiredColumnIds));
+ properties.setProperty("hive.io.file.readcolumn.names",
Joiner.on(",").join(this.requiredFields));
+ properties.setProperty("columns", this.hudiColumnNames);
+ properties.setProperty("columns.types",
Joiner.on(",").join(hudiColumnTypes));
+ properties.setProperty("serialization.lib", this.serde);
+ properties.setProperty("hive.io.file.read.all.columns", "false");
+ fsOptionsProps.forEach(properties::setProperty);
+ return properties;
+ }
+
+ private void initReader(Properties properties) throws Exception {
+ String realtimePath = dataFileLength != -1 ? dataFilePath : deltaFilePaths[0];
+ long realtimeLength = dataFileLength != -1 ? dataFileLength : 0;
+ Path path = new Path(realtimePath);
+ FileSplit fileSplit = new FileSplit(path, 0, realtimeLength, (String[]) null);
+ List<HoodieLogFile> logFiles = Arrays.stream(deltaFilePaths).map(HoodieLogFile::new)
+         .collect(Collectors.toList());
+ FileSplit hudiSplit =
+         new HoodieRealtimeFileSplit(fileSplit, basePath, logFiles, instantTime, false, Option.empty());
+
+ JobConf jobConf = new JobConf(new Configuration());
+ properties.stringPropertyNames().forEach(name -> jobConf.set(name, properties.getProperty(name)));
+ InputFormat<?, ?> inputFormatClass = createInputFormat(jobConf, inputFormat);
+ reader = (RecordReader<NullWritable, ArrayWritable>) inputFormatClass
+         .getRecordReader(hudiSplit, jobConf, Reporter.NULL);
+
+ deserializer = getDeserializer(jobConf, properties, serde);
+ rowInspector = getTableObjectInspector(deserializer);
+ for (int i = 0; i < requiredFields.length; i++) {
+ StructField field = rowInspector.getStructFieldRef(requiredFields[i]);
+ structFields[i] = field;
+ fieldInspectors[i] = field.getFieldObjectInspector();
+ }
+ }
+
+ private InputFormat<?, ?> createInputFormat(Configuration conf, String inputFormat) throws Exception {
+ Class<?> clazz = conf.getClassByName(inputFormat);
+ Class<? extends InputFormat<?, ?>> cls =
+         (Class<? extends InputFormat<?, ?>>) clazz.asSubclass(InputFormat.class);
+ return ReflectionUtils.newInstance(cls, conf);
+ }
+
+ private Deserializer getDeserializer(Configuration configuration, Properties properties, String name)
+         throws Exception {
+ Class<? extends Deserializer> deserializerClass = Class.forName(name, true, JavaUtils.getClassLoader())
+         .asSubclass(Deserializer.class);
+ Deserializer deserializer = deserializerClass.getConstructor().newInstance();
+ deserializer.initialize(configuration, properties);
+ return deserializer;
+ }
+
+ private StructObjectInspector getTableObjectInspector(Deserializer deserializer) throws Exception {
+ ObjectInspector inspector = deserializer.getObjectInspector();
+ Preconditions.checkArgument(inspector.getCategory() == ObjectInspector.Category.STRUCT,
+         "expected STRUCT: %s", inspector.getCategory());
+ return (StructObjectInspector) inspector;
+ }
+}
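In production this scanner is instantiated by the BE through the JNI connector, but its constructor documents the parameter keys it expects. A hedged sketch of driving it directly with a hand-built params map; every path, format class, type string and the fetch size below are placeholders for illustration only:

    import java.util.HashMap;
    import java.util.Map;

    public class HadoopHudiJniScannerUsageSketch {
        public static void main(String[] args) throws Exception {
            // Placeholder values; the BE fills this map from the Hudi split description.
            Map<String, String> params = new HashMap<>();
            params.put("base_path", "hdfs://nn:8020/warehouse/hudi_tbl");
            params.put("data_file_path", "hdfs://nn:8020/warehouse/hudi_tbl/par1/f1_0-1-0_001.parquet");
            params.put("data_file_length", "348526");
            params.put("delta_file_paths", "");   // empty -> no log files
            params.put("instant_time", "20241206151837000");
            params.put("serde", "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe");
            params.put("input_format", "org.apache.hudi.hadoop.HoodieParquetInputFormat");
            params.put("hudi_column_names", "id,name,ts");
            params.put("hudi_column_types", "int#string#bigint");
            params.put("required_fields", "id,name");
            params.put("time_zone", "UTC");

            HadoopHudiJniScanner scanner = new HadoopHudiJniScanner(4096, params);
            scanner.open();
            int rows;
            while ((rows = scanner.getNext()) > 0) {
                // each batch is appended to the scanner's vector table via appendData()
            }
            scanner.close();
        }
    }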
diff --git
a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/resources/package.xml
b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/resources/package.xml
new file mode 100644
index 00000000000..4bbb2610603
--- /dev/null
+++ b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/resources/package.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0
http://maven.apache.org/xsd/assembly-2.0.0.xsd">
+ <id>jar-with-dependencies</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>/</outputDirectory>
+ <useProjectArtifact>true</useProjectArtifact>
+ <unpack>true</unpack>
+ <scope>runtime</scope>
+ <unpackOptions>
+ <excludes>
+ <exclude>**/Log4j2Plugins.dat</exclude>
+ </excludes>
+ </unpackOptions>
+ </dependencySet>
+ </dependencySets>
+</assembly>
diff --git
a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java
b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java
index 03085b12f2b..5dd845435c5 100644
---
a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java
+++
b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java
@@ -23,6 +23,7 @@ import
org.apache.doris.common.security.authentication.HadoopUGI;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import sun.management.VMManagement;
import java.io.BufferedReader;
@@ -85,7 +86,8 @@ public class Utils {
}
public static HoodieTableMetaClient getMetaClient(Configuration conf,
String basePath) {
+ HadoopStorageConfiguration hadoopStorageConfiguration = new HadoopStorageConfiguration(conf);
return HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf), () -> HoodieTableMetaClient.builder()
- .setConf(conf).setBasePath(basePath).build());
+ .setConf(hadoopStorageConfiguration).setBasePath(basePath).build());
}
}
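The change above reflects the Hudi 0.15.0 builder API: setConf() now takes a StorageConfiguration, so the Hadoop Configuration is wrapped in HadoopStorageConfiguration first. A minimal sketch of the same pattern, without the Kerberos ugiDoAs wrapper used in the actual code (assumes Hudi 0.15.0 on the classpath; the helper name is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;

    public class MetaClientSketch {
        // Hudi 0.15: wrap the Hadoop conf before passing it to the meta client builder.
        static HoodieTableMetaClient open(Configuration conf, String basePath) {
            return HoodieTableMetaClient.builder()
                    .setConf(new HadoopStorageConfiguration(conf))
                    .setBasePath(basePath)
                    .build();
        }
    }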
diff --git
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
index dcc068ad700..fc8d74f9713 100644
---
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
+++
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
@@ -36,13 +36,15 @@ import org.apache.hudi.common.table.{HoodieTableConfig,
HoodieTableMetaClient, T
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.hadoop.CachingPath
+import org.apache.hudi.hadoop.fs.CachingPath
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
-import org.apache.hudi.io.storage.HoodieAvroHFileReader
+import org.apache.hudi.io.hadoop.HoodieHBaseAvroHFileReader
import org.apache.hudi.metadata.HoodieTableMetadataUtil
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions,
DataSourceWriteOptions, HoodieSparkConfUtils, HoodieTableSchema,
HoodieTableState}
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.log4j.Logger
import org.apache.spark.sql.adapter.Spark3_4Adapter
import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters,
HoodieSparkAvroSchemaConverters}
@@ -430,7 +432,7 @@ abstract class BaseSplitReader(val split: HoodieSplit) {
try {
if (shouldExtractPartitionValuesFromPartitionPath) {
val filePath = new Path(split.dataFilePath)
- val tablePathWithoutScheme = CachingPath.getPathWithoutSchemeAndAuthority(tableInformation.metaClient.getBasePathV2)
+ val tablePathWithoutScheme = CachingPath.getPathWithoutSchemeAndAuthority(new Path(tableInformation.metaClient.getBasePathV2.toUri))
val partitionPathWithoutScheme = CachingPath.getPathWithoutSchemeAndAuthority(filePath.getParent)
val relativePath = new URI(tablePathWithoutScheme.toString).relativize(new URI(partitionPathWithoutScheme.toString)).toString
val hiveStylePartitioningEnabled = tableConfig.getHiveStylePartitioningEnable.toBoolean
@@ -497,8 +499,11 @@ abstract class BaseSplitReader(val split: HoodieSplit) {
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile =>
Iterator[InternalRow] = {
partitionedFile => {
- val reader = new HoodieAvroHFileReader(
-     hadoopConf, partitionedFile.filePath.toPath, new CacheConfig(hadoopConf))
+ var hadoopStorageConfiguration = new HadoopStorageConfiguration(hadoopConf);
+ var storagePath = new StoragePath(partitionedFile.toPath.toUri.getPath);
+ var emptySchema = org.apache.hudi.common.util.Option.empty[org.apache.avro.Schema]()
+ val reader = new HoodieHBaseAvroHFileReader(
+     hadoopStorageConfiguration, storagePath, emptySchema)
val requiredRowSchema = requiredDataSchema.structTypeSchema
// NOTE: Schema has to be parsed at this point, since Avro's [[Schema]]
aren't serializable
diff --git a/fe/be-java-extensions/pom.xml b/fe/be-java-extensions/pom.xml
index af8584d1776..21e7edbb4fc 100644
--- a/fe/be-java-extensions/pom.xml
+++ b/fe/be-java-extensions/pom.xml
@@ -22,6 +22,7 @@ under the License.
<modelVersion>4.0.0</modelVersion>
<modules>
<module>hudi-scanner</module>
+ <module>hadoop-hudi-scanner</module>
<module>java-common</module>
<module>java-udf</module>
<module>jdbc-scanner</module>
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index fe8772e2341..c4fcaebbb19 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -430,6 +430,8 @@ public abstract class FileQueryScanNode extends
FileScanNode {
}
}
+ // set file format type, and the type might fall back to native format in setScanParams
+ rangeDesc.setFormatType(getFileFormatType());
setScanParams(rangeDesc, fileSplit);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
TScanRangeLocation location = new TScanRangeLocation();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
index 531676367f3..5b4c4f36e3f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -828,8 +829,10 @@ public class HiveMetaStoreClientHelper {
public static HoodieTableMetaClient getHudiClient(HMSExternalTable table) {
String hudiBasePath = table.getRemoteTable().getSd().getLocation();
Configuration conf = getConfiguration(table);
+ HadoopStorageConfiguration hadoopStorageConfiguration = new HadoopStorageConfiguration(conf);
return HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf),
- () -> HoodieTableMetaClient.builder().setConf(conf).setBasePath(hudiBasePath).build());
+ () -> HoodieTableMetaClient.builder().setConf(hadoopStorageConfiguration).setBasePath(hudiBasePath)
+         .build());
}
public static Configuration getConfiguration(HMSExternalTable table) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
index d7803b1a516..c98d994a28a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
@@ -86,7 +86,7 @@ public class HudiUtils {
case LONG:
if (logicalType instanceof LogicalTypes.TimestampMillis
|| logicalType instanceof LogicalTypes.TimestampMicros) {
- return logicalType.getName();
+ return "timestamp";
}
if (logicalType instanceof LogicalTypes.TimeMicros) {
return handleUnsupportedType(schema);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
index 7981a0b4f26..843dded2796 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
@@ -37,6 +37,7 @@ import org.apache.hudi.common.table.timeline.TimelineUtils;
import
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.StoragePath;
import java.io.IOException;
import java.util.ArrayList;
@@ -105,7 +106,7 @@ public class COWIncrementalRelation implements
IncrementalRelation {
List<HoodieInstant> commitsToReturn = commitsTimelineToReturn.getInstants();
// todo: support configuration hoodie.datasource.read.incr.filters
- Path basePath = metaClient.getBasePathV2();
+ StoragePath basePath = metaClient.getBasePathV2();
Map<String, String> regularFileIdToFullPath = new HashMap<>();
Map<String, String> metaBootstrapFileIdToFullPath = new HashMap<>();
HoodieTimeline replacedTimeline = commitsTimelineToReturn.getCompletedReplaceTimeline();
@@ -113,8 +114,8 @@ public class COWIncrementalRelation implements
IncrementalRelation {
for (HoodieInstant instant : replacedTimeline.getInstants()) {
HoodieReplaceCommitMetadata.fromBytes(metaClient.getActiveTimeline().getInstantDetails(instant).get(),
HoodieReplaceCommitMetadata.class).getPartitionToReplaceFileIds().forEach(
- (key, value) -> value.forEach(
-         e -> replacedFile.put(e, FSUtils.getPartitionPath(basePath, key).toString())));
+ (key, value) -> value.forEach(
+         e -> replacedFile.put(e, FSUtils.constructAbsolutePath(basePath, key).toString())));
}
fileToWriteStat = new HashMap<>();
@@ -123,7 +124,7 @@ public class COWIncrementalRelation implements
IncrementalRelation {
commitTimeline.getInstantDetails(commit).get(),
HoodieCommitMetadata.class);
metadata.getPartitionToWriteStats().forEach((partition, stats) -> {
for (HoodieWriteStat stat : stats) {
- fileToWriteStat.put(FSUtils.getPartitionPath(basePath, stat.getPath()).toString(), stat);
+ fileToWriteStat.put(FSUtils.constructAbsolutePath(basePath, stat.getPath()).toString(), stat);
}
});
if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS.equals(commit.getTimestamp())) {
@@ -158,7 +159,7 @@ public class COWIncrementalRelation implements
IncrementalRelation {
}
- fs = basePath.getFileSystem(configuration);
+ fs = new Path(basePath.toUri().getPath()).getFileSystem(configuration);
fullTableScan = shouldFullTableScan();
includeStartTime = !fullTableScan;
if (fullTableScan || commitsToReturn.isEmpty()) {
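The substitutions above follow the Hudi 0.15.0 storage abstraction: getBasePathV2() now returns a StoragePath, and absolute partition paths are built with FSUtils.constructAbsolutePath rather than FSUtils.getPartitionPath. A small sketch of that usage (the helper name partitionLocation is illustrative):

    import org.apache.hudi.common.fs.FSUtils;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.storage.StoragePath;

    public class PartitionPathSketch {
        // Resolve the absolute location of a partition under the table base path (Hudi 0.15 API).
        static String partitionLocation(HoodieTableMetaClient metaClient, String partition) {
            StoragePath basePath = metaClient.getBasePathV2();
            return FSUtils.constructAbsolutePath(basePath, partition).toString();
        }
    }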
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java
index 26ef6fdfef7..fecc026cf8d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java
@@ -17,10 +17,6 @@
package org.apache.doris.datasource.hudi.source;
-import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
import org.apache.hudi.common.data.HoodieData;
@@ -39,7 +35,7 @@ import
org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.StorageConfiguration;
import java.util.Collections;
import java.util.Iterator;
@@ -50,18 +46,20 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
- * This file is copied from org.apache.hudi.common.engine.HoodieLocalEngineContext.
+ * This file is copied from
+ * org.apache.hudi.common.engine.HudiLocalEngineContext.
* Because we need set ugi in thread pool
- * A java based engine context, use this implementation on the query engine integrations if needed.
+ * A java based engine context, use this implementation on the query engine
+ * integrations if needed.
*/
public final class HudiLocalEngineContext extends HoodieEngineContext {
- public HudiLocalEngineContext(Configuration conf) {
+ public HudiLocalEngineContext(StorageConfiguration<?> conf) {
this(conf, new LocalTaskContextSupplier());
}
- public HudiLocalEngineContext(Configuration conf, TaskContextSupplier taskContextSupplier) {
- super(new SerializableConfiguration(conf), taskContextSupplier);
+ public HudiLocalEngineContext(StorageConfiguration<?> conf, TaskContextSupplier taskContextSupplier) {
+ super(conf, taskContextSupplier);
}
@Override
@@ -81,27 +79,18 @@ public final class HudiLocalEngineContext extends
HoodieEngineContext {
@Override
public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
- return data.stream().parallel().map(v1 -> {
-     try {
-         return HiveMetaStoreClientHelper.ugiDoAs(getHadoopConf().get(), () -> func.apply(v1));
-     } catch (Exception e) {
-         throw new HoodieException("Error occurs when executing map", e);
-     }
- }).collect(Collectors.toList());
+ return data.stream().parallel().map(FunctionWrapper.throwingMapWrapper(func)).collect(Collectors.toList());
}
@Override
public <I, K, V> List<V> mapToPairAndReduceByKey(
- List<I> data,
- SerializablePairFunction<I, K, V> mapToPairFunc,
- SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+ List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc,
SerializableBiFunction<V, V, V> reduceFunc,
+ int parallelism) {
return
data.stream().parallel().map(FunctionWrapper.throwingMapToPairWrapper(mapToPairFunc))
- .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
- .map(list ->
- list.stream()
- .map(e -> e.getValue())
+ .collect(Collectors.groupingBy(p ->
p.getKey())).values().stream()
+ .map(list -> list.stream().map(e -> e.getValue())
.reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).get())
- .collect(Collectors.toList());
+ .collect(Collectors.toList());
}
@Override
@@ -109,29 +98,28 @@ public final class HudiLocalEngineContext extends
HoodieEngineContext {
Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V>
flatMapToPairFunc,
SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
return
FunctionWrapper.throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator())
- .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
- .map(entry -> new ImmutablePair<>(entry.getKey(),
entry.getValue().stream().map(
-
Pair::getValue).reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null)))
- .filter(Objects::nonNull);
+
.collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
+ .map(entry -> new ImmutablePair<>(entry.getKey(),
entry.getValue().stream().map(
+
Pair::getValue).reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null)))
+ .filter(Objects::nonNull);
}
@Override
public <I, K, V> List<V> reduceByKey(
List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc,
int parallelism) {
return data.stream().parallel()
- .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
- .map(list ->
- list.stream()
- .map(e -> e.getValue())
-
.reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null))
- .filter(Objects::nonNull)
- .collect(Collectors.toList());
+ .collect(Collectors.groupingBy(p ->
p.getKey())).values().stream()
+ .map(list -> list.stream().map(e -> e.getValue())
+
.reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc))
+ .orElse(null))
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
}
@Override
public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I,
Stream<O>> func, int parallelism) {
- return
-
data.stream().parallel().flatMap(FunctionWrapper.throwingFlatMapWrapper(func)).collect(Collectors.toList());
+ return
data.stream().parallel().flatMap(FunctionWrapper.throwingFlatMapWrapper(func))
+ .collect(Collectors.toList());
}
@Override
@@ -142,8 +130,7 @@ public final class HudiLocalEngineContext extends
HoodieEngineContext {
@Override
public <I, K, V> Map<K, V> mapToPair(List<I> data,
SerializablePairFunction<I, K, V> func, Integer parallelism) {
return
data.stream().map(FunctionWrapper.throwingMapToPairWrapper(func)).collect(
- Collectors.toMap(Pair::getLeft, Pair::getRight, (oldVal, newVal)
-> newVal)
- );
+ Collectors.toMap(Pair::getLeft, Pair::getRight, (oldVal,
newVal) -> newVal));
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
index 738b2638588..0ab9fef951a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
@@ -21,7 +21,6 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
-import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
@@ -49,14 +48,15 @@ public abstract class HudiPartitionProcessor {
.build();
HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create(
- new HudiLocalEngineContext(tableMetaClient.getHadoopConf()), metadataConfig,
+ new HudiLocalEngineContext(tableMetaClient.getStorageConf()), tableMetaClient.getStorage(),
+ metadataConfig,
tableMetaClient.getBasePathV2().toString(), true);
return newTableMetadata.getAllPartitionPaths();
}
public List<String> getPartitionNamesBeforeOrEquals(HoodieTimeline
timeline, String timestamp) {
- return new ArrayList<>(HoodieInputFormatUtils.getWritePartitionPaths(
+ return new ArrayList<>(HoodieTableMetadataUtil.getWritePartitionPaths(
timeline.findInstantsBeforeOrEquals(timestamp).getInstants().stream().map(instant -> {
try {
return TimelineUtils.getCommitMetadata(instant,
timeline);
@@ -67,7 +67,7 @@ public abstract class HudiPartitionProcessor {
}
public List<String> getPartitionNamesInRange(HoodieTimeline timeline,
String startTimestamp, String endTimestamp) {
- return new ArrayList<>(HoodieInputFormatUtils.getWritePartitionPaths(
+ return new ArrayList<>(HoodieTableMetadataUtil.getWritePartitionPaths(
timeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants().stream().map(instant -> {
try {
return TimelineUtils.getCommitMetadata(instant,
timeline);
@@ -101,8 +101,10 @@ public abstract class HudiPartitionProcessor {
} else {
// If the partition column size is not equal to the partition fragments size
// and the partition column size > 1, we do not know how to map the partition
- // fragments to the partition columns and therefore return an empty tuple. We don't
- // fail outright so that in some cases we can fallback to reading the table as non-partitioned
+ // fragments to the partition columns and therefore return an empty tuple. We
+ // don't
+ // fail outright so that in some cases we can fallback to reading the table as
+ // non-partitioned
// one
throw new RuntimeException("Failed to parse partition values of path: " + partitionPath);
}
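The comment above states the partition-path parsing rule: the number of path fragments must match the number of partition columns, otherwise parsing fails. A simplified sketch of that rule, not the exact Doris implementation (here "hive-style" means fragments of the form col=value):

    import java.util.ArrayList;
    import java.util.List;

    public class PartitionValueParseSketch {
        // Parse "year=2024/month=12" (hive-style) or "2024/12" into partition values, assuming the
        // fragment count matches the partition column count; otherwise fail, mirroring the rule above.
        static List<String> parse(String partitionPath, int partitionColumnCount, boolean hiveStyle) {
            String[] fragments = partitionPath.split("/");
            if (fragments.length != partitionColumnCount) {
                throw new RuntimeException("Failed to parse partition values of path: " + partitionPath);
            }
            List<String> values = new ArrayList<>(fragments.length);
            for (String fragment : fragments) {
                values.add(hiveStyle ? fragment.substring(fragment.indexOf('=') + 1) : fragment);
            }
            return values;
        }
    }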
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index a8f2a362bfd..a73a2065d0f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -25,6 +25,7 @@ import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.FileFormatUtils;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.FileSplit;
@@ -37,6 +38,7 @@ import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
@@ -48,8 +50,6 @@ import org.apache.doris.thrift.TTableFormatFileDesc;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
@@ -62,6 +62,8 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -87,7 +89,7 @@ public class HudiScanNode extends HiveScanNode {
private static final Logger LOG = LogManager.getLogger(HudiScanNode.class);
- private boolean isCowOrRoTable;
+ private boolean isCowTable;
private final AtomicLong noLogsSplitNum = new AtomicLong(0);
@@ -113,19 +115,23 @@ public class HudiScanNode extends HiveScanNode {
private boolean incrementalRead = false;
private TableScanParams scanParams;
private IncrementalRelation incrementalRelation;
+ private SessionVariable sessionVariable;
/**
* External file scan node for Query Hudi table
- * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check
column priv
+ * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check
column
+ * priv
* eg: s3 tvf
- * These scan nodes do not have corresponding catalog/database/table info,
so no need to do priv check
+ * These scan nodes do not have corresponding catalog/database/table info,
so no
+ * need to do priv check
*/
public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv,
- Optional<TableScanParams> scanParams,
Optional<IncrementalRelation> incrementalRelation) {
+ Optional<TableScanParams> scanParams,
Optional<IncrementalRelation> incrementalRelation,
+ SessionVariable sessionVariable) {
super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE,
needCheckColumnPriv);
- isCowOrRoTable = hmsTable.isHoodieCowTable();
+ isCowTable = hmsTable.isHoodieCowTable();
if (LOG.isDebugEnabled()) {
- if (isCowOrRoTable) {
+ if (isCowTable) {
LOG.debug("Hudi table {} can read as cow/read optimize table",
hmsTable.getFullQualifiers());
} else {
LOG.debug("Hudi table {} is a mor table, and will use JNI to
read data in BE",
@@ -136,11 +142,12 @@ public class HudiScanNode extends HiveScanNode {
this.scanParams = scanParams.orElse(null);
this.incrementalRelation = incrementalRelation.orElse(null);
this.incrementalRead = (this.scanParams != null &&
this.scanParams.incrementalRead());
+ this.sessionVariable = sessionVariable;
}
@Override
public TFileFormatType getFileFormatType() throws UserException {
- if (isCowOrRoTable) {
+ if (canUseNativeReader()) {
return super.getFileFormatType();
} else {
// Use jni to read hudi table in BE
@@ -185,13 +192,13 @@ public class HudiScanNode extends HiveScanNode {
throw new UserException("Not support function '" +
scanParams.getParamType() + "' in hudi table");
}
if (incrementalRead) {
- if (isCowOrRoTable) {
+ if (isCowTable) {
try {
Map<String, String> serd =
hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters();
if ("true".equals(serd.get("hoodie.query.as.ro.table"))
&&
hmsTable.getRemoteTable().getTableName().endsWith("_ro")) {
// Incremental read RO table as RT table, I don't know
why?
- isCowOrRoTable = false;
+ isCowTable = false;
LOG.warn("Execute incremental read on RO table: {}",
hmsTable.getFullQualifiers());
}
} catch (Exception e) {
@@ -236,7 +243,22 @@ public class HudiScanNode extends HiveScanNode {
@Override
protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
if (split instanceof HudiSplit) {
- setHudiParams(rangeDesc, (HudiSplit) split);
+ HudiSplit hudiSplit = (HudiSplit) split;
+ if (rangeDesc.getFormatType() == TFileFormatType.FORMAT_JNI
+ && !sessionVariable.isForceJniScanner()
+ && hudiSplit.getHudiDeltaLogs().isEmpty()) {
+ // no logs, is read optimize table, fallback to use native
reader
+ String fileFormat =
FileFormatUtils.getFileFormatBySuffix(hudiSplit.getDataFilePath())
+ .orElse("Unknown");
+ if (fileFormat.equals("parquet")) {
+ rangeDesc.setFormatType(TFileFormatType.FORMAT_PARQUET);
+ } else if (fileFormat.equals("orc")) {
+ rangeDesc.setFormatType(TFileFormatType.FORMAT_ORC);
+ } else {
+ throw new RuntimeException("Unsupported file format: " +
fileFormat);
+ }
+ }
+ setHudiParams(rangeDesc, hudiSplit);
}
}
@@ -255,10 +277,15 @@ public class HudiScanNode extends HiveScanNode {
fileDesc.setColumnTypes(hudiSplit.getHudiColumnTypes());
// TODO(gaoxin): support complex types
// fileDesc.setNestedFields(hudiSplit.getNestedFields());
+ fileDesc.setHudiJniScanner(hudiSplit.getHudiJniScanner());
tableFormatFileDesc.setHudiParams(fileDesc);
rangeDesc.setTableFormatParams(tableFormatFileDesc);
}
+ private boolean canUseNativeReader() {
+ return !sessionVariable.isForceJniScanner() && isCowTable;
+ }
+
private List<HivePartition> getPrunedPartitions(
HoodieTableMetaClient metaClient, Option<String>
snapshotTimestamp) throws AnalysisException {
List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes();
@@ -304,7 +331,8 @@ public class HudiScanNode extends HiveScanNode {
}
}
}
- // unpartitioned table, create a dummy partition to save location and
inputformat,
+ // unpartitioned table, create a dummy partition to save location and
+ // inputformat,
// so that we can unify the interface.
HivePartition dummyPartition = new HivePartition(hmsTable.getDbName(),
hmsTable.getName(), true,
hmsTable.getRemoteTable().getSd().getInputFormat(),
@@ -315,7 +343,7 @@ public class HudiScanNode extends HiveScanNode {
}
private List<Split> getIncrementalSplits() {
- if (isCowOrRoTable) {
+ if (canUseNativeReader()) {
List<Split> splits = incrementalRelation.collectSplits();
noLogsSplitNum.addAndGet(splits.size());
return splits;
@@ -336,15 +364,15 @@ public class HudiScanNode extends HiveScanNode {
globPath = hudiClient.getBasePathV2().toString() + "/*";
} else {
partitionName =
FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
- new Path(partition.getPath()));
+ new StoragePath(partition.getPath()));
globPath = String.format("%s/%s/*",
hudiClient.getBasePathV2().toString(), partitionName);
}
- List<FileStatus> statuses = FSUtils.getGlobStatusExcludingMetaFolder(
- hudiClient.getRawFs(), new Path(globPath));
+ List<StoragePathInfo> statuses =
FSUtils.getGlobStatusExcludingMetaFolder(
+ hudiClient.getRawHoodieStorage(), new StoragePath(globPath));
HoodieTableFileSystemView fileSystemView = new
HoodieTableFileSystemView(hudiClient,
- timeline, statuses.toArray(new FileStatus[0]));
+ timeline, statuses);
- if (isCowOrRoTable) {
+ if (canUseNativeReader()) {
fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName,
queryInstant).forEach(baseFile -> {
noLogsSplitNum.incrementAndGet();
String filePath = baseFile.getPath();
@@ -473,9 +501,9 @@ public class HudiScanNode extends HiveScanNode {
fileSlice.getPartitionPath();
List<String> logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath)
- .map(Path::toString)
+ .map(StoragePath::toString)
.collect(Collectors.toList());
- if (logs.isEmpty()) {
+ if (logs.isEmpty() && !sessionVariable.isForceJniScanner()) {
noLogsSplitNum.incrementAndGet();
}
@@ -492,6 +520,7 @@ public class HudiScanNode extends HiveScanNode {
split.setHudiColumnNames(columnNames);
split.setHudiColumnTypes(columnTypes);
split.setInstantTime(queryInstant);
+ split.setHudiJniScanner(sessionVariable.getHudiJniScanner());
return split;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java
index c72f7621fea..2270d201793 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java
@@ -40,6 +40,5 @@ public class HudiSplit extends FileSplit {
private List<String> hudiColumnNames;
private List<String> hudiColumnTypes;
private List<String> nestedFields;
+ private String hudiJniScanner;
}
-
-
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
index c06fcc2a578..7df01359922 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
@@ -20,9 +20,7 @@ package org.apache.doris.datasource.hudi.source;
import org.apache.doris.spi.Split;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.GlobPattern;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -34,6 +32,8 @@ import
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.storage.StoragePathInfo;
import java.io.IOException;
import java.util.ArrayList;
@@ -54,7 +54,7 @@ public class MORIncrementalRelation implements
IncrementalRelation {
private final boolean endInstantArchived;
private final List<HoodieInstant> includedCommits;
private final List<HoodieCommitMetadata> commitsMetadata;
- private final FileStatus[] affectedFilesInCommits;
+ private final List<StoragePathInfo> affectedFilesInCommits;
private final boolean fullTableScan;
private final String globPattern;
private final boolean includeStartTime;
@@ -96,7 +96,7 @@ public class MORIncrementalRelation implements
IncrementalRelation {
includedCommits = getIncludedCommits();
commitsMetadata = getCommitsMetadata();
affectedFilesInCommits =
HoodieInputFormatUtils.listAffectedFilesForCommits(configuration,
- new Path(metaClient.getBasePath()), commitsMetadata);
+ metaClient.getBasePathV2(), commitsMetadata);
fullTableScan = shouldFullTableScan();
if (hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME
&& fullTableScan) {
throw new HoodieException("Cannot use stateTransitionTime while
enables full table scan");
@@ -152,8 +152,8 @@ public class MORIncrementalRelation implements
IncrementalRelation {
if (should) {
return true;
}
- for (FileStatus fileStatus : affectedFilesInCommits) {
- if (!metaClient.getFs().exists(fileStatus.getPath())) {
+ for (StoragePathInfo fileStatus : affectedFilesInCommits) {
+ if
(!metaClient.getRawHoodieStorage().exists(fileStatus.getPath())) {
return true;
}
}
@@ -199,7 +199,7 @@ public class MORIncrementalRelation implements
IncrementalRelation {
String latestCommit = includedCommits.get(includedCommits.size() -
1).getTimestamp();
HoodieTableFileSystemView fsView = new
HoodieTableFileSystemView(metaClient, scanTimeline,
affectedFilesInCommits);
- Stream<FileSlice> fileSlices =
HoodieInputFormatUtils.getWritePartitionPaths(commitsMetadata)
+ Stream<FileSlice> fileSlices =
HoodieTableMetadataUtil.getWritePartitionPaths(commitsMetadata)
.stream().flatMap(relativePartitionPath ->
fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath,
latestCommit));
if ("".equals(globPattern)) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 59f51c8425c..bf917804fb7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -105,9 +105,9 @@ public class PaimonScanNode extends FileQueryScanNode {
private String serializedTable;
public PaimonScanNode(PlanNodeId id,
- TupleDescriptor desc,
- boolean needCheckColumnPriv,
- SessionVariable sessionVariable) {
+ TupleDescriptor desc,
+ boolean needCheckColumnPriv,
+ SessionVariable sessionVariable) {
super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE,
needCheckColumnPriv);
this.sessionVariable = sessionVariable;
}
@@ -127,8 +127,7 @@ public class PaimonScanNode extends FileQueryScanNode {
predicates = paimonPredicateConverter.convertToPaimonExpr(conjuncts);
}
- private static final Base64.Encoder BASE64_ENCODER =
- java.util.Base64.getUrlEncoder().withoutPadding();
+ private static final Base64.Encoder BASE64_ENCODER =
java.util.Base64.getUrlEncoder().withoutPadding();
public static <T> String encodeObjectToString(T t) {
try {
@@ -156,11 +155,24 @@ public class PaimonScanNode extends FileQueryScanNode {
tableFormatFileDesc.setTableFormatType(paimonSplit.getTableFormatType().value());
TPaimonFileDesc fileDesc = new TPaimonFileDesc();
org.apache.paimon.table.source.Split split = paimonSplit.getSplit();
+
+ String fileFormat = getFileFormat(paimonSplit.getPathString());
if (split != null) {
// use jni reader
+ rangeDesc.setFormatType(TFileFormatType.FORMAT_JNI);
fileDesc.setPaimonSplit(encodeObjectToString(split));
+ } else {
+ // use native reader
+ if (fileFormat.equals("orc")) {
+ rangeDesc.setFormatType(TFileFormatType.FORMAT_ORC);
+ } else if (fileFormat.equals("parquet")) {
+ rangeDesc.setFormatType(TFileFormatType.FORMAT_PARQUET);
+ } else {
+ throw new RuntimeException("Unsupported file format: " +
fileFormat);
+ }
}
- fileDesc.setFileFormat(getFileFormat(paimonSplit.getPathString()));
+
+ fileDesc.setFileFormat(fileFormat);
fileDesc.setPaimonPredicate(encodeObjectToString(predicates));
fileDesc.setPaimonColumnNames(source.getDesc().getSlots().stream().map(slot ->
slot.getColumn().getName())
.collect(Collectors.joining(",")));
@@ -172,7 +184,8 @@ public class PaimonScanNode extends FileQueryScanNode {
fileDesc.setTblId(source.getTargetTable().getId());
fileDesc.setLastUpdateTime(source.getTargetTable().getUpdateTime());
fileDesc.setPaimonTable(encodeObjectToString(source.getPaimonTable()));
- // The hadoop conf should be same with
PaimonExternalCatalog.createCatalog()#getConfiguration()
+ // The hadoop conf should be same with
+ // PaimonExternalCatalog.createCatalog()#getConfiguration()
fileDesc.setHadoopConf(source.getCatalog().getCatalogProperty().getHadoopProperties());
Optional<DeletionFile> optDeletionFile = paimonSplit.getDeletionFile();
if (optDeletionFile.isPresent()) {
@@ -190,8 +203,8 @@ public class PaimonScanNode extends FileQueryScanNode {
@Override
public List<Split> getSplits() throws UserException {
boolean forceJniScanner = sessionVariable.isForceJniScanner();
- SessionVariable.IgnoreSplitType ignoreSplitType =
-
SessionVariable.IgnoreSplitType.valueOf(sessionVariable.getIgnoreSplitType());
+ SessionVariable.IgnoreSplitType ignoreSplitType =
SessionVariable.IgnoreSplitType
+ .valueOf(sessionVariable.getIgnoreSplitType());
List<Split> splits = new ArrayList<>();
int[] projected = desc.getSlots().stream().mapToInt(
slot ->
(source.getPaimonTable().rowType().getFieldNames().indexOf(slot.getColumn().getName())))
@@ -288,7 +301,8 @@ public class PaimonScanNode extends FileQueryScanNode {
}
this.selectedPartitionNum = selectedPartitionValues.size();
// TODO: get total partition number
- // We should set fileSplitSize at the end because fileSplitSize may be
modified in splitFile.
+ // We should set fileSplitSize at the end because fileSplitSize may be
modified
+ // in splitFile.
splits.forEach(s -> s.setTargetSplitSize(fileSplitSize));
return splits;
}
@@ -318,8 +332,9 @@ public class PaimonScanNode extends FileQueryScanNode {
@Override
public List<String> getPathPartitionKeys() throws DdlException,
MetaNotFoundException {
- // return new
ArrayList<>(source.getPaimonTable().partitionKeys());
- //Paymon is not aware of partitions and bypasses some existing logic
by returning an empty list
+ // return new ArrayList<>(source.getPaimonTable().partitionKeys());
+ // Paymon is not aware of partitions and bypasses some existing logic
by
+ // returning an empty list
return new ArrayList<>();
}
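
PaimonScanNode follows the same pattern, choosing FORMAT_JNI when a Paimon split object is present and a native orc/parquet format otherwise, while the split, predicates and table are still shipped to the BE as strings via encodeObjectToString(). The encoder declared above is URL-safe, unpadded Base64; the self-contained sketch below shows only that encoding step and assumes the object has already been serialized to bytes, since the serialization itself is not visible in this hunk.

    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    public class UrlSafeBase64Demo {
        // Same encoder shape as BASE64_ENCODER above: URL-safe alphabet, no padding.
        private static final Base64.Encoder ENCODER = Base64.getUrlEncoder().withoutPadding();

        public static void main(String[] args) {
            byte[] payload = "serialized-split-bytes".getBytes(StandardCharsets.UTF_8);
            String encoded = ENCODER.encodeToString(payload);
            // The result is safe to embed in a thrift string field or a URL.
            System.out.println(encoded);
        }
    }
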
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 268e4effe30..23f952d8d64 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -676,7 +676,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
+ " for Hudi table");
PhysicalHudiScan hudiScan = (PhysicalHudiScan) fileScan;
ScanNode scanNode = new HudiScanNode(context.nextPlanNodeId(),
tupleDescriptor, false,
- hudiScan.getScanParams(), hudiScan.getIncrementalRelation());
+ hudiScan.getScanParams(), hudiScan.getIncrementalRelation(),
ConnectContext.get().getSessionVariable());
if (fileScan.getTableSnapshot().isPresent()) {
((FileQueryScanNode)
scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 3b2bbe7fc67..0acdfd67bce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -1969,7 +1969,7 @@ public class SingleNodePlanner {
+ "please set enable_nereids_planner =
true to enable new optimizer");
}
scanNode = new HudiScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true,
- Optional.empty(), Optional.empty());
+ Optional.empty(), Optional.empty(),
ConnectContext.get().getSessionVariable());
break;
case ICEBERG:
scanNode = new IcebergScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 8c01502c5a2..dabb8817b23 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -602,6 +602,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String FORCE_JNI_SCANNER = "force_jni_scanner";
+ public static final String HUDI_JNI_SCANNER = "hudi_jni_scanner";
+
public static final String ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE =
"enable_count_push_down_for_external_table";
public static final String SHOW_ALL_FE_CONNECTION =
"show_all_fe_connection";
@@ -1967,6 +1969,10 @@ public class SessionVariable implements Serializable,
Writable {
description = {"强制使用jni方式读取外表", "Force the use of jni mode to read
external table"})
private boolean forceJniScanner = false;
+ @VariableMgr.VarAttr(name = HUDI_JNI_SCANNER, description = { "使用那种hudi
jni scanner, 'hadoop' 或 'spark'",
+ "Which hudi jni scanner to use, 'hadoop' or 'spark'" })
+ private String hudiJniScanner = "spark";
+
@VariableMgr.VarAttr(name = ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE,
description = {"对外表启用 count(*) 下推优化", "enable count(*) pushdown
optimization for external table"})
private boolean enableCountPushDownForExternalTable = true;
@@ -4342,6 +4348,10 @@ public class SessionVariable implements Serializable,
Writable {
return forceJniScanner;
}
+ public String getHudiJniScanner() {
+ return hudiJniScanner;
+ }
+
public String getIgnoreSplitType() {
return ignoreSplitType;
}
@@ -4362,6 +4372,10 @@ public class SessionVariable implements Serializable,
Writable {
forceJniScanner = force;
}
+ public void setHudiJniScanner(String hudiJniScanner) {
+ this.hudiJniScanner = hudiJniScanner;
+ }
+
public boolean isEnableCountPushDownForExternalTable() {
return enableCountPushDownForExternalTable;
}
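
The two session variables above steer the Hudi read path: force_jni_scanner keeps every split on the JNI reader, and hudi_jni_scanner chooses which JNI implementation is used, 'hadoop' or 'spark', with 'spark' as the default. The sketch below shows one hedged way such a value could be mapped to a scanner class; the class names and the validation are assumptions for illustration, and only the accepted values and the default come from the diff.

    public final class HudiJniScannerSelector {
        static String scannerClassFor(String hudiJniScanner) {
            String value = (hudiJniScanner == null || hudiJniScanner.isEmpty()) ? "spark" : hudiJniScanner;
            switch (value) {
                case "hadoop":
                    return "org.apache.doris.hudi.HadoopHudiJniScanner"; // assumed class name
                case "spark":
                    return "org.apache.doris.hudi.HudiJniScanner";       // assumed class name
                default:
                    throw new IllegalArgumentException(
                            "hudi_jni_scanner must be 'hadoop' or 'spark', got: " + hudiJniScanner);
            }
        }

        public static void main(String[] args) {
            System.out.println(scannerClassFor("hadoop"));
            System.out.println(scannerClassFor(null)); // falls back to the 'spark' default
        }
    }
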
diff --git a/fe/pom.xml b/fe/pom.xml
index 67dabc9b45a..6c605c5e63e 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -320,7 +320,7 @@ under the License.
<avro.version>1.11.4</avro.version>
<arrow.version>17.0.0</arrow.version>
<!-- hudi -->
- <hudi.version>0.14.1</hudi.version>
+ <hudi.version>0.15.0</hudi.version>
<presto.hadoop.version>2.7.4-11</presto.hadoop.version>
<presto.hive.version>3.0.0-8</presto.hive.version>
@@ -369,7 +369,7 @@ under the License.
<arrow.vector.classifier>shade-format-flatbuffers</arrow.vector.classifier>
<flatbuffers.version>1.12.0</flatbuffers.version>
<jacoco.version>0.8.10</jacoco.version>
- <airlift.version>202</airlift.version>
+ <airlift.concurrent.version>202</airlift.concurrent.version>
<semver4j.version>5.3.0</semver4j.version>
<aliyun-sdk-oss.version>3.15.0</aliyun-sdk-oss.version>
</properties>
@@ -1572,7 +1572,7 @@ under the License.
<dependency>
<groupId>io.airlift</groupId>
<artifactId>concurrent</artifactId>
- <version>${airlift.version}</version>
+ <version>${airlift.concurrent.version}</version>
</dependency>
<dependency>
<groupId>org.semver4j</groupId>
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 9ded9a31ead..a7f6d9ede24 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -337,7 +337,6 @@ struct TMaxComputeFileDesc {
1: optional string partition_spec // deprecated
2: optional string session_id
3: optional string table_batch_read_session
-
}
struct THudiFileDesc {
@@ -351,6 +350,7 @@ struct THudiFileDesc {
8: optional list<string> column_names;
9: optional list<string> column_types;
10: optional list<string> nested_fields;
+ 11: optional string hudi_jni_scanner;
}
struct TTransactionalHiveDeleteDeltaDesc {
@@ -380,6 +380,7 @@ enum TTextSerdeType {
struct TFileScanRangeParams {
// deprecated, move to TFileScanRange
1: optional Types.TFileType file_type;
+ // deprecated, move to TFileScanRange
2: optional TFileFormatType format_type;
// deprecated, move to TFileScanRange
3: optional TFileCompressType compress_type;
@@ -454,6 +455,7 @@ struct TFileRangeDesc {
// for hive table, different files may have different fs,
// so fs_name should be with TFileRangeDesc
12: optional string fs_name
+ 13: optional TFileFormatType format_type;
}
struct TSplitSource {
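
The thrift change adds an optional per-range format_type to TFileRangeDesc, so a single scan can presumably mix JNI ranges with native parquet/orc ranges while the per-scan format_type on TFileScanRangeParams is kept only for compatibility. The toy sketch below models that precedence with plain Java types rather than the generated thrift classes.

    import java.util.Optional;

    public class FormatPrecedenceDemo {
        enum FileFormat { PARQUET, ORC, JNI }

        // Per-range format, when present, wins over the deprecated per-scan format.
        static FileFormat effectiveFormat(Optional<FileFormat> perRange, FileFormat perScanDeprecated) {
            return perRange.orElse(perScanDeprecated);
        }

        public static void main(String[] args) {
            System.out.println(effectiveFormat(Optional.of(FileFormat.PARQUET), FileFormat.JNI)); // PARQUET
            System.out.println(effectiveFormat(Optional.empty(), FileFormat.JNI));                // JNI
        }
    }
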
diff --git
a/regression-test/data/external_table_p2/hudi/test_hudi_incremental.out
b/regression-test/data/external_table_p2/hudi/test_hudi_incremental.out
index b1bdad85013..50644f34961 100644
--- a/regression-test/data/external_table_p2/hudi/test_hudi_incremental.out
+++ b/regression-test/data/external_table_p2/hudi/test_hudi_incremental.out
@@ -347,3 +347,177 @@
-- !incremental_9_10 --
1000
+-- !incremental_1_end --
+9000
+
+-- !incremental_earliest_1 --
+1000
+
+-- !incremental_2_end --
+8000
+
+-- !incremental_earliest_2 --
+2000
+
+-- !incremental_1_2 --
+1000
+
+-- !incremental_3_end --
+7000
+
+-- !incremental_earliest_3 --
+3000
+
+-- !incremental_2_3 --
+1000
+
+-- !incremental_4_end --
+6000
+
+-- !incremental_earliest_4 --
+4000
+
+-- !incremental_3_4 --
+1000
+
+-- !incremental_5_end --
+5000
+
+-- !incremental_earliest_5 --
+5000
+
+-- !incremental_4_5 --
+1000
+
+-- !incremental_6_end --
+4000
+
+-- !incremental_earliest_6 --
+6000
+
+-- !incremental_5_6 --
+1000
+
+-- !incremental_7_end --
+3000
+
+-- !incremental_earliest_7 --
+7000
+
+-- !incremental_6_7 --
+1000
+
+-- !incremental_8_end --
+2000
+
+-- !incremental_earliest_8 --
+8000
+
+-- !incremental_7_8 --
+1000
+
+-- !incremental_9_end --
+1000
+
+-- !incremental_earliest_9 --
+9000
+
+-- !incremental_8_9 --
+1000
+
+-- !incremental_10_end --
+0
+
+-- !incremental_earliest_10 --
+10000
+
+-- !incremental_9_10 --
+1000
+
+-- !incremental_1_end --
+9000
+
+-- !incremental_earliest_1 --
+1000
+
+-- !incremental_2_end --
+8000
+
+-- !incremental_earliest_2 --
+2000
+
+-- !incremental_1_2 --
+1000
+
+-- !incremental_3_end --
+7000
+
+-- !incremental_earliest_3 --
+3000
+
+-- !incremental_2_3 --
+1000
+
+-- !incremental_4_end --
+6000
+
+-- !incremental_earliest_4 --
+4000
+
+-- !incremental_3_4 --
+1000
+
+-- !incremental_5_end --
+5000
+
+-- !incremental_earliest_5 --
+5000
+
+-- !incremental_4_5 --
+1000
+
+-- !incremental_6_end --
+4000
+
+-- !incremental_earliest_6 --
+6000
+
+-- !incremental_5_6 --
+1000
+
+-- !incremental_7_end --
+3000
+
+-- !incremental_earliest_7 --
+7000
+
+-- !incremental_6_7 --
+1000
+
+-- !incremental_8_end --
+2000
+
+-- !incremental_earliest_8 --
+8000
+
+-- !incremental_7_8 --
+1000
+
+-- !incremental_9_end --
+1000
+
+-- !incremental_earliest_9 --
+9000
+
+-- !incremental_8_9 --
+1000
+
+-- !incremental_10_end --
+0
+
+-- !incremental_earliest_10 --
+10000
+
+-- !incremental_9_10 --
+1000
+
diff --git
a/regression-test/data/external_table_p2/hudi/test_hudi_orc_tables.out
b/regression-test/data/external_table_p2/hudi/test_hudi_orc_tables.out
new file mode 100644
index 00000000000..9e28074dc91
--- /dev/null
+++ b/regression-test/data/external_table_p2/hudi/test_hudi_orc_tables.out
@@ -0,0 +1,15 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !cow --
+20241204190011744 20241204190011744_0_6 20241204190011744_0_0
a99e363a-6c10-40f3-a675-9117506d1a43-0_0-38-94_20241204190011744.orc 1
A
+20241204190011744 20241204190011744_0_7 20241204190011744_2_0
a99e363a-6c10-40f3-a675-9117506d1a43-0_0-38-94_20241204190011744.orc 3
C
+20241204190011744 20241204190011744_0_8 20241204190011744_4_0
a99e363a-6c10-40f3-a675-9117506d1a43-0_0-38-94_20241204190011744.orc 5
E
+20241204190011744 20241204190011744_0_9 20241204190011744_1_0
a99e363a-6c10-40f3-a675-9117506d1a43-0_0-38-94_20241204190011744.orc 2
B
+20241204190011744 20241204190011744_0_10 20241204190011744_3_0
a99e363a-6c10-40f3-a675-9117506d1a43-0_0-38-94_20241204190011744.orc 4
D
+
+-- !mor --
+20241204190002046 20241204190002046_0_11 20241204190002046_0_0
b1e68412-01d6-467f-b4c2-b4b18ec71346-0_0-30-75_20241204190002046.orc 1
A
+20241204190002046 20241204190002046_0_12 20241204190002046_2_0
b1e68412-01d6-467f-b4c2-b4b18ec71346-0_0-30-75_20241204190002046.orc 3
C
+20241204190002046 20241204190002046_0_13 20241204190002046_4_0
b1e68412-01d6-467f-b4c2-b4b18ec71346-0_0-30-75_20241204190002046.orc 5
E
+20241204190002046 20241204190002046_0_14 20241204190002046_1_0
b1e68412-01d6-467f-b4c2-b4b18ec71346-0_0-30-75_20241204190002046.orc 2
B
+20241204190002046 20241204190002046_0_15 20241204190002046_3_0
b1e68412-01d6-467f-b4c2-b4b18ec71346-0_0-30-75_20241204190002046.orc 4
D
+
diff --git
a/regression-test/data/external_table_p2/hudi/test_hudi_schema_evolution.out
b/regression-test/data/external_table_p2/hudi/test_hudi_schema_evolution.out
index 12dd0cf086d..da7273d4c14 100644
--- a/regression-test/data/external_table_p2/hudi/test_hudi_schema_evolution.out
+++ b/regression-test/data/external_table_p2/hudi/test_hudi_schema_evolution.out
@@ -31,3 +31,35 @@
20241118012149007 20241118012149007_0_4 5
185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet
5 Eva {"age":31.5, "address":"Chengdu"}
20241118012149007 20241118012149007_0_5 6
185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet
6 Frank {"age":29.2, "address":"Wuhan"}
+-- !adding_simple_columns_table --
+20241118012126237 20241118012126237_0_1 1
5166112a-90d8-4ba8-8646-337fbeb2a375-0_0-35-121_20241118012132306.parquet
1 Alice \N
+20241118012126237 20241118012126237_0_0 2
5166112a-90d8-4ba8-8646-337fbeb2a375-0_0-35-121_20241118012132306.parquet
2 Bob \N
+20241118012126237 20241118012126237_0_2 3
5166112a-90d8-4ba8-8646-337fbeb2a375-0_0-35-121_20241118012132306.parquet
3 Cathy \N
+20241118012132306 20241118012132306_0_3 4
5166112a-90d8-4ba8-8646-337fbeb2a375-0_0-35-121_20241118012132306.parquet
4 David 25
+20241118012132306 20241118012132306_0_4 5
5166112a-90d8-4ba8-8646-337fbeb2a375-0_0-35-121_20241118012132306.parquet
5 Eva 30
+20241118012132306 20241118012132306_0_5 6
5166112a-90d8-4ba8-8646-337fbeb2a375-0_0-35-121_20241118012132306.parquet
6 Frank 28
+
+-- !altering_simple_columns_table --
+20241118012136512 20241118012136512_0_0 1
203f0f43-ae9d-4c17-8d5d-834f0dbc62c9-0_0-78-246_20241118012138287.parquet
1 Alice 25.0
+20241118012136512 20241118012136512_0_2 2
203f0f43-ae9d-4c17-8d5d-834f0dbc62c9-0_0-78-246_20241118012138287.parquet
2 Bob 30.0
+20241118012136512 20241118012136512_0_1 3
203f0f43-ae9d-4c17-8d5d-834f0dbc62c9-0_0-78-246_20241118012138287.parquet
3 Cathy 28.0
+20241118012138287 20241118012138287_0_3 4
203f0f43-ae9d-4c17-8d5d-834f0dbc62c9-0_0-78-246_20241118012138287.parquet
4 David 26.0
+20241118012138287 20241118012138287_0_4 5
203f0f43-ae9d-4c17-8d5d-834f0dbc62c9-0_0-78-246_20241118012138287.parquet
5 Eva 31.5
+20241118012138287 20241118012138287_0_5 6
203f0f43-ae9d-4c17-8d5d-834f0dbc62c9-0_0-78-246_20241118012138287.parquet
6 Frank 29.2
+
+-- !adding_complex_columns_table --
+20241118012144831 20241118012144831_0_1 1
3c038df9-a652-4878-9b8a-221ae443448e-0_0-165-497_20241118012146150.parquet
1 Alice {"age":25, "address":"Guangzhou", "email":null}
+20241118012144831 20241118012144831_0_0 2
3c038df9-a652-4878-9b8a-221ae443448e-0_0-165-497_20241118012146150.parquet
2 Bob {"age":30, "address":"Shanghai", "email":null}
+20241118012144831 20241118012144831_0_2 3
3c038df9-a652-4878-9b8a-221ae443448e-0_0-165-497_20241118012146150.parquet
3 Cathy {"age":28, "address":"Beijing", "email":null}
+20241118012146150 20241118012146150_0_3 4
3c038df9-a652-4878-9b8a-221ae443448e-0_0-165-497_20241118012146150.parquet
4 David {"age":25, "address":"Shenzhen", "email":"[email protected]"}
+20241118012146150 20241118012146150_0_4 5
3c038df9-a652-4878-9b8a-221ae443448e-0_0-165-497_20241118012146150.parquet
5 Eva {"age":30, "address":"Chengdu", "email":"[email protected]"}
+20241118012146150 20241118012146150_0_5 6
3c038df9-a652-4878-9b8a-221ae443448e-0_0-165-497_20241118012146150.parquet
6 Frank {"age":28, "address":"Wuhan", "email":"[email protected]"}
+
+-- !altering_complex_columns_table --
+20241118012147879 20241118012147879_0_0 1
185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet
1 Alice {"age":25, "address":"Guangzhou"}
+20241118012147879 20241118012147879_0_2 2
185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet
2 Bob {"age":30, "address":"Shanghai"}
+20241118012147879 20241118012147879_0_1 3
185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet
3 Cathy {"age":28, "address":"Beijing"}
+20241118012149007 20241118012149007_0_3 4
185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet
4 David {"age":26, "address":"Shenzhen"}
+20241118012149007 20241118012149007_0_4 5
185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet
5 Eva {"age":31.5, "address":"Chengdu"}
+20241118012149007 20241118012149007_0_5 6
185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet
6 Frank {"age":29.2, "address":"Wuhan"}
+
diff --git a/regression-test/data/external_table_p2/hudi/test_hudi_snapshot.out
b/regression-test/data/external_table_p2/hudi/test_hudi_snapshot.out
index efad67ffbfa..1e151c2a86f 100644
Binary files
a/regression-test/data/external_table_p2/hudi/test_hudi_snapshot.out and
b/regression-test/data/external_table_p2/hudi/test_hudi_snapshot.out differ
diff --git
a/regression-test/data/external_table_p2/hudi/test_hudi_timestamp.out
b/regression-test/data/external_table_p2/hudi/test_hudi_timestamp.out
index dc47ff86d90..9bdb0f7cb72 100644
--- a/regression-test/data/external_table_p2/hudi/test_hudi_timestamp.out
+++ b/regression-test/data/external_table_p2/hudi/test_hudi_timestamp.out
@@ -1,6 +1,31 @@
-- This file is automatically generated. You should know what you did if you
want to edit this
--- !timestamp --
+-- !timestamp1 --
20241115015956800 20241115015956800_0_2 1
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 1
Alice 2024-10-25T08:00
-20241115015956800 20241115015956800_0_0 2
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 2
Bob 2024-10-25T09:30:00
-20241115015956800 20241115015956800_0_1 3
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 3
Charlie 2024-10-25T11:00:00
+20241115015956800 20241115015956800_0_0 2
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 2
Bob 2024-10-25T09:30
+20241115015956800 20241115015956800_0_1 3
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 3
Charlie 2024-10-25T11:00
+
+-- !timestamp2 --
+20241115015956800 20241115015956800_0_2 1
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 1
Alice 2024-10-25T23:00
+20241115015956800 20241115015956800_0_0 2
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 2
Bob 2024-10-26T00:30
+20241115015956800 20241115015956800_0_1 3
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 3
Charlie 2024-10-26T02:00
+
+-- !timestamp3 --
+20241115015956800 20241115015956800_0_2 1
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 1
Alice 2024-10-25T15:00
+20241115015956800 20241115015956800_0_0 2
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 2
Bob 2024-10-25T16:30
+20241115015956800 20241115015956800_0_1 3
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 3
Charlie 2024-10-25T18:00
+
+-- !timestamp1 --
+20241115015956800 20241115015956800_0_2 1
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 1
Alice 2024-10-25T08:00
+20241115015956800 20241115015956800_0_0 2
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 2
Bob 2024-10-25T09:30
+20241115015956800 20241115015956800_0_1 3
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 3
Charlie 2024-10-25T11:00
+
+-- !timestamp2 --
+20241115015956800 20241115015956800_0_2 1
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 1
Alice 2024-10-25T23:00
+20241115015956800 20241115015956800_0_0 2
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 2
Bob 2024-10-26T00:30
+20241115015956800 20241115015956800_0_1 3
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 3
Charlie 2024-10-26T02:00
+
+-- !timestamp3 --
+20241115015956800 20241115015956800_0_2 1
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 1
Alice 2024-10-25T15:00
+20241115015956800 20241115015956800_0_0 2
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 2
Bob 2024-10-25T16:30
+20241115015956800 20241115015956800_0_1 3
eec4913a-0d5f-4b8b-a0f5-934e252c2e45-0_0-7-14_20241115015956800.parquet 3
Charlie 2024-10-25T18:00
diff --git
a/regression-test/data/external_table_p2/hudi/test_hudi_timetravel.out
b/regression-test/data/external_table_p2/hudi/test_hudi_timetravel.out
index a9b5d23595a..00d15805baf 100644
--- a/regression-test/data/external_table_p2/hudi/test_hudi_timetravel.out
+++ b/regression-test/data/external_table_p2/hudi/test_hudi_timetravel.out
@@ -119,3 +119,123 @@
-- !timetravel10 --
10000
+-- !timetravel1 --
+1000
+
+-- !timetravel2 --
+2000
+
+-- !timetravel3 --
+3000
+
+-- !timetravel4 --
+4000
+
+-- !timetravel5 --
+5000
+
+-- !timetravel6 --
+6000
+
+-- !timetravel7 --
+7000
+
+-- !timetravel8 --
+8000
+
+-- !timetravel9 --
+9000
+
+-- !timetravel10 --
+10000
+
+-- !timetravel1 --
+1000
+
+-- !timetravel2 --
+2000
+
+-- !timetravel3 --
+3000
+
+-- !timetravel4 --
+4000
+
+-- !timetravel5 --
+5000
+
+-- !timetravel6 --
+6000
+
+-- !timetravel7 --
+7000
+
+-- !timetravel8 --
+8000
+
+-- !timetravel9 --
+9000
+
+-- !timetravel10 --
+10000
+
+-- !timetravel1 --
+1000
+
+-- !timetravel2 --
+2000
+
+-- !timetravel3 --
+3000
+
+-- !timetravel4 --
+4000
+
+-- !timetravel5 --
+5000
+
+-- !timetravel6 --
+6000
+
+-- !timetravel7 --
+7000
+
+-- !timetravel8 --
+8000
+
+-- !timetravel9 --
+9000
+
+-- !timetravel10 --
+10000
+
+-- !timetravel1 --
+1000
+
+-- !timetravel2 --
+2000
+
+-- !timetravel3 --
+3000
+
+-- !timetravel4 --
+4000
+
+-- !timetravel5 --
+5000
+
+-- !timetravel6 --
+6000
+
+-- !timetravel7 --
+7000
+
+-- !timetravel8 --
+8000
+
+-- !timetravel9 --
+9000
+
+-- !timetravel10 --
+10000
+
diff --git
a/regression-test/suites/external_table_p2/hudi/test_hudi_catalog.groovy
b/regression-test/suites/external_table_p2/hudi/test_hudi_catalog.groovy
index f2082ef89c7..149eecf5817 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_catalog.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_catalog.groovy
@@ -36,4 +36,4 @@ suite("test_hudi_catalog",
"p2,external,hudi,external_remote,external_remote_hud
def tables = sql """ show tables; """
assertTrue(tables.size() > 0)
sql """drop catalog if exists ${catalog_name};"""
-}
\ No newline at end of file
+}
diff --git
a/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy
b/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy
index 8cc1d2a852b..885903646cc 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy
@@ -60,7 +60,6 @@ suite("test_hudi_incremental",
"p2,external,hudi,external_remote,external_remote
"20241114152009764",
"20241114152011901",
]
- test_hudi_incremental_querys("user_activity_log_cow_non_partition",
timestamps_cow_non_partition)
// spark-sql "select distinct _hoodie_commit_time from
user_activity_log_cow_partition order by _hoodie_commit_time;"
def timestamps_cow_partition = [
@@ -75,7 +74,6 @@ suite("test_hudi_incremental",
"p2,external,hudi,external_remote,external_remote
"20241114152147114",
"20241114152156417",
]
- test_hudi_incremental_querys("user_activity_log_cow_partition",
timestamps_cow_partition)
// spark-sql "select distinct _hoodie_commit_time from
user_activity_log_mor_non_partition order by _hoodie_commit_time;"
def timestamps_mor_non_partition = [
@@ -90,7 +88,6 @@ suite("test_hudi_incremental",
"p2,external,hudi,external_remote,external_remote
"20241114152028770",
"20241114152030746",
]
- test_hudi_incremental_querys("user_activity_log_mor_non_partition",
timestamps_mor_non_partition)
// spark-sql "select distinct _hoodie_commit_time from
user_activity_log_mor_partition order by _hoodie_commit_time;"
def timestamps_mor_partition = [
@@ -105,7 +102,18 @@ suite("test_hudi_incremental",
"p2,external,hudi,external_remote,external_remote
"20241114152323587",
"20241114152334111",
]
+
+ test_hudi_incremental_querys("user_activity_log_cow_non_partition",
timestamps_cow_non_partition)
+ test_hudi_incremental_querys("user_activity_log_cow_partition",
timestamps_cow_partition)
+ test_hudi_incremental_querys("user_activity_log_mor_non_partition",
timestamps_mor_non_partition)
+ test_hudi_incremental_querys("user_activity_log_mor_partition",
timestamps_mor_partition)
+ sql """set force_jni_scanner=true;"""
+ // don't support incremental query for cow table by jni reader
+ // test_hudi_incremental_querys("user_activity_log_cow_non_partition",
timestamps_cow_non_partition)
+ // test_hudi_incremental_querys("user_activity_log_cow_partition",
timestamps_cow_partition)
+ test_hudi_incremental_querys("user_activity_log_mor_non_partition",
timestamps_mor_non_partition)
test_hudi_incremental_querys("user_activity_log_mor_partition",
timestamps_mor_partition)
+ // sql """set force_jni_scanner=false;"""
sql """drop catalog if exists ${catalog_name};"""
-}
\ No newline at end of file
+}
diff --git
a/regression-test/suites/external_table_p2/hudi/test_hudi_catalog.groovy
b/regression-test/suites/external_table_p2/hudi/test_hudi_orc_tables.groovy
similarity index 84%
copy from regression-test/suites/external_table_p2/hudi/test_hudi_catalog.groovy
copy to
regression-test/suites/external_table_p2/hudi/test_hudi_orc_tables.groovy
index f2082ef89c7..43638a23881 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_catalog.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_orc_tables.groovy
@@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-suite("test_hudi_catalog",
"p2,external,hudi,external_remote,external_remote_hudi") {
+suite("test_hudi_orc_tables",
"p2,external,hudi,external_remote,external_remote_hudi") {
String enabled = context.config.otherConfigs.get("enableExternalHudiTest")
if (enabled == null || !enabled.equalsIgnoreCase("true")) {
logger.info("disable hudi test")
}
- String catalog_name = "test_hudi_catalog"
+ String catalog_name = "test_hudi_orc_tables"
String props = context.config.otherConfigs.get("hudiEmrCatalog")
sql """drop catalog if exists ${catalog_name};"""
sql """
@@ -33,7 +33,9 @@ suite("test_hudi_catalog",
"p2,external,hudi,external_remote,external_remote_hud
sql """ switch ${catalog_name};"""
sql """ use regression_hudi;"""
sql """ set enable_fallback_to_original_planner=false """
- def tables = sql """ show tables; """
- assertTrue(tables.size() > 0)
+
+ qt_cow """ select * from orc_hudi_table_cow; """
+ qt_mor """ select * from orc_hudi_table_mor; """
+
sql """drop catalog if exists ${catalog_name};"""
}
\ No newline at end of file
diff --git
a/regression-test/suites/external_table_p2/hudi/test_hudi_schema_evolution.groovy
b/regression-test/suites/external_table_p2/hudi/test_hudi_schema_evolution.groovy
index b247aaf4924..0da88447cde 100644
---
a/regression-test/suites/external_table_p2/hudi/test_hudi_schema_evolution.groovy
+++
b/regression-test/suites/external_table_p2/hudi/test_hudi_schema_evolution.groovy
@@ -33,7 +33,18 @@ suite("test_hudi_schema_evolution",
"p2,external,hudi,external_remote,external_r
sql """ switch ${catalog_name};"""
sql """ use regression_hudi;"""
sql """ set enable_fallback_to_original_planner=false """
+
+ qt_adding_simple_columns_table """ select * from
adding_simple_columns_table order by id """
+ qt_altering_simple_columns_table """ select * from
altering_simple_columns_table order by id """
+ // qt_deleting_simple_columns_table """ select * from
deleting_simple_columns_table order by id """
+ // qt_renaming_simple_columns_table """ select * from
renaming_simple_columns_table order by id """
+ qt_adding_complex_columns_table """ select * from
adding_complex_columns_table order by id """
+ qt_altering_complex_columns_table """ select * from
altering_complex_columns_table order by id """
+ // qt_deleting_complex_columns_table """ select * from
deleting_complex_columns_table order by id """
+ // qt_renaming_complex_columns_table """ select * from
renaming_complex_columns_table order by id """
+
+ sql """set force_jni_scanner = true;"""
qt_adding_simple_columns_table """ select * from
adding_simple_columns_table order by id """
qt_altering_simple_columns_table """ select * from
altering_simple_columns_table order by id """
// qt_deleting_simple_columns_table """ select * from
deleting_simple_columns_table order by id """
@@ -43,6 +54,7 @@ suite("test_hudi_schema_evolution",
"p2,external,hudi,external_remote,external_r
qt_altering_complex_columns_table """ select * from
altering_complex_columns_table order by id """
// qt_deleting_complex_columns_table """ select * from
deleting_complex_columns_table order by id """
// qt_renaming_complex_columns_table """ select * from
renaming_complex_columns_table order by id """
+ sql """set force_jni_scanner = false;"""
sql """drop catalog if exists ${catalog_name};"""
-}
\ No newline at end of file
+}
diff --git
a/regression-test/suites/external_table_p2/hudi/test_hudi_snapshot.groovy
b/regression-test/suites/external_table_p2/hudi/test_hudi_snapshot.groovy
index 53c09e6d5a9..89d89709b3c 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_snapshot.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_snapshot.groovy
@@ -64,7 +64,7 @@ suite("test_hudi_snapshot",
"p2,external,hudi,external_remote,external_remote_hu
qt_q09 """SELECT * FROM ${table_name} WHERE
struct_element(struct_element(address, 'coordinates'), 'latitude') BETWEEN 0
AND 100 AND struct_element(struct_element(address, 'coordinates'), 'longitude')
BETWEEN 0 AND 100 ORDER BY event_time LIMIT 5;"""
// Query records with ratings above a specific value and limit output
- qt_q10 """SELECT * FROM ${table_name} WHERE rating > 4.5 ORDER BY
rating DESC LIMIT 5;"""
+ qt_q10 """SELECT * FROM ${table_name} WHERE rating > 4.5 ORDER BY
event_time DESC LIMIT 5;"""
// Query all users' signup dates and limit output
qt_q11 """SELECT user_id, signup_date FROM ${table_name} ORDER BY
signup_date DESC LIMIT 10;"""
@@ -79,13 +79,20 @@ suite("test_hudi_snapshot",
"p2,external,hudi,external_remote,external_remote_hu
qt_q14 """SELECT * FROM ${table_name} WHERE signup_date = '2024-01-15'
ORDER BY user_id LIMIT 5;"""
// Query the total count of purchases for each user and limit output
- qt_q15 """SELECT user_id, array_size(purchases) AS purchase_count FROM
${table_name} ORDER BY purchase_count DESC LIMIT 5;"""
+ qt_q15 """SELECT user_id, array_size(purchases) AS purchase_count FROM
${table_name} ORDER BY user_id LIMIT 5;"""
}
+ test_hudi_snapshot_querys("user_activity_log_mor_non_partition")
+ test_hudi_snapshot_querys("user_activity_log_mor_partition")
test_hudi_snapshot_querys("user_activity_log_cow_non_partition")
test_hudi_snapshot_querys("user_activity_log_cow_partition")
+
+ sql """set force_jni_scanner=true;"""
test_hudi_snapshot_querys("user_activity_log_mor_non_partition")
test_hudi_snapshot_querys("user_activity_log_mor_partition")
+ test_hudi_snapshot_querys("user_activity_log_cow_non_partition")
+ test_hudi_snapshot_querys("user_activity_log_cow_partition")
+ sql """set force_jni_scanner=false;"""
sql """drop catalog if exists ${catalog_name};"""
-}
\ No newline at end of file
+}
diff --git
a/regression-test/suites/external_table_p2/hudi/test_hudi_timestamp.groovy
b/regression-test/suites/external_table_p2/hudi/test_hudi_timestamp.groovy
index c1ba630e4a7..3d7bd40b2d5 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_timestamp.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_timestamp.groovy
@@ -34,8 +34,22 @@ suite("test_hudi_timestamp",
"p2,external,hudi,external_remote,external_remote_h
sql """ use regression_hudi;"""
sql """ set enable_fallback_to_original_planner=false """
- // TODO: fix hudi timezone issue and enable this
- // qt_timestamp """ select * from hudi_table_with_timestamp order by id;
"""
+ def test_timestamp_different_timezones = {
+ sql """set time_zone = 'America/Los_Angeles';"""
+ qt_timestamp1 """ select * from hudi_table_with_timestamp order by id;
"""
+ sql """set time_zone = 'Asia/Shanghai';"""
+ qt_timestamp2 """ select * from hudi_table_with_timestamp order by id;
"""
+ sql """set time_zone = 'UTC';"""
+ qt_timestamp3 """ select * from hudi_table_with_timestamp order by id;
"""
+ }
+
+ // test native reader
+ test_timestamp_different_timezones()
+ sql """ set force_jni_scanner = true; """
+ // test jni reader
+ test_timestamp_different_timezones()
+ sql """ set force_jni_scanner = false; """
+
sql """drop catalog if exists ${catalog_name};"""
}
@@ -59,4 +73,4 @@ suite("test_hudi_timestamp",
"p2,external,hudi,external_remote,external_remote_h
// INSERT OVERWRITE hudi_table_with_timestamp VALUES
// ('1', 'Alice', timestamp('2024-10-25 08:00:00')),
// ('2', 'Bob', timestamp('2024-10-25 09:30:00')),
-// ('3', 'Charlie', timestamp('2024-10-25 11:00:00'));
\ No newline at end of file
+// ('3', 'Charlie', timestamp('2024-10-25 11:00:00'));
diff --git
a/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy
b/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy
index 4d458dc4381..cceeaa41220 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy
@@ -54,7 +54,6 @@ suite("test_hudi_timetravel",
"p2,external,hudi,external_remote,external_remote_
"20241114152009764",
"20241114152011901",
]
- test_hudi_timetravel_querys("user_activity_log_cow_non_partition",
timestamps_cow_non_partition)
// spark-sql "select distinct _hoodie_commit_time from
user_activity_log_cow_partition order by _hoodie_commit_time;"
def timestamps_cow_partition = [
@@ -69,7 +68,6 @@ suite("test_hudi_timetravel",
"p2,external,hudi,external_remote,external_remote_
"20241114152147114",
"20241114152156417",
]
- test_hudi_timetravel_querys("user_activity_log_cow_partition",
timestamps_cow_partition)
// spark-sql "select distinct _hoodie_commit_time from
user_activity_log_mor_non_partition order by _hoodie_commit_time;"
def timestamps_mor_non_partition = [
@@ -84,7 +82,6 @@ suite("test_hudi_timetravel",
"p2,external,hudi,external_remote,external_remote_
"20241114152028770",
"20241114152030746",
]
- test_hudi_timetravel_querys("user_activity_log_mor_non_partition",
timestamps_mor_non_partition)
// spark-sql "select distinct _hoodie_commit_time from
user_activity_log_mor_partition order by _hoodie_commit_time;"
def timestamps_mor_partition = [
@@ -99,7 +96,17 @@ suite("test_hudi_timetravel",
"p2,external,hudi,external_remote,external_remote_
"20241114152323587",
"20241114152334111",
]
+
+ test_hudi_timetravel_querys("user_activity_log_cow_non_partition",
timestamps_cow_non_partition)
+ test_hudi_timetravel_querys("user_activity_log_cow_partition",
timestamps_cow_partition)
+ test_hudi_timetravel_querys("user_activity_log_mor_non_partition",
timestamps_mor_non_partition)
+ test_hudi_timetravel_querys("user_activity_log_mor_partition",
timestamps_mor_partition)
+ sql """set force_jni_scanner=true;"""
+ test_hudi_timetravel_querys("user_activity_log_cow_non_partition",
timestamps_cow_non_partition)
+ test_hudi_timetravel_querys("user_activity_log_cow_partition",
timestamps_cow_partition)
+ test_hudi_timetravel_querys("user_activity_log_mor_non_partition",
timestamps_mor_non_partition)
test_hudi_timetravel_querys("user_activity_log_mor_partition",
timestamps_mor_partition)
+ sql """set force_jni_scanner=false;"""
sql """drop catalog if exists ${catalog_name};"""
-}
\ No newline at end of file
+}