Repository: incubator-griffin Updated Branches: refs/heads/master 29b51997f -> 6997bf731
[GRIFFIN-213] Custom connector support Provide ability to extend batch and streaming data integrations with custom user-provided connectors. Introduces new data connector type, `CUSTOM`, parameterized with `class` property. Also adds support for custom data connector enum on service side. Author: Nikolay Sokolov <[email protected]> Closes #456 from chemikadze/GRIFFIN-213. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/6997bf73 Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/6997bf73 Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/6997bf73 Branch: refs/heads/master Commit: 6997bf731034cdae03fafcb4b8524dd9021f6dee Parents: 29b5199 Author: Nikolay Sokolov <[email protected]> Authored: Thu Nov 29 16:00:47 2018 +0800 Committer: William Guo <[email protected]> Committed: Thu Nov 29 16:00:47 2018 +0800 ---------------------------------------------------------------------- griffin-doc/deploy/deploy-guide.md | 2 + .../measure/measure-configuration-guide.md | 10 +- .../connector/DataConnectorFactory.scala | 23 ++++ .../batch/BatchDataConnectorContext.scala | 28 ++++ .../StreamingDataConnectorContext.scala | 32 +++++ .../reader/DataConnectorFactorySpec.scala | 129 +++++++++++++++++++ .../core/measure/entity/DataConnector.java | 3 +- 7 files changed, 225 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6997bf73/griffin-doc/deploy/deploy-guide.md ---------------------------------------------------------------------- diff --git a/griffin-doc/deploy/deploy-guide.md b/griffin-doc/deploy/deploy-guide.md index 2c6d80f..b9f7ceb 100644 --- a/griffin-doc/deploy/deploy-guide.md +++ b/griffin-doc/deploy/deploy-guide.md @@ -164,6 +164,8 @@ You should also modify some configurations of Apache Griffin for your environmen "spark.yarn.dist.files": 
"hdfs:///<path to>/hive-site.xml" }, "files": [ + ], + "jars": [ ] } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6997bf73/griffin-doc/measure/measure-configuration-guide.md ---------------------------------------------------------------------- diff --git a/griffin-doc/measure/measure-configuration-guide.md b/griffin-doc/measure/measure-configuration-guide.md index feeaf1a..2522ee4 100644 --- a/griffin-doc/measure/measure-configuration-guide.md +++ b/griffin-doc/measure/measure-configuration-guide.md @@ -188,7 +188,7 @@ Above lists DQ job configure parameters. - **sinks**: Whitelisted sink types for this job. Note: no sinks will be used, if empty or omitted. ### <a name="data-connector"></a>Data Connector -- **type**: Data connector type, "AVRO", "HIVE", "TEXT-DIR" for batch mode, "KAFKA" for streaming mode. +- **type**: Data connector type: "AVRO", "HIVE", "TEXT-DIR", "CUSTOM" for batch mode; "KAFKA", "CUSTOM" for streaming mode. - **version**: Version string of data connector type. - **config**: Configure parameters of each data connector type. + avro data connector @@ -204,6 +204,14 @@ Above lists DQ job configure parameters. * data.dir.depth: integer, depth of data directories, 0 as default. * success.file: success file name, * done.file: + + custom connector + * class: fully qualified class name of the user-provided data connector implementation. For Batch mode + it must implement the BatchDataConnector trait and provide a static method with signature + ```def apply(ctx: BatchDataConnectorContext): BatchDataConnector```. + For Streaming mode, it must implement the StreamingDataConnector trait and provide a static method + ```def apply(ctx: StreamingDataConnectorContext): StreamingDataConnector```. For a Scala case class, the companion object's generated `apply` satisfies this. The user-provided + data connector must be on the Spark job's class path, either by passing the custom jar via the --jars option + to spark-submit or by adding it to the "jars" list in sparkProperties.json.
### <a name="rule"></a>Rule - **dsl.type**: Rule dsl type, "spark-sql", "df-ops" and "griffin-dsl". http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6997bf73/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala index b51d4fb..a1ef3ba 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala @@ -39,6 +39,8 @@ object DataConnectorFactory extends Loggable { val KafkaRegex = """^(?i)kafka$""".r + val CustomRegex = """^(?i)custom$""".r + /** * create data connector * @param sparkSession spark env @@ -61,6 +63,7 @@ object DataConnectorFactory extends Loggable { case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache) case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache) case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache) + case CustomRegex() => getCustomConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt) case KafkaRegex() => getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt) case _ => throw new Exception("connector creation error!") @@ -84,6 +87,26 @@ object DataConnectorFactory extends Loggable { } } + private def getCustomConnector(session: SparkSession, + context: StreamingContext, + param: DataConnectorParam, + storage: TimestampStorage, + maybeClient: Option[StreamingCacheClient]): DataConnector = { + val className = param.getConfig("class").asInstanceOf[String] + val cls = Class.forName(className) + if 
(classOf[BatchDataConnector].isAssignableFrom(cls)) { + val ctx = BatchDataConnectorContext(session, param, storage) + val meth = cls.getDeclaredMethod("apply", classOf[BatchDataConnectorContext]) + meth.invoke(null, ctx).asInstanceOf[BatchDataConnector] + } else if (classOf[StreamingDataConnector].isAssignableFrom(cls)) { + val ctx = StreamingDataConnectorContext(session, context, param, storage, maybeClient) + val meth = cls.getDeclaredMethod("apply", classOf[StreamingDataConnectorContext]) + meth.invoke(null, ctx).asInstanceOf[StreamingDataConnector] + } else { + throw new ClassCastException(s"$className should extend BatchDataConnector or StreamingDataConnector") + } + } + private def getKafkaDataConnector(sparkSession: SparkSession, ssc: StreamingContext, dcParam: DataConnectorParam, http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6997bf73/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/BatchDataConnectorContext.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/BatchDataConnectorContext.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/BatchDataConnectorContext.scala new file mode 100644 index 0000000..c77fb35 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/BatchDataConnectorContext.scala @@ -0,0 +1,28 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.datasource.connector.batch + +import org.apache.spark.sql.SparkSession + +import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam +import org.apache.griffin.measure.datasource.TimestampStorage + +case class BatchDataConnectorContext(@transient sparkSession: SparkSession, + dcParam: DataConnectorParam, + timestampStorage: TimestampStorage) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6997bf73/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnectorContext.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnectorContext.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnectorContext.scala new file mode 100644 index 0000000..ec7a9ff --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnectorContext.scala @@ -0,0 +1,32 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.datasource.connector.streaming + +import org.apache.spark.sql.SparkSession +import org.apache.spark.streaming.StreamingContext + +import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam +import org.apache.griffin.measure.datasource.TimestampStorage +import org.apache.griffin.measure.datasource.cache.StreamingCacheClient + +case class StreamingDataConnectorContext(@transient sparkSession: SparkSession, + @transient ssc: StreamingContext, + dcParam: DataConnectorParam, + timestampStorage: TimestampStorage, + streamingCacheClientOpt: Option[StreamingCacheClient]) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6997bf73/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/DataConnectorFactorySpec.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/DataConnectorFactorySpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/DataConnectorFactorySpec.scala new file mode 100644 index 0000000..0310557 --- /dev/null +++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/DataConnectorFactorySpec.scala @@ -0,0 +1,129 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.configuration.dqdefinition.reader + +import scala.util.Try + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.streaming.dstream.InputDStream +import org.scalatest.FlatSpec + +import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam +import org.apache.griffin.measure.context.TimeRange +import org.apache.griffin.measure.datasource.TimestampStorage +import org.apache.griffin.measure.datasource.cache.StreamingCacheClient +import org.apache.griffin.measure.datasource.connector.DataConnectorFactory +import org.apache.griffin.measure.datasource.connector.batch.{BatchDataConnector, BatchDataConnectorContext} +import org.apache.griffin.measure.datasource.connector.streaming.{StreamingDataConnector, StreamingDataConnectorContext} + +case class ExampleBatchDataConnector(ctx: BatchDataConnectorContext) extends BatchDataConnector { + override val sparkSession: SparkSession = ctx.sparkSession + override val dcParam: DataConnectorParam = ctx.dcParam + override val timestampStorage: TimestampStorage = ctx.timestampStorage + + override def data(ms: Long): (Option[DataFrame], TimeRange) = (None, TimeRange(ms)) +} + + +case class ExampleStreamingDataConnector(ctx: StreamingDataConnectorContext) extends StreamingDataConnector { + override type K = Unit + override type V = Unit + override type OUT = Unit + + 
override protected def stream(): Try[InputDStream[this.OUT]] = null + + override def transform(rdd: RDD[this.OUT]): Option[DataFrame] = None + + override val streamingCacheClientOpt: Option[StreamingCacheClient] = ctx.streamingCacheClientOpt + override val sparkSession: SparkSession = ctx.sparkSession + override val dcParam: DataConnectorParam = ctx.dcParam + override val timestampStorage: TimestampStorage = ctx.timestampStorage + + override def init(): Unit = () +} + +class NotDataConnector + + +class DataConnectorWithoutApply extends BatchDataConnector { + override val sparkSession: SparkSession = null + override val dcParam: DataConnectorParam = null + override val timestampStorage: TimestampStorage = null + + override def data(ms: Long): (Option[DataFrame], TimeRange) = null +} + + +class DataConnectorFactorySpec extends FlatSpec { + + "DataConnectorFactory" should "be able to create custom batch connector" in { + val param = DataConnectorParam( + "CUSTOM", null, null, + Map("class" -> classOf[ExampleBatchDataConnector].getCanonicalName), Nil) + // apparently Scalamock can not mock classes without empty-paren constructor, providing nulls + val res = DataConnectorFactory.getDataConnector( + null, null, param, null, None) + assert(res.get != null) + assert(res.isSuccess) + assert(res.get.isInstanceOf[ExampleBatchDataConnector]) + assert(res.get.data(42)._2.begin == 42) + } + + it should "be able to create custom streaming connector" in { + val param = DataConnectorParam( + "CUSTOM", null, null, + Map("class" -> classOf[ExampleStreamingDataConnector].getCanonicalName), Nil) + // apparently Scalamock can not mock classes without empty-paren constructor, providing nulls + val res = DataConnectorFactory.getDataConnector( + null, null, param, null, None) + assert(res.isSuccess) + assert(res.get.isInstanceOf[ExampleStreamingDataConnector]) + assert(res.get.data(0)._2 == TimeRange.emptyTimeRange) + } + + it should "fail if class is not extending DataConnectors" in { + 
val param = DataConnectorParam( + "CUSTOM", null, null, + Map("class" -> classOf[NotDataConnector].getCanonicalName), Nil) + // apparently Scalamock can not mock classes without empty-paren constructor, providing nulls + val res = DataConnectorFactory.getDataConnector( + null, null, param, null, None) + assert(res.isFailure) + assert(res.failed.get.isInstanceOf[ClassCastException]) + assert(res.failed.get.getMessage == + "org.apache.griffin.measure.configuration.dqdefinition.reader.NotDataConnector" + + " should extend BatchDataConnector or StreamingDataConnector") + } + + it should "fail if class does not have apply() method" in { + val param = DataConnectorParam( + "CUSTOM", null, null, + Map("class" -> classOf[DataConnectorWithoutApply].getCanonicalName), Nil) + // apparently Scalamock can not mock classes without empty-paren constructor, providing nulls + val res = DataConnectorFactory.getDataConnector( + null, null, param, null, None) + assert(res.isFailure) + assert(res.failed.get.isInstanceOf[NoSuchMethodException]) + assert(res.failed.get.getMessage == + "org.apache.griffin.measure.configuration.dqdefinition.reader.DataConnectorWithoutApply" + + ".apply(org.apache.griffin.measure.datasource.connector.batch.BatchDataConnectorContext)") + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6997bf73/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java index f87e073..70a6b03 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java +++ b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java @@ -62,7 +62,8 @@ public class DataConnector extends AbstractAuditableEntity { */ HIVE, KAFKA, - AVRO + AVRO, + CUSTOM } @NotNull
