This is an automated email from the ASF dual-hosted git repository.

wankun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ef73f1  [GRIFFIN-301] Update custom data connector to have the same 
parameters as built-in data connector
8ef73f1 is described below

commit 8ef73f1e8c90330d22eb7d370f5e0f3faa1b22aa
Author: wankunde <[email protected]>
AuthorDate: Sat Nov 30 13:08:12 2019 +0800

    [GRIFFIN-301] Update custom data connector to have the same parameters as 
built-in data connector
    
    Currently, custom data connectors take different parameters from the 
built-in data connectors, which can confuse users.
    
    For example : https://issues.apache.org/jira/browse/GRIFFIN-300
    
    Author: wankunde <[email protected]>
    
    Closes #556 from wankunde/custom_data_connector.
---
 .../connector/DataConnectorFactory.scala           | 33 +++++++++-----
 .../batch/BatchDataConnectorContext.scala          | 28 ------------
 .../streaming/StreamingDataConnectorContext.scala  | 32 --------------
 .../connector}/DataConnectorFactorySpec.scala      | 51 +++++++++++++---------
 4 files changed, 52 insertions(+), 92 deletions(-)

diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
index 371fb7b..35d8ab8 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
@@ -89,21 +89,30 @@ object DataConnectorFactory extends Loggable {
     }
   }
 
