Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2282#discussion_r186920585
--- Diff: store/search/src/main/scala/org/apache/spark/rpc/Master.scala ---
@@ -182,40 +184,79 @@ class Master(sparkConf: SparkConf) {
}
LOG.info(s"[SearchId:$queryId] accumulated result size $rowCount")
}
- def onFaiure(e: Throwable) = throw new IOException(s"exception in
worker: ${ e.getMessage }")
- def onTimedout() = throw new ExecutionTimeoutException()
+
+ def onFailure(e: Throwable) = throw new IOException(s"exception in
worker: ${e.getMessage}")
+
+ def onTimeout() = throw new ExecutionTimeoutException()
// prune data and get a mapping of worker hostname to list of blocks,
// then add these blocks to the SearchRequest and fire the RPC call
val nodeBlockMapping: JMap[String, JList[Distributable]] =
pruneBlock(table, columns, filter)
val tuple = nodeBlockMapping.asScala.map { case (splitAddress, blocks)
=>
- // Build a SearchRequest
- val split = new SerializableWritable[CarbonMultiBlockSplit](
- new CarbonMultiBlockSplit(blocks, splitAddress))
- val request = SearchRequest(queryId, split, table.getTableInfo,
columns, filter, localLimit)
+ val hashMap = new mutable.HashMap[String, JList[Distributable]]()
+ for (i <- 0 until (blocks.size())) {
+ val shardName = CarbonTablePath
--- End diff --
I think this logic should be inside the Scheduler.
---