mosche commented on code in PR #25187:
URL: https://github.com/apache/beam/pull/25187#discussion_r1093351777
##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java:
##########
@@ -247,20 +292,37 @@ public <T> Dataset<WindowedValue<T>>
getDataset(PCollection<T> pCollection) {
public <T> void putDataset(
PCollection<T> pCollection, Dataset<WindowedValue<T>> dataset, boolean
cache) {
TranslationResult<T> result = getResult(pCollection);
- if (cache && result.dependentTransforms.size() > 1) {
- LOG.info("Dataset {} will be cached.", result.name);
- result.dataset = dataset.persist(storageLevel); // use NONE to disable
- } else {
- result.dataset = dataset;
- if (result.dependentTransforms.isEmpty()) {
- leaves.add(result);
+ result.dataset = dataset;
+
+ if (!cache && isMemoryOnly) {
+ result.resetPlanComplexity(); // cached as RDD in memory which breaks
linage
+ } else if (cache && result.usages() > 1) {
+ if (isMemoryOnly) {
+ // Cache as RDD in-memory only, this helps to also break linage of
complex query plans.
+ LOG.info("Dataset {} will be cached in-memory as RDD for reuse.",
result.name);
+ result.dataset = sparkSession.createDataset(dataset.rdd().persist(),
dataset.encoder());
+ result.resetPlanComplexity();
+ } else {
+ LOG.info("Dataset {} will be cached for reuse.", result.name);
+ dataset.persist(storageLevel); // use NONE to disable
}
}
+
+ if (result.estimatePlanComplexity() > PLAN_COMPLEXITY_THRESHOLD) {
+ // Break linage of dataset to limit planning overhead for complex
query plans.
+ LOG.debug("Breaking linage of dataset {} to limit complexity of query
plan.", result.name);
Review Comment:
Compared to the other log statements here, this one really only warrants debug level
... but I'll change it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]