Hi Jin,
    Would you pls tell us your kafka version? Our kafka data connector only 
support kafka 0.8 at current, if your kafka version is 0.10 or later, there 
might be some issues, the solution is to implement a kafka data connector for 
kafka 0.10, or switch to kafka 0.8.
    If you just want to try the streaming mode of griffin, we've prepared a new 
docker image for it, including a kafka 0.8 env with test data. You can follow 
the guide here 
https://github.com/apache/incubator-griffin/blob/master/griffin-doc/griffin-docker-guide.md
 , it might helps.
    Thanks a lot.





--

Regards,
Lionel, Liu



At 2017-11-03 08:21:21, "Jin Liu" <[email protected]> wrote:
>Try snippet to verify kafka, if source/target data cannot be accessed  in my 
>env
>
>SimpleConsumer simpleConsumer = new SimpleConsumer("10.117.7.110",
>                                                       
> KafkaProperties.kafkaServerPort,
>                                                       
> KafkaProperties.connectionTimeOut,
>                                                       
> KafkaProperties.kafkaProducerBufferSize,
>                                                       
> KafkaProperties.clientId);
>
>    System.out.println("Testing single fetch");
>    FetchRequest req = new FetchRequestBuilder()
>            .clientId(KafkaProperties.clientId)
>            .addFetch("source", 0, 0L, 100)
>            .build();
>    FetchResponse fetchResponse = simpleConsumer.fetch(req);
>      printMessages((ByteBufferMessageSet) fetchResponse.messageSet("source", 
> 0));
>
>
>running result:
>
>Testing single fetch
>{"name": "kevin", "age": 24}
>
>
>Yet still hit problem to test measure codes
>
>root@sandbox:~# spark-submit --class org.apache.griffin.measure.Application 
>--master yarn-client --queue default --verbose griffin-measure.jar env.json 
>config.json local,local
>
>17/11/03 01:20:09 WARN metastore.ObjectStore: Version information not found in 
>metastore. hive.metastore.schema.verification is not enabled so recording the 
>schema version 1.2.0
>17/11/03 01:20:10 WARN metastore.ObjectStore: Failed to get database default, 
>returning NoSuchObjectException
>17/11/03 01:20:21 ERROR info.ZKInfoCache: delete /lock error: KeeperErrorCode 
>= NoNode for /griffin/infocache/streaming-accu-sample/lock
>[1509672021151] streaming-accu-sample start
>17/11/03 01:20:22 ERROR measure.Application$: app error: 
>java.nio.channels.ClosedChannelException
>org.apache.spark.SparkException: Couldn't find leader offsets for 
>Set([source,0])
>Exception in thread "main" org.apache.spark.SparkException: 
>java.nio.channels.ClosedChannelException
>org.apache.spark.SparkException: Couldn't find leader offsets for 
>Set([source,0])
>        at 
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>        at 
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>        at scala.util.Either.fold(Either.scala:97)
>        at 
> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>        at 
> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>        at 
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>        at 
> org.apache.griffin.measure.connector.DataConnectorFactory$$anon$1.createDStream(DataConnectorFactory.scala:120)
>        at 
> org.apache.griffin.measure.connector.streaming.KafkaStreamingDataConnector$$anonfun$stream$1.apply(KafkaStreamingDataConnector.scala:54)
>        at 
> org.apache.griffin.measure.connector.streaming.KafkaStreamingDataConnector$$anonfun$stream$1.apply(KafkaStreamingDataConnector.scala:52)
>        at scala.util.Try$.apply(Try.scala:161)
>
>
>
>From: Jin Liu
>Sent: Friday, October 27, 2017 2:58 PM
>To: [email protected]
>Subject: exception in measure task
>
>Guys,
>
>Who knows what's something wrong in measure codes?
>
>17/10/27 06:31:34 ERROR measure.Application$: app error: 
>java.nio.channels.ClosedChannelException
>org.apache.spark.SparkException: Couldn't find leader offsets for 
>Set([source,0])
>Exception in thread "main" org.apache.spark.SparkException: 
>java.nio.channels.ClosedChannelException
>org.apache.spark.SparkException: Couldn't find leader offsets for 
>Set([source,0])
>        at 
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>        at 
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>        at scala.util.Either.fold(Either.scala:97)
>        at 
> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>        at 
> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>        at 
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>        at 
> org.apache.griffin.measure.connector.DataConnectorFactory$$anon$1.createDStream(DataConnectorFactory.scala:120)
>        at 
> org.apache.griffin.measure.connector.streaming.KafkaStreamingDataConnector$$anonfun$stream$1.apply(KafkaStreamingDataConnector.scala:54)
>        at 
> org.apache.griffin.measure.connector.streaming.KafkaStreamingDataConnector$$anonfun$stream$1.apply(KafkaStreamingDataConnector.scala:52)
>        at scala.util.Try$.apply(Try.scala:161)
>        at 
> org.apache.griffin.measure.connector.streaming.KafkaStreamingDataConnector.stream(KafkaStreamingDataConnector.scala:52)
>        at 
> org.apache.griffin.measure.connector.direct.StreamingCacheDirectDataConnector$class.init(StreamingCacheDirectDataConnector.scala:41)
>        at 
> org.apache.griffin.measure.connector.direct.KafkaCacheDirectDataConnector.init(KafkaCacheDirectDataConnector.scala:33)
>        at 
> org.apache.griffin.measure.algo.streaming.StreamingAccuracyAlgo$$anonfun$run$1.apply$mcV$sp(StreamingAccuracyAlgo.scala:125)
>        at 
> org.apache.griffin.measure.algo.streaming.StreamingAccuracyAlgo$$anonfun$run$1.apply(StreamingAccuracyAlgo.scala:50)
>        at 
> org.apache.griffin.measure.algo.streaming.StreamingAccuracyAlgo$$anonfun$run$1.apply(StreamingAccuracyAlgo.scala:50)
>        at scala.util.Try$.apply(Try.scala:161)
>        at 
> org.apache.griffin.measure.algo.streaming.StreamingAccuracyAlgo.run(StreamingAccuracyAlgo.scala:50)
>        at org.apache.griffin.measure.Application$.main(Application.scala:98)
>        at org.apache.griffin.measure.Application.main(Application.scala)
>        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>        at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>        at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>        at java.lang.reflect.Method.invoke(Method.java:483)
>        at 
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
>        at 
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Reply via email to