Davis-Zhang-Onehouse commented on code in PR #13695:
URL: https://github.com/apache/hudi/pull/13695#discussion_r2288684699


##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieDataUtils.java:
##########
@@ -94,4 +96,76 @@ public static <K, V> Map<K, Set<V>> collectPairDataAsMap(HoodiePairData<K, V> pa
             },
             HashMap::putAll);
   }
+
+  /**
+   * Executes a function with HoodieData and ensures cleanup after use
+   *
+   * @param hoodieData The HoodieData to use
+   * @param f          Function that processes the HoodieData
+   * @param <T>        Type parameter of HoodieData
+   * @param <R>        Return type of the function
+   * @return Result of the function
+   */
+  public static <T, R> R withHoodieDataCleanUp(HoodieData<T> hoodieData, Function<HoodieData<T>, R> f) {
+    try {
+      return f.apply(hoodieData);
+    } finally {
+      hoodieData.unpersistWithDependencies();
+    }
+  }
+
+  /**
+   * Executes a function with HoodiePairData and ensures cleanup after use
+   *
+   * @param hoodiePairData The HoodiePairData to use
+   * @param f              Function that processes the HoodiePairData
+   * @param <K>            Key type parameter of HoodiePairData
+   * @param <V>            Value type parameter of HoodiePairData
+   * @param <R>            Return type of the function
+   * @return Result of the function
+   */
+  public static <K, V, R> R withHoodieDataCleanUp(HoodiePairData<K, V> hoodiePairData, Function<HoodiePairData<K, V>, R> f) {
+    try {
+      return f.apply(hoodiePairData);
+    } finally {
+      hoodiePairData.unpersistWithDependencies();
+    }
+  }
+
+  /**
+   * Executes a function with HoodieData and ensures cleanup only on exception
+   *
+   * @param hoodieData The HoodieData to use
+   * @param f          Function that processes the HoodieData
+   * @param <T>        Type parameter of HoodieData
+   * @param <R>        Return type of the function
+   * @return Result of the function
+   */
+  public static <T, R> R withHoodieDataCleanUpOnException(HoodieData<T> hoodieData, Function<HoodieData<T>, R> f) {

Review Comment:
   > How do we handle unchecked exceptions?
   
   As you can see, it catches `Exception` (which also covers unchecked
runtime exceptions), performs the cleanup, and rethrows. The helper is not
responsible for handling an exception it knows nothing about.
   
   > Does every usage now require this helper?
   
   Yes. When I introduced this code, I made sure that every usage that caches
an RDD follows this pattern. I verified the index-related usages with
functional tests (col stats, files, RLI, SI, expression index, MDT validation
test, etc.).
   
   As described in the PR I linked, I could not make a test fail when a
cached RDD is still present at the end. The storage level member is not
volatile, so in a multi-threaded scenario its value may not be visible to
other threads. In addition, Spark only provides a listener for RDD unpersist,
not for persist. Without changing the Spark code, we cannot reliably enforce
this check in Hudi functional tests.



