[ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=151550&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-151550
 ]

ASF GitHub Bot logged work on BEAM-4783:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Oct/18 09:01
            Start Date: 05/Oct/18 09:01
    Worklog Time Spent: 10m 
      Work Description: iemejia closed pull request #6181: [BEAM-4783] Add 
bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 7a7f9902db2..651e5223721 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.spark;
 
 import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
@@ -99,6 +100,16 @@ public String create(PipelineOptions options) {
 
   void setCheckpointDurationMillis(Long durationMillis);
 
+  @Description(
+      "If set bundleSize will be used for splitting BoundedSources, otherwise 
default to "
+          + "splitting BoundedSources on Spark defaultParallelism. Most 
effective when used with "
+          + "Spark dynamicAllocation.")
+  @Default.Long(0)
+  Long getBundleSize();
+
+  @Experimental
+  void setBundleSize(Long value);
+
   @Description("Enable/disable sending aggregator values to Spark's metric 
sinks")
   @Default.Boolean(true)
   Boolean getEnableSparkMetricSinks();
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
index 67d0ab25e72..c94c4ec2a9e 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
@@ -30,6 +30,7 @@
 import java.util.NoSuchElementException;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
@@ -65,6 +66,7 @@
     private final BoundedSource<T> source;
     private final SerializablePipelineOptions options;
     private final int numPartitions;
+    private final long bundleSize;
     private final String stepName;
     private final Accumulator<MetricsContainerStepMap> metricsAccum;
 
@@ -88,6 +90,7 @@ public Bounded(
       // ** the configuration "spark.default.parallelism" takes precedence 
over all of the above **
       this.numPartitions = sc.defaultParallelism();
       checkArgument(this.numPartitions > 0, "Number of partitions must be 
greater than zero.");
+      this.bundleSize = 
options.get().as(SparkPipelineOptions.class).getBundleSize();
       this.stepName = stepName;
       this.metricsAccum = MetricsAccumulator.getInstance();
     }
@@ -96,19 +99,23 @@ public Bounded(
 
     @Override
     public Partition[] getPartitions() {
-      long desiredSizeBytes = DEFAULT_BUNDLE_SIZE;
       try {
-        desiredSizeBytes = source.getEstimatedSizeBytes(options.get()) / 
numPartitions;
-      } catch (Exception e) {
-        LOG.warn(
-            "Failed to get estimated bundle size for source {}, using default 
bundle "
-                + "size of {} bytes.",
-            source,
-            DEFAULT_BUNDLE_SIZE);
-      }
-      try {
-        List<? extends Source<T>> partitionedSources =
-            source.split(desiredSizeBytes, options.get());
+        List<? extends Source<T>> partitionedSources;
+        if (bundleSize > 0) {
+          partitionedSources = source.split(bundleSize, options.get());
+        } else {
+          long desiredSizeBytes = DEFAULT_BUNDLE_SIZE;
+          try {
+            desiredSizeBytes = source.getEstimatedSizeBytes(options.get()) / 
numPartitions;
+          } catch (Exception e) {
+            LOG.warn(
+                "Failed to get estimated bundle size for source {}, using 
default bundle "
+                    + "size of {} bytes.",
+                source,
+                DEFAULT_BUNDLE_SIZE);
+          }
+          partitionedSources = source.split(desiredSizeBytes, options.get());
+        }
         Partition[] partitions = new 
SourcePartition[partitionedSources.size()];
         for (int i = 0; i < partitionedSources.size(); i++) {
           partitions[i] = new SourcePartition<>(id(), i, 
partitionedSources.get(i));
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
index 24ea8ad29f1..21b9ff9fa4e 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
@@ -30,7 +30,6 @@
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.spark.HashPartitioner;
-import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.Function;
@@ -44,7 +43,10 @@
    * org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly} 
for the Spark runner.
    */
   public static <K, V> JavaRDD<WindowedValue<KV<K, 
Iterable<WindowedValue<V>>>>> groupByKeyOnly(
-      JavaRDD<WindowedValue<KV<K, V>>> rdd, Coder<K> keyCoder, 
WindowedValueCoder<V> wvCoder) {
+      JavaRDD<WindowedValue<KV<K, V>>> rdd,
+      Coder<K> keyCoder,
+      WindowedValueCoder<V> wvCoder,
+      boolean defaultParallelism) {
     // we use coders to convert objects in the PCollection to byte arrays, so 
they
     // can be transferred over the network for the shuffle.
     JavaPairRDD<ByteArray, byte[]> pairRDD =
@@ -52,13 +54,17 @@
             .map(WindowingHelpers.unwindowFunction())
             .mapToPair(TranslationUtils.toPairFunction())
             .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder));
-    // use a default parallelism HashPartitioner.
-    Partitioner partitioner = new 
HashPartitioner(rdd.rdd().sparkContext().defaultParallelism());
+    JavaPairRDD<ByteArray, Iterable<byte[]>> groupedRDD;
+    if (defaultParallelism) {
+      groupedRDD =
+          pairRDD.groupByKey(new 
HashPartitioner(rdd.rdd().sparkContext().defaultParallelism()));
+    } else {
+      groupedRDD = pairRDD.groupByKey();
+    }
 
     // using mapPartitions allows to preserve the partitioner
     // and avoid unnecessary shuffle downstream.
-    return pairRDD
-        .groupByKey(partitioner)
+    return groupedRDD
         .mapPartitionsToPair(
             TranslationUtils.pairFunctionToPairFlatMapFunction(
                 CoderHelpers.fromByteFunctionIterable(keyCoder, wvCoder)),
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 3f508b1edd6..aa25aaa0c00 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -30,6 +30,7 @@
 import java.util.Map;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
@@ -131,7 +132,16 @@ public void evaluate(GroupByKey<K, V> transform, 
EvaluationContext context) {
 
         // --- group by key only.
         JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupedByKey 
=
-            GroupCombineFunctions.groupByKeyOnly(inRDD, keyCoder, wvCoder);
+            GroupCombineFunctions.groupByKeyOnly(
+                inRDD,
+                keyCoder,
+                wvCoder,
+                context
+                        .getSerializableOptions()
+                        .get()
+                        .as(SparkPipelineOptions.class)
+                        .getBundleSize()
+                    > 0);
 
         // --- now group also by window.
         // for batch, GroupAlsoByWindow uses an in-memory StateInternals.
@@ -424,7 +434,7 @@ public String toNativeString() {
         WindowedValue.FullWindowedValueCoder.of(kvCoder.getValueCoder(), 
windowCoder);
 
     JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupRDD =
-        GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder);
+        GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, true);
 
     return groupRDD
         .map(
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index a307cc96827..e4dda182583 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -300,7 +300,8 @@ public void evaluate(GroupByKey<K, V> transform, 
EvaluationContext context) {
         // --- group by key only.
         JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> 
groupedByKeyStream =
             dStream.transform(
-                rdd -> GroupCombineFunctions.groupByKeyOnly(rdd, 
coder.getKeyCoder(), wvCoder));
+                rdd ->
+                    GroupCombineFunctions.groupByKeyOnly(rdd, 
coder.getKeyCoder(), wvCoder, true));
 
         // --- now group also by window.
         JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream =


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 151550)
    Time Spent: 4h 40m  (was: 4.5h)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
> ------------------------------------------------------------
>
>                 Key: BEAM-4783
>                 URL: https://issues.apache.org/jira/browse/BEAM-4783
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark
>            Reporter: Kyle Winkelman
>            Assignee: Kyle Winkelman
>            Priority: Major
>             Fix For: 2.8.0
>
>          Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> When the spark-runner is used along with the configuration 
> spark.dynamicAllocation.enabled=true, the SourceRDD does not detect this. It 
> then falls back to the value calculated as described in this comment:
>       // when running on YARN/SparkDeploy it's the result of max(totalCores, 
> 2).
>       // when running on Mesos it's 8.
>       // when running local it's the total number of cores (local = 1, 
> local[N] = N,
>       // local[*] = estimation of the machine's cores).
>       // ** the configuration "spark.default.parallelism" takes precedence 
> over all of the above **
> So in most cases this default is quite small. This is an issue when using a 
> very large input file as it will only get split in half.
> I believe that when Dynamic Allocation is enabled the SourceRDD should use the 
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions setting that 
> allows you to change this DEFAULT_BUNDLE_SIZE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to