This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 625961ee5ff branch-3.1: [fix](hudi) Remove spark hudi jni scanner #50394 (#52222)
625961ee5ff is described below
commit 625961ee5ffa2f46b82bcff0a8f39841ef6b1b7d
Author: Socrates <[email protected]>
AuthorDate: Tue Jun 24 23:05:02 2025 +0800
branch-3.1: [fix](hudi) Remove spark hudi jni scanner #50394 (#52222)
bp: #50394
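
    For context, a minimal sketch (not the committed code) of what the BE-side construction
    looks like once the Spark scanner is removed: the branch on the hudi_jni_scanner parameter
    disappears and org/apache/doris/hudi/HadoopHudiJniScanner is always loaded, as the
    hudi_jni_reader.cpp hunk below shows. The JniConnector stand-in here is hypothetical and
    only makes the fragment self-contained.

        #include <map>
        #include <memory>
        #include <string>
        #include <vector>

        // Hypothetical stand-in for Doris' JniConnector, only so this sketch compiles.
        struct JniConnector {
            JniConnector(std::string /*scanner_class*/,
                         std::map<std::string, std::string> /*params*/,
                         std::vector<std::string> /*required_fields*/) {}
        };

        // After this change the scanner class is fixed: no more "spark" vs "hadoop" branch.
        std::unique_ptr<JniConnector> make_hudi_connector(
                const std::map<std::string, std::string>& params,
                const std::vector<std::string>& required_fields) {
            return std::make_unique<JniConnector>(
                    "org/apache/doris/hudi/HadoopHudiJniScanner", params, required_fields);
        }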
---
be/src/vec/exec/format/table/hudi_jni_reader.cpp | 11 +-
build.sh | 6 +-
fe/be-java-extensions/hudi-scanner/pom.xml | 275 --------
.../org/apache/doris/hudi/HudiColumnValue.java | 187 ------
.../java/org/apache/doris/hudi/HudiJniScanner.java | 229 -------
.../src/main/java/org/apache/doris/hudi/Utils.java | 89 ---
.../hudi-scanner/src/main/resources/package.xml | 41 --
.../org/apache/doris/hudi/BaseSplitReader.scala | 734 ---------------------
.../apache/doris/hudi/HoodieRecordIterator.scala | 178 -----
.../doris/hudi/MORIncrementalSplitReader.scala | 86 ---
.../apache/doris/hudi/MORSnapshotSplitReader.scala | 184 ------
.../org/apache/doris/hudi/HudiJniScannerTest.java | 31 -
fe/be-java-extensions/pom.xml | 1 -
.../doris/datasource/hudi/source/HudiScanNode.java | 2 -
.../doris/datasource/hudi/source/HudiSplit.java | 1 -
.../java/org/apache/doris/qe/SessionVariable.java | 14 -
gensrc/thrift/PlanNodes.thrift | 2 +-
.../hudi/test_hudi_incremental.groovy | 1 -
.../hudi/test_hudi_schema_evolution.groovy | 1 -
.../hudi/test_hudi_snapshot.groovy | 1 -
.../hudi/test_hudi_timetravel.groovy | 1 -
21 files changed, 5 insertions(+), 2070 deletions(-)
diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.cpp b/be/src/vec/exec/format/table/hudi_jni_reader.cpp
index eb88dda9512..f73f9a29225 100644
--- a/be/src/vec/exec/format/table/hudi_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/hudi_jni_reader.cpp
@@ -72,15 +72,8 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params,
}
}
-    if (_hudi_params.hudi_jni_scanner == "spark") {
-        _jni_connector = std::make_unique<JniConnector>("org/apache/doris/hudi/HudiJniScanner",
-                                                        params, required_fields);
-    } else {
-        // _hudi_params.hudi_jni_scanner == "hadoop"
-        // and default use hadoop hudi jni scanner
-        _jni_connector = std::make_unique<JniConnector>(
-                "org/apache/doris/hudi/HadoopHudiJniScanner", params, required_fields);
-    }
+    _jni_connector = std::make_unique<JniConnector>("org/apache/doris/hudi/HadoopHudiJniScanner",
+                                                    params, required_fields);
}
Status HudiJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
diff --git a/build.sh b/build.sh
index 2cf95d87326..594e990821f 100755
--- a/build.sh
+++ b/build.sh
@@ -66,7 +66,7 @@ Usage: $0 <options>
Environment variables:
     USE_AVX2                   If the CPU does not support AVX2 instruction set, please set USE_AVX2=0. Default is ON.
     STRIP_DEBUG_INFO           If set STRIP_DEBUG_INFO=ON, the debug information in the compiled binaries will be stored separately in the 'be/lib/debug_info' directory. Default is OFF.
-    DISABLE_BE_JAVA_EXTENSIONS If set DISABLE_BE_JAVA_EXTENSIONS=ON, we will do not build binary with java-udf,hudi-scanner,jdbc-scanner and so on Default is OFF.
+    DISABLE_BE_JAVA_EXTENSIONS If set DISABLE_BE_JAVA_EXTENSIONS=ON, we will do not build binary with java-udf,hadoop-hudi-scanner,jdbc-scanner and so on Default is OFF.
     DISABLE_JAVA_CHECK_STYLE   If set DISABLE_JAVA_CHECK_STYLE=ON, it will skip style check of java code in FE.
     DISABLE_BUILD_AZURE        If set DISABLE_BUILD_AZURE=ON, it will not build azure into BE.
Eg.
@@ -83,7 +83,7 @@ Usage: $0 <options>
    $0 --be --fe               build Backend, Frontend, Spark Dpp application and Java UDF library
    $0 --be --coverage         build Backend with coverage enabled
    $0 --be --output PATH      build Backend, the result will be output to PATH(relative paths are available)
-   $0 --be-extension-ignore avro-scanner build be-java-extensions, choose which modules to ignore. Multiple modules separated by commas, like --be-extension-ignore avro-scanner,hudi-scanner
+   $0 --be-extension-ignore avro-scanner build be-java-extensions, choose which modules to ignore. Multiple modules separated by commas, like --be-extension-ignore avro-scanner,hadoop-hudi-scanner
    USE_AVX2=0 $0 --be         build Backend and not using AVX2 instruction.
    USE_AVX2=0 STRIP_DEBUG_INFO=ON $0 build all and not using AVX2 instruction, and strip the debug info for Backend
@@ -539,7 +539,6 @@ if [[ "${BUILD_HIVE_UDF}" -eq 1 ]]; then
fi
if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 ]]; then
modules+=("fe-common")
- modules+=("be-java-extensions/hudi-scanner")
modules+=("be-java-extensions/hadoop-hudi-scanner")
modules+=("be-java-extensions/java-common")
modules+=("be-java-extensions/java-udf")
@@ -847,7 +846,6 @@ EOF
extensions_modules=("java-udf")
extensions_modules+=("jdbc-scanner")
- extensions_modules+=("hudi-scanner")
extensions_modules+=("hadoop-hudi-scanner")
extensions_modules+=("paimon-scanner")
extensions_modules+=("trino-connector-scanner")
diff --git a/fe/be-java-extensions/hudi-scanner/pom.xml b/fe/be-java-extensions/hudi-scanner/pom.xml
deleted file mode 100644
index c8f56e55a83..00000000000
--- a/fe/be-java-extensions/hudi-scanner/pom.xml
+++ /dev/null
@@ -1,275 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <parent>
- <artifactId>be-java-extensions</artifactId>
- <groupId>org.apache.doris</groupId>
- <version>${revision}</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
- <artifactId>hudi-scanner</artifactId>
-
- <properties>
- <doris.home>${basedir}/../../</doris.home>
- <fe_ut_parallel>1</fe_ut_parallel>
- <scala.version>2.12.15</scala.version>
- <scala.binary.version>2.12</scala.binary.version>
- <avro.version>1.11.3</avro.version>
- </properties>
-
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- <version>${avro.version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-tools</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
- </dependencyManagement>
-
- <dependencies>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>${scala.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hudi</groupId>
- <artifactId>hudi-spark-client</artifactId>
- <version>${hudi.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hudi</groupId>
- <artifactId>hudi-spark-common_${scala.binary.version}</artifactId>
- <version>${hudi.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hudi</groupId>
- <artifactId>hudi-spark3-common</artifactId>
- <version>${hudi.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hudi</groupId>
-            <artifactId>${hudi-spark.version}_${scala.binary.version}</artifactId>
- <version>${hudi.version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <artifactId>json4s-ast_2.11</artifactId>
- <groupId>org.json4s</groupId>
- </exclusion>
- <exclusion>
- <artifactId>json4s-core_2.11</artifactId>
- <groupId>org.json4s</groupId>
- </exclusion>
- <exclusion>
- <artifactId>json4s-jackson_2.11</artifactId>
- <groupId>org.json4s</groupId>
- </exclusion>
- <exclusion>
- <artifactId>json4s-scalap_2.11</artifactId>
- <groupId>org.json4s</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.parquet</groupId>
- <artifactId>parquet-avro</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.antlr</groupId>
- <artifactId>antlr4-runtime</artifactId>
- <version>${antlr4.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <exclusions>
- <exclusion>
- <groupId>javax.servlet</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <artifactId>jackson-module-scala_2.12</artifactId>
- <groupId>com.fasterxml.jackson.module</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-client-api</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-client-runtime</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- </exclusions>
- <version>${spark.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-launcher_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <!-- version of spark's jackson module is error -->
- <groupId>com.fasterxml.jackson.module</groupId>
-            <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
- <version>${jackson.version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.doris</groupId>
- <artifactId>java-common</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.thrift</groupId>
- <artifactId>libthrift</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
- <build>
- <finalName>hudi-scanner</finalName>
- <sourceDirectory>src/main/java</sourceDirectory>
- <testSourceDirectory>src/test/java</testSourceDirectory>
- <directory>${project.basedir}/target/</directory>
- <resources>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
- </resources>
- <testResources>
- <testResource>
- <directory>src/test/resources</directory>
- </testResource>
- </testResources>
-
- <plugins>
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>4.7.2</version>
- <executions>
- <execution>
- <goals>
- <goal>compile</goal>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <scalaVersion>${scala.version}</scalaVersion>
- <args>
- <arg>-unchecked</arg>
- <arg>-deprecation</arg>
- <arg>-feature</arg>
- </args>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <executions>
- <execution>
- <id>default-compile</id>
- <phase>none</phase>
- </execution>
- <execution>
- <id>default-testCompile</id>
- <phase>none</phase>
- </execution>
- <execution>
- <id>java-compile</id>
- <goals>
- <goal>compile</goal>
- <goal>testCompile</goal>
- </goals>
- <phase>compile</phase>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <descriptors>
- <descriptor>src/main/resources/package.xml</descriptor>
- </descriptors>
- <archive>
- <manifest>
- <mainClass></mainClass>
- </manifest>
- </archive>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java
deleted file mode 100644
index 1c489affe16..00000000000
--- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java
+++ /dev/null
@@ -1,187 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.hudi;
-
-import org.apache.doris.common.jni.vec.ColumnType;
-import org.apache.doris.common.jni.vec.ColumnValue;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
-import org.apache.spark.sql.catalyst.util.ArrayData;
-import org.apache.spark.sql.catalyst.util.MapData;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.util.List;
-
-public class HudiColumnValue implements ColumnValue {
- private SpecializedGetters data;
- private int ordinal;
- private ColumnType columnType;
-
- HudiColumnValue() {
- }
-
-    HudiColumnValue(SpecializedGetters data, int ordinal, ColumnType columnType) {
- this.data = data;
- this.ordinal = ordinal;
- this.columnType = columnType;
- }
-
-    public void reset(SpecializedGetters data, int ordinal, ColumnType columnType) {
- this.data = data;
- this.ordinal = ordinal;
- this.columnType = columnType;
- }
-
- public void reset(int ordinal, ColumnType columnType) {
- this.ordinal = ordinal;
- this.columnType = columnType;
- }
-
- public void reset(SpecializedGetters data) {
- this.data = data;
- }
-
- @Override
- public boolean canGetStringAsBytes() {
- return true;
- }
-
- @Override
- public boolean isNull() {
- return data.isNullAt(ordinal);
- }
-
- @Override
- public boolean getBoolean() {
- return data.getBoolean(ordinal);
- }
-
- @Override
- public byte getByte() {
- return data.getByte(ordinal);
- }
-
- @Override
- public short getShort() {
- return data.getShort(ordinal);
- }
-
- @Override
- public int getInt() {
- return data.getInt(ordinal);
- }
-
- @Override
- public float getFloat() {
- return data.getFloat(ordinal);
- }
-
- @Override
- public long getLong() {
- return data.getLong(ordinal);
- }
-
- @Override
- public double getDouble() {
- return data.getDouble(ordinal);
- }
-
- @Override
- public BigInteger getBigInteger() {
-        throw new UnsupportedOperationException("Hoodie type does not support largeint");
- }
-
- @Override
- public BigDecimal getDecimal() {
-        return data.getDecimal(ordinal, columnType.getPrecision(), columnType.getScale()).toJavaBigDecimal();
- }
-
- @Override
- public String getString() {
- return data.getUTF8String(ordinal).toString();
- }
-
- @Override
- public byte[] getStringAsBytes() {
- return data.getUTF8String(ordinal).getBytes();
- }
-
- @Override
- public LocalDate getDate() {
- return LocalDate.ofEpochDay(data.getInt(ordinal));
- }
-
- @Override
- public LocalDateTime getDateTime() {
- long datetime = data.getLong(ordinal);
- long seconds;
- long nanoseconds;
- if (columnType.getPrecision() == 3) {
- seconds = datetime / 1000;
- nanoseconds = (datetime % 1000) * 1000000;
- } else if (columnType.getPrecision() == 6) {
- seconds = datetime / 1000000;
- nanoseconds = (datetime % 1000000) * 1000;
- } else {
- throw new RuntimeException("Hoodie timestamp only support
milliseconds and microseconds, wrong precision = "
- + columnType.getPrecision());
- }
-        return LocalDateTime.ofInstant(Instant.ofEpochSecond(seconds, nanoseconds), ZoneId.systemDefault());
- }
-
- @Override
- public byte[] getBytes() {
- return data.getBinary(ordinal);
- }
-
- @Override
- public void unpackArray(List<ColumnValue> values) {
- ArrayData array = data.getArray(ordinal);
- for (int i = 0; i < array.numElements(); ++i) {
-            values.add(new HudiColumnValue(array, i, columnType.getChildTypes().get(0)));
- }
- }
-
- @Override
- public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
- MapData map = data.getMap(ordinal);
- ArrayData key = map.keyArray();
- for (int i = 0; i < key.numElements(); ++i) {
-            keys.add(new HudiColumnValue(key, i, columnType.getChildTypes().get(0)));
- }
- ArrayData value = map.valueArray();
- for (int i = 0; i < value.numElements(); ++i) {
-            values.add(new HudiColumnValue(value, i, columnType.getChildTypes().get(1)));
- }
- }
-
- @Override
-    public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
- // todo: support pruned struct fields
- InternalRow struct = data.getStruct(ordinal, structFieldIndex.size());
- for (int i : structFieldIndex) {
-            values.add(new HudiColumnValue(struct, i, columnType.getChildTypes().get(i)));
- }
- }
-}
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
deleted file mode 100644
index bc082e56732..00000000000
--- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
+++ /dev/null
@@ -1,229 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.hudi;
-
-import org.apache.doris.common.jni.JniScanner;
-import org.apache.doris.common.jni.vec.ColumnType;
-import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopAuthenticator;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.util.WeakIdentityHashMap;
-import org.apache.log4j.Logger;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.Filter;
-import scala.collection.Iterator;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-
-/**
- * The hudi JniScanner
- */
-public class HudiJniScanner extends JniScanner {
- private static final Logger LOG = Logger.getLogger(HudiJniScanner.class);
-
- private final int fetchSize;
- private final String debugString;
- private final HoodieSplit split;
- private final ClassLoader classLoader;
-
- private long getRecordReaderTimeNs = 0;
- private Iterator<InternalRow> recordIterator;
-
- /**
-     * `GenericDatumReader` of avro is a thread local map, that stores `WeakIdentityHashMap`.
-     * `WeakIdentityHashMap` has cached the avro resolving decoder, and the cached resolver can only be cleaned when
-     * its avro schema is recycled and become a week reference. However, the behavior of the week reference queue
-     * of `WeakIdentityHashMap` is unpredictable. Secondly, the decoder is very memory intensive, the number of threads
-     * to call the thread local map cannot be too many.
-     * Two solutions:
-     * 1. Reduce the number of threads reading avro logs and keep the readers in a fixed thread pool.
-     * 2. Regularly cleaning the cached resolvers in the thread local map by reflection.
- */
-    private static final AtomicLong lastUpdateTime = new AtomicLong(System.currentTimeMillis());
-    private static final long RESOLVER_TIME_OUT = 60000;
-    private static final ExecutorService avroReadPool;
-    private static ThreadLocal<WeakIdentityHashMap<?, ?>> AVRO_RESOLVER_CACHE;
-    private static final Map<Long, WeakIdentityHashMap<?, ?>> cachedResolvers = new ConcurrentHashMap<>();
-    private static final ReadWriteLock cleanResolverLock = new ReentrantReadWriteLock();
-    private static final ScheduledExecutorService cleanResolverService = Executors.newScheduledThreadPool(1);
-
- static {
-        int numThreads = Math.max(Runtime.getRuntime().availableProcessors() * 2, 4);
-        if (numThreads > 48) {
-            numThreads = Runtime.getRuntime().availableProcessors();
-        }
-        avroReadPool = Executors.newFixedThreadPool(numThreads,
-                new ThreadFactoryBuilder().setNameFormat("avro-log-reader-%d").build());
- LOG.info("Create " + numThreads + " daemon threads to load avro logs");
-
- Class<?> avroReader = GenericDatumReader.class;
- try {
- Field field = avroReader.getDeclaredField("RESOLVER_CACHE");
- field.setAccessible(true);
-            AVRO_RESOLVER_CACHE = (ThreadLocal<WeakIdentityHashMap<?, ?>>) field.get(null);
- LOG.info("Get the resolved cache for avro reader");
- } catch (Exception e) {
- AVRO_RESOLVER_CACHE = null;
- LOG.warn("Failed to get the resolved cache for avro reader");
- }
-
- cleanResolverService.scheduleAtFixedRate(() -> {
- cleanResolverLock.writeLock().lock();
- try {
-                if (System.currentTimeMillis() - lastUpdateTime.get() > RESOLVER_TIME_OUT) {
-                    for (WeakIdentityHashMap<?, ?> solver : cachedResolvers.values()) {
- solver.clear();
- }
- lastUpdateTime.set(System.currentTimeMillis());
- }
- } finally {
- cleanResolverLock.writeLock().unlock();
- }
- }, RESOLVER_TIME_OUT, RESOLVER_TIME_OUT, TimeUnit.MILLISECONDS);
- }
-
- public HudiJniScanner(int fetchSize, Map<String, String> params) {
-        debugString = params.entrySet().stream().map(kv -> kv.getKey() + "=" + kv.getValue())
- .collect(Collectors.joining("\n"));
- try {
- this.classLoader = this.getClass().getClassLoader();
- this.fetchSize = fetchSize;
- this.split = new HoodieSplit(params);
- } catch (Exception e) {
- LOG.error("Failed to initialize hudi scanner, split params:\n" +
debugString, e);
- throw e;
- }
- }
-
- @Override
- public void open() throws IOException {
- Future<?> avroFuture = avroReadPool.submit(() -> {
- Thread.currentThread().setContextClassLoader(classLoader);
-            initTableInfo(split.requiredTypes(), split.requiredFields(), fetchSize);
-            long startTime = System.nanoTime();
-            // RecordReader will use ProcessBuilder to start a hotspot process, which may be stuck,
-            // so use another process to kill this stuck process.
-            // TODO(gaoxin): better way to solve the stuck process?
-            AtomicBoolean isKilled = new AtomicBoolean(false);
-            ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
- executorService.scheduleAtFixedRate(() -> {
- if (!isKilled.get()) {
- synchronized (HudiJniScanner.class) {
- List<Long> pids = Utils.getChildProcessIds(
- Utils.getCurrentProcId());
- for (long pid : pids) {
- String cmd = Utils.getCommandLine(pid);
-                            if (cmd != null && cmd.contains("org.openjdk.jol.vm.sa.AttachMain")) {
-                                Utils.killProcess(pid);
-                                isKilled.set(true);
-                                LOG.info("Kill hotspot debugger process " + pid);
- }
- }
- }
- }
- }, 100, 1000, TimeUnit.MILLISECONDS);
-
- cleanResolverLock.readLock().lock();
- try {
- lastUpdateTime.set(System.currentTimeMillis());
-                AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(split.hadoopConf());
-                HadoopAuthenticator hadoopAuthenticator = HadoopAuthenticator
-                        .getHadoopAuthenticator(authenticationConfig);
-                if (split.incrementalRead()) {
-                    recordIterator = hadoopAuthenticator.doAs(() -> new MORIncrementalSplitReader(split)
-                            .buildScanIterator(new Filter[0]));
-                } else {
-                    recordIterator = hadoopAuthenticator.doAs(() -> new MORSnapshotSplitReader(split)
-                            .buildScanIterator(new Filter[0]));
-                }
-                if (AVRO_RESOLVER_CACHE != null && AVRO_RESOLVER_CACHE.get() != null) {
-                    cachedResolvers.computeIfAbsent(Thread.currentThread().getId(),
-                            threadId -> AVRO_RESOLVER_CACHE.get());
-                    AVRO_RESOLVER_CACHE.get().clear();
-                }
-            } catch (Exception e) {
-                LOG.error("Failed to open hudi scanner, split params:\n" + debugString, e);
- throw new RuntimeException(e.getMessage(), e);
- } finally {
- cleanResolverLock.readLock().unlock();
- }
- isKilled.set(true);
- executorService.shutdownNow();
- getRecordReaderTimeNs += System.nanoTime() - startTime;
- });
- try {
- avroFuture.get();
- } catch (Exception e) {
- throw new IOException(e.getMessage(), e);
- }
- }
-
- @Override
- public void close() throws IOException {
- if (recordIterator instanceof Closeable) {
- ((Closeable) recordIterator).close();
- }
- recordIterator = null;
- }
-
- @Override
- public int getNext() throws IOException {
- try {
- int readRowNumbers = 0;
- HudiColumnValue columnValue = new HudiColumnValue();
- int numFields = split.requiredFields().length;
- ColumnType[] columnTypes = split.requiredTypes();
- while (readRowNumbers < fetchSize && recordIterator.hasNext()) {
- columnValue.reset(recordIterator.next());
- for (int i = 0; i < numFields; i++) {
- columnValue.reset(i, columnTypes[i]);
- appendData(i, columnValue);
- }
- readRowNumbers++;
- }
- return readRowNumbers;
- } catch (Exception e) {
- close();
- LOG.error("Failed to get the next batch of hudi, split params:\n"
+ debugString, e);
- throw new IOException("Failed to get the next batch of hudi.", e);
- }
- }
-
- @Override
- public Map<String, String> getStatistics() {
-        return Collections.singletonMap("timer:GetRecordReaderTime", String.valueOf(getRecordReaderTimeNs));
- }
-}
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java
deleted file mode 100644
index c0fbec633e8..00000000000
--- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java
+++ /dev/null
@@ -1,89 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.hudi;
-
-import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopAuthenticator;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.lang.management.ManagementFactory;
-import java.util.LinkedList;
-import java.util.List;
-
-public class Utils {
- public static long getCurrentProcId() {
- try {
- return ManagementFactory.getRuntimeMXBean().getPid();
- } catch (Exception e) {
- throw new RuntimeException("Couldn't find PID of current JVM
process.", e);
- }
- }
-
- public static List<Long> getChildProcessIds(long pid) {
- try {
-            Process pgrep = (new ProcessBuilder("pgrep", "-P", String.valueOf(pid))).start();
-            BufferedReader reader = new BufferedReader(new InputStreamReader(pgrep.getInputStream()));
- List<Long> result = new LinkedList<>();
- String line;
- while ((line = reader.readLine()) != null) {
- result.add(Long.valueOf(line.trim()));
- }
- pgrep.waitFor();
- return result;
- } catch (Exception e) {
- throw new RuntimeException("Couldn't get child processes of PID "
+ pid, e);
- }
- }
-
- public static String getCommandLine(long pid) {
- try {
-            return FileUtils.readFileToString(new File(String.format("/proc/%d/cmdline", pid))).trim();
- } catch (IOException e) {
- return null;
- }
- }
-
- public static void killProcess(long pid) {
- try {
-            Process kill = (new ProcessBuilder("kill", "-9", String.valueOf(pid))).start();
- kill.waitFor();
- } catch (Exception e) {
- throw new RuntimeException("Couldn't kill process PID " + pid, e);
- }
- }
-
-    public static HoodieTableMetaClient getMetaClient(Configuration conf, String basePath) {
-        HadoopStorageConfiguration hadoopStorageConfiguration = new HadoopStorageConfiguration(conf);
-        AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(conf);
-        HadoopAuthenticator hadoopAuthenticator = HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig);
-        try {
-            return hadoopAuthenticator.doAs(() -> HoodieTableMetaClient.builder()
-                    .setConf(hadoopStorageConfiguration).setBasePath(basePath).build());
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to get HoodieTableMetaClient", e);
- }
- }
-}
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/resources/package.xml b/fe/be-java-extensions/hudi-scanner/src/main/resources/package.xml
deleted file mode 100644
index 4bbb2610603..00000000000
--- a/fe/be-java-extensions/hudi-scanner/src/main/resources/package.xml
+++ /dev/null
@@ -1,41 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" ?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
- <id>jar-with-dependencies</id>
- <formats>
- <format>jar</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <dependencySets>
- <dependencySet>
- <outputDirectory>/</outputDirectory>
- <useProjectArtifact>true</useProjectArtifact>
- <unpack>true</unpack>
- <scope>runtime</scope>
- <unpackOptions>
- <excludes>
- <exclude>**/Log4j2Plugins.dat</exclude>
- </excludes>
- </unpackOptions>
- </dependencySet>
- </dependencySets>
-</assembly>
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
deleted file mode 100644
index fc8d74f9713..00000000000
--- a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
+++ /dev/null
@@ -1,734 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.hudi
-
-import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
-import org.apache.doris.common.jni.vec.ColumnType
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hbase.io.hfile.CacheConfig
-import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, convertToAvroSchema}
-import org.apache.hudi.HoodieConversionUtils.toScalaOption
-import org.apache.hudi.avro.HoodieAvroUtils
-import org.apache.hudi.client.utils.SparkInternalSchemaConverter
-import org.apache.hudi.common.config.{ConfigProperty, HoodieMetadataConfig, TypedProperties}
-import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
-import org.apache.hudi.common.table.timeline.HoodieTimeline
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
-import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
-import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.hadoop.fs.CachingPath
-import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
-import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
-import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
-import org.apache.hudi.io.hadoop.HoodieHBaseAvroHFileReader
-import org.apache.hudi.metadata.HoodieTableMetadataUtil
-import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieSparkConfUtils, HoodieTableSchema, HoodieTableState}
-import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
-import org.apache.log4j.Logger
-import org.apache.spark.sql.adapter.Spark3_4Adapter
-import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters}
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.execution.datasources.{PartitionedFile, PartitioningUtils}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.sql.{SQLContext, SparkSession, SparkSessionExtensions}
-import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.{SparkConf, SparkContext}
-
-import java.lang.reflect.Constructor
-import java.net.URI
-import java.util.Objects
-import java.util.concurrent.TimeUnit
-import java.{util => jutil}
-import scala.collection.JavaConverters._
-import scala.util.control.NonFatal
-import scala.util.{Failure, Success, Try}
-
-class DorisSparkAdapter extends Spark3_4Adapter {
-  override def getAvroSchemaConverters: HoodieAvroSchemaConverters = HoodieSparkAvroSchemaConverters
-}
-
-class HoodieSplit(private val params: jutil.Map[String, String]) {
- val queryId: String = params.remove("query_id")
- val basePath: String = params.remove("base_path")
- val dataFilePath: String = params.remove("data_file_path")
- val dataFileLength: Long = params.remove("data_file_length").toLong
- val deltaFilePaths: Array[String] = {
- val deltas = params.remove("delta_file_paths")
- if (StringUtils.isNullOrEmpty(deltas)) new Array[String](0) else
deltas.split(",")
- }
-
- val hudiColumnNames: Array[String] =
params.remove("hudi_column_names").split(",")
- val hudiColumnTypes: Map[String, String] = hudiColumnNames.zip(
- params.remove("hudi_column_types").split("#")).toMap
-
- val requiredFields: Array[String] = {
- val readFields =
params.remove("required_fields").split(",").filter(_.nonEmpty)
- if (readFields.isEmpty) {
- // If only read the partition columns, the JniConnector will produce
empty required fields.
- // Read the "_hoodie_record_key" field at least to know how many rows in
current hoodie split
- // Even if the JniConnector doesn't read this field, the call of
releaseTable will reclaim the resource
- Array(HoodieRecord.RECORD_KEY_METADATA_FIELD)
- } else {
- readFields
- }
- }
- val requiredTypes: Array[ColumnType] = requiredFields.map(
- field => ColumnType.parseType(field, hudiColumnTypes(field)))
-
- val nestedFields: Array[String] = {
- val fields = params.remove("nested_fields")
- if (StringUtils.isNullOrEmpty(fields)) new Array[String](0) else
fields.split(",")
- }
- val instantTime: String = params.remove("instant_time")
- val serde: String = params.remove("serde")
- val inputFormat: String = params.remove("input_format")
-
- val hadoopProperties: Map[String, String] = {
- val properties = new jutil.HashMap[String, String]
- val iterator = params.entrySet().iterator()
- while (iterator.hasNext) {
- val kv = iterator.next()
- if (kv.getKey.startsWith(BaseSplitReader.HADOOP_CONF_PREFIX)) {
-
properties.put(kv.getKey.substring(BaseSplitReader.HADOOP_CONF_PREFIX.length),
kv.getValue)
- iterator.remove()
- }
- }
- properties.asScala.toMap
- }
-
- lazy val hadoopConf: Configuration = {
- val conf = new Configuration
- hadoopProperties.foreach(kv => conf.set(kv._1, kv._2))
- conf
- }
-
- def incrementalRead: Boolean = {
-
"true".equalsIgnoreCase(optParams.getOrElse("hoodie.datasource.read.incr.operation",
"false"))
- }
-
- // NOTE: In cases when Hive Metastore is used as catalog and the table is
partitioned, schema in the HMS might contain
- // Hive-specific partitioning columns created specifically for HMS to
handle partitioning appropriately. In that
- // case we opt in to not be providing catalog's schema, and instead
force Hudi relations to fetch the schema
- // from the table itself
- val schemaSpec: Option[StructType] = None
-
- val optParams: Map[String, String] = params.asScala.toMap
-
- override def equals(obj: Any): Boolean = {
- if (obj == null) {
- return false
- }
- obj match {
- case split: HoodieSplit =>
- hashCode() == split.hashCode()
- case _ => false
- }
- }
-
- override def hashCode(): Int = {
- Objects.hash(queryId, basePath)
- }
-}
-
-case class HoodieTableInformation(sparkSession: SparkSession,
- metaClient: HoodieTableMetaClient,
- timeline: HoodieTimeline,
- tableConfig: HoodieTableConfig,
- resolvedTargetFields: Array[String],
- tableAvroSchema: Schema,
- internalSchemaOpt: Option[InternalSchema])
-
-/**
- * Reference to Apache Hudi
- * see <a
href="https://github.com/apache/hudi/blob/release-0.13.0/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala">HoodieBaseRelation</a>
- */
-abstract class BaseSplitReader(val split: HoodieSplit) {
-
- import BaseSplitReader._
-
- protected val optParams: Map[String, String] = split.optParams
-
- protected val tableInformation: HoodieTableInformation = cache.get(split)
-
- protected val timeline: HoodieTimeline = tableInformation.timeline
-
- protected val sparkSession: SparkSession = tableInformation.sparkSession
- protected val sqlContext: SQLContext = sparkSession.sqlContext
- imbueConfigs(sqlContext)
-
- protected val tableConfig: HoodieTableConfig = tableInformation.tableConfig
- protected val tableName: String = tableConfig.getTableName
-
- // NOTE: Record key-field is assumed singular here due to the either of
- // - In case Hudi's meta fields are enabled: record key will be
pre-materialized (stored) as part
- // of the record's payload (as part of the Hudi's metadata)
- // - In case Hudi's meta fields are disabled (virtual keys): in
that case record has to bear _single field_
- // identified as its (unique) primary key w/in its payload (this is
a limitation of [[SimpleKeyGenerator]],
- // which is the only [[KeyGenerator]] permitted for virtual-keys
payloads)
- protected lazy val recordKeyField: String =
- if (tableConfig.populateMetaFields()) {
- HoodieRecord.RECORD_KEY_METADATA_FIELD
- } else {
- val keyFields = tableConfig.getRecordKeyFields.get()
- checkState(keyFields.length == 1)
- keyFields.head
- }
-
- protected lazy val preCombineFieldOpt: Option[String] =
- Option(tableConfig.getPreCombineField)
- .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key))
match {
- // NOTE: This is required to compensate for cases when empty string is
used to stub
- // property value to avoid it being set with the default value
- // TODO(HUDI-3456) cleanup
- case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f)
- case _ => None
- }
-
- /**
- * Columns that relation has to read from the storage to properly execute on
its semantic: for ex,
- * for Merge-on-Read tables key fields as well and pre-combine field
comprise mandatory set of columns,
- * meaning that regardless of whether this columns are being requested by
the query they will be fetched
- * regardless so that relation is able to combine records properly (if
necessary)
- *
- * @VisibleInTests
- */
- val mandatoryFields: Seq[String]
-
- /**
- * NOTE: Initialization of teh following members is coupled on purpose to
minimize amount of I/O
- * required to fetch table's Avro and Internal schemas
- */
- protected lazy val (tableAvroSchema: Schema, internalSchemaOpt:
Option[InternalSchema]) = {
- (tableInformation.tableAvroSchema, tableInformation.internalSchemaOpt)
- }
-
- protected lazy val tableStructSchema: StructType =
convertAvroSchemaToStructType(tableAvroSchema)
-
- protected lazy val partitionColumns: Array[String] =
tableConfig.getPartitionFields.orElse(Array.empty)
-
- protected lazy val specifiedQueryTimestamp: Option[String] =
Some(split.instantTime)
-
- private def queryTimestamp: Option[String] =
-
specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(_.getTimestamp))
-
- lazy val tableState: HoodieTableState = {
- val recordMergerImpls =
ConfigUtils.split2List(getConfigValue(HoodieWriteConfig.RECORD_MERGER_IMPLS)).asScala.toList
- val recordMergerStrategy =
getConfigValue(HoodieWriteConfig.RECORD_MERGER_STRATEGY,
-
Option(tableInformation.metaClient.getTableConfig.getRecordMergerStrategy))
- val configProperties = getConfigProperties(sparkSession, optParams)
- val metadataConfig = HoodieMetadataConfig.newBuilder()
- .fromProperties(configProperties)
- .enable(configProperties.getBoolean(
- HoodieMetadataConfig.ENABLE.key(),
HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS)
- &&
HoodieTableMetadataUtil.isFilesPartitionAvailable(tableInformation.metaClient))
- .build()
-
- // Subset of the state of table's configuration as of at the time of the
query
- HoodieTableState(
- tablePath = split.basePath,
- latestCommitTimestamp = queryTimestamp,
- recordKeyField = recordKeyField,
- preCombineFieldOpt = preCombineFieldOpt,
- usesVirtualKeys = !tableConfig.populateMetaFields(),
- recordPayloadClassName = tableConfig.getPayloadClass,
- metadataConfig = metadataConfig,
- recordMergerImpls = recordMergerImpls,
- recordMergerStrategy = recordMergerStrategy
- )
- }
-
- private def getConfigValue(config: ConfigProperty[String],
- defaultValueOption: Option[String] =
Option.empty): String = {
- optParams.getOrElse(config.key(),
- sqlContext.getConf(config.key(),
defaultValueOption.getOrElse(config.defaultValue())))
- }
-
- def imbueConfigs(sqlContext: SQLContext): Unit = {
-
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown",
"true")
-
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled",
"true")
- // TODO(HUDI-3639) vectorized reader has to be disabled to make sure
MORIncrementalRelation is working properly
-
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader",
"false")
- }
-
- def buildScanIterator(filters: Array[Filter]): Iterator[InternalRow] = {
- // NOTE: PLEASE READ CAREFULLY BEFORE MAKING CHANGES
- // *Appending* additional columns to the ones requested by the
caller is not a problem, as those
- // will be eliminated by the caller's projection;
- // (!) Please note, however, that it's critical to avoid _reordering_ of
the requested columns as this
- // will break the upstream projection
- val targetColumns: Array[String] =
appendMandatoryColumns(tableInformation.resolvedTargetFields)
- // NOTE: We explicitly fallback to default table's Avro schema to make
sure we avoid unnecessary Catalyst > Avro
- // schema conversion, which is lossy in nature (for ex, it doesn't
preserve original Avro type-names) and
- // could have an effect on subsequent de-/serializing records in
some exotic scenarios (when Avro unions
- // w/ more than 2 types are involved)
- val sourceSchema = tableAvroSchema
- val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
- projectSchema(Either.cond(internalSchemaOpt.isDefined,
internalSchemaOpt.get, sourceSchema), targetColumns)
-
- val tableAvroSchemaStr = tableAvroSchema.toString
- val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr,
internalSchemaOpt)
- val requiredSchema = HoodieTableSchema(
- requiredStructSchema, requiredAvroSchema.toString,
Some(requiredInternalSchema))
-
- composeIterator(tableSchema, requiredSchema, targetColumns, filters)
- }
-
- /**
- * Composes iterator provided file split to read from, table and partition
schemas, data filters to be applied
- *
- * @param tableSchema target table's schema
- * @param requiredSchema projected schema required by the reader
- * @param requestedColumns columns requested by the query
- * @param filters data filters to be applied
- * @return instance of RDD (holding [[InternalRow]]s)
- */
- protected def composeIterator(tableSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema,
- requestedColumns: Array[String],
- filters: Array[Filter]): Iterator[InternalRow]
-
- private final def appendMandatoryColumns(requestedColumns: Array[String]):
Array[String] = {
- // For a nested field in mandatory columns, we should first get the
root-level field, and then
- // check for any missing column, as the requestedColumns should only
contain root-level fields
- // We should only append root-level field as well
- val missing = mandatoryFields.map(col =>
HoodieAvroUtils.getRootLevelFieldName(col))
- .filter(rootField => !requestedColumns.contains(rootField))
- requestedColumns ++ missing
- }
-
- /**
- * Projects provided schema by picking only required (projected) top-level
columns from it
- *
- * @param tableSchema schema to project (either of [[InternalSchema]] or
Avro's [[Schema]])
- * @param requiredColumns required top-level columns to be projected
- */
- def projectSchema(tableSchema: Either[Schema, InternalSchema],
- requiredColumns: Array[String]): (Schema, StructType,
InternalSchema) = {
- tableSchema match {
- case Right(internalSchema) =>
- checkState(!internalSchema.isEmptySchema)
- val prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(
- internalSchema, requiredColumns.toList.asJava)
- val requiredAvroSchema =
AvroInternalSchemaConverter.convert(prunedInternalSchema, "schema")
- val requiredStructSchema =
convertAvroSchemaToStructType(requiredAvroSchema)
-
- (requiredAvroSchema, requiredStructSchema, prunedInternalSchema)
-
- case Left(avroSchema) =>
- val fieldMap = avroSchema.getFields.asScala.map(f => f.name() ->
f).toMap
- val requiredFields = requiredColumns.map { col =>
- val f = fieldMap(col)
- // We have to create a new [[Schema.Field]] since Avro schemas can't
share field
- // instances (and will throw "org.apache.avro.AvroRuntimeException:
Field already used")
- new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(),
f.order())
- }.toList
- val requiredAvroSchema = Schema.createRecord(avroSchema.getName,
avroSchema.getDoc,
- avroSchema.getNamespace, avroSchema.isError, requiredFields.asJava)
- val requiredStructSchema =
convertAvroSchemaToStructType(requiredAvroSchema)
-
- (requiredAvroSchema, requiredStructSchema,
InternalSchema.getEmptyInternalSchema)
- }
- }
-
- /**
- * Converts Avro's [[Schema]] to Catalyst's [[StructType]]
- */
- protected def convertAvroSchemaToStructType(avroSchema: Schema): StructType
= {
- val schemaConverters = sparkAdapter.getAvroSchemaConverters
- schemaConverters.toSqlType(avroSchema) match {
- case (dataType, _) => dataType.asInstanceOf[StructType]
- }
- }
-
- protected def tryPrunePartitionColumns(tableSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema):
(StructType, HoodieTableSchema, HoodieTableSchema) = {
- // Since schema requested by the caller might contain partition columns,
we might need to
- // prune it, removing all partition columns from it in case these columns
are not persisted
- // in the data files
- //
- // NOTE: This partition schema is only relevant to file reader to be able
to embed
- // values of partition columns (hereafter referred to as partition
values) encoded into
- // the partition path, and omitted from the data file, back into
fetched rows;
- // Note that, by default, partition columns are not omitted
therefore specifying
- // partition schema for reader is not required
- if (shouldExtractPartitionValuesFromPartitionPath) {
- val partitionSchema = StructType(partitionColumns.map(StructField(_,
StringType)))
- val prunedDataStructSchema =
prunePartitionColumns(tableSchema.structTypeSchema)
- val prunedRequiredSchema =
prunePartitionColumns(requiredSchema.structTypeSchema)
-
- (partitionSchema,
- HoodieTableSchema(prunedDataStructSchema,
convertToAvroSchema(prunedDataStructSchema, tableName).toString),
- HoodieTableSchema(prunedRequiredSchema,
convertToAvroSchema(prunedRequiredSchema, tableName).toString))
- } else {
- (StructType(Nil), tableSchema, requiredSchema)
- }
- }
-
- /**
- * Controls whether partition values (ie values of partition columns) should
be
- * <ol>
- * <li>Extracted from partition path and appended to individual rows read
from the data file (we
- * delegate this to Spark's [[ParquetFileFormat]])</li>
- * <li>Read from the data-file as is (by default Hudi persists all columns
including partition ones)</li>
- * </ol>
- *
- * This flag is only be relevant in conjunction with the usage of
[["hoodie.datasource.write.drop.partition.columns"]]
- * config, when Hudi will NOT be persisting partition columns in the data
file, and therefore values for
- * such partition columns (ie "partition values") will have to be parsed
from the partition path, and appended
- * to every row only in the fetched dataset.
- *
- * NOTE: Partition values extracted from partition path might be deviating
from the values of the original
- * partition columns: for ex, if originally as partition column was used
column [[ts]] bearing epoch
- * timestamp, which was used by [[TimestampBasedKeyGenerator]] to generate
partition path of the format
- * [["yyyy/mm/dd"]], appended partition value would bear the format verbatim
as it was used in the
- * partition path, meaning that string value of "2022/01/01" will be
appended, and not its original
- * representation
- */
- protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = {
- // Controls whether partition columns (which are the source for the
partition path values) should
- // be omitted from persistence in the data files. On the read path it
affects whether partition values (values
- // of partition columns) will be read from the data file or extracted from
partition path
-
- val shouldOmitPartitionColumns =
tableInformation.tableConfig.shouldDropPartitionColumns &&
partitionColumns.nonEmpty
- val shouldExtractPartitionValueFromPath =
-
optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
-
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
- shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath
- }
-
- private def prunePartitionColumns(dataStructSchema: StructType): StructType =
- StructType(dataStructSchema.filterNot(f =>
partitionColumns.contains(f.name)))
-
- /**
- * For enable hoodie.datasource.write.drop.partition.columns, need to create
an InternalRow on partition values
- * and pass this reader on parquet file. So that, we can query the partition
columns.
- */
- protected def getPartitionColumnsAsInternalRow(): InternalRow = {
- try {
- if (shouldExtractPartitionValuesFromPartitionPath) {
- val filePath = new Path(split.dataFilePath)
- val tablePathWithoutScheme =
CachingPath.getPathWithoutSchemeAndAuthority(new
Path(tableInformation.metaClient.getBasePathV2.toUri))
- val partitionPathWithoutScheme =
CachingPath.getPathWithoutSchemeAndAuthority(filePath.getParent)
- val relativePath = new
URI(tablePathWithoutScheme.toString).relativize(new
URI(partitionPathWithoutScheme.toString)).toString
- val hiveStylePartitioningEnabled =
tableConfig.getHiveStylePartitioningEnable.toBoolean
- if (hiveStylePartitioningEnabled) {
- val partitionSpec = PartitioningUtils.parsePathFragment(relativePath)
-
InternalRow.fromSeq(partitionColumns.map(partitionSpec(_)).map(UTF8String.fromString))
- } else {
- if (partitionColumns.length == 1) {
- InternalRow.fromSeq(Seq(UTF8String.fromString(relativePath)))
- } else {
- val parts = relativePath.split("/")
- assert(parts.size == partitionColumns.length)
- InternalRow.fromSeq(parts.map(UTF8String.fromString))
- }
- }
- } else {
- InternalRow.empty
- }
- } catch {
- case NonFatal(e) =>
- LOG.warn(s"Failed to get the right partition InternalRow for file:
${split.dataFilePath}", e)
- InternalRow.empty
- }
- }
-
- /**
- * Wrapper for `buildReaderWithPartitionValues` of [[ParquetFileFormat]]
handling [[ColumnarBatch]],
- * when Parquet's Vectorized Reader is used
- *
- * TODO move to HoodieBaseRelation, make private
- */
- private[hudi] def buildHoodieParquetReader(sparkSession: SparkSession,
- dataSchema: StructType,
- partitionSchema: StructType,
- requiredSchema: StructType,
- filters: Seq[Filter],
- options: Map[String, String],
- hadoopConf: Configuration,
- appendPartitionValues: Boolean =
false): PartitionedFile => Iterator[InternalRow] = {
- val parquetFileFormat: ParquetFileFormat =
sparkAdapter.createLegacyHoodieParquetFileFormat(appendPartitionValues).get
- val readParquetFile: PartitionedFile => Iterator[Any] =
parquetFileFormat.buildReaderWithPartitionValues(
- sparkSession = sparkSession,
- dataSchema = dataSchema,
- partitionSchema = partitionSchema,
- requiredSchema = requiredSchema,
- filters = filters,
- options = options,
- hadoopConf = hadoopConf
- )
-
- file: PartitionedFile => {
- val iter = readParquetFile(file)
- iter.flatMap {
- case r: InternalRow => Seq(r)
- case b: ColumnarBatch => b.rowIterator().asScala
- }
- }
- }
-
- private def createHFileReader(spark: SparkSession,
- dataSchema: HoodieTableSchema,
- requiredDataSchema: HoodieTableSchema,
- filters: Seq[Filter],
- options: Map[String, String],
- hadoopConf: Configuration): PartitionedFile =>
Iterator[InternalRow] = {
- partitionedFile => {
- var hadoopStorageConfiguration = new
HadoopStorageConfiguration(hadoopConf);
- var storagePath = new StoragePath(partitionedFile.toPath.toUri.getPath);
- var emptySchema =
org.apache.hudi.common.util.Option.empty[org.apache.avro.Schema]()
- val reader = new HoodieHBaseAvroHFileReader(
- hadoopStorageConfiguration, storagePath, emptySchema)
-
- val requiredRowSchema = requiredDataSchema.structTypeSchema
- // NOTE: Schema has to be parsed at this point, since Avro's [[Schema]] isn't serializable
- // and therefore can't be passed from the driver to the executors
- val requiredAvroSchema = new
Schema.Parser().parse(requiredDataSchema.avroSchemaStr)
- val avroToRowConverter =
AvroConversionUtils.createAvroToInternalRowConverter(
- requiredAvroSchema, requiredRowSchema)
-
- reader.getRecordIterator(requiredAvroSchema).asScala
- .map(record => {
-
avroToRowConverter.apply(record.getData.asInstanceOf[GenericRecord]).get
- })
- }
- }
-
- /**
- * Returns file-reader routine accepting [[PartitionedFile]] and returning
an [[Iterator]]
- * over [[InternalRow]]
- */
- protected def createBaseFileReader(spark: SparkSession,
- partitionSchema: StructType,
- dataSchema: HoodieTableSchema,
- requiredDataSchema: HoodieTableSchema,
- filters: Seq[Filter],
- options: Map[String, String],
- hadoopConf: Configuration):
BaseFileReader = {
- val tableBaseFileFormat = tableConfig.getBaseFileFormat
-
- // NOTE: PLEASE READ CAREFULLY
- // Lambda returned from this method is going to be invoked on the
executor, and therefore
- // we have to eagerly initialize all of the readers even though only
one specific to the type
- // of the file being read will be used. This is required to avoid
serialization of the whole
- // relation (containing file-index for ex) and passing it to the
executor
- val (read: (PartitionedFile => Iterator[InternalRow]), schema: StructType)
=
- tableBaseFileFormat match {
- case HoodieFileFormat.PARQUET =>
- val parquetReader = buildHoodieParquetReader(
- sparkSession = spark,
- dataSchema = dataSchema.structTypeSchema,
- partitionSchema = partitionSchema,
- requiredSchema = requiredDataSchema.structTypeSchema,
- filters = filters,
- options = options,
- hadoopConf = hadoopConf,
- // We're delegating to Spark to append partition values to every row
only in cases
- // when these corresponding partition-values are not persisted w/in
the data file itself
- appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
- )
- // Since partition values by default are omitted, and not persisted
w/in data-files by Spark,
- // data-file readers (such as [[ParquetFileFormat]]) have to inject
partition values while reading
- // the data. As such, actual full schema produced by such reader is
composed of
- // a) Data-file schema (projected or not)
- // b) Appended partition column values
- val readerSchema =
StructType(requiredDataSchema.structTypeSchema.fields ++ partitionSchema.fields)
-
- (parquetReader, readerSchema)
-
- case HoodieFileFormat.HFILE =>
- val hfileReader = createHFileReader(
- spark = spark,
- dataSchema = dataSchema,
- requiredDataSchema = requiredDataSchema,
- filters = filters,
- options = options,
- hadoopConf = hadoopConf
- )
-
- (hfileReader, requiredDataSchema.structTypeSchema)
-
- case _ => throw new UnsupportedOperationException(s"Base file format is
not currently supported ($tableBaseFileFormat)")
- }
-
- BaseFileReader(
- read = partitionedFile => {
- val extension =
FSUtils.getFileExtension(partitionedFile.filePath.toString())
- if (tableBaseFileFormat.getFileExtension.equals(extension)) {
- read(partitionedFile)
- } else {
- throw new UnsupportedOperationException(s"Invalid base-file format
($extension), expected ($tableBaseFileFormat)")
- }
- },
- schema = schema
- )
- }
-
- protected def embedInternalSchema(conf: Configuration, internalSchemaOpt:
Option[InternalSchema]): Configuration = {
- val internalSchema =
internalSchemaOpt.getOrElse(InternalSchema.getEmptyInternalSchema)
- val querySchemaString = SerDeHelper.toJson(internalSchema)
- if (!StringUtils.isNullOrEmpty(querySchemaString)) {
- val validCommits =
timeline.getInstants.iterator.asScala.map(_.getFileName).mkString(",")
- LOG.warn(s"Table valid commits: $validCommits")
-
- conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA,
SerDeHelper.toJson(internalSchema))
- conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, split.basePath)
- conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST,
validCommits)
- }
- conf
- }
-}
-
-object SparkMockHelper {
- private lazy val mockSparkContext = {
- val conf = new SparkConf().setMaster("local").setAppName("mock_sc")
- .set("spark.ui.enabled", "false")
- val sc = new SparkContext(conf)
- sc.setLogLevel("WARN")
- sc
- }
-
- implicit class MockSparkSession(builder: SparkSession.Builder) {
- def createMockSession(split: HoodieSplit): SparkSession = {
- val sparkSessionClass = classOf[SparkSession]
- val constructor: Constructor[SparkSession] =
sparkSessionClass.getDeclaredConstructors
- .find(_.getParameterCount ==
5).get.asInstanceOf[Constructor[SparkSession]]
- constructor.setAccessible(true)
- val ss = constructor.newInstance(mockSparkContext, None, None, new
SparkSessionExtensions, Map.empty)
- split.hadoopProperties.foreach(kv =>
ss.sessionState.conf.setConfString(kv._1, kv._2))
- ss
- }
- }
-}
-
-object BaseSplitReader {
-
- import SparkMockHelper.MockSparkSession
-
- private val LOG = Logger.getLogger(BaseSplitReader.getClass)
- val HADOOP_CONF_PREFIX = "hadoop_conf."
-
- // Use [[SparkAdapterSupport]] instead ?
- private lazy val sparkAdapter = new DorisSparkAdapter
-
- private lazy val cache: LoadingCache[HoodieSplit, HoodieTableInformation] = {
- val loader = new CacheLoader[HoodieSplit, HoodieTableInformation] {
- override def load(split: HoodieSplit): HoodieTableInformation = {
- // create mock spark session
- val sparkSession = SparkSession.builder().createMockSession(split)
- val metaClient = Utils.getMetaClient(split.hadoopConf, split.basePath)
- // NOTE: We're including compaction here since it's not considered a "commit" operation
- val timeline =
metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants
-
- val specifiedQueryTimestamp: Option[String] = Some(split.instantTime)
- val schemaResolver = new TableSchemaResolver(metaClient)
- val internalSchemaOpt = if
(!isSchemaEvolutionEnabledOnRead(split.optParams, sparkSession)) {
- None
- } else {
- Try {
-
specifiedQueryTimestamp.map(schemaResolver.getTableInternalSchemaFromCommitMetadata)
-
.getOrElse(schemaResolver.getTableInternalSchemaFromCommitMetadata)
- } match {
- case Success(internalSchemaOpt) => toScalaOption(internalSchemaOpt)
- case Failure(_) =>
- None
- }
- }
- val tableName = metaClient.getTableConfig.getTableName
- val (name, namespace) =
AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
- val avroSchema: Schema = internalSchemaOpt.map { is =>
- AvroInternalSchemaConverter.convert(is, namespace + "." + name)
- } orElse {
- specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
- } orElse {
- split.schemaSpec.map(s => convertToAvroSchema(s, tableName))
- } getOrElse {
- Try(schemaResolver.getTableAvroSchema) match {
- case Success(schema) => schema
- case Failure(e) =>
- throw new HoodieSchemaException("Failed to fetch schema from the
table", e)
- }
- }
-
- // match column name in lower case
- val colNames = internalSchemaOpt.map { internalSchema =>
- internalSchema.getAllColsFullName.asScala.map(f => f.toLowerCase ->
f).toMap
- } getOrElse {
- avroSchema.getFields.asScala.map(f => f.name().toLowerCase ->
f.name()).toMap
- }
- val resolvedTargetFields = split.requiredFields.map(field =>
colNames.getOrElse(field.toLowerCase, field))
-
- HoodieTableInformation(sparkSession,
- metaClient,
- timeline,
- metaClient.getTableConfig,
- resolvedTargetFields,
- avroSchema,
- internalSchemaOpt)
- }
- }
- CacheBuilder.newBuilder()
- .expireAfterAccess(10, TimeUnit.MINUTES)
- .maximumSize(4096)
- .build(loader)
- }
-
- private def isSchemaEvolutionEnabledOnRead(optParams: Map[String, String],
sparkSession: SparkSession): Boolean = {
- // NOTE: Schema evolution could be configured both through the optional parameters as well as
- // through the Spark Session configuration (for example, for Spark SQL)
- optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
-
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
||
-
sparkSession.sessionState.conf.getConfString(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
-
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
- }
-
- private def getConfigProperties(spark: SparkSession, options: Map[String,
String]) = {
- val sqlConf: SQLConf = spark.sessionState.conf
- val properties = new TypedProperties()
- // Ambiguous reference when invoking Properties.putAll() in Java 11
- // Reference https://github.com/scala/bug/issues/10418
- options.filter(p => p._2 != null).foreach(p =>
properties.setProperty(p._1, p._2))
-
- // TODO(HUDI-5361) clean up properties carry-over
-
- // To support metadata listing via Spark SQL we allow users to pass the
config via SQL Conf in spark session. Users
- // would be able to run SET hoodie.metadata.enable=true in the spark sql
session to enable metadata listing.
- val isMetadataTableEnabled = HoodieSparkConfUtils.getConfigValue(options,
sqlConf, HoodieMetadataConfig.ENABLE.key, null)
- if (isMetadataTableEnabled != null) {
- properties.setProperty(HoodieMetadataConfig.ENABLE.key(),
String.valueOf(isMetadataTableEnabled))
- }
-
- val listingModeOverride = HoodieSparkConfUtils.getConfigValue(options,
sqlConf,
- DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, null)
- if (listingModeOverride != null) {
-
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key,
listingModeOverride)
- }
-
- properties
- }
-}
diff --git
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala
deleted file mode 100644
index f393e9e1246..00000000000
---
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala
+++ /dev/null
@@ -1,178 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.hudi
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, projectReader}
-import org.apache.hudi.MergeOnReadSnapshotRelation.isProjectionCompatible
-import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.common.util.StringUtils
-import org.apache.hudi.{DataSourceReadOptions, HoodieMergeOnReadFileSplit,
HoodieTableSchema, HoodieTableState, LogFileIterator, RecordMergingFileIterator}
-import
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
-import org.apache.spark.sql.catalyst.InternalRow
-
-import java.io.Closeable
-import java.util.function.Predicate
-
-/**
- * Class holding base-file readers for 3 different use-cases:
- *
- * <ol>
- * <li>Full-schema reader: is used when the whole row has to be read to perform merging correctly.
- * This could occur when no optimizations can be applied and we have to fall back to reading the whole row from
- * the base file and the corresponding delta-log file to merge them correctly</li>
- *
- * <li>Required-schema reader: is used when it's fine to only read the row's projected columns.
- * This could occur when the row can be merged with the corresponding delta-log record while leveraging only
- * the projected columns</li>
- *
- * <li>Required-schema reader (skip-merging): is used when no merging will be performed (skip-merged).
- * This could occur when the file-group has no delta-log files</li>
- * </ol>
- */
-private[hudi] case class HoodieMergeOnReadBaseFileReaders(fullSchemaReader:
BaseFileReader,
-
requiredSchemaReader: BaseFileReader,
-
requiredSchemaReaderSkipMerging: BaseFileReader)
-
-/**
- * Provided with an instance of [[HoodieMergeOnReadFileSplit]], provides an iterator over all of the records stored in
- * the base file as well as all of the delta-log files, simply returning the concatenation of these streams while not
- * performing any combination/merging of records with the same primary keys (i.e., potentially producing duplicates)
- */
-private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit,
- baseFileReader: BaseFileReader,
- dataSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema,
- tableState: HoodieTableState,
- config: Configuration)
- extends LogFileIterator(split, dataSchema, requiredSchema, tableState,
config) {
-
- private val requiredSchemaProjection =
generateUnsafeProjection(baseFileReader.schema, structTypeSchema)
-
- private val baseFileIterator = baseFileReader(split.dataFile.get)
-
- override def doHasNext: Boolean = {
- if (baseFileIterator.hasNext) {
- // No merge is required, simply load current row and project into
required schema
- nextRecord = requiredSchemaProjection(baseFileIterator.next())
- true
- } else {
- super[LogFileIterator].doHasNext
- }
- }
-}
-
-/**
- * Reference to Apache Hudi
- * see <a
href="https://github.com/apache/hudi/blob/release-0.13.0/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala">HoodieMergeOnReadRDD</a>
- */
-class HoodieMORRecordIterator(config: Configuration,
- fileReaders: HoodieMergeOnReadBaseFileReaders,
- tableSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema,
- tableState: HoodieTableState,
- mergeType: String,
- fileSplit: HoodieMergeOnReadFileSplit,
- includeStartTime: Boolean = false,
- startTimestamp: String = null,
- endTimestamp: String = null) extends
Iterator[InternalRow] with Closeable {
- protected val maxCompactionMemoryInBytes: Long = config.getLongBytes(
- "hoodie.compaction.memory", 512 * 1024 * 1024)
-
- protected val recordIterator: Iterator[InternalRow] = {
- val iter = fileSplit match {
- case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
- val projectedReader =
projectReader(fileReaders.requiredSchemaReaderSkipMerging,
requiredSchema.structTypeSchema)
- projectedReader(dataFileOnlySplit.dataFile.get)
-
- case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
- new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema,
tableState, config)
-
- case split => mergeType match {
- case DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL =>
- val reader = fileReaders.requiredSchemaReaderSkipMerging
- new SkipMergeIterator(split, reader, tableSchema, requiredSchema,
tableState, config)
-
- case DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL =>
- val reader = pickBaseFileReader()
- new RecordMergingFileIterator(split, reader, tableSchema,
requiredSchema, tableState, config)
-
- case _ => throw new UnsupportedOperationException(s"Not supported
merge type ($mergeType)")
- }
- }
-
- val commitTimeMetadataFieldIdx =
requiredSchema.structTypeSchema.fieldNames.indexOf(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
- val needsFiltering = commitTimeMetadataFieldIdx >= 0 &&
!StringUtils.isNullOrEmpty(startTimestamp) &&
!StringUtils.isNullOrEmpty(endTimestamp)
- if (needsFiltering) {
- val filterT: Predicate[InternalRow] =
getCommitTimeFilter(includeStartTime, commitTimeMetadataFieldIdx)
- iter.filter(filterT.test)
- }
- else {
- iter
- }
- }
-
- private def getCommitTimeFilter(includeStartTime: Boolean,
commitTimeMetadataFieldIdx: Int): Predicate[InternalRow] = {
- if (includeStartTime) {
- new Predicate[InternalRow] {
- override def test(row: InternalRow): Boolean = {
- val commitTime = row.getString(commitTimeMetadataFieldIdx)
- commitTime >= startTimestamp && commitTime <= endTimestamp
- }
- }
- } else {
- new Predicate[InternalRow] {
- override def test(row: InternalRow): Boolean = {
- val commitTime = row.getString(commitTimeMetadataFieldIdx)
- commitTime > startTimestamp && commitTime <= endTimestamp
- }
- }
- }
- }
-
- private def pickBaseFileReader(): BaseFileReader = {
- // NOTE: This is an optimization making sure that even for MOR tables we fetch the absolute minimum
- // of the stored data possible, while still properly executing the corresponding relation's semantics
- // and meeting the query's requirements.
- //
- // Here we assume that if the queried table uses one of the standard (and whitelisted)
- // Record Payload classes, then we can avoid reading and parsing the records with the _full_ schema,
- // and instead rely only on the projected one, while still being able to perform merging correctly
- if (isProjectionCompatible(tableState)) {
- fileReaders.requiredSchemaReader
- } else {
- fileReaders.fullSchemaReader
- }
- }
-
- override def hasNext: Boolean = {
- recordIterator.hasNext
- }
-
- override def next(): InternalRow = {
- recordIterator.next()
- }
-
- override def close(): Unit = {
- recordIterator match {
- case closeable: Closeable =>
- closeable.close()
- case _ =>
- }
- }
-}
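The commit-time filtering in the removed iterator above is inclusive of the start bound only when includeStartTime is set, while the end bound is always inclusive. A minimal standalone sketch of that window check (illustrative names only, not a Hudi or Doris API):

object CommitTimeWindowSketch {
  // Hudi commit times are lexicographically ordered timestamp strings,
  // so plain string comparison mirrors the predicate built in getCommitTimeFilter.
  def inWindow(commitTime: String, startTs: String, endTs: String,
               includeStartTime: Boolean): Boolean = {
    val afterStart = if (includeStartTime) commitTime >= startTs else commitTime > startTs
    afterStart && commitTime <= endTs
  }

  def main(args: Array[String]): Unit = {
    // A row written exactly at the start instant is kept only with an inclusive start bound.
    println(inWindow("20240101120000", "20240101120000", "20240102000000", includeStartTime = true))  // true
    println(inWindow("20240101120000", "20240101120000", "20240102000000", includeStartTime = false)) // false
  }
}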
diff --git
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORIncrementalSplitReader.scala
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORIncrementalSplitReader.scala
deleted file mode 100644
index 73c87e29034..00000000000
---
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORIncrementalSplitReader.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.hudi
-
-import org.apache.hudi.HoodieTableSchema
-import org.apache.hudi.common.model.HoodieRecord
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.sources._
-
-/**
- * Reference to Apache Hudi
- * see <a
href="https://github.com/apache/hudi/blob/release-0.14.1/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala">MergeOnReadIncrementalRelation</a>
- */
-class MORIncrementalSplitReader(override val split: HoodieSplit) extends
MORSnapshotSplitReader(split) with IncrementalSplitReaderTrait {
-
- override protected def composeIterator(tableSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema,
- requestedColumns: Array[String],
- filters: Array[Filter]):
Iterator[InternalRow] = {
- // The only required filters are ones that make sure we're only fetching
records that
- // fall into incremental span of the timeline being queried
- val requiredFilters = incrementalSpanRecordFilters
- val optionalFilters = filters
- val readers = createBaseFileReaders(tableSchema, requiredSchema,
requestedColumns, requiredFilters, optionalFilters)
-
- new HoodieMORRecordIterator(split.hadoopConf,
- readers,
- tableSchema,
- requiredSchema,
- tableState,
- mergeType,
- getFileSplit(),
- includeStartTime = includeStartTime,
- startTimestamp = startTs,
- endTimestamp = endTs)
- }
-
-}
-
-/**
- * Reference to Apache Hudi
- * see <a
href="https://github.com/apache/hudi/blob/release-0.14.1/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala">HoodieIncrementalRelationTrait</a>
- */
-trait IncrementalSplitReaderTrait extends BaseSplitReader {
- protected val includeStartTime: Boolean =
"true".equalsIgnoreCase(optParams("hoodie.datasource.read.incr.includeStartTime"))
- protected val startTs: String =
optParams("hoodie.datasource.read.begin.instanttime")
- protected val endTs: String =
optParams("hoodie.datasource.read.end.instanttime")
-
- // Record filters making sure that only records w/in the requested bounds
are being fetched as part of the
- // scan collected by this relation
- protected lazy val incrementalSpanRecordFilters: Seq[Filter] = {
- val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
-
- val largerThanFilter = if (includeStartTime) {
- GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTs)
- } else {
- GreaterThan(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTs)
- }
-
- val lessThanFilter =
LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, endTs)
-
- Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
- }
-
- override lazy val mandatoryFields: Seq[String] = {
- // NOTE: These columns are required for the Incremental flow to be able to handle the rows properly, even in
- // cases when no columns are requested to be fetched (for example, when using the {@code count()} API)
- Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD,
HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
- preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
- }
-}
diff --git
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORSnapshotSplitReader.scala
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORSnapshotSplitReader.scala
deleted file mode 100644
index 02a4fa40045..00000000000
---
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORSnapshotSplitReader.scala
+++ /dev/null
@@ -1,184 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.hudi
-
-import org.apache.hudi.HoodieBaseRelation.convertToAvroSchema
-import org.apache.hudi.avro.HoodieAvroUtils
-import org.apache.hudi.common.model.HoodieLogFile
-import org.apache.hudi.{DataSourceReadOptions, HoodieMergeOnReadFileSplit,
HoodieTableSchema}
-import org.apache.spark.paths.SparkPath
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.StructType
-
-/**
- * Reference to Apache Hudi
- * see <a
href="https://github.com/apache/hudi/blob/release-0.13.0/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala">MergeOnReadSnapshotRelation</a>
- */
-class MORSnapshotSplitReader(override val split: HoodieSplit) extends
BaseSplitReader(split) {
- /**
- * NOTE: These are the fields that are required to properly fulfil the Merge-on-Read (MOR)
- * semantics:
- *
- * <ol>
- * <li>Primary key is required to make sure we're able to correlate records
from the base
- * file with the updated records from the delta-log file</li>
- * <li>Pre-combine key is required to properly perform the combining (or
merging) of the
- * existing and updated records</li>
- * </ol>
- *
- * However, in cases when merging is NOT performed (for example, if the file-group only contains base
- * files but no delta-log files, or if the query-type is equal to [["skip_merge"]]), neither
- * the primary key nor the pre-combine key is required to be fetched from storage (unless requested
- * by the query), therefore saving on throughput
- */
- protected lazy val mandatoryFieldsForMerging: Seq[String] =
- Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
-
- override lazy val mandatoryFields: Seq[String] = mandatoryFieldsForMerging
-
- protected val mergeType: String =
optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
- DataSourceReadOptions.REALTIME_MERGE.defaultValue)
-
- override protected def composeIterator(tableSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema,
- requestedColumns: Array[String],
- filters: Array[Filter]):
Iterator[InternalRow] = {
- // todo: push down predicates about key field
- val requiredFilters = Seq.empty
- val optionalFilters = filters
- val readers = createBaseFileReaders(tableSchema, requiredSchema,
requestedColumns, requiredFilters, optionalFilters)
-
- new HoodieMORRecordIterator(split.hadoopConf,
- readers,
- tableSchema,
- requiredSchema,
- tableState,
- mergeType,
- getFileSplit())
- }
-
- protected def getFileSplit(): HoodieMergeOnReadFileSplit = {
- val logFiles = split.deltaFilePaths.map(new HoodieLogFile(_))
-
.sorted(Ordering.comparatorToOrdering(HoodieLogFile.getLogFileComparator)).toList
- val partitionedBaseFile = if (split.dataFilePath.isEmpty) {
- None
- } else {
- Some(PartitionedFile(getPartitionColumnsAsInternalRow(),
SparkPath.fromPathString(split.dataFilePath), 0, split.dataFileLength))
- }
- HoodieMergeOnReadFileSplit(partitionedBaseFile, logFiles)
- }
-
- override def imbueConfigs(sqlContext: SQLContext): Unit = {
- super.imbueConfigs(sqlContext)
-
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader",
"true")
- // there's no thread-local TaskContext, so the parquet reader will still use on-heap memory even when this is set to true
-
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.columnVector.offheap.enabled",
"true")
- }
-
- protected def createBaseFileReaders(tableSchema: HoodieTableSchema,
- requiredSchema: HoodieTableSchema,
- requestedColumns: Array[String],
- requiredFilters: Seq[Filter],
- optionalFilters: Seq[Filter] =
Seq.empty): HoodieMergeOnReadBaseFileReaders = {
- val (partitionSchema, dataSchema, requiredDataSchema) =
- tryPrunePartitionColumns(tableSchema, requiredSchema)
-
- val fullSchemaReader = createBaseFileReader(
- spark = sqlContext.sparkSession,
- partitionSchema = partitionSchema,
- dataSchema = dataSchema,
- requiredDataSchema = dataSchema,
- // This file-reader is used to read base file records, subsequently
merging them with the records
- // stored in delta-log files. As such, we have to read _all_ records
from the base file, while avoiding
- // applying any filtering _before_ we complete combining them w/
delta-log records (to make sure that
- // we combine them correctly);
- // As such, only required filters can be pushed down to this reader
- filters = requiredFilters,
- options = optParams,
- // NOTE: We have to fork the Hadoop Config here as Spark will be
modifying it
- // to configure Parquet reader appropriately
- hadoopConf = embedInternalSchema(split.hadoopConf, internalSchemaOpt)
- )
-
- val requiredSchemaReader = createBaseFileReader(
- spark = sqlContext.sparkSession,
- partitionSchema = partitionSchema,
- dataSchema = dataSchema,
- requiredDataSchema = requiredDataSchema,
- // This file-reader is used to read base file records, subsequently
merging them with the records
- // stored in delta-log files. As such, we have to read _all_ records
from the base file, while avoiding
- // applying any filtering _before_ we complete combining them w/
delta-log records (to make sure that
- // we combine them correctly);
- // As such, only required filters can be pushed down to this reader
- filters = requiredFilters,
- options = optParams,
- // NOTE: We have to fork the Hadoop Config here as Spark will be
modifying it
- // to configure Parquet reader appropriately
- hadoopConf = embedInternalSchema(split.hadoopConf,
requiredDataSchema.internalSchema)
- )
-
- // Check whether fields required for merging were also requested to be
fetched
- // by the query:
- // - In case they were, there's no optimization we could apply here (we
will have
- // to fetch such fields)
- // - In case they were not, we will provide 2 separate file-readers
- // a) One which would be applied to file-groups w/ delta-logs
(merging)
- // b) One which would be applied to file-groups w/ no delta-logs or
- // in case query-mode is skipping merging
- val mandatoryColumns =
mandatoryFieldsForMerging.map(HoodieAvroUtils.getRootLevelFieldName)
- if (mandatoryColumns.forall(requestedColumns.contains)) {
- HoodieMergeOnReadBaseFileReaders(
- fullSchemaReader = fullSchemaReader,
- requiredSchemaReader = requiredSchemaReader,
- requiredSchemaReaderSkipMerging = requiredSchemaReader
- )
- } else {
- val prunedRequiredSchema = {
- val unusedMandatoryColumnNames =
mandatoryColumns.filterNot(requestedColumns.contains)
- val prunedStructSchema =
- StructType(requiredDataSchema.structTypeSchema.fields
- .filterNot(f => unusedMandatoryColumnNames.contains(f.name)))
-
- HoodieTableSchema(prunedStructSchema,
convertToAvroSchema(prunedStructSchema, tableName).toString)
- }
-
- val requiredSchemaReaderSkipMerging = createBaseFileReader(
- spark = sqlContext.sparkSession,
- partitionSchema = partitionSchema,
- dataSchema = dataSchema,
- requiredDataSchema = prunedRequiredSchema,
- // This file-reader is only used in cases when no merging is
performed, therefore it's safe to push
- // down these filters to the base file readers
- filters = requiredFilters ++ optionalFilters,
- options = optParams,
- // NOTE: We have to fork the Hadoop Config here as Spark will be
modifying it
- // to configure Parquet reader appropriately
- hadoopConf = embedInternalSchema(split.hadoopConf,
requiredDataSchema.internalSchema)
- )
-
- HoodieMergeOnReadBaseFileReaders(
- fullSchemaReader = fullSchemaReader,
- requiredSchemaReader = requiredSchemaReader,
- requiredSchemaReaderSkipMerging = requiredSchemaReaderSkipMerging
- )
- }
- }
-}
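The createBaseFileReaders logic in the removed class above reuses the required-schema reader for the skip-merging path only when every field mandatory for merging was also requested by the query; otherwise it prunes the unrequested mandatory columns. A minimal sketch of that pruning decision (hypothetical helper names, assuming requiredColumns already holds the requested columns plus the mandatory merge fields):

object SkipMergingSchemaSketch {
  def skipMergingColumns(requiredColumns: Seq[String],
                         requestedColumns: Seq[String],
                         mandatoryForMerging: Seq[String]): Seq[String] = {
    if (mandatoryForMerging.forall(requestedColumns.contains)) {
      // Nothing to prune: the same reader can serve both the merging and skip-merging paths.
      requiredColumns
    } else {
      // Mandatory merge fields the query never asked for are dropped when no merging is performed.
      val unusedMandatory = mandatoryForMerging.filterNot(requestedColumns.contains)
      requiredColumns.filterNot(unusedMandatory.contains)
    }
  }

  def main(args: Array[String]): Unit = {
    // The record key is mandatory for merging but not requested, so the skip-merging schema drops it.
    println(skipMergingColumns(
      requiredColumns = Seq("_hoodie_record_key", "name", "age"),
      requestedColumns = Seq("name", "age"),
      mandatoryForMerging = Seq("_hoodie_record_key")))  // List(name, age)
  }
}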
diff --git
a/fe/be-java-extensions/hudi-scanner/src/test/java/org/apache/doris/hudi/HudiJniScannerTest.java
b/fe/be-java-extensions/hudi-scanner/src/test/java/org/apache/doris/hudi/HudiJniScannerTest.java
deleted file mode 100644
index 6cdfbdc53e4..00000000000
---
a/fe/be-java-extensions/hudi-scanner/src/test/java/org/apache/doris/hudi/HudiJniScannerTest.java
+++ /dev/null
@@ -1,31 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.hudi;
-
-import org.junit.Test;
-
-
-/**
- * The hudi JniScanner test
- */
-public class HudiJniScannerTest {
- @Test
- public void testOpen() {
- }
-
-}
diff --git a/fe/be-java-extensions/pom.xml b/fe/be-java-extensions/pom.xml
index 5d56ef76e7c..049cb6b516d 100644
--- a/fe/be-java-extensions/pom.xml
+++ b/fe/be-java-extensions/pom.xml
@@ -21,7 +21,6 @@ under the License.
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<modules>
- <module>hudi-scanner</module>
<module>hadoop-hudi-scanner</module>
<module>java-common</module>
<module>java-udf</module>
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 987adbdc0bc..96ee3c08605 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -261,7 +261,6 @@ public class HudiScanNode extends HiveScanNode {
fileDesc.setColumnTypes(hudiSplit.getHudiColumnTypes());
// TODO(gaoxin): support complex types
// fileDesc.setNestedFields(hudiSplit.getNestedFields());
- fileDesc.setHudiJniScanner(hudiSplit.getHudiJniScanner());
tableFormatFileDesc.setHudiParams(fileDesc);
rangeDesc.setTableFormatParams(tableFormatFileDesc);
}
@@ -493,7 +492,6 @@ public class HudiScanNode extends HiveScanNode {
split.setHudiColumnNames(columnNames);
split.setHudiColumnTypes(columnTypes);
split.setInstantTime(queryInstant);
- split.setHudiJniScanner(sessionVariable.getHudiJniScanner());
return split;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java
index 2270d201793..2c3cbdb7fba 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java
@@ -40,5 +40,4 @@ public class HudiSplit extends FileSplit {
private List<String> hudiColumnNames;
private List<String> hudiColumnTypes;
private List<String> nestedFields;
- private String hudiJniScanner;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 1bbb5e51d29..55a575a9ed4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -649,8 +649,6 @@ public class SessionVariable implements Serializable,
Writable {
public static final String FORCE_JNI_SCANNER = "force_jni_scanner";
- public static final String HUDI_JNI_SCANNER = "hudi_jni_scanner";
-
public static final String ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE =
"enable_count_push_down_for_external_table";
public static final String SHOW_ALL_FE_CONNECTION =
"show_all_fe_connection";
@@ -2164,10 +2162,6 @@ public class SessionVariable implements Serializable,
Writable {
description = {"强制使用jni方式读取外表", "Force the use of jni mode to read
external table"})
private boolean forceJniScanner = false;
- @VariableMgr.VarAttr(name = HUDI_JNI_SCANNER, description = { "使用那种hudi
jni scanner, 'hadoop' 或 'spark'",
- "Which hudi jni scanner to use, 'hadoop' or 'spark'" })
- private String hudiJniScanner = "hadoop";
-
@VariableMgr.VarAttr(name = ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE,
description = {"对外表启用 count(*) 下推优化", "enable count(*) pushdown
optimization for external table"})
private boolean enableCountPushDownForExternalTable = true;
@@ -4710,10 +4704,6 @@ public class SessionVariable implements Serializable,
Writable {
return forceJniScanner;
}
- public String getHudiJniScanner() {
- return hudiJniScanner;
- }
-
public String getIgnoreSplitType() {
return ignoreSplitType;
}
@@ -4734,10 +4724,6 @@ public class SessionVariable implements Serializable,
Writable {
forceJniScanner = force;
}
- public void setHudiJniScanner(String hudiJniScanner) {
- this.hudiJniScanner = hudiJniScanner;
- }
-
public boolean isEnableCountPushDownForExternalTable() {
return enableCountPushDownForExternalTable;
}
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 7411383670f..a8f10a3f053 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -377,7 +377,7 @@ struct THudiFileDesc {
8: optional list<string> column_names;
9: optional list<string> column_types;
10: optional list<string> nested_fields;
- 11: optional string hudi_jni_scanner;
+ 11: optional string hudi_jni_scanner; // deprecated
}
struct TLakeSoulFileDesc {
diff --git
a/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy
b/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy
index aa7dd5bfe3d..b962936dca1 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy
@@ -105,7 +105,6 @@ suite("test_hudi_incremental",
"p2,external,hudi,external_remote,external_remote
]
sql """set force_jni_scanner=true;"""
- sql """set hudi_jni_scanner='hadoop';"""
// TODO: @suxiaogang223 don't support incremental query for cow table by
jni reader
// test_hudi_incremental_querys("user_activity_log_cow_non_partition",
timestamps_cow_non_partition)
// test_hudi_incremental_querys("user_activity_log_cow_partition",
timestamps_cow_partition)
diff --git
a/regression-test/suites/external_table_p2/hudi/test_hudi_schema_evolution.groovy
b/regression-test/suites/external_table_p2/hudi/test_hudi_schema_evolution.groovy
index dbffb9ea4a4..fae889240d9 100644
---
a/regression-test/suites/external_table_p2/hudi/test_hudi_schema_evolution.groovy
+++
b/regression-test/suites/external_table_p2/hudi/test_hudi_schema_evolution.groovy
@@ -36,7 +36,6 @@ suite("test_hudi_schema_evolution",
"p2,external,hudi,external_remote,external_r
sql """ set enable_fallback_to_original_planner=false """
sql """set force_jni_scanner = true;"""
- sql """set hudi_jni_scanner='hadoop';"""
qt_adding_simple_columns_table """ select * from
adding_simple_columns_table order by id """
qt_altering_simple_columns_table """ select * from
altering_simple_columns_table order by id """
// qt_deleting_simple_columns_table """ select * from
deleting_simple_columns_table order by id """
diff --git
a/regression-test/suites/external_table_p2/hudi/test_hudi_snapshot.groovy
b/regression-test/suites/external_table_p2/hudi/test_hudi_snapshot.groovy
index 16918d8305d..8feec578f74 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_snapshot.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_snapshot.groovy
@@ -83,7 +83,6 @@ suite("test_hudi_snapshot",
"p2,external,hudi,external_remote,external_remote_hu
}
sql """set force_jni_scanner=true;"""
- sql """set hudi_jni_scanner='hadoop';"""
test_hudi_snapshot_querys("user_activity_log_mor_non_partition")
test_hudi_snapshot_querys("user_activity_log_mor_partition")
test_hudi_snapshot_querys("user_activity_log_cow_non_partition")
diff --git
a/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy
b/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy
index 25ebfe7c453..dd61ccf4230 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy
@@ -99,7 +99,6 @@ suite("test_hudi_timetravel",
"p2,external,hudi,external_remote,external_remote_
]
sql """set force_jni_scanner=true;"""
- sql """set hudi_jni_scanner='hadoop';"""
test_hudi_timetravel_querys("user_activity_log_cow_non_partition",
timestamps_cow_non_partition)
test_hudi_timetravel_querys("user_activity_log_cow_partition",
timestamps_cow_partition)
test_hudi_timetravel_querys("user_activity_log_mor_non_partition",
timestamps_mor_non_partition)