This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 0c1ed06054a branch-3.1: [enhance](hudi) upgrade hudi to version 1.0.2 #54083 (#54277)
0c1ed06054a is described below
commit 0c1ed06054aa5aad7ad09ca57fbce1c5f5e98fe0
Author: Socrates <[email protected]>
AuthorDate: Tue Aug 5 16:55:37 2025 +0800
branch-3.1: [enhance](hudi) upgrade hudi to version 1.0.2 #54083 (#54277)
bp: #54083
---
fe/be-java-extensions/hadoop-hudi-scanner/pom.xml | 1 -
.../datasource/hudi/HudiSchemaCacheValue.java | 2 +-
.../apache/doris/datasource/hudi/HudiUtils.java | 10 +-
.../hudi/source/COWIncrementalRelation.java | 22 +--
.../hudi/source/HudiCachedFsViewProcessor.java | 7 +-
.../hudi/source/HudiCachedPartitionProcessor.java | 4 +-
.../hudi/source/HudiLocalEngineContext.java | 175 ---------------------
.../hudi/source/HudiPartitionProcessor.java | 5 +-
.../doris/datasource/hudi/source/HudiScanNode.java | 6 +-
.../hudi/source/MORIncrementalRelation.java | 18 +--
.../tablefunction/HudiTableValuedFunction.java | 1 -
.../doris/tablefunction/MetadataGenerator.java | 5 +-
.../doris/datasource/hudi/HudiUtilsTest.java | 97 ------------
fe/pom.xml | 2 +-
.../data/external_table_p2/hudi/test_hudi_meta.out | Bin 4093 -> 2823 bytes
15 files changed, 41 insertions(+), 314 deletions(-)
diff --git a/fe/be-java-extensions/hadoop-hudi-scanner/pom.xml
b/fe/be-java-extensions/hadoop-hudi-scanner/pom.xml
index 205e21155a4..e3871844be1 100644
--- a/fe/be-java-extensions/hadoop-hudi-scanner/pom.xml
+++ b/fe/be-java-extensions/hadoop-hudi-scanner/pom.xml
@@ -32,7 +32,6 @@ under the License.
<properties>
<doris.home>${basedir}/../../</doris.home>
<fe_ut_parallel>1</fe_ut_parallel>
- <hudi.version>0.15.0</hudi.version>
<luben.zstd.jni.version>1.5.4-2</luben.zstd.jni.version>
<hive-apache.version>3.1.2-22</hive-apache.version>
</properties>
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheValue.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheValue.java
index 5eef275fe47..b0b39f4b85e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheValue.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheValue.java
@@ -45,7 +45,7 @@ public class HudiSchemaCacheValue extends HMSSchemaCacheValue
{
}
public InternalSchema getCommitInstantInternalSchema(HoodieTableMetaClient
metaClient, Long commitInstantTime) {
- return InternalSchemaCache.searchSchemaAndCache(commitInstantTime,
metaClient, true);
+ return InternalSchemaCache.searchSchemaAndCache(commitInstantTime,
metaClient);
}
public boolean isEnableSchemaEvolution() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
index de670cbf262..eaa4f35cbca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
@@ -50,7 +50,6 @@ import org.apache.avro.Schema.Field;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -73,7 +72,8 @@ public class HudiUtils {
/**
* Convert different query instant time format to the commit time format.
- * Currently we support three kinds of instant time format for time travel
query:
+ * Currently we support three kinds of instant time format for time travel
+ * query:
* 1、yyyy-MM-dd HH:mm:ss
* 2、yyyy-MM-dd
* This will convert to 'yyyyMMdd000000'.
@@ -88,11 +88,11 @@ public class HudiUtils {
return
HoodieInstantTimeGenerator.getInstantForDateString(queryInstant);
} else if (instantLength ==
HoodieInstantTimeGenerator.SECS_INSTANT_ID_LENGTH
|| instantLength ==
HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH) { // for
yyyyMMddHHmmss[SSS]
- HoodieActiveTimeline.parseDateFromInstantTime(queryInstant); //
validate the format
+ HoodieInstantTimeGenerator.parseDateFromInstantTime(queryInstant);
// validate the format
return queryInstant;
} else if (instantLength == 10) { // for yyyy-MM-dd
LocalDate date = LocalDate.parse(queryInstant,
DEFAULT_DATE_FORMATTER);
- return
HoodieActiveTimeline.formatDate(java.sql.Date.valueOf(date));
+ return
HoodieInstantTimeGenerator.formatDate(java.sql.Date.valueOf(date));
} else {
throw new IllegalArgumentException("Unsupported query instant time
format: " + queryInstant
+ ", Supported time format are: 'yyyy-MM-dd
HH:mm:ss[.SSS]' "
@@ -308,7 +308,7 @@ public class HudiUtils {
if (!snapshotInstant.isPresent()) {
return 0L;
}
- return Long.parseLong(snapshotInstant.get().getTimestamp());
+ return Long.parseLong(snapshotInstant.get().requestedTime());
}
public static TablePartitionValues
getPartitionValues(Optional<TableSnapshot> tableSnapshot,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
index 7f8b0a216c3..3a72c1bacab 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
@@ -92,14 +92,14 @@ public class COWIncrementalRelation implements
IncrementalRelation {
}
String endInstantTime =
optParams.getOrDefault("hoodie.datasource.read.end.instanttime",
hollowCommitHandling ==
HollowCommitHandling.USE_TRANSITION_TIME
- ?
commitTimeline.lastInstant().get().getStateTransitionTime()
- : commitTimeline.lastInstant().get().getTimestamp());
+ ?
commitTimeline.lastInstant().get().getCompletionTime()
+ : commitTimeline.lastInstant().get().requestedTime());
startInstantArchived =
commitTimeline.isBeforeTimelineStarts(startInstantTime);
endInstantArchived =
commitTimeline.isBeforeTimelineStarts(endInstantTime);
HoodieTimeline commitsTimelineToReturn;
if (hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME) {
- commitsTimelineToReturn =
commitTimeline.findInstantsInRangeByStateTransitionTime(startInstantTime,
+ commitsTimelineToReturn =
commitTimeline.findInstantsInRangeByCompletionTime(startInstantTime,
endInstantTime);
} else {
commitsTimelineToReturn =
commitTimeline.findInstantsInRange(startInstantTime, endInstantTime);
@@ -107,28 +107,28 @@ public class COWIncrementalRelation implements
IncrementalRelation {
List<HoodieInstant> commitsToReturn =
commitsTimelineToReturn.getInstants();
// todo: support configuration hoodie.datasource.read.incr.filters
- StoragePath basePath = metaClient.getBasePathV2();
+ StoragePath basePath = metaClient.getBasePath();
Map<String, String> regularFileIdToFullPath = new HashMap<>();
Map<String, String> metaBootstrapFileIdToFullPath = new HashMap<>();
HoodieTimeline replacedTimeline =
commitsTimelineToReturn.getCompletedReplaceTimeline();
Map<String, String> replacedFile = new HashMap<>();
for (HoodieInstant instant : replacedTimeline.getInstants()) {
-
HoodieReplaceCommitMetadata.fromBytes(metaClient.getActiveTimeline().getInstantDetails(instant).get(),
-
HoodieReplaceCommitMetadata.class).getPartitionToReplaceFileIds().forEach(
+ HoodieReplaceCommitMetadata metadata =
metaClient.getActiveTimeline()
+ .readReplaceCommitMetadata(instant);
+ metadata.getPartitionToReplaceFileIds().forEach(
(key, value) -> value.forEach(
e -> replacedFile.put(e,
FSUtils.constructAbsolutePath(basePath, key).toString())));
}
fileToWriteStat = new HashMap<>();
for (HoodieInstant commit : commitsToReturn) {
- HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(
- commitTimeline.getInstantDetails(commit).get(),
HoodieCommitMetadata.class);
+ HoodieCommitMetadata metadata =
metaClient.getActiveTimeline().readCommitMetadata(commit);
metadata.getPartitionToWriteStats().forEach((partition, stats) -> {
for (HoodieWriteStat stat : stats) {
fileToWriteStat.put(FSUtils.constructAbsolutePath(basePath,
stat.getPath()).toString(), stat);
}
});
- if
(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS.equals(commit.getTimestamp())) {
+ if
(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS.equals(commit.requestedTime())) {
metadata.getFileIdAndFullPaths(basePath).forEach((k, v) -> {
if (!(replacedFile.containsKey(k) &&
v.startsWith(replacedFile.get(k)))) {
metaBootstrapFileIdToFullPath.put(k, v);
@@ -167,8 +167,8 @@ public class COWIncrementalRelation implements
IncrementalRelation {
startTs = startInstantTime;
endTs = endInstantTime;
} else {
- startTs = commitsToReturn.get(0).getTimestamp();
- endTs = commitsToReturn.get(commitsToReturn.size() -
1).getTimestamp();
+ startTs = commitsToReturn.get(0).requestedTime();
+ endTs = commitsToReturn.get(commitsToReturn.size() -
1).requestedTime();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedFsViewProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedFsViewProcessor.java
index 5c93f1650f9..b906c6a1dd2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedFsViewProcessor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedFsViewProcessor.java
@@ -25,6 +25,7 @@ import org.apache.doris.datasource.ExternalTable;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.collect.Maps;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -52,7 +53,7 @@ public class HudiCachedFsViewProcessor {
private HoodieTableFileSystemView createFsView(FsViewKey key) {
HoodieMetadataConfig metadataConfig =
HoodieMetadataConfig.newBuilder().build();
- HudiLocalEngineContext ctx = new
HudiLocalEngineContext(key.getClient().getStorageConf());
+ HoodieLocalEngineContext ctx = new
HoodieLocalEngineContext(key.getClient().getStorageConf());
return FileSystemViewManager.createInMemoryFileSystemView(ctx,
key.getClient(), metadataConfig);
}
@@ -117,12 +118,12 @@ public class HudiCachedFsViewProcessor {
}
FsViewKey fsViewKey = (FsViewKey) o;
return Objects.equals(dbName, fsViewKey.dbName) &&
Objects.equals(tbName, fsViewKey.tbName)
- && Objects.equals(client.getBasePathV2(),
fsViewKey.client.getBasePathV2());
+ && Objects.equals(client.getBasePath(),
fsViewKey.client.getBasePath());
}
@Override
public int hashCode() {
- return Objects.hash(dbName, tbName, client.getBasePathV2());
+ return Objects.hash(dbName, tbName, client.getBasePath());
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
index 6356698c067..40db60eb78b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
@@ -99,7 +99,7 @@ public class HudiCachedPartitionProcessor extends
HudiPartitionProcessor {
if (!lastInstant.isPresent()) {
return partitionValues;
}
- long lastTimestamp = Long.parseLong(lastInstant.get().getTimestamp());
+ long lastTimestamp = Long.parseLong(lastInstant.get().requestedTime());
if (Long.parseLong(timestamp) == lastTimestamp) {
return getPartitionValues(table, tableMetaClient,
useHiveSyncPartition);
}
@@ -130,7 +130,7 @@ public class HudiCachedPartitionProcessor extends
HudiPartitionProcessor {
return partitionValues;
}
try {
- long lastTimestamp =
Long.parseLong(lastInstant.get().getTimestamp());
+ long lastTimestamp =
Long.parseLong(lastInstant.get().requestedTime());
partitionValues = partitionCache.get(
new TablePartitionKey(table.getDbName(), table.getName(),
table.getHudiPartitionColumnTypes(lastTimestamp)));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java
deleted file mode 100644
index fecc026cf8d..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java
+++ /dev/null
@@ -1,175 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.datasource.hudi.source;
-
-import org.apache.hudi.common.data.HoodieAccumulator;
-import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
-import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
-import org.apache.hudi.common.data.HoodieListData;
-import org.apache.hudi.common.engine.EngineProperty;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.engine.LocalTaskContextSupplier;
-import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.function.FunctionWrapper;
-import org.apache.hudi.common.function.SerializableBiFunction;
-import org.apache.hudi.common.function.SerializableConsumer;
-import org.apache.hudi.common.function.SerializableFunction;
-import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
-import org.apache.hudi.common.function.SerializablePairFunction;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.ImmutablePair;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.storage.StorageConfiguration;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- * This file is copied from
- * org.apache.hudi.common.engine.HudiLocalEngineContext.
- * Because we need set ugi in thread pool
- * A java based engine context, use this implementation on the query engine
- * integrations if needed.
- */
-public final class HudiLocalEngineContext extends HoodieEngineContext {
-
- public HudiLocalEngineContext(StorageConfiguration<?> conf) {
- this(conf, new LocalTaskContextSupplier());
- }
-
- public HudiLocalEngineContext(StorageConfiguration<?> conf,
TaskContextSupplier taskContextSupplier) {
- super(conf, taskContextSupplier);
- }
-
- @Override
- public HoodieAccumulator newAccumulator() {
- return HoodieAtomicLongAccumulator.create();
- }
-
- @Override
- public <T> HoodieData<T> emptyHoodieData() {
- return HoodieListData.eager(Collections.emptyList());
- }
-
- @Override
- public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
- return HoodieListData.eager(data);
- }
-
- @Override
- public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func,
int parallelism) {
- return
data.stream().parallel().map(FunctionWrapper.throwingMapWrapper(func)).collect(Collectors.toList());
- }
-
- @Override
- public <I, K, V> List<V> mapToPairAndReduceByKey(
- List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc,
SerializableBiFunction<V, V, V> reduceFunc,
- int parallelism) {
- return
data.stream().parallel().map(FunctionWrapper.throwingMapToPairWrapper(mapToPairFunc))
- .collect(Collectors.groupingBy(p ->
p.getKey())).values().stream()
- .map(list -> list.stream().map(e -> e.getValue())
-
.reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).get())
- .collect(Collectors.toList());
- }
-
- @Override
- public <I, K, V> Stream<ImmutablePair<K, V>>
mapPartitionsToPairAndReduceByKey(
- Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V>
flatMapToPairFunc,
- SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
- return
FunctionWrapper.throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator())
-
.collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
- .map(entry -> new ImmutablePair<>(entry.getKey(),
entry.getValue().stream().map(
-
Pair::getValue).reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null)))
- .filter(Objects::nonNull);
- }
-
- @Override
- public <I, K, V> List<V> reduceByKey(
- List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc,
int parallelism) {
- return data.stream().parallel()
- .collect(Collectors.groupingBy(p ->
p.getKey())).values().stream()
- .map(list -> list.stream().map(e -> e.getValue())
-
.reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc))
- .orElse(null))
- .filter(Objects::nonNull)
- .collect(Collectors.toList());
- }
-
- @Override
- public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I,
Stream<O>> func, int parallelism) {
- return
data.stream().parallel().flatMap(FunctionWrapper.throwingFlatMapWrapper(func))
- .collect(Collectors.toList());
- }
-
- @Override
- public <I> void foreach(List<I> data, SerializableConsumer<I> consumer,
int parallelism) {
-
data.stream().forEach(FunctionWrapper.throwingForeachWrapper(consumer));
- }
-
- @Override
- public <I, K, V> Map<K, V> mapToPair(List<I> data,
SerializablePairFunction<I, K, V> func, Integer parallelism) {
- return
data.stream().map(FunctionWrapper.throwingMapToPairWrapper(func)).collect(
- Collectors.toMap(Pair::getLeft, Pair::getRight, (oldVal,
newVal) -> newVal));
- }
-
- @Override
- public void setProperty(EngineProperty key, String value) {
- // no operation for now
- }
-
- @Override
- public Option<String> getProperty(EngineProperty key) {
- return Option.empty();
- }
-
- @Override
- public void setJobStatus(String activeModule, String activityDescription) {
- // no operation for now
- }
-
- @Override
- public void putCachedDataIds(HoodieDataCacheKey cacheKey, int... ids) {
- // no operation for now
- }
-
- @Override
- public List<Integer> getCachedDataIds(HoodieDataCacheKey cacheKey) {
- return Collections.emptyList();
- }
-
- @Override
- public List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey) {
- return Collections.emptyList();
- }
-
- @Override
- public void cancelJob(String jobId) {
- // no operation for now
- }
-
- @Override
- public void cancelAllJobs() {
- // no operation for now
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
index b1e5bd4a82d..ae127f9e1c4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
@@ -21,6 +21,7 @@ import org.apache.doris.datasource.ExternalTable;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
@@ -51,9 +52,9 @@ public abstract class HudiPartitionProcessor {
.build();
HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create(
- new HudiLocalEngineContext(tableMetaClient.getStorageConf()),
tableMetaClient.getStorage(),
+ new
HoodieLocalEngineContext(tableMetaClient.getStorageConf()),
tableMetaClient.getStorage(),
metadataConfig,
- tableMetaClient.getBasePathV2().toString(), true);
+ tableMetaClient.getBasePath().toString(), true);
return newTableMetadata.getAllPartitionPaths();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 948e1f8c5e2..7748bd33ce8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -210,7 +210,7 @@ public class HudiScanNode extends HiveScanNode {
partitionInit = true;
return;
}
- queryInstant = snapshotInstant.get().getTimestamp();
+ queryInstant = snapshotInstant.get().requestedTime();
}
HudiSchemaCacheValue hudiSchemaCacheValue =
HudiUtils.getSchemaCacheValue(hmsTable, queryInstant);
@@ -322,7 +322,7 @@ public class HudiScanNode extends HiveScanNode {
this.selectedPartitionNum = prunedPartitions.size();
String inputFormat =
hmsTable.getRemoteTable().getSd().getInputFormat();
- String basePath = metaClient.getBasePathV2().toString();
+ String basePath = metaClient.getBasePath().toString();
List<HivePartition> hivePartitions = Lists.newArrayList();
prunedPartitions.forEach(
@@ -368,7 +368,7 @@ public class HudiScanNode extends HiveScanNode {
if (partition.isDummyPartition()) {
partitionName = "";
} else {
- partitionName =
FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
+ partitionName =
FSUtils.getRelativePartitionPath(hudiClient.getBasePath(),
new StoragePath(partition.getPath()));
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
index 7df01359922..69ca39e9ad6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
@@ -87,8 +87,8 @@ public class MORIncrementalRelation implements
IncrementalRelation {
}
endTimestamp =
optParams.getOrDefault("hoodie.datasource.read.end.instanttime",
hollowCommitHandling ==
HollowCommitHandling.USE_TRANSITION_TIME
- ? timeline.lastInstant().get().getStateTransitionTime()
- : timeline.lastInstant().get().getTimestamp());
+ ? timeline.lastInstant().get().getCompletionTime()
+ : timeline.lastInstant().get().requestedTime());
startInstantArchived = timeline.isBeforeTimelineStarts(startTimestamp);
endInstantArchived = timeline.isBeforeTimelineStarts(endTimestamp);
@@ -96,7 +96,7 @@ public class MORIncrementalRelation implements
IncrementalRelation {
includedCommits = getIncludedCommits();
commitsMetadata = getCommitsMetadata();
affectedFilesInCommits =
HoodieInputFormatUtils.listAffectedFilesForCommits(configuration,
- metaClient.getBasePathV2(), commitsMetadata);
+ metaClient.getBasePath(), commitsMetadata);
fullTableScan = shouldFullTableScan();
if (hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME
&& fullTableScan) {
throw new HoodieException("Cannot use stateTransitionTime while
enables full table scan");
@@ -108,10 +108,10 @@ public class MORIncrementalRelation implements
IncrementalRelation {
startTs = startTimestamp;
} else {
includeStartTime = true;
- startTs = includedCommits.isEmpty() ? startTimestamp :
includedCommits.get(0).getTimestamp();
+ startTs = includedCommits.isEmpty() ? startTimestamp :
includedCommits.get(0).requestedTime();
}
endTs = endInstantArchived || includedCommits.isEmpty() ? endTimestamp
- : includedCommits.get(includedCommits.size() -
1).getTimestamp();
+ : includedCommits.get(includedCommits.size() -
1).requestedTime();
}
@Override
@@ -128,7 +128,7 @@ public class MORIncrementalRelation implements
IncrementalRelation {
// If endTimestamp commit is not archived, will filter instants
// before endTimestamp.
if (hollowCommitHandling ==
HollowCommitHandling.USE_TRANSITION_TIME) {
- return
timeline.findInstantsInRangeByStateTransitionTime(startTimestamp,
endTimestamp).getInstants();
+ return
timeline.findInstantsInRangeByCompletionTime(startTimestamp,
endTimestamp).getInstants();
} else {
return timeline.findInstantsInRange(startTimestamp,
endTimestamp).getInstants();
}
@@ -153,7 +153,7 @@ public class MORIncrementalRelation implements
IncrementalRelation {
return true;
}
for (StoragePathInfo fileStatus : affectedFilesInCommits) {
- if
(!metaClient.getRawHoodieStorage().exists(fileStatus.getPath())) {
+ if (!metaClient.getStorage().exists(fileStatus.getPath())) {
return true;
}
}
@@ -190,13 +190,13 @@ public class MORIncrementalRelation implements
IncrementalRelation {
HoodieTimeline scanTimeline;
if (hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME) {
scanTimeline = metaClient.getCommitsAndCompactionTimeline()
- .findInstantsInRangeByStateTransitionTime(startTimestamp,
endTimestamp);
+ .findInstantsInRangeByCompletionTime(startTimestamp,
endTimestamp);
} else {
scanTimeline = TimelineUtils.handleHollowCommitIfNeeded(
metaClient.getCommitsAndCompactionTimeline(),
metaClient, hollowCommitHandling)
.findInstantsInRange(startTimestamp, endTimestamp);
}
- String latestCommit = includedCommits.get(includedCommits.size() -
1).getTimestamp();
+ String latestCommit = includedCommits.get(includedCommits.size() -
1).requestedTime();
HoodieTableFileSystemView fsView = new
HoodieTableFileSystemView(metaClient, scanTimeline,
affectedFilesInCommits);
Stream<FileSlice> fileSlices =
HoodieTableMetadataUtil.getWritePartitionPaths(commitsMetadata)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
index b1d6c82329a..70e1ec84928 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
@@ -55,7 +55,6 @@ public class HudiTableValuedFunction extends
MetadataTableValuedFunction {
private static final ImmutableList<Column> SCHEMA_TIMELINE =
ImmutableList.of(
new Column("timestamp", PrimitiveType.STRING, false),
new Column("action", PrimitiveType.STRING, false),
- new Column("file_name", PrimitiveType.STRING, false),
new Column("state", PrimitiveType.STRING, false),
new Column("state_transition_time", PrimitiveType.STRING, false));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index e29c6bf10ee..9430032b7bb 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -370,11 +370,10 @@ public class MetadataGenerator {
hudiBasePathString, conf).getActiveTimeline();
for (HoodieInstant instant : timeline.getInstants()) {
TRow trow = new TRow();
- trow.addToColumnValue(new
TCell().setStringVal(instant.getTimestamp()));
+ trow.addToColumnValue(new
TCell().setStringVal(instant.requestedTime()));
trow.addToColumnValue(new
TCell().setStringVal(instant.getAction()));
- trow.addToColumnValue(new
TCell().setStringVal(instant.getFileName()));
trow.addToColumnValue(new
TCell().setStringVal(instant.getState().name()));
- trow.addToColumnValue(new
TCell().setStringVal(instant.getStateTransitionTime()));
+ trow.addToColumnValue(new
TCell().setStringVal(instant.getCompletionTime()));
dataBatch.add(trow);
}
break;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java
index 409fc1daf72..070cd2f1859 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java
@@ -26,8 +26,6 @@ import mockit.Mock;
import mockit.MockUp;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.junit.Assert;
import org.junit.Test;
@@ -197,99 +195,4 @@ public class HudiUtilsTest {
Assert.assertTrue(meta.delete());
Files.delete(hudiTable);
}
-
- @Test
- public void testFormatQueryInstantThreadSafety() throws Exception {
- // Mock HoodieActiveTimeline and HoodieInstantTimeGenerator methods
- new MockUp<HoodieInstantTimeGenerator>() {
- @Mock
- public String getInstantForDateString(String dateString) {
- return "mocked_" + dateString.replace(" ", "_").replace(":",
"_").replace(".", "_");
- }
- };
-
- new MockUp<HoodieActiveTimeline>() {
- @Mock
- public void parseDateFromInstantTime(String instantTime) {
- // Just a validation method, no return value needed
- }
-
- @Mock
- public String formatDate(java.util.Date date) {
- return "formatted_" + date.getTime();
- }
- };
-
- // Test different date formats
- String[] dateFormats = {
- "2023-01-15", // yyyy-MM-dd format
- "2023-01-15 14:30:25", // yyyy-MM-dd HH:mm:ss format
- "2023-01-15 14:30:25.123", // yyyy-MM-dd HH:mm:ss.SSS format
- "20230115143025", // yyyyMMddHHmmss format
- "20230115143025123" // yyyyMMddHHmmssSSS format
- };
-
- // Single thread test for basic functionality
- for (String dateFormat : dateFormats) {
- String result = HudiUtils.formatQueryInstant(dateFormat);
- Assert.assertNotNull(result);
-
- // Verify expected format based on input length
- if (dateFormat.length() == 10) { // yyyy-MM-dd
- Assert.assertTrue(result.startsWith("formatted_"));
- } else if (dateFormat.length() == 19 || dateFormat.length() == 23)
{ // yyyy-MM-dd HH:mm:ss[.SSS]
- Assert.assertTrue(result.startsWith("mocked_"));
- } else {
- // yyyyMMddHHmmss[SSS] passes through
- Assert.assertEquals(dateFormat, result);
- }
- }
-
- // Multi-thread test for thread safety
- int threadCount = 10;
- int iterationsPerThread = 100;
-
- Thread[] threads = new Thread[threadCount];
- Exception[] threadExceptions = new Exception[threadCount];
-
- // Create a map to store expected results for each date format
- final java.util.Map<String, String> expectedResults = new
java.util.HashMap<>();
- for (String dateFormat : dateFormats) {
- expectedResults.put(dateFormat,
HudiUtils.formatQueryInstant(dateFormat));
- }
-
- for (int i = 0; i < threadCount; i++) {
- final int threadId = i;
- threads[i] = new Thread(() -> {
- try {
- for (int j = 0; j < iterationsPerThread; j++) {
- // Each thread cycles through all date formats
- String dateFormat = dateFormats[j %
dateFormats.length];
- String result =
HudiUtils.formatQueryInstant(dateFormat);
-
- // Verify the result matches the expected value for
this date format
- String expected = expectedResults.get(dateFormat);
- Assert.assertEquals("Thread " + threadId + " iteration
" + j
- + " got incorrect result for format "
+ dateFormat,
- expected, result);
- }
- } catch (Exception e) {
- threadExceptions[threadId] = e;
- }
- });
- threads[i].start();
- }
-
- // Wait for all threads to complete
- for (Thread thread : threads) {
- thread.join(5000); // Timeout after 5 seconds to ensure test
doesn't run too long
- }
-
- // Check if any thread encountered exceptions
- for (int i = 0; i < threadCount; i++) {
- if (threadExceptions[i] != null) {
- throw new AssertionError("Thread " + i + " failed with
exception", threadExceptions[i]);
- }
- }
- }
}
diff --git a/fe/pom.xml b/fe/pom.xml
index a62110f6eff..238226f184d 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -226,7 +226,7 @@ under the License.
<avro.version>1.12.0</avro.version>
<parquet.version>1.15.2</parquet.version>
<spark.version>3.4.3</spark.version>
- <hudi.version>0.15.0</hudi.version>
+ <hudi.version>1.0.2</hudi.version>
<obs.dependency.scope>compile</obs.dependency.scope>
<cos.dependency.scope>compile</cos.dependency.scope>
<gcs.dependency.scope>compile</gcs.dependency.scope>
diff --git a/regression-test/data/external_table_p2/hudi/test_hudi_meta.out
b/regression-test/data/external_table_p2/hudi/test_hudi_meta.out
index 0c312efd3c0..af38b0e31fd 100644
Binary files a/regression-test/data/external_table_p2/hudi/test_hudi_meta.out
and b/regression-test/data/external_table_p2/hudi/test_hudi_meta.out differ
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]