BigDL and Analytics Zoo talks at upcoming Spark+AI Summit and Strata London

2019-04-18 Thread Jason Dai
Hi all,


Please see below for a list of upcoming technical talks on BigDL and
Analytics Zoo (https://github.com/intel-analytics/analytics-zoo/):


   - Engineers from CERN will present a technical talk, Deep Learning on
   Apache Spark at CERN’s Large Hadron Collider with Intel Technologies,
   at *Spark + AI Summit in San Francisco* (11:50am-12:20pm, Wednesday,
   24 April 2019)
   - Engineers from Intel will present a technical talk, Leveraging NLP
   and Deep Learning for Document Recommendations in the Cloud, at *Spark
   + AI Summit in San Francisco* (5:20-6:00pm, Wednesday, 24 April 2019)
   - Engineers from Dell EMC and Intel will present a technical talk,
   Using Deep Learning on Apache Spark to Diagnose Thoracic Pathology
   from Chest X-rays, at *Spark + AI Summit in San Francisco*
   (3:30-4:10pm, Thursday, 25 April 2019)
   - Engineers from Intel will host a session, Game Playing Using AI on
   Apache Spark, at *Spark + AI Summit in San Francisco* (5:30-6:10pm,
   Thursday, 25 April 2019)
   - Engineers from Intel will host a session, LSTM-based time series
   anomaly detection using Analytics Zoo for Spark and BigDL, at *Strata
   Data Conference in London* (16:35-17:15, Wednesday, 1 May 2019)

If you plan to attend these events, please drop by and talk to the speakers
:-)



Thanks,

-Jason


Spark-submit and no java log file generated

2019-04-18 Thread Mann Du
Hello there,

I am running a Java Spark application. Most of the modules write to a log
file (not the spark log file). I can either use "java -jar " or
"spark-submit" to run the application.

If I use "java -jar myApp.jar" the log file will be generated in the
directory $LOG_DIR or in a default dir if the environmental variable
$LOG_DIR is not set.  The problem is that it doesn't take spark config
options, such as "--master local[3]" in the command line. I have to
hardcoded in the code master("local[3]") and such. Apparently I can't
hardcoded for running in a cluster.
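
For reference, a minimal sketch of a session built without a hardcoded
master (shown in PySpark for brevity; the Java builder API is analogous,
and the app name here is just a placeholder). When the application is
launched through spark-submit, the --master value given before the
application jar is picked up automatically:

    from pyspark.sql import SparkSession

    # No .master(...) call here: when launched via spark-submit, the master
    # (e.g. --master local[3] or a cluster URL) comes from the command line
    # or from spark-defaults.conf, so nothing is hardcoded in the app.
    spark = SparkSession.builder \
        .appName("myApp") \
        .getOrCreate()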

If I use "spark-submit myMay.jar --master local[3] --xxx ", no customer log
files will be generated anywhere, neither in $LOG_DIR or the default dir.

My question is: How can I use "spark-submit" and still have the customized
log file generated?
Does anyone know what happened and how to fix it?

Thanks in advance!

Best,

Mann


Re: Difference between Checkpointing and Persist

2019-04-18 Thread Vadim Semenov
Saving/checkpointing would be preferable in the case of a big data set
because:

- the RDD gets saved to HDFS and the DAG gets truncated, so if some
partitions/executors fail it won't result in recomputing everything

- you don't use memory for caching, so the JVM heap is going to be
smaller, which helps GC, and overall there'll be more memory for other
operations

- by saving to HDFS you're removing potential hotspots, since partitions
can be fetched from many DataNodes; with caching, a hot partition that
gets requested a lot by other executors may leave you with an overwhelmed
executor

> We save that intermediate RDD and perform join (using same RDD - saving
is to just persist intermediate result before joining)
Checkpointing is essentially saving the RDD and reading it back; however,
you can't read the checkpointed data back if the job failed, so it'd be
nice to have one part of the join saved explicitly in case of potential
issues.

Overall, in my opinion, when working with big joins you should pay more
attention to reliability and fault tolerance than to pure speed, as the
probability of running into issues grows with the dataset size and the
cluster size.
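
To make the three options concrete, here is a minimal PySpark sketch
(paths, the checkpoint directory, and the join key "id" are hypothetical;
shown with the DataFrame API, the RDD persist/checkpoint calls follow the
same pattern) of persisting, checkpointing, and saving-then-reloading an
intermediate result before a join:

    from pyspark import StorageLevel
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")

    big = spark.read.parquet("hdfs:///data/big")      # large intermediate result
    other = spark.read.parquet("hdfs:///data/other")

    # 1. Persist: keeps partitions in memory, spilling to local disk as
    #    needed; the lineage is kept, so lost partitions get recomputed.
    persisted = big.persist(StorageLevel.MEMORY_AND_DISK)

    # 2. Checkpoint: writes to the checkpoint dir on HDFS and truncates the
    #    lineage, so a failed partition doesn't trigger a full recomputation.
    checkpointed = big.checkpoint()

    # 3. Save and read back: like checkpointing, but the files stay under
    #    your control and can be reused if the job has to be rerun.
    big.write.mode("overwrite").parquet("hdfs:///tmp/big_intermediate")
    reloaded = spark.read.parquet("hdfs:///tmp/big_intermediate")

    joined = reloaded.join(other, "id")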

On Thu, Apr 18, 2019 at 1:49 PM Subash Prabakar 
wrote:

> Hi All,
>
> I have a doubt about checkpointing and persist/saving.
>
> Say we have one RDD - containing huge data,
> 1. We checkpoint and perform join
> 2. We persist as StorageLevel.MEMORY_AND_DISK and perform join
> 3. We save that intermediate RDD and perform join (using same RDD - saving
> is to just persist intermediate result before joining)
>
>
> Which of the above is faster and whats the difference?
>
>
> Thanks,
> Subash
>


-- 
Sent from my iPhone


Re: Difference between Checkpointing and Persist

2019-04-18 Thread Jack Kolokasis

Hi,

    In my view, a good approach is to first persist your data at
StorageLevel.MEMORY_AND_DISK and then perform the join. This will
accelerate your computation because the data will be served from memory
and, when it does not fit, from your local intermediate storage device.


--Iacovos

On 4/18/19 8:49 PM, Subash Prabakar wrote:

Hi All,

I have a doubt about checkpointing and persist/saving.

Say we have one RDD - containing huge data,
1. We checkpoint and perform join
2. We persist as StorageLevel.MEMORY_AND_DISK and perform join
3. We save that intermediate RDD and perform join (using same RDD - 
saving is to just persist intermediate result before joining)



Which of the above is faster and whats the difference?


Thanks,
Subash





Difference between Checkpointing and Persist

2019-04-18 Thread Subash Prabakar
Hi All,

I have a doubt about checkpointing and persist/saving.

Say we have one RDD - containing huge data,
1. We checkpoint and perform join
2. We persist as StorageLevel.MEMORY_AND_DISK and perform join
3. We save that intermediate RDD and perform join (using same RDD - saving
is to just persist intermediate result before joining)


Which of the above is faster, and what's the difference?


Thanks,
Subash


Re: writing into oracle database is very slow

2019-04-18 Thread Jörn Franke
What is the size of the data? How much time does it need on HDFS and how much 
on Oracle? How many partitions do you have on Oracle side?
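
For context, a minimal PySpark sketch (URL, credentials, table name and
input path are placeholders, not from this thread) of the knobs that
usually matter for JDBC write throughput, i.e. the number of output
partitions and the batch size:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("hdfs:///data/to_load")   # hypothetical input

    # coalesce() bounds how many concurrent JDBC connections write to Oracle;
    # "batchsize" controls how many rows go into each JDBC batch insert.
    (df.coalesce(10)
       .write.format("jdbc")
       .option("url", "jdbc:oracle:thin:@//dbhost:1521/service")
       .option("driver", "oracle.jdbc.OracleDriver")
       .option("dbtable", "target_table")
       .option("user", "user")
       .option("password", "password")
       .option("batchsize", 10000)
       .mode("append")
       .save())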

> Am 06.04.2019 um 16:59 schrieb Lian Jiang :
> 
> Hi,
> 
> My spark job writes into oracle db using:
> df.coalesce(10).write.format("jdbc").option("url", url)
>   .option("driver", driver).option("user", user)
>   .option("batchsize", 2000)
>   .option("password", password).option("dbtable", tableName)
>   .mode("append").save()
> It is much slower than writing into HDFS. The data to write is small.
> Is this expected? Thanks for any clue.
> 


[Spark SQL]: Slow insertInto overwrite if target table has many partitions

2019-04-18 Thread Juho Autio
Hi,

My job is writing ~10 partitions with insertInto. With the same input /
output data the total duration of the job is very different depending on
how many partitions the target table has.

Target table with 10 partitions:
1 min 30 s

Target table with ~1 partitions:
13 min 0 s

It seems that Spark always fetches the full list of partitions in the
target table. When this happens, the cluster is basically idling while
the driver is listing partitions.

Here's a thread dump for executor driver from such idle time:
https://gist.github.com/juhoautio/bdbc8eb339f163178905322fc393da20

Is there any way to optimize this currently? Is this a known issue? Any
plans to improve?

My code is essentially:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config('spark.sql.hive.caseSensitiveInferenceMode', 'NEVER_INFER') \
    .config("hive.exec.dynamic.partition", "true") \
    .config('spark.sql.sources.partitionOverwriteMode', 'dynamic') \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .enableHiveSupport() \
    .getOrCreate()

out_df.write \
    .option('mapreduce.fileoutputcommitter.algorithm.version', '2') \
    .insertInto(target_table_name, overwrite=True)

Table has been originally created from spark with saveAsTable.

Does Spark need to know anything about the existing partitions, though?
As a manual workaround I would write the files directly to the partition
locations, deleting existing files first if there's anything in that
partition, and then call the metastore with ALTER TABLE ... ADD IF NOT
EXISTS PARTITION (see the sketch below). This doesn't require prior
knowledge of the existing partitions.
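
A rough PySpark sketch of that workaround, assuming a hypothetical table
my_table partitioned by a column ds (all names and paths are made up for
illustration):

    partition_value = '2019-04-18'
    partition_path = 's3://bucket/warehouse/my_table/ds=' + partition_value

    # 1. Overwrite the files for just this partition at its storage location.
    (out_df.filter(out_df.ds == partition_value)
           .drop('ds')
           .write.mode('overwrite')
           .parquet(partition_path))

    # 2. Register the partition in the metastore only if it isn't known yet;
    #    no listing of the table's existing partitions is needed.
    spark.sql(
        "ALTER TABLE my_table ADD IF NOT EXISTS PARTITION (ds='{0}') "
        "LOCATION '{1}'".format(partition_value, partition_path))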

Thanks.


autoBroadcastJoinThreshold not working as expected

2019-04-18 Thread Mike Chan
Dear all,

I'm working on a case where, whenever a certain table is broadcast in a
join, the query eventually fails with a corrupt remote block error.

First, we set spark.sql.autoBroadcastJoinThreshold to 10MB, namely
10485760.

Then we proceed to run the query. In the SQL plan, we found that a table
that is 25MB in size is broadcast as well.


Also, per DESC EXTENDED, the table is 24452111 bytes. It is a Hive table.
We always run into an error when this table is broadcast. Below is a
sample error:

Caused by: java.io.IOException: org.apache.spark.SparkException:
corrupt remote block broadcast_477_piece0 of broadcast_477: 298778625
!= -992055931
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1350)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:207)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)


Also attached is the physical plan, if you're interested. One thing to
note: if I turn autoBroadcastJoinThreshold down to 5MB, this query
executes successfully and default.product is NOT broadcast.


However, when I change to another query that selects even fewer columns
than the previous one, this table still gets broadcast even at 5MB and
the query fails with the same error. I even changed the threshold to 1MB
and it's still the same.
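

For reference, a small sketch of the settings involved (the session
object and the ANALYZE step are assumptions on my part, not something
confirmed in this thread); the size Spark compares against the threshold
comes from catalog statistics, so stale statistics are one thing worth
checking:

    # Threshold in bytes; tables whose estimated size is below it get broadcast.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10485760)   # 10MB

    # Setting it to -1 disables size-based broadcast joins entirely.
    # spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

    # The estimate comes from table statistics, so refreshing them for the
    # table in question may change the broadcast decision.
    spark.sql("ANALYZE TABLE default.product COMPUTE STATISTICS")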


Appreciate if you can share any input. Thank you very much.


Best Regards,

MIke
== Physical Plan ==
*(10) Project [product_key#445, store_key#446, fiscal_year#447, 
fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, 
fiscal_week_start_date#220 AS period_start_date#472, fiscal_week_end_date#221 
AS period_end_date#473, CASE WHEN isnull(fiscal_week_of_year#222) THEN 0 ELSE 1 
END AS valid_date_flag#474, max_fiscal_date#451, vs_max_sales_year#270, 
vs_max_sales_month#267, vs_max_sales_week#266, bu_id#272 AS bu_code#475, 
bu_name#273, principle_supplier_code#154 AS supplier_code#476, 
mother_company_name#150 AS supplier_name#477, brand_type_name#117, 
brand_name#115, coalesce(h1_l1_hierarchy_code#125, -) AS Category_Code#478, 
coalesce(substring(h1_l1_hierarchy_code#125, 3, 2), -) AS Cate_No#479, 
h1_l1_hierarchy_name#126 AS Category_Name#480, 
coalesce(h1_l2_hierarchy_code#127, -) AS Department_Code#481, 
coalesce(substring(h1_l2_hierarchy_code#127, 7, 2), -) AS Dept_No#482, ... 48 
more fields]
+- *(10) BroadcastHashJoin [bu_key#156], [bu_key#271], LeftOuter, BuildRight
   :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, 
fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, 
max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, 
gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, 
compensation#457, qty#458, sales_amt_local_shrink#459, 
cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, 
cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, 
gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, 
qty_ly#468, ... 41 more fields]
   :  +- *(10) BroadcastHashJoin [fiscal_week_of_year#449, fiscal_year#447], 
[fiscal_week_of_year#222, fiscal_year#234], LeftOuter, BuildRight
   : :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, 
fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, 
max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, 
gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, 
compensation#457, qty#458, sales_amt_local_shrink#459, 
cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, 
cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, 
gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, 
qty_ly#468, ... 35 more fields]
   : :  +- *(10) BroadcastHashJoin [store_id#157], [loc_idnt#521], 
LeftOuter, BuildRight
   : : :- *(10) Project [product_key#445, store_key#446, 
fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, 
fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, 
cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, 
adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, 
cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, 
cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, 
gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, 
qty_ly#468, ... 33 more fields]
   : : :  +- *(10) BroadcastHashJoin [cast(store_key#446 as double)], 
[cast(store_key#155 as double)], LeftOuter, BuildRight
   : : : :- *(10) Project [product_key#445, store_key#446, 
fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, 
fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, 
cogs_amt_local#453, gross_profit_amt_local#454,