I have an RDD of type

    RDD[(String, Iterable[(com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord,
                           com.ebay.ep.poc.spark.reporting.process.model.DataRecord)])]

Here the String is the key and the Iterable holds the tuples for that key.
I got this RDD from a groupByKey. For each key I then want to compute the
total number of values and the number of unique values, so I currently do
this:

    val totalViCount = details.size.toLong
    val uniqueViCount = details.map(_._1.get("itemId").asInstanceOf[Long]).distinct.size.toLong

How do I do this using reduceByKey?

*Total Code:*

      val groupedDetail: RDD[(String, Iterable[(DetailInputRecord, DataRecord)])] = detailInputsToGroup.map {
        case (detailInput, dataRecord) =>
          val key: StringBuilder = new StringBuilder
          dimensions.foreach {
            dimension =>
              key ++= {
                Option(dataRecord.get(dimension)).getOrElse(Option(detailInput.get(dimension)).getOrElse("")).toString
              }
          }
          (key.toString, (detailInput, dataRecord))
      }.groupByKey

      groupedDetail.map {
        case (key, values) => {
          val valueList = values.toList

          // Compute dimensions (you can skip this part)
          val (detailInput, dataRecord) = valueList.head
          val schema = SchemaUtil.outputSchema(_detail)
          val detailOutput = new DetailOutputRecord(detail, new SessionRecord(schema))
          DataUtil.populateDimensions(schema, dimensions.toArray, detailInput, dataRecord, detailOutput)


          val metricsData = metricProviders.flatMap {
            case (className, instance) =>
              val data = instance.getMetrics(valueList)
              ReflectionUtil.getData(data, _metricProviderMemberNames(className))
          }
          // Only the side effect is needed here, so foreach rather than map
          metricsData.foreach { case (k, v) => detailOutput.put(k, v) }
          val wrap = new AvroKey[DetailOutputRecord](detailOutput)
          (wrap, NullWritable.get)
        }
      }


//getMetrics:
  def getMetrics(details: List[(DetailInputRecord, DataRecord)]) = {
    val totalViCount = details.size.toLong
    val uniqueViCount = details.map(_._1.get("itemId").asInstanceOf[Long]).distinct.size.toLong
    new ViewItemCountMetric(totalViCount, uniqueViCount)
  }


I understand that totalViCount can be implemented using reduceByKey. How
can I implement the unique count, given that I seem to need the full list
of values for a key to know which ones are unique?
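
One possible direction, offered as an untested sketch and not as a confirmed
answer: carry a small accumulator per key that holds the running total plus
the set of itemIds seen so far, and merge two accumulators with a function
that is associative and commutative, which is what reduceByKey requires. The
names ViCounts and merge are hypothetical, and plain Scala collections stand
in for the RDD so that the block runs without Spark.

```scala
// Sketch only: ViCounts and merge are hypothetical names, not part of the
// original code. Plain collections stand in for the RDD.
object UniqueCountSketch {
  // Per-key accumulator: running total plus the set of itemIds seen so far.
  final case class ViCounts(total: Long, itemIds: Set[Long])

  // Associative and commutative, so it can serve as a reduceByKey function.
  def merge(a: ViCounts, b: ViCounts): ViCounts =
    ViCounts(a.total + b.total, a.itemIds ++ b.itemIds)

  def main(args: Array[String]): Unit = {
    // (key, itemId) pairs standing in for (key, (detailInput, dataRecord)).
    val pairs = Seq(("k1", 10L), ("k1", 10L), ("k1", 11L), ("k2", 12L))

    // With Spark this step would be roughly:
    //   rdd.mapValues(id => ViCounts(1L, Set(id))).reduceByKey(merge)
    // Here groupBy + reduce emulates it locally.
    val counts: Map[String, ViCounts] = pairs
      .map { case (k, id) => (k, ViCounts(1L, Set(id))) }
      .groupBy(_._1)
      .map { case (k, vs) => (k, vs.map(_._2).reduce(merge)) }

    counts.toSeq.sortBy(_._1).foreach { case (k, c) =>
      println(s"$k total=${c.total} unique=${c.itemIds.size}")
    }
  }
}
```

Note that this still keeps every distinct itemId for a key in memory. What it
avoids is materializing the full list of records per key, since values are
combined before the shuffle.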

-- 
Deepak
