Hi Jin,
Could you please tell us your Kafka version? Our Kafka data connector currently
supports only Kafka 0.8. If your Kafka version is 0.10 or later, you may run
into issues; the solution is either to implement a Kafka data connector for
Kafka 0.10 or to switch to Kafka 0.8.
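If you are not sure which version you are running, one rough way to check is to look at the jar names under the broker's libs directory, since Kafka distribution jars follow the pattern kafka_<scalaVersion>-<kafkaVersion>.jar. The sketch below is only an illustration of that idea and is not part of Griffin: the class name KafkaVersionCheck and its methods are hypothetical, and it assumes you pass it the jar file names yourself.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Griffin or Kafka): guesses the Kafka
// version from a distribution jar name such as kafka_2.10-0.8.2.1.jar.
final class KafkaVersionCheck {

    // Assumed naming convention: kafka_<scalaVersion>-<kafkaVersion>.jar
    private static final Pattern JAR_NAME =
            Pattern.compile("^kafka_(\\d+(?:\\.\\d+)+)-(\\d+(?:\\.\\d+)+)\\.jar$");

    // Returns the Kafka version part of the jar name, or empty if the
    // name does not follow the assumed convention.
    static Optional<String> kafkaVersion(String jarName) {
        Matcher m = JAR_NAME.matcher(jarName);
        return m.matches() ? Optional.of(m.group(2)) : Optional.empty();
    }

    // True only when the jar name clearly indicates a 0.8.x release.
    static boolean isKafka08(String jarName) {
        return kafkaVersion(jarName)
                .map(v -> v.equals("0.8") || v.startsWith("0.8."))
                .orElse(false);
    }

    public static void main(String[] args) {
        // Usage: java KafkaVersionCheck kafka_2.10-0.8.2.1.jar ...
        for (String jar : args) {
            String version = kafkaVersion(jar).orElse("unknown");
            System.out.println(jar + " -> Kafka " + version
                    + (isKafka08(jar) ? " (0.8.x)" : " (not recognised as 0.8.x)"));
        }
    }
}
```

A jar name that does not match the pattern (for example a client-only jar) is reported as unknown rather than guessed.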
If you just want to try Griffin's streaming mode, we've prepared a new
Docker image for it, which includes a Kafka 0.8 environment with test data.
You can follow the guide here:
https://github.com/apache/incubator-griffin/blob/master/griffin-doc/griffin-docker-guide.md
It might help.
Thanks a lot.
--
Regards,
Lionel, Liu
At 2017-11-03 08:21:21, "Jin Liu" <[email protected]> wrote:
>I tried the following snippet to check whether the Kafka source/target data
>can be accessed in my env:
>
>SimpleConsumer simpleConsumer = new SimpleConsumer("10.117.7.110",
>        KafkaProperties.kafkaServerPort,
>        KafkaProperties.connectionTimeOut,
>        KafkaProperties.kafkaProducerBufferSize,
>        KafkaProperties.clientId);
>
>System.out.println("Testing single fetch");
>FetchRequest req = new FetchRequestBuilder()
>        .clientId(KafkaProperties.clientId)
>        .addFetch("source", 0, 0L, 100)
>        .build();
>FetchResponse fetchResponse = simpleConsumer.fetch(req);
>printMessages((ByteBufferMessageSet) fetchResponse.messageSet("source", 0));
>
>
>running result:
>
>Testing single fetch
>{"name": "kevin", "age": 24}
>
>
>Yet I still hit a problem when testing the measure code:
>
>root@sandbox:~# spark-submit --class org.apache.griffin.measure.Application
>--master yarn-client --queue default --verbose griffin-measure.jar env.json
>config.json local,local
>
>17/11/03 01:20:09 WARN metastore.ObjectStore: Version information not found in
>metastore. hive.metastore.schema.verification is not enabled so recording the
>schema version 1.2.0
>17/11/03 01:20:10 WARN metastore.ObjectStore: Failed to get database default,
>returning NoSuchObjectException
>17/11/03 01:20:21 ERROR info.ZKInfoCache: delete /lock error: KeeperErrorCode
>= NoNode for /griffin/infocache/streaming-accu-sample/lock
>[1509672021151] streaming-accu-sample start
>17/11/03 01:20:22 ERROR measure.Application$: app error:
>java.nio.channels.ClosedChannelException
>org.apache.spark.SparkException: Couldn't find leader offsets for
>Set([source,0])
>Exception in thread "main" org.apache.spark.SparkException:
>java.nio.channels.ClosedChannelException
>org.apache.spark.SparkException: Couldn't find leader offsets for
>Set([source,0])
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> at scala.util.Either.fold(Either.scala:97)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
> at
> org.apache.griffin.measure.connector.DataConnectorFactory$$anon$1.createDStream(DataConnectorFactory.scala:120)
> at
> org.apache.griffin.measure.connector.streaming.KafkaStreamingDataConnector$$anonfun$stream$1.apply(KafkaStreamingDataConnector.scala:54)
> at
> org.apache.griffin.measure.connector.streaming.KafkaStreamingDataConnector$$anonfun$stream$1.apply(KafkaStreamingDataConnector.scala:52)
> at scala.util.Try$.apply(Try.scala:161)
>
>
>
>From: Jin Liu
>Sent: Friday, October 27, 2017 2:58 PM
>To: [email protected]
>Subject: exception in measure task
>
>Guys,
>
>Does anyone know what is going wrong in the measure code?
>
>17/10/27 06:31:34 ERROR measure.Application$: app error:
>java.nio.channels.ClosedChannelException
>org.apache.spark.SparkException: Couldn't find leader offsets for
>Set([source,0])
>Exception in thread "main" org.apache.spark.SparkException:
>java.nio.channels.ClosedChannelException
>org.apache.spark.SparkException: Couldn't find leader offsets for
>Set([source,0])
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> at scala.util.Either.fold(Either.scala:97)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
> at
> org.apache.griffin.measure.connector.DataConnectorFactory$$anon$1.createDStream(DataConnectorFactory.scala:120)
> at
> org.apache.griffin.measure.connector.streaming.KafkaStreamingDataConnector$$anonfun$stream$1.apply(KafkaStreamingDataConnector.scala:54)
> at
> org.apache.griffin.measure.connector.streaming.KafkaStreamingDataConnector$$anonfun$stream$1.apply(KafkaStreamingDataConnector.scala:52)
> at scala.util.Try$.apply(Try.scala:161)
> at
> org.apache.griffin.measure.connector.streaming.KafkaStreamingDataConnector.stream(KafkaStreamingDataConnector.scala:52)
> at
> org.apache.griffin.measure.connector.direct.StreamingCacheDirectDataConnector$class.init(StreamingCacheDirectDataConnector.scala:41)
> at
> org.apache.griffin.measure.connector.direct.KafkaCacheDirectDataConnector.init(KafkaCacheDirectDataConnector.scala:33)
> at
> org.apache.griffin.measure.algo.streaming.StreamingAccuracyAlgo$$anonfun$run$1.apply$mcV$sp(StreamingAccuracyAlgo.scala:125)
> at
> org.apache.griffin.measure.algo.streaming.StreamingAccuracyAlgo$$anonfun$run$1.apply(StreamingAccuracyAlgo.scala:50)
> at
> org.apache.griffin.measure.algo.streaming.StreamingAccuracyAlgo$$anonfun$run$1.apply(StreamingAccuracyAlgo.scala:50)
> at scala.util.Try$.apply(Try.scala:161)
> at
> org.apache.griffin.measure.algo.streaming.StreamingAccuracyAlgo.run(StreamingAccuracyAlgo.scala:50)
> at org.apache.griffin.measure.Application$.main(Application.scala:98)
> at org.apache.griffin.measure.Application.main(Application.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:483)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
> at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)