Re: Equally split an RDD partition into two partitions at the same node

2017-01-14 Thread Rishi Yadav
Can you provide some more details:
1. How many partitions does the RDD have?
2. How big is the cluster?
On Sat, Jan 14, 2017 at 3:59 PM Fei Hu  wrote:

> Dear all,
>
> I want to equally divide an RDD partition into two partitions. That means
> the first half of the elements in the partition will form one new partition,
> and the second half will form another new partition. The two new partitions
> need to stay on the same node as their parent partition, which helps
> preserve data locality.
>
> Does anyone know how to implement this, or have any hints?
>
> Thanks in advance,
> Fei
>
>


Re: Spark streaming app that processes Kafka DStreams produces no output and no error

2017-01-14 Thread shyla deshpande
Hello,

I want to add that I don't even see the Streaming tab in the application UI
on port 4040 when I run it on the cluster.
The cluster on EC2 has 1 master node and 1 worker node.
The worker node is using 2 of 2 cores and 6 GB of its 6.3 GB of memory.

Can I run a Spark Streaming job with just 2 cores?

Appreciate your time and help.

Thanks





On Fri, Jan 13, 2017 at 10:46 PM, shyla deshpande 
wrote:

> Hello,
>
> My Spark Streaming app that reads Kafka topics and prints the DStream
> works fine on my laptop, but on the AWS cluster it produces no output and no
> errors.
>
> Please help me debug.
>
> I am using Spark 2.0.2 and kafka-0-10
>
> Thanks
>
> The following is the output of the spark streaming app...
>
>
> 17/01/14 06:22:41 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> 17/01/14 06:22:43 WARN Checkpoint: Checkpoint directory check1 does not exist
> Creating new context
> 17/01/14 06:22:45 WARN SparkContext: Use an existing SparkContext, some 
> configuration may not take effect.
> 17/01/14 06:22:45 WARN KafkaUtils: overriding enable.auto.commit to false for 
> executor
> 17/01/14 06:22:45 WARN KafkaUtils: overriding auto.offset.reset to none for 
> executor
> 17/01/14 06:22:45 WARN KafkaUtils: overriding executor group.id to 
> spark-executor-whilDataStream
> 17/01/14 06:22:45 WARN KafkaUtils: overriding receive.buffer.bytes to 65536 
> see KAFKA-3135
>
>
>
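
A note on the cores question: the kafka-0-10 integration uses the direct
(receiver-less) approach, so 2 executor cores can be enough. It is the
receiver-based streams where each receiver permanently occupies a core, and if
receivers take every core there is nothing left to process the batches, which
shows up exactly as "no output and no errors". For reference, a minimal Scala
sketch of a direct-stream app on Spark 2.0.2; the broker address, topic, group
id and batch interval are placeholders, not taken from your app:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

object KafkaPrintApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KafkaPrintApp")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Placeholder Kafka settings; point these at your own brokers and group.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker-host:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "print-app",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("mytopic"), kafkaParams)
    )

    // The direct stream has no long-running receiver, so both executor cores
    // stay available for the batch tasks.
    stream.map(record => record.value).print()

    ssc.start()            // without start(), nothing is ever processed or printed
    ssc.awaitTermination()
  }
}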


Equally split an RDD partition into two partitions at the same node

2017-01-14 Thread Fei Hu
Dear all,

I want to equally divide an RDD partition into two partitions. That means
the first half of the elements in the partition will form one new partition,
and the second half will form another new partition. The two new partitions
need to stay on the same node as their parent partition, which helps
preserve data locality.

Does anyone know how to implement this, or have any hints?

Thanks in advance,
Fei
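
In case it helps, below is a rough, untested sketch (the class and helper names
are mine, not Spark API) of one way to do this with a custom RDD: each parent
partition maps to two child partitions, and getPreferredLocations simply reuses
the parent's preferred locations so the scheduler can keep both halves on the
parent's node.

import scala.reflect.ClassTag

import org.apache.spark.{NarrowDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Each child partition remembers its parent partition's index and which half it covers.
case class HalfPartition(index: Int, parentIndex: Int, firstHalf: Boolean) extends Partition

// Each child partition i depends only on parent partition i / 2.
class SplitDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(childId: Int): Seq[Int] = Seq(childId / 2)
}

// Splits every parent partition into two child partitions while reporting the
// parent's preferred locations for both halves.
class SplitInTwoRDD[T: ClassTag](parent: RDD[T])
  extends RDD[T](parent.sparkContext, Seq(new SplitDependency[T](parent))) {

  private def parentRdd: RDD[T] = dependencies.head.rdd.asInstanceOf[RDD[T]]

  override def getPartitions: Array[Partition] =
    parentRdd.partitions.flatMap { p =>
      Seq[Partition](HalfPartition(2 * p.index, p.index, firstHalf = true),
                     HalfPartition(2 * p.index + 1, p.index, firstHalf = false))
    }

  override def getPreferredLocations(split: Partition): Seq[String] = {
    val parentSplit = parentRdd.partitions(split.asInstanceOf[HalfPartition].parentIndex)
    parentRdd.preferredLocations(parentSplit)
  }

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    val hp = split.asInstanceOf[HalfPartition]
    // Materialize the parent partition to find its midpoint; each parent partition
    // is computed once per half, so cache the parent RDD to avoid recomputation.
    val elems = parentRdd.iterator(parentRdd.partitions(hp.parentIndex), context).toArray
    val mid = elems.length / 2
    (if (hp.firstHalf) elems.take(mid) else elems.drop(mid)).iterator
  }
}

Usage would be something like new SplitInTwoRDD(someRdd). Keep in mind that
preferred locations are only hints to the scheduler, not a hard guarantee.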


Re: Debugging a PythonException with no details

2017-01-14 Thread Marco Mistroni
It seems it has to do with a UDF. Could you share a snippet of the code you
are running?
Kr

On 14 Jan 2017 1:40 am, "Nicholas Chammas" 
wrote:

> I’m looking for tips on how to debug a PythonException that’s very sparse
> on details. The full exception is below, but the only interesting bits
> appear to be the following lines:
>
> org.apache.spark.api.python.PythonException:
> ...
> py4j.protocol.Py4JError: An error occurred while calling 
> None.org.apache.spark.api.java.JavaSparkContext
>
> Otherwise, the only other clue from the traceback I can see is that the
> problem may involve a UDF somehow.
>
> I’ve tested this code against many datasets (stored as ORC) and it works
> fine. The same code only seems to throw this error on a few datasets that
> happen to be sourced via JDBC. I can’t seem to get a lead on what might be
> going wrong here.
>
> Does anyone have tips on how to debug a problem like this? How do I find
> more specifically what is going wrong?
>
> Nick
>
> Here’s the full exception:
>
> 17/01/13 17:12:14 WARN TaskSetManager: Lost task 7.0 in stage 9.0 (TID 15, 
> devlx023.private.massmutual.com, executor 4): 
> org.apache.spark.api.python.PythonException: Traceback (most recent call 
> last):
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/worker.py", line 
> 161, in main
> func, profiler, deserializer, serializer = read_udfs(pickleSer, infile)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/worker.py", line 97, 
> in read_udfs
> arg_offsets, udf = read_single_udf(pickleSer, infile)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/worker.py", line 78, 
> in read_single_udf
> f, return_type = read_command(pickleSer, infile)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/worker.py", line 54, 
> in read_command
> command = serializer._read_with_length(file)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/serializers.py", 
> line 169, in _read_with_length
> return self.loads(obj)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/serializers.py", 
> line 431, in loads
> return pickle.loads(obj, encoding=encoding)
>   File 
> "/hadoop/yarn/nm/usercache/jenkins/appcache/application_1483203887152_1207/container_1483203887152_1207_01_05/splinkr/person.py",
>  line 111, in <module>
> py_normalize_udf = udf(py_normalize, StringType())
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/sql/functions.py", 
> line 1868, in udf
> return UserDefinedFunction(f, returnType)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/sql/functions.py", 
> line 1826, in __init__
> self._judf = self._create_judf(name)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/sql/functions.py", 
> line 1830, in _create_judf
> sc = SparkContext.getOrCreate()
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/context.py", line 
> 307, in getOrCreate
> SparkContext(conf=conf or SparkConf())
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/context.py", line 
> 118, in __init__
> conf, jsc, profiler_cls)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/context.py", line 
> 179, in _do_init
> self._jsc = jsc or self._initialize_context(self._conf._jconf)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/context.py", line 
> 246, in _initialize_context
> return self._jvm.JavaSparkContext(jconf)
>   File 
> "/hadoop/spark/2.1/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 
> 1401, in __call__
> answer, self._gateway_client, None, self._fqn)
>   File "/hadoop/spark/2.1/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", 
> line 327, in get_return_value
> format(target_id, ".", name))
> py4j.protocol.Py4JError: An error occurred while calling 
> None.org.apache.spark.api.java.JavaSparkContext
>
> at 
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
> at 
> org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
> at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
> at 
> org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
> at 
> org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at 
> 

java.io.NotSerializableException: org.apache.spark.SparkConf

2017-01-14 Thread streamly tester
Hi,

I was playing with Spark Streaming and wanted to collect data from MQTT and
publish it to Cassandra.

Here is my code:

package com.wouri.streamly.examples.mqtt;

import java.io.Serializable;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.mqtt.MQTTUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.datastax.spark.connector.japi.CassandraJavaUtil;

/**
 * Counts words in UTF8 encoded, '\n' delimited text received from MQTT Server.
 *
 * Usage: JavaMQTTStreamWordCount <brokerUrl> <topic>
 * <brokerUrl> and <topic> describe the MQTT server that Structured Streaming
 * would connect to receive data.
 *
 * To run this on your local machine, a MQTT Server should be up and running.
 *
 */
public class JavaMQTTStreamWordCount implements Serializable{

static Logger log =
LoggerFactory.getLogger(JavaMQTTStreamWordCount.class);

static String wordTable = "words";

private transient static SparkConf conf;
private JavaMQTTStreamWordCount(SparkConf conf) {
this.conf = conf;
}
private static void sendDataToCassandra(JavaSparkContext sc, String keyspace,
Map<String, Long> wordCounts) {
log.info("In generateData");
CassandraConnector connector =
CassandraConnector.apply(sc.getConf());
System.out.println("WordCounts "+wordCounts);

// Prepare the schema
try (Session session = connector.openSession()){
session.execute("CREATE TABLE IF NOT EXISTS " + keyspace +"."
+wordTable + " (id INT PRIMARY KEY, word TEXT, counts INT)");
}

log.info("keyspace {} and tables : {}, {} and  {} created
successfully", keyspace, productsTable, salesTable, summariesTable);
// Prepare the products hierarchy
List<Word> words = new ArrayList<>();
int i = 0;
for (Map.Entry<String, Long> word : wordCounts.entrySet()) {
Word eWord = new Word(i, word.getKey(), (int) (long) word.getValue());
words.add(i, eWord);
i++;
}

log.info("Products to add : {}", words);
JavaRDD productsRDD = sc.parallelize(words);


CassandraJavaUtil.javaFunctions(productsRDD).writerBuilder(keyspace,
wordTable, CassandraJavaUtil.mapToRow(Word.class)).saveToCassandra();

log.info("Words successfully added : {}", words);
}
private static Map<String, Long> wordCounts = new HashMap<>();

public static void main(String[] args) throws Exception {
if (args.length < 7) {
System.err.println("Usage: JavaMQTTStreamWordCount <brokerUrl> <topic> "
+ "<clientId> <username> <password> <keyspace> <cassandraContactPoints>");
System.exit(1);
}

String brokerUrl = args[0];
String topic = args[1];
String clientID = args[2];
String username = args[3];
String password = args[4];
String keyspace = args[5];
String cassandraContactPoints = args[6];
SparkConf conf = new SparkConf();


SparkConf sparkConf = new
SparkConf().setAppName("JavaMQTTStreamWordCount");
sparkConf.set("spark.driver.allowMultipleContexts", "true");
sparkConf.set("spark.cores.max", "2");
sparkConf.set("spark.cassandra.connection.host",
cassandraContactPoints );
sparkConf.set("spark.cassandra.connection.port", "9042" );
sparkConf.set("spark.cassandra.auth.username", username);
sparkConf.set("spark.cassandra.auth.password", password);
sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true");
sparkConf.set("spark.cores.max", "2");

// check Spark configuration for master URL, set it to local if not
// configured
if (!sparkConf.contains("spark.master")) {
sparkConf.setMaster("local[4]");
}

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
Durations.seconds(5));

JavaReceiverInputDStream<String> lines =
MQTTUtils.createStream(jssc, brokerUrl, topic, clientID, username,
password, false);

JavaDStream words = lines.flatMap(new
FlatMapFunction
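
For comparison, a minimal Scala sketch of the same pipeline follows; the
keyspace name, connection host, and the socket stand-in for the MQTT source are
assumptions, and the word-count logic is simplified. The transformation closures
reference only serializable values, and saveToCassandra is called from the
driver side of foreachRDD, so no SparkConf or connector object gets captured in
a task closure:

import com.datastax.spark.connector._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Matches the table created above: (id INT PRIMARY KEY, word TEXT, counts INT).
case class Word(id: Int, word: String, counts: Int)

object WordsToCassandra {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("WordsToCassandra")
      .set("spark.cassandra.connection.host", "127.0.0.1")   // assumed contact point
    val ssc = new StreamingContext(conf, Seconds(5))

    // Stand-in source; in the app above, lines would come from MQTTUtils.createStream(...).
    val lines = ssc.socketTextStream("localhost", 9999)

    val words = lines.flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .map { case (w, c) => Word(w.hashCode, w, c) }   // hashCode as a stand-in id

    // foreachRDD's body runs on the driver; saveToCassandra distributes the write
    // itself, so the tasks never have to serialize SparkConf.
    words.foreachRDD { rdd =>
      rdd.saveToCassandra("mykeyspace", "words")       // assumed keyspace name
    }

    ssc.start()
    ssc.awaitTermination()
  }
}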

Re: Kryo On Spark 1.6.0

2017-01-14 Thread Yan Facai
For Scala, you could fix it by using:
conf.registerKryoClasses(Array(Class.forName("scala.collection.mutable.WrappedArray$ofRef")))


By the way,
if the class is an array of a Java primitive type, say byte[], then use:
Class.forName("[B")

If it is an array of another class, say scala.collection.mutable.WrappedArray$ofRef,
then use (note the trailing semicolon in the JVM name of an object array):
Class.forName("[Lscala.collection.mutable.WrappedArray$ofRef;")

ref:
https://docs.oracle.com/javase/8/docs/api/java/lang/Class.html#getName--





On Tue, Jan 10, 2017 at 11:11 PM, Yang Cao  wrote:

> If you don't mind, could you please share the Scala solution with me? I
> tried to use Kryo but it seemed not to work at all. I hope to get a
> practical example. THX
>
> On 10 January 2017, at 19:10, Enrico DUrso  wrote:
>
> Hi,
>
> I am trying to use Kryo on Spark 1.6.0.
> I am able to register my own classes and it works, but when I set
> “spark.kryo.registrationRequired” to true, I get an error about a Scala
> class:
> “Class is not registered: scala.collection.mutable.WrappedArray$ofRef”.
>
> Has any of you already solved this issue in Java? I found the code to
> solve it in Scala, but I am unable to register this class in Java.
>
> Cheers,
>
> enrico
>
> --
>
> CONFIDENTIALITY WARNING.
> This message and the information contained in or attached to it are
> private and confidential and intended exclusively for the addressee. everis
> informs to whom it may receive it in error that it contains privileged
> information and its use, copy, reproduction or distribution is prohibited.
> If you are not an intended recipient of this E-mail, please notify the
> sender, delete it and do not read, act upon, print, disclose, copy, retain
> or redistribute any portion of this E-mail.
>
>
>
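
If it helps, here is a small Scala sketch of where that registration usually
sits, with registrationRequired turned on (MyRecord is only a placeholder for
your own classes):

import org.apache.spark.SparkConf

case class MyRecord(id: Long, values: Array[Double])   // placeholder user class

object KryoSetup {
  def buildConf(): SparkConf = {
    val conf = new SparkConf()
      .setAppName("kryo-example")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrationRequired", "true")

    conf.registerKryoClasses(Array(
      classOf[MyRecord],
      classOf[Array[MyRecord]],                                      // arrays of user classes need their own entry
      Class.forName("scala.collection.mutable.WrappedArray$ofRef"),  // the class from the error message
      classOf[Array[Byte]]                                           // equivalent to Class.forName("[B")
    ))
  }
}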


Re: spark locality

2017-01-14 Thread vincent gromakowski
Should I open a ticket to allow data locality in an IP-per-container context?

2017-01-12 23:41 GMT+01:00 Michael Gummelt :

> If the executor reports a different hostname inside the CNI container,
> then no, I don't think so.
>
> On Thu, Jan 12, 2017 at 2:28 PM, vincent gromakowski <
> vincent.gromakow...@gmail.com> wrote:
>
>> So even if I make the Spark executors run on the same nodes as the
>> Cassandra nodes, I am not sure each worker will connect to the C* nodes on
>> the same Mesos agent?
>>
>> 2017-01-12 21:13 GMT+01:00 Michael Gummelt :
>>
>>> The code in there with docs that reference CNI doesn't actually run when
>>> CNI is in effect, and doesn't have anything to do with locality. It's just
>>> making Spark work in a no-DNS environment.
>>>
>>> On Thu, Jan 12, 2017 at 12:04 PM, vincent gromakowski <
>>> vincent.gromakow...@gmail.com> wrote:
>>>
 I have found this but I am not sure how it can help...
 https://github.com/mesosphere/spark-build/blob/a9efef8850976f787956660262f3b77cd636f3f5/conf/spark-env.sh


 2017-01-12 20:16 GMT+01:00 Michael Gummelt :

> That's a good point. I hadn't considered the locality implications of
> CNI yet.  I think tasks are placed based on the hostname reported by the
> executor, which in a CNI container will be different than the
> HDFS/Cassandra hostname.  I'm not aware of anyone running Spark+CNI in 
> prod
> yet, either.
>
> However, locality in Mesos isn't great right now anyway.  Executors
> are placed w/o regard to locality.  Locality is only taken into account
> when tasks are assigned to executors.  So if you get a locality-poor
> executor placement, you'll also have locality-poor task placement.  It
> could be better.
>
> On Thu, Jan 12, 2017 at 7:55 AM, vincent gromakowski <
> vincent.gromakow...@gmail.com> wrote:
>
>> Hi all,
>> Does anyone have experience running Spark on Mesos with CNI (ip per
>> container) ?
>> How would Spark use IP or hostname for data locality with backend
>> framework like HDFS or Cassandra ?
>>
>> V
>>
>
>
>
> --
> Michael Gummelt
> Software Engineer
> Mesosphere
>


>>>
>>>
>>> --
>>> Michael Gummelt
>>> Software Engineer
>>> Mesosphere
>>>
>>
>>
>
>
> --
> Michael Gummelt
> Software Engineer
> Mesosphere
>
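
Regarding the point above that task placement is based on the hostname the
executor reports: a quick spark-shell sketch (the HDFS path is hypothetical)
for comparing the preferred locations a storage system reports against the
hostnames the scheduler actually knows about:

// Preferred locations per partition, as reported by the input source.
val rdd = sc.textFile("hdfs:///some/path")
rdd.partitions.foreach { p =>
  println(s"partition ${p.index} prefers ${rdd.preferredLocations(p).mkString(", ")}")
}

// Executor addresses as the scheduler sees them (host:port keys).
sc.getExecutorMemoryStatus.keys.foreach(println)

If the two sets of hostnames do not line up (as with an IP-per-container
network), locality-aware placement cannot kick in.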