[ https://issues.apache.org/jira/browse/SPARK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14045652#comment-14045652 ]

Bharath Ravi Kumar edited comment on SPARK-2292 at 6/27/14 7:37 AM:
--------------------------------------------------------------------

My apologies, the cause was mapToPair and not reduceByKey; I was misled by the line number associated with the execution stage. Here's the entire (cleaned-up) code, copied from the updated gist:

{code}
package com.acme;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import java.io.File;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;
import scala.Tuple4;

public class Test {

    @Parameter(required = true, names = "-sparkMaster", description = "The url of the spark master in the form spark://host:port", arity = 1)
    private String sparkMasterURL;
    @Parameter(names = "-sparkHome", required = true, description = "Location of the local spark installation directory", arity = 1)
    private String sparkHome;
    @Parameter(names = "-dataFile", required = true, description = "The local file to pick up input data from", arity = 1)
    private String dataFile;
    @Parameter(names = "-numPartitions", required = false, description = "The number of spark data partitions to split across", arity = 1)
    private int numPartitions = 80;
    @Parameter(names = "-jarFileLoc", required = true, description = "The location of the jar file containing the application code with its non-spark dependencies", arity = 1)
    private String jarFileLoc;
    private static final Logger logger = Logger.getLogger(Test.class);

    public static void main(String[] args) throws Exception {
        Test regressionDemo = new Test();
        JCommander cmdParser = new JCommander(regressionDemo);
        try {
            cmdParser.parse(args);
            regressionDemo.train();
        } catch (ParameterException exception) {
            System.err.println("Exception parsing one or more command line arguments " + exception.getMessage());
            cmdParser.usage();
        }
    }

    public void train() throws Exception {
        SparkConf conf = new SparkConf();
        conf = conf.setMaster(sparkMasterURL).setAppName("SparkRegressionExample").setSparkHome(sparkHome).setJars(new String[]{jarFileLoc});
        conf.set("spark.executor.memory", "1500M");
        conf.set("spark.default.parallelism", "1");
        conf.set("spark.cores.max", "20");
        conf.set("spark.storage.memoryFraction", "0.9");
        JavaSparkContext ctx = new JavaSparkContext(conf);
        JavaRDD<String> rawData = createRawRDD(ctx, new File(dataFile));
        LineSplitter splitter = new LineSplitter();
        // Parse each line of the input file into a (x, y, r, c) tuple.
        JavaRDD<Tuple4<Long, Long, Integer, Integer>> parsedRecords = rawData.map(splitter);
        logger.info("Raw records size: " + parsedRecords.count());
        JavaRDD<Tuple4<Long, Long, Integer, Integer>> reducedTupleRecords = parsedRecords.coalesce(numPartitions);
        logger.info("Filtered records size: " + reducedTupleRecords.count());
        TupleToPairMapper tupleToPairMapper = new TupleToPairMapper();
        // This mapToPair call is where the NullPointerException is triggered.
        JavaPairRDD<Tuple2<Long, Long>, Tuple2<Integer, Integer>> recordsByTuple = reducedTupleRecords.mapToPair(tupleToPairMapper);
        logger.info("Size of records reduced by tuple: " + recordsByTuple.count());
        logger.info("Shutting down context...");
        ctx.stop();
    }

    private static JavaRDD<String> createRawRDD(JavaSparkContext ctx, File file) {
        JavaRDD<String> rawData = ctx.textFile(file.getAbsolutePath());
        return rawData;
    }

    // Maps a (x, y, r, c) tuple to a ((x, y), (r, c)) key-value pair.
    private static class TupleToPairMapper implements PairFunction<Tuple4<Long, Long, Integer, Integer>, Tuple2<Long, Long>, Tuple2<Integer, Integer>> {

        @Override
        public Tuple2<Tuple2<Long, Long>, Tuple2<Integer, Integer>> call(Tuple4<Long, Long, Integer, Integer> t) throws Exception {
            return new Tuple2<Tuple2<Long, Long>, Tuple2<Integer, Integer>>(new Tuple2<Long, Long>(t._1(), t._2()), new Tuple2<Integer, Integer>(t._3(), t._4()));
        }

        public TupleToPairMapper() {
        }
    }

    // Element-wise sum of two value pairs; not invoked in this trimmed-down reproduction.
    private static class KVReducer implements Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {

        @Override
        public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> t1, Tuple2<Integer, Integer> t2) throws Exception {
            return new Tuple2<Integer, Integer>(t1._1() + t2._1(), t1._2() + t2._2());
        }

        public KVReducer() {
        }
    }

    // Splits a tab-separated input line into a (x, y, r, c) tuple.
    private static class LineSplitter implements Function<String, Tuple4<Long, Long, Integer, Integer>> {

        @Override
        public Tuple4<Long, Long, Integer, Integer> call(String line) throws Exception {
            String[] dataPoint = line.trim().split("\\t");
            String f = dataPoint[0];
            String l1 = dataPoint[1];
            String[] fArr = f.split("#");
            Long x = Long.valueOf(fArr[0].trim());
            Long y = Long.valueOf(fArr[1].trim());
            String[] lArr = l1.split("#");
            int c = Integer.valueOf(lArr[2].trim());
            int r = Integer.valueOf(lArr[1].trim());
            Tuple4<Long, Long, Integer, Integer> tuple = new Tuple4<Long, Long, Integer, Integer>(x, y, r, c);
            return tuple;
        }
    }
}

{code}
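
For reference, here is a stripped-down sketch that isolates just the mapToPair step described above. This is not part of the gist; the class name, the local[2] master and the hard-coded records are purely illustrative stand-ins for the parsed data.
{code}
package com.acme;

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import scala.Tuple4;

// Hypothetical minimal reproduction: only the mapToPair transformation from Test.train().
public class MapToPairRepro {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("MapToPairRepro");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        // Two hard-coded records standing in for the parsed (x, y, r, c) tuples.
        JavaRDD<Tuple4<Long, Long, Integer, Integer>> records = ctx.parallelize(Arrays.asList(
                new Tuple4<Long, Long, Integer, Integer>(1028L, 29922L, 1, 0),
                new Tuple4<Long, Long, Integer, Integer>(1028L, 29922L, 1, 1)));

        // Same shape of transformation as TupleToPairMapper: (x, y, r, c) -> ((x, y), (r, c)).
        JavaPairRDD<Tuple2<Long, Long>, Tuple2<Integer, Integer>> pairs =
                records.mapToPair(new PairFunction<Tuple4<Long, Long, Integer, Integer>, Tuple2<Long, Long>, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Tuple2<Long, Long>, Tuple2<Integer, Integer>> call(Tuple4<Long, Long, Integer, Integer> t) {
                        return new Tuple2<Tuple2<Long, Long>, Tuple2<Integer, Integer>>(
                                new Tuple2<Long, Long>(t._1(), t._2()),
                                new Tuple2<Integer, Integer>(t._3(), t._4()));
                    }
                });

        System.out.println("Pair count: " + pairs.count());
        ctx.stop();
    }
}
{code}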

The included code can be run from within a Maven project as:
{noformat}
mvn exec:java -Dexec.mainClass="com.acme.Test" -Dexec.args="-sparkMaster spark://host:7077 -sparkHome /path/to/spark-1.0.0 -jarFileLoc /path/to/jar-with-dependencies.jar -dataFile /tmp/part-r-00000 -numPartitions 2"
{noformat}

The project's Maven dependencies:

{noformat}

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.10</artifactId>
            <version>1.0.0</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>com.github.fommil.netlib</groupId>
            <artifactId>all</artifactId>
            <version>1.1.2</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>com.beust</groupId>
            <artifactId>jcommander</artifactId>
            <version>1.35</version>
        </dependency>
{noformat}

Input data:
{noformat}
1028#29922#89575#kuv6kvghgk1337d86d2111774      1#1#0
{noformat}
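
For clarity, this is how LineSplitter breaks that sample record down, shown as a small standalone sketch outside Spark. The tab separator and the class name here are assumptions for illustration; the parsing steps mirror LineSplitter.call() exactly.
{code}
package com.acme;

import scala.Tuple4;

// Hypothetical standalone check of the parsing logic in Test.LineSplitter,
// applied to the sample record from the data file.
public class LineSplitterCheck {

    public static void main(String[] args) {
        // Columns are assumed to be tab-separated, matching line.trim().split("\\t") in LineSplitter.
        String line = "1028#29922#89575#kuv6kvghgk1337d86d2111774\t1#1#0";

        String[] dataPoint = line.trim().split("\\t");
        String[] fArr = dataPoint[0].split("#"); // "1028", "29922", "89575", "kuv6..."
        String[] lArr = dataPoint[1].split("#"); // "1", "1", "0"

        Long x = Long.valueOf(fArr[0].trim());   // 1028
        Long y = Long.valueOf(fArr[1].trim());   // 29922
        int r = Integer.valueOf(lArr[1].trim()); // 1
        int c = Integer.valueOf(lArr[2].trim()); // 0

        // The Tuple4 that LineSplitter.call() would return for this line.
        Tuple4<Long, Long, Integer, Integer> tuple =
                new Tuple4<Long, Long, Integer, Integer>(x, y, r, c);
        System.out.println(tuple); // (1028,29922,1,0)
    }
}
{code}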



> NullPointerException in JavaPairRDD.mapToPair
> ---------------------------------------------
>
>                 Key: SPARK-2292
>                 URL: https://issues.apache.org/jira/browse/SPARK-2292
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.0.0
>         Environment: Spark 1.0.0, standalone mode with the master and a single slave running on Ubuntu on a laptop. 4 GB of memory and 8 cores were available to the executor.
>            Reporter: Bharath Ravi Kumar
>            Priority: Critical
>
> Correction: Invoking JavaPairRDD.mapToPair results in an NPE:
> {noformat}
> 14/06/26 21:05:35 WARN scheduler.TaskSetManager: Loss was due to 
> java.lang.NullPointerException
> java.lang.NullPointerException
>       at 
> org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
>       at 
> org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
>       at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>       at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:59)
>       at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)
>       at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
>       at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
>       at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>       at org.apache.spark.scheduler.Task.run(Task.scala:51)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>       at java.lang.Thread.run(Thread.java:722)
> {noformat}
>  This occurs only after migrating to the 1.0.0 API. The details of the code and the data file used to test are included in this gist: 
> https://gist.github.com/reachbach/d8977c8eb5f71f889301


