[Structured streaming, V2] commit on ContinuousReader

2018-05-03 Thread Jiří Syrový
Version: 2.3, DataSourceV2, ContinuousReader

Hi,

We're creating a new data source that fetches data from a streaming source
which requires committing the received data. We would like to commit the data
every once in a while, after it has been retrieved and correctly processed,
and then fetch more.

One option could be to rely on Spark committing the already-read data using
*commit(end: Offset)*, which is present in *ContinuousReader
(v2.reader.streaming)*, but it seems that this method is never called.

The question is whether this method, *commit(end: Offset)*, is ever used, and
when. I went through part of the Spark code base, but haven't really found any
place where it could be called.
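
To make the intent concrete, this is roughly the hook we were hoping to rely
on (a sketch only: AckClient, ack(), and using the offset's JSON form as the
acknowledgement token are our own placeholders, not Spark APIs; the rest of
the reader is omitted):

import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, Offset}

// Placeholder for our upstream client; not a Spark API.
trait AckClient { def ack(offsetJson: String): Unit }

// Sketch only: a concrete reader would also implement readSchema(),
// offset handling, the reader factories and stop(); those are left abstract.
abstract class AckingContinuousReader(client: AckClient) extends ContinuousReader {
  override def commit(end: Offset): Unit = {
    // We expected Spark to call this once everything up to `end` has been
    // processed, so the upstream system can release that data.
    client.ack(end.json())
  }
}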

Thanks,
Jiri


Re: java.lang.UnsupportedOperationException: CSV data source does not support struct/ERROR RetryingBlockFetcher

2018-03-28 Thread Jiří Syrový
Quick comment:

Excel CSV (a very special case, though) supports arrays in CSV using "\n"
inside quotes, but you then have to use "\r\n" (Windows EOL) as the EOL for the row.
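
More generally, for the struct columns from the original question below, one
workaround is to serialize the complex columns to JSON strings before writing
CSV. A rough Scala sketch (untested here; it assumes a Spark 2.x to_json and
that the parquet output was written to "output"; the PySpark equivalent is
analogous):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.to_json
import org.apache.spark.sql.types.StructType

object CsvFlatten {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("csv-flatten").getOrCreate()

    // Read back the parquet output mentioned below.
    val df = spark.read.parquet("output")

    // Replace every struct column with its JSON string representation so the
    // CSV writer no longer sees an unsupported complex type. Array columns
    // would need similar treatment (e.g. to_json on arrays of structs).
    val writable = df.schema.fields.foldLeft(df) { (acc, field) =>
      field.dataType match {
        case _: StructType => acc.withColumn(field.name, to_json(acc(field.name)))
        case _             => acc
      }
    }

    writable.coalesce(1).write.option("header", "true").mode("overwrite").csv("output.csv")
    spark.stop()
  }
}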

Cheers,
Jiri

2018-03-28 14:14 GMT+02:00 Yong Zhang :

> Your dataframe has an array data type, which is NOT supported by CSV. How
> could a csv file include an array or another nested structure?
>
>
> If you want your data to be human readable text, write out as json in your
> case then.
>
>
> Yong
>
>
> --
> *From:* Mina Aslani 
> *Sent:* Wednesday, March 28, 2018 12:22 AM
> *To:* naresh Goud
> *Cc:* user @spark
> *Subject:* Re: java.lang.UnsupportedOperationException: CSV data source
> does not support struct/ERROR RetryingBlockFetcher
>
> Hi Naresh,
>
> Thank you for the quick response, appreciate it.
> Removing the option("header","true") and trying
>
> df = spark.read.parquet("test.parquet") now works; I can read the parquet.
> However, I would like to find a way to have the data in csv/readable form.
> I still cannot save df as csv, as it throws
> java.lang.UnsupportedOperationException: CSV data source does not support
> struct
> data type.
>
> Any idea?
>
>
> Best regards,
>
> Mina
>
>
> On Tue, Mar 27, 2018 at 10:51 PM, naresh Goud 
> wrote:
>
> In the case of storing as a parquet file, I don't think it requires the
> header option("header","true").
>
> Give it a try by removing the header option and then try to read it. I haven't
> tried it; just a thought.
>
> Thank you,
> Naresh
>
>
> On Tue, Mar 27, 2018 at 9:47 PM Mina Aslani  wrote:
>
> Hi,
>
>
> I am using pyspark. To transform my sample data and create a model, I use
> StringIndexer and OneHotEncoder.
>
>
> However, when I try to write the data as csv using the command below
>
> df.coalesce(1).write.option("header","true").mode("overwrite").csv("output.csv")
>
>
> I get UnsupportedOperationException
>
> java.lang.UnsupportedOperationException: CSV data source does not support
> struct
> data type.
>
> Therefore, to save data and avoid getting the error I use
>
>
> df.coalesce(1).write.option("header","true").mode("overwrite").save("output")
>
>
> The above command saves data but it's in parquet format.
> How can I read parquet file and convert to csv to observe the data?
>
> When I use
>
> df = spark.read.parquet("1.parquet"), it throws:
>
> ERROR RetryingBlockFetcher: Exception while beginning fetch of 1
> outstanding blocks
>
> Your input is appreciated.
>
>
> Best regards,
>
> Mina
>
>
>
> --
> Thanks,
> Naresh
> www.linkedin.com/in/naresh-dulam
> http://hadoopandspark.blogspot.com/
>
>
>


org.apache.spark.ui.jobs.UIData$TaskMetricsUIData

2017-03-17 Thread Jiří Syrový
Hi,

is there a good way to get rid of UIData completely? I have switched off the
UI and decreased the retainedXXX settings to a minimum, but there still seem
to be a lot of instances of this class ($SUBJ) held in memory. Any ideas?

Thanks,
J. S.

spark {
  master = "local[2]"
  master = ${?SPARK_MASTER}
  info = ""
  info = ${?SPARK_INFO_URI}
  jobs = ""
  jobs = ${?SPARK_JOBS}
  jars.packages = "org.elasticsearch:elasticsearch-spark-20_2.11:5.0.1"
  submit.deployMode = "cluster"
  sql.crossJoin.enabled = true
  executor.memory = "4g"
  executor.memory = ${?SPARK_EXECUTOR_MEMORY}
  executor.cores = 2
  shuffle.service.enabled = true
  dynamicAllocation.enabled = true
  ui.enabled = false
  ui.retainedJobs = 100
  ui.retainedStages = 100
  ui.retainedTasks = 3000
  sql.retainedExecutions = 100
}


 num     #instances      #bytes  class name
------------------------------------------------------------------------
   1:          4011  1563354608  [J
   2:        133214   185564000  [B
   3:        449373   140445216  [C
   4:       5481059   131545416  scala.Tuple2
   5:       5429700   130312800  java.lang.Long
   6:        238037    36071536  [Ljava.lang.Object;
   7:        148048    16581376  org.apache.spark.ui.jobs.UIData$TaskMetricsUIData
   8:        545859    13100616  scala.collection.immutable.$colon$colon
   9:        148048    11843840  org.apache.spark.ui.jobs.UIData$ShuffleReadMetricsUIData
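
If it helps, the UI-related settings above expressed directly as Spark
properties would be roughly the following (a sketch, assuming the HOCON keys
are forwarded as spark.* properties; the app name here is made up):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: the UI-related keys above as plain spark.* properties,
// set when building the context yourself (master mirrors the config default).
val conf = new SparkConf()
  .setAppName("ui-data-test")
  .setMaster("local[2]")
  .set("spark.ui.enabled", "false")
  .set("spark.ui.retainedJobs", "100")
  .set("spark.ui.retainedStages", "100")
  .set("spark.ui.retainedTasks", "3000")
val sc = new SparkContext(conf)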


Re: Dependency Injection and Microservice development with Spark

2017-01-04 Thread Jiří Syrový
Hi,

another nice approach is to instead use the Reader monad, together with some
framework that supports this approach (e.g. Grafter -
https://github.com/zalando/grafter). It's lightweight and helps a bit with
dependency issues.
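
To illustrate the idea independently of Grafter, here is a minimal,
self-contained sketch (Env, the job functions and the input path are made up
for the example):

import org.apache.spark.sql.{DataFrame, SparkSession}

// The environment that would otherwise be "injected".
final case class Env(spark: SparkSession, inputPath: String)

// A tiny Reader: just a function Env => A with map/flatMap for composition.
final case class Reader[E, A](run: E => A) {
  def map[B](f: A => B): Reader[E, B] = Reader(e => f(run(e)))
  def flatMap[B](f: A => Reader[E, B]): Reader[E, B] = Reader(e => f(run(e)).run(e))
}

object ReaderDemo {
  def loadInput: Reader[Env, DataFrame] =
    Reader(env => env.spark.read.parquet(env.inputPath))

  def countRows: Reader[Env, Long] =
    loadInput.map(_.count())

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("reader-demo").getOrCreate()
    // Dependencies are supplied once, at the very edge of the program.
    println(countRows.run(Env(spark, args.headOption.getOrElse("input.parquet"))))
    spark.stop()
  }
}

Grafter builds on the same idea but derives the wiring for you, so you don't
have to hand-roll the Reader.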

2016-12-28 22:55 GMT+01:00 Lars Albertsson :

> Do you really need dependency injection?
>
> DI is often used for testing purposes. Data processing jobs are easy
> to test without DI, however, due to their functional and synchronous
> nature. Hence, DI is often unnecessary for testing data processing
> jobs, whether they are batch or streaming jobs.
>
> Or do you want to use DI for other reasons?
>
>
> Lars Albertsson
> Data engineering consultant
> www.mapflat.com
> https://twitter.com/lalleal
> +46 70 7687109
> Calendar: https://goo.gl/6FBtlS, https://freebusy.io/la...@mapflat.com
>
>
> On Fri, Dec 23, 2016 at 11:56 AM, Chetan Khatri
>  wrote:
> > Hello Community,
> >
> > The current approach I am using for Spark job development is Scala + SBT
> > and an uber jar, with a yml properties file to pass configuration
> > parameters. But if I would like to use dependency injection and
> > microservice development, like the Spring Boot feature, in Scala, then
> > what would be the standard approach?
> >
> > Thanks
> >
> > Chetan
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Can't zip RDDs with unequal numbers of partitions

2016-03-19 Thread Jiří Syrový
Unfortunately I can't share a snippet quickly, as the code is generated, but
for now I can at least share the plan (see it here -
http://pastebin.dqd.cz/RAhm/).

After I increased spark.sql.autoBroadcastJoinThreshold to 30 from
10, it went through without any problems. With 10 it was always
failing during the "planning" phase with the exception above.

2016-03-17 22:05 GMT+01:00 Jakob Odersky <ja...@odersky.com>:

> Can you share a snippet that reproduces the error? What was
> spark.sql.autoBroadcastJoinThreshold before your last change?
>
> On Thu, Mar 17, 2016 at 10:03 AM, Jiří Syrový <syrovy.j...@gmail.com>
> wrote:
> > Hi,
> >
> > any idea what could be causing this issue? It started appearing after
> > changing parameter
> >
> > spark.sql.autoBroadcastJoinThreshold to 10
> >
> >
> > Caused by: java.lang.IllegalArgumentException: Can't zip RDDs with
> unequal
> > numbers of partitions
> > at
> >
> org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57)
> > at
> > org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> > at
> > org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> > at scala.Option.getOrElse(Option.scala:120)
> > at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> > at
> > org.apache.spark.rdd.PartitionCoalescer.<init>(CoalescedRDD.scala:172)
> > at
> > org.apache.spark.rdd.CoalescedRDD.getPartitions(CoalescedRDD.scala:85)
> > at
> > org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> > at
> > org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> > at scala.Option.getOrElse(Option.scala:120)
> > at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> > at
> >
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> > at
> > org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> > at
> > org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> > at scala.Option.getOrElse(Option.scala:120)
> > at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> > at
> >
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> > at
> > org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> > at
> > org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> > at scala.Option.getOrElse(Option.scala:120)
> > at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> > at
> >
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> > at
> > org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> > at
> > org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> > at scala.Option.getOrElse(Option.scala:120)
> > at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> > at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
> > at
> >
> org.apache.spark.sql.execution.Exchange.prepareShuffleDependency(Exchange.scala:220)
> > at
> >
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:254)
> > at
> >
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:248)
> > at
> >
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
> > ... 28 more
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Can't zip RDDs with unequal numbers of partitions

2016-03-18 Thread Jiří Syrový
Hi,

any idea what could be causing this issue? It started appearing after changing
the parameter

*spark.sql.autoBroadcastJoinThreshold to 10*

Caused by: java.lang.IllegalArgumentException: Can't zip RDDs with unequal
numbers of partitions
at
org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.rdd.PartitionCoalescer.<init>(CoalescedRDD.scala:172)
at
org.apache.spark.rdd.CoalescedRDD.getPartitions(CoalescedRDD.scala:85)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
at
org.apache.spark.sql.execution.Exchange.prepareShuffleDependency(Exchange.scala:220)
at
org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:254)
at
org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:248)
at
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
... 28 more


Re: jobs much slower in cluster mode vs local

2016-01-15 Thread Jiří Syrový
Hi,

you can try to use Spark Job Server and submit your jobs to it. The thing is
that the most expensive part is the context creation.
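
As a rough illustration of that cost, a toy timing like the one below
(local[*]; purely illustrative, not a benchmark) usually shows context
creation dwarfing a small job:

import org.apache.spark.{SparkConf, SparkContext}

object ContextCostDemo {
  def main(args: Array[String]): Unit = {
    val t0 = System.nanoTime()
    // Creating the context (driver setup, executor launch, etc.) is the slow part.
    val sc = new SparkContext(new SparkConf().setAppName("context-cost").setMaster("local[*]"))
    val t1 = System.nanoTime()
    // A small job for comparison.
    val sum = sc.parallelize(1 to 1000000).map(_.toLong * 2).sum()
    val t2 = System.nanoTime()
    println(f"context creation: ${(t1 - t0) / 1e9}%.1fs, job: ${(t2 - t1) / 1e9}%.1fs (sum=$sum)")
    sc.stop()
  }
}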

J.

2016-01-15 15:28 GMT+01:00 :

> Hello,
>
> In general, I am usually able to run spark-submit jobs in local mode, on a
> 32-core node with plenty of RAM. The performance is significantly
> faster in local mode than when using a cluster of spark workers.
>
> How can this be explained and what measures can one take in order to
> improve such performance?
> Usually a job that takes 35 seconds in local mode takes around 48 seconds
> in a small cluster.
>
> Thanks,
> Saif
>
>


Re: FileNotFoundException in appcache shuffle files

2015-12-10 Thread Jiří Syrový
Usually there is another error or log message before FileNotFoundException.
Try to check your logs for something like that.

2015-12-10 10:47 GMT+01:00 kendal :

> I have similar issues... the exception occurs only with very large data.
> And I tried to double the memory or the partitions as suggested by some
> google searches, but in vain.
> Any idea?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/FileNotFoundException-in-appcache-shuffle-files-tp17605p25663.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark SQL - saving to multiple partitions in parallel - FileNotFoundException on _temporary directory possible bug?

2015-12-08 Thread Jiří Syrový
Hi,

I have a very similar issue on a standalone SQL context, but when using
save() instead. I guess it might be related to
https://issues.apache.org/jira/browse/SPARK-8513. It also usually happens
after using groupBy.

Regards,
Jiri

2015-12-08 0:16 GMT+01:00 Deenar Toraskar :

> Hi
>
> I have a process that writes to multiple partitions of the same table in
> parallel, using multiple threads sharing the same SQL context:
> df.write.partitionedBy("partCol").insertInto("tableName"). I am
> getting a FileNotFoundException on the _temporary directory. Each write only
> goes to a single partition. Is there some way of not using dynamic
> partitioning with df.write, without having to resort to .save, as I don't
> want to hardcode a physical DFS location in my code?
>
> This is similar to this issue listed here
> https://issues.apache.org/jira/browse/SPARK-2984
>
> Regards
> Deenar
>
> *Think Reactive Ltd*
> deenar.toras...@thinkreactive.co.uk
>
>
>
>


Fwd: UnresolvedException - lag, window

2015-11-05 Thread Jiří Syrový
Hi,

I'm getting the following exception with Spark 1.5.2-rc2 (I haven't tried
1.6.0 yet, though) when using the window function lag:

[2015-11-05 10:58:50,806] ERROR xo.builder.jobs.CompareJob []
[akka://JobServer/user/context-supervisor/MYCONTEXT] - Comparison has failed
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to
dataType on unresolved object, tree: 'lag(RANDOM_FIELD7307,0,null)
at
org.apache.spark.sql.catalyst.expressions.UnresolvedWindowFunction.dataType(windowExpressions.scala:277)
at
org.apache.spark.sql.catalyst.expressions.BinaryOperator.checkInputDataTypes(Expression.scala:419)
at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:58)
at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:53)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:293)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:293)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:292)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:290)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:290)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:249)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328

I've tried to search JIRA for similar bugs, but have found only something
slightly related to it, reported under totally different conditions, with a
suggestion that it might be influenced by the speculation flag being turned on.

This error happens in the following piece of code:

LOG.info("Calculating diff for " + Joiner.on(',').join(cs.getKeyColumns())
        + " and values " + Joiner.on(',').join(cs.getValueColumns()));

// get combined grouped data
leftDF = leftDF.withColumn("level", functions.lit(1));   // first
rightDF = rightDF.withColumn("level", functions.lit(2)); // second
DataFrame both = leftDF.unionAll(rightDF);

// get status
both.withColumn(DIFF_COLUMN_NAME,
        functions.first(status(valueColDiff(both, cs.getValueColumns()), both.col("level")))
                .over(Window.partitionBy(DFUtils.toColumns(both, cs.getKeyColumns()))
                        .orderBy("level")));
with the following methods used:

private static Column valueColDiff(DataFrame df, Set<String> valueCols) {
    return valueCols.stream().map(df::col).map(CompareJob::colDiff)
            .reduce((a, b) -> a.and(b)).get();
}

private static Column colDiff(Column col) {
    return functions.lag(col, 0).equalTo(functions.lag(col, 1));
}

private static Column status(Column diff, Column level) {
    Column leftLevel = functions.lag(level, 0);
    Column rightLevel = functions.lag(level, 1);
    return functions.when(leftLevel.isNull(), EntityChangeStatus.NEW.toString())
            .when(rightLevel.isNull(), EntityChangeStatus.REMOVED.toString())
            .when(diff, EntityChangeStatus.CHANGED.toString())
            .otherwise(EntityChangeStatus.UNCHANGED.toString());
}
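
For comparison, the usual pattern for lag in the DataFrame API attaches the
window directly to the lag expression; a generic Scala sketch on a
hypothetical DataFrame (the column names "key" and "level" are placeholders,
and on 1.5.x the DataFrame has to come from a HiveContext for window
functions):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

def withPrevLevel(df: DataFrame): DataFrame = {
  // A window function such as lag only resolves once .over(windowSpec) is applied.
  val w = Window.partitionBy("key").orderBy("level")
  df.withColumn("prev_level", lag(df("level"), 1).over(w))
}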

Any hints?

Thanks,
Jiri