This is an automated email from the ASF dual-hosted git repository.

wankun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d9e9a7  Bug fix for reflecting a custom sink object
6d9e9a7 is described below

commit 6d9e9a7fdac0c2b7833cacbf081f9d5a93c59773
Author: wankunde <[email protected]>
AuthorDate: Mon Nov 25 19:32:00 2019 +0800

    Bug fix for reflecting a custom sink object
    
    Bug fix for reflecting a custom sink object.
    
    Author: wankunde <[email protected]>
    
    Closes #551 from wankunde/custom_sink.
---
 .../measure/configuration/enums/SinkType.scala     |  2 +-
 .../apache/griffin/measure/sink/SinkFactory.scala  | 23 ++++++++++++++++------
 .../apache/griffin/measure/sink/CustomSink.scala   | 15 +++++++-------
 3 files changed, 26 insertions(+), 14 deletions(-)

diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
index 2a6d335..6190137 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
@@ -82,7 +82,7 @@ case object ElasticsearchSinkType extends SinkType {
   */
 case object MongoSinkType extends SinkType {
   val idPattern = "^(?i)mongo|mongodb$".r
-  val desc = "distinct"
+  val desc = "mongo"
 }
 
 /**
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
index 7b8bd31..baa1788 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
@@ -18,14 +18,15 @@ under the License.
 */
 package org.apache.griffin.measure.sink
 
-import scala.util.{Success, Try}
+import scala.util.{Failure, Success, Try}
 
+import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
 import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.utils.ParamUtil._
 
 case class SinkFactory(sinkParamIter: Iterable[SinkParam],
-                       metricName: String) extends Serializable {
+                       metricName: String) extends Loggable with Serializable {
 
   /**
     * create sink
@@ -51,7 +52,9 @@ case class SinkFactory(sinkParamIter: Iterable[SinkParam],
     }
     sinkTry match {
       case Success(sink) if (sink.available) => Some(sink)
-      case _ => None
+      case Failure(ex) =>
+        error("Failed to get sink", ex)
+        None
     }
   }
 
@@ -77,9 +80,17 @@ case class SinkFactory(sinkParamIter: Iterable[SinkParam],
     val className = config.getString("class", "")
     val cls = Class.forName(className)
     if (classOf[Sink].isAssignableFrom(cls)) {
-      val ctx = SinkContext(config, metricName, timeStamp, block)
-      val method = cls.getDeclaredMethod("apply", classOf[SinkContext])
-      method.invoke(null, ctx).asInstanceOf[Sink]
+      val method = cls.getDeclaredMethod("apply",
+        classOf[Map[String, Any]],
+        classOf[String],
+        classOf[Long],
+        classOf[Boolean])
+      method.invoke(
+        null,
+        config,
+        metricName.asInstanceOf[Object],
+        timeStamp.asInstanceOf[Object],
+        block.asInstanceOf[Object]).asInstanceOf[Sink]
     } else {
       throw new ClassCastException(s"$className should extend Sink")
     }
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
index 01ccaba..0d7c4d6 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
@@ -25,14 +25,15 @@ import org.apache.spark.rdd.RDD
 /**
   * sink records and metrics in memory for test.
   *
-  * @param sinkContext
+  * @param config sink configurations
+  * @param metricName
+  * @param timeStamp
+  * @param block
   */
-case class CustomSink(sinkContext: SinkContext) extends Sink {
-  val config: Map[String, Any] = sinkContext.config
-  val metricName: String = sinkContext.metricName
-  val timeStamp: Long = sinkContext.timeStamp
-  val block: Boolean = sinkContext.block
-
+case class CustomSink(config: Map[String, Any],
+                      metricName: String,
+                      timeStamp: Long,
+                      block: Boolean) extends Sink {
   def available(): Boolean = true
 
   def start(msg: String): Unit = {}

Reply via email to