Repository: incubator-griffin
Updated Branches:
  refs/heads/master 29b51997f -> 6997bf731


[GRIFFIN-213] Custom connector support

Provide ability to extend batch and streaming data integrations
with custom user-provided connectors. Introduces new data connector
type, `CUSTOM`, parameterized with `class` property. Also adds support
for custom data connector enum on service side.

Author: Nikolay Sokolov <[email protected]>

Closes #456 from chemikadze/GRIFFIN-213.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/6997bf73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/6997bf73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/6997bf73

Branch: refs/heads/master
Commit: 6997bf731034cdae03fafcb4b8524dd9021f6dee
Parents: 29b5199
Author: Nikolay Sokolov <[email protected]>
Authored: Thu Nov 29 16:00:47 2018 +0800
Committer: William Guo <[email protected]>
Committed: Thu Nov 29 16:00:47 2018 +0800

----------------------------------------------------------------------
 griffin-doc/deploy/deploy-guide.md              |   2 +
 .../measure/measure-configuration-guide.md      |  10 +-
 .../connector/DataConnectorFactory.scala        |  23 ++++
 .../batch/BatchDataConnectorContext.scala       |  28 ++++
 .../StreamingDataConnectorContext.scala         |  32 +++++
 .../reader/DataConnectorFactorySpec.scala       | 129 +++++++++++++++++++
 .../core/measure/entity/DataConnector.java      |   3 +-
 7 files changed, 225 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6997bf73/griffin-doc/deploy/deploy-guide.md
----------------------------------------------------------------------
diff --git a/griffin-doc/deploy/deploy-guide.md b/griffin-doc/deploy/deploy-guide.md
index 2c6d80f..b9f7ceb 100644
--- a/griffin-doc/deploy/deploy-guide.md
+++ b/griffin-doc/deploy/deploy-guide.md
@@ -164,6 +164,8 @@ You should also modify some configurations of Apache Griffin for your environmen
                "spark.yarn.dist.files": "hdfs:///<path to>/hive-site.xml"
         },
          "files": [
+         ],
+         "jars": [
          ]
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6997bf73/griffin-doc/measure/measure-configuration-guide.md
----------------------------------------------------------------------
diff --git a/griffin-doc/measure/measure-configuration-guide.md b/griffin-doc/measure/measure-configuration-guide.md
index feeaf1a..2522ee4 100644
--- a/griffin-doc/measure/measure-configuration-guide.md
+++ b/griffin-doc/measure/measure-configuration-guide.md
@@ -188,7 +188,7 @@ Above lists DQ job configure parameters.
 - **sinks**: Whitelisted sink types for this job. Note: no sinks will be used, if empty or omitted. 
 
 ### <a name="data-connector"></a>Data Connector
-- **type**: Data connector type, "AVRO", "HIVE", "TEXT-DIR" for batch mode, "KAFKA" for streaming mode.
+- **type**: Data connector type: "AVRO", "HIVE", "TEXT-DIR", "CUSTOM" for batch mode; "KAFKA", "CUSTOM" for streaming mode.
 - **version**: Version string of data connector type.
 - **config**: Configure parameters of each data connector type.
        + avro data connector
@@ -204,6 +204,14 @@ Above lists DQ job configure parameters.
                * data.dir.depth: integer, depth of data directories, 0 as default.
                * success.file: success file name, 
                * done.file: 
+       + custom connector
+           * class: class name for user-provided data connector implementation. For Batch
+           it should be implementing BatchDataConnector trait and have static method with signature
+           ```def apply(ctx: BatchDataConnectorContext): BatchDataConnector```. 
+           For Streaming, it should be implementing StreamingDataConnector and have static method
+           ```def apply(ctx: StreamingDataConnectorContext): StreamingDataConnector```. User-provided
+           data connector should be present in Spark job's class path, by providing custom jar as -jar parameter
+           to spark-submit or by adding to "jars" list in sparkProperties.json.  
 
 ### <a name="rule"></a>Rule
 - **dsl.type**: Rule dsl type, "spark-sql", "df-ops" and "griffin-dsl".

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6997bf73/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
index b51d4fb..a1ef3ba 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
@@ -39,6 +39,8 @@ object DataConnectorFactory extends Loggable {
 
   val KafkaRegex = """^(?i)kafka$""".r
 
+  val CustomRegex = """^(?i)custom$""".r
+
   /**
     * create data connector
     * @param sparkSession     spark env
@@ -61,6 +63,7 @@ object DataConnectorFactory extends Loggable {
         case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
         case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
         case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
+        case CustomRegex() => getCustomConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
         case KafkaRegex() =>
           getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
         case _ => throw new Exception("connector creation error!")
@@ -84,6 +87,26 @@ object DataConnectorFactory extends Loggable {
     }
   }
 
+  private def getCustomConnector(session: SparkSession,
+                                 context: StreamingContext,
+                                 param: DataConnectorParam,
+                                 storage: TimestampStorage,
+                                 maybeClient: Option[StreamingCacheClient]): DataConnector = {
+    val className = param.getConfig("class").asInstanceOf[String]
+    val cls = Class.forName(className)
+    if (classOf[BatchDataConnector].isAssignableFrom(cls)) {
+      val ctx = BatchDataConnectorContext(session, param, storage)
+      val meth = cls.getDeclaredMethod("apply", classOf[BatchDataConnectorContext])
+      meth.invoke(null, ctx).asInstanceOf[BatchDataConnector]
+    } else if (classOf[StreamingDataConnector].isAssignableFrom(cls)) {
+      val ctx = StreamingDataConnectorContext(session, context, param, storage, maybeClient)
+      val meth = cls.getDeclaredMethod("apply", classOf[StreamingDataConnectorContext])
+      meth.invoke(null, ctx).asInstanceOf[StreamingDataConnector]
+    } else {
+      throw new ClassCastException(s"$className should extend BatchDataConnector or StreamingDataConnector")
+    }
+  }
+
   private def getKafkaDataConnector(sparkSession: SparkSession,
                                     ssc: StreamingContext,
                                     dcParam: DataConnectorParam,

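The reflective dispatch in `getCustomConnector` above can be sketched in plain Java. This is a minimal illustration, not Griffin code: `CustomConnectorDemo`, `BatchConnector`, `StreamingConnector`, `BatchContext`, `StreamingContext`, `ExampleBatch` and `NotAConnector` are all hypothetical stand-ins for the real traits and context classes. It shows the same three steps: load the class by name, pick the context type from the interface the class implements, then call a static `apply` with a `null` receiver. In the Scala original this presumably works because the compiler exposes a case class's companion `apply` as a static forwarder on the class itself.

```java
import java.lang.reflect.Method;

// Illustrative only: every type below is a hypothetical stand-in, not a Griffin class.
class CustomConnectorDemo {
    public interface BatchConnector { String describe(); }
    public interface StreamingConnector { String describe(); }

    public static final class BatchContext {
        final String source;
        public BatchContext(String source) { this.source = source; }
    }

    public static final class StreamingContext {
        final String topic;
        public StreamingContext(String topic) { this.topic = topic; }
    }

    // A user-provided connector: implements the batch interface and exposes a static apply(ctx).
    public static final class ExampleBatch implements BatchConnector {
        private final BatchContext ctx;
        private ExampleBatch(BatchContext ctx) { this.ctx = ctx; }
        public static ExampleBatch apply(BatchContext ctx) { return new ExampleBatch(ctx); }
        @Override public String describe() { return "batch:" + ctx.source; }
    }

    // Implements neither interface, so the factory must reject it.
    public static final class NotAConnector { }

    // Load the class by name, choose the context by implemented interface, call static apply.
    public static Object create(String className, String arg) throws ReflectiveOperationException {
        Class<?> cls = Class.forName(className);
        if (BatchConnector.class.isAssignableFrom(cls)) {
            Method m = cls.getDeclaredMethod("apply", BatchContext.class);
            return m.invoke(null, new BatchContext(arg)); // null receiver: apply is static
        } else if (StreamingConnector.class.isAssignableFrom(cls)) {
            Method m = cls.getDeclaredMethod("apply", StreamingContext.class);
            return m.invoke(null, new StreamingContext(arg));
        } else {
            throw new ClassCastException(
                className + " should extend BatchConnector or StreamingConnector");
        }
    }

    public static void main(String[] args) throws Exception {
        Object c = create(ExampleBatch.class.getName(), "src");
        System.out.println(((BatchConnector) c).describe()); // prints "batch:src"
        try {
            create(NotAConnector.class.getName(), "src");
        } catch (ClassCastException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A class that implements the interface but lacks a matching static `apply` fails at `getDeclaredMethod` with `NoSuchMethodException`, which is the case the `DataConnectorWithoutApply` test in this commit exercises.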
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6997bf73/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/BatchDataConnectorContext.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/BatchDataConnectorContext.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/BatchDataConnectorContext.scala
new file mode 100644
index 0000000..c77fb35
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/BatchDataConnectorContext.scala
@@ -0,0 +1,28 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.connector.batch
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.datasource.TimestampStorage
+
+case class BatchDataConnectorContext(@transient sparkSession: SparkSession,
+                                     dcParam: DataConnectorParam,
+                                     timestampStorage: TimestampStorage)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6997bf73/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnectorContext.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnectorContext.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnectorContext.scala
new file mode 100644
index 0000000..ec7a9ff
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnectorContext.scala
@@ -0,0 +1,32 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.connector.streaming
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.StreamingContext
+
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
+
+case class StreamingDataConnectorContext(@transient sparkSession: SparkSession,
+                                         @transient ssc: StreamingContext,
+                                         dcParam: DataConnectorParam,
+                                         timestampStorage: TimestampStorage,
+                                         streamingCacheClientOpt: Option[StreamingCacheClient])

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6997bf73/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/DataConnectorFactorySpec.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/DataConnectorFactorySpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/DataConnectorFactorySpec.scala
new file mode 100644
index 0000000..0310557
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/DataConnectorFactorySpec.scala
@@ -0,0 +1,129 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.dqdefinition.reader
+
+import scala.util.Try
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.streaming.dstream.InputDStream
+import org.scalatest.FlatSpec
+
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
+import org.apache.griffin.measure.datasource.connector.DataConnectorFactory
+import org.apache.griffin.measure.datasource.connector.batch.{BatchDataConnector, BatchDataConnectorContext}
+import org.apache.griffin.measure.datasource.connector.streaming.{StreamingDataConnector, StreamingDataConnectorContext}
+
+case class ExampleBatchDataConnector(ctx: BatchDataConnectorContext) extends BatchDataConnector {
+  override val sparkSession: SparkSession = ctx.sparkSession
+  override val dcParam: DataConnectorParam = ctx.dcParam
+  override val timestampStorage: TimestampStorage = ctx.timestampStorage
+
+  override def data(ms: Long): (Option[DataFrame], TimeRange) = (None, TimeRange(ms))
+}
+
+
+case class ExampleStreamingDataConnector(ctx: StreamingDataConnectorContext) extends StreamingDataConnector {
+  override type K = Unit
+  override type V = Unit
+  override type OUT = Unit
+
+  override protected def stream(): Try[InputDStream[this.OUT]] = null
+
+  override def transform(rdd: RDD[this.OUT]): Option[DataFrame] = None
+
+  override val streamingCacheClientOpt: Option[StreamingCacheClient] = ctx.streamingCacheClientOpt
+  override val sparkSession: SparkSession = ctx.sparkSession
+  override val dcParam: DataConnectorParam = ctx.dcParam
+  override val timestampStorage: TimestampStorage = ctx.timestampStorage
+
+  override def init(): Unit = ()
+}
+
+class NotDataConnector
+
+
+class DataConnectorWithoutApply extends BatchDataConnector {
+  override val sparkSession: SparkSession = null
+  override val dcParam: DataConnectorParam = null
+  override val timestampStorage: TimestampStorage = null
+
+  override def data(ms: Long): (Option[DataFrame], TimeRange) = null
+}
+
+
+class DataConnectorFactorySpec extends FlatSpec {
+
+  "DataConnectorFactory" should "be able to create custom batch connector" in {
+    val param = DataConnectorParam(
+      "CUSTOM", null, null,
+      Map("class" -> classOf[ExampleBatchDataConnector].getCanonicalName), Nil)
+    // apparently Scalamock can not mock classes without empty-paren constructor, providing nulls
+    val res = DataConnectorFactory.getDataConnector(
+      null, null, param, null, None)
+    assert(res.get != null)
+    assert(res.isSuccess)
+    assert(res.get.isInstanceOf[ExampleBatchDataConnector])
+    assert(res.get.data(42)._2.begin == 42)
+  }
+
+  it should "be able to create custom streaming connector" in {
+    val param = DataConnectorParam(
+      "CUSTOM", null, null,
+      Map("class" -> classOf[ExampleStreamingDataConnector].getCanonicalName), Nil)
+    // apparently Scalamock can not mock classes without empty-paren constructor, providing nulls
+    val res = DataConnectorFactory.getDataConnector(
+      null, null, param, null, None)
+    assert(res.isSuccess)
+    assert(res.get.isInstanceOf[ExampleStreamingDataConnector])
+    assert(res.get.data(0)._2 == TimeRange.emptyTimeRange)
+  }
+
+  it should "fail if class is not extending DataConnectors" in {
+    val param = DataConnectorParam(
+      "CUSTOM", null, null,
+      Map("class" -> classOf[NotDataConnector].getCanonicalName), Nil)
+    // apparently Scalamock can not mock classes without empty-paren constructor, providing nulls
+    val res = DataConnectorFactory.getDataConnector(
+      null, null, param, null, None)
+    assert(res.isFailure)
+    assert(res.failed.get.isInstanceOf[ClassCastException])
+    assert(res.failed.get.getMessage ==
+      "org.apache.griffin.measure.configuration.dqdefinition.reader.NotDataConnector" +
+        " should extend BatchDataConnector or StreamingDataConnector")
+  }
+
+  it should "fail if class does not have apply() method" in {
+    val param = DataConnectorParam(
+      "CUSTOM", null, null,
+      Map("class" -> classOf[DataConnectorWithoutApply].getCanonicalName), Nil)
+    // apparently Scalamock can not mock classes without empty-paren constructor, providing nulls
+    val res = DataConnectorFactory.getDataConnector(
+      null, null, param, null, None)
+    assert(res.isFailure)
+    assert(res.failed.get.isInstanceOf[NoSuchMethodException])
+    assert(res.failed.get.getMessage ==
+      "org.apache.griffin.measure.configuration.dqdefinition.reader.DataConnectorWithoutApply" +
+        ".apply(org.apache.griffin.measure.datasource.connector.batch.BatchDataConnectorContext)")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6997bf73/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
index f87e073..70a6b03 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
@@ -62,7 +62,8 @@ public class DataConnector extends AbstractAuditableEntity {
          */
         HIVE,
         KAFKA,
-        AVRO
+        AVRO,
+        CUSTOM
     }
 
     @NotNull
