spark streaming executor number still increase

2017-09-12 Thread zhan8610189
I use a CDH Spark (1.5.0, Hadoop 2.6.0) cluster and wrote a Spark Streaming
application, which I start with the following command:

spark-submit --master spark://:7077 --conf spark.cores.max=4
--num-executors 4 --total-executor-cores 4 --executor-cores 4
--executor-memory 2g --class com..KafkaActive
streaming-assembly-0.0.1-SNAPSHOT.jar

but I found that the Spark node's (server) memory is all used up, and the
number of Spark Streaming executors keeps increasing: new executors are
started, but the removed executors (CoarseGrainedExecutorBackend instances)
do not exit.

What should I do?

See the attached screenshots (not included in this archive).



Re: [SS]How to add a column with custom system time?

2017-09-12 Thread 张万新
It seems current_timestamp() cannot be used directly inside the window()
function, because after some attempts I found that using

*df.count.withColumn("pTime", current_timestamp).select(window($"pTime",
"15 minutes"), $"count")*

instead of

*df.count.withColumn("window", window(current_timestamp(), "15 minutes"))*

throws no exception and works fine. I don't know if this is a problem that
needs improvement.
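
A minimal, hedged sketch of the workaround above, with the imports it needs
(counts is assumed to be the streaming aggregation result with a count column;
all names are illustrative):

import org.apache.spark.sql.functions.{current_timestamp, window}
import spark.implicits._   // assumes an existing SparkSession named spark

// Add the processing-time column first, then window over it in a select,
// instead of calling window(current_timestamp(), ...) directly.
val windowed = counts
  .withColumn("pTime", current_timestamp())
  .select(window($"pTime", "15 minutes"), $"count")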


张万新 wrote on Wed, Sep 13, 2017 at 11:43 AM:

> and I use .withColumn("window", window(current_timestamp(), "15
> minutes")) after count
>
> 张万新 wrote on Wed, Sep 13, 2017 at 11:32 AM:
>
>> *Yes, my code is shown below*
>> /**
>> * input
>> */
>>   val logs = spark
>> .readStream
>> .format("kafka")
>> .option("kafka.bootstrap.servers", BROKER_SERVER)
>> .option("subscribe", TOPIC)
>> .option("startingOffset", "latest")
>> .load()
>>
>>   /**
>> * process
>> */
>>   val logValues = logs.selectExpr("CAST(value AS STRING)").as[(String)]
>>
>>   val events = logValues
>> .map(parseFunction)
>> .select(
>>   $"_1".alias("date").cast("timestamp"),
>>   $"_2".alias("uuid").cast("string")
>> )
>>
>>   val results = events
>> .withWatermark("date", "1 day")
>> .dropDuplicates("uuid", "date")
>> .groupBy($"date")
>> .count()
>> .withColumn("window", window(current_timestamp(), "15 minutes"))
>>
>>   /**
>> * output
>> */
>>   val query = results
>> .writeStream
>> .outputMode("update")
>> .format("console")
>> .option("truncate", "false")
>> .trigger(Trigger.ProcessingTime("1 seconds"))
>> .start()
>>
>>   query.awaitTermination()
>>
>> *and I use play json to parse input logs from kafka ,the parse function
>> is like*
>>
>>   def parseFunction(str: String): (Long, String) = {
>> val json = Json.parse(str)
>> val timestamp = (json \ "time").get.toString().toLong
>> val date = (timestamp / (60 * 60 * 24) * 24 -8) * 60 * 60
>> val uuid = (json \ "uuid").get.toString()
>> (date, uuid)
>>   }
>>
>> Michael Armbrust wrote on Wed, Sep 13, 2017 at 2:36 AM:
>>
>>> Can you show all the code?  This works for me.
>>>
>>> On Tue, Sep 12, 2017 at 12:05 AM, 张万新  wrote:
>>>
 The spark version is 2.2.0

 Michael Armbrust wrote on Tue, Sep 12, 2017 at 12:32 PM:

> Which version of spark?
>
> On Mon, Sep 11, 2017 at 8:27 PM, 张万新  wrote:
>
>> Thanks for reply, but using this method I got an exception:
>>
>> "Exception in thread "main"
>> org.apache.spark.sql.streaming.StreamingQueryException: nondeterministic
>> expressions are only allowed in
>>
>> Project, Filter, Aggregate or Window"
>>
>> Can you give more advice?
>>
>> Michael Armbrust wrote on Tue, Sep 12, 2017 at 4:48 AM:
>>
>>> import org.apache.spark.sql.functions._
>>>
>>> df.withColumn("window", window(current_timestamp(), "15 minutes"))
>>>
>>> On Mon, Sep 11, 2017 at 3:03 AM, 张万新  wrote:
>>>
 Hi,

 In structured streaming how can I add a column to a dataset with
 current system time aligned with 15 minutes?

 Thanks.

>>>
>>>
>
>>>


Re: [SS]How to add a column with custom system time?

2017-09-12 Thread 张万新
and I use .withColumn("window", window(current_timestamp(), "15
minutes")) after
count

张万新 wrote on Wed, Sep 13, 2017 at 11:32 AM:

> *Yes, my code is shown below*
> /**
> * input
> */
>   val logs = spark
> .readStream
> .format("kafka")
> .option("kafka.bootstrap.servers", BROKER_SERVER)
> .option("subscribe", TOPIC)
> .option("startingOffset", "latest")
> .load()
>
>   /**
> * process
> */
>   val logValues = logs.selectExpr("CAST(value AS STRING)").as[(String)]
>
>   val events = logValues
> .map(parseFunction)
> .select(
>   $"_1".alias("date").cast("timestamp"),
>   $"_2".alias("uuid").cast("string")
> )
>
>   val results = events
> .withWatermark("date", "1 day")
> .dropDuplicates("uuid", "date")
> .groupBy($"date")
> .count()
> .withColumn("window", window(current_timestamp(), "15 minutes"))
>
>   /**
> * output
> */
>   val query = results
> .writeStream
> .outputMode("update")
> .format("console")
> .option("truncate", "false")
> .trigger(Trigger.ProcessingTime("1 seconds"))
> .start()
>
>   query.awaitTermination()
>
> *and I use play json to parse input logs from kafka ,the parse function is
> like*
>
>   def parseFunction(str: String): (Long, String) = {
> val json = Json.parse(str)
> val timestamp = (json \ "time").get.toString().toLong
> val date = (timestamp / (60 * 60 * 24) * 24 -8) * 60 * 60
> val uuid = (json \ "uuid").get.toString()
> (date, uuid)
>   }
>
> Michael Armbrust wrote on Wed, Sep 13, 2017 at 2:36 AM:
>
>> Can you show all the code?  This works for me.
>>
>> On Tue, Sep 12, 2017 at 12:05 AM, 张万新  wrote:
>>
>>> The spark version is 2.2.0
>>>
>>> Michael Armbrust wrote on Tue, Sep 12, 2017 at 12:32 PM:
>>>
 Which version of spark?

 On Mon, Sep 11, 2017 at 8:27 PM, 张万新  wrote:

> Thanks for reply, but using this method I got an exception:
>
> "Exception in thread "main"
> org.apache.spark.sql.streaming.StreamingQueryException: nondeterministic
> expressions are only allowed in
>
> Project, Filter, Aggregate or Window"
>
> Can you give more advice?
>
> Michael Armbrust wrote on Tue, Sep 12, 2017 at 4:48 AM:
>
>> import org.apache.spark.sql.functions._
>>
>> df.withColumn("window", window(current_timestamp(), "15 minutes"))
>>
>> On Mon, Sep 11, 2017 at 3:03 AM, 张万新  wrote:
>>
>>> Hi,
>>>
>>> In structured streaming how can I add a column to a dataset with
>>> current system time aligned with 15 minutes?
>>>
>>> Thanks.
>>>
>>
>>

>>


Re: [SS] Any way to optimize memory consumption of SS?

2017-09-12 Thread 张万新
*Yes, my code is shown below (I also posted my code in another mail)*
/**
* input
*/
  val logs = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", BROKER_SERVER)
.option("subscribe", TOPIC)
.option("startingOffset", "latest")
.load()

  /**
* process
*/
  val logValues = logs.selectExpr("CAST(value AS STRING)").as[(String)]

  val events = logValues
.map(parseFunction)
.select(
  $"_1".alias("date").cast("timestamp"),
  $"_2".alias("uuid").cast("string")
)

  val results = events
.withWatermark("date", "1 day")
.dropDuplicates("uuid", "date")
.groupBy($"date")
.count()

  /**
* output
*/
  val query = results
.writeStream
.outputMode("update")
.format("console")
.option("truncate", "false")
.trigger(Trigger.ProcessingTime("1 seconds"))
.start()

  query.awaitTermination()

*and I use Play JSON to parse the input logs from Kafka; the parse function
looks like this:*

  def parseFunction(str: String): (Long, String) = {
val json = Json.parse(str)
val timestamp = (json \ "time").get.toString().toLong
val date = (timestamp / (60 * 60 * 24) * 24 -8) * 60 * 60
val uuid = (json \ "uuid").get.toString()
(date, uuid)
  }

and the Java heap space looks like this (I've increased the executor memory to
15g; the heap-usage screenshot is not included in this archive):
Michael Armbrust wrote on Wed, Sep 13, 2017 at 2:23 AM:

> Can you show the full query you are running?
>
> On Tue, Sep 12, 2017 at 10:11 AM, 张万新  wrote:
>
>> Hi,
>>
>> I'm using structured streaming to count unique visits of our website. I
>> use spark on yarn mode with 4 executor instances and from 2 cores * 5g
>> memory to 4 cores * 10g memory for each executor, but there are frequent
>> full gc, and once the count raises to about more than 4.5 millions the
>> application will be blocked and finally crash in OOM. It's kind of
>> unreasonable. So is there any suggestion to optimize the memory consumption
>> of SS? Thanks.
>>
>
>


Re: [SS]How to add a column with custom system time?

2017-09-12 Thread 张万新
*Yes, my code is shown below*
/**
* input
*/
  val logs = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", BROKER_SERVER)
.option("subscribe", TOPIC)
.option("startingOffset", "latest")
.load()

  /**
* process
*/
  val logValues = logs.selectExpr("CAST(value AS STRING)").as[(String)]

  val events = logValues
.map(parseFunction)
.select(
  $"_1".alias("date").cast("timestamp"),
  $"_2".alias("uuid").cast("string")
)

  val results = events
.withWatermark("date", "1 day")
.dropDuplicates("uuid", "date")
.groupBy($"date")
.count()
.withColumn("window", window(current_timestamp(), "15 minutes"))

  /**
* output
*/
  val query = results
.writeStream
.outputMode("update")
.format("console")
.option("truncate", "false")
.trigger(Trigger.ProcessingTime("1 seconds"))
.start()

  query.awaitTermination()

*and I use Play JSON to parse the input logs from Kafka; the parse function
looks like this:*

  def parseFunction(str: String): (Long, String) = {
val json = Json.parse(str)
val timestamp = (json \ "time").get.toString().toLong
val date = (timestamp / (60 * 60 * 24) * 24 -8) * 60 * 60
val uuid = (json \ "uuid").get.toString()
(date, uuid)
  }

Michael Armbrust wrote on Wed, Sep 13, 2017 at 2:36 AM:

> Can you show all the code?  This works for me.
>
> On Tue, Sep 12, 2017 at 12:05 AM, 张万新  wrote:
>
>> The spark version is 2.2.0
>>
>> Michael Armbrust wrote on Tue, Sep 12, 2017 at 12:32 PM:
>>
>>> Which version of spark?
>>>
>>> On Mon, Sep 11, 2017 at 8:27 PM, 张万新  wrote:
>>>
 Thanks for reply, but using this method I got an exception:

 "Exception in thread "main"
 org.apache.spark.sql.streaming.StreamingQueryException: nondeterministic
 expressions are only allowed in

 Project, Filter, Aggregate or Window"

 Can you give more advice?

 Michael Armbrust wrote on Tue, Sep 12, 2017 at 4:48 AM:

> import org.apache.spark.sql.functions._
>
> df.withColumn("window", window(current_timestamp(), "15 minutes"))
>
> On Mon, Sep 11, 2017 at 3:03 AM, 张万新  wrote:
>
>> Hi,
>>
>> In structured streaming how can I add a column to a dataset with
>> current system time aligned with 15 minutes?
>>
>> Thanks.
>>
>
>
>>>
>


Re: Chaining Spark Streaming Jobs

2017-09-12 Thread Sunita Arvind
Hi Michael,

I am wondering what I am doing wrong. I get an error like:

Exception in thread "main" java.lang.IllegalArgumentException: Schema must
be specified when creating a streaming source DataFrame. If some files
already exist in the directory, then depending on the file format you may
be able to create a static DataFrame on that directory with
'spark.read.load(directory)' and infer schema from it.
at
org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:223)
at
org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
at
org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
at
org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at
org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:125)
at
org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:134)
at
com.aol.persist.UplynkAggregates$.aggregator(UplynkAggregates.scala:23)
at com.aol.persist.UplynkAggregates$.main(UplynkAggregates.scala:41)
at com.aol.persist.UplynkAggregates.main(UplynkAggregates.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
17/09/12 14:46:22 INFO SparkContext: Invoking stop() from shutdown hook


I tried specifying the schema as well.
Here is my code:

object Aggregates {

  val aggregation =
    """select sum(col1), sum(col2), id, first(name)
       from enrichedtb
       group by id
    """.stripMargin

  def aggregator(conf: Config) = {
    implicit val spark =
      SparkSession.builder().appName(conf.getString("AppName")).getOrCreate()
    implicit val sqlctx = spark.sqlContext
    printf("Source path is" + conf.getString("source.path"))
    val schemadf = sqlctx.read.parquet(conf.getString("source.path"))
    // Added this as it was complaining about schema.
    val df = spark.readStream
      .format("parquet")
      .option("inferSchema", true)
      .schema(schemadf.schema)
      .load(conf.getString("source.path"))
    df.createOrReplaceTempView("enrichedtb")
    val res = spark.sql(aggregation)

    res.writeStream
      .format("parquet")
      .outputMode("append")
      .option("checkpointLocation", conf.getString("checkpointdir"))
      .start(conf.getString("sink.outputpath"))
  }

  def main(args: Array[String]): Unit = {
    val mainconf = ConfigFactory.load()
    val conf = mainconf.getConfig(mainconf.getString("pipeline"))
    print(conf.toString)
    aggregator(conf)
  }

}


I tried to extract the schema from a static read of the input path and
provide it to the readStream API. With that, I get this error:

at 
org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
at 
org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:109)
at 
org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
at 
org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
at 
org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
at 
org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:222)

While running on the EMR cluster, all paths point to S3. On my laptop,
they all point to the local filesystem.

I am using Spark 2.2.0.

Appreciate your help.

regards

Sunita


On Wed, Aug 23, 2017 at 2:30 PM, Michael Armbrust 
wrote:

> If you use structured streaming and the file sink, you can have a
> subsequent stream read using the file source.  This will maintain exactly
> once processing even if there are hiccups or failures.
>
> On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind 
> wrote:
>
>> Hello Spark Experts,
>>
>> I have a design question w.r.t Spark Streaming. I have a streaming job
>> that consumes protocol buffer encoded real time logs from a Kafka cluster
>> on premise. My spark application runs on EMR (aws) and persists data onto
>> s3. Before I persist, I need to strip header and convert protobuffer to
>> parquet (I use sparksql-scalapb to convert from Protobuff to
>> Spark.sql.Row). I need to persist Raw logs as is. I can continue the
>> enrichment on the same dataframe after persisting the raw data, however, in
>> order to modularize I am planning to have a separate job which picks up the
>> raw data and performs enrichment on it. Also,  I am trying to avoid all in
>> 1 job as the enrichments could get project specific while raw data
>> persistence stays 

Re: Multiple Sources found for csv

2017-09-12 Thread jeff saremi
sorry just found this which answers my question:


https://stackoverflow.com/questions/41726340/spark-2-0-csv-error





From: jeff saremi 
Sent: Tuesday, September 12, 2017 3:38:00 PM
To: user@spark.apache.org
Subject: Multiple Sources found for csv


I have this line which works in the spark interactive console but it fails in 
Intellij

Using Spark 2.1.1 in both cases:

Exception in thread "main" java.lang.RuntimeException: Multiple sources found 
for csv (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat, 
com.databricks.spark.csv.DefaultSource15), please specify the fully qualified 
class name.



source:

val ie8df = 
sqlContext.read.schema(SomeSchema).option("mode","dropmalformed").option("sep", 
"\t").format("csv").load(somepath)

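A hedged sketch based on the error message above: with both csv implementations
on the classpath, one way out is to name the built-in Spark 2.x class explicitly
(the alternative is to drop the spark-csv dependency from the build). SomeSchema
and somepath are the placeholders from the snippet above:

val ie8df = sqlContext.read
  .schema(SomeSchema)
  .option("mode", "dropmalformed")
  .option("sep", "\t")
  .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")  // fully qualified class name
  .load(somepath)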


Multiple Sources found for csv

2017-09-12 Thread jeff saremi
I have this line which works in the spark interactive console but it fails in 
Intellij

Using Spark 2.1.1 in both cases:

Exception in thread "main" java.lang.RuntimeException: Multiple sources found 
for csv (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat, 
com.databricks.spark.csv.DefaultSource15), please specify the fully qualified 
class name.



source:

val ie8df = 
sqlContext.read.schema(SomeSchema).option("mode","dropmalformed").option("sep", 
"\t").format("csv").load(somepath)



Re: Continue reading dataframe from file despite errors

2017-09-12 Thread jeff saremi
thanks Suresh. it worked nicely


From: Suresh Thalamati 
Sent: Tuesday, September 12, 2017 2:59:29 PM
To: jeff saremi
Cc: user@spark.apache.org
Subject: Re: Continue reading dataframe from file despite errors

Try the CSV option("mode", "dropmalformed"); that might skip the error
records.
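
A minimal sketch combining this suggestion with the read from the original
message (SomeSchema and the path are the placeholders used there):

val df = spark.read
  .schema(SomeSchema)
  .option("sep", "\t")
  .option("mode", "dropmalformed")   // silently drops rows that fail to parse or cast
  .format("csv")
  .load("somepath")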


On Sep 12, 2017, at 2:33 PM, jeff saremi 
> wrote:

should have added some of the exception to be clear:

17/09/12 14:14:17 ERROR TaskSetManager: Task 0 in stage 15.0 failed 1 times; 
aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 
(TID 15, localhost, executor driver): java.lang.NumberFormatException: For 
input string: "south carolina"
at 
java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at 
scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
at 
org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:250)



From: jeff saremi >
Sent: Tuesday, September 12, 2017 2:32:03 PM
To: user@spark.apache.org
Subject: Continue reading dataframe from file despite errors

I'm using a statement like the following to load my dataframe from some text 
file
Upon encountering the first error, the whole thing throws an exception and 
processing stops.
I'd like to continue loading even if that results in zero rows in my dataframe. 
How can i do that?
thanks

spark.read.schema(SomeSchema).option("sep", "\t").format("csv").load("somepath")



Re: Continue reading dataframe from file despite errors

2017-09-12 Thread Suresh Thalamati
Try the CSV option("mode", "dropmalformed"); that might skip the error
records.


> On Sep 12, 2017, at 2:33 PM, jeff saremi  wrote:
> 
> should have added some of the exception to be clear:
> 
> 17/09/12 14:14:17 ERROR TaskSetManager: Task 0 in stage 15.0 failed 1 times; 
> aborting job
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
> stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 
> (TID 15, localhost, executor driver): java.lang.NumberFormatException: For 
> input string: "south carolina"
> at 
> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> at java.lang.Integer.parseInt(Integer.java:580)
> at java.lang.Integer.parseInt(Integer.java:615)
> at 
> scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
> at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
> at 
> org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:250)
> 
> From: jeff saremi 
> Sent: Tuesday, September 12, 2017 2:32:03 PM
> To: user@spark.apache.org
> Subject: Continue reading dataframe from file despite errors
>  
> I'm using a statement like the following to load my dataframe from some text 
> file
> Upon encountering the first error, the whole thing throws an exception and 
> processing stops.
> I'd like to continue loading even if that results in zero rows in my 
> dataframe. How can i do that?
> thanks
> 
> spark.read.schema(SomeSchema).option("sep", 
> "\t").format("csv").load("somepath")



Configuration for unit testing and sql.shuffle.partitions

2017-09-12 Thread peay
Hello,

I am running unit tests with Spark DataFrames, and I am looking for 
configuration tweaks that would make tests faster. Usually, I use a local[2] or 
local[4] master.

Something that has been bothering me is that most of my stages end up using 200 
partitions, independently of whether I repartition the input. This seems a bit 
overkill for small unit tests that barely have 200 rows per DataFrame.

spark.sql.shuffle.partitions used to control this I believe, but it seems to be 
gone and I could not find any information on what mechanism/setting replaces it 
or the corresponding JIRA.

Has anyone experience to share on how to tune Spark best for very small local 
runs like that?

Thanks!
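
A hedged sketch of one way to do this for small local runs, assuming Spark 2.x
(where the property is still spark.sql.shuffle.partitions, with a default of
200); the value 4 is only illustrative:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("unit-tests")
  .config("spark.sql.shuffle.partitions", "4")   // shrink the post-shuffle partition count
  .getOrCreate()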

Re: Continue reading dataframe from file despite errors

2017-09-12 Thread jeff saremi
should have added some of the exception to be clear:


17/09/12 14:14:17 ERROR TaskSetManager: Task 0 in stage 15.0 failed 1 times; 
aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 
(TID 15, localhost, executor driver): java.lang.NumberFormatException: For 
input string: "south carolina"
at 
java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at 
scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
at 
org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:250)



From: jeff saremi 
Sent: Tuesday, September 12, 2017 2:32:03 PM
To: user@spark.apache.org
Subject: Continue reading dataframe from file despite errors


I'm using a statement like the following to load my dataframe from some text 
file

Upon encountering the first error, the whole thing throws an exception and 
processing stops.

I'd like to continue loading even if that results in zero rows in my dataframe. 
How can i do that?
thanks


spark.read.schema(SomeSchema).option("sep", "\t").format("csv").load("somepath")




Continue reading dataframe from file despite errors

2017-09-12 Thread jeff saremi
I'm using a statement like the following to load my dataframe from some text 
file

Upon encountering the first error, the whole thing throws an exception and 
processing stops.

I'd like to continue loading even if that results in zero rows in my dataframe. 
How can i do that?
thanks


spark.read.schema(SomeSchema).option("sep", "\t").format("csv").load("somepath")




Re: Queries with streaming sources must be executed with writeStream.start()

2017-09-12 Thread kant kodali
I have about 100 fields in my dataset and some of them have "null" in them.
Does to_json fail to convert if that is the case?

Thanks!

On Tue, Sep 12, 2017 at 12:32 PM, kant kodali  wrote:

> Hi Michael,
>
> Interestingly that doesn't seem to quite work for me for some reason. Here
> is what I have
>
> Dataset
>
> name | id | country
> -
> kant   | 1  | usa
> john   | 2  | usa
>
>
> And here is my code
>
> Dataset ds = getKafkaStream(); // This dataset represents the one above
> StreamingQuery query = 
> ds.writeStream().trigger(Trigger.ProcessingTime(1000)).format("console").start();
> query.awaitTermination();
>
> *This works completely fine and I can see the rows on my console.*
>
> Now if I change it to this.
>
> Dataset ds = getKafkaStream(); // This dataset represents the one above
> Dataset jsonDS = ds.select(to_json(struct(ds.col("*")))).as(Encoders.STRING());
> StreamingQuery query2 = 
> jsonDS.writeStream().trigger(Trigger.ProcessingTime(1000)).format("console").start();
> query2.awaitTermination();
>
> *I dont see any rows on my console and I made sure I waited for a while.*
>
> *The moment I change it back to above code and run it works again.*
>
>
>
>
>
>
>
>
>
>
>
> On Mon, Sep 11, 2017 at 2:28 PM, Michael Armbrust 
> wrote:
>
>> The following will convert the whole row to JSON.
>>
>> import org.apache.spark.sql.functions.*
>> df.select(to_json(struct(col("*"))))
>>
>> On Sat, Sep 9, 2017 at 6:27 PM, kant kodali  wrote:
>>
>>> Thanks Ryan! In this case, I will have Dataset so is there a way to
>>> convert Row to Json string?
>>>
>>> Thanks
>>>
>>> On Sat, Sep 9, 2017 at 5:14 PM, Shixiong(Ryan) Zhu <
>>> shixi...@databricks.com> wrote:
>>>
 It's because "toJSON" doesn't support Structured Streaming. The current
 implementation will convert the Dataset to an RDD, which is not supported
 by streaming queries.

 On Sat, Sep 9, 2017 at 4:40 PM, kant kodali  wrote:

> yes it is a streaming dataset. so what is the problem with following
> code?
>
> Dataset ds = dataset.toJSON().map(()->{some function that returns 
> a string});
>  StreamingQuery query = ds.writeStream().start();
>  query.awaitTermination();
>
>
> On Sat, Sep 9, 2017 at 4:20 PM, Felix Cheung <
> felixcheun...@hotmail.com> wrote:
>
>> What is newDS?
>> If it is a Streaming Dataset/DataFrame (since you have writeStream
>> there) then there seems to be an issue preventing toJSON to work.
>>
>> --
>> *From:* kant kodali 
>> *Sent:* Saturday, September 9, 2017 4:04:33 PM
>> *To:* user @spark
>> *Subject:* Queries with streaming sources must be executed with
>> writeStream.start()
>>
>> Hi All,
>>
>> I  have the following code and I am not sure what's wrong with it? I
>> cannot call dataset.toJSON() (which returns a DataSet) ? I am using spark
>> 2.2.0 so I am wondering if there is any work around?
>>
>>  Dataset ds = newDS.toJSON().map(()->{some function that returns 
>> a string});
>>  StreamingQuery query = ds.writeStream().start();
>>  query.awaitTermination();
>>
>>
>

>>>
>>
>


Re: Queries with streaming sources must be executed with writeStream.start()

2017-09-12 Thread kant kodali
Hi Michael,

Interestingly that doesn't seem to quite work for me for some reason. Here
is what I have

Dataset

name | id | country
-
kant   | 1  | usa
john   | 2  | usa


And here is my code

Dataset ds = getKafkaStream(); // This dataset represents the one above
StreamingQuery query =
ds.writeStream().trigger(Trigger.ProcessingTime(1000)).format("console").start();
query.awaitTermination();

*This works completely fine and I can see the rows on my console.*

Now if I change it to this.

Dataset ds = getKafkaStream(); // This dataset represents the one above
Dataset jsonDS = ds.select(to_json(struct(ds.col("*")))).as(Encoders.STRING());
StreamingQuery query2 =
jsonDS.writeStream().trigger(Trigger.ProcessingTime(1000)).format("console").start();
query2.awaitTermination();

*I dont see any rows on my console and I made sure I waited for a while.*

*The moment I change it back to above code and run it works again.*











On Mon, Sep 11, 2017 at 2:28 PM, Michael Armbrust 
wrote:

> The following will convert the whole row to JSON.
>
> import org.apache.spark.sql.functions.*
> df.select(to_json(struct(col("*"))))
>
> On Sat, Sep 9, 2017 at 6:27 PM, kant kodali  wrote:
>
>> Thanks Ryan! In this case, I will have Dataset so is there a way to
>> convert Row to Json string?
>>
>> Thanks
>>
>> On Sat, Sep 9, 2017 at 5:14 PM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> It's because "toJSON" doesn't support Structured Streaming. The current
>>> implementation will convert the Dataset to an RDD, which is not supported
>>> by streaming queries.
>>>
>>> On Sat, Sep 9, 2017 at 4:40 PM, kant kodali  wrote:
>>>
 yes it is a streaming dataset. so what is the problem with following
 code?

 Dataset ds = dataset.toJSON().map(()->{some function that returns 
 a string});
  StreamingQuery query = ds.writeStream().start();
  query.awaitTermination();


 On Sat, Sep 9, 2017 at 4:20 PM, Felix Cheung  wrote:

> What is newDS?
> If it is a Streaming Dataset/DataFrame (since you have writeStream
> there) then there seems to be an issue preventing toJSON to work.
>
> --
> *From:* kant kodali 
> *Sent:* Saturday, September 9, 2017 4:04:33 PM
> *To:* user @spark
> *Subject:* Queries with streaming sources must be executed with
> writeStream.start()
>
> Hi All,
>
> I  have the following code and I am not sure what's wrong with it? I
> cannot call dataset.toJSON() (which returns a DataSet) ? I am using spark
> 2.2.0 so I am wondering if there is any work around?
>
>  Dataset ds = newDS.toJSON().map(()->{some function that returns 
> a string});
>  StreamingQuery query = ds.writeStream().start();
>  query.awaitTermination();
>
>

>>>
>>
>


How can I Upgrade Spark 1.6 to 2.x in Cloudera QuickStart VM 5.7

2017-09-12 Thread Gaurav1809
Hi All,

I am using the Cloudera 5.7 QuickStart VM for learning purposes. It has Spark 1.6.
I want to upgrade Spark to 2.x. How can I do it?
I don't think it will be as easy as downloading 2.x and replacing the older
files.

Please guide if anyone has done this in past. Steps would be highly helpful.

Thanks
Gaurav






How do I create a JIRA issue and associate it with a PR that I created for a bug in master?

2017-09-12 Thread Mikhailau, Alex
How do I create a JIRA issue and associate it with a PR that I created for a 
bug in master?

https://github.com/apache/spark/pull/19210


Re: [SS]How to add a column with custom system time?

2017-09-12 Thread Michael Armbrust
Can you show all the code?  This works for me.

On Tue, Sep 12, 2017 at 12:05 AM, 张万新  wrote:

> The spark version is 2.2.0
>
> Michael Armbrust wrote on Tue, Sep 12, 2017 at 12:32 PM:
>
>> Which version of spark?
>>
>> On Mon, Sep 11, 2017 at 8:27 PM, 张万新  wrote:
>>
>>> Thanks for reply, but using this method I got an exception:
>>>
>>> "Exception in thread "main" 
>>> org.apache.spark.sql.streaming.StreamingQueryException:
>>> nondeterministic expressions are only allowed in
>>>
>>> Project, Filter, Aggregate or Window"
>>>
>>> Can you give more advice?
>>>
>>> Michael Armbrust wrote on Tue, Sep 12, 2017 at 4:48 AM:
>>>
 import org.apache.spark.sql.functions._

 df.withColumn("window", window(current_timestamp(), "15 minutes"))

 On Mon, Sep 11, 2017 at 3:03 AM, 张万新  wrote:

> Hi,
>
> In structured streaming how can I add a column to a dataset with
> current system time aligned with 15 minutes?
>
> Thanks.
>


>>


Re: [SS] Any way to optimize memory consumption of SS?

2017-09-12 Thread Michael Armbrust
Can you show the full query you are running?

On Tue, Sep 12, 2017 at 10:11 AM, 张万新  wrote:

> Hi,
>
> I'm using structured streaming to count unique visits of our website. I
> use spark on yarn mode with 4 executor instances and from 2 cores * 5g
> memory to 4 cores * 10g memory for each executor, but there are frequent
> full gc, and once the count raises to about more than 4.5 millions the
> application will be blocked and finally crash in OOM. It's kind of
> unreasonable. So is there any suggestion to optimize the memory consumption
> of SS? Thanks.
>


[SS] Any way to optimize memory consumption of SS?

2017-09-12 Thread 张万新
Hi,

I'm using structured streaming to count unique visits of our website. I use
spark on yarn mode with 4 executor instances and from 2 cores * 5g memory
to 4 cores * 10g memory for each executor, but there are frequent full gc,
and once the count raises to about more than 4.5 millions the application
will be blocked and finally crash in OOM. It's kind of unreasonable. So is
there any suggestion to optimize the memory consumption of SS? Thanks.


How to run "merge into" ACID transaction hive query using hive java api?

2017-09-12 Thread Hokam Singh Chauhan
Please share if anyone knows how to execute a "merge into" Hive query.
Thanks,
Hokam


Re: Why do checkpoints work the way they do?

2017-09-12 Thread Hugo Reinwald
Thanks Tathagata for the clarification. +1 on documenting limitations of
checkpoints on structured streaming.

Hugo

On Mon, Sep 11, 2017 at 7:13 PM, Dmitry Naumenko 
wrote:

> +1 for me for this question. If there any constraints in restoring
> checkpoint for Structured Streaming, they should be documented.
>
>
> 2017-08-31 9:20 GMT+03:00 张万新 :
>
>> So is there any documents demonstrating in what condition can my
>> application recover from the same checkpoint and in what condition not?
>>
>> Tathagata Das wrote on Wed, Aug 30, 2017 at 1:20 PM:
>>
>>> Hello,
>>>
>>> This is an unfortunate design on my part when I was building DStreams :)
>>>
>>> Fortunately, we learnt from our mistakes and built Structured Streaming
>>> the correct way. Checkpointing in Structured Streaming stores only the
>>> progress information (offsets, etc.), and the user can change their
>>> application code (within certain constraints, of course) and still restart
>>> from checkpoints (unlike DStreams). If you are just building out your
>>> streaming applications, then I highly recommend you to try out Structured
>>> Streaming instead of DStreams (which is effectively in maintenance mode).
>>>
>>>
>>> On Fri, Aug 25, 2017 at 7:41 PM, Hugo Reinwald 
>>> wrote:
>>>
 Hello,

 I am implementing a spark streaming solution with Kafka and read that
 checkpoints cannot be used across application code changes - here
 

 I tested changes in application code and got the error message as b
 below -

 17/08/25 15:10:47 WARN CheckpointReader: Error reading checkpoint from
 file file:/tmp/checkpoint/checkpoint-150364116.bk
 java.io.InvalidClassException: scala.collection.mutable.ArrayBuffer;
 local class incompatible: stream classdesc serialVersionUID =
 -2927962711774871866, local class serialVersionUID = 1529165946227428979

 While I understand that this is as per design, can I know why does
 checkpointing work the way that it does verifying the class signatures?
 Would it not be easier to let the developer decide if he/she wants to use
 the old checkpoints depending on what is the change in application logic
 e.g. changes in code unrelated to spark/kafka - Logging / conf changes etc

 This is first post in the group. Apologies if I am asking the question
 again, I did a nabble search and it didnt throw up the answer.

 Thanks for the help.
 Hugo

>>>
>>>
>
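
For reference, a minimal hedged sketch of where the Structured Streaming
checkpoint (the progress information mentioned above) is configured; results
stands for any streaming DataFrame and the path is illustrative:

// Only progress information (offsets, etc.) is stored under checkpointLocation,
// which is what allows restarting after (constrained) code changes.
val query = results.writeStream
  .format("console")
  .outputMode("update")
  .option("checkpointLocation", "/tmp/checkpoints/my-query")
  .start()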


Re: How does spark work?

2017-09-12 Thread Jules Damji

Alternatively, watch Spark Summit talk on Memory Management to get insight from 
a developer's perspective.

https://spark-summit.org/2016/events/deep-dive-apache-spark-memory-management/

https://spark-summit.org/2017/events/a-developers-view-into-sparks-memory-model/

Cheers 
Jules 

Sent from my iPhone
Pardon the dumb thumb typos :)

> On Sep 12, 2017, at 4:07 AM, Vikash Pareek  wrote:
> 
> Obviously, you can't store 900GB of data into 80GB memory. 
> There is a concept in spark called disk spill, it means when your data size
> increases and can't fit into memory then it spilled out to disk.
> 
> Also, spark doesn't use whole memory for storing the data, some fraction of
> memory used for processing, shuffling and internal data structure too.
> For more detail, you can have a look at 
> https://0x0fff.com/spark-memory-management/
>   
> 
> Hope this will help you.
> 
> 
> 
> 
> 
> 
> -
> 
> __Vikash Pareek
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 


How to set Map values in spark/scala

2017-09-12 Thread Paras Bansal
Hello,

I am new to Spark/Scala development. I am trying to create map values in
Spark using Scala, but nothing gets printed:

def createMap(): Map[String, Int] = {
  var tMap: Map[String, Int] = Map()
  val tDF = spark.sql("select a, b, c from temp")
  for (x <- tDF) {
    val k = x.getAs[Long](0) + "|" + x.getAs[Long](1)
    val v = x.getAs[Int](2)
    tMap += (k -> v)
    println(k -> v)   // <-- this prints values
  }
  println("Heo1")
  for ((k, v) <- tMap) println("key = " + k + ", value= " + v)   // <-- this prints nothing
  println("Heo2")
  return tMap
}

Please suggest.

I have posted the same at:
https://stackoverflow.com/questions/46181071/how-to-set-map-values-in-spark-scala

Thank you,
Paras
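
A hedged sketch of what is likely happening and one way around it: the for loop
over the DataFrame runs inside executor tasks, so the driver-side tMap is never
updated; since the result is small, it can be collected to the driver and the
map built there (table and column types follow the snippet above):

import org.apache.spark.sql.SparkSession

def createMap(spark: SparkSession): Map[String, Int] = {
  val tDF = spark.sql("select a, b, c from temp")
  tDF.collect().map { row =>
    val k = row.getAs[Long](0) + "|" + row.getAs[Long](1)   // same composite key as above
    k -> row.getAs[Int](2)
  }.toMap
}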


Re: How does spark work?

2017-09-12 Thread nguyen duc Tuan
In general, an RDD, which is the central concept of Spark, is just a
definition of how to get and process data. Each partition of an RDD defines
how to get/process one partition of the data, and a series of transformations
transforms every partition of data from the previous RDD. Here is a simple
example of what I mean: suppose you need to count the number of lines in 100
files. Your RDD will consist of 100 partitions, with each partition
corresponding to one file path, and you have only 3 executors / 3 cores to do
that. The executors will first get 3 tasks, do the count job and send the
results to the driver, then execute the next tasks, and so on until all tasks
are done. The driver just does a simple reduce operation to get the final
number of lines. Note that a partition only defines how to get the data, so
you don't have to send data to each executor, and if some tasks fail, you just
need to run them again.
If you want to understand more internal details, I recommend this:
https://github.com/JerryLead/SparkInternals
Hope this will help you.
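
A minimal hedged sketch of the line-counting example above (the input path is
hypothetical): each file contributes at least one partition, every executor
task counts the lines of its own partition, and the driver only sums the small
per-partition results.

val lines = sc.textFile("hdfs:///data/logs/*.txt")
// Count lines per partition on the executors, then reduce on the driver
// (equivalent to lines.count()).
val perPartition = lines.mapPartitions(it => Iterator(it.size.toLong))
val total = perPartition.reduce(_ + _)
println(s"total lines: $total")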

2017-09-12 18:07 GMT+07:00 Vikash Pareek :

> Obviously, you can't store 900GB of data into 80GB memory.
> There is a concept in spark called disk spill, it means when your data size
> increases and can't fit into memory then it spilled out to disk.
>
> Also, spark doesn't use whole memory for storing the data, some fraction of
> memory used for processing, shuffling and internal data structure too.
> For more detail, you can have a look at
> https://0x0fff.com/spark-memory-management/
> 
>
> Hope this will help you.
>
>
>
>
>
>
> -
>
> __Vikash Pareek
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


RE: ClassNotFoundException while unmarshalling a remote RDD on Spark 1.5.1

2017-09-12 Thread PICARD Damien
Ok, it just seems to be an issue with the syntax of the spark-submit command. 
It should be :

spark-submit --queue default \
--class com.my.Launcher \
--deploy-mode cluster \
--master yarn-cluster \
--driver-java-options "-Dfile.encoding=UTF-8" \
--jars /home/user/hibernate-validator-5.2.2.Final.jar \
--driver-class-path hibernate-validator-5.2.2.Final.jar \
--conf "spark.executor.extraClassPath=hibernate-validator-5.2.2.Final.jar" \
/home/user/uberjar-job.jar

I also have to add some other jars, like jboss-logging, to meet the needs of
hibernate-validator.

De : PICARD Damien (EXT) AssuResPriSmsAts
Envoyé : lundi 11 septembre 2017 08:53
À : 'user@spark.apache.org'
Objet : ClassNotFoundException while unmarshalling a remote RDD on Spark 1.5.1

Hi !

I'm facing a Classloader problem using Spark 1.5.1

I use javax.validation and hibernate validation annotations on some of my beans 
:

  @NotBlank
  @Valid
  private String attribute1 ;

  @Valid
  private String attribute2 ;

When Spark tries to unmarshall these beans (after a remote RDD), I get the 
ClassNotFoundException :
17/09/07 09:19:25 INFO storage.BlockManager: Found block rdd_8_1 remotely
17/09/07 09:19:25 ERROR executor.Executor: Exception in task 3.0 in stage 2.0 
(TID 6)
java.lang.ClassNotFoundException: org.hibernate.validator.constraints.NotBlank
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
java.io.ObjectInputStream.resolveProxyClass(ObjectInputStream.java:700)
at java.io.ObjectInputStream.readProxyDesc(ObjectInputStream.java:1566)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
   ...

Indeed, it means that the annotation class is not found because it is not on
the classpath. Why? I don't know, because I build an uber JAR that contains this
class. I suppose that at the time the job tries to unmarshal the RDD, the uber
JAR is not loaded.

So, I try to add the hibernate JAR to the class loader manually, using this 
spark-submit command :

spark-submit --queue default \
--class com.my.Launcher \
--deploy-mode cluster \
--master yarn-cluster \
--driver-java-options "-Dfile.encoding=UTF-8" \
--jars /home/user/hibernate-validator-5.2.2.Final.jar \
--driver-class-path /home/user/hibernate-validator-5.2.2.Final.jar \
--conf 
"spark.executor.extraClassPath=/home/user/hibernate-validator-5.2.2.Final.jar" \
/home/user/uberjar-job.jar

Without effects. So, is there a way to add this class to the classloader ?

Thank you in advance.

Damien




Re: Efficient Spark-Submit planning

2017-09-12 Thread Sonal Goyal
Overall the defaults are sensible, but you definitely have to look at your
application and optimise a few of them. I mostly refer to the following
links when the job is slow or failing, or when we have more hardware that we
are not utilizing.

http://spark.apache.org/docs/latest/tuning.html
http://spark.apache.org/docs/latest/hardware-provisioning.html
http://spark.apache.org/docs/latest/configuration.html


Thanks,
Sonal
Nube Technologies 





On Tue, Sep 12, 2017 at 2:40 AM, Aakash Basu 
wrote:

> Hi,
>
> Can someone please clarify a little on how should we effectively calculate
> the parameters to be passed over using spark-submit.
>
> Parameters as in -
>
> Cores, NumExecutors, DriverMemory, etc.
>
> Is there any generic calculation which can be done over most kind of
> clusters with different sizes from small 3 node to 100s of nodes.
>
> Thanks,
> Aakash.
>


Re: Spark ignores --master local[*]

2017-09-12 Thread Vikash Pareek
Your VM might not have more than 1 core available to run the Spark job.
Check with the *nproc* command to see how many cores are available on the VM
and the *top* command to see how many cores are free.



-

__Vikash Pareek



Re: How does spark work?

2017-09-12 Thread Vikash Pareek
Obviously, you can't store 900GB of data in 80GB of memory.
There is a concept in Spark called disk spill: when your data size increases
and can't fit into memory, it is spilled out to disk.

Also, Spark doesn't use the whole memory for storing data; some fraction of
the memory is used for processing, shuffling and internal data structures too.
For more detail, you can have a look at
https://0x0fff.com/spark-memory-management/

Hope this will help you.
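
As a hedged illustration of the spill behaviour described above (rdd stands for
any existing RDD): caching with the MEMORY_AND_DISK storage level lets
partitions that don't fit in memory be written to local disk instead of being
dropped and recomputed.

import org.apache.spark.storage.StorageLevel

// Partitions that don't fit in executor memory are spilled to local disk.
val cached = rdd.persist(StorageLevel.MEMORY_AND_DISK)
println(cached.count())   // the first action materialises the cache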






-

__Vikash Pareek



Re: Multiple Kafka topics processing in Spark 2.2

2017-09-12 Thread kant kodali
@Dan shouldn't you be using Datasets/DataFrames? I heard it is recommended
to use Datasets and DataFrames rather than DStreams, since DStreams are in
maintenance mode.
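
A minimal hedged sketch of the per-record approach Cody describes in the quoted
reply below, assuming a DStream[ConsumerRecord[String, String]] named stream
obtained from the spark-streaming-kafka-0-10 direct stream:

// Each ConsumerRecord carries its topic, so the topic name can be kept
// alongside the value before any shuffle happens.
val topicAndValue = stream.map(record => (record.topic(), record.value()))

topicAndValue.foreachRDD { rdd =>
  rdd.take(10).foreach { case (topic, value) =>
    println(s"$topic -> $value")
  }
}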

On Mon, Sep 11, 2017 at 7:41 AM, Cody Koeninger  wrote:

> If you want an "easy" but not particularly performant way to do it, each
> org.apache.kafka.clients.consumer.ConsumerRecord has a topic.
>
> The topic is going to be the same for the entire partition as long as you
> haven't shuffled, hence the examples on how to deal with it at a partition
> level.
>
> On Fri, Sep 8, 2017 at 8:29 PM, Dan Dong  wrote:
>
>> Hi,Alonso.
>>   Thanks! I've read about this but did not quite understand it. To pick
>> out the topic name of a kafka message seems a simple task but the example
>> code looks so complicated with redundent info. Why do we need offsetRanges
>> here and do we have a easy way to achieve this?
>>
>> Cheers,
>> Dan
>>
>>
>> 2017-09-06 21:17 GMT+08:00 Alonso Isidoro Roman :
>>
>>> Hi, reading the official doc
>>> ,
>>> i think you can do it this way:
>>>
>>> import org.apache.spark.streaming.kafka._
>>>
>>>val directKafkaStream = KafkaUtils.createDirectStream[String, String, 
>>> StringDecoder, StringDecoder](
>>>
>>>   ssc, kafkaParams, topicsSet)
>>>
>>>
>>>  // Hold a reference to the current offset ranges, so it can be used 
>>> downstream
>>>  var offsetRanges = Array.empty[OffsetRange]
>>>
>>>  directKafkaStream.transform { rdd =>
>>>offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>rdd
>>>  }.map {
>>>...
>>>  }.foreachRDD { rdd =>
>>>for (o <- offsetRanges) {
>>>  println(*s"${o.topic}* ${o.partition} ${o.fromOffset} 
>>> ${o.untilOffset}")
>>>}
>>>
>>>  }
>>>
>>>
>>> 2017-09-06 14:38 GMT+02:00 Dan Dong :
>>>
 Hi, All,
   I have one issue here about how to process multiple Kafka topics in a
 Spark 2.* program. My question is: How to get the topic name from a message
 received from Kafka? E.g:

 ..
 val messages = KafkaUtils.createDirectStream[String, String,
 StringDecoder, StringDecoder](
   ssc, kafkaParams, topicsSet)

 // Get the lines, split them into words, count the words and print
 val lines = messages.map(_._2)
 val words = lines.flatMap(_.split(" "))
 val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
 wordCounts.print()
 ..

 Kafka send the messages in multiple topics through console producer for
 example. But when Spark receive the message, how it will know which topic
 is this piece of message coming from? Thanks a lot for any of your helps!

 Cheers,
 Dan

>>>
>>>
>>>
>>> --
>>> Alonso Isidoro Roman
>>> [image: https://]about.me/alonso.isidoro.roman
>>>
>>> 
>>>
>>
>>
>


Re: [SS]How to add a column with custom system time?

2017-09-12 Thread 张万新
The spark version is 2.2.0

Michael Armbrust wrote on Tue, Sep 12, 2017 at 12:32 PM:

> Which version of spark?
>
> On Mon, Sep 11, 2017 at 8:27 PM, 张万新  wrote:
>
>> Thanks for reply, but using this method I got an exception:
>>
>> "Exception in thread "main"
>> org.apache.spark.sql.streaming.StreamingQueryException: nondeterministic
>> expressions are only allowed in
>>
>> Project, Filter, Aggregate or Window"
>>
>> Can you give more advice?
>>
>> Michael Armbrust wrote on Tue, Sep 12, 2017 at 4:48 AM:
>>
>>> import org.apache.spark.sql.functions._
>>>
>>> df.withColumn("window", window(current_timestamp(), "15 minutes"))
>>>
>>> On Mon, Sep 11, 2017 at 3:03 AM, 张万新  wrote:
>>>
 Hi,

 In structured streaming how can I add a column to a dataset with
 current system time aligned with 15 minutes?

 Thanks.

>>>
>>>
>


Spark Yarn Java Out Of Memory on Complex Query Execution Plan

2017-09-12 Thread Nimmi Cv
Exception in thread "main" java.lang.OutOfMemoryError at
java.lang.AbstractStringBuilder.hugeCapacity(AbstractStringBuilder.java:161)
at
java.lang.AbstractStringBuilder.newCapacity(AbstractStringBuilder.java:155)
at
java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:125)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
at
java.lang.StringBuilder.append(StringBuilder.java:136) at
java.lang.StringBuilder.append(StringBuilder.java:131) at
scala.StringContext.standardInterpolator(StringContext.scala:125) at
scala.StringContext.s(StringContext.scala:95) at
org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:230)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:54)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2788) at
org.apache.spark.sql.Dataset.org
$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2385)
at
org.apache.spark.sql.Dataset.org
$apache$spark$sql$Dataset$$collect(Dataset.scala:2392)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2420)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2419)
at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2801) at
org.apache.spark.sql.Dataset.count(Dataset.scala:2419) at
com.samsung.cloud.mopay.linking.controller.PostNotificLinkController.linkPostNotific(PostNotificLinkController.java:51)
at
com.samsung.cloud.mopay.linking.txn.TxnLinking.performTxnLinking(TxnLinking.java:26)
at com.samsung.cloud.mopay.linking.Linking.processData(Linking.java:199) at
com.samsung.cloud.mopay.linking.Linking.main(Linking.java:72) at
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498) at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) at
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) at
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


I tried increasing
spark.driver.memory = 50g
spark.yarn.driver.memoryOverhead = 80g

Any help appreciated. I am stuck here for a while.

Thanks,
Nimmi


Spark Yarn Java Out Of Memory on Complex Query Execution Plan

2017-09-12 Thread nimmi.cv
Exception in thread "main" java.lang.OutOfMemoryError at
java.lang.AbstractStringBuilder.hugeCapacity(AbstractStringBuilder.java:161)
at
java.lang.AbstractStringBuilder.newCapacity(AbstractStringBuilder.java:155)
at
java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:125)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448) at
java.lang.StringBuilder.append(StringBuilder.java:136) at
java.lang.StringBuilder.append(StringBuilder.java:131) at
scala.StringContext.standardInterpolator(StringContext.scala:125) at
scala.StringContext.s(StringContext.scala:95) at
org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:230)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:54)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2788) at
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2385)
at
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2392)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2420)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2419)
at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2801) at
org.apache.spark.sql.Dataset.count(Dataset.scala:2419) at
com.samsung.cloud.mopay.linking.controller.PostNotificLinkController.linkPostNotific(PostNotificLinkController.java:51)
at
com.samsung.cloud.mopay.linking.txn.TxnLinking.performTxnLinking(TxnLinking.java:26)
at com.samsung.cloud.mopay.linking.Linking.processData(Linking.java:199) at
com.samsung.cloud.mopay.linking.Linking.main(Linking.java:72) at
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498) at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) at
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) at
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


I tried increasing 
spark.driver.memory = 50g
spark.yarn.driver.memoryOverhead = 80g

Any help appreciated. I am stuck here for a while.







Unable to save an RDd on S3 with SSE-KMS encryption

2017-09-12 Thread Vikash Pareek
I am trying to save an RDD to S3 with server-side encryption using a KMS key
(SSE-KMS), but I am getting the following exception:

*Exception in thread "main"
com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS
Service: Amazon S3, AWS Request ID: 695E32175EBA568A, AWS Error Code:
InvalidArgument, AWS Error Message: The encryption method specified is not
supported, S3 Extended Request ID:
Pi+HFLg0WsAWtkdI2S/xViOcRPMCi7zdHiaO5n1f7tiwpJe2z0lPY1C2Cr53PnnUCj3358Gx3AQ=*

The following is my test code to write an RDD to S3 using SSE-KMS for
encryption:
val sparkConf = new SparkConf().
  setMaster("local[*]").
  setAppName("aws-encryption")
val sc = new SparkContext(sparkConf)

sc.hadoopConfiguration.set("fs.s3a.access.key", AWS_ACCESS_KEY)
sc.hadoopConfiguration.set("fs.s3a.secret.key", AWS_SECRET_KEY)
sc.hadoopConfiguration.setBoolean("fs.s3a.sse.enabled", true)
sc.hadoopConfiguration.set("fs.s3a.server-side-encryption-algorithm", "SSE-KMS")
sc.hadoopConfiguration.set("fs.s3a.sse.kms.keyId", KMS_ID)

val s3a = new org.apache.hadoop.fs.s3a.S3AFileSystem
val s3aName = s3a.getClass.getName
sc.hadoopConfiguration.set("fs.s3a.impl", s3aName)

val rdd = sc.parallelize(Seq("one", "two", "three", "four"))
println("rdd is: " + rdd.collect())
rdd.saveAsTextFile(s"s3a://$bucket/$objKey")

I am able to write the RDD to S3 with AES256 encryption, but it fails with
SSE-KMS.
Does Spark/Hadoop expect a different value for KMS key encryption instead of
"SSE-KMS", or does it not support SSE-KMS encryption on AWS S3?

I found in the official Hadoop documentation that it only supports AES256 as
of now:
<property>
  <name>fs.s3n.server-side-encryption-algorithm</name>
  <value></value>
  <description>Specify a server-side encryption algorithm for S3.
  The default is NULL, and the only other currently allowable value is
  AES256.</description>
</property>

Can anyone please suggest what I am missing here or doing wrong?

My environment details as follow:
spark: 1.6.1
hadoop: 2.6.0
aws-java-sdk: 1.7.4

Thank you in advance.




-

__Vikash Pareek