Re: External Spark shuffle service for k8s

2024-04-07 Thread Enrico Minack

There is Apache incubator project Uniffle:
https://github.com/apache/incubator-uniffle

It stores shuffle data on remote servers in memory, on local disk and HDFS.

Cheers,
Enrico


On 06.04.24 at 15:41, Mich Talebzadeh wrote:

I have seen some older references for shuffle service for k8s,
although it is not clear they are talking about a generic shuffle
service for k8s.

Anyhow, with the advent of genai and the need to handle larger volumes
of data, I was wondering if there has been any more work on this matter.
Specifically, larger and scalable file systems like HDFS, GCS, S3 etc.
offer significantly larger storage capacity than local disks on
individual worker nodes in a k8s cluster, thus allowing much larger
datasets to be handled more efficiently. The degree of parallelism and
fault tolerance of these file systems also comes into it. I will be
interested in hearing more about any progress on this.

Thanks.

Mich Talebzadeh,

Technologist | Solutions Architect | Data Engineer  | Generative AI

London
United Kingdom


view my Linkedin profile


  https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions" (Wernher von Braun).

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: AQE coalesce 60G shuffle data into a single partition

2024-02-24 Thread Enrico Minack

Hi Shay,

maybe this is related to the small number of output rows (1,250) of the
last exchange step that consumes those 60GB of shuffle data.

Looks like your outer transformation is something like

df.groupBy($"id").agg(collect_list($"prop_name"))

Have you tried adding a repartition as an attempt to convince AQE to
exchange into a specific number of partitions?

df.groupBy($"id").agg(collect_list($"prop_name")).repartition(100, $"id")

Can you provide some Spark code that reproduces the issue with synthetic
data and cleansed Spark logic?

Cheers,
Enrico


On 22.02.24 at 15:14, Shay Elbaz wrote:

Dear community,

We have noticed that AQE is coalescing a substantial amount of data
(approximately 60GB) into a single partition during query execution.
This behavior is unexpected given the absence of data skew, broadcast,
and the significant size of the shuffle operation.

*Environment Details:*

 * Apache Spark Version: 3.1.3
 * Platform: Dataproc 2.0
 * Executors Configuration: 90GB memory, 15 cores

*Configuration Parameters:* We have examined the relevant
configuration parameters, and tried many different variations, but the
behavior persists. For example:
spark.sql.adaptive.advisoryPartitionSizeInBytes=104857600 //100MB
spark.sql.adaptive.coalescePartitions.initialPartitionNum=1000
spark.sql.adaptive.coalescePartitions.minPartitionNum=500
spark.sql.optimizer.dynamicPartitionPruning.enabled=false
spark.sql.autoBroadcastJoinThreshold=-1 // off

*The full plan and diagram from the SQL tab are shown below*

Please advise:

 1. Are there additional configuration parameters or best practices we
    should be aware of in such scenarios?
 2. Are there known related issues in 3.1.3? (didn't find any on Jira)


Thanks in advance,
Shay


...


Re: ordering of rows in dataframe

2023-12-05 Thread Enrico Minack

Looks like what you want is to add a column such that, when ordered by
that column, the current order of the dataframe is preserved.

All you need is the monotonically_increasing_id() function:

spark.range(0, 10, 1, 5).withColumn("row",
monotonically_increasing_id()).show()
+---+---+
| id|    row|
+---+---+
|  0|  0|
|  1|  1|
|  2| 8589934592|
|  3| 8589934593|
|  4|17179869184|
|  5|17179869185|
|  6|25769803776|
|  7|25769803777|
|  8|34359738368|
|  9|34359738369|
+---+---+

Within a partition, all rows have consecutive row numbers; the start
of a new partition observes a jump in the row number. The example above
has 5 partitions with 2 rows each.
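For completeness, a minimal PySpark sketch of that idea (assumes an
existing DataFrame `df`; the column name "row" is illustrative):

from pyspark.sql import functions as F

# Tag the current order; the ids increase in the current row order.
df_tagged = df.withColumn("row", F.monotonically_increasing_id())

# Any later sort on that column restores the original order.
df_tagged.orderBy("row").show()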

In case you need a globally consecutive row number (not needed for
preserving the current dataframe order, which is what you want), you can
use the Dataframe.with_row_numbers() method provided by the
Spark-Extension package:
https://github.com/G-Research/spark-extension/blob/master/ROW_NUMBER.md

import gresearch.spark

df.with_row_numbers().show()
+---+----------+
| id|row_number|
+---+----------+
|  1|         1|
|  2|         2|
|  2|         3|
|  3|         4|
+---+----------+

Cheers,
Enrico



On 05.12.23 at 18:25, Som Lima wrote:


I want to maintain the order of the rows in the data frame in PySpark.
Is there any way to achieve this for the function below, where we have a
row ID that gives a number to each row? Currently, the function below
results in the rearrangement of the rows in the data frame.

def createRowIdColumn(new_column, position, start_value):
    row_count = df.count()
    row_ids = spark.range(int(start_value), int(start_value) + row_count, 1).toDF(new_column)
    window = Window.orderBy(lit(1))
    df_row_ids = row_ids.withColumn("row_num", row_number().over(window) - 1)
    df_with_row_num = df.withColumn("row_num", row_number().over(window) - 1)
    if position == "Last Column":
        result = df_with_row_num.join(df_row_ids, on="row_num").drop("row_num")
    else:
        result = df_row_ids.join(df_with_row_num, on="row_num").drop("row_num")
    return result.orderBy(new_column)

Please let me know the solution if we can achieve this requirement.




Re: [PySpark][Spark Dataframe][Observation] Why empty dataframe join doesn't let you get metrics from observation?

2023-12-05 Thread Enrico Minack

Hi Michail,

with spark.conf.set("spark.sql.planChangeLog.level", "WARN") you can see
how Spark optimizes the query plan.

In PySpark, the plan is optimized into

Project ...
+- CollectMetrics 2, [count(1) AS count(1)#200L]
   +- LocalTableScan , [col1#125, col2#126L, col3#127, col4#132L]

The entire join gets optimized away into an empty table. Looks like it
figures out that df has no rows with col1 = 'c'. So df is never consumed
/ iterated, so the observation does not retrieve any metrics.

In Scala, the optimization is different:

*(2) Project ...
+- CollectMetrics 2, [count(1) AS count(1)#63L]
   +- *(1) Project [col1#37, col2#38, col3#39, cast(null as int) AS col4#51]
      +- *(1) Filter (isnotnull(col1#37) AND (col1#37 = c))
         +- CollectMetrics 1, [count(1) AS count(1)#56L]
            +- LocalTableScan [col1#37, col2#38, col3#39]

where the join also gets optimized away, but table df is still filtered
for col1 = 'c', which iterates over the rows and collects the metrics
for observation 1.

Hope this helps to understand why there are no observed metrics for
Observation("1") in your case.

Enrico



On 04.12.23 at 10:45, Enrico Minack wrote:

Hi Michail,

observations as well as ordinary accumulators only observe / process
rows that are iterated / consumed by downstream stages. If the query
plan decides to skip one side of the join, that side will be removed from
the final plan completely. Then, the Observation will not retrieve any
metrics and .get waits forever. Definitely not helpful.

When creating the Observation class, we thought about a timeout for the
get method but could not find a use case where the user would call get
without first executing the query. Here is a scenario where the query is
executed but there is still no observation result. We will rethink this.

Interestingly, your example works in Scala:

import org.apache.spark.sql.Observation

val df = Seq(("a", 1, "1 2 3 4"), ("b", 2, "1 2 3 4")).toDF("col1",
"col2", "col3")
val df_join = Seq(("a", 6), ("b", 5)).toDF("col1", "col4")

val o1 = Observation()
val o2 = Observation()

val df1 = df.observe(o1, count("*")).filter("col1 = 'c'")
val df2 = df1.join(df_join, "col1", "left").observe(o2, count("*"))

df2.show()
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
+----+----+----+----+

o1.get
Map[String,Any] = Map(count(1) -> 2)

o2.get
Map[String,Any] = Map(count(1) -> 0)


Pyspark and Scala should behave identically here. I will investigate.

Cheers,
Enrico



On 02.12.23 at 17:11, Михаил Кулаков wrote:

Hey folks, I am actively using the observe method in my Spark jobs and
noticed interesting behavior.
Here is an example of working and non-working code:
https://gist.github.com/Coola4kov/8aeeb05abd39794f8362a3cf1c66519c

In a few words, if I'm joining a dataframe that became empty after some
filter rules, observations configured on the first dataframe never
return any results, unless some action is called on the empty dataframe
specifically before the join.

Looks like a bug to me. I would appreciate any advice on how to fix
this behavior.




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [PySpark][Spark Dataframe][Observation] Why empty dataframe join doesn't let you get metrics from observation?

2023-12-04 Thread Enrico Minack

Hi Michail,

observations as well as ordinary accumulators only observe / process
rows that are iterated / consumed by downstream stages. If the query
plan decides to skip one side of the join, that side will be removed from
the final plan completely. Then, the Observation will not retrieve any
metrics and .get waits forever. Definitely not helpful.

When creating the Observation class, we thought about a timeout for the
get method but could not find a use case where the user would call get
without first executing the query. Here is a scenario where the query is
executed but there is still no observation result. We will rethink this.

Interestingly, your example works in Scala:

import org.apache.spark.sql.Observation

val df = Seq(("a", 1, "1 2 3 4"), ("b", 2, "1 2 3 4")).toDF("col1",
"col2", "col3")
val df_join = Seq(("a", 6), ("b", 5)).toDF("col1", "col4")

val o1 = Observation()
val o2 = Observation()

val df1 = df.observe(o1, count("*")).filter("col1 = 'c'")
val df2 = df1.join(df_join, "col1", "left").observe(o2, count("*"))

df2.show()
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
+----+----+----+----+

o1.get
Map[String,Any] = Map(count(1) -> 2)

o2.get
Map[String,Any] = Map(count(1) -> 0)


Pyspark and Scala should behave identically here. I will investigate.

Cheers,
Enrico



On 02.12.23 at 17:11, Михаил Кулаков wrote:

Hey folks, I am actively using the observe method in my Spark jobs and
noticed interesting behavior.
Here is an example of working and non-working code:
https://gist.github.com/Coola4kov/8aeeb05abd39794f8362a3cf1c66519c

In a few words, if I'm joining a dataframe that became empty after some
filter rules, observations configured on the first dataframe never
return any results, unless some action is called on the empty dataframe
specifically before the join.

Looks like a bug to me. I would appreciate any advice on how to fix
this behavior.




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Apache Spark not reading UTC timestamp from MongoDB correctly

2023-06-08 Thread Enrico Minack
Sean is right, casting timestamps to strings (which is what show() does)
uses the local timezone: either the Java default zone `user.timezone`,
the Spark default zone `spark.sql.session.timeZone`, or the default
DataFrameWriter zone `timeZone` (when writing to file).


You say you are in PST, which is UTC - 8 hours. But I think this 
currently observes daylight saving, so PDT, which is UTC - 7 hours.


Then, your UTC timestamp is correctly displayed in local PDT time. Try
changing the above settings to display it in different timezones.
Inspecting the underlying long value as suggested by Sean is best
practice to get hold of the true timestamp.
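To illustrate (a minimal PySpark sketch; assumes Spark >= 3.1 for
timestamp_seconds, and 1683527400 is the epoch value from your data):

from pyspark.sql import functions as F

df = spark.createDataFrame([(1683527400,)], "epoch long") \
          .select(F.timestamp_seconds("epoch").alias("ts"))

spark.conf.set("spark.sql.session.timeZone", "UTC")
df.show(truncate=False)              # 2023-05-08 06:30:00

spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
df.show(truncate=False)              # 2023-05-07 23:30:00

# The stored instant never changes; the long value proves it.
df.select(F.unix_timestamp("ts")).show()   # 1683527400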


Cheers,
Enrico


On 09.06.23 at 00:53, Sean Owen wrote:
You sure it is not just that it's displaying in your local TZ? Check 
the actual value as a long for example. That is likely the same time.


On Thu, Jun 8, 2023, 5:50 PM karan alang  wrote:

ref :

https://stackoverflow.com/questions/76436159/apache-spark-not-reading-utc-timestamp-from-mongodb-correctly

Hello All,
I've data stored in MongoDB collection and the timestamp column is
not being read by Apache Spark correctly. I'm running Apache Spark
on GCP Dataproc.

Here is sample data:

In Mongo:
+----------+----------------------+
|  timeslot|timeslot_date         |
+----------+----------------------+
|1683527400|{2023-05-08T06:30:00Z}|
+----------+----------------------+

When I use pyspark to read this:
+----------+-------------------+
|  timeslot|timeslot_date      |
+----------+-------------------+
|1683527400|2023-05-07 23:30:00|
+----------+-------------------+

My understanding is that the data in Mongo is in UTC format, i.e.
2023-05-08T06:30:00Z is in UTC. I'm in the PST timezone. I'm not clear
why Spark is reading it in a different timezone format (neither PST nor
UTC). Note - it is not reading it as the PST timezone; if it was doing
that it would advance the time by 7 hours, instead it is doing the
opposite.

Where is the default timezone format taken from, when Spark is
reading data from MongoDB ?

Any ideas on this ?

tia!

|




Re: Incremental Value dependents on another column of Data frame Spark

2023-05-24 Thread Enrico Minack

Hi,

given your dataset:

val df = Seq(
  (1, 20230523, "M01"), (2, 20230523, "M01"), (3, 20230523, "M01"),
  (4, 20230523, "M02"), (5, 20230523, "M02"), (6, 20230523, "M02"),
  (7, 20230523, "M01"), (8, 20230523, "M01"), (9, 20230523, "M02"),
  (10, 20230523, "M02"), (11, 20230523, "M02"), (12, 20230523, "M01"),
  (13, 20230523, "M02")
).toDF("Row", "Date", "Type")

the simplest you can get with column functions is this:

import org.apache.spark.sql.expressions.Window

val dateWindow = Window.partitionBy("Date").orderBy("Row")
val batchWindow = Window.partitionBy("Date", "batch").orderBy("Row")

df.withColumn("inc", when($"Type" =!= lag($"Type", 1).over(dateWindow), lit(1)).otherwise(lit(0)))
  .withColumn("batch", sum($"inc").over(dateWindow))
  .withColumn("count", when($"Type" === "M01", lit(0)).otherwise(row_number().over(batchWindow)))
  .show

This creates:

+---+--------+----+---+-----+-----+
|Row|    Date|Type|inc|batch|count|
+---+--------+----+---+-----+-----+
|  1|20230523| M01|  0|    0|    0|
|  2|20230523| M01|  0|    0|    0|
|  3|20230523| M01|  0|    0|    0|
|  4|20230523| M02|  1|    1|    1|
|  5|20230523| M02|  0|    1|    2|
|  6|20230523| M02|  0|    1|    3|
|  7|20230523| M01|  1|    2|    0|
|  8|20230523| M01|  0|    2|    0|
|  9|20230523| M02|  1|    3|    1|
| 10|20230523| M02|  0|    3|    2|
| 11|20230523| M02|  0|    3|    3|
| 12|20230523| M01|  1|    4|    0|
| 13|20230523| M02|  1|    5|    1|
+---+--------+----+---+-----+-----+

Column "inc" is used to split the partition into batches of same 
consecutive types. Column "batch" gives rows of those batches a unique 
ids. For each of those batch, we can use row_number to create count, but 
for "M01" we set "count" to 0.


The query plan does not look too bad, no extra shuffle involved.


Raghavendra proposed to iterate the partitions. I presume that you
partition by "Date" and order within partitions by "Row", which puts
multiple dates into one partition. Even if you have one date per
partition, AQE might coalesce partitions into bigger ones. This can
get you into trouble when a date starts with "M02".


You could group your dataset by "Date" and process the individual sorted 
groups (requires Spark 3.4.0). This way, you still partition by "Date" 
but process only individual dates, as proposed by Raghavendra:


import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val groups = df.groupByKey(_.getInt(1))
groups.flatMapSortedGroups($"Row") { case (_: Int, rows: Iterator[Row]) =>
  var count = 0
  rows.map { row =>
    if (row.getString(2) == "M01") {
      count = 0
    } else {
      count += 1
    }
    Row.fromSeq(row.toSeq :+ count)
  }
}(RowEncoder(df.schema.add("count", IntegerType))).show

Cheers,
Enrico


On 23.05.23 at 20:13, Nipuna Shantha wrote:

Hi all,

This is the sample set of data that I used for this task

image.png

My expected output is as below

image.png

My scenario is: if Type is M01 the count should be 0, and if Type is M02
it should be incremented from 1 or 0 until the sequence of M02 is
finished. Imagine this as a partition, so row numbers cannot be jumbled.
Can you guys suggest a method for this scenario? Also, for your concern,
this dataset is really large; it has around 1 records and I am
using Spark with Scala.


Thank You,
Best Regards

 
 





Re: Write custom JSON from DataFrame in PySpark

2023-05-04 Thread Enrico Minack

Hi,

You could rearrange the DataFrame so that writing the DataFrame as-is
produces your structure:

df = spark.createDataFrame([(1, "a1"), (2, "a2"), (3, "a3")], "id int,
datA string")
+---++
| id|datA|
+---++
|  1|  a1|
|  2|  a2|
|  3|  a3|
+---++

df2 = df.select(df.id, struct(df.datA).alias("stuff"))
root
 |-- id: integer (nullable = true)
 |-- stuff: struct (nullable = false)
 |    |-- datA: string (nullable = true)
+---+-+
| id|stuff|
+---+-+
|  1| {a1}|
|  2| {a2}|
|  3| {a3}|
+---+-+

df2.write.json("data.json")
{"id":1,"stuff":{"datA":"a1"}}
{"id":2,"stuff":{"datA":"a2"}}
{"id":3,"stuff":{"datA":"a3"}}

Looks pretty much like what you described.
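If the goal were the JSON strings themselves rather than files, to_json
over the same struct gives the same shape (a sketch building on the df2
from above):

from pyspark.sql import functions as F

df2.select(F.to_json(F.struct("id", "stuff")).alias("json")).show(truncate=False)
# {"id":1,"stuff":{"datA":"a1"}}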

Enrico


On 04.05.23 at 06:37, Marco Costantini wrote:

Hello,

Let's say I have a very simple DataFrame, as below.

+---+----+
| id|datA|
+---+----+
|  1|  a1|
|  2|  a2|
|  3|  a3|
+---+----+

Let's say I have a requirement to write this to a bizarre JSON
structure. For example:

{
  "id": 1,
  "stuff": {
    "datA": "a1"
  }
}

How can I achieve this with PySpark? I have only seen the following:
- writing the DataFrame as-is (doesn't meet requirement)
- using a UDF (seems frowned upon)

What I have tried is to do this within a `foreach`. I have had some
success, but also some problems with other requirements (serializing
other things).

Any advice? Please and thank you,
Marco.




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Use Spark Aggregator in PySpark

2023-04-24 Thread Enrico Minack

Hi,

For an aggregating UDF, use spark.udf.registerJavaUDAF(name, className).
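A minimal sketch of that registration (the class name
my.package.MyAggrator, `df`, and the column "input" are taken from the
question below; whether that class satisfies the UDAF interface expected
by registerJavaUDAF still needs to be verified on your side):

from pyspark.sql import functions as F

# Register the JVM aggregate function under a SQL-callable name,
# then invoke it via an expression.
spark.udf.registerJavaUDAF("MyAggrator", "my.package.MyAggrator")

df.groupBy().agg(F.expr("MyAggrator(input)").alias("output")).show()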

Enrico



On 23.04.23 at 23:42, Thomas Wang wrote:

Hi Spark Community,

I have implemented a custom Spark Aggregator (a subclass of
org.apache.spark.sql.expressions.Aggregator). Now I'm trying to use
it in a PySpark application, but for some reason, I'm not able to
trigger the function. Here is what I'm doing, could someone help me
take a look? Thanks.


spark = self._gen_spark_session()
spark.udf.registerJavaFunction(
    name="MyAggrator",
    javaClassName="my.package.MyAggrator",
    returnType=ArrayType(elementType=LongType()),
)

The above code runs successfully. However, to call it, I assume I 
should do something like the following.


df = df.groupBy().agg(
    functions.expr("MyAggrator(input)").alias("output"),
)

But this one gives me the following error:

pyspark.sql.utils.AnalysisException: UDF class my.package.MyAggrator doesn't 
implement any UDF interface

My question is how can I use the Spark Aggregator defined in a jar 
file in PySpark? Thanks.


Thomas




Re: How to explode array columns of a dataframe having the same length

2023-02-16 Thread Enrico Minack
You have to take each row and zip the lists, each element of the result 
becomes one new row.


So write a method that turns
  Row(List("A","B","null"), List("C","D","null"), List("E","null","null"))
into
  List(List("A","C","E"), List("B","D","null"), List("null","null","null"))
and use flatMap with that method.

In Scala, this would read:

df.flatMap { row =>
  (row.getSeq[String](0), row.getSeq[String](1), row.getSeq[String](2)).zipped.toIterable
}.show()
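An alternative without writing the zip by hand is arrays_zip plus
explode, shown here as a PySpark sketch (the same functions exist in the
Java and Scala APIs since Spark 2.4; column names are from the question):

from pyspark.sql import functions as F

exploded = (df
    .withColumn("zipped", F.explode(F.arrays_zip("col1", "col2", "col3")))
    .select("zipped.*"))   # the struct fields carry the zipped column names
exploded.show()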


Enrico


On 14.02.23 at 22:54, sam smith wrote:

Hello guys,

I have the following dataframe:

+----------------+----------------+-------------------+
|            col1|            col2|               col3|
+----------------+----------------+-------------------+
|["A","B","null"]|["C","D","null"]|["E","null","null"]|
+----------------+----------------+-------------------+

I want to explode it to the following dataframe:

+------+------+------+
|  col1|  col2|  col3|
+------+------+------+
|   "A"|   "C"|   "E"|
|   "B"|   "D"|"null"|
|"null"|"null"|"null"|
+------+------+------+


How to do that (preferably in Java) using the explode() method? Knowing
that something like the following won't yield correct output:

for (String colName : dataset.columns())
    dataset = dataset.withColumn(colName, explode(dataset.col(colName)));




Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-12 Thread Enrico Minack

@Sean: This aggregate function does work without an explicit groupBy():

./spark-3.3.1-bin-hadoop2/bin/spark-shell
Spark context Web UI available at http://*:4040
Spark context available as 'sc' (master = local[*], app id = local-1676237726079).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.1
      /_/

Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.17)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4, 
10, "one")).toDF("a", "b", "c")
scala> df.select(df.columns.map(column => 
collect_set(col(column)).as(column)): _*).show()

+------------+--------+----------+
|           a|       b|         c|
+------------+--------+----------+
|[1, 2, 3, 4]|[20, 10]|[one, two]|
+------------+--------+----------+

@Sam: I haven't tested the Java code, sorry. I presume you can work it 
out from the working Scala code.


Enrico


On 12.02.23 at 21:32, Sean Owen wrote:
It doesn't work because it's an aggregate function. You have to
groupBy() (group by nothing) to make that work, but you can't assign
that as a column. Folks, those approaches don't make sense semantically
in SQL or Spark or anything.
They just mean: use threads in your program to collect() distinct values
for each column in parallel. You don't have to, but you could.
What else are we looking for here? The answer has been given a number
of times, I think.



On Sun, Feb 12, 2023 at 2:28 PM sam smith  
wrote:


OK, what do you mean by " do your outer for loop in parallel "?
btw this didn't work:
for (String columnName : df.columns()) {
    df= df.withColumn(columnName,
collect_set(col(columnName)).as(columnName));
}


On Sun, Feb 12, 2023 at 20:36, Enrico Minack wrote:

That is unfortunate, but 3.4.0 is around the corner, really!

Well, then based on your code, I'd suggest two improvements:
- cache your dataframe after reading, this way, you don't read
the entire file for each column
- do your outer for loop in parallel, then you have N parallel
Spark jobs (only helps if your Spark cluster is not fully
occupied by a single column)

Your withColumn-approach does not work because withColumn
expects a column as the second argument, but
df.select(columnName).distinct() is a DataFrame and .col is a
column in *that* DataFrame, it is not a column of the
dataframe that you call withColumn on.

It should read:

Scala:
df.select(df.columns.map(column => collect_set(col(column)).as(column)): _*).show()

Java:
for (String columnName : df.columns()) {
    df = df.withColumn(columnName, collect_set(col(columnName)).as(columnName));
}

Then you have a single DataFrame that computes all columns in
a single Spark job.

But this reads all distinct values into a single partition,
which has the same downside as collect, so this is as bad as
using collect.

    Cheers,
Enrico


On 12.02.23 at 18:05, sam smith wrote:

@Enrico Minack Thanks for "unpivot" but I am using version 3.3.0 (you
are taking it way too far as usual :) )
@Sean Owen Please then show me how it can be improved by code.

Also, why does such an approach (using withColumn()) not work:

for (String columnName : df.columns()) {
    df = df.withColumn(columnName, df.select(columnName).distinct().col(columnName));
}

On Sat, Feb 11, 2023 at 13:11, Enrico Minack wrote:

You could do the entire thing in DataFrame world and
write the result to disk. All you need is unpivot (to be
released in Spark 3.4.0, soon).

Note this is Scala but should be straightforward to
translate into Java:

import org.apache.spark.sql.functions.collect_set

val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123),
(4, 10, 123)).toDF("a", "b", "c")

df.unpivot(Array.empty, "column", "value")
  .groupBy("column")
.agg(collect_set("value").as("distinct_values"))

The unpivot operation turns
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1| 10|123|
|  2| 20|124|
|  3| 20|123|
|  4| 10|123|
+---+---+---+

into

+--+

Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-12 Thread Enrico Minack

That is unfortunate, but 3.4.0 is around the corner, really!

Well, then based on your code, I'd suggest two improvements:
- cache your dataframe after reading, this way, you don't read the 
entire file for each column
- do your outer for loop in parallel, then you have N parallel Spark 
jobs (only helps if your Spark cluster is not fully occupied by a single 
column)


Your withColumn-approach does not work because withColumn expects a 
column as the second argument, but df.select(columnName).distinct() is a 
DataFrame and .col is a column in *that* DataFrame, it is not a column 
of the dataframe that you call withColumn on.


It should read:

Scala:
df.select(df.columns.map(column => collect_set(col(column)).as(column)): _*).show()

Java:
for (String columnName : df.columns()) {
    df = df.withColumn(columnName, collect_set(col(columnName)).as(columnName));
}

Then you have a single DataFrame that computes all columns in a single 
Spark job.


But this reads all distinct values into a single partition, which has 
the same downside as collect, so this is as bad as using collect.


Cheers,
Enrico


On 12.02.23 at 18:05, sam smith wrote:
@Enrico Minack <mailto:enrico-min...@gmx.de> Thanks for "unpivot" but 
I am using version 3.3.0 (you are taking it way too far as usual :) )
@Sean Owen <mailto:sro...@gmail.com> Pls then show me how it can be 
improved by code.


Also, why does such an approach (using withColumn()) not work:

for (String columnName : df.columns()) {
    df = df.withColumn(columnName, df.select(columnName).distinct().col(columnName));
}

On Sat, Feb 11, 2023 at 13:11, Enrico Minack wrote:


You could do the entire thing in DataFrame world and write the
result to disk. All you need is unpivot (to be released in Spark
3.4.0, soon).

Note this is Scala but should be straightforward to translate into
Java:

import org.apache.spark.sql.functions.collect_set

val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123), (4, 10,
123)).toDF("a", "b", "c")

df.unpivot(Array.empty, "column", "value")
  .groupBy("column")
  .agg(collect_set("value").as("distinct_values"))

The unpivot operation turns
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1| 10|123|
|  2| 20|124|
|  3| 20|123|
|  4| 10|123|
+---+---+---+

into

+------+-----+
|column|value|
+------+-----+
|     a|    1|
|     b|   10|
|     c|  123|
|     a|    2|
|     b|   20|
|     c|  124|
|     a|    3|
|     b|   20|
|     c|  123|
|     a|    4|
|     b|   10|
|     c|  123|
+------+-----+

The
groupBy("column").agg(collect_set("value").as("distinct_values"))
collects distinct values per column:
+------+---------------+
|column|distinct_values|
+------+---------------+
|     c|     [123, 124]|
|     b|       [20, 10]|
|     a|   [1, 2, 3, 4]|
+------+---------------+

Note that unpivot only works if all columns have a "common" type.
Then all columns are cast to that common type. If you have
incompatible types like Integer and String, you would have to cast
them all to String first:

import org.apache.spark.sql.types.StringType

df.select(df.columns.map(col(_).cast(StringType)): _*).unpivot(...)

If you want to preserve the type of the values and have multiple
value types, you cannot put everything into a DataFrame with one
distinct_values column. You could still have multiple DataFrames,
one per data type, and write those, or collect the DataFrame's
values into Maps:

import scala.collection.immutable

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.collect_set

// if all your columns have the same type
def distinctValuesPerColumnOneType(df: DataFrame):
immutable.Map[String, immutable.Seq[Any]] = {
  df.unpivot(Array.empty, "column", "value")
    .groupBy("column")
    .agg(collect_set("value").as("distinct_values"))
    .collect()
    .map(row => row.getString(0) -> row.getSeq[Any](1).toList)
    .toMap
}


// if your columns have different types
def distinctValuesPerColumn(df: DataFrame): immutable.Map[String,
immutable.Seq[Any]] = {
  df.schema.fields
    .groupBy(_.dataType)
    .mapValues(_.map(_.name))
    .par
    .map { case (dataType, columns) => df.select(columns.map(col):
_*) }
    .map(distinctValuesPerColumnOneType)
    .flatten
    .toList
    .toMap
}

val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4,
10, "one")).toDF("a", "b", "c")
distinctValuesPerColumn(df)

Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-11 Thread Enrico Minack
You could do the entire thing in DataFrame world and write the result to 
disk. All you need is unpivot (to be released in Spark 3.4.0, soon).


Note this is Scala but should be straightforward to translate into Java:

import org.apache.spark.sql.functions.collect_set

val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123), (4, 10, 
123)).toDF("a", "b", "c")


df.unpivot(Array.empty, "column", "value")
  .groupBy("column")
  .agg(collect_set("value").as("distinct_values"))

The unpivot operation turns
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1| 10|123|
|  2| 20|124|
|  3| 20|123|
|  4| 10|123|
+---+---+---+

into

+------+-----+
|column|value|
+------+-----+
|     a|    1|
|     b|   10|
|     c|  123|
|     a|    2|
|     b|   20|
|     c|  124|
|     a|    3|
|     b|   20|
|     c|  123|
|     a|    4|
|     b|   10|
|     c|  123|
+------+-----+

The groupBy("column").agg(collect_set("value").as("distinct_values")) 
collects distinct values per column:

+------+---------------+
|column|distinct_values|
+------+---------------+
|     c|     [123, 124]|
|     b|       [20, 10]|
|     a|   [1, 2, 3, 4]|
+------+---------------+

Note that unpivot only works if all columns have a "common" type. Then 
all columns are cast to that common type. If you have incompatible types 
like Integer and String, you would have to cast them all to String first:


import org.apache.spark.sql.types.StringType

df.select(df.columns.map(col(_).cast(StringType)): _*).unpivot(...)

If you want to preserve the type of the values and have multiple value 
types, you cannot put everything into a DataFrame with one 
distinct_values column. You could still have multiple DataFrames, one 
per data type, and write those, or collect the DataFrame's values into Maps:


import scala.collection.immutable

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.collect_set

// if all your columns have the same type
def distinctValuesPerColumnOneType(df: DataFrame): immutable.Map[String, 
immutable.Seq[Any]] = {

  df.unpivot(Array.empty, "column", "value")
    .groupBy("column")
    .agg(collect_set("value").as("distinct_values"))
    .collect()
    .map(row => row.getString(0) -> row.getSeq[Any](1).toList)
    .toMap
}


// if your columns have different types
def distinctValuesPerColumn(df: DataFrame): immutable.Map[String, 
immutable.Seq[Any]] = {

  df.schema.fields
    .groupBy(_.dataType)
    .mapValues(_.map(_.name))
    .par
    .map { case (dataType, columns) => df.select(columns.map(col): _*) }
    .map(distinctValuesPerColumnOneType)
    .flatten
    .toList
    .toMap
}

val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4, 10, 
"one")).toDF("a", "b", "c")

distinctValuesPerColumn(df)

The result is: (list values are of original type)
Map(b -> List(20, 10), a -> List(1, 2, 3, 4), c -> List(one, two))

Hope this helps,
Enrico


On 10.02.23 at 22:56, sam smith wrote:

Hi Apostolos,
Can you suggest a better approach while keeping values within a dataframe?

On Fri, Feb 10, 2023 at 22:47, Apostolos N. Papadopoulos wrote:


Dear Sam,

you are assuming that the data fits in the memory of your local
machine. You are using as a basis a dataframe, which potentially
can be very large, and then you are storing the data in local
lists. Keep in mind that the number of distinct elements in a
column may be very large (depending on the app). I suggest working
on a solution that assumes that the number of distinct values is
also large. Thus, you should keep your data in dataframes or RDDs,
and store them as csv files, parquet, etc.

a.p.


On 10/2/23 23:40, sam smith wrote:

I want to get the distinct values of each column in a List (is it
good practice to use List here?), that contains as first element
the column name, and the other elements its distinct values, so
that for a dataset we get a list of lists. I do it this way (in
my opinion not so fast):

List<List<String>> finalList = new ArrayList<>();
Dataset<Row> df = spark.read().format("csv").option("header", "true").load("/pathToCSV");
String[] columnNames = df.columns();
for (int i = 0; i < columnNames.length; i++) {
    List<String> columnList = new ArrayList<>();
    columnList.add(columnNames[i]);
    List<Row> columnValues = df.filter(org.apache.spark.sql.functions.col(columnNames[i]).isNotNull())
                               .select(columnNames[i]).distinct().collectAsList();
    for (int j = 0; j < columnValues.size(); j++)
        columnList.add(columnValues.get(j).get(0).toString());
    finalList.add(columnList);
}

-- 
Apostolos N. Papadopoulos, Associate Professor

Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email:papad...@csd.auth.gr
twitter: @papadopoulos_ap
web:http://datalab.csd.auth.gr/~apostol



SQL GROUP BY alias with dots, was: Spark SQL question

2023-02-07 Thread Enrico Minack

Hi,

you are right, that is an interesting question.

Looks like GROUP BY is doing something funny / magic here (spark-shell 
3.3.1 and 3.5.0-SNAPSHOT):


With an alias, it behaves as you have pointed out:

spark.range(3).createTempView("ids_without_dots")
spark.sql("SELECT * FROM ids_without_dots").show()

// works
spark.sql("SELECT id AS `an.id` FROM ids_without_dots GROUP BY 
an.id").show()

// fails
spark.sql("SELECT id AS `an.id` FROM ids_without_dots GROUP BY 
`an.id`").show()



Without an alias, it behaves as expected, which is the opposite of above 
(a column with a dot exists, no alias used in SELECT):


spark.range(3).select($"id".as("an.id")).createTempView("ids_with_dots")
spark.sql("SELECT `an.id` FROM ids_with_dots").show()

// works
spark.sql("SELECT `an.id` FROM ids_with_dots GROUP BY `an.id`").show()
// fails
spark.sql("SELECT `an.id` FROM ids_with_dots GROUP BY an.id").show()


With a struct column, it also behaves as expected:

spark.range(3).select(struct($"id").as("an")).createTempView("ids_with_struct")
spark.sql("SELECT an.id FROM ids_with_struct").show()

// works
spark.sql("SELECT an.id FROM ids_with_struct GROUP BY an.id").show()
// fails
spark.sql("SELECT `an.id` FROM ids_with_struct GROUP BY an.id").show()
spark.sql("SELECT an.id FROM ids_with_struct GROUP BY `an.id`").show()
spark.sql("SELECT `an.id` FROM ids_with_struct GROUP BY `an.id`").show()


This does not feel very consistent.

Enrico



On 28.01.23 at 00:34, Kohki Nishio wrote:

this SQL works:

select 1 as `data.group` from tbl group by data.group

Since there is no such field as data, I thought the SQL had to look
like this:

select 1 as `data.group` from tbl group by `data.group`

But that gives an error (cannot resolve '`data.group`') ... I'm no
expert in SQL, but it feels like strange behavior... does anybody
have a good explanation for it?


Thanks

--
Kohki Nishio




Re: The Dataset unit test is much slower than the RDD unit test (in Scala)

2022-11-01 Thread Enrico Minack

Hi Tanin,

running your test with option "spark.sql.planChangeLog.level" set to 
"info" or "warn" (depending on your Spark log level) will show you 
insights into the planning (which rules are applied, how long rules 
take, how many iterations are done).
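For example (a sketch in PySpark syntax; the same config key works from
Scala, and the session names are illustrative):

from pyspark.sql import SparkSession

# Either set it when building the test session...
spark = (SparkSession.builder
         .master("local[*]")
         .config("spark.sql.planChangeLog.level", "WARN")
         .getOrCreate())

# ...or flip it at runtime before triggering the query plan.
spark.conf.set("spark.sql.planChangeLog.level", "WARN")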


Hoping this helps,
Enrico


On 25.10.22 at 21:54, Tanin Na Nakorn wrote:

Hi All,

Our data job is very complex (e.g. 100+ joins), and we have switched 
from RDD to Dataset recently.


We've found that the unit test takes much longer. We profiled it and 
have found that it's the planning phase that is slow, not execution.


I wonder if anyone has encountered this issue before and if there's a 
way to make the planning phase faster (e.g. maybe disabling certain 
optimizers).


Any thoughts or input would be appreciated.

Thank you,
Tanin




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Reading too many files

2022-10-05 Thread Enrico Minack

Hi,

Spark is fine with that many Parquet files in general:

# generate 100,000 small Parquet files
spark.range(0, 100000, 1, 100000).write.parquet("too-many-files.parquet")

# read 100,000 Parquet files
val df = spark.read.parquet("too-many-files.parquet")
df.show()
df.count()

Reading the files takes a few seconds, so there is no problem with the 
number of files.


What exactly do you mean with "But after spark.read.parquet , it is not 
able to proceed further."?


Does that mean that executing the line
  val df = spark.read.parquet("too-many-files.parquet")
takes forever?

How long do individual tasks take? How many tasks are there for this line?
Where are the Parquet files stored? Where does the Spark job run?

Enrico



On 03.10.22 at 18:22, Sachit Murarka wrote:

Hello,

I am reading too many files in Spark 3.2 (Parquet). It is not giving
any error in the logs. But after spark.read.parquet, it is not able
to proceed further.
Can anyone please suggest if there is any property to improve the
parallel reads? I am reading more than 25000 files.


Kind Regards,
Sachit Murarka




Re: [Spark Internals]: Is sort order preserved after partitioned write?

2022-09-17 Thread Enrico Minack
                                       ...936649|                  1168275843761|
|     10|        136|                    1168246605999|                  1168231104520|
|1215331|        136|                    1168236539239|                  1168275843762|
|     10|        136|                    1168289197499|                  1168231104521|
|1215331|        136|                    1168337136110|                  1168275843763|
+-------+-----------+---------------------------------+-------------------------------+

3
+-------+-----------+---------------------------------+-------------------------------+
|sortCol|partitionId|monotonicallyIncreasingIdUnsorted|monotonicallyIncreasingIdSorted|
+-------+-----------+---------------------------------+-------------------------------+
|     10|        581|                    4990751997952|                  4990751997952|
|1207875|        581|                    4990829438530|                  4990796737194|
|     10|        581|                    4990797772249|                  4990751997953|
|1207875|        581|                    4990789773711|                  4990796737195|
|     10|        581|                    4990754836237|                  4990751997954|
|1207875|        581|                    4990792883763|                  4990796737196|
|     10|        581|                    4990799663372|                  4990751997955|
|1207875|        581|                    4990795135016|                  4990796737197|
|     10|        581|                        499075488|                  4990751997956|
|1207875|        581|                    4990796258628|                  4990796737198|
|     10|        581|                    4990801912980|                  4990751997957|
|1207876|        581|                    4990798880125|                  4990796737199|
|     10|        581|                    4990755328908|                  4990751997958|
|1207876|        581|                    4990753105828|                  4990796737200|
|     10|        581|                    4990804520539|                  4990751997959|
|1207876|        581|                    4990800771248|                  4990796737201|
|     10|        581|                    4990756046653|                  4990751997960|
|1207876|        581|                    4990757154529|                  4990796737202|
|     10|        581|                    4990806212169|                  4990751997961|
|1207876|        581|                    4990803020856|                  4990796737203|
+-------+-----------+---------------------------------+-------------------------------+


Thanks,
Swetha


On Fri, Sep 16, 2022 at 1:45 AM Enrico Minack  
wrote:


Yes, you can expect each partition file to be sorted by "col1" and
"col2".

However, values for "col1" will be "randomly" allocated to
partition files, but all rows with the same value for "col1" will
reside in the same one partition file.

What kind of unexpected sort order do you observe?

Enrico



On 16.09.22 at 05:42, Swetha Baskaran wrote:

Hi!

We expected the order of sorted partitions to be preserved after
a dataframe write. We use the following code to write out one
file per partition, with the rows sorted by a column.

df
  .repartition($"col1")
  .sortWithinPartitions("col1", "col2")
  .write
  .partitionBy("col1")
  .csv(path)

However we observe unexpected sort order in some files. Does
spark guarantee sort order within partitions on write?


Thanks,
swebask





Re: Splittable or not?

2022-09-17 Thread Enrico Minack
If with "won't affect the performance" you mean "parquet is splittable 
though it uses snappy", then yes. Splittable files allow for optimal 
parallelization, which "won't affect performance".


Spark writing data will split the data into multiple files already (here,
parquet files). Even if each file were not splittable, your data
has been split already. Splittable parquet files allow for more
granularity (more splitting of your data), in case those files are big.


Enrico


On 14.09.22 at 21:57, Sid wrote:
Okay so you mean to say that parquet compresses the denormalized data 
using snappy so it won't affect the performance.


Only using snappy will affect the performance

Am I correct?

On Thu, 15 Sep 2022, 01:08 Amit Joshi,  wrote:

Hi Sid,

Snappy itself is not splittable. But the format that contains the
actual data like parquet (which are basically divided into row
groups) can be compressed using snappy.
This works because blocks(pages of parquet format) inside the
parquet can be independently compressed using snappy.

Thanks
Amit

On Wed, Sep 14, 2022 at 8:14 PM Sid  wrote:

Hello experts,

I know that Gzip and snappy files are not splittable i.e data
won't be distributed into multiple blocks rather it would try
to load the data in a single partition/block

So, my question is: when I write the parquet data via Spark, it
gets stored at the destination with something like
part*.snappy.parquet

So, when I read this data will it affect my performance?

Please help me if there is any understanding gap.

Thanks,
Sid



Re: [Spark Internals]: Is sort order preserved after partitioned write?

2022-09-15 Thread Enrico Minack

Yes, you can expect each partition file to be sorted by "col1" and "col2".

However, values for "col1" will be "randomly" allocated to partition 
files, but all rows with the same value for "col1" will reside in the 
same one partition file.


What kind of unexpected sort order do you observe?

Enrico



On 16.09.22 at 05:42, Swetha Baskaran wrote:

Hi!

We expected the order of sorted partitions to be preserved after a 
dataframe write. We use the following code to write out one file per 
partition, with the rows sorted by a column.


df
  .repartition($"col1")
  .sortWithinPartitions("col1", "col2")
  .write
  .partitionBy("col1")
  .csv(path)

However we observe unexpected sort order in some files. Does spark 
guarantee sort order within partitions on write?



Thanks,
swebask




Re: reading each JSON file from dataframe...

2022-07-12 Thread Enrico Minack

Hi,

how does RDD's mapPartitions make a difference regarding 1. and 2. 
compared to Dataset's mapPartitions / map function?


Enrico


On 12.07.22 at 22:13, Muthu Jayakumar wrote:

Hello Enrico,

Thanks for the reply. I found that I would have to use `mapPartitions` 
API of RDD to perform this safely as I have to

1. Read each file from GCS using HDFS FileSystem API.
2. Parse each JSON record in a safe manner.

For (1) to work, I do have to broadcast the HadoopConfiguration from
the sparkContext. I did try to use the GCS Java API to read content,
but ran into many JAR conflicts as the HDFS wrapper and the JAR library
use different dependencies.

Hope these findings help others as well.

Thanks,
Muthu


On Mon, 11 Jul 2022 at 14:11, Enrico Minack  
wrote:


All you need to do is implement a method readJson that reads a
single file given its path. Then, you map the values of column
file_path to the respective JSON content as a string. This can be
done via a UDF or simply Dataset.map:

case class RowWithJsonUri(entity_id: String, file_path: String,
other_useful_id: String)
case class RowWithJsonContent(entity_id: String, json_content:
String, other_useful_id: String)

val ds = Seq(
  RowWithJsonUri("id-01f7pqqbxddb3b1an6ntyqx6mg",
"gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json",
"id-2-01g4he5cb4xqn6s1999k6y1vbd"),
  RowWithJsonUri("id-01f7pqgbwms4ajmdtdedtwa3mf",
"gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json",
"id-2-01g4he5cbh52che104rwy603sr"),
  RowWithJsonUri("id-01f7pqqbxejt3ef4ap9qcs78m5",
"gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json",
"id-2-01g4he5cbqmdv7dnx46sebs0gt"),
  RowWithJsonUri("id-01f7pqqbynh895ptpjjfxvk6dc",
"gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json",
"id-2-01g4he5cbx1kwhgvdme1s560dw")
).toDS()

ds.show(false)

+-+---+---+
|entity_id |file_path |other_useful_id    |

+-+---+---+

|id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|

|id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|

|id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|

|id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|

+-+---+---+


def readJson(uri: String): String = { s"content of $uri" }

ds.map { row => RowWithJsonContent(row.entity_id,
readJson(row.file_path), row.other_useful_id) }.show(false)

+-+--+---+
|entity_id |json_content |other_useful_id    |

+-+--+---+
|id-01f7pqqbxddb3b1an6ntyqx6mg|content of

gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
|id-01f7pqgbwms4ajmdtdedtwa3mf|content of

gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
|id-01f7pqqbxejt3ef4ap9qcs78m5|content of

gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
|id-01f7pqqbynh895ptpjjfxvk6dc|content of

gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|

+-+--+---+

Cheers,
Enrico




On 10.07.22 at 09:11, Muthu Jayakumar wrote:

Hello there,

I have a dataframe with the following...


+-+---+---+
|entity_id                    |file_path  |other_useful_id      
         |

+-+---+---+

|id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-

Re: reading each JSON file from dataframe...

2022-07-11 Thread Enrico Minack
All you need to do is implement a method readJson that reads a single
file given its path. Then, you map the values of column file_path to the
respective JSON content as a string. This can be done via a UDF or
simply Dataset.map:


case class RowWithJsonUri(entity_id: String, file_path: String, 
other_useful_id: String)
case class RowWithJsonContent(entity_id: String, json_content: String, 
other_useful_id: String)


val ds = Seq(
  RowWithJsonUri("id-01f7pqqbxddb3b1an6ntyqx6mg", 
"gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json", 
"id-2-01g4he5cb4xqn6s1999k6y1vbd"),
  RowWithJsonUri("id-01f7pqgbwms4ajmdtdedtwa3mf", 
"gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json", 
"id-2-01g4he5cbh52che104rwy603sr"),
  RowWithJsonUri("id-01f7pqqbxejt3ef4ap9qcs78m5", 
"gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json", 
"id-2-01g4he5cbqmdv7dnx46sebs0gt"),
  RowWithJsonUri("id-01f7pqqbynh895ptpjjfxvk6dc", 
"gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json", 
"id-2-01g4he5cbx1kwhgvdme1s560dw")

).toDS()

ds.show(false)
+-+---+---+
|entity_id |file_path |other_useful_id    |
+-+---+---+
|id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
|id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
|id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
|id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
+-+---+---+


def readJson(uri: String): String = { s"content of $uri" }

ds.map { row => RowWithJsonContent(row.entity_id, 
readJson(row.file_path), row.other_useful_id) }.show(false)

+-+--+---+
|entity_id |json_content |other_useful_id    |
+-+--+---+
|id-01f7pqqbxddb3b1an6ntyqx6mg|content of 
gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
|id-01f7pqgbwms4ajmdtdedtwa3mf|content of 
gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
|id-01f7pqqbxejt3ef4ap9qcs78m5|content of 
gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
|id-01f7pqqbynh895ptpjjfxvk6dc|content of 
gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|

+-+--+---+

Cheers,
Enrico




On 10.07.22 at 09:11, Muthu Jayakumar wrote:

Hello there,

I have a dataframe with the following...

+-+---+---+
|entity_id                    |file_path                               
       |other_useful_id          |

+-+---+---+
|id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
|id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
|id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
|id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
+-+---+---+

I would like to read each row from `file_path` and write the result to 
another dataframe containing `entity_id`, `other_useful_id`, 
`json_content`, `file_path`.
Assume that I already have the required HDFS url libraries in my 
classpath.


Please advice,
Muthu




Re: Will it lead to OOM error?

2022-06-22 Thread Enrico Minack
Yes, a single file compressed with a non-splittable compression (e.g.
gzip) would have to be read by a single executor. That takes forever.


You should consider recompressing the file with a splittable compression
first. You will not want to read that file more than once, so you should
uncompress it only once (in order to recompress).
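A sketch of that one-time rewrite (assumptions: a single gzipped CSV
with a header at an illustrative path; the target format and partition
count are arbitrary choices):

# Read the non-splittable file once (a single task), then rewrite it into
# many splittable files; all further processing should use the rewritten copy.
df = spark.read.option("header", "true").csv("/data/huge.csv.gz")
df.repartition(1000).write.parquet("/data/huge.parquet")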


Enrico


On 22.06.22 at 20:17, Sid wrote:

Hi Enrico,

Thanks for the insights.

Could you please help me understand, with one example of compressed
files, how the file wouldn't be split into partitions and would put load
on a single partition and might lead to an OOM error?


Thanks,
Sid

On Wed, Jun 22, 2022 at 6:40 PM Enrico Minack  
wrote:


The RAM and disk memory consumption depends on what you do with the
data after reading them.

Your particular action will read 20 lines from the first partition
and show them. So it will not use any RAM or disk, no matter how
large the CSV is.

If you do a count instead of show, it will iterate over each
partition and return a count per partition, so no RAM is needed
here either.

If you do some real processing of the data, the required RAM
and disk again depend on the involved shuffles and intermediate
results that need to be stored in RAM or on disk.

Enrico


On 22.06.22 at 14:54, Deepak Sharma wrote:

It will spill to disk if everything can't be loaded in memory.


On Wed, 22 Jun 2022 at 5:58 PM, Sid  wrote:

I have a 150TB CSV file.

I have a total of 100 TB RAM and 100TB disk. So If I do
something like this

spark.read.option("header","true").csv(filepath).show(false)

Will it lead to an OOM error since it doesn't have enough
memory? or it will spill data onto the disk and process it?

Thanks,
Sid

-- 
Thanks

Deepak
www.bigdatabig.com <http://www.bigdatabig.com>
www.keosha.net <http://www.keosha.net>





Re: Will it lead to OOM error?

2022-06-22 Thread Enrico Minack
The RAM and disk memory consumption depends on what you do with the data
after reading them.


Your particular action will read 20 lines from the first partition and
show them. So it will not use any RAM or disk, no matter how large the
CSV is.


If you do a count instead of show, it will iterate over each
partition and return a count per partition, so no RAM is needed here either.


If you do some real processing of the data, the required RAM and disk
again depend on the involved shuffles and intermediate results that need
to be stored in RAM or on disk.


Enrico


On 22.06.22 at 14:54, Deepak Sharma wrote:

It will spill to disk if everything can't be loaded in memory.


On Wed, 22 Jun 2022 at 5:58 PM, Sid  wrote:

I have a 150TB CSV file.

I have a total of 100 TB RAM and 100TB disk. So If I do something
like this

spark.read.option("header","true").csv(filepath).show(false)

Will it lead to an OOM error since it doesn't have enough memory?
or it will spill data onto the disk and process it?

Thanks,
Sid

--
Thanks
Deepak
www.bigdatabig.com 
www.keosha.net 




Re: input file size

2022-06-19 Thread Enrico Minack

Maybe a

  .as[String].mapPartitions(it => if (it.hasNext) Iterator(it.next) else 
Iterator.empty)

might be faster than the

  .distinct.as[String]


Enrico


On 19.06.22 at 08:59, Enrico Minack wrote:
Given you already know your input files (input_file_name), why not get
their size and sum it up?


import java.io.File
import java.net.URI
import org.apache.spark.sql.functions.input_file_name

ds.select(input_file_name.as("filename"))
  .distinct.as[String]
  .map(filename => new File(new URI(filename).getPath).length)
  .select(sum($"value"))
  .show()

Enrico


On 19.06.22 at 03:16, Yong Walt wrote:

import java.io.File
val someFile = new File("somefile.txt")
val fileSize = someFile.length

This one?

On Sun, Jun 19, 2022 at 4:33 AM mbreuer  wrote:

Hello Community,

I am working on optimizations for file sizes and number of files.
In the
data frame there is a function input_file_name which returns the
file
name. I miss a counterpart to get the size of the file. Just the
size,
like "ls -l" returns. Is there something like that?

Kind regards,
Markus


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





Re: input file size

2022-06-19 Thread Enrico Minack
Given you already know your input files (input_file_name), why not get
their size and sum it up?


import java.io.File
import java.net.URI
import org.apache.spark.sql.functions.input_file_name

ds.select(input_file_name.as("filename"))
  .distinct.as[String]
  .map(filename => new File(new URI(filename).getPath).length)
  .select(sum($"value"))
  .show()


Enrico


On 19.06.22 at 03:16, Yong Walt wrote:

import java.io.File
val someFile = new File("somefile.txt")
val fileSize = someFile.length

This one?

On Sun, Jun 19, 2022 at 4:33 AM mbreuer  wrote:

Hello Community,

I am working on optimizations for file sizes and number of files.
In the
data frame there is a function input_file_name which returns the file
name. I miss a counterpart to get the size of the file. Just the
size,
like "ls -l" returns. Is there something like that?

Kind regards,
Markus


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: API Problem

2022-06-13 Thread Enrico Minack
                response = call_to_cust_bulk_api(policyUrl, custRequestBody)
                print(response)
                finalDFStatus = finalDF.withColumn("edl_timestamp", to_timestamp(lit(F.TimeNow()))) \
                    .withColumn("status_for_each_batch", lit(str(response)))

                print("Max Value:::")
                print(maxValue)
                print("Next I:::")
                i = rangeNum + 1
                print(i)

This is my very first approach to hitting APIs with Spark. So, could
you please help me redesign the approach, or share some links or
references I can use to go into the depth of this and correct myself?
How can I scale this?



Any help is much appreciated. Thank you so much again for your time.

TIA,
Sid




On Fri, Jun 10, 2022 at 8:54 PM Enrico Minack  
wrote:


Hi,

This adds a column with value "1" (string) *in all rows*:

|df = df.withColumn("uniqueID", lit("1")) |

||This counts the rows for all rows that have the same |uniqueID|,
*which are all rows*. The window does not make much sense.
And it orders all rows that have the same |uniqueID |by
|uniqueID|. Does not make much sense either.
|df = df.withColumn("row_num", row_number().over(
Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID")) )) |

Then it looks like it takes the first 4000 rows (row_num from 1 to
4000) and tries to send them via HTTP POST. Then it moves the
range by one and sends row 2 to 4001 (mostly overlapped with the
first POST).

It is not clear if the "Data" field is meant to be all rows or
only a single row. Either way, this is not what happens. Please
consider the difference between a Column and a DataFrame in Spark.
This is very different from Pandas.

I think you have to rethink your approach. Using Spark means big
data. This approach is iterative and single-threaded.

Enrico


On 10.06.22 at 16:01, Sid wrote:

Hi Enrico,

Thanks for your time. Much appreciated.

I am expecting the payload to be as a JSON string to be a record
like below:

{"A":"some_value","B":"some_value"}

Where A and B are the columns in my dataset.


On Fri, Jun 10, 2022 at 6:09 PM Enrico Minack
 wrote:

Sid,

just recognized you are using the Python API here. Then
struct(*colsListToBePassed) should be correct, given it
takes a list of strings.

Your method call_to_cust_bulk_api takes argument payload,
which is a Column. This is then used in
custRequestBody. That is a pretty strange use of a column
expression. What do you expect print(payload) to be?

I recommend splitting that complex command into multiple
commands to find out what "an error of column not iterable"
refers to.

Enrico


Am 10.06.22 um 13:39 schrieb Enrico Minack:

Hi Sid,

finalDF = finalDF.repartition(finalDF.rdd.getNumPartitions())
    .withColumn("status_for_batch", call_to_cust_bulk_api(policyUrl,
        to_json(struct(*colsListToBePassed))))

You are calling withColumn with the result of
call_to_cust_bulk_api as the second argument. That
result looks like it is of type string. But withColumn
expects type Column. You can turn that string into a
Column using lit:

finalDF = finalDF.repartition(finalDF.rdd.getNumPartitions())
    .withColumn("status_for_batch", lit(call_to_cust_bulk_api(policyUrl,
        to_json(struct(*colsListToBePassed)))))

You are saying that gives you an error of column not
iterable. I reckon the struct(*colsListToBePassed) is
wrong.

Method struct requires a single string followed by a
list of strings. Given your colsListToBePassed is a list
of strings, this does not work. Try:

  struct(colsListToBePassed.head, colsListToBePassed.tail: _*)

Alternatively, struct requires a list of Column, so
try this:

  struct(colsListToBePassed.map(col): _*)

The API is pretty clear about the types it expects.


If you are still having errors, you better please paste the
code and error.

Enrico



Am 09.06.22 um 21:31 schrieb Sid:

Hi Experts,

I am facing one problem while passing a column to the
method.  The problem is described in detail here:


https://stackoverflow.com/questions/72565095/how-to-pass-columns-as-a-json-record-to-the-api-method-using-pyspark

TIA,
Sid









Re: API Problem

2022-06-10 Thread Enrico Minack

Hi,

This adds a column with value "1" (string) *in all rows*:

df = df.withColumn("uniqueID", lit("1"))

This counts the rows for all rows that have the same uniqueID, *which are
all rows*. The window does not make much sense.
And it orders all rows that have the same uniqueID by uniqueID. Does not
make much sense either.

df = df.withColumn("row_num", row_number().over(
    Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))))


Then it looks like it takes the first 4000 rows (row_num from 1 to 4000) 
and tries to send them via HTTP POST. Then it moves the range by one and 
sends row 2 to 4001 (mostly overlapped with the first POST).


It is not clear if the "Data" field is meant to be all rows or only a 
single row. Either way, this is not what happens. Please consider the 
difference between a Column and a DataFrame in Spark. This is very 
different from Pandas.


I think you have to rethink your approach. Using Spark means big data. 
This approach is iterative and single-threaded.


Enrico


Am 10.06.22 um 16:01 schrieb Sid:

Hi Enrico,

Thanks for your time. Much appreciated.

I am expecting the payload to be as a JSON string to be a record like 
below:


{"A":"some_value","B":"some_value"}

Where A and B are the columns in my dataset.


On Fri, Jun 10, 2022 at 6:09 PM Enrico Minack  
wrote:


Sid,

just recognized you are using Python API here. Then
struct(*colsListToBePassed) should be correct, given it takes
a list of strings.

Your method call_to_cust_bulk_api takes argument payload,
which is a Column. This is then used in custRequestBody.
That is pretty strange use of a column expression. What do you
expect print(payload) to be?

I recommend to split that complex command into multiple commands
    to find out what "an error of column not iterable" refers to.

Enrico


Am 10.06.22 um 13:39 schrieb Enrico Minack:

Hi Sid,

finalDF = finalDF.repartition(finalDF.rdd.getNumPartitions())
    .withColumn("status_for_batch", call_to_cust_bulk_api(policyUrl,
        to_json(struct(*colsListToBePassed))))

You are calling withColumn with the result of
call_to_cust_bulk_api as the second argument. That result
looks like it is of type string. But withColumn expects type
Column. You can turn that string into a Column using lit:

finalDF = finalDF.repartition(finalDF.rdd.getNumPartitions())
    .withColumn("status_for_batch", lit(call_to_cust_bulk_api(policyUrl,
        to_json(struct(*colsListToBePassed)))))

You are saying that gives you an error of column not iterable. I
reckon the struct(*colsListToBePassed) is wrong.

Method struct requires a single string followed by a list of
strings. Given your colsListToBePassed is a list of strings,
this does not work. Try:

  struct(colsListToBePassed.head, colsListToBePassed.tail: _*)

Alternatively, struct requires a list of Column, so try this:

  struct(colsListToBePassed.map(col): _*)

The API is pretty clear about the types it expects.


If you are still having errors, you better please paste the code
and error.

Enrico



Am 09.06.22 um 21:31 schrieb Sid:

Hi Experts,

I am facing one problem while passing a column to the method. 
The problem is described in detail here:


https://stackoverflow.com/questions/72565095/how-to-pass-columns-as-a-json-record-to-the-api-method-using-pyspark

TIA,
Sid







Re: API Problem

2022-06-10 Thread Enrico Minack

Sid,

just recognized you are using Python API here. Then
struct(*colsListToBePassed) should be correct, given it takes a
list of strings.

Your method call_to_cust_bulk_api takes argument payload, which is a
Column. This is then used in custRequestBody. That is pretty
strange use of a column expression. What do you expect print(payload)
to be?


I recommend to split that complex command into multiple commands to find 
out what "an error of column not iterable" refers to.


Enrico


Am 10.06.22 um 13:39 schrieb Enrico Minack:

Hi Sid,

finalDF = finalDF.repartition(finalDF.rdd.getNumPartitions())
    .withColumn("status_for_batch", call_to_cust_bulk_api(policyUrl,
        to_json(struct(*colsListToBePassed))))

You are calling withColumn with the result of
call_to_cust_bulk_api as the second argument. That result looks
like it is of type string. But withColumn expects type Column.
You can turn that string into a Column using lit:

finalDF = finalDF.repartition(finalDF.rdd.getNumPartitions())
    .withColumn("status_for_batch", lit(call_to_cust_bulk_api(policyUrl,
        to_json(struct(*colsListToBePassed)))))

You are saying that gives you an error of column not iterable. I
reckon the struct(*colsListToBePassed) is wrong.

Method struct requires a single string followed by a list of
strings. Given your colsListToBePassed is a list of strings, this
does not work. Try:

  struct(colsListToBePassed.head, colsListToBePassed.tail: _*)

Alternatively, struct requires a list of Column, so try this:

  struct(colsListToBePassed.map(col): _*)

The API is pretty clear about the types it expects.


If you are still having errors, you better please paste the code and 
error.


Enrico



Am 09.06.22 um 21:31 schrieb Sid:

Hi Experts,

I am facing one problem while passing a column to the method.  The 
problem is described in detail here:


https://stackoverflow.com/questions/72565095/how-to-pass-columns-as-a-json-record-to-the-api-method-using-pyspark

TIA,
Sid





Re: API Problem

2022-06-10 Thread Enrico Minack

Hi Sid,

finalDF = finalDF.repartition(finalDF.rdd.getNumPartitions())
    .withColumn("status_for_batch", call_to_cust_bulk_api(policyUrl,
        to_json(struct(*colsListToBePassed))))

You are calling withColumn with the result of call_to_cust_bulk_api as the
second argument. That result looks like it is of type string. But withColumn
expects type Column. You can turn that string into a Column using lit:

finalDF = finalDF.repartition(finalDF.rdd.getNumPartitions())
    .withColumn("status_for_batch", lit(call_to_cust_bulk_api(policyUrl,
        to_json(struct(*colsListToBePassed)))))

You are saying that gives you an error of column not iterable. I reckon
the struct(*colsListToBePassed) is wrong.

Method struct requires a single string followed by a list of
strings. Given your colsListToBePassed is a list of strings, this
does not work. Try:

  struct(colsListToBePassed.head, colsListToBePassed.tail: _*)

Alternatively, struct requires a list of Column, so try this:

  struct(colsListToBePassed.map(col): _*)

The API is pretty clear about the types it expects.


If you are still having errors, you better please paste the code and error.

Enrico



Am 09.06.22 um 21:31 schrieb Sid:

Hi Experts,

I am facing one problem while passing a column to the method.  The 
problem is described in detail here:


https://stackoverflow.com/questions/72565095/how-to-pass-columns-as-a-json-record-to-the-api-method-using-pyspark

TIA,
Sid




Re: partitionBy creating lot of small files

2022-06-04 Thread Enrico Minack
You refer to df.write.partitionBy, which creates for each value of "col" 
a directory, and in worst-case writes one file per DataFrame partition. 
So the number of output files is controlled by cardinality of "col", 
which is your data and hence out of control, and the number of 
partitions of your DataFrame.


The only way to change the number of DataFrame partitions without 
repartition / shuffle all data is to use coalesce (as you already 
mentioned in an earlier post).
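
For example, a minimal sketch (the target of roughly 100 output partitions is
just an illustration):

ds.coalesce(100)
  .write
  .partitionBy("col1")
  .parquet("data.parquet")

Note that coalesce only merges existing partitions, it does not co-locate
rows with the same col1 value, so each of the 100 tasks can still write one
file per col1 value it happens to contain.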


Repartition the DataFrame with the same column that you partitionBy will 
output a single file per col1-partition:


ds.repartition(100, $"col1")
  .write
  .partitionBy("col1")
  .parquet("data.parquet")


Large col1-values with much data will have a large file and col1-values 
with few data will have a small file.


If even-sized files is of great value for you, repartition / shuffle or 
even range partition might pay off:


ds.repartitionByRange(100, $"col1", $"col2")
  .write
  .partitionBy("col1")
  .parquet("data.parquet")


This will give you equal-size files (given (col1, col2) has even 
distribution) with many files for large col1-partitions and few files 
for small col1-partitions.


You can even emulate some kind of bucketing with:

|ds|||.withColumn("month", month($"timestamp")) |.withColumn("year", 
year($"timestamp")) .repartitionByRange(100, $"year", $"month", $"id", 
$"time") .write .partitionBy("year", "month") .parquet("data.parquet")|


Files have similar size while large months have more files than small 
months.


https://github.com/G-Research/spark-extension/blob/master/PARTITIONING.md

Enrico


Am 04.06.22 um 18:44 schrieb Nikhil Goyal:

Hi all,

Is there a way to use dataframe.partitionBy("col") and control the 
number of output files without doing a full repartition? The thing is 
some partitions have more data while some have less. Doing a 
.repartition is a costly operation. We want to control the size of the 
output files. Is it even possible?


Thanks




Re: How to convert a Dataset to a Dataset?

2022-06-04 Thread Enrico Minack
You could use .option("nullValue", "+") to tell the parser that '+' 
refers to "no value":


spark.read
  .option("inferSchema", "true")
  .option("header", "true")
  .option("nullValue", "+")
  .csv("path")

Enrico


Am 04.06.22 um 18:54 schrieb marc nicole:


c1   c2     c3   c4   c5    c6
1.2  true   A    Z    120   +
1.3  false  B    X    130   F
+    true   C    Y    200   G

in the above table c1 has double values except on the last row so:

Dataset<Row> dataset =
spark.read().format("csv").option("inferSchema","true").option("header","true").load("path");

will yield StringType as the type for column c1, and similarly for c6.
I want to return the true type of each column by first discarding the "+".
I use Dataset<String> after filtering the rows (removing "+") because
I can re-read the new dataset using the .csv() method.

Any better idea to do that ?

Le sam. 4 juin 2022 à 18:40, Enrico Minack  a 
écrit :


Can you provide an example string (row) and the expected inferred
schema?

Enrico


Am 04.06.22 um 18:36 schrieb marc nicole:

How to do just that? i thought we only can inferSchema when we
first read the dataset, or am i wrong?

Le sam. 4 juin 2022 à 18:10, Sean Owen  a écrit :

It sounds like you want to interpret the input as strings, do
some processing, then infer the schema. That has nothing to
do with construing the entire row as a string like
"Row[foo=bar, baz=1]"

On Sat, Jun 4, 2022 at 10:32 AM marc nicole
 wrote:

Hi Sean,

Thanks, actually I have a dataset where I want to
inferSchema after discarding the specific String value of
"+". I do this because the column would be considered
StringType while if i remove that "+" value it will be
considered DoubleType for example or something else.
Basically I want to remove "+" from all dataset rows and
then inferschema.
Here my idea is to filter the rows not equal to "+" for
the target columns (potentially all of them) and then use
spark.read().csv() to read the new filtered dataset with
the option inferSchema which would then yield correct
column types.
What do you think?

Le sam. 4 juin 2022 à 15:56, Sean Owen 
a écrit :

I don't think you want to do that. You get a string
representation of structured data without the
structure, at best. This is part of the reason it
doesn't work directly this way.
You can use a UDF to call .toString on the Row of
course, but, again what are you really trying to do?

On Sat, Jun 4, 2022 at 7:35 AM marc nicole
 wrote:

Hi,
How to convert a Dataset to a Dataset?
What i have tried is:

List<String> list = dataset.as(Encoders.STRING()).collectAsList();
Dataset<String> datasetSt =
spark.createDataset(list, Encoders.STRING()); // But this line raises
an org.apache.spark.sql.AnalysisException: Try to
map struct... to Tuple1, but failed as the number
of fields does not line up

Type of columns being String
How to solve this?





Re: How to convert a Dataset to a Dataset?

2022-06-04 Thread Enrico Minack

Can you provide an example string (row) and the expected inferred schema?

Enrico


Am 04.06.22 um 18:36 schrieb marc nicole:
How to do just that? i thought we only can inferSchema when we first 
read the dataset, or am i wrong?


Le sam. 4 juin 2022 à 18:10, Sean Owen  a écrit :

It sounds like you want to interpret the input as strings, do some
processing, then infer the schema. That has nothing to do with
construing the entire row as a string like "Row[foo=bar, baz=1]"

On Sat, Jun 4, 2022 at 10:32 AM marc nicole 
wrote:

Hi Sean,

Thanks, actually I have a dataset where I want to inferSchema
after discarding the specific String value of "+". I do this
because the column would be considered StringType while if i
remove that "+" value it will be considered DoubleType for
example or something else. Basically I want to remove "+" from
all dataset rows and then inferschema.
Here my idea is to filter the rows not equal to "+" for the
target columns (potentially all of them) and then use
spark.read().csv() to read the new filtered dataset with the
option inferSchema which would then yield correct column types.
What do you think?

Le sam. 4 juin 2022 à 15:56, Sean Owen  a
écrit :

I don't think you want to do that. You get a string
representation of structured data without the structure,
at best. This is part of the reason it doesn't work
directly this way.
You can use a UDF to call .toString on the Row of course,
but, again what are you really trying to do?

On Sat, Jun 4, 2022 at 7:35 AM marc nicole
 wrote:

Hi,
How to convert a Dataset to a Dataset?
What i have tried is:

List<String> list = dataset.as(Encoders.STRING()).collectAsList();
Dataset<String> datasetSt = spark.createDataset(list,
Encoders.STRING()); // But this line raises
an org.apache.spark.sql.AnalysisException: Try to map
struct... to Tuple1, but failed as the number of
fields does not line up

Type of columns being String
How to solve this?



Re: PartitionBy and SortWithinPartitions

2022-06-03 Thread Enrico Minack

Nikhil,

What are you trying to achieve with this in the first place? What are 
your goals? What is the problem with your approach?


Are you concerned about the 1000 files in each written col2-partition?

The write.partitionBy is something different from df.repartition or
df.coalesce.


The df partitions are sorted *before* partitionBy-writing them.
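
If the goal is at most one sorted file per col2 value, a sketch of an
alternative (this is an assumption about your intent, and it requires the
rows of each col2 value to fit into a single task) could be:

import org.apache.spark.sql.functions.col

df.repartition(1000, col("col2"))
  .sortWithinPartitions("col2", "col1")
  .write
  .mode("overwrite")
  .partitionBy("col2")
  .parquet("/path/to/output")

Here each col2 value lands in exactly one of the 1000 partitions and is
sorted by col1 there, so each col2 directory gets at most one file
(ignoring spark.sql.files.maxRecordsPerFile).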

Enrico


Am 03.06.22 um 16:13 schrieb Nikhil Goyal:

Hi folks,

We are trying to do
`df.coalesce(1000).sortWithinPartitions("col1").write.mode('overwrite').partitionBy("col2").parquet(...)`

I do see that coalesce 1000 is applied for every sub partition. But I 
wanted to know if sortWithinPartitions(col1) works after applying 
partitionBy or before? Basically would spark first partitionBy col2 
and then sort by col1 or sort first and then partition?


Thanks
Nikhil




Re: Writing Custom Spark Readers and Writers

2022-04-06 Thread Enrico Minack

Another project implementing DataSource V2 in Scala with Python wrapper:

https://github.com/G-Research/spark-dgraph-connector

Cheers,
Enrico


Am 06.04.22 um 12:01 schrieb Cheng Pan:

There are some projects based on Spark DataSource V2 that I hope will help you.

https://github.com/datastax/spark-cassandra-connector
https://github.com/housepower/spark-clickhouse-connector
https://github.com/oracle/spark-oracle
https://github.com/pingcap/tispark

Thanks,
Cheng Pan

On Wed, Apr 6, 2022 at 5:52 PM daniel queiroz  wrote:

https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/connector/read/index.html
https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/connector/write/index.html

https://developer.here.com/documentation/java-scala-dev/dev_guide/spark-connector/index.html

Grato,

Daniel Queiroz
81 996289671


Em qua., 6 de abr. de 2022 às 03:57, Dyanesh Varun  
escreveu:

Hey team,

Can you please share some documentation/blogs where we can get to know how we 
can write custom sources and sinks for both streaming and static datasets.

Thanks in advance
Dyanesh Varun


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-03-31 Thread Enrico Minack
How well Spark can scale up with your data (in terms of years of data) 
depends on two things: the operations performed on the data, and 
characteristics of the data, like value distributions.


Failing tasks smell like you are using operations that do not scale 
(e.g. Cartesian product of your data, join on low-cardinality row). But 
that could be anything.


Again, the reasons for these failing tasks can be manifold, and without 
the actual transformations (i.e. your "complex statements"), and some 
characteristics of your data, no specific help is possible.


Enrico


Am 31.03.22 um 10:30 schrieb Joris Billen:

Thanks for reply :-)

I am using pyspark. Basicially my code (simplified is):

df=spark.read.csv(hdfs://somehdfslocation)
df1=spark.sql (complex statement using df)
...
dfx=spark.sql(complex statement using df x-1)
...
dfx15.write()


What exactly is meant by "closing resources"? Is it just unpersisting 
cached dataframes at the end and stopping the spark context 
explicitly: sc.stop()?



FOr processing many years at once versus a chunk in a loop: I see that 
if I go up to certain number of days, one iteration will start to have 
tasks that fail. So I only take a limited number of days, and do this 
process several times. Isn't this normal, as you are always somehow
limited in terms of resources (I have 9 nodes with 32GB)? Or is it
like this that in theory you could process any volume, in case you 
wait long enough? I guess spark can only break down the tasks up to a 
certain level (based on the datasets' and the intermediate results’ 
partitions) and at some moment you hit the limit where your resources 
are not sufficient anymore to process such one task? Maybe you can 
tweak it a bit, but in the end you’ll hit a limit?




Concretely  following topics would be interesting to find out more 
about (links):
-where to see what you are still consuming after spark job ended if 
you didnt close resources

-memory leaks for pyspark
-good article about closing resources (you find tons of snippets on 
how to start spark context+ config for number/cores/memory of 
worker/executors etc, but never saw a focus on making sure you clean 
up —> or is it just stopping the spark context)





On 30 Mar 2022, at 21:24, Bjørn Jørgensen  
wrote:


It`s quite impossible for anyone to answer your question about what 
is eating your memory, without even knowing what language you are using.


If you are using C then it`s always pointers, that's the mem issue.
If you are using python, there can be some like not using context 
manager like With Context Managers and Python's with Statement 
 


And another can be not to close resources after use.

In my experience you can process 3 years or more of data, IF you are 
closing opened resources.

I use the web GUI http://spark:4040 to follow what spark is doing.



ons. 30. mar. 2022 kl. 17:41 skrev Joris Billen 
:


Thanks for answer-much appreciated! This forum is very useful :-)

I didnt know the sparkcontext stays alive. I guess this is eating
up memory.  The eviction means that he knows that he should clear
some of the old cached memory to be able to store new one. In
case anyone has good articles about memory leaks I would be
interested to read.
I will try to add following lines at the end of my job (as I
cached the table in spark sql):


/sqlContext.sql("UNCACHE TABLE mytableofinterest ")/
/spark.stop()/


Wrt looping: if I want to process 3 years of data, my modest
cluster will never do it one go , I would expect? I have to break
it down in smaller pieces and run that in a loop (1 day is
already lots of data).



Thanks!





On 30 Mar 2022, at 17:25, Sean Owen  wrote:

The Spark context does not stop when a job does. It stops when
you stop it. There could be many ways mem can leak. Caching
maybe - but it will evict. You should be clearing caches when no
longer needed.

I would guess it is something else your program holds on to in
its logic.

Also consider not looping; there is probably a faster way to do
it in one go.

On Wed, Mar 30, 2022, 10:16 AM Joris Billen
 wrote:

Hi,
I have a pyspark job submitted through spark-submit that
does some heavy processing for 1 day of data. It runs with
no errors. I have to loop over many days, so I run this
spark job in a loop. I notice after couple executions the
memory is increasing on all worker nodes and eventually this
leads to faillures. My job does some caching, but I
   

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-03-30 Thread Enrico Minack
> Wrt looping: if I want to process 3 years of data, my modest cluster 
will never do it one go , I would expect?
> I have to break it down in smaller pieces and run that in a loop (1 
day is already lots of data).


Well, that is exactly what Spark is made for. It splits the work up and 
processes it in small pieces, called partitions. No matter how much data 
you have, it probably works with your laptop (as long as it fits on 
disk), though it will take some time. But it will succeed. A large 
cluster is doing nothing else, except for having more partitions being 
processed in parallel.


You should expect it to work, no matter how many years of data. 
Otherwise, you have to rethink your Spark code, not your cluster size.


Share some code that does not work with 3 years and people might help. 
Without that, speculations is all you will get.


Enrico



Am 30.03.22 um 17:40 schrieb Joris Billen:

Thanks for answer-much appreciated! This forum is very useful :-)

I didnt know the sparkcontext stays alive. I guess this is eating up 
memory.  The eviction means that he knows that he should clear some of 
the old cached memory to be able to store new one. In case anyone has 
good articles about memory leaks I would be interested to read.
I will try to add following lines at the end of my job (as I cached 
the table in spark sql):



/sqlContext.sql("UNCACHE TABLE mytableofinterest ")/
/spark.stop()/


Wrt looping: if I want to process 3 years of data, my modest cluster 
will never do it one go , I would expect? I have to break it down in 
smaller pieces and run that in a loop (1 day is already lots of data).




Thanks!





On 30 Mar 2022, at 17:25, Sean Owen  wrote:

The Spark context does not stop when a job does. It stops when you 
stop it. There could be many ways mem can leak. Caching maybe - but 
it will evict. You should be clearing caches when no longer needed.


I would guess it is something else your program holds on to in its 
logic.


Also consider not looping; there is probably a faster way to do it in 
one go.


On Wed, Mar 30, 2022, 10:16 AM Joris Billen 
 wrote:


Hi,
I have a pyspark job submitted through spark-submit that does
some heavy processing for 1 day of data. It runs with no errors.
I have to loop over many days, so I run this spark job in a loop.
I notice after couple executions the memory is increasing on all
worker nodes and eventually this leads to faillures. My job does
some caching, but I understand that when the job ends
successfully, then the sparkcontext is destroyed and the cache
should be cleared. However it seems that something keeps on
filling the memory a bit more and more after each run. THis is
the memory behaviour over time, which in the end will start
leading to failures :

(what we see is: green=physical memory used, green-blue=physical
memory cached, grey=memory capacity =straight line around 31GB )
This runs on a healthy spark 2.4 and was optimized already to
come to a stable job in terms of spark-submit resources
parameters like

driver-memory/num-executors/executor-memory/executor-cores/spark.locality.wait).
Any clue how to “really” clear the memory in between jobs? So
basically currently I can loop 10x and then need to restart my
cluster so all memory is cleared completely.


Thanks for any info!






Re: GraphX Support

2022-03-22 Thread Enrico Minack
Right, GraphFrames is not very active and maintainers don't even have 
the capacity to make releases.


Enrico


Am 22.03.22 um 00:10 schrieb Sean Owen:
GraphX is not active, though still there and does continue to build 
and test with each Spark release. GraphFrames kind of superseded it, 
but is also not super active FWIW.


On Mon, Mar 21, 2022 at 6:03 PM Jacob Marquez 
 wrote:


Hello!

My team and I are evaluating GraphX as a possible solution. Would
someone be able to speak to the support of this Spark feature? Is
there active development or is GraphX in maintenance mode (e.g.
updated to ensure functionality with new Spark releases)?

Thanks in advance for your help!

--

Jacob H. Marquez

He/Him

Data & Applied Scientist

Microsoft Cloud Data Sciences



Re: 回复:Re: calculate correlation between multiple columns and one specific column after groupby the spark data frame

2022-03-16 Thread Enrico Minack
If you have a list of Columns called `columns`, you can pass them to the 
`agg` method as:


  agg(columns.head, columns.tail: _*)
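
For instance, a rough sketch of building such a list for your example (the
three column names are taken from your sample; extend the Seq to all 30
datacol columns):

val dataCols = Seq("datacol1", "datacol2", "datacol3")
val columns = dataCols.map(c => functions.corr(c, "corr_col").as(s"corr_$c"))

df.groupBy("groupid").agg(columns.head, columns.tail: _*)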

Enrico


Am 16.03.22 um 08:02 schrieb ckgppl_...@sina.cn:

Thanks, Sean. I modified the code and have generated a list of columns.
I am working on converting a list of columns to a new data frame. It
seems that there is no direct API to do this.


- Original Message -
From: Sean Owen 
To: ckgppl_...@sina.cn
Cc: user 
Subject: Re: calculate correlation between multiple columns and one specific 
column after groupby the spark data frame

Date: 2022-03-16 11:55

Are you just trying to avoid writing the function call 30 times? Just 
put this in a loop over all the columns instead, which adds a new corr 
col every time to a list.


On Tue, Mar 15, 2022, 10:30 PM  wrote:

Hi all,

I am stuck at  a correlation calculation problem. I have a
dataframe like below:

groupid  datacol1  datacol2  datacol3  datacol*  corr_col
1        1         2         3         4         5
1        2         3         4         6         5
2        4         2         1         7         5
2        8         9         3         2         5
3        7         1         2         3         5
3        3         5         3         1         5

I want to calculate the correlation between all datacol columns
and corr_col column by each groupid.
So I used the following spark scala-api codes:

df.groupby("groupid").agg(functions.corr("datacol1","corr_col"),functions.corr("datacol2","corr_col"),functions.corr("datacol3","corr_col"),functions.corr("datacol*","corr_col"))

This is very inefficient. If I have 30 data_col columns, I need to
input 30 times functions.corr to calculate correlation.

I have searched, it seems that functions.corr doesn't accept a
List/Array parameter, and df.agg doesn't accept a function to be
parameter.

So any  spark scala API codes can do this job efficiently?

Thanks

Liang



Re: Difference between windowing functions and aggregation functions on big data

2022-02-27 Thread Enrico Minack

Sid,

Your Aggregation Query selects all employees where less than three 
distinct salaries exist that are larger. So, both queries seem to do the 
same.


The Windowing Query is explicit in what it does: give me the rank for 
salaries per department in the given order and pick the top 3 per 
department.


The Aggregation Query is trying to get to this conclusion by 
constructing some comparison. The former is the better approach, the 
second scales badly as this is done by counting distinct salaries that 
are larger than each salary in E. This looks like a Cartesian product of 
Employees. You make this very hard to optimize or execute by the query 
engine.


And as you say, your example is very small, so this will not give any 
insights into big data.


Enrico


Am 27.02.22 um 19:30 schrieb Sid:

My bad.

Aggregation Query:

# Write your MySQL query statement below

   SELECT D.Name AS Department, E.Name AS Employee, E.Salary AS Salary
FROM Employee E INNER JOIN Department D ON E.DepartmentId = D.Id
WHERE (SELECT COUNT(DISTINCT(Salary)) FROM Employee
       WHERE DepartmentId = E.DepartmentId AND Salary > E.Salary) < 3
ORDER by E.DepartmentId, E.Salary DESC

Time Taken: 1212 ms

Windowing Query:

select Department, Employee, Salary from (
  select d.name as Department, e.name as Employee, e.salary as Salary,
         dense_rank() over(partition by d.name order by e.salary desc) as rnk
  from Department d join Employee e on e.departmentId = d.id
) a where rnk <= 3


Time Taken: 790 ms

Thanks,
Sid


On Sun, Feb 27, 2022 at 11:35 PM Sean Owen  wrote:

Those two queries are identical?

On Sun, Feb 27, 2022 at 11:30 AM Sid  wrote:

Hi Team,

I am aware that if windowing functions are used, then at first
it loads the entire dataset into one window,scans and then
performs the other mentioned operations for that particular
window which could be slower when dealing with trillions /
billions of records.

I did a POC where I used an example to find the max 3 highest
salary for an employee per department. So, I wrote a below
queries and compared the time for it:

Windowing Query:

select Department,Employee,Salary from (
select d.name  as Department, e.name
 as Employee,e.salary as Salary,dense_rank()
over(partition by d.name  order by e.salary
desc) as rnk from Department d join Employee e on
e.departmentId=d.id  ) a where rnk<=3

Time taken: 790 ms

Aggregation Query:

select Department,Employee,Salary from (
select d.name  as Department, e.name
 as Employee,e.salary as Salary,dense_rank()
over(partition by d.name  order by e.salary
desc) as rnk from Department d join Employee e on
e.departmentId=d.id  ) a where rnk<=3

Time taken: 1212 ms

But as per my understanding, the aggregation should have run
faster. So, my whole point is if the dataset is huge I should
force some kind of map reduce jobs like we have an option
called df.groupby().reduceByGroups()

So I think the aggregation query is taking more time since the
dataset size here is smaller and as we all know that map
reduce works faster when there is a huge volume of data.
Haven't tested it yet on big data but needed some expert
guidance over here.

Please correct me if I am wrong.

TIA,
Sid




Re: [Spark Core] Why no spark.read.delta / df.write.delta?

2020-10-05 Thread Enrico Minack
Though spark.read.<format> refers to "built-in" data sources, there is 
nothing that prevents 3rd party libraries to "extend" spark.read in 
Scala or Python. As users know the Spark-way to read built-in data 
sources, it feels natural to hook 3rd party data sources under the same 
scheme, to give users a holistic and integrated feel.


One Scala example 
(https://github.com/G-Research/spark-dgraph-connector#spark-dgraph-connector):


import uk.co.gresearch.spark.dgraph.connector._
val triples = spark.read.dgraph.triples("localhost:9080")

and in Python:

from gresearch.spark.dgraph.connector import *
triples = spark.read.dgraph.triples("localhost:9080")

I agree that 3rd parties should also support the official 
spark.read.format() and the new catalog approaches.
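
For completeness, a minimal sketch of how such a hook can look on the Scala
side (names are illustrative, this is not an official Delta Lake API):

import org.apache.spark.sql.{DataFrame, DataFrameReader}

object DeltaSyntax {
  implicit class DeltaDataFrameReader(val reader: DataFrameReader) extends AnyVal {
    // spark.read.delta(path) becomes sugar for spark.read.format("delta").load(path)
    def delta(path: String): DataFrame = reader.format("delta").load(path)
  }
}

// with DeltaSyntax._ in scope:
// val df = spark.read.delta("/path/to/table")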


Enrico


Am 05.10.20 um 14:03 schrieb Jungtaek Lim:

Hi,

"spark.read." is a "shorthand" for "built-in" data sources, 
not for external data sources. spark.read.format() is still an 
official way to use it. Delta Lake is not included in Apache Spark so 
that is indeed not possible for Spark to refer to.


Starting from Spark 3.0, the concept of "catalog" is introduced, which 
you can simply refer to the table from catalog (if the external data 
source provides catalog implementation) and no need to specify the 
format explicitly (as catalog would know about it).


This session explains the catalog and how Cassandra connector 
leverages it. I see some external data sources starting to support 
catalog, and in Spark itself there's some effort to support catalog 
for JDBC.

https://databricks.com/fr/session_na20/datasource-v2-and-cassandra-a-whole-new-world

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)


On Mon, Oct 5, 2020 at 8:53 PM Moser, Michael 
> wrote:


Hi there,

I’m just wondering if there is any incentive to implement
read/write methods in the DataFrameReader/DataFrameWriter for
delta similar to e.g. parquet?

For example, using PySpark, “spark.read.parquet” is available, but
“spark.read.delta” is not (same for write).

In my opinion, “spark.read.delta” feels more clean and pythonic
compared to “spark.read.format(‘delta’).load()”, especially if
more options are called, like “mode”.

Can anyone explain the reasoning behind this, is this due to the
Java nature of Spark?

From a pythonic point of view, I could also imagine a single
read/write method, with the format as an arg and kwargs related to
the different file format options.

Best,

Michael



Re: Query about Spark

2020-09-07 Thread Enrico Minack
You could use Horovod to distribute your ML algorithm on a cluster, 
while Horovod also supports Spark clusters.


Enrico


Am 06.09.20 um 15:30 schrieb Ankur Das:


Good Evening Sir/Madam,
Hope you are doing well, I am experimenting on some ML techniques 
where I need to test it on a distributed environment.
For example a particular algorithm I want to run it on different nodes 
at the same time and collect the results at the end in one single node 
or the parent node.


So, I would like to know if it is possible or a good choice to use 
spark for this.


Hope to hear from you soon, Stay safe and healthy
Thanking you in advance.
--
Regards,
Ankur J Das
Research Scholar @ Tezpur University
Tezpur, Assam





Re: regexp_extract regex for extracting the columns from string

2020-08-10 Thread Enrico Minack
You can remove the <1000> first and then turn the string into a map 
(interpret the string as key-values). From that map you can access each 
key and turn it into a separate column:


Seq(("<1000> date=2020-08-01 time=20:50:04 name=processing id=123 
session=new packt=20 orgin=null address=null dest=fgjglgl"))

  .toDF("string")
  .withColumn("key-values", regexp_replace($"string", "^[^ ]+ ", ""))
  .withColumn("map", expr("str_to_map(`key-values`, ' ', '=')"))
  .select(
    $"map"("date").as("date"),
    $"map"("time").as("time"),
    $"map"("name").as("name"),
    $"map"("id").as("id"),
    $"map"("session").as("session"),
    $"map"("packt").as("packt"),
    $"map"("origin").as("origin"),
    $"map"("address").as("address"),
    $"map"("dest").as("dest")
  )
  .show(false)

Enrico


Am 09.08.20 um 18:00 schrieb anbutech:

Hi All,

I have a following info.in the data column.

<1000> date=2020-08-01 time=20:50:04 name=processing id=123 session=new
packt=20 orgin=null address=null dest=fgjglgl

here I want to create a separate column for the above key value pairs after
the integer <1000> separated by spaces.
Is there any way to achieved it using regexp_extract inbuilt functions.i
don't want to do it using udf function.
apart from udf,is there any way to achieved it.


Thanks



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Unablee to get to_timestamp with Timezone Information

2020-04-02 Thread Enrico Minack
Once parsed into a Timestamp the timestamp is store internally as UTC 
and printed as your local timezone (e.g. as defined by 
spark.sql.session.timeZone). Spark is good at hiding timezone 
information from you.


You can get the timezone information via date_format(column, format):

import org.apache.spark.sql.types.TimestampType
import org.apache.spark.sql.functions._

val sampleDF = Seq("2020-04-11T20:40:00-05:00").toDF("value")
val timestampDF = sampleDF.select($"value".cast(TimestampType))
timestampDF.select(date_format($"value",
  "yyyy-MM-dd'T'HH:mm:ssZZZZ")).show(false)

+---------------------------------------------+
|date_format(value, yyyy-MM-dd'T'HH:mm:ssZZZZ)|
+---------------------------------------------+
|2020-04-12T03:40:00+0200                     |
+---------------------------------------------+

If you want the timezone only, use
timestampDF.select(date_format($"value", "ZZZZ")).show.

+------------------------+
|date_format(value, ZZZZ)|
+------------------------+
|                   +0200|
+------------------------+

It all depends how you get the data "downstream". If you go through 
parquet or csv files, they will retain the timezone information. If you 
go through strings, you should format them as above. If you use 
Dataset.map you can access the timestamps as java.sql.Timestamp objects 
(but that might not be necessary):


import java.sql.Timestamp
case class Times(value: Timestamp)
timestampDF.as[Times].map(t => t.value.getTimezoneOffset).show
+-----+
|value|
+-----+
| -120|
+-----+


Enrico


Am 31.03.20 um 21:40 schrieb Chetan Khatri:

Sorry misrepresentation the question also. Thanks for your great help.

What I want is the time zone information as it is 
2020-04-11T20:40:00-05:00 in timestamp datatype. so I can write to 
downstream application as it is. I can correct the lacking UTC offset 
info.



On Tue, Mar 31, 2020 at 1:15 PM Magnus Nilsson > wrote:


And to answer your question (sorry, read too fast). The string is
not in proper ISO8601. Extended form must be used throughout, ie
2020-04-11T20:40:00-05:00, there's a colon (:) lacking in the UTC
offset info.

br,

Magnus

On Tue, Mar 31, 2020 at 7:11 PM Magnus Nilsson mailto:ma...@kth.se>> wrote:

Timestamps aren't timezoned. If you parse ISO8601 strings they
will be converted to UTC automatically.

If you parse timestamps without timezone they will converted
to the the timezone the server Spark is running on uses. You
can change the timezone Spark uses with
spark.conf.set("spark.sql.session.timeZone","UTC"). Timestamps
represent a point in time, the clock representation of that
instant is dependent on sparks timezone settings both for
parsing (non ISO8601) strings and showing timestamps.

br,

Magnus

On Tue, Mar 31, 2020 at 6:14 PM Chetan Khatri
mailto:chetan.opensou...@gmail.com>> wrote:

Hi Spark Users,

I am losing the timezone value from below format, I tried
couple of formats but not able to make it. Can someone
throw lights?

scala> val sampleDF =
Seq("2020-04-11T20:40:00-0500").toDF("value")
sampleDF: org.apache.spark.sql.DataFrame = [value: string]

scala> sampleDF.select('value, to_timestamp('value,
"yyyy-MM-dd\'T\'HH:mm:ss")).show(false)

+------------------------+------------------------------------------------+
|value                   |to_timestamp(`value`, 'yyyy-MM-dd\'T\'HH:mm:ss')|
+------------------------+------------------------------------------------+
|2020-04-11T20:40:00-0500|2020-04-11 20:40:00                             |
+------------------------+------------------------------------------------+

Thanks





Re: Issue with UDF Int Conversion - Str to Int

2020-03-23 Thread Enrico Minack

Ayan,

no need for UDFs, the SQL API provides all you need (sha1, substring, conv):
https://spark.apache.org/docs/2.4.5/api/python/pyspark.sql.html

>>> df.select(conv(substring(sha1(col("value_to_hash")), 33, 8), 16, 
10).cast("long").alias("sha2long")).show()

+----------+
|  sha2long|
+----------+
| 478797741|
|2520346415|
+----------+

This creates a lean query plan:

>>> df.select(conv(substring(sha1(col("value_to_hash")), 33, 8), 16, 
10).cast("long").alias("sha2long")).explain()

== Physical Plan ==
Union
:- *(1) Project [478797741 AS sha2long#74L]
:  +- Scan OneRowRelation[]
+- *(2) Project [2520346415 AS sha2long#76L]
   +- Scan OneRowRelation[]


Enrico


Am 23.03.20 um 06:13 schrieb ayan guha:

Hi

I am trying to implement simple hashing/checksum logic. The key logic 
is -


1. Generate sha1 hash
2. Extract last 8 chars
3. Convert 8 chars to Int (using base 16)

Here is the cut down version of the code:

---
from pyspark.sql.functions import *
from pyspark.sql.types import *
from hashlib import sha1 as local_sha1
df = spark.sql("select '4104003141' value_to_hash union all  select
'4102859263'")

f1 = lambda x: str(int(local_sha1(x.encode('UTF-8')).hexdigest()[32:],16))
f2 = lambda x: int(local_sha1(x.encode('UTF-8')).hexdigest()[32:],16)
sha2Int1 = udf( f1 , StringType())
sha2Int2 = udf( f2 , IntegerType())
print(f('4102859263'))
dfr = df.select(df.value_to_hash,
sha2Int1(df.value_to_hash).alias('1'),
sha2Int2(df.value_to_hash).alias('2'))

dfr.show(truncate=False)
---

I was expecting both columns should provide exact same values, however 
thats not the case *"always" *

2520346415
+-------------+----------+-----------+
|value_to_hash|1         |2          |
+-------------+----------+-----------+
|4104003141   |478797741 |478797741  |
|4102859263   |2520346415|-1774620881|
+-------------+----------+-----------+

The function working fine, as shown in the print statement. However 
values are not matching and vary widely.


Any pointer?

--
Best Regards,
Ayan Guha





Re: Time-based frequency table at scale

2020-03-11 Thread Enrico Minack

An interesting puzzle indeed.

What is your measure of "that scales"? Does not fail, does not spill, 
does not need a huge amount of memory / disk, is O(N), processes X 
records per second and core?


Enrico

Am 11.03.20 um 16:59 schrieb sakag:

Hi all,
  
We have a rather interesting use case, and are struggling to come up with an

approach that scales. Reaching out to seek your expert opinion/feedback and
tips.
  
What we are trying to do is to find the count of numerical ids over a

sliding time window where each of our data records has a timestamp and a set
of numerical ids in the below format.
  
timestamp | ids

1  [1,2,3,8]
1  [1,2]
2  [1,2,3,4]
2  [1, 10]
  
What we are looking to get as output is:
  
timestamp | id_count_map

1 | {1: 2, 2: 2, 3: 1, 4:0, 5:0, 6:0, 8:1}
2 | {1: 2, 2:1, 3: 1, 4: 1, 5:0, 6:0, 7:0, 8:0, 9:0, 10:1}
  
This gives us the frequency of occurrence of these ids over time periods.

Please note that the output expected is in a dense format.
  
However, we are running into scale issues with the data that has these

characteristics.
  
- 500 million records - Total ~100 GB

- Each record can have 500 elements in the ids column
- Max id value (length of id_count_map) is 750K
  
We have tried the below approaches to achieve this

1) Expanding ids to a dense, frequency-based vector and then doing a
row-wise sum over a Window partitioned by timestamp
2) Converting ids into a SparseVector and computing the L1 norm (using
Summarizer) over a Window partitioned by timestamp
3) GroupBy/aggregating ids by timestamp, converting to a sparse,
frequency-based vector using collections.Counter, and expanding to a dense
format
4) GroupBy/aggregating ids by timestamp, converting to a sparse,
frequency-based vector using CountVectorizer, and then expanding to a dense
format
  
Any other approaches we could try?
  
Thanks!

Sakshi
  




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark driver thread

2020-03-06 Thread Enrico Minack

James,

If you are having multithreaded code in your driver, then you should 
allocate multiple cores. In cluster mode you share the node with other 
jobs. If you allocate fewer cores than you are using in your driver, 
then that node gets over-allocated and you are stealing other 
applications' resources. Be nice: limit the parallelism of your
driver and allocate as many Spark driver cores as you use threads
(spark.driver.cores, see
https://spark.apache.org/docs/latest/configuration.html#application-properties).
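
For example, in cluster mode that could look like this (the value 4 is just
an illustration and should match the number of threads your driver uses):

spark-submit \
  --deploy-mode cluster \
  --conf spark.driver.cores=4 \
  ...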


Enrico


Am 06.03.20 um 18:36 schrieb James Yu:

Pol, thanks for your reply.

Actually I am running Spark apps in CLUSTER mode. Is what you said 
still applicable in cluster mode.  Thanks in advance for your further 
clarification.



*From:* Pol Santamaria 
*Sent:* Friday, March 6, 2020 12:59 AM
*To:* James Yu 
*Cc:* user@spark.apache.org 
*Subject:* Re: Spark driver thread
Hi james,

You can configure the Spark Driver to use more than a single thread. 
It is something that depends on the application, but the Spark driver 
can take advantage of multiple threads in many situations. For 
instance, when the driver program gathers or sends data to the workers.


So yes, if you do computation or I/O on the driver side, you should 
explore using multithreads and more than 1 vCPU.


Bests,
Pol Santamaria

On Fri, Mar 6, 2020 at 1:28 AM James Yu > wrote:


Hi,

Does a Spark driver always works as single threaded?

If yes, does it mean asking for more than one vCPU for the driver
is wasteful?


Thanks,
James





Re: Compute the Hash of each row in new column

2020-03-02 Thread Enrico Minack

Well, then apply md5 on all columns:

ds.select(ds.columns.map(col) ++ ds.columns.map(column => 
md5(col(column)).as(s"$column hash")): _*).show(false)
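
If instead you want a single hash per row over all column values, one option
could be (just a sketch; note that concat_ws skips null values, so nulls may
need extra care):

val rowHash = md5(concat_ws("|", ds.columns.map(col): _*)).as("row hash")
ds.select((ds.columns.map(col) :+ rowHash): _*).show(false)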


Enrico

Am 02.03.20 um 11:10 schrieb Chetan Khatri:

Thanks Enrico
I want to compute hash of all the columns value in the row.

On Fri, Feb 28, 2020 at 7:28 PM Enrico Minack <mailto:m...@enrico.minack.dev>> wrote:


This computes the md5 hash of a given column id of Dataset ds:

ds.withColumn("id hash", md5($"id")).show(false)

Test with this Dataset ds:

import org.apache.spark.sql.types._
val ds = spark.range(10).select($"id".cast(StringType))

Available are md5, sha, sha1, sha2 and hash:
https://spark.apache.org/docs/2.4.5/api/sql/index.html

Enrico


Am 28.02.20 um 13:56 schrieb Chetan Khatri:
> Hi Spark Users,
> How can I compute Hash of each row and store in new column at
> Dataframe, could someone help me.
>
> Thanks






Re:

2020-03-02 Thread Enrico Minack

Looks like the schema of some files is unexpected.

You could either run parquet-tools on each of the files and extract the 
schema to find the problematic files:


hdfs dfs -stat "%n" hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet |
while read file
do
   echo -n "$file: "
   hadoop jar parquet-tools-1.9.0.jar schema $file
done

https://confusedcoders.com/data-engineering/hadoop/how-to-view-content-of-parquet-files-on-s3hdfs-from-hadoop-cluster-using-parquet-tools


Or you can use Spark to investigate the parquet files in parallel:

spark.sparkContext
  .binaryFiles("hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet")
  .map { case (path, _) =>
    import collection.JavaConverters._
    val file = HadoopInputFile.fromPath(new Path(path), new Configuration())
    val reader = ParquetFileReader.open(file)
    try {
      val schema = reader.getFileMetaData().getSchema
      (
        schema.getName, schema.getFields.asScala.map(f => (
          Option(f.getId).map(_.intValue()), f.getName,
          Option(f.getOriginalType).map(_.name()), Option(f.getRepetition).map(_.name()))
        ).toArray
      )
    } finally {
      reader.close()
    }
  }
  .toDF("schema name", "fields")
  .show(false)

.binaryFiles provides you all filenames that match the given pattern as 
an RDD, so the following .map is executed on the Spark executors.
The map then opens each parquet file via ParquetFileReader and provides 
access to its schema and data.


I hope this points you in the right direction.

Enrico


Am 01.03.20 um 22:56 schrieb Hamish Whittal:

Hi there,

I have an hdfs directory with thousands of files. It seems that some 
of them - and I don't know which ones - have a problem with their 
schema and it's causing my Spark application to fail with this error:


Caused by: org.apache.spark.sql.execution.QueryExecutionException: 
Parquet column cannot be converted in file 
hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-0-8b83989a-e387-4f64-8ac5-22b16770095e-c000.snappy.parquet 
. 
Column: [price], Expected: double, Found: FIXED_LEN_BYTE_ARRAY


The problem is not only that it's causing the application to fail, but 
every time if does fail, I have to copy that file out of the directory 
and start the app again.


I thought of trying to use try-except, but I can't seem to get that to 
work.


Is there any advice anyone can give me because I really can't see 
myself going through thousands of files trying to figure out which 
ones are broken.


Thanks in advance,

hamish





Re: Compute the Hash of each row in new column

2020-02-28 Thread Enrico Minack

This computes the md5 hash of a given column id of Dataset ds:

ds.withColumn("id hash", md5($"id")).show(false)

Test with this Dataset ds:

import org.apache.spark.sql.types._
val ds = spark.range(10).select($"id".cast(StringType))

Available are md5, sha, sha1, sha2 and hash: 
https://spark.apache.org/docs/2.4.5/api/sql/index.html


Enrico


Am 28.02.20 um 13:56 schrieb Chetan Khatri:

Hi Spark Users,
How can I compute Hash of each row and store in new column at 
Dataframe, could someone help me.


Thanks




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Convert each partition of RDD to Dataframe

2020-02-27 Thread Enrico Minack

Manjunath,

You can define your DataFrame in parallel in a multi-threaded driver.
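
A rough sketch of what that could look like (a SparkSession, the JDBC source,
the table names and the output paths are all assumptions, not taken from your
setup):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val tables = Seq("table1", "table2", "table3")   // your 10 tables / shards

val jobs = tables.map { table =>
  Future {
    spark.read
      .format("jdbc")
      .option("url", jdbcUrl)         // assumed to be defined elsewhere
      .option("dbtable", table)
      .load()
      .write
      .parquet(s"/output/$table")     // illustrative output path
  }
}
Await.result(Future.sequence(jobs), Duration.Inf)

Each Future defines and writes its own DataFrame, and Spark schedules the
resulting jobs concurrently on the cluster.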

Enrico

Am 27.02.20 um 15:50 schrieb Manjunath Shetty H:

Hi Enrico,

In that case how to make effective use of all nodes in the cluster ?.

And also whats your opinion on the below

  * Create 10 Dataframes sequentially in Driver program and
transform/write to hdfs one after the other
  * Or the current approach mentioned in the previous mail

What will be the performance implications ?

Regards
Manjunath


*From:* Enrico Minack 
*Sent:* Thursday, February 27, 2020 7:57 PM
*To:* user@spark.apache.org 
*Subject:* Re: Convert each partition of RDD to Dataframe
Hi Manjunath,

why not creating 10 DataFrames loading the different tables in the 
first place?


Enrico


Am 27.02.20 um 14:53 schrieb Manjunath Shetty H:

Hi Vinodh,

Thanks for the quick response. Didn't got what you meant exactly, any 
reference or snippet  will be helpful.


To explain the problem more,

  * I have 10 partitions , each partition loads the data from
different table and different SQL shard.
  * Most of the partitions will have different schema.
  * Before persisting the data i want to do some column level
manipulation using data frame.

So thats why i want to create 10 (based on partitions ) dataframes 
that maps to 10 different table/shard from a RDD.


Regards
Manjunath

*From:* Charles vinodh 
*Sent:* Thursday, February 27, 2020 7:04 PM
*To:* manjunathshe...@live.com 
*Cc:* user 
*Subject:* Re: Convert each partition of RDD to Dataframe
Just split the single rdd into multiple individual rdds using a 
filter operation and then convert each individual rdds to it's 
respective dataframe..


On Thu, Feb 27, 2020, 7:29 AM Manjunath Shetty H 
mailto:manjunathshe...@live.com>> wrote:



Hello All,

In spark i am creating the custom partitions with Custom RDD,
each partition will have different schema. Now in the
transformation step we need to get the schema and run some
Dataframe SQL queries per partition, because each partition data
has different schema.

How to get the Dataframe's per partition of a RDD?.

As of now i am doing foreachPartition on RDD and
converting Iterable to List and converting that to
Dataframe. But the problem is converting Iterable to List will
bring all the data to memory and it might crash the process.

Is there any known way to do this ? or is there any way to handle
Custom Partitions in Dataframes instead of using RDD?

I am using Spark version 1.6.2.

Any pointers would be helpful. Thanks in advance








Re: Convert each partition of RDD to Dataframe

2020-02-27 Thread Enrico Minack

Hi Manjunath,

why not creating 10 DataFrames loading the different tables in the first 
place?


Enrico


Am 27.02.20 um 14:53 schrieb Manjunath Shetty H:

Hi Vinodh,

Thanks for the quick response. Didn't got what you meant exactly, any 
reference or snippet  will be helpful.


To explain the problem more,

  * I have 10 partitions , each partition loads the data from
different table and different SQL shard.
  * Most of the partitions will have different schema.
  * Before persisting the data i want to do some column level
manipulation using data frame.

So thats why i want to create 10 (based on partitions ) dataframes 
that maps to 10 different table/shard from a RDD.


Regards
Manjunath

*From:* Charles vinodh 
*Sent:* Thursday, February 27, 2020 7:04 PM
*To:* manjunathshe...@live.com 
*Cc:* user 
*Subject:* Re: Convert each partition of RDD to Dataframe
Just split the single rdd into multiple individual rdds using a filter 
operation and then convert each individual rdds to it's respective 
dataframe..


On Thu, Feb 27, 2020, 7:29 AM Manjunath Shetty H 
mailto:manjunathshe...@live.com>> wrote:



Hello All,

In spark i am creating the custom partitions with Custom RDD, each
partition will have different schema. Now in the transformation
step we need to get the schema and run some Dataframe SQL queries
per partition, because each partition data has different schema.

How to get the Dataframe's per partition of a RDD?.

As of now i am doing foreachPartition on RDD and
converting Iterable to List and converting that to Dataframe.
But the problem is converting Iterable to List will bring all the
data to memory and it might crash the process.

Is there any known way to do this ? or is there any way to handle
Custom Partitions in Dataframes instead of using RDD?

I am using Spark version 1.6.2.

Any pointers would be helpful. Thanks in advance






[SPARK-30957][SQL] Null-safe variant of Dataset.join(Dataset[_], Seq[String])

2020-02-26 Thread Enrico Minack
I have created a jira to track this request: 
https://issues.apache.org/jira/browse/SPARK-30957


Enrico

Am 08.02.20 um 16:56 schrieb Enrico Minack:


Hi Devs,

I am forwarding this from the user mailing list. I agree that the <=> 
version of join(Dataset[_], Seq[String]) would be useful.


Does any PMC consider this useful enough to be added to the Dataset 
API? I'd be happy to create a PR in that case.


Enrico



 Weitergeleitete Nachricht 
Betreff:dataframe null safe joins given a list of columns
Datum:  Thu, 6 Feb 2020 12:45:11 +
Von:Marcelo Valle 
An: user @spark 



I was surprised I couldn't find a way of solving this in spark, as it 
must be a very common problem for users. Then I decided to ask here.


Consider the code bellow:

```
val joinColumns = Seq("a", "b")
val df1 = Seq(("a1", "b1", "c1"), ("a2", "b2", "c2"), ("a4", null, 
"c4")).toDF("a", "b", "c")
val df2 = Seq(("a1", "b1", "d1"), ("a3", "b3", "d3"), ("a4", null, 
"d4")).toDF("a", "b", "d")

df1.join(df2, joinColumns).show()
```

The output is :

```
+---+---+---+---+
|  a|  b|  c|  d|
+---+---+---+---+
| a1| b1| c1| d1|
+---+---+---+---+
```

But I want it to be:

```
+---+-----+---+---+
|  a|    b|  c|  d|
+---+-----+---+---+
| a1|   b1| c1| d1|
| a4| null| c4| d4|
+---+-----+---+---+
```

The join syntax of `df1.join(df2, joinColumns)` has some advantages, 
as it doesn't create duplicate columns by default. However, it uses 
the operator `===` to join, not the null safe one `<=>`.


Using the following syntax:

```
df1.join(df2, df1("a") <=> df2("a") && df1("b") <=> df2("b")).show()
```

Would produce:

```
+---+----+---+---+----+---+
|  a|   b|  c|  a|   b|  d|
+---+----+---+---+----+---+
| a1|  b1| c1| a1|  b1| d1|
| a4|null| c4| a4|null| d4|
+---+----+---+---+----+---+
```

So to get the result I really want, I must do:

```
df1.join(df2, df1("a") <=> df2("a") && df1("b") <=> 
df2("b")).drop(df2("a")).drop(df2("b")).show()

+---++---+---+
|  a|   b|  c|  d|
+---++---+---+
| a1|  b1| c1| d1|
| a4|null| c4| d4|
+---++---+---+
```

Which works, but is really verbose, especially when you have many join 
columns.


Is there a better way of solving this without needing a 
utility method? This same problem is something I find in every spark 
project.
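
For reference, a utility along these lines would generalise the verbose
join above; this is a sketch, not part of the Spark API:

```
import org.apache.spark.sql.DataFrame

// Null-safe equivalent of df1.join(df2, Seq("a", "b")): builds
// left("a") <=> right("a") && left("b") <=> right("b") && ...
// and drops the duplicated right-hand join columns afterwards.
def joinNullSafe(left: DataFrame, right: DataFrame, cols: Seq[String]): DataFrame = {
  val condition = cols.map(c => left(c) <=> right(c)).reduce(_ && _)
  cols.foldLeft(left.join(right, condition))((df, c) => df.drop(right(c)))
}

// joinNullSafe(df1, df2, joinColumns).show()
```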




This email is confidential [and may be protected by legal privilege]. 
If you are not the intended recipient, please do not copy or disclose 
its content but contact the sender immediately upon receipt.


KTech Services Ltd is registered in England as company number 10704940.

Registered Office: The River Building, 1 Cousin Lane, London EC4R 3TE, 
United Kingdom






Re: Questions about count() performance with dataframes and parquet files

2020-02-17 Thread Enrico Minack
It is not about very large or small, it is about how large your cluster 
is w.r.t. your data. Caching is only useful if you have the respective 
memory available across your executors. Otherwise you could either 
materialize the Dataframe on HDFS (e.g. parquet or checkpoint) or indeed 
have to do the join twice. It's a memory-over-CPU trade-off.
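
A sketch of those two materialization options (in Scala; the PySpark
calls are analogous, and the paths and names are illustrative):

```
import org.apache.spark.sql.{DataFrame, SparkSession}

// Option 1: write the expensive intermediate result once and read it back,
// so subsequent counts reuse the materialized data instead of re-running the join.
def materializeViaParquet(spark: SparkSession, df: DataFrame, path: String): DataFrame = {
  df.write.mode("overwrite").parquet(path)
  spark.read.parquet(path)
}

// Option 2: checkpoint() cuts the lineage and stores the data in the
// configured checkpoint directory; it is eager by default, so the
// DataFrame is computed exactly once.
def materializeViaCheckpoint(spark: SparkSession, df: DataFrame): DataFrame = {
  spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")  // illustrative path
  df.checkpoint()
}
```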


Enrico


On 17.02.20 at 22:06, Nicolas PARIS wrote:

.dropDuplicates() \ .cache()
Since df_actions is cached, you can count inserts and updates quickly
with only that one join in df_actions:

Hi Enrico. I am wondering if this is OK for very large tables? Is
caching faster than recomputing both insert/update?

Thanks

Enrico Minack  writes:


Ashley,

I want to suggest a few optimizations. The problem might go away but
at least performance should improve.
The freeze problems could have many reasons; the Spark UI SQL pages
and stage detail pages would be useful. You can send them privately,
if you wish.

1. the repartition(1) should be replaced by coalesce(1). The former
will shuffle all data, while the latter will read in the existing
partitions and not shuffle them again.
2. Repartitioning to a single partition is discouraged, unless you can
guarantee the data fit into one worker's memory.
3. You can compute Insert and Update in one go, so that you don't have
to join with df_reference twice.

df_actions = df_source_hashed.alias('a') \
    .join(df_reference.alias('b'), pk_list, how="left") \
    .withColumn('_action',
                when(col('b.hashkey').isNull(), 'Insert')
                .when(col('a.hashkey') != col('b.hashkey'), 'Update')) \
    .select(col('_action'), *df_source_hashed.columns) \
    .dropDuplicates() \
    .cache()

Since df_actions is cached, you can count inserts and updates quickly
with only that one join in df_actions:

inserts_count = df_actions.where(col('_action') == 'Insert').count()
updates_count = df_actions.where(col('_action') == 'Update').count()

And you can get rid of the union:

df_output = df_actions.where(col('_action').isNotNull())

If you have to write that output to parquet anyway, then you can get
the count quickly from the parquet file if it is partitioned by the
_action column (Spark then only looks into parquet's metadata to get
the count, it does not read any row):

df_output.repartition(1).write.partitionBy('_action') \
    .format('parquet').mode('overwrite').save('/path/to/output.parquet')
df_output = sql_context.read.parquet('/path/to/output.parquet')
inserts_count = df_output.where(col('_action') == 'Insert').count()
updates_count = df_output.where(col('_action') == 'Update').count()

These are all just sketches, but I am sure you get the idea.

Enrico


On 13.02.20 at 05:08, Ashley Hoff wrote:

Hi,

I am currently working on an app using PySpark to produce an insert
and update daily delta capture, output as Parquet.  This is
running on an 8-core 32 GB Linux server in standalone mode (set to 6
worker cores of 2GB memory each) running Spark 2.4.3.

This is being achieved by reading data from a TSQL database into
a dataframe, which has a hash of all records appended to it, and
comparing it to a dataframe from yesterday's data (which has also
been saved as parquet).

As part of the monitoring and logging, I am trying to count the
number of records for the respective actions.  Example code:
df_source = spark_session.read.format('jdbc')...
df_reference = sql_context.read.parquet('/path/to/reference.parquet')
df_source_hashed = df_source.withColumn('hashkey', md5(concat_ws('', *df_source.columns))) \
    .cache()
df_inserts = df_source_hashed.join(df_reference, pk_list, how='left_anti') \
    .select(lit('Insert').alias('_action'), *df_source_hashed.columns) \
    .dropDuplicates() \
    .cache()
inserts_count = df_inserts.count()
df_updates = df_source_hashed.alias('a').join(df_reference.alias('b'), pk_list, how="inner") \
    .select(lit('Update').alias('_action'), *df_source_hashed.columns) \
    .where(col('a.hashkey') != col('b.hashkey')) \
    .dropDuplicates() \
    .cache()
updates_count = df_updates.count()
df_output = df_inserts.union(df_updates)
df_output.repartition(1).write.format('parquet').mode('overwrite').save('/path/to/output.parquet')
The above code is running two occurrences concurrently via Python
threading.Thread (this is to try and overcome the network bottleneck
connecting to the database server).

What I am finding is I am getting some very inconsistent behavior
with the counts.  Occasionally, it appears that it will freeze up on
a count operation for a few minutes and quite often that specific
data frame will have zero records in it.  According to the DAG
(which I am not 100% sure how to read) the following is the
processing flow:

Exchange/ShuffledRowRDD [74] count at NativeMethodAccessorImpl.java:0
  => WholeStageCodegen/MapPartitionsRDD [75] count at NativeMethodAccessorImpl.java:0
  => InMemoryTableScan/MapPartitionsRDD [78] count at NativeMethodAccessorImpl.java:0
  =>

Re: Questions about count() performance with dataframes and parquet files

2020-02-13 Thread Enrico Minack

Ashley,

I want to suggest a few optimizations. The problem might go away but at 
least performance should improve.
The freeze problems could have many reasons; the Spark UI SQL pages and
stage detail pages would be useful. You can send them privately, if you
wish.


1. the repartition(1) should be replaced by coalesce(1). The former will 
shuffle all data, while the latter will read in the existing partitions 
and not shuffle them again.
2. Repartitioning to a single partition is discouraged, unless you can 
guarantee the data fit into one worker's memory.
3. You can compute Insert and Update in one go, so that you don't have 
to join with df_reference twice.


df_actions = df_source_hashed.alias('a') \
    .join(df_reference.alias('b'), pk_list, how="left") \
    .withColumn('_action',
                when(col('b.hashkey').isNull(), 'Insert')
                .when(col('a.hashkey') != col('b.hashkey'), 'Update')) \
    .select(col('_action'), *df_source_hashed.columns) \
    .dropDuplicates() \
    .cache()


Since df_actions is cached, you can count inserts and updates quickly 
with only that one join in df_actions:


inserts_count = df_actions.where(col('_action') == 'Insert').count()
updates_count = df_actions.where(col('_action') == 'Update').count()

And you can get rid of the union:

df_output = df_actions.where(col('_action').isNotNull())

If you have to write that output to parquet anyway, then you can get the 
count quickly from the parquet file if it is partitioned by the _action 
column (Spark then only looks into parquet's metadata to get the count, 
it does not read any row):


df_output.repartition(1).write.partitionBy('_action') \
    .format('parquet').mode('overwrite').save('/path/to/output.parquet')
df_output = sql_context.read.parquet('/path/to/output.parquet')
inserts_count = df_output.where(col('_action') == 'Insert').count()
updates_count = df_output.where(col('_action') == 'Update').count()


These are all just sketches, but I am sure you get the idea.

Enrico


On 13.02.20 at 05:08, Ashley Hoff wrote:

Hi,

I am currently working on an app using PySpark to produce an insert
and update daily delta capture, output as Parquet.  This is
running on an 8-core 32 GB Linux server in standalone mode (set to 6
worker cores of 2GB memory each) running Spark 2.4.3.

This is being achieved by reading data from a TSQL database into a
dataframe, which has a hash of all records appended to it, and
comparing it to a dataframe from yesterday's data (which has also
been saved as parquet).

As part of the monitoring and logging, I am trying to count the number
of records for the respective actions.  Example code:
df_source = spark_session.read.format('jdbc')...
df_reference = sql_context.read.parquet('/path/to/reference.parquet')
df_source_hashed = df_source.withColumn('hashkey', md5(concat_ws('', *df_source.columns))) \
    .cache()
df_inserts = df_source_hashed.join(df_reference, pk_list, how='left_anti') \
    .select(lit('Insert').alias('_action'), *df_source_hashed.columns) \
    .dropDuplicates() \
    .cache()
inserts_count = df_inserts.count()
df_updates = df_source_hashed.alias('a').join(df_reference.alias('b'), pk_list, how="inner") \
    .select(lit('Update').alias('_action'), *df_source_hashed.columns) \
    .where(col('a.hashkey') != col('b.hashkey')) \
    .dropDuplicates() \
    .cache()
updates_count = df_updates.count()
df_output = df_inserts.union(df_updates)
df_output.repartition(1).write.format('parquet').mode('overwrite').save('/path/to/output.parquet')
The above code is running two occurrences concurrently via Python
threading.Thread (this is to try and overcome the network bottleneck
connecting to the database server).


What I am finding is I am getting some very inconsistent behavior with 
the counts.  Occasionally, it appears that it will freeze up on a 
count operation for a few minutes and quite often that specific data 
frame will have zero records in it.  According to the DAG (which I am 
not 100% sure how to read) the following is the processing flow:


Exchange/ShuffledRowRDD [74] count at NativeMethodAccessorImpl.java:0
  => WholeStageCodegen/MapPartitionsRDD [75] count at NativeMethodAccessorImpl.java:0
  => InMemoryTableScan/MapPartitionsRDD [78] count at NativeMethodAccessorImpl.java:0
  => MapPartitionsRDD [79] count at NativeMethodAccessorImpl.java:0
  => WholeStageCodegen/MapPartitionsRDD [80] count at NativeMethodAccessorImpl.java:0
  => Exchange/MapPartitionsRDD [81] count at NativeMethodAccessorImpl.java:0


The other observation I have made is that if I remove the counts from
the data frame operations and instead open the output parquet file
and count using a
`sql_context.read.load('/path/to/output.parquet').filter(col("_action")
== "Insert").count()` command, I reduce my run-times by around 20
to 30%.  In my feeble mind, opening up the outputs and re-reading them
seems counter-intuitive.


Is anyone able to give me some guidance on why or how to ensure that I 
am 

Re: Reading 7z file in spark

2020-01-14 Thread Enrico Minack

Hi,

Spark does not support 7z natively, but you can read any file in Spark:

def read(stream: PortableDataStream): Iterator[String] =
  Seq(stream.getPath()).iterator

spark.sparkContext
  .binaryFiles("*.7z")
  .flatMap(file => read(file._2))
  .toDF("path")
  .show(false)

This scales with the number of files. A single large 7z file would not 
scale well (a single partition).


Any file that matches *.7z will be loaded via the read(stream: 
PortableDataStream) method, which returns an iterator over the rows. 
This method is executed on the executor and can implement the 7z 
specific code, which is independent of Spark and should not be too hard 
(here it does not open the input stream but returns the path only).
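
A sketch of what such a read method could look like, assuming Apache
Commons Compress is available on the executors' classpath (each entry is
fully read into memory, so this only suits archives whose entries fit into
an executor's memory):

```
import java.io.File
import java.nio.file.{Files, StandardCopyOption}
import org.apache.commons.compress.archivers.sevenz.SevenZFile
import org.apache.spark.input.PortableDataStream

def read7z(stream: PortableDataStream): Iterator[String] = {
  // 7z needs random access, so copy the stream to a local temp file first
  val tmp = File.createTempFile("spark-7z-", ".7z")
  tmp.deleteOnExit()
  Files.copy(stream.open(), tmp.toPath, StandardCopyOption.REPLACE_EXISTING)

  val archive = new SevenZFile(tmp)
  val lines = scala.collection.mutable.ArrayBuffer.empty[String]
  var entry = archive.getNextEntry
  while (entry != null) {
    if (!entry.isDirectory) {
      // read the whole current entry into memory and split it into lines
      val content = new Array[Byte](entry.getSize.toInt)
      var offset = 0
      while (offset < content.length) {
        offset += archive.read(content, offset, content.length - offset)
      }
      lines ++= new String(content, "UTF-8").split("\n")
    }
    entry = archive.getNextEntry
  }
  archive.close()
  lines.iterator
}
```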


If you are planning to read the same files more than once, then it would
be worthwhile to first uncompress them and convert them into a file format
Spark supports. Then Spark can scale much better.


Regards,
Enrico


On 13.01.20 at 13:31, HARSH TAKKAR wrote:

Hi,


Is it possible to read 7z compressed file in spark?


Kind Regards
Harsh Takkar





Re: [pyspark2.4+] When to choose RDD over Dataset, was: A lot of tasks failed, but job eventually completes

2020-01-06 Thread Enrico Minack

Hi Rishi,

generally it is better to avoid RDDs if you can and use the Dataset API. 
With Datasets (formerly DataFrames) Spark can optimize your query / tree 
of transformations; RDDs are opaque. Datasets have an optimized memory 
footprint. Pure Dataset operations provide you helpful information on 
the SQL tab in the Spark UI. For large transformations it is then easier 
to identify the transformations that cause you trouble. Switching from 
Dataset to RDD at some point hides all operations that happen before 
accessing the RDD so you lose the query debugging capability for that part.
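
A small sketch of that boundary (in Scala for brevity, although this
thread is about PySpark): everything before .rdd shows up as an optimized
plan on the SQL tab, everything after it is opaque to Catalyst.

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("dataset-vs-rdd").getOrCreate()
import spark.implicits._

val optimized = spark.range(1000000)
  .filter($"id" % 2 === 0)           // visible to Catalyst, shown on the SQL tab
  .select(($"id" * 3).as("x"))

val opaque = optimized.rdd           // from here on there is no query optimization
  .map(row => row.getLong(0) + 1)
```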


That is my experience.

Enrico


On 06.01.20 at 14:35, Rishi Shah wrote:

Thank you Hemant and Enrico. Much appreciated.

Your input really got me closer to the issue: I realized every task 
didn't get enough memory and hence tasks with large partitions kept 
failing. I increased executor memory and at the same time increased 
the number of partitions as well. This made the job succeed with flying 
colors. Really appreciate the help here.


I do have one more question: when do you recommend using RDDs over 
data frames? At times using windows may get a bit complicated, but 
there is always some way or another to use windows on data frames. 
I always get confused as to when to fall back on the RDD approach. 
Are there any use cases in your experience that warrant RDD use, for 
better performance?


Thanks,
Rishi

On Mon, Jan 6, 2020 at 4:18 AM Enrico Minack <mailto:m...@enrico.minack.dev>> wrote:


Note that repartitioning helps to increase the number of
partitions (and hence to reduce the size of partitions and
required executor memory), but subsequent transformations like
join will repartition data again with the configured number of
partitions (spark.sql.shuffle.partitions), virtually undoing the
repartitioning, e.g.:

data                  // may have any number of partitions
  .repartition(1000)  // has 1000 partitions
  .join(table)        // has spark.sql.shuffle.partitions partitions

If you use RDDs, you need to configure spark.default.parallelism
rather than spark.sql.shuffle.partitions.

Given you have 700GB of data, the default of 200 partitions means
that each partition is 3.5 GB (equivalent of input data) in size.
Since increasing executor memory is limited by the available
memory, executor memory does not scale for big data. Increasing
the number of partitions is the natural way of scaling in Spark land.

Having hundreds of tasks that fail is an indication that you do
not suffer from skewed data but from large partitions. Skewed data
usually has a few tasks that keep failing.

It is easy to check for skewed data in the Spark UI. Open a stage
that has failing tasks and look at the Summary Metrics, e.g.:
If the Max number of Shuffle Read Size is way higher than the 75th
percentile, then this indicates a poor distribution of the data
(or more precisely, of the partitioning key) of this stage.

You can also sort the tasks by the "Shuffle Read Size / Records"
column and see if numbers are evenly distributed (ideally).

I hope this helped.

Enrico



On 06.01.20 at 06:27, hemant singh wrote:

You can try repartitioning the data; if the data is skewed then
you may need to salt the keys for better partitioning.
Are you using a coalesce or any other function which brings the data
to fewer nodes? Window functions also incur shuffling, which could be
an issue.

On Mon, 6 Jan 2020 at 9:49 AM, Rishi Shah
mailto:rishishah.s...@gmail.com>> wrote:

Thanks Hemant, underlying data volume increased from 550GB to
690GB and now the same job doesn't succeed. I tried
incrementing executor memory to 20G as well, still fails. I
am running this in Databricks and start cluster with 20G
assigned to spark.executor.memory property.

Also some more information on the job, I have about 4 window
functions on this dataset before it gets written out.

Any other ideas?

Thanks,
-Shraddha

On Sun, Jan 5, 2020 at 11:06 PM hemant singh
mailto:hemant2...@gmail.com>> wrote:

You can try increasing the executor memory, generally
this error comes when there is not enough memory in
individual executors.
Job is getting completed may be because when tasks are
re-scheduled it would be going through.

Thanks.

On Mon, 6 Jan 2020 at 5:47 AM, Rishi Shah
mailto:rishishah.s...@gmail.com>> wrote:

Hello All,

One of my jobs, keep getting into this situation
where 100s of tasks keep failing with below error but
job eventually completes.

org.apache.spark.memory.SparkOutOfMemoryError: Unable
   

Re: OrderBy Year and Month is not displaying correctly

2020-01-06 Thread Enrico Minack
The distinct transformation does not preserve order; you need to apply 
distinct first, then orderBy.
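
Concretely, a sketch of the reordered query, assuming the same
joint_accounts and wSpec as in the code quoted below (ordering by the
aliased columns after the distinct):

```
import org.apache.spark.sql.functions._

joint_accounts
  .select(year(col("transactiondate")).as("Year"),
          month(col("transactiondate")).as("Month"),
          sum("moneyin").over(wSpec).cast("DECIMAL(10,2)").as("incoming Per Month"),
          sum("moneyout").over(wSpec).cast("DECIMAL(10,2)").as("outgoing Per Month"))
  .distinct
  .orderBy(col("Year"), col("Month"))
  .show(1000, false)
```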


Enrico


On 06.01.20 at 00:39, Mich Talebzadeh wrote:

Hi,

I am working out monthly outgoing etc from an account and I am using 
the following code


import org.apache.spark.sql.expressions.Window
val wSpec = 
Window.partitionBy(year(col("transactiondate")),month(col("transactiondate")))

joint_accounts.
  select(year(col("transactiondate")).as("Year")
    , month(col("transactiondate")).as("Month")
    , sum("moneyin").over(wSpec).cast("DECIMAL(10,2)").as("incoming 
Per Month")
    , sum("moneyout").over(wSpec).cast("DECIMAL(10,2)").as("outgoing 
Per Month")).

    orderBy(year(col("transactiondate")), month(col("transactiondate"))).
    distinct.
    show(1000,false)

This shows as follows:


+----+-----+------------------+------------------+
|Year|Month|incoming Per Month|outgoing Per Month|
+----+-----+------------------+------------------+
|2019|9    |13958.58          |17920.31          |
|2019|11   |4032.30           |4225.30           |
|2020|1    |1530.00           |1426.91           |
|2019|10   |10029.00          |10067.52          |
|2019|12   |742.00            |814.49            |
+----+-----+------------------+------------------+

However, the orderBy is not correct, as I expect to see the 2019 and
2020 records in the order of year and month.


Any suggestions?

Thanks,

Dr Mich Talebzadeh

LinkedIn 
/https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw/


http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk.Any and all responsibility for 
any loss, damage or destruction of data or any other property which 
may arise from relying on this email's technical content is explicitly 
disclaimed. The author will in no case be liable for any monetary 
damages arising from such loss, damage or destruction.






Re: [pyspark2.4+] A lot of tasks failed, but job eventually completes

2020-01-06 Thread Enrico Minack
Note that repartitioning helps to increase the number of partitions (and 
hence to reduce the size of partitions and required executor memory), 
but subsequent transformations like join will repartition data again 
with the configured number of partitions 
(spark.sql.shuffle.partitions), virtually undoing the repartitioning, 
e.g.:


data                  // may have any number of partitions
  .repartition(1000)  // has 1000 partitions
  .join(table)        // has spark.sql.shuffle.partitions partitions

If you use RDDs, you need to configure spark.default.parallelism
rather than spark.sql.shuffle.partitions.
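
A sketch of where these settings could be applied (the values are
illustrative, and in PySpark the builder calls are analogous):

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .config("spark.sql.shuffle.partitions", "2000")  // partitions produced by Dataset/DataFrame shuffles
  .config("spark.default.parallelism", "2000")     // partitions produced by RDD shuffles
  .getOrCreate()
```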


Given you have 700GB of data, the default of 200 partitions means that
each partition is 3.5 GB (equivalent of input data) in size. Since
increasing executor memory is limited by the available memory, executor 
memory does not scale for big data. Increasing the number of partitions 
is the natural way of scaling in Spark land.


Having hundreds of tasks that fail is an indication that you do not 
suffer from skewed data but from large partitions. Skewed data usually 
has a few tasks that keep failing.


It is easy to check for skewed data in the Spark UI. Open a stage that 
has failing tasks and look at the Summary Metrics, e.g.:
If the Max number of Shuffle Read Size is way higher than the 75th
percentile, then this indicates a poor distribution of the data (or more
precisely, of the partitioning key) of this stage.


You can also sort the tasks by the "Shuffle Read Size / Records" column 
and see if numbers are evenly distributed (ideally).


I hope this helped.

Enrico



On 06.01.20 at 06:27, hemant singh wrote:
You can try repartitioning the data; if the data is skewed then you 
may need to salt the keys for better partitioning.
Are you using a coalesce or any other function which brings the data to 
fewer nodes? Window functions also incur shuffling, which could be an 
issue.


On Mon, 6 Jan 2020 at 9:49 AM, Rishi Shah > wrote:


Thanks Hemant, underlying data volume increased from 550GB to
690GB and now the same job doesn't succeed. I tried incrementing
executor memory to 20G as well, still fails. I am running this in
Databricks and start cluster with 20G assigned to
spark.executor.memory property.

Also some more information on the job, I have about 4 window
functions on this dataset before it gets written out.

Any other ideas?

Thanks,
-Shraddha

On Sun, Jan 5, 2020 at 11:06 PM hemant singh mailto:hemant2...@gmail.com>> wrote:

You can try increasing the executor memory, generally this
error comes when there is not enough memory in individual
executors.
Job is getting completed may be because when tasks are
re-scheduled it would be going through.

Thanks.

On Mon, 6 Jan 2020 at 5:47 AM, Rishi Shah
mailto:rishishah.s...@gmail.com>>
wrote:

Hello All,

One of my jobs, keep getting into this situation where
100s of tasks keep failing with below error but job
eventually completes.

org.apache.spark.memory.SparkOutOfMemoryError: Unable to
acquire 16384 bytes of memory

Could someone advice?

-- 
Regards,


Rishi Shah



-- 
Regards,


Rishi Shah





Re: Identify bottleneck

2019-12-19 Thread Enrico Minack
The issue is explained in depth here: 
https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015


On 19.12.19 at 23:33, Chris Teoh wrote:
As far as I'm aware it isn't any better. The logic all gets processed 
by the same engine so to confirm, compare the DAGs generated from both 
approaches and see if they're identical.


On Fri, 20 Dec 2019, 8:56 am ayan guha, <mailto:guha.a...@gmail.com>> wrote:


Quick question: Why is it better to use one sql vs multiple
withColumn? Isn't everything eventually rewritten by Catalyst?

On Wed, 18 Dec 2019 at 9:14 pm, Enrico Minack
mailto:m...@enrico.minack.dev>> wrote:

How many withColumn statements do you have? Note that it is
better to use a single select, rather than lots of withColumn.
This also makes drops redundant.

Reading 25m CSV lines and writing to Parquet in 5 minutes on
32 cores is really slow. Can you try this on a single machine,
i.e. run with "local[*]".

Can you rule out the writing part by counting the rows? I
presume this all happens in a single stage.

Enrico


On 18.12.19 at 10:56, Antoine DUBOIS wrote:

Hello

I'm working on an ETL based on CSV files describing file systems, to
transform them into Parquet so I can work on them easily to extract
information.
I'm using Mr. Powers' framework Daria to do so. I have quite different
inputs and a lot of transformations, and the framework helps organize
the code.
I have a stand-alone cluster v2.3.2 composed of 4 nodes with 8 cores
and 32GB of memory each.
The storage is handled by a CephFS volume mounted on all nodes.
First, a small description of my algorithm (it's quite simple):

Use SparkContext to load the csv.bz2 file,
Chain a lot of withColumn() statements,
Drop all unnecessary columns,
Write the Parquet file to CephFS

This treatment can take several hours depending on how many lines the
CSV has, and I wanted to identify whether bz2 or the network could be
an issue,
so I ran the following tests (several times, with consistent results).
I tried the following scenarios with 20 cores and 2 cores per task:

  * Read the csv.bz2 from CephFS with a 1Gb/s connection for each
    node: ~5 minutes.
  * Read the csv.bz2 from TMPFS (set up to look like a shared storage
    space): ~5 minutes.
  * From the 2 previous tests I concluded that uncompressing the file
    was part of the bottleneck, so I decided to uncompress the file
    and store it in TMPFS as well; result: ~5.9 minutes.

The test file has 25'833'369 lines and is 370MB compressed and 3700MB
uncompressed. These results have been reproduced several times each.
My question here is: what is the bottleneck in this case?

I thought that the uncompressed file in RAM would be the fastest. Is
it possible that my program reads the CSV suboptimally?
In the execution logs on the cluster I have 5 to 10 seconds GC time
max, and the timeline shows mainly CPU time (no shuffling, no
randomization overload either).
I also noticed that memory storage is never used during the execution.
I know from several hours of research that bz2 is the only real
compression algorithm usable as an input in Spark for parallelization
reasons.

Do you have any idea why it behaves this way,
and do you have any idea how to improve this processing?

Cheers

Antoine



-- 
Best Regards,

Ayan Guha





Re: Identify bottleneck

2019-12-18 Thread Enrico Minack
Good points, but single-line CSV files are splittable (not multi-line CSV 
though), especially at the mentioned size. And bz2 is also splittable, 
though reading speed is much slower than uncompressed csv.


If your csv.bz2 files are not splittable then repartitioning does not 
improve the situation much because reading happens through one worker 
first before repartitioning happens.


Besides checking the Spark UI SQL tab you can check that your stage has 
multiple tasks, ideally 200, at least 32 to fully employ your cluster.



On 18.12.19 at 13:33, Chris Teoh wrote:
Please look at the spark UI and confirm you are indeed getting more 
than 1 partition in your dataframe. Text files are usually not 
splittable so you may just be doing all the work in a single partition.


If that is the case, it may be worthwhile to consider calling the 
repartition method to distribute your data across multiple partitions 
so you get more parallelism.
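
For example, a sketch (the path and partition count are illustrative;
this assumes the input is read through Spark's CSV source):

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

val df = spark.read
  .option("header", "true")
  .csv("/data/filesystem-dump.csv.bz2")  // illustrative path
  .repartition(200)                      // redistribute before the heavy column work
```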


On Wed, 18 Dec 2019, 9:35 pm Antoine DUBOIS, 
mailto:antoine.dub...@cc.in2p3.fr>> wrote:


There are 15 withColumn statements and one drop at the end to remove
the old columns.
I wish I could write it as a single SQL statement, but that is not
reasonable for maintenance purposes.
I will try on a local instance and let you know.

Thanks  for the help.



*De: *"Enrico Minack" mailto:m...@enrico.minack.dev>>
*À: *user@spark.apache.org <mailto:user@spark.apache.org>,
"Antoine DUBOIS" mailto:antoine.dub...@cc.in2p3.fr>>
*Envoyé: *Mercredi 18 Décembre 2019 11:13:38
*Objet: *Re: Identify bottleneck

How many withColumn statements do you have? Note that it is better
to use a single select, rather than lots of withColumn. This also
makes drops redundant.

Reading 25m CSV lines and writing to Parquet in 5 minutes on 32
cores is really slow. Can you try this on a single machine, i.e.
run with "local[*]".

Can you rule out the writing part by counting the rows? I presume
this all happens in a single stage.

Enrico


On 18.12.19 at 10:56, Antoine DUBOIS wrote:

Hello

I'm working on an ETL based on CSV files describing file systems, to
transform them into Parquet so I can work on them easily to extract
information.
I'm using Mr. Powers' framework Daria to do so. I have quite different
inputs and a lot of transformations, and the framework helps organize
the code.
I have a stand-alone cluster v2.3.2 composed of 4 nodes with 8 cores
and 32GB of memory each.
The storage is handled by a CephFS volume mounted on all nodes.
First, a small description of my algorithm (it's quite simple):

Use SparkContext to load the csv.bz2 file,
Chain a lot of withColumn() statements,
Drop all unnecessary columns,
Write the Parquet file to CephFS

This treatment can take several hours depending on how many lines the
CSV has, and I wanted to identify whether bz2 or the network could be
an issue,
so I ran the following tests (several times, with consistent results).
I tried the following scenarios with 20 cores and 2 cores per task:

  * Read the csv.bz2 from CephFS with a 1Gb/s connection for each
    node: ~5 minutes.
  * Read the csv.bz2 from TMPFS (set up to look like a shared storage
    space): ~5 minutes.
  * From the 2 previous tests I concluded that uncompressing the file
    was part of the bottleneck, so I decided to uncompress the file
    and store it in TMPFS as well; result: ~5.9 minutes.

The test file has 25'833'369 lines and is 370MB compressed and 3700MB
uncompressed. These results have been reproduced several times each.
My question here is: what is the bottleneck in this case?

I thought that the uncompressed file in RAM would be the fastest. Is
it possible that my program reads the CSV suboptimally?
In the execution logs on the cluster I have 5 to 10 seconds GC time
max, and the timeline shows mainly CPU time (no shuffling, no
randomization overload either).
I also noticed that memory storage is never used during the execution.
I know from several hours of research that bz2 is the only real
compression algorithm usable as an input in Spark for parallelization
reasons.

Do you have any idea why it behaves this way,
and do you have any idea how to improve this processing?

Cheers

Antoine







Re: Identify bottleneck

2019-12-18 Thread Enrico Minack
How many withColumn statements do you have? Note that it is better to 
use a single select, rather than lots of withColumn. This also makes 
drops redundant.
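
For illustration, a sketch with hypothetical columns showing the same
transformation written once with chained withColumn calls and once as a
single select:

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

val df = Seq(("x", "y", "z")).toDF("a", "b", "c")

// many withColumn calls, each adding another projection, plus explicit drops
val stepwise = df
  .withColumn("a2", concat(col("a"), lit("!")))
  .withColumn("b2", upper(col("b")))
  .drop("a", "b")

// the same result as one select: listed columns are kept, everything else is gone
val single = df.select(
  concat(col("a"), lit("!")).as("a2"),
  upper(col("b")).as("b2"),
  col("c"))
```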


Reading 25m CSV lines and writing to Parquet in 5 minutes on 32 cores is 
really slow. Can you try this on a single machine, i.e. run with "local[*]".


Can you rule out the writing part by counting the rows? I presume this 
all happens in a single stage.


Enrico


On 18.12.19 at 10:56, Antoine DUBOIS wrote:

Hello

I'm working on an ETL based on CSV files describing file systems, to
transform them into Parquet so I can work on them easily to extract
information.
I'm using Mr. Powers' framework Daria to do so. I have quite different
inputs and a lot of transformations, and the framework helps organize
the code.
I have a stand-alone cluster v2.3.2 composed of 4 nodes with 8 cores
and 32GB of memory each.

The storage is handled by a CephFS volume mounted on all nodes.
First, a small description of my algorithm (it's quite simple):

Use SparkContext to load the csv.bz2 file,
Chain a lot of withColumn() statements,
Drop all unnecessary columns,
Write the Parquet file to CephFS

This treatment can take several hours depending on how many lines the
CSV has, and I wanted to identify whether bz2 or the network could be
an issue,
so I ran the following tests (several times, with consistent results).
I tried the following scenarios with 20 cores and 2 cores per task:

  * Read the csv.bz2 from CephFS with a 1Gb/s connection for each
    node: ~5 minutes.
  * Read the csv.bz2 from TMPFS (set up to look like a shared storage
    space): ~5 minutes.
  * From the 2 previous tests I concluded that uncompressing the file
    was part of the bottleneck, so I decided to uncompress the file
    and store it in TMPFS as well; result: ~5.9 minutes.

The test file has 25'833'369 lines and is 370MB compressed and 3700MB
uncompressed. These results have been reproduced several times each.

My question here is: what is the bottleneck in this case?

I thought that the uncompressed file in RAM would be the fastest. Is
it possible that my program reads the CSV suboptimally?
In the execution logs on the cluster I have 5 to 10 seconds GC time
max, and the timeline shows mainly CPU time (no shuffling, no
randomization overload either).
I also noticed that memory storage is never used during the execution.
I know from several hours of research that bz2 is the only real
compression algorithm usable as an input in Spark for parallelization
reasons.

Do you have any idea why it behaves this way,
and do you have any idea how to improve this processing?

Cheers

Antoine





Re: Issue With mod function in Spark SQL

2019-12-17 Thread Enrico Minack

I think some example code would help to understand what you are doing.

On 18.12.19 at 08:12, Tzahi File wrote:

No, there are 100M records, both even and odd.

On Tue, Dec 17, 2019 at 8:13 PM Russell Spitzer 
mailto:russell.spit...@gmail.com>> wrote:


Is there a chance your data is all even or all odd?

On Tue, Dec 17, 2019 at 11:01 AM Tzahi File
mailto:tzahi.f...@ironsrc.com>> wrote:

I have in my Spark SQL query a calculated field that gets the
value of field1 % 3.

I'm using this field as a partition, so I expected to get 3
partitions in the mentioned case, and I do. The issue
happened with even numbers (4, 2, ... instead of 3).
When I tried to use an even number, for example 4, I got only 2
partitions - 1 and 3.
field1's datatype is bigint.

Do you have any suggestions?


-- 
thanks,

Tzahi



--
Tzahi File
Data Engineer
ironSource 
email tzahi.f...@ironsrc.com 
mobile +972-546864835 
fax +972-77-5448273
ironSource HQ - 121 Derech Menachem Begin st. Tel Aviv
ironsrc.com 

This email (including any attachments) is for the sole use of the 
intended recipient and may contain confidential information which may 
be protected by legal privilege. If you are not the intended 
recipient, or the employee or agent responsible for delivering it to 
the intended recipient, you are hereby notified that any use, 
dissemination, distribution or copying of this communication and/or 
its content is strictly prohibited. If you are not the intended 
recipient, please immediately notify us by reply email or by 
telephone, delete this email and destroy any copies. Thank you.