Davis-Zhang-Onehouse commented on code in PR #13695:
URL: https://github.com/apache/hudi/pull/13695#discussion_r2288656363
##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieDataUtils.java:
##########
@@ -94,4 +96,76 @@ public static <K, V> Map<K, Set<V>> collectPairDataAsMap(HoodiePairData<K, V> pa
},
HashMap::putAll);
}
+
+ /**
+ * Executes a function with HoodieData and ensures cleanup after use
+ *
+ * @param hoodieData The HoodieData to use
+ * @param f Function that processes the HoodieData
+ * @param <T> Type parameter of HoodieData
+ * @param <R> Return type of the function
+ * @return Result of the function
+ */
+ public static <T, R> R withHoodieDataCleanUp(HoodieData<T> hoodieData, Function<HoodieData<T>, R> f) {
+ try {
+ return f.apply(hoodieData);
+ } finally {
+ hoodieData.unpersistWithDependencies();
+ }
+ }
+
+ /**
+ * Executes a function with HoodiePairData and ensures cleanup after use
+ *
+ * @param hoodiePairData The HoodiePairData to use
+ * @param f Function that processes the HoodiePairData
+ * @param <K> Key type parameter of HoodiePairData
+ * @param <V> Value type parameter of HoodiePairData
+ * @param <R> Return type of the function
+ * @return Result of the function
+ */
+ public static <K, V, R> R withHoodieDataCleanUp(HoodiePairData<K, V> hoodiePairData, Function<HoodiePairData<K, V>, R> f) {
+ try {
+ return f.apply(hoodiePairData);
+ } finally {
+ hoodiePairData.unpersistWithDependencies();
+ }
+ }
+
+ /**
+ * Executes a function with HoodieData and ensures cleanup only on exception
+ *
+ * @param hoodieData The HoodieData to use
+ * @param f Function that processes the HoodieData
+ * @param <T> Type parameter of HoodieData
+ * @param <R> Return type of the function
+ * @return Result of the function
+ */
+ public static <T, R> R withHoodieDataCleanUpOnException(HoodieData<T> hoodieData, Function<HoodieData<T>, R> f) {
Review Comment:
```
myAppFunc() {
  rdd4 = myFunc0() {
    rdd1 = myFunc1() {
      create rdd11
      rdd11.persist()
      rdd11.collect()
      return myFunc2(rdd11)
    }
    rdd3 = rdd1 extended with more DAG, plus some other code
    return rdd3
  }
  rdd4.collect() <--- done using the DAG
}
```
Here we have three types of functions:
- myFunc1: deeply nested; it calls persist and collect, then returns another RDD.
- myFunc0: an intermediate function that extends the DAG but does not finish using it. I call this a "pipeFunc".
- myAppFunc: application-level code that defines when the DAG is done being used, i.e. when the cache can be removed as end-of-use cleanup.
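A minimal, self-contained Java sketch of the call structure above, runnable without Spark. Everything in it is hypothetical: TrackedData stands in for an RDD/HoodieData node that remembers its parents, the static CACHED set stands in for the executor cache, and unpersistWithDependencies() here simply unpersists a node and walks its lineage. That is an assumption about the intent of the real Hudi method, not its implementation. The sketch shows why holding rdd4 is enough for myAppFunc on the happy path: rdd4's lineage reaches the cached rdd11.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class CacheLineageDemo {
  // Hypothetical registry of persisted datasets (stands in for the executor cache).
  static final Set<String> CACHED = new HashSet<>();

  // Hypothetical stand-in for an RDD / HoodieData node that remembers its parents.
  static final class TrackedData {
    final String name;
    final List<TrackedData> parents;

    TrackedData(String name, TrackedData... parents) {
      this.name = name;
      this.parents = Arrays.asList(parents);
    }

    TrackedData persist() {
      CACHED.add(name);
      return this;
    }

    // Assumed semantics: unpersist this node and, recursively, all of its ancestors.
    void unpersistWithDependencies() {
      CACHED.remove(name);
      for (TrackedData parent : parents) {
        parent.unpersistWithDependencies();
      }
    }
  }

  // myFunc1: deeply nested; persists rdd11 and returns a dataset derived from it.
  static TrackedData myFunc1() {
    TrackedData rdd11 = new TrackedData("rdd11").persist();
    // rdd11.collect() would materialize the cached blocks here.
    return new TrackedData("rdd1", rdd11); // stands in for myFunc2(rdd11)
  }

  // myFunc0 (pipeFunc): extends the DAG but does not finish using it.
  static TrackedData myFunc0() {
    TrackedData rdd1 = myFunc1();
    return new TrackedData("rdd3", rdd1);
  }

  // myAppFunc: the only place that knows the DAG is done being used.
  static void myAppFunc() {
    TrackedData rdd4 = myFunc0();
    // rdd4.collect() <--- done using the DAG
    // Lineage is rdd4 (rdd3) -> rdd1 -> rdd11, so this reaches the cached rdd11.
    rdd4.unpersistWithDependencies();
  }
}
```

Note that myFunc0 alone leaves rdd11 cached; only the caller that finishes the DAG releases it.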
When a pipeFunc is not wrapped with withHoodieDataCleanUpOnException and it throws, this is what happens: myAppFunc can at best catch the exception, but to clean up the cached RDD upstream it needs a reference either to that RDD itself or to some RDD that uses the cached RDD as a dependency, and myAppFunc has neither.
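The failure mode can be reproduced in a small, runnable sketch. TrackedData and CACHED are hypothetical stand-ins (not Hudi or Spark APIs), and unpersistWithDependencies() is assumed to unpersist a node and its ancestors. Here myFunc0 throws after rdd11 has been persisted, and myAppFunc's catch block has nothing in scope whose lineage reaches rdd11, so the entry stays in CACHED.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class UnhappyPathLeakDemo {
  // Hypothetical registry of persisted datasets (stands in for the executor cache).
  static final Set<String> CACHED = new HashSet<>();

  // Hypothetical stand-in for an RDD / HoodieData node that remembers its parents.
  static final class TrackedData {
    final String name;
    final List<TrackedData> parents;

    TrackedData(String name, TrackedData... parents) {
      this.name = name;
      this.parents = Arrays.asList(parents);
    }

    TrackedData persist() {
      CACHED.add(name);
      return this;
    }

    // Assumed semantics: unpersist this node and, recursively, all of its ancestors.
    void unpersistWithDependencies() {
      CACHED.remove(name);
      for (TrackedData parent : parents) {
        parent.unpersistWithDependencies();
      }
    }
  }

  static TrackedData myFunc1() {
    TrackedData rdd11 = new TrackedData("rdd11").persist();
    return new TrackedData("rdd1", rdd11);
  }

  // pipeFunc WITHOUT clean-up-on-exception: fails after rdd11 is already cached.
  static TrackedData myFunc0() {
    TrackedData rdd1 = myFunc1();
    // "more DAG and some other code" fails before rdd3 is returned.
    throw new IllegalStateException("failed while extending " + rdd1.name);
  }

  // myAppFunc catches the failure, but neither rdd11 nor any descendant is in scope.
  static void myAppFunc() {
    try {
      TrackedData rdd4 = myFunc0();
      rdd4.unpersistWithDependencies();
    } catch (IllegalStateException e) {
      // Nothing to call unpersistWithDependencies() on, so rdd11 stays cached.
    }
  }
}
```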
If instead every pipeFunc is wrapped with clean-up-on-exception, the pipeFunc cleans up after itself when it throws. On this "unhappy path", whoever becomes unhappy is responsible for cleaning up.
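A sketch of the unhappy-path guard. withCleanUpOnException below is a hypothetical analogue of withHoodieDataCleanUpOnException from the diff, whose body the hunk does not show; catching RuntimeException, cleaning up, and rethrowing is an assumed shape, not the PR's implementation. TrackedData and CACHED are hypothetical stand-ins for HoodieData and the executor cache.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

final class CleanUpOnExceptionDemo {
  // Hypothetical registry of persisted datasets (stands in for the executor cache).
  static final Set<String> CACHED = new HashSet<>();

  // Hypothetical stand-in for an RDD / HoodieData node that remembers its parents.
  static final class TrackedData {
    final String name;
    final List<TrackedData> parents;

    TrackedData(String name, TrackedData... parents) {
      this.name = name;
      this.parents = Arrays.asList(parents);
    }

    TrackedData persist() {
      CACHED.add(name);
      return this;
    }

    // Assumed semantics: unpersist this node and, recursively, all of its ancestors.
    void unpersistWithDependencies() {
      CACHED.remove(name);
      for (TrackedData parent : parents) {
        parent.unpersistWithDependencies();
      }
    }
  }

  // Hypothetical helper: clean up only if f throws; on success the cache is kept.
  static <R> R withCleanUpOnException(TrackedData data, Function<TrackedData, R> f) {
    try {
      return f.apply(data);
    } catch (RuntimeException e) {
      data.unpersistWithDependencies();
      throw e;
    }
  }

  static TrackedData myFunc1() {
    TrackedData rdd11 = new TrackedData("rdd11").persist();
    return new TrackedData("rdd1", rdd11);
  }

  // Guarded pipeFunc: if extending the DAG fails, rdd1's lineage (incl. rdd11) is released.
  static TrackedData myFunc0(boolean fail) {
    return withCleanUpOnException(myFunc1(), rdd1 -> {
      if (fail) {
        throw new IllegalStateException("failed while extending " + rdd1.name);
      }
      return new TrackedData("rdd3", rdd1);
    });
  }
}
```

On success the guard deliberately leaves rdd11 cached, because downstream code may still reuse it.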
myAppFunc, which defines when we are done using the DAG, must wrap with clean-up-finally (withHoodieDataCleanUp), because only this function knows when we can "clean up everything" on the happy path.
So the protocol can be summarized as:
- every pipeFunc must guard itself with clean-up-on-exception to handle the unhappy path;
- myAppFunc must guard itself with clean-up-finally to handle the happy path.
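Both rules together in one hypothetical, runnable sketch: withCleanUp mirrors the try/finally shape of withHoodieDataCleanUp from the diff, withCleanUpOnException is an assumed analogue of withHoodieDataCleanUpOnException, and TrackedData/CACHED are stand-ins rather than Hudi APIs. Whether myAppFunc returns or fails, nothing is left in CACHED.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

final class CleanUpProtocolDemo {
  // Hypothetical registry of persisted datasets (stands in for the executor cache).
  static final Set<String> CACHED = new HashSet<>();

  // Hypothetical stand-in for an RDD / HoodieData node that remembers its parents.
  static final class TrackedData {
    final String name;
    final List<TrackedData> parents;

    TrackedData(String name, TrackedData... parents) {
      this.name = name;
      this.parents = Arrays.asList(parents);
    }

    TrackedData persist() {
      CACHED.add(name);
      return this;
    }

    // Assumed semantics: unpersist this node and, recursively, all of its ancestors.
    void unpersistWithDependencies() {
      CACHED.remove(name);
      for (TrackedData parent : parents) {
        parent.unpersistWithDependencies();
      }
    }
  }

  // Happy-path owner: always clean up once f is done (same try/finally shape as the diff).
  static <R> R withCleanUp(TrackedData data, Function<TrackedData, R> f) {
    try {
      return f.apply(data);
    } finally {
      data.unpersistWithDependencies();
    }
  }

  // Unhappy-path guard: clean up only if f throws (assumed shape).
  static <R> R withCleanUpOnException(TrackedData data, Function<TrackedData, R> f) {
    try {
      return f.apply(data);
    } catch (RuntimeException e) {
      data.unpersistWithDependencies();
      throw e;
    }
  }

  static TrackedData myFunc1() {
    TrackedData rdd11 = new TrackedData("rdd11").persist();
    return new TrackedData("rdd1", rdd11);
  }

  // pipeFunc: guarded with clean-up-on-exception.
  static TrackedData myFunc0(boolean fail) {
    return withCleanUpOnException(myFunc1(), rdd1 -> {
      if (fail) {
        throw new IllegalStateException("failed while extending " + rdd1.name);
      }
      return new TrackedData("rdd3", rdd1);
    });
  }

  // myAppFunc: guarded with clean-up-finally; the string stands in for rdd4.collect().
  static String myAppFunc(boolean fail) {
    return withCleanUp(myFunc0(fail), rdd4 -> "collected " + rdd4.name);
  }
}
```

On the unhappy path, myFunc0 throws before withCleanUp is ever entered, so the pipeFunc's own guard is what releases rdd11. On the happy path that guard does nothing, and myAppFunc's finally releases the whole lineage.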
Everywhere in the code base where we work with HoodieData and HoodiePairData, we should follow this protocol unless the same requirement is satisfied in some other way.
If this is still not sufficient, feel free to chat offline.