Does --py-files place the files on the PYTHONPATH of the executors?
I ran spark-submit --help and found: --jars is placed on the driver and executor classpaths, while --files and --archives go to the working directory of each executor. But for --py-files it only says "place on the PYTHONPATH for Python apps". It doesn't describe whether the files are placed on the executors (they will surely be placed on the driver). So does --py-files place the files on the PYTHONPATH of the executors? And is it the case that --files and --archives can't be read on the driver?

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Structured streaming from Kafka by timestamp
Hey Tomas,

From your description, you ran a batch query rather than a Structured Streaming query. The Kafka data source doesn't support filter push-down right now, but that's definitely doable. One workaround here is to set proper "startingOffsets" and "endingOffsets" options when loading from Kafka.

Best Regards,
Ryan

On Thu, Jan 24, 2019 at 10:15 AM Gabor Somogyi wrote:
> Hi Tomas,
>
> As a general note, I don't fully understand your use case. You've mentioned
> structured streaming, but your query is more like a one-time SQL statement.
> Kafka doesn't support predicates in the way it's integrated with Spark. What
> can be done from the Spark side is to look up the offset for a specific
> lowest timestamp and start reading from there.
>
> BR,
> G
>
> On Thu, Jan 24, 2019 at 6:38 PM Tomas Bartalos wrote:
>> Hello,
>>
>> I'm trying to read Kafka via Spark Structured Streaming, within a specific
>> time range:
>>
>> select count(*) from kafka_table where timestamp > cast('2019-01-23 1:00'
>> as TIMESTAMP) and timestamp < cast('2019-01-23 1:01' as TIMESTAMP);
>>
>> The problem is that the timestamp predicate is not pushed down to Kafka, so
>> Spark tries to read the whole topic from the beginning.
>>
>> explain query:
>>
>> +- *(1) Filter ((isnotnull(timestamp#57) && (timestamp#57 >
>>    15351480)) && (timestamp#57 < 15352344))
>>    Scan KafkaRelation(strategy=Subscribe[keeper.Ticket.avro.v1---production],
>>    start=EarliestOffsetRangeLimit, end=LatestOffsetRangeLimit)
>>    [key#52,value#53,topic#54,partition#55,offset#56L,timestamp#57,timestampType#58]
>>    *PushedFilters: []*, ReadSchema: struct
>>
>> Obviously the query takes forever to complete. Is there a solution to this?
>>
>> I'm using kafka and kafka-client version 1.1.1
>>
>> BR,
>> Tomas
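The "startingOffsets"/"endingOffsets" workaround mentioned above takes JSON offset specifications of the shape {"topic": {"partition": offset}}. A minimal stdlib-only sketch of building those strings (the topic name is the one from this thread's explain plan; the partition offsets are made up for illustration):

```python
import json

def offset_spec(topic, partition_offsets):
    """Build the JSON string Spark's Kafka source accepts for the
    startingOffsets / endingOffsets options: {"topic": {"partition": offset}}.
    In these JSON specs, -2 means earliest and -1 means latest."""
    return json.dumps({topic: {str(p): o for p, o in partition_offsets.items()}})

# Hypothetical offsets for a two-partition topic.
starting = offset_spec("keeper.Ticket.avro.v1---production", {0: 123450, 1: 123460})
ending = offset_spec("keeper.Ticket.avro.v1---production", {0: 123999, 1: 124010})

print(starting)
# These strings would then be passed to a batch read, e.g.:
# spark.read.format("kafka")
#      .option("subscribe", "keeper.Ticket.avro.v1---production")
#      .option("startingOffsets", starting)
#      .option("endingOffsets", ending)
#      .load()
```

The offsets themselves still have to be found from the desired timestamps (for example via a Kafka consumer's offset-for-timestamp lookup) before they can be plugged in here.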
Re: How to force-quit a Spark application?
Hi,

On Tue, Jan 22, 2019 at 11:30 AM Pola Yao wrote:
> "Thread-1" #19 prio=5 os_prio=0 tid=0x7f9b6828e800 nid=0x77cb waiting on
> condition [0x7f9a123e3000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x0005408a5420> (a
>           java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>         at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
>         at org.apache.spark.scheduler.AsyncEventQueue.stop(AsyncEventQueue.scala:131)

This looks a little weird. Are you sure this thread is not making any progress (i.e. did you take multiple stack snapshots)? I wouldn't expect that call to block. At first I was suspicious of SPARK-24309, but that looks different from what you're seeing.

--
Marcelo
Re: Structured streaming from Kafka by timestamp
Hi Tomas,

As a general note, I don't fully understand your use case. You've mentioned structured streaming, but your query is more like a one-time SQL statement. Kafka doesn't support predicates in the way it's integrated with Spark. What can be done from the Spark side is to look up the offset for a specific lowest timestamp and start reading from there.

BR,
G

On Thu, Jan 24, 2019 at 6:38 PM Tomas Bartalos wrote:
> Hello,
>
> I'm trying to read Kafka via Spark Structured Streaming, within a specific
> time range:
>
> select count(*) from kafka_table where timestamp > cast('2019-01-23 1:00'
> as TIMESTAMP) and timestamp < cast('2019-01-23 1:01' as TIMESTAMP);
>
> The problem is that the timestamp predicate is not pushed down to Kafka, so
> Spark tries to read the whole topic from the beginning.
>
> explain query:
>
> +- *(1) Filter ((isnotnull(timestamp#57) && (timestamp#57 >
>    15351480)) && (timestamp#57 < 15352344))
>    Scan KafkaRelation(strategy=Subscribe[keeper.Ticket.avro.v1---production],
>    start=EarliestOffsetRangeLimit, end=LatestOffsetRangeLimit)
>    [key#52,value#53,topic#54,partition#55,offset#56L,timestamp#57,timestampType#58]
>    *PushedFilters: []*, ReadSchema: struct
>
> Obviously the query takes forever to complete. Is there a solution to this?
>
> I'm using kafka and kafka-client version 1.1.1
>
> BR,
> Tomas
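The "look up the offset for a specific lowest timestamp" suggestion maps to Kafka's offset-for-timestamp consumer lookup (offsetsForTimes in the Java client), which takes epoch milliseconds. A small stdlib-only sketch of the conversion step, using the bounds from the query in this thread (assuming the timestamps are UTC; no broker is contacted here):

```python
from datetime import datetime, timezone

def to_epoch_millis(ts: str) -> int:
    """Convert a 'YYYY-MM-DD HH:MM' timestamp (assumed UTC) to the
    epoch-millisecond value Kafka's offset-for-timestamp lookup expects."""
    dt = datetime.strptime(ts, "%Y-%m-%d %H:%M").replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)

# The one-minute window from the query in this thread.
lower = to_epoch_millis("2019-01-23 01:00")
upper = to_epoch_millis("2019-01-23 01:01")
print(lower, upper)
```

The resulting millisecond values would be fed to the consumer's timestamp lookup to obtain per-partition starting offsets, which can then be passed to Spark's Kafka source.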
Structured streaming from Kafka by timestamp
Hello,

I'm trying to read Kafka via Spark Structured Streaming, within a specific time range:

select count(*) from kafka_table where timestamp > cast('2019-01-23 1:00' as TIMESTAMP) and timestamp < cast('2019-01-23 1:01' as TIMESTAMP);

The problem is that the timestamp predicate is not pushed down to Kafka, so Spark tries to read the whole topic from the beginning.

explain query:

+- *(1) Filter ((isnotnull(timestamp#57) && (timestamp#57 > 15351480)) && (timestamp#57 < 15352344))
   Scan KafkaRelation(strategy=Subscribe[keeper.Ticket.avro.v1---production], start=EarliestOffsetRangeLimit, end=LatestOffsetRangeLimit) [key#52,value#53,topic#54,partition#55,offset#56L,timestamp#57,timestampType#58] *PushedFilters: []*, ReadSchema: struct

Obviously the query takes forever to complete. Is there a solution to this?

I'm using kafka and kafka-client version 1.1.1

BR,
Tomas
Re: Reading compacted Kafka topic is slow
Hi Tomas,

I presume the 60 sec window means the trigger interval. A quick win could be to try Structured Streaming, because there the trigger interval is optional: if it is not specified, the system checks for availability of new data as soon as the previous processing has completed.

BR,
G

On Thu, Jan 24, 2019 at 12:55 PM Tomas Bartalos wrote:
> Hello Spark folks,
>
> I'm reading a compacted Kafka topic with Spark 2.4, using a direct stream
> (KafkaUtils.createDirectStream(...)). I have configured the necessary options
> for a compacted stream, so it's processed with CompactedKafkaRDDIterator.
> It works well; however, when there are many gaps in the topic, processing is
> very slow and the executors are idle 90% of the time.
>
> I had a look at the source and here are my findings: Spark first computes the
> number of records to stream from Kafka (processing rate * batch window size).
> That record count is translated to Kafka (offset_from, offset_to) boundaries,
> and eventually the iterator reads the records within those offsets. This works
> fine until there are many gaps in the topic, which reduce the real number of
> processed records. Say we wanted to read 100k records in a 60 sec window; with
> gaps we get only 10k (because the other 90k are just compacted gaps) in those
> 60 sec. As a result the executor works for only 6 sec and does nothing for
> 54 sec. I'd like to utilize the executors as much as possible.
>
> A great feature would be to read 100k real records (skipping the gaps) no
> matter what the offsets are.
>
> I've tried to make some improvement with backpressure and a custom
> RateEstimator (decorating PidRateEstimator and boosting the rate per second),
> and was even able to fully utilize the executors, but my approach has a big
> problem when the compacted part of the topic meets the non-compacted part:
> the executor tries to read too big a chunk of Kafka and the whole processing
> dies.
>
> BR,
> Tomas
Reading compacted Kafka topic is slow
Hello Spark folks,

I'm reading a compacted Kafka topic with Spark 2.4, using a direct stream (KafkaUtils.createDirectStream(...)). I have configured the necessary options for a compacted stream, so it's processed with CompactedKafkaRDDIterator. It works well; however, when there are many gaps in the topic, processing is very slow and the executors are idle 90% of the time.

I had a look at the source and here are my findings: Spark first computes the number of records to stream from Kafka (processing rate * batch window size). That record count is translated to Kafka (offset_from, offset_to) boundaries, and eventually the iterator reads the records within those offsets. This works fine until there are many gaps in the topic, which reduce the real number of processed records. Say we wanted to read 100k records in a 60 sec window; with gaps we get only 10k (because the other 90k are just compacted gaps) in those 60 sec. As a result the executor works for only 6 sec and does nothing for 54 sec. I'd like to utilize the executors as much as possible.

A great feature would be to read 100k real records (skipping the gaps) no matter what the offsets are.

I've tried to make some improvement with backpressure and a custom RateEstimator (decorating PidRateEstimator and boosting the rate per second), and was even able to fully utilize the executors, but my approach has a big problem when the compacted part of the topic meets the non-compacted part: the executor tries to read too big a chunk of Kafka and the whole processing dies.

BR,
Tomas
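The utilization numbers in the message can be made concrete. Under the simplifying assumption that processing time is proportional to the records actually delivered, sizing an offset range for 100k records that contains 90k compacted-away gaps leaves the executor busy for only a tenth of the window:

```python
def busy_seconds(real_records, target_records, window_sec):
    """Executor busy time when an offset range sized for target_records
    actually yields only real_records (the rest are compacted-away gaps),
    assuming processing time scales with records delivered."""
    return window_sec * real_records / target_records

# 100k-record range, 90k of which are gaps, in a 60 sec batch window.
busy = busy_seconds(10_000, 100_000, 60)
print(busy, 60 - busy)  # → 6.0 54.0
```

This is exactly the 6 sec busy / 54 sec idle split described above, and it is why sizing batches by offset distance rather than by real record count under-utilizes executors on heavily compacted topics.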
Fwd: unsubscribe
unsubscribe
Re: How to get all input tables of a SPARK SQL 'select' statement
Thanks all for your help. I'll try your suggestions. Thanks again :)

From: "Shahab Yunus"
To: "Ramandeep Singh Nanda"
Cc: "Tomas Bartalos", l...@china-inv.cn, user@spark
Date: 2019/01/24 06:45
Subject: Re: How to get all input tables of a SPARK SQL 'select' statement

Could be a tangential idea but might help: why not use the queryExecution and logicalPlan objects that are available when you execute a query using SparkSession and get a DataFrame back? The JSON representation contains almost all the info that you need, and you don't need to go to Hive to get it. Some details here:
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-Dataset.html#queryExecution

On Wed, Jan 23, 2019 at 5:35 PM Ramandeep Singh Nanda <ramannan...@gmail.com> wrote:

"explain extended" or "explain" would list the plan along with the tables. I'm not aware of any statements that explicitly list dependencies or tables directly.

Regards,
Ramandeep Singh

On Wed, Jan 23, 2019, 11:05 Tomas Bartalos wrote:

Hi, All,

We need to get all input tables of several SPARK SQL 'select' statements. We can get that information for Hive SQL statements by using 'explain dependency select', but I can't find the equivalent command for SPARK SQL. Does anyone know how to get this information for a SPARK SQL 'select' statement?

Thanks

Boying

This email message may contain confidential and/or privileged information. If you are not the intended recipient, please do not read, save, forward, disclose or copy the contents of this email or open any file attached to this email. We will be grateful if you could advise the sender immediately by replying this email, and delete this email and any attachment or links to this email completely and immediately from your computer system.
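Building on the queryExecution/logicalPlan idea above: once the logical plan is serialized to JSON (e.g. via toJSON on the plan in Scala), the table names can be collected with a generic tree walk. A stdlib-only sketch; note that the "database"/"table" field names and the toy plan fragment below are assumptions for illustration, so check the actual JSON your Spark version emits before relying on them:

```python
import json

def collect_tables(node, found=None):
    """Recursively collect database.table identifiers from a logical-plan
    tree decoded from JSON. Field names are assumed for illustration."""
    if found is None:
        found = []
    if isinstance(node, dict):
        if "table" in node:
            found.append("{}.{}".format(node.get("database", "default"), node["table"]))
        for value in node.values():
            collect_tables(value, found)
    elif isinstance(node, list):
        for value in node:
            collect_tables(value, found)
    return found

# Toy plan fragment; the shape is illustrative only.
plan = json.loads(
    '{"class":"Project","child":{"class":"Join",'
    '"left":{"class":"HiveTableRelation","database":"db1","table":"t1"},'
    '"right":{"class":"HiveTableRelation","database":"db1","table":"t2"}}}'
)
print(collect_tables(plan))  # → ['db1.t1', 'db1.t2']
```

The same walk works regardless of how deeply relations are nested, which is the advantage over scraping the text output of "explain extended".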