This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e206ddd [MINOR] Private the NoArgsConstructor of SparkMergeHelper and code clean (#2194)
e206ddd is described below
commit e206ddd431131da26cf1bb00f70fe64ad0450059
Author: wangxianghu <[email protected]>
AuthorDate: Mon Oct 26 12:22:11 2020 +0800
[MINOR] Private the NoArgsConstructor of SparkMergeHelper and code clean (#2194)
---
.../hudi/table/action/commit/BaseSparkCommitActionExecutor.java | 9 +--------
.../action/commit/SparkInsertOverwriteCommitActionExecutor.java | 7 ++-----
.../org/apache/hudi/table/action/commit/SparkMergeHelper.java | 3 +++
3 files changed, 6 insertions(+), 13 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 36cca8c..ad62db9 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -199,6 +199,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
commitOnAutoCommit(result);
}
+ @Override
protected String getCommitActionType() {
return table.getMetaClient().getCommitActionType();
}
@@ -276,14 +277,6 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
return handleUpdateInternal(upsertHandle, fileId);
}
- public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId,
- Map<String, HoodieRecord<T>> keyToNewRecords,
- HoodieBaseFile oldDataFile) throws IOException {
- // these are updates
- HoodieMergeHandle upsertHandle = getUpdateHandle(partitionPath, fileId, keyToNewRecords, oldDataFile);
- return handleUpdateInternal(upsertHandle, fileId);
- }
-
protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle upsertHandle, String fileId)
throws IOException {
if (upsertHandle.getOldFilePath() == null) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
index 627e75e..2771a22 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
@@ -28,8 +28,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
@@ -41,8 +39,6 @@ import java.util.stream.Collectors;
public class SparkInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseSparkCommitActionExecutor<T> {
- private static final Logger LOG = LogManager.getLogger(SparkInsertOverwriteCommitActionExecutor.class);
-
private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
public SparkInsertOverwriteCommitActionExecutor(HoodieEngineContext context,
@@ -53,7 +49,7 @@ public class SparkInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayl
}
@Override
- public HoodieWriteMetadata execute() {
+ public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
return SparkWriteHelper.newInstance().write(instantTime, inputRecordsRDD,
context, table,
config.shouldCombineBeforeInsert(),
config.getInsertShuffleParallelism(), this, false);
}
@@ -68,6 +64,7 @@ public class SparkInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayl
return HoodieTimeline.REPLACE_COMMIT_ACTION;
}
+ @Override
protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> writeStatuses) {
return writeStatuses.map(status ->
status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath ->
new Tuple2<>(partitionPath,
getAllExistingFileIds(partitionPath))).collectAsMap();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java
index 2d130e3..08d60b9 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java
@@ -46,6 +46,9 @@ import java.util.Iterator;
public class SparkMergeHelper<T extends HoodieRecordPayload> extends AbstractMergeHelper<T, JavaRDD<HoodieRecord<T>>,
JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
+ private SparkMergeHelper() {
+ }
+
private static class MergeHelperHolder {
private static final SparkMergeHelper SPARK_MERGE_HELPER = new SparkMergeHelper();
}