[ https://issues.apache.org/jira/browse/MAHOUT-1660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14582703#comment-14582703 ]
ASF GitHub Bot commented on MAHOUT-1660:
----------------------------------------
Github user andrewmusselman commented on a diff in the pull request:
https://github.com/apache/mahout/pull/135#discussion_r32280831
--- Diff: spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala ---
@@ -1,50 +1,58 @@
 package org.apache.mahout.sparkbindings.blas
+import org.apache.mahout.sparkbindings.drm
+
 import scala.reflect.ClassTag
 import org.apache.mahout.sparkbindings.drm.DrmRddInput
 import org.apache.mahout.math.drm.logical.OpPar
 import org.apache.spark.rdd.RDD
+import scala.math._
+
+import org.apache.mahout.logging._
 /** Physical adjustment of parallelism */
 object Par {
+  private final implicit val log = getLog(Par.getClass)
+
   def exec[K: ClassTag](op: OpPar[K], src: DrmRddInput[K]): DrmRddInput[K] = {
-    def adjust[T](rdd: RDD[T]): RDD[T] =
-      if (op.minSplits > 0) {
-        if (rdd.partitions.size < op.minSplits)
-          rdd.coalesce(op.minSplits, shuffle = true)
-        else rdd.coalesce(rdd.partitions.size)
-      } else if (op.exactSplits > 0) {
-        if (op.exactSplits < rdd.partitions.size)
-          rdd.coalesce(numPartitions = op.exactSplits, shuffle = false)
-        else if (op.exactSplits > rdd.partitions.size)
-          rdd.coalesce(numPartitions = op.exactSplits, shuffle = true)
-        else
-          rdd.coalesce(rdd.partitions.size)
-      } else if (op.exactSplits == -1 && op.minSplits == -1) {
-
-        // auto adjustment, try to scale up to either x1Size or x2Size.
-        val clusterSize = rdd.context.getConf.get("spark.default.parallelism", "1").toInt
-
-        val x1Size = (clusterSize * .95).ceil.toInt
-        val x2Size = (clusterSize * 1.9).ceil.toInt
-
-        if (rdd.partitions.size <= x1Size)
-          rdd.coalesce(numPartitions = x1Size, shuffle = true)
-        else if (rdd.partitions.size <= x2Size)
-          rdd.coalesce(numPartitions = x2Size, shuffle = true)
-        else
-          rdd.coalesce(numPartitions = rdd.partitions.size)
-      } else rdd.coalesce(rdd.partitions.size)
-
-    if (src.isBlockified) {
-      val rdd = src.toBlockifiedDrmRdd()
-      new DrmRddInput[K](blockifiedSrc = Some(adjust(rdd)))
+    val srcBlockified = src.isBlockified
+
+    val srcRdd = if (srcBlockified) src.toBlockifiedDrmRdd(op.ncol) else src.toDrmRdd()
+    val srcNParts = srcRdd.partitions.size
+
+    // To what size?
+    val targetParts = if (op.minSplits > 0) srcNParts max op.minSplits
--- End diff ---
This is a bit tough to read, but it could be that I'm just not accustomed to
the idiom.
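
For anyone else tripping over it: `srcNParts max op.minSplits` is Scala's infix
notation for `srcNParts.max(op.minSplits)`, i.e. take the larger of the current
partition count and the requested minimum. Below is a minimal, self-contained
sketch of the same idiom with made-up values; the `else` branch is an
assumption on my part, since the quoted hunk cuts off mid-expression:

    object ParIdiomSketch extends App {
      val srcNParts = 4   // current number of RDD partitions (made-up value)
      val minSplits = 16  // requested lower bound on parallelism (made-up value)

      // Infix form used in the diff: `a max b` is just `a.max(b)`.
      val targetParts = if (minSplits > 0) srcNParts max minSplits else srcNParts

      // Equivalent spelled-out form.
      val targetPartsExplicit =
        if (minSplits > 0) math.max(srcNParts, minSplits) else srcNParts

      assert(targetParts == 16 && targetParts == targetPartsExplicit)
      // The `else srcNParts` fallback here is hypothetical; the real code
      // continues past the quoted line with the exactSplits and auto cases.
    }

Either way, the effect is that the partition count never drops below
op.minSplits when a minimum is given.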
> Hadoop1HDFSUtil.readDrmHeader should be taking Hadoop conf
> ----------------------------------------------------------
>
> Key: MAHOUT-1660
> URL: https://issues.apache.org/jira/browse/MAHOUT-1660
> Project: Mahout
> Issue Type: Bug
> Components: spark
> Affects Versions: 0.10.0
> Reporter: Suneel Marthi
> Assignee: Dmitriy Lyubimov
> Priority: Minor
> Fix For: 0.10.2
>
>
> Hadoop1HDFSUtil.readDrmHeader should take the Hadoop configuration from
> the context rather than ignoring it.
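
For context, a minimal sketch of the shape of fix the issue asks for: a helper
that builds its own Configuration internally sees only classpath defaults and
silently drops anything the caller's context carries (fs.defaultFS overrides,
security credentials, and so on). The names, signatures, and bodies below are
illustrative assumptions, not the actual Mahout code:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object HdfsHeaderSketch {

      // Before (illustrative): a fresh Configuration is created internally,
      // so settings carried by the caller's context are ignored.
      def readDrmHeaderIgnoringConf(path: String): FileSystem =
        new Path(path).getFileSystem(new Configuration())

      // After (illustrative): the caller's Hadoop configuration is threaded
      // through to filesystem resolution; actual header parsing is elided.
      def readDrmHeader(path: String, conf: Configuration): FileSystem =
        new Path(path).getFileSystem(conf)
    }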
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)