Github user gavlyukovskiy commented on a diff in the pull request: https://github.com/apache/incubator-griffin/pull/456#discussion_r234425707 --- Diff: measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala --- @@ -84,6 +87,26 @@ object DataConnectorFactory extends Loggable { } } + private def getCustomConnector(session: SparkSession, + context: StreamingContext, + param: DataConnectorParam, + storage: TimestampStorage, + maybeClient: Option[StreamingCacheClient]): DataConnector = { + val className = param.getConfig("class").asInstanceOf[String] + val cls = Class.forName(className) + if (classOf[BatchDataConnector].isAssignableFrom(cls)) { + val ctx = BatchDataConnectorContext(session, param, storage) + val meth = cls.getDeclaredMethod("apply", classOf[BatchDataConnectorContext]) + meth.invoke(null, ctx).asInstanceOf[BatchDataConnector] + } else if (classOf[StreamingDataConnector].isAssignableFrom(cls)) { + val ctx = StreamingDataConnectorContext(session, context, param, storage, maybeClient) + val meth = cls.getDeclaredMethod("apply", classOf[StreamingDataConnectorContext]) + meth.invoke(null, ctx).asInstanceOf[StreamingDataConnector] + } else { + throw new ClassCastException("") --- End diff -- It would be nice to have a message here stating that the custom connector class must extend `BatchDataConnector` or `StreamingDataConnector`.
---