[
https://issues.apache.org/jira/browse/BEAM-6023?focusedWorklogId=164334&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-164334
]
ASF GitHub Bot logged work on BEAM-6023:
----------------------------------------
Author: ASF GitHub Bot
Created on: 09/Nov/18 13:05
Start Date: 09/Nov/18 13:05
Worklog Time Spent: 10m
Work Description: dmvk closed pull request #6988: [BEAM-6023] Remove
Create.Values translation from Spark Runner
URL: https://github.com/apache/beam/pull/6988
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index f9c422b4a12..d4f472e5f5b 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -29,7 +29,6 @@
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -175,26 +174,6 @@ public void putDataset(PValue pvalue, Dataset dataset,
boolean forceCache) {
leaves.add(dataset);
}
- <T> void putBoundedDatasetFromValues(
- PTransform<?, ? extends PValue> transform, Iterable<T> values, Coder<T>
coder) {
- PValue output = getOutput(transform);
- if (shouldCache(output)) {
- // eagerly create the RDD, as it will be reused.
- Iterable<WindowedValue<T>> elems =
- Iterables.transform(values, WindowingHelpers.windowValueFunction());
- WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
- WindowedValue.getValueOnlyCoder(coder);
- JavaRDD<WindowedValue<T>> rdd =
- getSparkContext()
- .parallelize(CoderHelpers.toByteArrays(elems, windowCoder))
- .map(CoderHelpers.fromByteFunction(windowCoder));
- putDataset(transform, new BoundedDataset<>(rdd));
- } else {
- // create a BoundedDataset that would create a RDD on demand
- datasets.put(getOutput(transform), new BoundedDataset<>(values, jsc,
coder));
- }
- }
-
public Dataset borrowDataset(PTransform<? extends PValue, ?> transform) {
return borrowDataset(getInput(transform));
}
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 4c4851a1a56..332bc2dd3f9 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -43,7 +43,6 @@
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
@@ -494,24 +493,6 @@ public String toNativeString() {
};
}
- private static <T> TransformEvaluator<Create.Values<T>> create() {
- return new TransformEvaluator<Create.Values<T>>() {
- @Override
- public void evaluate(Create.Values<T> transform, EvaluationContext
context) {
- Iterable<T> elems = transform.getElements();
- // Use a coder to convert the objects in the PCollection to byte
arrays, so they
- // can be transferred over the network.
- Coder<T> coder = context.getOutput(transform).getCoder();
- context.putBoundedDatasetFromValues(transform, elems, coder);
- }
-
- @Override
- public String toNativeString() {
- return "sparkContext.parallelize(Arrays.asList(...))";
- }
- };
- }
-
private static <ReadT, WriteT>
TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>>
createPCollView() {
return new TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>>()
{
@@ -584,7 +565,6 @@ public String toNativeString() {
EVALUATORS.put(Combine.Globally.class, combineGlobally());
EVALUATORS.put(Combine.PerKey.class, combinePerKey());
EVALUATORS.put(Flatten.PCollections.class, flattenPColl());
- EVALUATORS.put(Create.Values.class, create());
// EVALUATORS.put(View.AsSingleton.class, viewAsSingleton());
// EVALUATORS.put(View.AsIterable.class, viewAsIter());
EVALUATORS.put(View.CreatePCollectionView.class, createPCollView());
diff --git
a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index 782091de097..2d12cd8e2bf 100644
---
a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++
b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -78,7 +78,7 @@ public void debugBatchPipeline() {
.apply(TextIO.write().to("!!PLACEHOLDER-OUTPUT-DIR!!").withNumShards(3).withSuffix(".txt"));
final String expectedPipeline =
- "sparkContext.parallelize(Arrays.asList(...))\n"
+
"sparkContext.<readFrom(org.apache.beam.sdk.transforms.Create$Values$CreateSource)>()\n"
+ "_.mapPartitions("
+ "new
org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n"
+ "_.mapPartitions(new
org.apache.beam.sdk.transforms.Contextful())\n"
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 164334)
Time Spent: 40m (was: 0.5h)
> Remove Create.Values translation from Spark runner
> --------------------------------------------------
>
> Key: BEAM-6023
> URL: https://issues.apache.org/jira/browse/BEAM-6023
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark
> Reporter: Ismaël Mejía
> Assignee: Ismaël Mejía
> Priority: Minor
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Spark provides a specific translation for the Create transform (specifically
> Create.Values). Create is not a primitive transform and is even replaced in
> Portability by Impulse+ParDo. This issue is to remove this micro-optimization
> (provided the impact is small, as expected).
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)