zhangyue19921010 commented on a change in pull request #4753:
URL: https://github.com/apache/hudi/pull/4753#discussion_r803474469



##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkCompactor.java
##########
@@ -43,19 +48,25 @@ public HoodieSparkCompactor(BaseHoodieWriteClient<T, JavaRDD<HoodieRecord<T>>, J
   }
 
   @Override
-  public void compact(HoodieInstant instant) throws IOException {
+  public void compact(HoodieInstant instant) {
     LOG.info("Compactor executing compaction " + instant);
     SparkRDDWriteClient<T> writeClient = (SparkRDDWriteClient<T>) compactionClient;
-    JavaRDD<WriteStatus> res = writeClient.compact(instant.getTimestamp());
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = writeClient.compact(instant.getTimestamp());
+    List<HoodieWriteStat> writeStats = compactionMetadata.getCommitMetadata().get()
+        .getPartitionToWriteStats()
+        .values()
+        .stream()
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
     this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status");

Review comment:
       Changed. Thanks a lot for your review.
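
A minimal, self-contained sketch of the stream pattern in the diff above: flattening a per-partition map of stat lists (the shape of `getPartitionToWriteStats()`) into a single list via `flatMap`. The class name `FlattenStatsSketch` and the use of plain `Integer` values in place of `HoodieWriteStat` are illustrative stand-ins, not Hudi APIs:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FlattenStatsSketch {
    // Flatten per-partition stat lists into one flat list, mirroring the
    // getPartitionToWriteStats().values().stream().flatMap(...) chain in the
    // diff. Map<String, List<Integer>> stands in for the real
    // Map<String, List<HoodieWriteStat>> (an assumption for illustration).
    static List<Integer> flatten(Map<String, List<Integer>> partitionToStats) {
        return partitionToStats.values().stream()
            .flatMap(List::stream)            // merge each partition's list
            .collect(Collectors.toList());    // into a single flat list
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> byPartition = Map.of(
            "2022/01/01", Arrays.asList(1, 2),
            "2022/01/02", Arrays.asList(3));
        System.out.println(flatten(byPartition).size()); // prints 3
    }
}
```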

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
##########
@@ -293,4 +297,17 @@ private String getSchemaFromLatestInstant() throws Exception {
     Schema schema = schemaUtil.getTableAvroSchema(false);
     return schema.toString();
   }
+
+  private int handleErrors(HoodieCommitMetadata metadata, String instantTime) {
+    List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
+        e.getValue().stream()).collect(Collectors.toList());
+    long errorsCount = writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum();
+    if (errorsCount == 0) {
+      LOG.info(String.format("Table imported into hoodie with %s instant time.", instantTime));

Review comment:
       Changed

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
##########
@@ -293,4 +297,17 @@ private String getSchemaFromLatestInstant() throws Exception {
     Schema schema = schemaUtil.getTableAvroSchema(false);
     return schema.toString();
   }
+
+  private int handleErrors(HoodieCommitMetadata metadata, String instantTime) {
+    List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
+        e.getValue().stream()).collect(Collectors.toList());
+    long errorsCount = writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum();
+    if (errorsCount == 0) {
+      LOG.info(String.format("Table imported into hoodie with %s instant time.", instantTime));
+      return 0;
+    }
+
+    LOG.error(String.format("Import failed with %d errors.", errorsCount));

Review comment:
       Done.
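
A minimal model of the return-code logic that `handleErrors(...)` in the hunk above builds toward: sum per-file error counts and map zero errors to exit code 0. The diff is truncated before the failure return value, so the `-1` here, the class name `HandleErrorsSketch`, and the use of raw `Long` counts in place of `HoodieWriteStat::getTotalWriteErrors` are all assumptions for illustration:

```java
import java.util.List;

public class HandleErrorsSketch {
    // Sum per-file write-error counts and derive an exit code, mirroring
    // the errorsCount computation in the diff. List<Long> stands in for
    // mapping HoodieWriteStat::getTotalWriteErrors over writeStats; the
    // -1 failure code is an assumption (the diff is cut off before it).
    static int handleErrors(List<Long> errorCounts) {
        long errorsCount = errorCounts.stream()
            .mapToLong(Long::longValue)
            .sum();
        if (errorsCount == 0) {
            return 0;   // no write errors: report success
        }
        return -1;      // surface the failure to the caller
    }

    public static void main(String[] args) {
        System.out.println(handleErrors(List.of(0L, 0L))); // prints 0
        System.out.println(handleErrors(List.of(0L, 2L))); // prints -1
    }
}
```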




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
