Github user grant-xuexu commented on a diff in the pull request:

    https://github.com/apache/incubator-griffin/pull/396#discussion_r210923398
  
    --- Diff: 
measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
 ---
    @@ -91,11 +91,12 @@ object DataConnectorFactory extends Loggable {
                                        ): KafkaStreamingDataConnector  = {
         val KeyType = "key.type"
         val ValueType = "value.type"
    -    val config = dcParam.config
    +    val config = dcParam.getConfig
         val keyType = config.getOrElse(KeyType, "java.lang.String").toString
         val valueType = config.getOrElse(ValueType, 
"java.lang.String").toString
    -    (getClassTag(keyType), getClassTag(valueType)) match {
    -      case (ClassTag(k: Class[String]), ClassTag(v: Class[String])) => {
    +
    +    (keyType, valueType) match {
    +      case ("java.lang.String", "java.lang.String") => {
             KafkaStreamingStringDataConnector(sparkSession, ssc, dcParam, 
tmstCache, streamingCacheClientOpt)
    --- End diff --
    
    The logic here seems to assume that the key type and value type are both 
String, and throws an exception otherwise.
    
    Also, because of type erasure, the old pattern `case (ClassTag(k: Class[String]), ClassTag(v: 
Class[String])) => {` already matched unconditionally.


---

Reply via email to