Github user gavlyukovskiy commented on a diff in the pull request:

    https://github.com/apache/incubator-griffin/pull/456#discussion_r234425707
  
    --- Diff: 
measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
 ---
    @@ -84,6 +87,26 @@ object DataConnectorFactory extends Loggable {
         }
       }
     
    +  private def getCustomConnector(session: SparkSession,
    +                                 context: StreamingContext,
    +                                 param: DataConnectorParam,
    +                                 storage: TimestampStorage,
    +                                 maybeClient: 
Option[StreamingCacheClient]): DataConnector = {
    +    val className = param.getConfig("class").asInstanceOf[String]
    +    val cls = Class.forName(className)
    +    if (classOf[BatchDataConnector].isAssignableFrom(cls)) {
    +      val ctx = BatchDataConnectorContext(session, param, storage)
    +      val meth = cls.getDeclaredMethod("apply", 
classOf[BatchDataConnectorContext])
    +      meth.invoke(null, ctx).asInstanceOf[BatchDataConnector]
    +    } else if (classOf[StreamingDataConnector].isAssignableFrom(cls)) {
    +      val ctx = StreamingDataConnectorContext(session, context, param, 
storage, maybeClient)
    +      val meth = cls.getDeclaredMethod("apply", 
classOf[StreamingDataConnectorContext])
    +      meth.invoke(null, ctx).asInstanceOf[StreamingDataConnector]
    +    } else {
    +      throw new ClassCastException("")
    --- End diff --
    
    It would be nice to have here message that custom connector class must 
extend `BatchDataConnector` or `StreamingDataConnector`


---

Reply via email to