This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 94a5e72  [HUDI-1737][hudi-client] Code Cleanup: Extract common method 
in HoodieCreateHandle & FlinkCreateHandle (#2745)
94a5e72 is described below

commit 94a5e72f16620da86bd2ca7ef27bb9abc266cc47
Author: Roc Marshal <[email protected]>
AuthorDate: Fri Apr 2 11:39:05 2021 +0800

    [HUDI-1737][hudi-client] Code Cleanup: Extract common method in 
HoodieCreateHandle & FlinkCreateHandle (#2745)
---
 .../org/apache/hudi/io/HoodieCreateHandle.java     | 56 ++++++++++++++--------
 .../java/org/apache/hudi/io/FlinkCreateHandle.java | 36 +++++---------
 2 files changed, 48 insertions(+), 44 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 357cf1b..6fa9b56 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -179,29 +179,47 @@ public class HoodieCreateHandle<T extends 
HoodieRecordPayload, I, K, O> extends
 
       fileWriter.close();
 
-      HoodieWriteStat stat = new HoodieWriteStat();
-      stat.setPartitionPath(writeStatus.getPartitionPath());
-      stat.setNumWrites(recordsWritten);
-      stat.setNumDeletes(recordsDeleted);
-      stat.setNumInserts(insertRecordsWritten);
-      stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
-      stat.setFileId(writeStatus.getFileId());
-      stat.setPath(new Path(config.getBasePath()), path);
-      long fileSizeInBytes = FSUtils.getFileSize(fs, path);
-      stat.setTotalWriteBytes(fileSizeInBytes);
-      stat.setFileSizeInBytes(fileSizeInBytes);
-      stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
-      RuntimeStats runtimeStats = new RuntimeStats();
-      runtimeStats.setTotalCreateTime(timer.endTimer());
-      stat.setRuntimeStats(runtimeStats);
-      writeStatus.setStat(stat);
-
-      LOG.info(String.format("CreateHandle for partitionPath %s fileID %s, 
took %d ms.", stat.getPartitionPath(),
-          stat.getFileId(), runtimeStats.getTotalCreateTime()));
+      setupWriteStatus();
+
+      LOG.info(String.format("CreateHandle for partitionPath %s fileID %s, 
took %d ms.",
+          writeStatus.getStat().getPartitionPath(), 
writeStatus.getStat().getFileId(),
+          writeStatus.getStat().getRuntimeStats().getTotalCreateTime()));
 
       return Collections.singletonList(writeStatus);
     } catch (IOException e) {
       throw new HoodieInsertException("Failed to close the Insert Handle for 
path " + path, e);
     }
   }
+
+  /**
+   * Set up the write status.
+   *
+   * @throws IOException if error occurs
+   */
+  protected void setupWriteStatus() throws IOException {
+    HoodieWriteStat stat = new HoodieWriteStat();
+    stat.setPartitionPath(writeStatus.getPartitionPath());
+    stat.setNumWrites(recordsWritten);
+    stat.setNumDeletes(recordsDeleted);
+    stat.setNumInserts(insertRecordsWritten);
+    stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
+    stat.setFileId(writeStatus.getFileId());
+    stat.setPath(new Path(config.getBasePath()), path);
+    stat.setTotalWriteBytes(computeTotalWriteBytes());
+    stat.setFileSizeInBytes(computeFileSizeInBytes());
+    stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
+    RuntimeStats runtimeStats = new RuntimeStats();
+    runtimeStats.setTotalCreateTime(timer.endTimer());
+    stat.setRuntimeStats(runtimeStats);
+    writeStatus.setStat(stat);
+  }
+
+  protected long computeTotalWriteBytes() throws IOException {
+    return FSUtils.getFileSize(fs, path);
+  }
+
+  protected long computeFileSizeInBytes() throws IOException {
+    return FSUtils.getFileSize(fs, path);
+  }
+
 }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
index 6f4638e..2abefa9 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
@@ -22,7 +22,6 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -30,7 +29,6 @@ import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -89,7 +87,7 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, 
I, K, O>
    */
   private WriteStatus getIncrementalWriteStatus() {
     try {
-      setUpWriteStatus();
+      setupWriteStatus();
       // reset the write status
       recordsWritten = 0;
       recordsDeleted = 0;
@@ -102,32 +100,20 @@ public class FlinkCreateHandle<T extends 
HoodieRecordPayload, I, K, O>
     }
   }
 
-  /**
-   * Set up the write status.
-   *
-   * @throws IOException if error occurs
-   */
-  private void setUpWriteStatus() throws IOException {
-    long fileSizeInBytes = fileWriter.getBytesWritten();
+  @Override
+  protected long computeTotalWriteBytes() throws IOException {
+    long fileSizeInBytes = computeFileSizeInBytes();
     long incFileSizeInBytes = fileSizeInBytes - lastFileSize;
     this.lastFileSize = fileSizeInBytes;
-    HoodieWriteStat stat = new HoodieWriteStat();
-    stat.setPartitionPath(writeStatus.getPartitionPath());
-    stat.setNumWrites(recordsWritten);
-    stat.setNumDeletes(recordsDeleted);
-    stat.setNumInserts(insertRecordsWritten);
-    stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
-    stat.setFileId(writeStatus.getFileId());
-    stat.setPath(new Path(config.getBasePath()), path);
-    stat.setTotalWriteBytes(incFileSizeInBytes);
-    stat.setFileSizeInBytes(fileSizeInBytes);
-    stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
-    HoodieWriteStat.RuntimeStats runtimeStats = new 
HoodieWriteStat.RuntimeStats();
-    runtimeStats.setTotalCreateTime(timer.endTimer());
-    stat.setRuntimeStats(runtimeStats);
-    writeStatus.setStat(stat);
+    return incFileSizeInBytes;
   }
 
+  @Override
+  protected long computeFileSizeInBytes() throws IOException {
+    return fileWriter.getBytesWritten();
+  }
+
+  @Override
   public void finishWrite() {
     LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done 
with all the records " + recordsWritten);
     try {

Reply via email to