Re: Spark Streaming

2018-11-26 Thread Siva Samraj
My join_df takes about 14 seconds on the first run, and even with the
withColumn calls commented out it is still taking that long.



On Tue, Nov 27, 2018 at 12:08 PM Jungtaek Lim  wrote:

> You may need to put some effort into triaging how much time is spent on
> each part. Without that information you will only get general tips and
> tricks. Please check the SQL tab and look at the DAG graph as well as the
> details (logical plan, physical plan) to see whether you're happy with
> those plans.
>
> General tip from a quick look at the query: avoid calling withColumn
> repeatedly and try to put the projections into one select statement. If
> I'm not mistaken, repeated withColumn calls are known to be a bit costly,
> since each call produces a new Dataset. Defining a schema and using
> "from_json" will eliminate all of the withColumn calls and the extra
> calls to "get_json_object".
>
> - Jungtaek Lim (HeartSaVioR)

Re: Spark Streaming

2018-11-26 Thread Jungtaek Lim
You may need to put some effort into triaging how much time is spent on
each part. Without that information you will only get general tips and
tricks. Please check the SQL tab and look at the DAG graph as well as the
details (logical plan, physical plan) to see whether you're happy with
those plans.

General tip from a quick look at the query: avoid calling withColumn
repeatedly and try to put the projections into one select statement. If I'm
not mistaken, repeated withColumn calls are known to be a bit costly, since
each call produces a new Dataset. Defining a schema and using "from_json"
will eliminate all of the withColumn calls and the extra calls to
"get_json_object".
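
For example, a rough sketch of that direction (not tested; the field names
come from the original code in this thread, only a few fields are shown,
and spark.implicits._ is assumed to be imported as in that code):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val orderSchema = new StructType()
  .add("order_id", StringType)
  .add("customer_id", StringType)
  .add("unit_cost", StringType)
  // ...add the remaining order fields the same way...

val order_details = order_df
  .select(from_json($"value".cast("string"), orderSchema).as("o"),
    current_timestamp().as("tstamp_trans"))
  .select($"o.order_id".as("s_order_id"),
    $"o.customer_id".as("s_customer_id"),
    $"o.unit_cost".cast("integer").as("unit_cost"),
    $"tstamp_trans")

This parses each Kafka value once per row rather than once per extracted
column, and builds the whole projection in a single select.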

- Jungtaek Lim (HeartSaVioR)


Spark Streaming

2018-11-26 Thread Siva Samraj
Hello All,

I am using Spark 2.3 and I am trying to write a Spark Streaming join. It is
a basic join, and it is taking a long time to join the stream data. I am not
sure whether there is any configuration we need to set on Spark.

Code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.TimestampType

object OrderSalesJoin {
  def main(args: Array[String]): Unit = {

setEnvironmentVariables(args(0))

val order_topic = args(1)
val invoice_topic = args(2)
val dest_topic_name = args(3)

val spark = SparkSession.builder().appName("SalesStreamingJoin").getOrCreate()

val checkpoint_path = HDFS_HOST + "/checkpoints/" + dest_topic_name

import spark.implicits._


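// Read both topics from Kafka. KAFKA_BROKERS (and HDFS_HOST above) are not defined in
// this snippet; presumably they come from setEnvironmentVariables or project constants.
// The JSON payload arrives in the binary "value" column.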
val order_df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KAFKA_BROKERS)
  .option("subscribe", order_topic)
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .option("kafka.replica.fetch.max.bytes", "15728640")
  .load()


val invoice_df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KAFKA_BROKERS)
  .option("subscribe", invoice_topic)
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .option("kafka.replica.fetch.max.bytes", "15728640")
  .load()


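// Extract the order fields from the JSON payload; note that each get_json_object call
// parses the "value" string again for a single path.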
val order_details = order_df
  .withColumn("s_order_id", get_json_object($"value".cast("String"), "$.order_id"))
  .withColumn("s_customer_id", get_json_object($"value".cast("String"), "$.customer_id"))
  .withColumn("s_promotion_id", get_json_object($"value".cast("String"), "$.promotion_id"))
  .withColumn("s_store_id", get_json_object($"value".cast("String"), "$.store_id"))
  .withColumn("s_product_id", get_json_object($"value".cast("String"), "$.product_id"))
  .withColumn("s_warehouse_id", get_json_object($"value".cast("String"), "$.warehouse_id"))
  .withColumn("unit_cost", get_json_object($"value".cast("String"), "$.unit_cost"))
  .withColumn("total_cost", get_json_object($"value".cast("String"), "$.total_cost"))
  .withColumn("units_sold", get_json_object($"value".cast("String"), "$.units_sold"))
  .withColumn("promotion_cost", get_json_object($"value".cast("String"), "$.promotion_cost"))
  .withColumn("date_of_order", get_json_object($"value".cast("String"), "$.date_of_order"))
  .withColumn("tstamp_trans", current_timestamp())
  .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "MMddHHmmss").cast(TimestampType))
  .select($"s_customer_id", $"s_order_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
    $"s_warehouse_id", $"unit_cost".cast("integer") as "unit_cost",
    $"total_cost".cast("integer") as "total_cost",
    $"promotion_cost".cast("integer") as "promotion_cost",
    $"date_of_order", $"tstamp_trans", $"TIMESTAMP",
    $"units_sold".cast("integer") as "units_sold")


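// Same pattern for invoices, keeping only rows whose invoice_status is "Success".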
val invoice_details = invoice_df
  .withColumn("order_id", get_json_object($"value".cast("String"), "$.order_id"))
  .withColumn("invoice_status", get_json_object($"value".cast("String"), "$.invoice_status"))
  .where($"invoice_status" === "Success")
  .withColumn("tstamp_trans", current_timestamp())
  .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "MMddHHmmss").cast(TimestampType))



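// Event-time watermarks (the late-arrival thresholds come from args(4) and args(5)),
// followed by a stream-stream inner join on the order id.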
val order_wm = order_details.withWatermark("tstamp_trans", args(4))
val invoice_wm = invoice_details.withWatermark("tstamp_trans", args(5))

val join_df = order_wm
  .join(invoice_wm, order_wm.col("s_order_id") === invoice_wm.col("order_id"))
  .select($"s_customer_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
    $"s_warehouse_id", $"unit_cost", $"total_cost", $"promotion_cost",
    $"date_of_order", $"units_sold" as "units_sold", $"order_id")

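// Re-serialize the joined columns into a single JSON "value" column for the Kafka sink,
// de-duplicating on order_id.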
val final_ids = join_df
  .withColumn("value", to_json(struct($"s_customer_id", $"s_promotion_id", $"s_store_id",
    $"s_product_id", $"s_warehouse_id", $"unit_cost".cast("Int") as "unit_cost",
    $"total_cost".cast("Int") as "total_cost",
    $"promotion_cost".cast("Int") as "promotion_cost",
    $"date_of_order",
    $"units_sold".cast("Int") as "units_sold", $"order_id")))
  .dropDuplicates("order_id")
  .select("value")


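// Kafka sink, triggered every second; the checkpoint tracks offsets and streaming state.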
val write_df = final_ids
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KAFKA_BROKERS)
  .option("topic", dest_topic_name)
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.ProcessingTime("1 second"))
  .start()

write_df.awaitTermination()

  }

}


It is taking more than a minute for every run, and the waiting time keeps
increasing as the data grows.

Please let me know if there is anything we need to configure to make this
faster. I tried

Spark column combinations and combining multiple dataframes (pyspark)

2018-11-26 Thread Christopher Petrino
Hi all, I'm working on a problem where it is necessary to find all
combinations of columns for a dataframe.

THE PROBLEM:

Let's say there is a dataframe with columns:
[ col_a, col_b, col_c, col_d, col_e, result ]

The size of each combination can vary between 1 and 5, but let's say 3 for
this case.

This would result in something like:
[ col_a, col_b, col_c, result ]
[ col_b, col_c, col_d, result ]
[ col_b, col_c, col_e, result ]
... etc

These would have their columns renamed and combined to result in:
[ col_1, col_2, col_3, result ]


THE CURRENT APPROACH:

The current approach is to use Python and itertools.combinations to produce
all of the combinations. Then, for each combination, a new dataframe is
created from the original dataframe using

df.select(cols).toDF(*cols).cache()

(Cache and count are being used to help track progress and to work around
earlier issues.)
Each generated dataframe is appended to a Python list:

the_dfs.append(df.select(cols).toDF(*cols).cache())
the_dfs[-1].count()

The dataframes are finally combined using

df_all = reduce(DataFrame.union, the_dfs).cache()
df_all.count()


THE CURRENT STATE:

The proof of concept works on a smaller amount of data, but as the data size
increases this approach has proven to be unreliable. I've had several
blocking issues that I've resolved by caching dataframes.

Can anyone provide any criticism on the current approach or advice on a
different approach?


Re: [Spark SQL]: Does Spark SQL 2.3+ support UDT?

2018-11-26 Thread Suny Tyagi
Thanks and Regards,
Suny Tyagi
Phone No : 9885027192


On Mon, Nov 26, 2018 at 10:31 PM Suny Tyagi  wrote:

> Hi Team,
>
>
> I was going through this ticket
> https://issues.apache.org/jira/browse/SPARK-7768?jql=text%20~%20%22public%20udt%22
> and could not understand whether Spark supports UDT in version 2.3+ in any
> language (Scala, Python, Java, R).
>
> I have a class something like this:
>
> class Test {
>   String name;
>   int age;
> }
>
>
> And my UDF method is:
>
> public Test UDFMethod(String name, int age) {
>   Test ob = new Test();
>   ob.name = name;
>   ob.age = age;
>   return ob;
> }
>
> Sample Spark query: `Select *, UDFMethod(name, age) From SomeTable;`
>
> Now UDFMethod(name, age) will return a Test object. So will this work in
> Spark SQL after using the SQLUserDefinedType annotation and extending the
> UserDefinedType class?
>
> As UserDefinedType is private in Spark 2.0, I just want to know whether
> UDT is supported in Spark 2.3+. If yes, which is better to use:
> UserDefinedType or UDTRegistration? As of now both are private.
>
>
> Thanks,
>
> Suny Tyagi
>
>
>
>
> Thanks and Regards,
> Suny Tyagi
> Phone No : 9885027192
>


Encoding not working when using a map / mapPartitions call

2018-11-26 Thread ccaspanello
Attached you will find a project with unit tests showing the issue at hand.

If I read in an ISO-8859-1 encoded file and simply write out what was read,
the contents of the part file match what was read, which is great.

However, the second I use a map / mapPartitions function, it looks like the
encoding is no longer correct. In addition, a simple collectAsList and
writing that list of strings to a file does not work either. I don't think
I'm doing anything wrong. Can someone please investigate? I think this is a
bug.
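
For anyone skimming, here is a minimal sketch of the scenario (assumed from
the description above; the paths are placeholders, and the real unit tests
are in the attached project):

import org.apache.spark.sql.{Encoders, SparkSession}

val spark = SparkSession.builder()
  .appName("encoding-check")
  .master("local[*]")
  .getOrCreate()

// The text source treats the file as UTF-8 text.
val ds = spark.read.textFile("src/test/resources/iso-8859-1-input.txt")

// Writing the dataset straight back out matches what was read.
ds.write.text("target/out-direct")

// An identity map pushes the rows through the String encoder; per the report
// above, this is where the output stops matching the input.
ds.map(line => line)(Encoders.STRING).write.text("target/out-mapped")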

spark-sandbox.zip

  






Re: Zookeeper and Spark deployment for standby master

2018-11-26 Thread Jörn Franke
I guess it is the usual things: if the non-ZooKeeper processes take too much
memory, disk space, etc., they will negatively affect ZooKeeper and thus your
whole running cluster. You will have to do a risk assessment for your specific
architectural setup to decide whether this is acceptable.

> On 26.11.2018 at 07:25, Akila Wajirasena wrote:
> 
> Hi,
> 
> Is it necessary to deploy Apache ZooKeeper and Spark on separate sets of
> hosts when using standby masters? What would be the consequences of running
> the ZooKeeper quorum on the same physical machines that run the Spark
> cluster?
> 
> 
