[ https://issues.apache.org/jira/browse/SPARK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14045652#comment-14045652 ]
Bharath Ravi Kumar edited comment on SPARK-2292 at 6/28/14 12:09 PM:
---------------------------------------------------------------------
My apologies, the cause was mapToPair and not reduceByKey. I was misled by the
line number associated with the execution stage. Here's the entire (cleaned up)
code, copied from the updated gist:
{code}
package com.acme;

import java.io.File;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import scala.Tuple4;

public class Test {

    private static final Logger logger = Logger.getLogger(Test.class);

    public static void main(String[] args) throws Exception {
        Test demo = new Test();
        if (args == null || args.length != 1) {
            throw new IllegalArgumentException("A single argument (the path of the data file) is expected");
        }
        demo.test(args[0]);
    }

    public void test(String dataFile) throws Exception {
        SparkConf conf = new SparkConf();
        logger.info("Initializing context...");
        JavaSparkContext ctx = new JavaSparkContext(conf);
        JavaRDD<String> rawData = createRawRDD(ctx, new File(dataFile));
        LineSplitter splitter = new LineSplitter();
        JavaRDD<Tuple4<Long, Long, Integer, Integer>> reducedTupleRecords = rawData.map(splitter);
        logger.info(String.format("Raw records: %d ", rawData.count()));
        TupleToPairMapper tupleToPairMapper = new TupleToPairMapper();
        // The NPE reported in this issue is observed when this mapToPair stage executes.
        JavaPairRDD<Tuple2<Long, Long>, Tuple2<Integer, Integer>> recordsByTuple = reducedTupleRecords.mapToPair(tupleToPairMapper);
        logger.info(String.format("Mapped records: %d ", recordsByTuple.count()));
        logger.info("Shutting down context...");
        ctx.stop();
    }

    private static JavaRDD<String> createRawRDD(JavaSparkContext ctx, File file) {
        JavaRDD<String> rawData = ctx.textFile(file.getAbsolutePath());
        return rawData;
    }

    // Maps a record (x, y, r, c) to the pair ((x, y), (r, c)).
    private static class TupleToPairMapper implements PairFunction<Tuple4<Long, Long, Integer, Integer>, Tuple2<Long, Long>, Tuple2<Integer, Integer>> {

        @Override
        public Tuple2<Tuple2<Long, Long>, Tuple2<Integer, Integer>> call(Tuple4<Long, Long, Integer, Integer> t) throws Exception {
            return new Tuple2<Tuple2<Long, Long>, Tuple2<Integer, Integer>>(
                    new Tuple2<Long, Long>(t._1(), t._2()),
                    new Tuple2<Integer, Integer>(t._3(), t._4()));
        }

        public TupleToPairMapper() {
        }
    }

    // Splits a tab-separated record: x and y come from the first '#'-delimited field,
    // r and c from the second.
    private static class LineSplitter implements Function<String, Tuple4<Long, Long, Integer, Integer>> {

        @Override
        public Tuple4<Long, Long, Integer, Integer> call(String line) throws Exception {
            String[] dataPoint = line.trim().split("\\t");
            String f = dataPoint[0];
            String l1 = dataPoint[1];
            String[] fArr = f.split("#");
            Long x = Long.valueOf(fArr[0].trim());
            Long y = Long.valueOf(fArr[1].trim());
            String[] lArr = l1.split("#");
            int c = Integer.valueOf(lArr[2].trim());
            int r = Integer.valueOf(lArr[1].trim());
            Tuple4<Long, Long, Integer, Integer> tuple = new Tuple4<Long, Long, Integer, Integer>(x, y, r, c);
            return tuple;
        }
    }
}
{code}
The included code can be run from within a Maven project as follows:
{noformat}
mvn exec:java -Dexec.mainClass="com.acme.Test" -Dexec.args="-sparkMaster spark://host:7077 -sparkHome /path/to/spark-1.0.0 -jarFileLoc /path/to/jar-with-dependencies.jar -dataFile /tmp/part-r-00000 -numPartitions 2"
{noformat}
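Note that the updated Test above expects a single argument (the data file path) and builds a bare SparkConf, so the master URL and application jar are presumably picked up from spark.* properties or an external config rather than from the command line; the exec.args shown above carry over from the earlier JCommander-based version of the code (see "was" below). Purely as an illustrative sketch, and not necessarily how the job was actually launched, the same settings could be applied programmatically as the earlier version did (class name and paths below are placeholders):
{code}
// Hypothetical sketch: explicit SparkConf setup for the standalone cluster,
// mirroring the earlier version of this comment. Master URL, Spark home and
// jar path are placeholders, not values taken from the actual environment.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ExplicitConfExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("spark://host:7077")
                .setAppName("SparkMapToPairRepro")
                .setSparkHome("/path/to/spark-1.0.0")
                .setJars(new String[]{"/path/to/jar-with-dependencies.jar"});
        JavaSparkContext ctx = new JavaSparkContext(conf);
        // ... run the same map / mapToPair pipeline as in Test#test above ...
        ctx.stop();
    }
}
{code}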
The project's Maven POM:
{noformat}
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.acme</groupId>
    <artifactId>SparkExp</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>SparkExp</name>
    <url>http://maven.apache.org</url>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.acme.Test</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.0.0</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.10</artifactId>
            <version>1.0.0</version>
            <type>jar</type>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>com.github.fommil.netlib</groupId>
            <artifactId>all</artifactId>
            <version>1.1.2</version>
            <type>pom</type>
        </dependency>
    </dependencies>
    <repositories>
        <repository>
            <id>Akka repository</id>
            <url>http://repo.akka.io/releases</url>
        </repository>
    </repositories>
</project>
{noformat}
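The jar-with-dependencies referenced in the run command above is presumably produced by the assembly plugin configured in this POM; with that configuration, something along these lines (run from the project root) should build it under target/:
{noformat}
mvn clean package assembly:single
{noformat}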
Input data:
{noformat}
1028#29922#89575#kuv6kvghgk1337d86d2111774 1#1#0
{noformat}
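For reference, the two fields of the sample record above are tab-separated, and LineSplitter takes x and y from the first '#'-delimited field and r and c from the second. A minimal sketch (plain Java, no Spark; class and method names are illustrative only) that checks the parse of the sample record:
{code}
// Illustrative sketch only: applies the same splitting logic as LineSplitter
// to the sample record, without a Spark context.
public class ParseCheck {
    public static void main(String[] args) {
        String line = "1028#29922#89575#kuv6kvghgk1337d86d2111774\t1#1#0";
        String[] dataPoint = line.trim().split("\\t");
        String[] fArr = dataPoint[0].split("#");
        String[] lArr = dataPoint[1].split("#");
        Long x = Long.valueOf(fArr[0].trim());   // 1028
        Long y = Long.valueOf(fArr[1].trim());   // 29922
        int r = Integer.valueOf(lArr[1].trim()); // 1
        int c = Integer.valueOf(lArr[2].trim()); // 0
        // LineSplitter would emit the Tuple4 (1028, 29922, 1, 0);
        // TupleToPairMapper then produces the pair ((1028, 29922), (1, 0)).
        System.out.println(x + ", " + y + ", " + r + ", " + c);
    }
}
{code}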
was (Author: reachbach):
My apologies, the cause was mapToPair and not reduceByKey. I was misled by the
line number associated with the execution stage. Here's the entire (cleaned up)
code, copied from the updated gist:
{code}
package com.acme;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import java.io.File;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;
import scala.Tuple4;

public class Test {

    @Parameter(required = true, names = "-sparkMaster", description = "The url of the spark master in the form spark://host:port", arity = 1)
    private String sparkMasterURL;

    @Parameter(names = "-sparkHome", required = true, description = "Location of the local spark installation directory", arity = 1)
    private String sparkHome;

    @Parameter(names = "-dataFile", required = true, description = "The local file to pick up input data from", arity = 1)
    private String dataFile;

    @Parameter(names = "-numPartitions", required = false, description = "The number of spark data partitions to split across", arity = 1)
    private int numPartitions = 80;

    @Parameter(names = "-jarFileLoc", required = true, description = "The location of the jar file containing the application code with its non-spark dependencies", arity = 1)
    private String jarFileLoc;

    private static final Logger logger = Logger.getLogger(Test.class);

    public static void main(String[] args) throws Exception {
        Test regressionDemo = new Test();
        JCommander cmdParser = new JCommander(regressionDemo);
        try {
            cmdParser.parse(args);
            regressionDemo.train();
        } catch (ParameterException exception) {
            System.err.println("Exception parsing one or more command line arguments " + exception.getMessage());
            cmdParser.usage();
        }
    }

    public void train() throws Exception {
        SparkConf conf = new SparkConf();
        conf = conf.setMaster(sparkMasterURL).setAppName("SparkRegressionExample").setSparkHome(sparkHome).setJars(new String[]{jarFileLoc});
        conf.set("spark.executor.memory", "1500M");
        conf.set("spark.default.parallelism", "1");
        conf.set("spark.cores.max", "20");
        conf.set("spark.storage.memoryFraction", "0.9");
        JavaSparkContext ctx = new JavaSparkContext(conf);
        JavaRDD<String> rawData = createRawRDD(ctx, new File(dataFile));
        LineSplitter splitter = new LineSplitter();
        JavaRDD<Tuple4<Long, Long, Integer, Integer>> parsedRecords = rawData.map(splitter);
        logger.info("Raw records size: " + parsedRecords.count());
        JavaRDD<Tuple4<Long, Long, Integer, Integer>> reducedTupleRecords = parsedRecords.coalesce(numPartitions);
        logger.info("Filtered records size: " + reducedTupleRecords.count());
        TupleToPairMapper tupleToPairMapper = new TupleToPairMapper();
        JavaPairRDD<Tuple2<Long, Long>, Tuple2<Integer, Integer>> recordsByTuple = reducedTupleRecords.mapToPair(tupleToPairMapper);
        logger.info("Size of records reduced by tuple: " + recordsByTuple.count());
        logger.info("Shutting down context...");
        ctx.stop();
    }

    private static JavaRDD<String> createRawRDD(JavaSparkContext ctx, File file) {
        JavaRDD<String> rawData = ctx.textFile(file.getAbsolutePath());
        return rawData;
    }

    private static class TupleToPairMapper implements PairFunction<Tuple4<Long, Long, Integer, Integer>, Tuple2<Long, Long>, Tuple2<Integer, Integer>> {

        @Override
        public Tuple2<Tuple2<Long, Long>, Tuple2<Integer, Integer>> call(Tuple4<Long, Long, Integer, Integer> t) throws Exception {
            return new Tuple2<Tuple2<Long, Long>, Tuple2<Integer, Integer>>(
                    new Tuple2<Long, Long>(t._1(), t._2()),
                    new Tuple2<Integer, Integer>(t._3(), t._4()));
        }

        public TupleToPairMapper() {
        }
    }

    private static class KVReducer implements Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {

        @Override
        public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> t1, Tuple2<Integer, Integer> t2) throws Exception {
            return new Tuple2<Integer, Integer>(t1._1() + t2._1(), t1._2() + t2._2());
        }

        public KVReducer() {
        }
    }

    private static class LineSplitter implements Function<String, Tuple4<Long, Long, Integer, Integer>> {

        @Override
        public Tuple4<Long, Long, Integer, Integer> call(String line) throws Exception {
            String[] dataPoint = line.trim().split("\\t");
            String f = dataPoint[0];
            String l1 = dataPoint[1];
            String[] fArr = f.split("#");
            Long x = Long.valueOf(fArr[0].trim());
            Long y = Long.valueOf(fArr[1].trim());
            String[] lArr = l1.split("#");
            int c = Integer.valueOf(lArr[2].trim());
            int r = Integer.valueOf(lArr[1].trim());
            Tuple4<Long, Long, Integer, Integer> tuple = new Tuple4<Long, Long, Integer, Integer>(x, y, r, c);
            return tuple;
        }
    }
}
{code}
The included code can be run from within a Maven project as follows:
{noformat}
mvn exec:java -Dexec.mainClass="com.acme.Test" -Dexec.args="-sparkMaster spark://host:7077 -sparkHome /path/to/spark-1.0.0 -jarFileLoc /path/to/jar-with-dependencies.jar -dataFile /tmp/part-r-00000 -numPartitions 2"
{noformat}
Project's maven dependencies:
{noformat}
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.10</artifactId>
    <version>1.0.0</version>
    <type>jar</type>
</dependency>
<dependency>
    <groupId>com.github.fommil.netlib</groupId>
    <artifactId>all</artifactId>
    <version>1.1.2</version>
    <type>pom</type>
</dependency>
<dependency>
    <groupId>com.beust</groupId>
    <artifactId>jcommander</artifactId>
    <version>1.35</version>
</dependency>
{noformat}
Input data:
{noformat}
1028#29922#89575#kuv6kvghgk1337d86d2111774 1#1#0
{noformat}
> NullPointerException in JavaPairRDD.mapToPair
> ---------------------------------------------
>
> Key: SPARK-2292
> URL: https://issues.apache.org/jira/browse/SPARK-2292
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.0.0
> Environment: Spark 1.0.0, Standalone with the master and a single slave
> running on Ubuntu on a laptop. 4 GB of memory and 8 cores were available to
> the executor.
> Reporter: Bharath Ravi Kumar
> Priority: Critical
> Attachments: SPARK-2292-aash-repro.tar.gz
>
>
> Correction: Invoking JavaPairRDD.mapToPair results in an NPE:
> {noformat}
> 14/06/26 21:05:35 WARN scheduler.TaskSetManager: Loss was due to java.lang.NullPointerException
> java.lang.NullPointerException
>     at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
>     at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:59)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
>     at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
>     at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>     at org.apache.spark.scheduler.Task.run(Task.scala:51)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>     at java.lang.Thread.run(Thread.java:722)
> {noformat}
> This occurs only after migrating to the 1.0.0 API. The details of the code
> and the data file used to test are included in this gist:
> https://gist.github.com/reachbach/d8977c8eb5f71f889301