[ 
https://issues.apache.org/jira/browse/GRIFFIN-359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yang xuejun updated GRIFFIN-359:
--------------------------------
    Description: 
I am trying to use a spark-sql rule to check data in streaming mode, but after 
the job is restarted, Spark reports the following exception:
{code:java}
21/03/30 02:01:16 ERROR Application$: process run error: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@4576f160 has not been initialized
org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@4576f160 has not been initialized
    at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:313)
    at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:90)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
    at scala.Option.orElse(Option.scala:289)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:234)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:229)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:229)
    at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:98)
    at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:103)
    at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:583)
    at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
    at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
    at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
    at org.apache.griffin.measure.launch.streaming.StreamingDQApp$$anonfun$run$1.apply$mcZ$sp(StreamingDQApp.scala:116)
    at org.apache.griffin.measure.launch.streaming.StreamingDQApp$$anonfun$run$1.apply(StreamingDQApp.scala:76)
    at org.apache.griffin.measure.launch.streaming.StreamingDQApp$$anonfun$run$1.apply(StreamingDQApp.scala:76)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.griffin.measure.launch.streaming.StreamingDQApp.run(StreamingDQApp.scala:76)
    at org.apache.griffin.measure.Application$.main(Application.scala:92)
    at org.apache.griffin.measure.Application.main(Application.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
{code}
 

According to the related Spark issue, this happens because Spark Streaming 
does not support recovering the SQL configuration (sqlconf) from a checkpoint: 
https://issues.apache.org/jira/browse/SPARK-6770

Official solution: 
[http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations]
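
For reference, the pattern described in that section of the Spark guide can be 
sketched roughly as below. This is only a minimal illustration and not Griffin 
code: the object name, checkpoint path, socket source and query are all 
hypothetical, and the sketch assumes Spark 2.x with spark-sql and 
spark-streaming on the classpath and the master supplied by spark-submit.
{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RecoverableSqlStream {
  // Hypothetical checkpoint location, not taken from the report.
  val checkpointDir = "hdfs:///tmp/example-checkpoint"

  // The whole DStream graph is defined inside this factory, so that
  // StreamingContext.getOrCreate can rebuild it from the checkpoint on restart.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("recoverable-sql-stream")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)

    // Hypothetical input; the job in this report reads from Kafka instead.
    val lines = ssc.socketTextStream("localhost", 9999)

    lines.foreachRDD { rdd =>
      // Get (or lazily create) the session inside the output operation,
      // using the configuration of the SparkContext that owns this RDD,
      // instead of capturing a session created outside the graph.
      val spark = SparkSession.builder
        .config(rdd.sparkContext.getConf)
        .getOrCreate()
      import spark.implicits._

      rdd.toDF("line").createOrReplaceTempView("lines")
      spark.sql("select count(1) as cnt from lines").show()
    }
    ssc
  }

  def main(args: Array[String]): Unit = {
    // First run: calls createContext(). After a restart: restores the
    // context and its DStream graph from checkpointDir.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}
With this layout the session is looked up on every batch, including batches 
replayed after a restart, rather than being captured from outside the 
checkpointed graph. Whether the same change is enough for Griffin's streaming 
connector has not been verified here.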

My DQConfig:
{code:java}
{
  "name": "comments_lt_0",
  "process.type": "STREAMING",
  "data.sources": [ {
    "name": "source",
    "connector": {
      "type": "KAFKA", "version": "1.0",
      "config": {
        "kafka.config": {
          "bootstrap.servers": "10.18.255.117:9092", "group.id": "group1",
          "auto.offset.reset": "smallest", "auto.commit.enable": "false"
        },
        "topics": "wdm_apiData",
        "key.type": "java.lang.String", "value.type": "java.lang.String"
      },
      "pre.proc": [
        { "dsl.type": "df-ops", "in.dataframe.name": "this", "out.dataframe.name": "s1", "rule": "from_json" },
        { "dsl.type": "spark-sql", "out.dataframe.name": "this", "rule": "select pubcode, doc_url, from s1" }
      ]
    },
    "cache": {
      "file.path": "hdfs:///griffin/streaming/dump/source", "info.path": "source",
      "ready.time.interval": "10s", "ready.time.delay": "0", "time.range": [ "0", "0" ]
    }
  } ],
  "evaluate.rule": { "rules": [
    { "dsl.type": "spark-sql", "rule": "select count(1) as comments_cnt from source where comments < 0",
      "out": [ { "type": "metric", "name": "prof" } ] },
    { "dsl.type": "spark-sql", "rule": "select pubcode, duc_url from source where dcomments < 0",
      "out": [ { "type": "record", "name": "comments_lt_0", "flatten": "array" } ] }
  ] },
  "sinks": [ "consoleSink", "elasticSink", "hdfsSink" ]
}
{code}


> DirectKafkaInputDStream  has not been initialized when recovery from 
> checkpoint when streaming connector use spark-sql rule 
> ----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: GRIFFIN-359
>                 URL: https://issues.apache.org/jira/browse/GRIFFIN-359
>             Project: Griffin
>          Issue Type: Bug
>          Components: Measure Module
>    Affects Versions: 0.5.0
>            Reporter: yang xuejun
>            Priority: Minor



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
