Questions regarding Jobs, Stages and Caching

2017-05-24 Thread ramnavan
Hi,
 
I’m new to Spark and trying to understand its inner workings in the
scenarios mentioned below. I’m using PySpark and Spark 2.1.1.
 
spark.read.json():
 
I am executing the line
“spark.read.json(‘s3a:///*.json’)” on a cluster with three
worker nodes (AWS m4.xlarge instances). The bucket has about 19949 JSON
files and the total size is about 4.4 GB. The line created three Spark jobs:
the first job with 1 task, the second with 19949 tasks and the third with
1 task. Each of the jobs has one stage in it. Please refer to the
attached images job0.jpg, job1.jpg and job2.jpg.

I was expecting it to create one job with 19949 tasks. I’d like to understand
why there are three jobs instead of just one, and why reading JSON files
involves a map operation.
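For illustration: part of what you are seeing is that spark.read.json() with no
schema has to infer one, and that inference is itself a distributed pass over the
files, which is most likely where the 19949-task job comes from; the small
one-task jobs are probably listing/sampling related, though I can’t say for
certain from the screenshots. Below is a minimal Scala sketch of supplying an
explicit schema to skip the inference pass (the PySpark call has the same shape;
the schema fields and bucket name are placeholders, not taken from this thread):

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val spark = SparkSession.builder().appName("json-read-sketch").getOrCreate()

// Hypothetical schema; the real one would mirror the fields of the JSON documents.
val schema = StructType(Seq(
  StructField("id", LongType, nullable = true),
  StructField("payload", StringType, nullable = true)
))

// With an explicit schema Spark does not need to scan the files up front to
// infer one, so the large inference job disappears.
val filesDf = spark.read.schema(schema).json("s3a://your-bucket/*.json")
```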
 
Caching and Count():
 
Once Spark reads the 19949 JSON files into a dataframe (let’s call it files_df),
I call these two operations: files_df.createOrReplaceTempView(“files”)
and files_df.cache(). I expect files_df.cache() to cache the entire
dataframe in memory so that any subsequent operation will be faster. My next
statement is files_df.count(). This operation took a full 8.8 minutes, and
it looks like it read the files again from S3 and then calculated the count.
Please refer to the attached count.jpg file for reference.
  
Why is this happening? If I call files_df.count() a second time, it
comes back fast, within a few seconds. Can someone explain this?
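The short explanation for the second question is that cache() is lazy: it only
marks the dataframe for caching, and the first action that touches it both does
the real work and populates the cache. A minimal Scala sketch of the pattern
described above (the PySpark calls behave the same way; the bucket path is a
placeholder):

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cache-sketch").getOrCreate()

val filesDf = spark.read.json("s3a://your-bucket/*.json")
filesDf.cache()                    // lazy: nothing is read or stored yet
val firstCount  = filesDf.count()  // slow: scans S3 and fills the cache
val secondCount = filesDf.count()  // fast: served from the cached data
```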
 
In general, I am looking for a good source to learn about Spark internals
and to understand what’s happening under the hood.
 
Thanks in advance!
 
Ram






Re: Spark 2 Kafka Direct Stream Consumer Issue

2017-05-24 Thread Jayadeep J
Could any of the experts kindly advise?

On Fri, May 19, 2017 at 6:00 PM, Jayadeep J  wrote:

> Hi ,
>
> I would appreciate some advice regarding an issue we are facing in
> Streaming Kafka Direct Consumer.
>
> We have recently upgraded our application with Kafka Direct Stream to
> Spark 2 (spark-streaming-kafka-0-10 - 2.1.0) with Kafka version 0.10.0.0.
> We see abnormal delays after the application has run for a couple of
> hours & completed consumption of ~10 million records. There is a sudden
> jump in the processing time from ~15 seconds (usual for our app) to ~3
> minutes, & from then on the processing time keeps degrading, though
> without any failure.
>
> We have seen that the delay is due to certain tasks taking exactly the
> duration of the configured 'request.timeout.ms' for the Kafka consumer.
> We have tested this by varying the timeout property to different values
> (see the sketch below for where this property is set). It looks like the
> get(offset: Long, timeout: Long): ConsumerRecord[K, V] call & the
> subsequent poll(timeout) in CachedKafkaConsumer.scala are actually
> timing out on some of the partitions without reading the data. But the
> executor logs them as successfully completed after the exact timeout
> duration. Note that most other tasks complete successfully with
> millisecond durations. We found the DEBUG logs to contain
> "org.apache.kafka.common.errors.DisconnectException" without any actual
> failure. The Kafka issue logged as 'KafkaConsumer susceptible to
> FetchResponse starvation' [KAFKA-4753] seems to be the underlying cause.
>
> Could anyone kindly suggest whether this is normal behaviour for
> Spark? Shouldn't Spark throw a timeout error, or perhaps fail the tasks, in
> such cases? Currently the tasks seem to be successful & the job appears to
> progress at a really slow speed. Thanks for your help.
>
> Thanks
> Jay
>
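For reference, a hedged sketch of where request.timeout.ms is passed for the
kafka-0-10 direct stream described above: it is an ordinary Kafka consumer
property supplied through the kafkaParams map. The broker address, group id,
topic name, batch interval and timeout value below are placeholders, not the
original poster's settings.

```
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val conf = new SparkConf().setAppName("request-timeout-sketch")
val ssc  = new StreamingContext(conf, Seconds(15))

// request.timeout.ms is a plain Kafka consumer setting, passed via kafkaParams.
val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "broker1:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "example-group",
  "request.timeout.ms" -> (40000: java.lang.Integer)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("example-topic"), kafkaParams))

stream.foreachRDD(rdd => println(s"batch size: ${rdd.count()}"))
ssc.start()
ssc.awaitTermination()
```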


Re: Running into the same problem as JIRA SPARK-19268

2017-05-24 Thread kant kodali
Hi All,

I specified hdfsCheckPointDir = /usr/local/hadoop/checkpoint as you can see
below; however, I don't see a checkpoint directory under my hadoop_home =
/usr/local/hadoop on either the datanodes or the namenode. On the datanode
machine there does seem to be some data under

/usr/local/hadoop/hdfs/namenode/current/BP-1469808024-X.X.X.X-1495672725898/current/finalized/subdir0/subdir0

I thought the checkpoint directory would be created by Spark once I specify
the path, but do I need to manually create the checkpoint dir with mkdir on all
Spark worker machines? I am new to HDFS as well, so please let me know. I
can try sending df.explain("true"), but I have 100 fields in my schema, so the
Project looks really big; if that is not a problem for you guys I can
send it as well.
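As far as I can tell, checkpointLocation is a sink-side option in Structured
Streaming, i.e. it belongs on writeStream rather than on readStream, and Spark
creates the directory on the HDFS-compatible file system itself, so no manual
mkdir on the worker machines should be needed. A minimal Scala sketch with the
console sink standing in for the real one; the broker address and topic are
placeholders:

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("checkpoint-sketch").getOrCreate()

// Requires the spark-sql-kafka-0-10 package on the classpath.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "example-topic")
  .option("startingOffsets", "earliest")
  .load()

// The checkpoint location goes on the writer; Spark creates the directory.
val query = df.writeStream
  .format("console")
  .option("checkpointLocation", "hdfs:///usr/local/hadoop/checkpoint")
  .outputMode("append")
  .start()

query.awaitTermination()
```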


  +- StreamingRelation
DataSource(org.apache.spark.sql.SparkSession@21002393,kafka,List(),None,List(),None,Map(subscribe
-> analytics2, failOnDataLoss -> false, kafka.bootstrap.servers ->
X.X.X.X:9092 , checkpointLocation ->
/usr/local/hadoop/checkpoint, startingOffsets -> earliest),None),
kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5,
timestampType#6]

*Here is the stack trace*

StructField(OptionalContext4,StringType,true),
StructField(OptionalContext5,StringType,true)),true), cast(value#1 as
string)) AS payload#15]
+- StreamingExecutionRelation
KafkaSource[Subscribe[analytics2]], [key#0, value#1, topic#2,
partition#3, offset#4L, timestamp#5, timestampType#6]

at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:305)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)
Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 2 in stage 3.0 failed 4 times, most recent failure: Lost
task 2.3 in stage 3.0 (TID 222, 172.31.25.189, executor 0):
java.lang.IllegalStateException: Error reading delta file
/tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
of HDFSStateStoreProvider[id = (op=0, part=2), dir =
/tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]:
/tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
does not exist
at 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:365)
at 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:317)
at 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:314)
at scala.Option.getOrElse(Option.scala:121)
at 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314)
at 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313)
at scala.Option.getOrElse(Option.scala:121)
at 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313)
at 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220)
at 
org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186)
at 
org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File does not exist:
/tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
at 

Re: Running into the same problem as JIRA SPARK-19268

2017-05-24 Thread Shixiong(Ryan) Zhu
What's the value of "hdfsCheckPointDir"? Could you list this directory on
HDFS and report the files there?
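In case it helps, one way to do that listing programmatically (the equivalent of
`hdfs dfs -ls` on the path mentioned earlier in the thread) is the Hadoop
FileSystem API; a minimal sketch, assuming the Hadoop configuration files are on
the classpath so the default file system resolves to HDFS:

```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
val checkpointPath = new Path("/usr/local/hadoop/checkpoint")

if (fs.exists(checkpointPath)) {
  // Print every entry directly under the checkpoint directory.
  fs.listStatus(checkpointPath).foreach(status => println(status.getPath))
} else {
  println(s"$checkpointPath does not exist on ${fs.getUri}")
}
```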

On Wed, May 24, 2017 at 3:50 PM, Michael Armbrust 
wrote:

> -dev
>
> Have you tried clearing out the checkpoint directory?  Can you also give
> the full stack trace?
>
> On Wed, May 24, 2017 at 3:45 PM, kant kodali  wrote:
>
>> Even if I do simple count aggregation like below I get the same error as
>> https://issues.apache.org/jira/browse/SPARK-19268
>>
>> Dataset df2 = df1.groupBy(functions.window(df1.col("Timestamp5"), "24 
>> hours", "24 hours"), df1.col("AppName")).count();
>>
>>
>> On Wed, May 24, 2017 at 3:35 PM, kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> I am using Spark 2.1.1 and running in a Standalone mode using HDFS and
>>> Kafka
>>>
>>> I am running into the same problem as https://issues.apache.org/jira
>>> /browse/SPARK-19268 with my app(not KafkaWordCount).
>>>
>>> Here is my sample code
>>>
>>> *Here is how I create ReadStream*
>>>
>>> sparkSession.readStream()
>>> .format("kafka")
>>> .option("kafka.bootstrap.servers", 
>>> config.getString("kafka.consumer.settings.bootstrapServers"))
>>> .option("subscribe", 
>>> config.getString("kafka.consumer.settings.topicName"))
>>> .option("startingOffsets", "earliest")
>>> .option("failOnDataLoss", "false")
>>> .option("checkpointLocation", hdfsCheckPointDir)
>>> .load();
>>>
>>>
>>> *The core logic*
>>>
>>> Dataset df = ds.select(from_json(new Column("value").cast("string"), 
>>> client.getSchema()).as("payload"));
>>> Dataset df1 = df.selectExpr("payload.info.*", "payload.data.*");
>>> Dataset df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", 
>>> "24 hours"), df1.col("AppName")).agg(sum("Amount"));
>>> StreamingQuery query = df1.writeStream().foreach(new 
>>> KafkaSink()).outputMode("update").start();
>>> query.awaitTermination();
>>>
>>>
>>> I can also provide any other information you may need.
>>>
>>> Thanks!
>>>
>>
>>
>


Re: Running into the same problem as JIRA SPARK-19268

2017-05-24 Thread Michael Armbrust
-dev

Have you tried clearing out the checkpoint directory?  Can you also give
the full stack trace?

On Wed, May 24, 2017 at 3:45 PM, kant kodali  wrote:

> Even if I do simple count aggregation like below I get the same error as
> https://issues.apache.org/jira/browse/SPARK-19268
>
> Dataset df2 = df1.groupBy(functions.window(df1.col("Timestamp5"), "24 
> hours", "24 hours"), df1.col("AppName")).count();
>
>
> On Wed, May 24, 2017 at 3:35 PM, kant kodali  wrote:
>
>> Hi All,
>>
>> I am using Spark 2.1.1 and running in a Standalone mode using HDFS and
>> Kafka
>>
>> I am running into the same problem as https://issues.apache.org/jira
>> /browse/SPARK-19268 with my app(not KafkaWordCount).
>>
>> Here is my sample code
>>
>> *Here is how I create ReadStream*
>>
>> sparkSession.readStream()
>> .format("kafka")
>> .option("kafka.bootstrap.servers", 
>> config.getString("kafka.consumer.settings.bootstrapServers"))
>> .option("subscribe", 
>> config.getString("kafka.consumer.settings.topicName"))
>> .option("startingOffsets", "earliest")
>> .option("failOnDataLoss", "false")
>> .option("checkpointLocation", hdfsCheckPointDir)
>> .load();
>>
>>
>> *The core logic*
>>
>> Dataset df = ds.select(from_json(new Column("value").cast("string"), 
>> client.getSchema()).as("payload"));
>> Dataset df1 = df.selectExpr("payload.info.*", "payload.data.*");
>> Dataset df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", "24 
>> hours"), df1.col("AppName")).agg(sum("Amount"));
>> StreamingQuery query = df1.writeStream().foreach(new 
>> KafkaSink()).outputMode("update").start();
>> query.awaitTermination();
>>
>>
>> I can also provide any other information you may need.
>>
>> Thanks!
>>
>
>


Re: Running into the same problem as JIRA SPARK-19268

2017-05-24 Thread kant kodali
Even if I do simple count aggregation like below I get the same error as
https://issues.apache.org/jira/browse/SPARK-19268

Dataset df2 = df1.groupBy(functions.window(df1.col("Timestamp5"),
"24 hours", "24 hours"), df1.col("AppName")).count();


On Wed, May 24, 2017 at 3:35 PM, kant kodali  wrote:

> Hi All,
>
> I am using Spark 2.1.1 and running in a Standalone mode using HDFS and
> Kafka
>
> I am running into the same problem as https://issues.apache.org/
> jira/browse/SPARK-19268 with my app(not KafkaWordCount).
>
> Here is my sample code
>
> *Here is how I create ReadStream*
>
> sparkSession.readStream()
> .format("kafka")
> .option("kafka.bootstrap.servers", 
> config.getString("kafka.consumer.settings.bootstrapServers"))
> .option("subscribe", 
> config.getString("kafka.consumer.settings.topicName"))
> .option("startingOffsets", "earliest")
> .option("failOnDataLoss", "false")
> .option("checkpointLocation", hdfsCheckPointDir)
> .load();
>
>
> *The core logic*
>
> Dataset df = ds.select(from_json(new Column("value").cast("string"), 
> client.getSchema()).as("payload"));
> Dataset df1 = df.selectExpr("payload.info.*", "payload.data.*");
> Dataset df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", "24 
> hours"), df1.col("AppName")).agg(sum("Amount"));
> StreamingQuery query = df1.writeStream().foreach(new 
> KafkaSink()).outputMode("update").start();
> query.awaitTermination();
>
>
> I can also provide any other information you may need.
>
> Thanks!
>


Re: One question / kerberos, yarn-cluster -> connection to hbase

2017-05-24 Thread Michael Gummelt
What version of Spark are you using?  Can you provide your logs with DEBUG
logging enabled?  You should see these logs:
https://github.com/apache/spark/blob/master/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L475

On Wed, May 24, 2017 at 10:07 AM, Sudhir Jangir 
wrote:

> Facing one issue with Kerberos enabled Hadoop/CDH cluster.
>
>
>
> We are trying to run a streaming job on yarn-cluster, which interacts with
> Kafka (direct stream), and hbase.
>
>
>
> Somehow, we are not able to connect to hbase in the cluster mode. We use
> keytab to login to hbase.
>
>
>
> This is what we do:
>
> *spark-submit --master yarn-cluster --keytab "dev.keytab" --principal
> "d...@io-int.com "*  --conf "spark.executor.
> extraJavaOptions=-Dlog4j.configuration=log4j_executor_conf.properties
> -XX:+UseG1GC" --conf "spark.driver.extraJavaOptions=-Dlog4j.
> configuration=log4j_driver_conf.properties -XX:+UseG1GC" --conf
> spark.yarn.stagingDir=hdfs:///tmp/spark/ --files
> "job.properties,log4j_driver_conf.properties,log4j_executor_conf.properties"
> service-0.0.1-SNAPSHOT.jar job.properties
>
>
>
> To connect to hbase:
>
>  def getHbaseConnection(properties: SerializedProperties): (Connection,
> UserGroupInformation) = {
>
>
>
>
>
> val config = HBaseConfiguration.create();
>
> config.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM_VALUE);
>
> config.set("hbase.zookeeper.property.clientPort", 2181);
>
> config.set("hadoop.security.authentication", "kerberos");
>
> config.set("hbase.security.authentication", "kerberos");
>
> config.set("hbase.cluster.distributed", "true");
>
> config.set("hbase.rpc.protection", "privacy");
>
>config.set("hbase.regionserver.kerberos.principal", “hbase/_
> h...@io-int.com”);
>
> config.set("hbase.master.kerberos.principal", “hbase/_h...@io-int.com
> ”);
>
>
>
> UserGroupInformation.setConfiguration(config);
>
>
>
>  var ugi: UserGroupInformation = null;
>
>   if (SparkFiles.get(properties.keytab) != null
>
> && (new java.io.File(SparkFiles.get(properties.keytab)).exists)) {
>
> ugi = UserGroupInformation.loginUserFromKeytabAndReturnUG
> I(properties.kerberosPrincipal,
>
>   SparkFiles.get(properties.keytab));
>
>   } else {
>
> ugi = UserGroupInformation.loginUserFromKeytabAndReturnUG
> I(properties.kerberosPrincipal,
>
>   properties.keytab);
>
>   }
>
>
>
>
>
> val connection = ConnectionFactory.createConnection(config);
>
> return (connection, ugi);
>
>   }
>
>
>
> and we connect to hbase:
>
>  ….foreachRDD { rdd =>
>
>   if (!rdd.isEmpty()) {
>
> //*var* *ugi*: UserGroupInformation = Utils.getHbaseConnection(
> properties)._2
>
> rdd.foreachPartition { partition =>
>
>   val connection = Utils.getHbaseConnection(propsObj)._1
>
>   val table = …
>
>   partition.foreach { json =>
>
>
>
>   }
>
>   table.put(puts)
>
>   table.close()
>
>   connection.close()
>
> }
>
>   }
>
> }
>
>
>
>
>
> Keytab file is not getting copied to yarn staging/temp directory, we are
> not getting that in SparkFiles.get… and if we pass keytab with --files,
> spark-submit is failing because it’s there in --keytab already.
>
>
>
> Thanks,
>
> Sudhir
>



-- 
Michael Gummelt
Software Engineer
Mesosphere


Re: KMeans Clustering is not Reproducible

2017-05-24 Thread Christoph Brücke
Hi Ankur,

thank you for answering. But my problem is not that I'm stuck in a local
extremum, but rather the reproducibility of k-means. What I'm trying to
achieve is: when the input data and all the parameters stay the same,
especially the seed, I want to get the exact same results, even if the
partitioning changes. As far as I'm concerned, if I set a seed in an ML
algorithm, I would expect that algorithm to be deterministic.

Unfortunately I couldn't find any information on whether this is a goal of
Spark's MLlib or not.

Maybe a little bit of background: I'm trying to benchmark some ML
algorithms while changing my cluster config. That is, I want to find the
best cluster config that still achieves the same results. But what I see is
that when I change the number of executors, the results become incomparable,
since they differ.

So in essence my question is: are the algorithms in MLlib partition
agnostic or not?

Thanks for your help,
Christoph

On 24.05.2017 at 20:49, "Ankur Srivastava"  wrote:

Hi Christoph,

I am not an expert in ML and have not used Spark KMeans, but your problem
seems to be an issue of a local minimum vs. the global minimum. You should run
k-means multiple times with random starting points and also try
multiple values of K (unless you are already sure).

Hope this helps.

Thanks
Ankur



On Wed, May 24, 2017 at 2:15 AM, Christoph Bruecke 
wrote:

> Hi Anastasios,
>
> thanks for the reply but caching doesn’t seem to change anything.
>
> After further investigation it really seems that the RDD#takeSample method
> is the cause of the non-reproducibility.
>
> Is this considered a bug and should I open an Issue for that?
>
> BTW: my example script contains a little typo in line 3: it is `feature`
> not `features` (mind the `s`).
>
> Best,
> Christoph
>
> The script with caching
>
> ```
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.ml.clustering.KMeans
> import org.apache.spark.ml.feature.VectorAssembler
> import org.apache.spark.storage.StorageLevel
>
> // generate random data for clustering
> val randomData = spark.range(1, 1000).withColumn("a",
> rand(123)).withColumn("b", rand(321))
>
> val vecAssembler = new VectorAssembler().setInputCols(Array("a",
> "b")).setOutputCol("features")
>
> val data = vecAssembler.transform(randomData)
>
> // instantiate KMeans with fixed seed
> val kmeans = new KMeans().setK(10).setSeed(9876L)
>
> // train the model with different partitioning
> val dataWith1Partition = data.repartition(1)
> // cache the data
> dataWith1Partition.persist(StorageLevel.MEMORY_AND_DISK)
> println("1 Partition: " + kmeans.fit(dataWith1Partition)
> .computeCost(dataWith1Partition))
>
> val dataWith4Partition = data.repartition(4)
> // cache the data
> dataWith4Partition.persist(StorageLevel.MEMORY_AND_DISK)
> println("4 Partition: " + kmeans.fit(dataWith4Partition)
> .computeCost(dataWith4Partition))
>
>
> ```
>
> Output:
>
> ```
> 1 Partition: 16.028212597888057
> 4 Partition: 16.14758460544976
> ```
>
> > On 22 May 2017, at 16:33, Anastasios Zouzias  wrote:
> >
> > Hi Christoph,
> >
> > Take a look at this, you might end up having a similar case:
> >
> > http://www.spark.tc/using-sparks-cache-for-correctness-not-
> just-performance/
> >
> > If this is not the case, then I agree with you the kmeans should be
> partitioning agnostic (although I haven't checked the code yet).
> >
> > Best,
> > Anastasios
> >
> >
> > On Mon, May 22, 2017 at 3:42 PM, Christoph Bruecke 
> wrote:
> > Hi,
> >
> > I’m trying to figure out how to use KMeans in order to achieve
> reproducible results. I have found that running the same kmeans instance on
> the same data, with different partitioning will produce different
> clusterings.
> >
> > Given a simple KMeans run with fixed seed returns different results on
> the same
> > training data, if the training data is partitioned differently.
> >
> > Consider the following example. The same KMeans clustering set up is run
> on
> > identical data. The only difference is the partitioning of the training
> data
> > (one partition vs. four partitions).
> >
> > ```
> > import org.apache.spark.sql.DataFrame
> > import org.apache.spark.ml.clustering.KMeans
> > import org.apache.spark.ml.features.VectorAssembler
> >
> > // generate random data for clustering
> > val randomData = spark.range(1, 1000).withColumn("a",
> rand(123)).withColumn("b", rand(321))
> >
> > val vecAssembler = new VectorAssembler().setInputCols(Array("a",
> "b")).setOutputCol("features")
> >
> > val data = vecAssembler.transform(randomData)
> >
> > // instantiate KMeans with fixed seed
> > val kmeans = new KMeans().setK(10).setSeed(9876L)
> >
> > // train the model with different partitioning
> > val dataWith1Partition = data.repartition(1)
> > println("1 Partition: " + kmeans.fit(dataWith1Partition)
> .computeCost(dataWith1Partition))
> >
> > val dataWith4Partition = data.repartition(4)
> > 

Re: KMeans Clustering is not Reproducible

2017-05-24 Thread Yu Zhang
I agree with what Ankur said. The k-means seeding step (the 'takeSample'
method) runs in parallel, so each partition draws its sample points from its
local data, which is why the result depends on the partitioning (i.e. it is
not partition agnostic). The seeding method is based on the Bahmani et al.
k-means|| algorithm, which gives approximation guarantees on the k-means cost.

You could set the initial seeding points yourself, which avoids the
partition-dependence issue; see the sketch below.
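A minimal sketch of what setting the initial seeding points could look like.
As far as I can tell, the DataFrame-based ml KMeans in 2.1 does not expose the
initial centers, so this uses the RDD-based mllib API instead; the center
values and the trainingRdd placeholder are made up for illustration.

```
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors

// Any deterministic choice of starting centers removes the randomized,
// partition-dependent k-means|| seeding step.
val initialCenters = Array(
  Vectors.dense(0.1, 0.1),
  Vectors.dense(0.5, 0.5),
  Vectors.dense(0.9, 0.9)
)

val kmeans = new KMeans()
  .setK(initialCenters.length)
  .setMaxIterations(20)
  .setInitialModel(new KMeansModel(initialCenters))

// trainingRdd is assumed to be an RDD[org.apache.spark.mllib.linalg.Vector]:
// val model = kmeans.run(trainingRdd)
```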



Regards,
Yu Zhang

On Wed, May 24, 2017 at 1:49 PM, Ankur Srivastava <
ankur.srivast...@gmail.com> wrote:

> Hi Christoph,
>
> I am not an expert in ML and have not used Spark KMeans, but your problem
> seems to be an issue of a local minimum vs. the global minimum. You should run
> k-means multiple times with random starting points and also try
> multiple values of K (unless you are already sure).
>
> Hope this helps.
>
> Thanks
> Ankur
>
>
>
> On Wed, May 24, 2017 at 2:15 AM, Christoph Bruecke 
> wrote:
>
>> Hi Anastasios,
>>
>> thanks for the reply but caching doesn’t seem to change anything.
>>
>> After further investigation it really seems that the RDD#takeSample
>> method is the cause of the non-reproducibility.
>>
>> Is this considered a bug and should I open an Issue for that?
>>
>> BTW: my example script contains a little typo in line 3: it is `feature`
>> not `features` (mind the `s`).
>>
>> Best,
>> Christoph
>>
>> The script with caching
>>
>> ```
>> import org.apache.spark.sql.DataFrame
>> import org.apache.spark.ml.clustering.KMeans
>> import org.apache.spark.ml.feature.VectorAssembler
>> import org.apache.spark.storage.StorageLevel
>>
>> // generate random data for clustering
>> val randomData = spark.range(1, 1000).withColumn("a",
>> rand(123)).withColumn("b", rand(321))
>>
>> val vecAssembler = new VectorAssembler().setInputCols(Array("a",
>> "b")).setOutputCol("features")
>>
>> val data = vecAssembler.transform(randomData)
>>
>> // instantiate KMeans with fixed seed
>> val kmeans = new KMeans().setK(10).setSeed(9876L)
>>
>> // train the model with different partitioning
>> val dataWith1Partition = data.repartition(1)
>> // cache the data
>> dataWith1Partition.persist(StorageLevel.MEMORY_AND_DISK)
>> println("1 Partition: " + kmeans.fit(dataWith1Partition)
>> .computeCost(dataWith1Partition))
>>
>> val dataWith4Partition = data.repartition(4)
>> // cache the data
>> dataWith4Partition.persist(StorageLevel.MEMORY_AND_DISK)
>> println("4 Partition: " + kmeans.fit(dataWith4Partition)
>> .computeCost(dataWith4Partition))
>>
>>
>> ```
>>
>> Output:
>>
>> ```
>> 1 Partition: 16.028212597888057
>> 4 Partition: 16.14758460544976
>> ```
>>
>> > On 22 May 2017, at 16:33, Anastasios Zouzias  wrote:
>> >
>> > Hi Christoph,
>> >
>> > Take a look at this, you might end up having a similar case:
>> >
>> > http://www.spark.tc/using-sparks-cache-for-correctness-not-
>> just-performance/
>> >
>> > If this is not the case, then I agree with you the kmeans should be
>> partitioning agnostic (although I haven't checked the code yet).
>> >
>> > Best,
>> > Anastasios
>> >
>> >
>> > On Mon, May 22, 2017 at 3:42 PM, Christoph Bruecke 
>> wrote:
>> > Hi,
>> >
>> > I’m trying to figure out how to use KMeans in order to achieve
>> reproducible results. I have found that running the same kmeans instance on
>> the same data, with different partitioning will produce different
>> clusterings.
>> >
>> > Given a simple KMeans run with fixed seed returns different results on
>> the same
>> > training data, if the training data is partitioned differently.
>> >
>> > Consider the following example. The same KMeans clustering set up is
>> run on
>> > identical data. The only difference is the partitioning of the training
>> data
>> > (one partition vs. four partitions).
>> >
>> > ```
>> > import org.apache.spark.sql.DataFrame
>> > import org.apache.spark.ml.clustering.KMeans
>> > import org.apache.spark.ml.features.VectorAssembler
>> >
>> > // generate random data for clustering
>> > val randomData = spark.range(1, 1000).withColumn("a",
>> rand(123)).withColumn("b", rand(321))
>> >
>> > val vecAssembler = new VectorAssembler().setInputCols(Array("a",
>> "b")).setOutputCol("features")
>> >
>> > val data = vecAssembler.transform(randomData)
>> >
>> > // instantiate KMeans with fixed seed
>> > val kmeans = new KMeans().setK(10).setSeed(9876L)
>> >
>> > // train the model with different partitioning
>> > val dataWith1Partition = data.repartition(1)
>> > println("1 Partition: " + kmeans.fit(dataWith1Partition)
>> .computeCost(dataWith1Partition))
>> >
>> > val dataWith4Partition = data.repartition(4)
>> > println("4 Partition: " + kmeans.fit(dataWith4Partition)
>> .computeCost(dataWith4Partition))
>> > ```
>> >
>> > I get the following related cost
>> >
>> > ```
>> > 1 Partition: 16.028212597888057
>> > 4 Partition: 16.14758460544976
>> > ```
>> >
>> > What I want to achieve is that repeated computations of the KMeans
>> Clustering should yield identical result on identical 

[PySpark] - Broadcast Variable Pickle Registry Usage?

2017-05-24 Thread Michael Mansour (CS)
Hi all,

I’m poking around the pyspark.broadcast module, and I notice that one can pass
in a `pickle_registry` and a `path`. The documentation does not explain what the
pickle registry is for, and I’m curious how to use it and whether there are any
advantages to it.

Thanks,

Michael Mansour


Re: KMeans Clustering is not Reproducible

2017-05-24 Thread Ankur Srivastava
Hi Christoph,

I am not an expert in ML and have not used Spark KMeans, but your problem
seems to be an issue of a local minimum vs. the global minimum. You should run
k-means multiple times with random starting points and also try
multiple values of K (unless you are already sure).

Hope this helps.

Thanks
Ankur



On Wed, May 24, 2017 at 2:15 AM, Christoph Bruecke 
wrote:

> Hi Anastasios,
>
> thanks for the reply but caching doesn’t seem to change anything.
>
> After further investigation it really seems that the RDD#takeSample method
> is the cause of the non-reproducibility.
>
> Is this considered a bug and should I open an Issue for that?
>
> > BTW: my example script contains a little typo in line 3: it is `feature`
> not `features` (mind the `s`).
>
> Best,
> Christoph
>
> The script with caching
>
> ```
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.ml.clustering.KMeans
> import org.apache.spark.ml.feature.VectorAssembler
> import org.apache.spark.storage.StorageLevel
>
> // generate random data for clustering
> val randomData = spark.range(1, 1000).withColumn("a",
> rand(123)).withColumn("b", rand(321))
>
> val vecAssembler = new VectorAssembler().setInputCols(Array("a",
> "b")).setOutputCol("features")
>
> val data = vecAssembler.transform(randomData)
>
> // instantiate KMeans with fixed seed
> val kmeans = new KMeans().setK(10).setSeed(9876L)
>
> // train the model with different partitioning
> val dataWith1Partition = data.repartition(1)
> // cache the data
> dataWith1Partition.persist(StorageLevel.MEMORY_AND_DISK)
> println("1 Partition: " + kmeans.fit(dataWith1Partition).computeCost(
> dataWith1Partition))
>
> val dataWith4Partition = data.repartition(4)
> // cache the data
> dataWith4Partition.persist(StorageLevel.MEMORY_AND_DISK)
> println("4 Partition: " + kmeans.fit(dataWith4Partition).computeCost(
> dataWith4Partition))
>
>
> ```
>
> Output:
>
> ```
> 1 Partition: 16.028212597888057
> 4 Partition: 16.14758460544976
> ```
>
> > On 22 May 2017, at 16:33, Anastasios Zouzias  wrote:
> >
> > Hi Christoph,
> >
> > Take a look at this, you might end up having a similar case:
> >
> > http://www.spark.tc/using-sparks-cache-for-correctness-
> not-just-performance/
> >
> > If this is not the case, then I agree with you the kmeans should be
> partitioning agnostic (although I haven't checked the code yet).
> >
> > Best,
> > Anastasios
> >
> >
> > On Mon, May 22, 2017 at 3:42 PM, Christoph Bruecke 
> wrote:
> > Hi,
> >
> > I’m trying to figure out how to use KMeans in order to achieve
> reproducible results. I have found that running the same kmeans instance on
> the same data, with different partitioning will produce different
> clusterings.
> >
> > Given a simple KMeans run with fixed seed returns different results on
> the same
> > training data, if the training data is partitioned differently.
> >
> > Consider the following example. The same KMeans clustering set up is run
> on
> > identical data. The only difference is the partitioning of the training
> data
> > (one partition vs. four partitions).
> >
> > ```
> > import org.apache.spark.sql.DataFrame
> > import org.apache.spark.ml.clustering.KMeans
> > import org.apache.spark.ml.features.VectorAssembler
> >
> > // generate random data for clustering
> > val randomData = spark.range(1, 1000).withColumn("a",
> rand(123)).withColumn("b", rand(321))
> >
> > val vecAssembler = new VectorAssembler().setInputCols(Array("a",
> "b")).setOutputCol("features")
> >
> > val data = vecAssembler.transform(randomData)
> >
> > // instantiate KMeans with fixed seed
> > val kmeans = new KMeans().setK(10).setSeed(9876L)
> >
> > // train the model with different partitioning
> > val dataWith1Partition = data.repartition(1)
> > println("1 Partition: " + kmeans.fit(dataWith1Partition).computeCost(
> dataWith1Partition))
> >
> > val dataWith4Partition = data.repartition(4)
> > println("4 Partition: " + kmeans.fit(dataWith4Partition).computeCost(
> dataWith4Partition))
> > ```
> >
> > I get the following related cost
> >
> > ```
> > 1 Partition: 16.028212597888057
> > 4 Partition: 16.14758460544976
> > ```
> >
> > What I want to achieve is that repeated computations of the KMeans
> Clustering should yield identical result on identical training data,
> regardless of the partitioning.
> >
> > Looking through the Spark source code, I guess the cause is the
> initialization method of KMeans which in turn uses the `takeSample` method,
> which does not seem to be partition agnostic.
> >
> > Is this behaviour expected? Is there anything I could do to achieve
> reproducible results?
> >
> > Best,
> > Christoph
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
> >
> >
> >
> > --
> > -- Anastasios Zouzias
>
>
> -
> To unsubscribe e-mail: 

One question / kerberos, yarn-cluster -> connection to hbase

2017-05-24 Thread Sudhir Jangir
We are facing an issue with a Kerberos-enabled Hadoop/CDH cluster.

 

We are trying to run a streaming job on yarn-cluster, which interacts with
Kafka (direct stream) and HBase.

Somehow, we are not able to connect to HBase in cluster mode. We use a keytab
to log in to HBase.

 

This is what we do:

spark-submit --master yarn-cluster --keytab "dev.keytab" --principal 
"d...@io-int.com"  --conf 
"spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j_executor_conf.properties
 -XX:+UseG1GC" --conf 
"spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j_driver_conf.properties
 -XX:+UseG1GC" --conf spark.yarn.stagingDir=hdfs:///tmp/spark/ --files 
"job.properties,log4j_driver_conf.properties,log4j_executor_conf.properties" 
service-0.0.1-SNAPSHOT.jar job.properties

 

To connect to hbase:

 def getHbaseConnection(properties: SerializedProperties): (Connection,
UserGroupInformation) = {

    val config = HBaseConfiguration.create();
    config.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM_VALUE);
    config.set("hbase.zookeeper.property.clientPort", "2181"); // set() expects a String value
    config.set("hadoop.security.authentication", "kerberos");
    config.set("hbase.security.authentication", "kerberos");
    config.set("hbase.cluster.distributed", "true");
    config.set("hbase.rpc.protection", "privacy");
    config.set("hbase.regionserver.kerberos.principal", "hbase/_h...@io-int.com");
    config.set("hbase.master.kerberos.principal", "hbase/_h...@io-int.com");

    UserGroupInformation.setConfiguration(config);

    var ugi: UserGroupInformation = null;
    if (SparkFiles.get(properties.keytab) != null
        && (new java.io.File(SparkFiles.get(properties.keytab)).exists)) {
      ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
        properties.kerberosPrincipal, SparkFiles.get(properties.keytab));
    } else {
      ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
        properties.kerberosPrincipal, properties.keytab);
    }

    val connection = ConnectionFactory.createConnection(config);
    return (connection, ugi);
  }

 

and we connect to hbase:

 ….foreachRDD { rdd =>
   if (!rdd.isEmpty()) {
     // var ugi: UserGroupInformation = Utils.getHbaseConnection(properties)._2
     rdd.foreachPartition { partition =>
       val connection = Utils.getHbaseConnection(propsObj)._1
       val table = …
       partition.foreach { json =>

       }
       table.put(puts)
       table.close()
       connection.close()
     }
   }
 }

 

 

The keytab file is not getting copied to the YARN staging/temp directory, so we
are not getting it from SparkFiles.get…, and if we pass the keytab with --files,
spark-submit fails because it is already there in --keytab.

 

Thanks,

Sudhir



Re: scalastyle violation on mvn install but not on mvn package

2017-05-24 Thread Xiangyu Li
I downloaded a source code distribution of spark-2.1.0 and did the install
again, and this time I did not see any warnings. I must have used some
modified code before. Thank you for the help!

On Tue, May 23, 2017 at 11:19 AM, Mark Hamstra 
wrote:

>
>
> On Tue, May 23, 2017 at 7:48 AM, Xiangyu Li  wrote:
>
>> Thank you for the answer.
>>
>> So basically it is not recommended to install Spark to your local maven
>> repository? I thought if they wanted to enforce scalastyle for better open
>> source contributions, they would have fixed all the scalastyle warnings.
>>
>
> That isn't a valid conclusion. There is nothing wrong with using maven's
> "install" with Spark. There shouldn't be any scalastyle violations.
>
>
>> On a side note, my posts on Nabble never got accepted by the mailing list
>> for some reason (I am subscribed to the mail list), and your reply does not
>> show as a reply to my question on Nabble probably for the same reason.
>> Sorry for the late reply but is using email the only way to communicate on
>> the mail list? I got another reply to this question through email but the
>> two replies are not even in the same "email conversation".
>>
>
> I don't know the mechanics of why posts do or don't show up via Nabble,
> but Nabble is neither the canonical archive nor the system of record for
> Apache mailing lists.
>
>
>> On Thu, May 4, 2017 at 8:11 PM, Mark Hamstra 
>> wrote:
>>
>>> The check goal of the scalastyle plugin runs during the "verify" phase,
>>> which is between "package" and "install"; so running just "package" will
>>> not run scalastyle:check.
>>>
>>> On Thu, May 4, 2017 at 7:45 AM, yiskylee  wrote:
>>>
 ./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean
 package
 works, but
 ./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean
 install
 triggers scalastyle violation error.

 Is the scalastyle check not used on package but only on install? To
 install,
 should I turn off "failOnViolation" in the pom?




 --
 View this message in context: http://apache-spark-user-list.
 1001560.n3.nabble.com/scalastyle-violation-on-mvn-install-bu
 t-not-on-mvn-package-tp28653.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org


>>>
>>
>>
>> --
>> Sincerely
>> Xiangyu Li
>>
>> 
>>
>
>


-- 
Sincerely
Xiangyu Li




Dependencies for starting Master / Worker in maven

2017-05-24 Thread Jens Teglhus Møller
Hi

I just joined a project that runs on spark-1.6.1 and I have no prior spark
experience.

The project build is quite fragile when it comes to runtime dependencies.
Often the project builds fine, but after deployment we end up with
ClassNotFoundExceptions or NoSuchMethodErrors when submitting a job.

To catch these issues early, I'm trying to set up integration tests
with Maven. In the pre-integration-test phase I would like to start up a master
and a worker (using the process-exec-maven-plugin).

I have managed to get it working for Spark 1.6.1 (against a downloaded
Spark distribution), but would prefer to be able to download all the
required jars as Maven dependencies. Is there a relatively simple way to
get all the required dependencies? It is OK if it's only for 2.x, since we
are planning to migrate.

I would prefer to do this without docker.

Has anyone done something similar already or is there a simpler way?

Best regards Jens

(this is a resend, my original from yesterday seems to have been lost in
transmission)


Re: KMeans Clustering is not Reproducible

2017-05-24 Thread Christoph Bruecke
Hi Anastasios,

thanks for the reply but caching doesn’t seem to change anything.

After further investigation it really seems that the RDD#takeSample method is 
the cause of the non-reproducibility.

Is this considered a bug and should I open an Issue for that?

BTW: my example script contains a little typo in line 3: it is `feature` not
`features` (mind the `s`).

Best,
Christoph

The script with caching

```
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.storage.StorageLevel

// generate random data for clustering
val randomData = spark.range(1, 1000).withColumn("a", 
rand(123)).withColumn("b", rand(321))

val vecAssembler = new VectorAssembler().setInputCols(Array("a", 
"b")).setOutputCol("features")

val data = vecAssembler.transform(randomData)

// instantiate KMeans with fixed seed
val kmeans = new KMeans().setK(10).setSeed(9876L)

// train the model with different partitioning
val dataWith1Partition = data.repartition(1)
// cache the data
dataWith1Partition.persist(StorageLevel.MEMORY_AND_DISK)
println("1 Partition: " + 
kmeans.fit(dataWith1Partition).computeCost(dataWith1Partition))

val dataWith4Partition = data.repartition(4)
// cache the data
dataWith4Partition.persist(StorageLevel.MEMORY_AND_DISK)
println("4 Partition: " + 
kmeans.fit(dataWith4Partition).computeCost(dataWith4Partition))


```

Output:

```
1 Partition: 16.028212597888057
4 Partition: 16.14758460544976
```
 
> On 22 May 2017, at 16:33, Anastasios Zouzias  wrote:
> 
> Hi Christoph,
> 
> Take a look at this, you might end up having a similar case:
> 
> http://www.spark.tc/using-sparks-cache-for-correctness-not-just-performance/
> 
> If this is not the case, then I agree with you the kmeans should be 
> partitioning agnostic (although I haven't check the code yet).
> 
> Best,
> Anastasios
> 
> 
> On Mon, May 22, 2017 at 3:42 PM, Christoph Bruecke  
> wrote:
> Hi,
> 
> I’m trying to figure out how to use KMeans in order to achieve reproducible 
> results. I have found that running the same kmeans instance on the same data, 
> with different partitioning will produce different clusterings.
> 
> Given a simple KMeans run with fixed seed returns different results on the 
> same
> training data, if the training data is partitioned differently.
> 
> Consider the following example. The same KMeans clustering set up is run on
> identical data. The only difference is the partitioning of the training data
> (one partition vs. four partitions).
> 
> ```
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.ml.clustering.KMeans
> import org.apache.spark.ml.features.VectorAssembler
> 
> // generate random data for clustering
> val randomData = spark.range(1, 1000).withColumn("a", 
> rand(123)).withColumn("b", rand(321))
> 
> val vecAssembler = new VectorAssembler().setInputCols(Array("a", 
> "b")).setOutputCol("features")
> 
> val data = vecAssembler.transform(randomData)
> 
> // instantiate KMeans with fixed seed
> val kmeans = new KMeans().setK(10).setSeed(9876L)
> 
> // train the model with different partitioning
> val dataWith1Partition = data.repartition(1)
> println("1 Partition: " + 
> kmeans.fit(dataWith1Partition).computeCost(dataWith1Partition))
> 
> val dataWith4Partition = data.repartition(4)
> println("4 Partition: " + 
> kmeans.fit(dataWith4Partition).computeCost(dataWith4Partition))
> ```
> 
> I get the following related cost
> 
> ```
> 1 Partition: 16.028212597888057
> 4 Partition: 16.14758460544976
> ```
> 
> What I want to achieve is that repeated computations of the KMeans Clustering 
> should yield identical result on identical training data, regardless of the 
> partitioning.
> 
> Looking through the Spark source code, I guess the cause is the 
> initialization method of KMeans which in turn uses the `takeSample` method, 
> which does not seem to be partition agnostic.
> 
> Is this behaviour expected? Is there anything I could do to achieve 
> reproducible results?
> 
> Best,
> Christoph
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 
> 
> 
> -- 
> -- Anastasios Zouzias

