Hi Jin Liu,
It seems like the issue of kafka connection, I need to know your kafka version 
and data schema of your topic.
On the other hand, you could try our measure module of latest version.

Thanks.



--

Regards,
Lionel, Liu



At 2017-10-27 13:58:29, "Jin Liu" <[email protected]> wrote:
>Guys,
>
>Who knows what's something wrong in measure codes?
>
>17/10/27 06:31:34 ERROR measure.Application$: app error: 
>java.nio.channels.ClosedChannelException
>org.apache.spark.SparkException: Couldn't find leader offsets for 
>Set([source,0])
>Exception in thread "main" org.apache.spark.SparkException: 
>java.nio.channels.ClosedChannelException
>org.apache.spark.SparkException: Couldn't find leader offsets for 
>Set([source,0])
>        at 
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>        at 
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>        at scala.util.Either.fold(Either.scala:97)
>        at 
> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>        at 
> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>        at 
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>        at 
> org.apache.griffin.measure.connector.DataConnectorFactory$$anon$1.createDStream(DataConnectorFactory.scala:120)
>        at 
> org.apache.griffin.measure.connector.streaming.KafkaStreamingDataConnector$$anonfun$stream$1.apply(KafkaStreamingDataConnector.scala:54)
>        at 
> org.apache.griffin.measure.connector.streaming.KafkaStreamingDataConnector$$anonfun$stream$1.apply(KafkaStreamingDataConnector.scala:52)
>        at scala.util.Try$.apply(Try.scala:161)
>        at 
> org.apache.griffin.measure.connector.streaming.KafkaStreamingDataConnector.stream(KafkaStreamingDataConnector.scala:52)
>        at 
> org.apache.griffin.measure.connector.direct.StreamingCacheDirectDataConnector$class.init(StreamingCacheDirectDataConnector.scala:41)
>        at 
> org.apache.griffin.measure.connector.direct.KafkaCacheDirectDataConnector.init(KafkaCacheDirectDataConnector.scala:33)
>        at 
> org.apache.griffin.measure.algo.streaming.StreamingAccuracyAlgo$$anonfun$run$1.apply$mcV$sp(StreamingAccuracyAlgo.scala:125)
>        at 
> org.apache.griffin.measure.algo.streaming.StreamingAccuracyAlgo$$anonfun$run$1.apply(StreamingAccuracyAlgo.scala:50)
>        at 
> org.apache.griffin.measure.algo.streaming.StreamingAccuracyAlgo$$anonfun$run$1.apply(StreamingAccuracyAlgo.scala:50)
>        at scala.util.Try$.apply(Try.scala:161)
>        at 
> org.apache.griffin.measure.algo.streaming.StreamingAccuracyAlgo.run(StreamingAccuracyAlgo.scala:50)
>        at org.apache.griffin.measure.Application$.main(Application.scala:98)
>        at org.apache.griffin.measure.Application.main(Application.scala)
>        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>        at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>        at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>        at java.lang.reflect.Method.invoke(Method.java:483)
>        at 
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
>        at 
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Reply via email to