[
https://issues.apache.org/jira/browse/GRIFFIN-359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
yang xuejun updated GRIFFIN-359:
--------------------------------
Description:
I am trying to use a spark-sql rule to check data in streaming mode, but after
the job restarts and recovers from its checkpoint, Spark reports the following
exception:
{code:java}
21/03/30 02:01:16 ERROR Application$: process run error: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@4576f160 has not been initialized
org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@4576f160 has not been initialized
    at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:313)
    at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:90)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
    at scala.Option.orElse(Option.scala:289)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:234)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:229)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:229)
    at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:98)
    at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:103)
    at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:583)
    at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
    at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
    at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
    at org.apache.griffin.measure.launch.streaming.StreamingDQApp$$anonfun$run$1.apply$mcZ$sp(StreamingDQApp.scala:116)
    at org.apache.griffin.measure.launch.streaming.StreamingDQApp$$anonfun$run$1.apply(StreamingDQApp.scala:76)
    at org.apache.griffin.measure.launch.streaming.StreamingDQApp$$anonfun$run$1.apply(StreamingDQApp.scala:76)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.griffin.measure.launch.streaming.StreamingDQApp.run(StreamingDQApp.scala:76)
    at org.apache.griffin.measure.Application$.main(Application.scala:92)
    at org.apache.griffin.measure.Application.main(Application.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
{code}
The root cause is a known Spark limitation: Spark Streaming does not recover
the SQLConf when restoring from a checkpoint, tracked in
https://issues.apache.org/jira/browse/SPARK-6770
The official workaround is documented at
[http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations]
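For reference, the pattern recommended there is a lazily instantiated singleton
SparkSession that is re-acquired inside foreachRDD on every batch, so no SQL
state has to survive checkpoint recovery. A minimal sketch of that pattern,
adapted from the guide (the StreamingSqlExample wrapper and the lines DStream
are illustrative placeholders, not Griffin code):
{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream

// Lazily instantiated singleton SparkSession: the session (and its SQLConf)
// is rebuilt on demand after a restart instead of being serialized into the
// checkpointed DStream graph.
object SparkSessionSingleton {
  @transient private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession.builder.config(sparkConf).getOrCreate()
    }
    instance
  }
}

object StreamingSqlExample {
  // `lines` stands in for the Kafka-backed DStream.
  def process(lines: DStream[String]): Unit = {
    lines.foreachRDD { (rdd: RDD[String]) =>
      // Re-acquire the session from the RDD's SparkConf on every batch,
      // including batches replayed after recovery from a checkpoint.
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._

      val df = rdd.toDF("value")
      df.createOrReplaceTempView("source")
      spark.sql("select count(1) as cnt from source").show()
    }
  }
}
{code}
Presumably the fix for this issue is for StreamingDQApp to re-acquire the
session in the same way before executing spark-sql rules on recovered batches.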
My DQConfig:
{code:java}
{
  "name": "comments_lt_0",
  "process.type": "STREAMING",
  "data.sources": [
    {
      "name": "source",
      "connector": {
        "type": "KAFKA",
        "version": "1.0",
        "config": {
          "kafka.config": {
            "bootstrap.servers": "10.18.255.117:9092",
            "group.id": "group1",
            "auto.offset.reset": "smallest",
            "auto.commit.enable": "false"
          },
          "topics": "wdm_apiData",
          "key.type": "java.lang.String",
          "value.type": "java.lang.String"
        },
        "pre.proc": [
          {
            "dsl.type": "df-ops",
            "in.dataframe.name": "this",
            "out.dataframe.name": "s1",
            "rule": "from_json"
          },
          {
            "dsl.type": "spark-sql",
            "out.dataframe.name": "this",
            "rule": "select pubcode, doc_url, from s1"
          }
        ]
      },
      "cache": {
        "file.path": "hdfs:///griffin/streaming/dump/source",
        "info.path": "source",
        "ready.time.interval": "10s",
        "ready.time.delay": "0",
        "time.range": ["0", "0"]
      }
    }
  ],
  "evaluate.rule": {
    "rules": [
      {
        "dsl.type": "spark-sql",
        "rule": "select count(1) as comments_cnt from source where comments < 0",
        "out": [{ "type": "metric", "name": "prof" }]
      },
      {
        "dsl.type": "spark-sql",
        "rule": "select pubcode, duc_url from source where dcomments < 0",
        "out": [{ "type": "record", "name": "comments_lt_0", "flatten": "array" }]
      }
    ]
  },
  "sinks": ["consoleSink", "elasticSink", "hdfsSink"]
}
{code}
> DirectKafkaInputDStream has not been initialized when recovering from a
> checkpoint while the streaming connector uses a spark-sql rule
> ----------------------------------------------------------------------------------------------------------------------------
>
> Key: GRIFFIN-359
> URL: https://issues.apache.org/jira/browse/GRIFFIN-359
> Project: Griffin
> Issue Type: Bug
> Components: Measure Module
> Affects Versions: 0.5.0
> Reporter: yang xuejun
> Priority: Minor
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)