This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 94a5e72 [HUDI-1737][hudi-client] Code Cleanup: Extract common method in HoodieCreateHandle & FlinkCreateHandle (#2745)
94a5e72 is described below
commit 94a5e72f16620da86bd2ca7ef27bb9abc266cc47
Author: Roc Marshal <[email protected]>
AuthorDate: Fri Apr 2 11:39:05 2021 +0800
[HUDI-1737][hudi-client] Code Cleanup: Extract common method in HoodieCreateHandle & FlinkCreateHandle (#2745)
---
.../org/apache/hudi/io/HoodieCreateHandle.java | 56 ++++++++++++++--------
.../java/org/apache/hudi/io/FlinkCreateHandle.java | 36 +++++---------
2 files changed, 48 insertions(+), 44 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 357cf1b..6fa9b56 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -179,29 +179,47 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
fileWriter.close();
- HoodieWriteStat stat = new HoodieWriteStat();
- stat.setPartitionPath(writeStatus.getPartitionPath());
- stat.setNumWrites(recordsWritten);
- stat.setNumDeletes(recordsDeleted);
- stat.setNumInserts(insertRecordsWritten);
- stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
- stat.setFileId(writeStatus.getFileId());
- stat.setPath(new Path(config.getBasePath()), path);
- long fileSizeInBytes = FSUtils.getFileSize(fs, path);
- stat.setTotalWriteBytes(fileSizeInBytes);
- stat.setFileSizeInBytes(fileSizeInBytes);
- stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
- RuntimeStats runtimeStats = new RuntimeStats();
- runtimeStats.setTotalCreateTime(timer.endTimer());
- stat.setRuntimeStats(runtimeStats);
- writeStatus.setStat(stat);
-
- LOG.info(String.format("CreateHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(),
- stat.getFileId(), runtimeStats.getTotalCreateTime()));
+ setupWriteStatus();
+
+ LOG.info(String.format("CreateHandle for partitionPath %s fileID %s, took %d ms.",
+ writeStatus.getStat().getPartitionPath(), writeStatus.getStat().getFileId(),
+ writeStatus.getStat().getRuntimeStats().getTotalCreateTime()));
return Collections.singletonList(writeStatus);
} catch (IOException e) {
throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e);
}
}
+
+ /**
+ * Set up the write status.
+ *
+ * @throws IOException if error occurs
+ */
+ protected void setupWriteStatus() throws IOException {
+ HoodieWriteStat stat = new HoodieWriteStat();
+ stat.setPartitionPath(writeStatus.getPartitionPath());
+ stat.setNumWrites(recordsWritten);
+ stat.setNumDeletes(recordsDeleted);
+ stat.setNumInserts(insertRecordsWritten);
+ stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
+ stat.setFileId(writeStatus.getFileId());
+ stat.setPath(new Path(config.getBasePath()), path);
+ stat.setTotalWriteBytes(computeTotalWriteBytes());
+ stat.setFileSizeInBytes(computeFileSizeInBytes());
+ stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
+ RuntimeStats runtimeStats = new RuntimeStats();
+ runtimeStats.setTotalCreateTime(timer.endTimer());
+ stat.setRuntimeStats(runtimeStats);
+ writeStatus.setStat(stat);
+ }
+
+ protected long computeTotalWriteBytes() throws IOException {
+ return FSUtils.getFileSize(fs, path);
+ }
+
+ protected long computeFileSizeInBytes() throws IOException {
+ return FSUtils.getFileSize(fs, path);
+ }
+
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
index 6f4638e..2abefa9 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
@@ -22,7 +22,6 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -30,7 +29,6 @@ import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
-import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -89,7 +87,7 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
*/
private WriteStatus getIncrementalWriteStatus() {
try {
- setUpWriteStatus();
+ setupWriteStatus();
// reset the write status
recordsWritten = 0;
recordsDeleted = 0;
@@ -102,32 +100,20 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
}
}
- /**
- * Set up the write status.
- *
- * @throws IOException if error occurs
- */
- private void setUpWriteStatus() throws IOException {
- long fileSizeInBytes = fileWriter.getBytesWritten();
+ @Override
+ protected long computeTotalWriteBytes() throws IOException {
+ long fileSizeInBytes = computeFileSizeInBytes();
long incFileSizeInBytes = fileSizeInBytes - lastFileSize;
this.lastFileSize = fileSizeInBytes;
- HoodieWriteStat stat = new HoodieWriteStat();
- stat.setPartitionPath(writeStatus.getPartitionPath());
- stat.setNumWrites(recordsWritten);
- stat.setNumDeletes(recordsDeleted);
- stat.setNumInserts(insertRecordsWritten);
- stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
- stat.setFileId(writeStatus.getFileId());
- stat.setPath(new Path(config.getBasePath()), path);
- stat.setTotalWriteBytes(incFileSizeInBytes);
- stat.setFileSizeInBytes(fileSizeInBytes);
- stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
- HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats();
- runtimeStats.setTotalCreateTime(timer.endTimer());
- stat.setRuntimeStats(runtimeStats);
- writeStatus.setStat(stat);
+ return incFileSizeInBytes;
}
+ @Override
+ protected long computeFileSizeInBytes() throws IOException {
+ return fileWriter.getBytesWritten();
+ }
+
+ @Override
public void finishWrite() {
LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten);
try {