[ https://issues.apache.org/jira/browse/SPARK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14045652#comment-14045652 ]

Bharath Ravi Kumar edited comment on SPARK-2292 at 6/28/14 12:09 PM:
---------------------------------------------------------------------

My apologies, the cause was mapToPair and not reduceByKey; I was misled by the line number associated with the execution stage. Here's the entire (cleaned-up) code, copied from the updated gist:

{code}

package com.acme;

import java.io.File;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import scala.Tuple4;

public class Test {

    private static final Logger logger = Logger.getLogger(Test.class);

    public static void main(String[] args) throws Exception {
        Test demo = new Test();
        if (args == null || args.length != 1) {
            throw new IllegalArgumentException("A single argument (the path of 
the data file) is expected");
        }
        demo.test(args[0]);
    }

    public void test(String dataFile) throws Exception {
        SparkConf conf = new SparkConf();
        logger.info("Initializing context...");
        JavaSparkContext ctx = new JavaSparkContext(conf);
        JavaRDD<String> rawData = createRawRDD(ctx, new File(dataFile));
        LineSplitter splitter = new LineSplitter();
        JavaRDD<Tuple4<Long, Long, Integer, Integer>> reducedTupleRecords = rawData.map(splitter);
        logger.info(String.format("Raw records: %d ",rawData.count()));
        TupleToPairMapper tupleToPairMapper = new TupleToPairMapper();
        JavaPairRDD<Tuple2<Long, Long>, Tuple2<Integer, Integer>> recordsByTuple = reducedTupleRecords.mapToPair(tupleToPairMapper);
        logger.info(String.format("Mapped records: %d ",recordsByTuple.count()));
        logger.info("Shutting down context...");
        ctx.stop();
    }

    private static JavaRDD<String> createRawRDD(JavaSparkContext ctx, File file) {
        JavaRDD<String> rawData = ctx.textFile(file.getAbsolutePath());
        return rawData;
    }

    private static class TupleToPairMapper implements PairFunction<Tuple4<Long, Long, Integer, Integer>, Tuple2<Long, Long>, Tuple2<Integer, Integer>> {

        @Override
        public Tuple2<Tuple2<Long, Long>, Tuple2<Integer, Integer>> call(Tuple4<Long, Long, Integer, Integer> t) throws Exception {
            return new Tuple2<Tuple2<Long, Long>, Tuple2<Integer, Integer>>(
                    new Tuple2<Long, Long>(t._1(), t._2()), new Tuple2<Integer, Integer>(t._3(), t._4()));
        }

        public TupleToPairMapper() {
        }
    }

    private static class LineSplitter implements Function<String, Tuple4<Long, Long, Integer, Integer>> {

        @Override
        public Tuple4<Long, Long, Integer, Integer> call(String line) throws Exception {
            String[] dataPoint = line.trim().split("\\t");
            String f = dataPoint[0];
            String l1 = dataPoint[1];
            String[] fArr = f.split("#");
            Long x = Long.valueOf(fArr[0].trim());
            Long y = Long.valueOf(fArr[1].trim());
            String[] lArr = l1.split("#");
            int c = Integer.valueOf(lArr[2].trim());
            int r = Integer.valueOf(lArr[1].trim());
            Tuple4<Long, Long, Integer, Integer> tuple = new Tuple4<Long, Long, Integer, Integer>(x, y, r, c);
            return tuple;
        }
    }
}

{code}
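
For completeness, the failing step can also be exercised in isolation. Below is a minimal sketch (the class name MapToPairCheck is illustrative only) that runs against a local master and parallelizes a single hard-coded tuple instead of reading the data file, inlining the same key/value split that TupleToPairMapper performs:
{code}
package com.acme;

import java.util.Arrays;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import scala.Tuple4;

// Minimal, self-contained exercise of JavaRDD.mapToPair with a local master.
// The hard-coded tuple stands in for a record produced by LineSplitter above.
public class MapToPairCheck {

    public static void main(String[] args) {
        JavaSparkContext ctx = new JavaSparkContext("local", "MapToPairCheck");
        JavaRDD<Tuple4<Long, Long, Integer, Integer>> tuples = ctx.parallelize(
                Arrays.asList(new Tuple4<Long, Long, Integer, Integer>(1028L, 29922L, 1, 0)));
        // Same (key, value) split as TupleToPairMapper in Test.
        JavaPairRDD<Tuple2<Long, Long>, Tuple2<Integer, Integer>> pairs = tuples.mapToPair(
                new PairFunction<Tuple4<Long, Long, Integer, Integer>, Tuple2<Long, Long>, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Tuple2<Long, Long>, Tuple2<Integer, Integer>> call(
                            Tuple4<Long, Long, Integer, Integer> t) throws Exception {
                        return new Tuple2<Tuple2<Long, Long>, Tuple2<Integer, Integer>>(
                                new Tuple2<Long, Long>(t._1(), t._2()),
                                new Tuple2<Integer, Integer>(t._3(), t._4()));
                    }
                });
        System.out.println("Pair count: " + pairs.count());
        ctx.stop();
    }
}
{code}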

The included code can be run from within a Maven project as:
{noformat}
mvn exec:java -Dexec.mainClass="com.acme.Test" -Dexec.args="-sparkMaster spark://host:7077 -sparkHome /path/to/spark-1.0.0 -jarFileLoc /path/to/jar-with-dependencies.jar -dataFile /tmp/part-r-00000 -numPartitions 2"
{noformat}

The project's pom.xml (including dependencies):

{noformat}
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.acme</groupId>
    <artifactId>SparkExp</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>SparkExp</name>
    <url>http://maven.apache.org</url>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.acme.Test</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.0.0</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.10</artifactId>
            <version>1.0.0</version>
            <type>jar</type>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>com.github.fommil.netlib</groupId>
            <artifactId>all</artifactId>
            <version>1.1.2</version>
            <type>pom</type>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>Akka repository</id>
            <url>http://repo.akka.io/releases</url>
        </repository>
    </repositories>
</project>

{noformat}

Input data:
{noformat}
1028#29922#89575#kuv6kvghgk1337d86d2111774      1#1#0
{noformat}
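
For reference, LineSplitter treats the two top-level fields as tab-separated, so the sample record above parses to x=1028, y=29922, r=1, c=0, i.e. the Tuple4 (1028, 29922, 1, 0). A standalone sketch of that parse (the class name ParseSample is illustrative only):
{code}
package com.acme;

// Standalone check of the parse that LineSplitter performs on the sample record.
// The two top-level fields are assumed to be tab-separated, matching line.trim().split("\\t").
public class ParseSample {

    public static void main(String[] args) {
        String line = "1028#29922#89575#kuv6kvghgk1337d86d2111774\t1#1#0";
        String[] dataPoint = line.trim().split("\\t");
        String[] fArr = dataPoint[0].split("#");   // key fields
        String[] lArr = dataPoint[1].split("#");   // value fields
        Long x = Long.valueOf(fArr[0].trim());     // 1028
        Long y = Long.valueOf(fArr[1].trim());     // 29922
        int r = Integer.valueOf(lArr[1].trim());   // 1
        int c = Integer.valueOf(lArr[2].trim());   // 0
        System.out.println(x + ", " + y + ", " + r + ", " + c);   // prints 1028, 29922, 1, 0
    }
}
{code}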



was (Author: reachbach):
My apologies, the cause was mapToPair and not reduceByKey; I was misled by the line number associated with the execution stage. Here's the entire (cleaned-up) code, copied from the updated gist:

{code}
package com.acme;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import java.io.File;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;
import scala.Tuple4;

public class Test {

    @Parameter(required = true, names = "-sparkMaster", description = "The url 
of the spark master in the form spark://host:port", arity = 1)
    private String sparkMasterURL;
    @Parameter(names = "-sparkHome", required = true, description = "Location 
of the local spark installation directory", arity = 1)
    private String sparkHome;
    @Parameter(names = "-dataFile", required = true, description = "The local 
file to pick up input data from", arity = 1)
    private String dataFile;
    @Parameter(names = "-numPartitions", required = false, description = "The 
number of spark data partitions to split across", arity = 1)
    private int numPartitions = 80;
    @Parameter(names = "-jarFileLoc", required = true, description = "The 
location of the jar file containing the application code with its non-spark 
dependencies", arity = 1)
    private String jarFileLoc;
    private static final Logger logger = Logger.getLogger(Test.class);

    public static void main(String[] args) throws Exception {
        Test regressionDemo = new Test();
        JCommander cmdParser = new JCommander(regressionDemo);
        try {
            cmdParser.parse(args);
            regressionDemo.train();
        } catch (ParameterException exception) {
            System.err.println("Exception parsing one or more command line 
arguments " + exception.getMessage());
            cmdParser.usage();
        }
    }

    public void train() throws Exception {
        SparkConf conf = new SparkConf();
        conf = conf.setMaster(sparkMasterURL).setAppName("SparkRegressionExample")
                .setSparkHome(sparkHome).setJars(new String[]{jarFileLoc});
        conf.set("spark.executor.memory", "1500M");
        conf.set("spark.default.parallelism", "1");
        conf.set("spark.cores.max", "20");
        conf.set("spark.storage.memoryFraction", "0.9");
        JavaSparkContext ctx = new JavaSparkContext(conf);
        JavaRDD<String> rawData = createRawRDD(ctx, new File(dataFile));
        LineSplitter splitter = new LineSplitter();
        JavaRDD<Tuple4<Long, Long, Integer, Integer>> parsedRecords = rawData.map(splitter);
        logger.info("Raw records size: " + parsedRecords.count());
        JavaRDD<Tuple4<Long, Long, Integer, Integer>> reducedTupleRecords = parsedRecords.coalesce(numPartitions);
        logger.info("Filtered records size: " + reducedTupleRecords.count());
        TupleToPairMapper tupleToPairMapper = new TupleToPairMapper();
        JavaPairRDD<Tuple2<Long, Long>, Tuple2<Integer, Integer>> recordsByTuple = reducedTupleRecords.mapToPair(tupleToPairMapper);
        logger.info("Size of records reduced by tuple: " + recordsByTuple.count());
        logger.info("Shutting down context...");
        ctx.stop();
    }

    private static JavaRDD<String> createRawRDD(JavaSparkContext ctx, File file) {
        JavaRDD<String> rawData = ctx.textFile(file.getAbsolutePath());
        return rawData;
    }

    private static class TupleToPairMapper implements PairFunction<Tuple4<Long, Long, Integer, Integer>, Tuple2<Long, Long>, Tuple2<Integer, Integer>> {

        @Override
        public Tuple2<Tuple2<Long, Long>, Tuple2<Integer, Integer>> call(Tuple4<Long, Long, Integer, Integer> t) throws Exception {
            return new Tuple2<Tuple2<Long, Long>, Tuple2<Integer, Integer>>(
                    new Tuple2<Long, Long>(t._1(), t._2()), new Tuple2<Integer, Integer>(t._3(), t._4()));
        }

        public TupleToPairMapper() {
        }
    }

    private static class KVReducer implements Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {

        @Override
        public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> t1, Tuple2<Integer, Integer> t2) throws Exception {
            return new Tuple2<Integer, Integer>(t1._1() + t2._1(), t1._2() + t2._2());
        }

        public KVReducer() {
        }
    }

    private static class LineSplitter implements Function<String, Tuple4<Long, Long, Integer, Integer>> {

        @Override
        public Tuple4<Long, Long, Integer, Integer> call(String line) throws Exception {
            String[] dataPoint = line.trim().split("\\t");
            String f = dataPoint[0];
            String l1 = dataPoint[1];
            String[] fArr = f.split("#");
            Long x = Long.valueOf(fArr[0].trim());
            Long y = Long.valueOf(fArr[1].trim());
            String[] lArr = l1.split("#");
            int c = Integer.valueOf(lArr[2].trim());
            int r = Integer.valueOf(lArr[1].trim());
            Tuple4<Long, Long, Integer, Integer> tuple = new Tuple4<Long, Long, Integer, Integer>(x, y, r, c);
            return tuple;
        }
    }
}

{code}

The included code can be run from within a Maven project as:
{noformat}
mvn exec:java -Dexec.mainClass="com.acme.Test" -Dexec.args="-sparkMaster spark://host:7077 -sparkHome /path/to/spark-1.0.0 -jarFileLoc /path/to/jar-with-dependencies.jar -dataFile /tmp/part-r-00000 -numPartitions 2"
{noformat}

Project's maven dependencies:

{noformat}

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.10</artifactId>
            <version>1.0.0</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>com.github.fommil.netlib</groupId>
            <artifactId>all</artifactId>
            <version>1.1.2</version>
            <type>pom</type>
        </dependency>
        <dependency>
                <groupId>com.beust</groupId>
                <artifactId>jcommander</artifactId>
                <version>1.35</version>
        </dependency>
{noformat}

Input data:
{noformat}
1028#29922#89575#kuv6kvghgk1337d86d2111774      1#1#0
{noformat}


> NullPointerException in JavaPairRDD.mapToPair
> ---------------------------------------------
>
>                 Key: SPARK-2292
>                 URL: https://issues.apache.org/jira/browse/SPARK-2292
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.0.0
>         Environment: Spark 1.0.0, standalone with the master & single slave 
> running on Ubuntu on a laptop. 4 GB of memory and 8 cores were available to the 
> executor.
>            Reporter: Bharath Ravi Kumar
>            Priority: Critical
>         Attachments: SPARK-2292-aash-repro.tar.gz
>
>
> Correction: Invoking JavaPairRDD.mapToPair results in an NPE:
> {noformat}
> 14/06/26 21:05:35 WARN scheduler.TaskSetManager: Loss was due to 
> java.lang.NullPointerException
> java.lang.NullPointerException
>       at 
> org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
>       at 
> org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
>       at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>       at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:59)
>       at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)
>       at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
>       at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
>       at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>       at org.apache.spark.scheduler.Task.run(Task.scala:51)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>       at java.lang.Thread.run(Thread.java:722)
> {noformat}
>  This occurs only after migrating to the 1.0.0 API. The details of the code and 
> the data file used to test are included in this gist: 
> https://gist.github.com/reachbach/d8977c8eb5f71f889301



--
This message was sent by Atlassian JIRA
(v6.2#6252)
