Github user gavlyukovskiy commented on a diff in the pull request:
https://github.com/apache/incubator-griffin/pull/456#discussion_r234425707
--- Diff:
measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
---
@@ -84,6 +87,26 @@ object DataConnectorFactory extends Loggable {
}
}
+ private def getCustomConnector(session: SparkSession,
+ context: StreamingContext,
+ param: DataConnectorParam,
+ storage: TimestampStorage,
+ maybeClient:
Option[StreamingCacheClient]): DataConnector = {
+ val className = param.getConfig("class").asInstanceOf[String]
+ val cls = Class.forName(className)
+ if (classOf[BatchDataConnector].isAssignableFrom(cls)) {
+ val ctx = BatchDataConnectorContext(session, param, storage)
+ val meth = cls.getDeclaredMethod("apply",
classOf[BatchDataConnectorContext])
+ meth.invoke(null, ctx).asInstanceOf[BatchDataConnector]
+ } else if (classOf[StreamingDataConnector].isAssignableFrom(cls)) {
+ val ctx = StreamingDataConnectorContext(session, context, param,
storage, maybeClient)
+ val meth = cls.getDeclaredMethod("apply",
classOf[StreamingDataConnectorContext])
+ meth.invoke(null, ctx).asInstanceOf[StreamingDataConnector]
+ } else {
+ throw new ClassCastException("")
--- End diff --
It would be nice to have a message here saying that the custom connector class
must extend `BatchDataConnector` or `StreamingDataConnector`.
---