This is an automated email from the ASF dual-hosted git repository.
wankun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/master by this push:
new 6d9e9a7 Bug fix for reflecting a custom sink object
6d9e9a7 is described below
commit 6d9e9a7fdac0c2b7833cacbf081f9d5a93c59773
Author: wankunde <[email protected]>
AuthorDate: Mon Nov 25 19:32:00 2019 +0800
Bug fix for reflecting a custom sink object
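Bug fix for reflecting a custom sink object: SinkFactory now looks up the custom sink's `apply(config, metricName, timeStamp, block)` method via reflection instead of `apply(SinkContext)`, logs the exception when creating a sink fails, and the MongoSinkType description is corrected from "distinct" to "mongo".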
Author: wankunde <[email protected]>
Closes #551 from wankunde/custom_sink.
---
.../measure/configuration/enums/SinkType.scala | 2 +-
.../apache/griffin/measure/sink/SinkFactory.scala | 23 ++++++++++++++++------
.../apache/griffin/measure/sink/CustomSink.scala | 15 +++++++-------
3 files changed, 26 insertions(+), 14 deletions(-)
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
index 2a6d335..6190137 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
@@ -82,7 +82,7 @@ case object ElasticsearchSinkType extends SinkType {
*/
case object MongoSinkType extends SinkType {
val idPattern = "^(?i)mongo|mongodb$".r
- val desc = "distinct"
+ val desc = "mongo"
}
/**
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
index 7b8bd31..baa1788 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
@@ -18,14 +18,15 @@ under the License.
*/
package org.apache.griffin.measure.sink
-import scala.util.{Success, Try}
+import scala.util.{Failure, Success, Try}
+import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.utils.ParamUtil._
case class SinkFactory(sinkParamIter: Iterable[SinkParam],
- metricName: String) extends Serializable {
+ metricName: String) extends Loggable with Serializable {
/**
* create sink
@@ -51,7 +52,9 @@ case class SinkFactory(sinkParamIter: Iterable[SinkParam],
}
sinkTry match {
case Success(sink) if (sink.available) => Some(sink)
- case _ => None
+ case Failure(ex) =>
+ error("Failed to get sink", ex)
+ None
}
}
@@ -77,9 +80,17 @@ case class SinkFactory(sinkParamIter: Iterable[SinkParam],
val className = config.getString("class", "")
val cls = Class.forName(className)
if (classOf[Sink].isAssignableFrom(cls)) {
- val ctx = SinkContext(config, metricName, timeStamp, block)
- val method = cls.getDeclaredMethod("apply", classOf[SinkContext])
- method.invoke(null, ctx).asInstanceOf[Sink]
+ val method = cls.getDeclaredMethod("apply",
+ classOf[Map[String, Any]],
+ classOf[String],
+ classOf[Long],
+ classOf[Boolean])
+ method.invoke(
+ null,
+ config,
+ metricName.asInstanceOf[Object],
+ timeStamp.asInstanceOf[Object],
+ block.asInstanceOf[Object]).asInstanceOf[Sink]
} else {
throw new ClassCastException(s"$className should extend Sink")
}
diff --git
a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
index 01ccaba..0d7c4d6 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
@@ -25,14 +25,15 @@ import org.apache.spark.rdd.RDD
/**
* sink records and metrics in memory for test.
*
- * @param sinkContext
+ * @param config sink configurations
+ * @param metricName
+ * @param timeStamp
+ * @param block
*/
-case class CustomSink(sinkContext: SinkContext) extends Sink {
- val config: Map[String, Any] = sinkContext.config
- val metricName: String = sinkContext.metricName
- val timeStamp: Long = sinkContext.timeStamp
- val block: Boolean = sinkContext.block
-
+case class CustomSink(config: Map[String, Any],
+ metricName: String,
+ timeStamp: Long,
+ block: Boolean) extends Sink {
def available(): Boolean = true
def start(msg: String): Unit = {}