This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b3292ebf4a [HUDI-5154] Improve hudi-spark-client Lambda writing (#7127)
b3292ebf4a is described below
commit b3292ebf4a8e6e7168766a3f6f3dd8b76edf8c5e
Author: slfan1989 <[email protected]>
AuthorDate: Wed Nov 9 08:51:48 2022 +0800
[HUDI-5154] Improve hudi-spark-client Lambda writing (#7127)
Co-authored-by: slfan1989 <louj1988@@>
---
.../java/org/apache/hudi/client/SparkRDDWriteClient.java | 4 ++--
.../run/strategy/SingleSparkJobExecutionStrategy.java | 8 ++++----
.../clustering/update/strategy/SparkAllowUpdateStrategy.java | 2 +-
.../org/apache/hudi/client/utils/SparkValidatorUtils.java | 12 +++++-------
.../execution/bulkinsert/RDDConsistentBucketPartitioner.java | 4 +---
.../org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java | 4 ++--
.../java/org/apache/hudi/metrics/DistributedRegistry.java | 10 +++++-----
7 files changed, 20 insertions(+), 24 deletions(-)
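All seven files follow the same refactor: a lambda that only forwards its argument to a single method becomes the equivalent method reference, and statement lambdas collapse to expression lambdas. As a minimal, hedged sketch of that pattern (plain Java with hypothetical names, not Hudi code):

    import java.util.List;
    import java.util.stream.Collectors;

    public class MethodRefSketch {
      public static void main(String[] args) {
        List<String> fileIds = List.of("f1", "f2", "f3");

        // Lambda form: the parameter is only forwarded to one method call.
        String joinedLambda = fileIds.stream()
            .map(s -> s.toUpperCase())
            .collect(Collectors.joining(","));

        // Method-reference form: same behavior, less boilerplate.
        String joinedRef = fileIds.stream()
            .map(String::toUpperCase)
            .collect(Collectors.joining(","));

        System.out.println(joinedLambda.equals(joinedRef)); // true
      }
    }

The behavior is identical; the method-reference form just names the call directly instead of wrapping it in a lambda.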
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index ef37dd1835..c200abee5e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -434,9 +434,9 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
     List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e -> e.getValue().stream()).collect(Collectors.toList());
-    if (writeStats.stream().mapToLong(s -> s.getTotalWriteErrors()).sum() > 0) {
+    if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) {
       throw new HoodieClusteringException("Clustering failed to write to files:"
-          + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(",")));
+          + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(",")));
     }
     final HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
index 601d2ec8a7..46d2466c5c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
@@ -53,6 +53,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -77,8 +78,7 @@ public abstract class SingleSparkJobExecutionStrategy<T extends HoodieRecordPayl
     JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
     final TaskContextSupplier taskContextSupplier = getEngineContext().getTaskContextSupplier();
     final SerializableSchema serializableSchema = new SerializableSchema(schema);
-    final List<ClusteringGroupInfo> clusteringGroupInfos = clusteringPlan.getInputGroups().stream().map(clusteringGroup ->
-        ClusteringGroupInfo.create(clusteringGroup)).collect(Collectors.toList());
+    final List<ClusteringGroupInfo> clusteringGroupInfos = clusteringPlan.getInputGroups().stream().map(ClusteringGroupInfo::create).collect(Collectors.toList());
     String umask = engineContext.hadoopConfiguration().get("fs.permissions.umask-mode");
     Broadcast<String> umaskBroadcastValue = engineContext.broadcast(umask);
@@ -121,7 +121,7 @@ public abstract class SingleSparkJobExecutionStrategy<T extends HoodieRecordPayl
     Iterable<List<WriteStatus>> writeStatusIterable = () -> writeStatuses;
     return StreamSupport.stream(writeStatusIterable.spliterator(), false)
-        .flatMap(writeStatusList -> writeStatusList.stream());
+        .flatMap(Collection::stream);
   }
@@ -152,7 +152,7 @@ public abstract class SingleSparkJobExecutionStrategy<T extends HoodieRecordPayl
       }
     };
-    return StreamSupport.stream(indexedRecords.spliterator(), false).map(record -> transform(record)).iterator();
+    return StreamSupport.stream(indexedRecords.spliterator(), false).map(this::transform).iterator();
   }).collect(Collectors.toList());
   return new ConcatenatingIterator<>(iteratorsForPartition);
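For the flatMap change above, a hedged illustration (plain Java, not Hudi code) of how Collection::stream flattens a stream of lists exactly as the explicit lambda did:

    import java.util.Collection;
    import java.util.List;
    import java.util.stream.Collectors;

    public class FlattenSketch {
      public static void main(String[] args) {
        List<List<Integer>> statusesPerGroup = List.of(List.of(1, 2), List.of(3), List.of(4, 5));

        // Equivalent to .flatMap(list -> list.stream())
        List<Integer> flattened = statusesPerGroup.stream()
            .flatMap(Collection::stream)
            .collect(Collectors.toList());

        System.out.println(flattened); // [1, 2, 3, 4, 5]
      }
    }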
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java
index acb6d82ae1..6d819df3c2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java
@@ -44,7 +44,7 @@ public class SparkAllowUpdateStrategy<T extends HoodieRecordPayload<T>> extends
   public Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(HoodieData<HoodieRecord<T>> taggedRecordsRDD) {
     List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
     Set<HoodieFileGroupId> fileGroupIdsWithUpdatesAndPendingClustering = fileGroupIdsWithRecordUpdate.stream()
-        .filter(f -> fileGroupsInPendingClustering.contains(f))
+        .filter(fileGroupsInPendingClustering::contains)
        .collect(Collectors.toSet());
     return Pair.of(taggedRecordsRDD, fileGroupIdsWithUpdatesAndPendingClustering);
   }
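The filter change above uses a method reference bound to an existing instance. A hedged sketch (plain Java, hypothetical values, not Hudi code):

    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class BoundRefSketch {
      public static void main(String[] args) {
        Set<String> fileGroupsInPendingClustering = Set.of("fg-1", "fg-3");
        List<String> fileGroupsWithUpdates = List.of("fg-1", "fg-2", "fg-3");

        // Equivalent to .filter(f -> fileGroupsInPendingClustering.contains(f))
        Set<String> conflicting = fileGroupsWithUpdates.stream()
            .filter(fileGroupsInPendingClustering::contains)
            .collect(Collectors.toSet());

        System.out.println(conflicting); // fg-1 and fg-3, in no particular order
      }
    }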
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
index a6d03eae2b..4c4200d9ba 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
@@ -24,6 +24,7 @@ import org.apache.hudi.client.validator.SparkPreCommitValidator;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.view.HoodieTablePreCommitFileSystemView;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
@@ -71,8 +72,7 @@ public class SparkValidatorUtils {
     if (!writeMetadata.getWriteStats().isPresent()) {
       writeMetadata.setWriteStats(writeMetadata.getWriteStatuses().map(WriteStatus::getStat).collectAsList());
     }
-    Set<String> partitionsModified = writeMetadata.getWriteStats().get().stream().map(writeStats ->
-        writeStats.getPartitionPath()).collect(Collectors.toSet());
+    Set<String> partitionsModified = writeMetadata.getWriteStats().get().stream().map(HoodieWriteStat::getPartitionPath).collect(Collectors.toSet());
     SQLContext sqlContext = new SQLContext(HoodieSparkEngineContext.getSparkContext(context));
     // Refresh timeline to ensure validator sees the any other operations done on timeline (async operations such as other clustering/compaction/rollback)
     table.getMetaClient().reloadActiveTimeline();
@@ -80,11 +80,9 @@ public class SparkValidatorUtils {
     Dataset<Row> afterState = getRecordsFromPendingCommits(sqlContext, partitionsModified, writeMetadata, table, instantTime).cache();
     Stream<SparkPreCommitValidator> validators = Arrays.stream(config.getPreCommitValidators().split(","))
-        .map(validatorClass -> {
-          return ((SparkPreCommitValidator) ReflectionUtils.loadClass(validatorClass,
-              new Class<?>[] {HoodieSparkTable.class, HoodieEngineContext.class, HoodieWriteConfig.class},
-              table, context, config));
-        });
+        .map(validatorClass -> ((SparkPreCommitValidator) ReflectionUtils.loadClass(validatorClass,
+            new Class<?>[] {HoodieSparkTable.class, HoodieEngineContext.class, HoodieWriteConfig.class},
+            table, context, config)));
     boolean allSuccess = validators.map(v -> runValidatorAsync(v, writeMetadata, beforeState, afterState, instantTime)).map(CompletableFuture::join)
         .reduce(true, Boolean::logicalAnd);
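The validator change above shows the second half of the pattern: a lambda whose body is a single return statement can drop the braces and the return keyword. A hedged sketch (plain Java, not Hudi code):

    import java.util.List;
    import java.util.stream.Collectors;

    public class ExpressionLambdaSketch {
      public static void main(String[] args) {
        List<String> classNames = List.of("ValidatorA", "ValidatorB");

        // Statement lambda: .map(name -> { return name.length(); })
        // Expression lambda, same result:
        List<Integer> lengths = classNames.stream()
            .map(name -> name.length())
            .collect(Collectors.toList());

        System.out.println(lengths); // [10, 10]
      }
    }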
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java
index e23723ac72..7b644938bb 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java
@@ -266,9 +266,7 @@ public class RDDConsistentBucketPartitioner<T extends HoodieRecordPayload> exten
       LOG.warn("Consistent bucket does not support global sort mode, the sort will only be done within each data partition");
     }
-    Comparator<HoodieKey> comparator = (Comparator<HoodieKey> & Serializable) (t1, t2) -> {
-      return t1.getRecordKey().compareTo(t2.getRecordKey());
-    };
+    Comparator<HoodieKey> comparator = (Comparator<HoodieKey> & Serializable) (t1, t2) -> t1.getRecordKey().compareTo(t2.getRecordKey());
     return records.mapToPair(record -> new Tuple2<>(record.getKey(), record))
         .repartitionAndSortWithinPartitions(partitioner, comparator)
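The comparator above keeps the (Comparator<HoodieKey> & Serializable) intersection cast, which is what makes the lambda usable where Spark needs a serializable comparator; only the braces and return were dropped. A hedged sketch of the cast itself (plain Java, not Hudi code):

    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.Comparator;

    public class SerializableComparatorSketch {
      public static void main(String[] args) {
        // The intersection cast makes the lambda implement Serializable as well.
        Comparator<String> byKey =
            (Comparator<String> & Serializable) (a, b) -> a.compareTo(b);

        String[] keys = {"key-3", "key-1", "key-2"};
        Arrays.sort(keys, byKey);

        System.out.println(Arrays.toString(keys)); // [key-1, key-2, key-3]
        System.out.println(byKey instanceof Serializable); // true
      }
    }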
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
index f99bf876c9..16f47b8f8c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
@@ -427,7 +427,7 @@ public class SparkHoodieHBaseIndex extends HoodieIndex<Object, Object> {
     // Map each fileId that has inserts to a unique partition Id. This will be used while
     // repartitioning RDD<WriteStatus>
     final List<String> fileIds = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0)
-        .map(w -> w.getFileId()).collect();
+        .map(WriteStatus::getFileId).collect();
     for (final String fileId : fileIds) {
       fileIdPartitionMap.put(fileId, partitionIndex++);
     }
@@ -445,7 +445,7 @@ public class SparkHoodieHBaseIndex extends HoodieIndex<Object, Object> {
         writeStatusRDD.mapToPair(w -> new Tuple2<>(w.getFileId(), w))
             .partitionBy(new WriteStatusPartitioner(fileIdPartitionMap, this.numWriteStatusWithInserts))
-            .map(w -> w._2());
+            .map(Tuple2::_2);
     JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
     acquireQPSResourcesAndSetBatchSize(desiredQPSFraction, jsc);
     JavaRDD<WriteStatus> writeStatusJavaRDD = partitionedRDD.mapPartitionsWithIndex(updateLocationFunction(),
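Tuple2::_2 above is an unbound reference to Scala's tuple accessor, equivalent to w -> w._2(). A hedged sketch (plain Java; assumes scala-library is on the classpath, as it is for any Spark application):

    import java.util.List;
    import java.util.stream.Collectors;
    import scala.Tuple2;

    public class TupleRefSketch {
      public static void main(String[] args) {
        List<Tuple2<String, Integer>> pairs =
            List.of(new Tuple2<>("f1", 1), new Tuple2<>("f2", 2));

        // Equivalent to .map(t -> t._2())
        List<Integer> values = pairs.stream()
            .map(Tuple2::_2)
            .collect(Collectors.toList());

        System.out.println(values); // [1, 2]
      }
    }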
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
index 60c32b34da..ca01def803 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
@@ -52,12 +52,12 @@ public class DistributedRegistry extends AccumulatorV2<Map<String, Long>, Map<St
   @Override
   public void increment(String name) {
-    counters.merge(name, 1L, (oldValue, newValue) -> oldValue + newValue);
+    counters.merge(name, 1L, Long::sum);
   }
   @Override
   public void add(String name, long value) {
-    counters.merge(name, value, (oldValue, newValue) -> oldValue + newValue);
+    counters.merge(name, value, Long::sum);
   }
   @Override
@@ -80,13 +80,13 @@ public class DistributedRegistry extends AccumulatorV2<Map<String, Long>, Map<St
   @Override
   public void add(Map<String, Long> arg) {
-    arg.forEach((key, value) -> add(key, value));
+    arg.forEach(this::add);
   }
   @Override
   public AccumulatorV2<Map<String, Long>, Map<String, Long>> copy() {
     DistributedRegistry registry = new DistributedRegistry(name);
-    counters.forEach((key, value) -> registry.add(key, value));
+    counters.forEach(registry::add);
     return registry;
   }
@@ -97,7 +97,7 @@ public class DistributedRegistry extends AccumulatorV2<Map<String, Long>, Map<St
   @Override
   public void merge(AccumulatorV2<Map<String, Long>, Map<String, Long>> acc) {
-    acc.value().forEach((key, value) -> add(key, value));
+    acc.value().forEach(this::add);
   }
   @Override
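Long::sum and this::add above replace the hand-written merge and forEach lambdas. A hedged, self-contained sketch of the same counter pattern (plain Java, not Hudi code):

    import java.util.HashMap;
    import java.util.Map;

    public class CounterSketch {
      private final Map<String, Long> counters = new HashMap<>();

      // merge() inserts the value for a new key and otherwise combines with Long::sum,
      // which behaves like (oldValue, newValue) -> oldValue + newValue.
      public void add(String name, long value) {
        counters.merge(name, value, Long::sum);
      }

      // Equivalent to arg.forEach((key, value) -> add(key, value))
      public void addAll(Map<String, Long> arg) {
        arg.forEach(this::add);
      }

      public static void main(String[] args) {
        CounterSketch registry = new CounterSketch();
        registry.add("writes", 2L);
        registry.addAll(Map.of("writes", 3L, "errors", 1L));
        System.out.println(registry.counters); // writes=5, errors=1 (order not guaranteed)
      }
    }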