-  private def getCustomConnector(session: SparkSession,
-                                 context: StreamingContext,
-                                 param: DataConnectorParam,
-                                 storage: TimestampStorage,
-                                 maybeClient: Option[StreamingCacheClient]): 
DataConnector = {
-    val className = param.getConfig("class").asInstanceOf[String]
+  private def getCustomConnector(sparkSession: SparkSession,
+                                 ssc: StreamingContext,
+                                 dcParam: DataConnectorParam,
+                                 timestampStorage: TimestampStorage,
+                                 streamingCacheClientOpt: 
Option[StreamingCacheClient]): DataConnector = {
+    val className = dcParam.getConfig("class").asInstanceOf[String]
     val cls = Class.forName(className)
     if (classOf[BatchDataConnector].isAssignableFrom(cls)) {
-      val ctx = BatchDataConnectorContext(session, param, storage)
-      val meth = cls.getDeclaredMethod("apply", 
classOf[BatchDataConnectorContext])
-      meth.invoke(null, ctx).asInstanceOf[BatchDataConnector]
+      val method = cls.getDeclaredMethod("apply",
+        classOf[SparkSession],
+        classOf[DataConnectorParam],
+        classOf[TimestampStorage]
+      )
+      method.invoke(null, sparkSession, dcParam, 
timestampStorage).asInstanceOf[BatchDataConnector]
     } else if (classOf[StreamingDataConnector].isAssignableFrom(cls)) {
-      val ctx = StreamingDataConnectorContext(session, context, param, 
storage, maybeClient)
-      val meth = cls.getDeclaredMethod("apply", 
classOf[StreamingDataConnectorContext])
-      meth.invoke(null, ctx).asInstanceOf[StreamingDataConnector]
+      val method = cls.getDeclaredMethod("apply",
+        classOf[SparkSession],
+        classOf[StreamingContext],
+        classOf[DataConnectorParam],
+        classOf[TimestampStorage],
+        classOf[Option[StreamingCacheClient]]
+      )
+      method.invoke(null, sparkSession, ssc, dcParam, timestampStorage, 
streamingCacheClientOpt)
+        .asInstanceOf[StreamingDataConnector]
     } else {
       throw new ClassCastException(s"$className should extend 
BatchDataConnector or StreamingDataConnector")
     }
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/BatchDataConnectorContext.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/BatchDataConnectorContext.scala
deleted file mode 100644
index c77fb35..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/BatchDataConnectorContext.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.datasource.connector.batch
-
-import org.apache.spark.sql.SparkSession
-
-import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
-import org.apache.griffin.measure.datasource.TimestampStorage
-
-case class BatchDataConnectorContext(@transient sparkSession: SparkSession,
-                                     dcParam: DataConnectorParam,
-                                     timestampStorage: TimestampStorage)
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnectorContext.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnectorContext.scala
deleted file mode 100644
index ec7a9ff..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnectorContext.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.datasource.connector.streaming
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.streaming.StreamingContext
-
-import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
-import org.apache.griffin.measure.datasource.TimestampStorage
-import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
-
-case class StreamingDataConnectorContext(@transient sparkSession: SparkSession,
-                                         @transient ssc: StreamingContext,
-                                         dcParam: DataConnectorParam,
-                                         timestampStorage: TimestampStorage,
-                                         streamingCacheClientOpt: 
Option[StreamingCacheClient])
diff --git 
a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/DataConnectorFactorySpec.scala
 
b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala
similarity index 68%
rename from 
measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/DataConnectorFactorySpec.scala
rename to 
measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala
index 0310557..e2c51d3 100644
--- 
a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/DataConnectorFactorySpec.scala
+++ 
b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala
@@ -16,13 +16,14 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.griffin.measure.configuration.dqdefinition.reader
+package org.apache.griffin.measure.datasource.connector
 
 import scala.util.Try
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.streaming.dstream.InputDStream
+import org.apache.spark.streaming.StreamingContext
 import org.scalatest.FlatSpec
 
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
@@ -30,19 +31,23 @@ import org.apache.griffin.measure.context.TimeRange
 import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
 import org.apache.griffin.measure.datasource.connector.DataConnectorFactory
-import 
org.apache.griffin.measure.datasource.connector.batch.{BatchDataConnector, 
BatchDataConnectorContext}
-import 
org.apache.griffin.measure.datasource.connector.streaming.{StreamingDataConnector,
 StreamingDataConnectorContext}
+import 
org.apache.griffin.measure.datasource.connector.batch.{BatchDataConnector, 
MySqlDataConnector}
+import 
org.apache.griffin.measure.datasource.connector.streaming.{KafkaStreamingStringDataConnector,
 StreamingDataConnector}
 
-case class ExampleBatchDataConnector(ctx: BatchDataConnectorContext) extends 
BatchDataConnector {
-  override val sparkSession: SparkSession = ctx.sparkSession
-  override val dcParam: DataConnectorParam = ctx.dcParam
-  override val timestampStorage: TimestampStorage = ctx.timestampStorage
+case class ExampleBatchDataConnector(@transient sparkSession: SparkSession,
+                                     dcParam: DataConnectorParam,
+                                     timestampStorage: TimestampStorage) 
extends BatchDataConnector {
 
   override def data(ms: Long): (Option[DataFrame], TimeRange) = (None, 
TimeRange(ms))
 }
 
 
-case class ExampleStreamingDataConnector(ctx: StreamingDataConnectorContext) 
extends StreamingDataConnector {
+case class ExampleStreamingDataConnector(@transient sparkSession: SparkSession,
+                                         @transient ssc: StreamingContext,
+                                         dcParam: DataConnectorParam,
+                                         timestampStorage: TimestampStorage,
+                                         streamingCacheClientOpt: 
Option[StreamingCacheClient]
+                                        ) extends StreamingDataConnector {
   override type K = Unit
   override type V = Unit
   override type OUT = Unit
@@ -51,11 +56,6 @@ case class ExampleStreamingDataConnector(ctx: 
StreamingDataConnectorContext) ext
 
   override def transform(rdd: RDD[this.OUT]): Option[DataFrame] = None
 
-  override val streamingCacheClientOpt: Option[StreamingCacheClient] = 
ctx.streamingCacheClientOpt
-  override val sparkSession: SparkSession = ctx.sparkSession
-  override val dcParam: DataConnectorParam = ctx.dcParam
-  override val timestampStorage: TimestampStorage = ctx.timestampStorage
-
   override def init(): Unit = ()
 }
 
@@ -86,16 +86,25 @@ class DataConnectorFactorySpec extends FlatSpec {
     assert(res.get.data(42)._2.begin == 42)
   }
 
-  it should "be able to create custom streaming connector" in {
+  it should "be able to create MySqlDataConnector" in {
     val param = DataConnectorParam(
       "CUSTOM", null, null,
-      Map("class" -> classOf[ExampleStreamingDataConnector].getCanonicalName), 
Nil)
+      Map("class" -> classOf[MySqlDataConnector].getCanonicalName), Nil)
     // apparently Scalamock can not mock classes without empty-paren 
constructor, providing nulls
     val res = DataConnectorFactory.getDataConnector(
       null, null, param, null, None)
     assert(res.isSuccess)
-    assert(res.get.isInstanceOf[ExampleStreamingDataConnector])
-    assert(res.get.data(0)._2 == TimeRange.emptyTimeRange)
+    assert(res.get.isInstanceOf[MySqlDataConnector])
+  }
+
+  it should "be able to create KafkaStreamingStringDataConnector" in {
+    val param = DataConnectorParam(
+      "CUSTOM", null, null,
+      Map("class" -> 
classOf[KafkaStreamingStringDataConnector].getCanonicalName), Nil)
+    val res = DataConnectorFactory.getDataConnector(
+      null, null, param, null, None)
+    assert(res.isSuccess)
+    assert(res.get.isInstanceOf[KafkaStreamingStringDataConnector])
   }
 
   it should "fail if class is not extending DataConnectors" in {
@@ -108,7 +117,7 @@ class DataConnectorFactorySpec extends FlatSpec {
     assert(res.isFailure)
     assert(res.failed.get.isInstanceOf[ClassCastException])
     assert(res.failed.get.getMessage ==
-      
"org.apache.griffin.measure.configuration.dqdefinition.reader.NotDataConnector" 
+
+      "org.apache.griffin.measure.datasource.connector.NotDataConnector" +
         " should extend BatchDataConnector or StreamingDataConnector")
   }
 
@@ -122,8 +131,10 @@ class DataConnectorFactorySpec extends FlatSpec {
     assert(res.isFailure)
     assert(res.failed.get.isInstanceOf[NoSuchMethodException])
     assert(res.failed.get.getMessage ==
-      
"org.apache.griffin.measure.configuration.dqdefinition.reader.DataConnectorWithoutApply"
 +
-        
".apply(org.apache.griffin.measure.datasource.connector.batch.BatchDataConnectorContext)")
+      
"org.apache.griffin.measure.datasource.connector.DataConnectorWithoutApply.apply"
 +
+        "(org.apache.spark.sql.SparkSession, " +
+        
"org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam, " +
+        "org.apache.griffin.measure.datasource.TimestampStorage)")
   }
 
 }

Reply via email to