[
https://issues.apache.org/jira/browse/BEAM-6053?focusedWorklogId=168796&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-168796
]
ASF GitHub Bot logged work on BEAM-6053:
----------------------------------------
Author: ASF GitHub Bot
Created on: 22/Nov/18 13:34
Start Date: 22/Nov/18 13:34
Worklog Time Spent: 10m
Work Description: jbonofre closed pull request #7018: [BEAM-6053] added
sparkOptions cacheDisabled
URL: https://github.com/apache/beam/pull/7018
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index f213187cac2..6935b5465bd 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -136,4 +136,12 @@ public String create(PipelineOptions options) {
List<String> getFilesToStage();
void setFilesToStage(List<String> value);
+
+ @Description(
+ "Disable caching of reused PCollections for whole Pipeline."
+ + " It's useful when it's faster to recompute RDD rather than save.
")
+ @Default.Boolean(false)
+ boolean isCacheDisabled();
+
+ void setCacheDisabled(boolean value);
}
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index d4f472e5f5b..d361bc60a55 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -139,31 +139,31 @@ public void setCurrentTransform(AppliedPTransform<?, ?,
?> transform) {
.collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection)
e.getValue()).getCoder()));
}
- private boolean shouldCache(PValue pvalue) {
- if ((pvalue instanceof PCollection)
- && cacheCandidates.containsKey(pvalue)
- && cacheCandidates.get(pvalue) > 1) {
- return true;
+ /**
+ * Cache PCollection if {@link #isCacheDisabled()} flag is false and
PCollection is used more then
+ * once in Pipeline.
+ *
+ * @param pvalue
+ * @return if PCollection will be cached
+ */
+ public boolean shouldCache(PValue pvalue) {
+ if (isCacheDisabled()) {
+ return false;
}
- return false;
- }
-
- public void putDataset(
- PTransform<?, ? extends PValue> transform, Dataset dataset, boolean
forceCache) {
- putDataset(getOutput(transform), dataset, forceCache);
+ return pvalue instanceof PCollection &&
cacheCandidates.getOrDefault(pvalue, 0L) > 1;
}
public void putDataset(PTransform<?, ? extends PValue> transform, Dataset
dataset) {
- putDataset(transform, dataset, false);
+ putDataset(getOutput(transform), dataset);
}
- public void putDataset(PValue pvalue, Dataset dataset, boolean forceCache) {
+ public void putDataset(PValue pvalue, Dataset dataset) {
try {
dataset.setName(pvalue.getName());
} catch (IllegalStateException e) {
// name not set, ignore
}
- if ((forceCache || shouldCache(pvalue)) && pvalue instanceof PCollection) {
+ if (shouldCache(pvalue)) {
// we cache only PCollection
Coder<?> coder = ((PCollection<?>) pvalue).getCoder();
Coder<? extends BoundedWindow> wCoder =
@@ -258,4 +258,8 @@ public void putPView(
public String storageLevel() {
return
serializableOptions.get().as(SparkPipelineOptions.class).getStorageLevel();
}
+
+ public boolean isCacheDisabled() {
+ return
serializableOptions.get().as(SparkPipelineOptions.class).isCacheDisabled();
+ }
}
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 332bc2dd3f9..c1766b5cdeb 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -409,7 +409,7 @@ public void evaluate(
// Object is the best we can do since different outputs can have
different tags
JavaRDD<WindowedValue<Object>> values =
(JavaRDD<WindowedValue<Object>>) (JavaRDD<?>) filtered.values();
- context.putDataset(output.getValue(), new BoundedDataset<>(values),
false);
+ context.putDataset(output.getValue(), new BoundedDataset<>(values));
}
}
@@ -458,8 +458,8 @@ public void evaluate(Read.Bounded<T> transform,
EvaluationContext context) {
new SourceRDD.Bounded<>(
jsc.sc(), transform.getSource(),
context.getSerializableOptions(), stepName)
.toJavaRDD();
- // cache to avoid re-evaluation of the source by Spark's lazy DAG
evaluation.
- context.putDataset(transform, new BoundedDataset<>(input), true);
+
+ context.putDataset(transform, new BoundedDataset<>(input));
}
@Override
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index f20eda5f5b6..a89d8bc9fe4 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -447,8 +447,7 @@ public void evaluate(
(JavaDStream<?>) TranslationUtils.dStreamValues(filtered);
context.putDataset(
output.getValue(),
- new UnboundedDataset<>(values,
unboundedDataset.getStreamSources()),
- false);
+ new UnboundedDataset<>(values,
unboundedDataset.getStreamSources()));
}
}
diff --git
a/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
index 962d1ec7bb5..f2a5f1e8979 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
@@ -18,11 +18,15 @@
package org.apache.beam.runners.spark;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.apache.beam.runners.spark.translation.Dataset;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.runners.spark.translation.TransformTranslator;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
@@ -30,12 +34,13 @@
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.Test;
-/**
- * This test checks how the cache candidates map is populated by the runner
when evaluating the
- * pipeline.
- */
+/** Tests of {@link Dataset#cache(String, Coder)}} scenarios. */
public class CacheTest {
+ /**
+ * Test checks how the cache candidates map is populated by the runner when
evaluating the
+ * pipeline.
+ */
@Test
public void cacheCandidatesUpdaterTest() throws Exception {
SparkPipelineOptions options =
@@ -57,4 +62,23 @@ public void cacheCandidatesUpdaterTest() throws Exception {
pipeline.traverseTopologically(cacheVisitor);
assertEquals(2L, (long) ctxt.getCacheCandidates().get(pCollection));
}
+
+ @Test
+ public void cacheDisabledOptionTest() {
+ SparkPipelineOptions options =
+ PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
+ options.setRunner(TestSparkRunner.class);
+ options.setCacheDisabled(true);
+ Pipeline pipeline = Pipeline.create(options);
+ PCollection<String> pCollection = pipeline.apply(Create.of("foo", "bar"));
+
+ JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
+ EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options);
+ ctxt.getCacheCandidates().put(pCollection, 2L);
+
+ assertFalse(ctxt.shouldCache(pCollection));
+
+ options.setCacheDisabled(false);
+ assertTrue(ctxt.shouldCache(pCollection));
+ }
}
diff --git a/website/src/documentation/runners/spark.md
b/website/src/documentation/runners/spark.md
index 31e8944becc..fa17b44d592 100644
--- a/website/src/documentation/runners/spark.md
+++ b/website/src/documentation/runners/spark.md
@@ -152,6 +152,11 @@ When executing your pipeline with the Spark Runner, you
should consider the foll
<td>Enable reporting metrics to Spark's metrics Sinks.</td>
<td>true</td>
</tr>
+<tr>
+ <td><code>cacheDisabled</code></td>
+ <td>Disable caching of reused PCollections for whole Pipeline. It's useful
when it's faster to recompute RDD rather than save.</td>
+ <td>false</td>
+</tr>
</table>
## Additional notes
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 168796)
Time Spent: 1.5h (was: 1h 20m)
> Add option to disable caching in Spark
> --------------------------------------
>
> Key: BEAM-6053
> URL: https://issues.apache.org/jira/browse/BEAM-6053
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark
> Affects Versions: 2.9.0
> Reporter: Marek Simunek
> Assignee: Marek Simunek
> Priority: Major
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> Add possibility to SparkOptions to turn off spark RDD caching. There are use
> cases when its faster to recompute whole RDD rather then serialize, store,
> deserialize, read from store.
> We probably don't want to have some list of `PCollections` which we don't
> want to cache, because that would be tailored to specific runner and would be
> against Beam's concepts. So I propose to turn off caching for the whole
> pipeline.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)