---------- Forwarded message ----------
From: 覃兆坤 <[email protected]>
Date: 2016-05-28 16:39 GMT+08:00
Subject: Re: Run Eagle on Spark Streaming: error
To: yuming wang <[email protected]>
Hello,
Thank you very much for your interest in Eagle and for reporting this bug. I will fix it as soon as possible. Thanks.
2016-05-27 17:21 GMT+08:00 yuming wang <[email protected]>:
> hi,
>
> I am looking at your Run Eagle on Spark Streaming
> <https://github.com/apache/incubator-eagle/pull/191>
> pull request. The bundled cassandraQueryLog example runs, but adding HbaseResourceSensitivityDataJoinExecutor produces this error:
> Exception in thread "main" java.lang.IllegalArgumentException: Cannot compile unknown HbaseResourceSensitivityDataJoinExecutor to a DStream
>     at org.apache.eagle.datastream.sparkstreaming.DStreamFactory$.createInputDStream(DStreamFactory.scala:95)
>     at org.apache.eagle.datastream.sparkstreaming.SparkStreamingCompiler.buildTopology(SparkStreamingCompiler.scala:51)
>     at org.apache.eagle.datastream.sparkstreaming.SparkStreamingExecutionEnvironment.execute(SparkStreamingExecutionEnvironment.scala:24)
>     at org.apache.eagle.datastream.core.StreamContextAdapter$class.submit(ExecutionEnvironment.scala:25)
>
> How should I handle this?
>
> My code is as follows:
>
> object TestSparkStreamingWithAlertDSLUseHBase extends App {
>   val env = ExecutionEnvironments.get[SparkStreamingExecutionEnvironment](args)
>   val streamName = "hbaseAuditLogStream"
>   val streamExecutorId = "hbaseAuditLogExecutor"
>   env.config.set("dataSourceConfig.deserializerClass",
>     classOf[HbaseAuditLogKafkaDeserializer].getCanonicalName)
>   env.fromKafka().parallelism(1).nameAs(streamName)
>     .!(Seq(streamName), streamExecutorId)
>     // The added step that triggers the exception (emphasized in the original message)
>     .flatMap(new HbaseResourceSensitivityDataJoinExecutor())
>   env.execute()
> }
>
> Thanks