Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2083#discussion_r177685409
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
---
@@ -493,6 +629,210 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
updatedPlan
}
+ /**
+ * Method to get the aggregate query plan
+ * @param aggPlan
+ * aggregate table query plan
+ * @param grExp
+ * fact group by expression
+ * @param aggExp
+ * fact aggregate expression
+ * @param carbonTable
+ * fact table
+ * @param aggregationDataMapSchema
+ * selected aggregation data map
+ * @param factAggPlan
+ * fact aggregate query plan
+ * @return updated plan
+ */
+ def getAggregateQueryPlan(aggPlan: LogicalPlan,
+ grExp: Seq[Expression],
+ aggExp: Seq[NamedExpression],
+ carbonTable: CarbonTable,
+ aggregationDataMapSchema: DataMapSchema,
+ factAggPlan: LogicalPlan): LogicalPlan = {
+ // to handle streaming table with pre aggregate
+ if (carbonTable.isStreamingTable) {
+ setSegmentsForStreaming(carbonTable, aggregationDataMapSchema)
--- End diff --
There is a possibility of losing data from the fact table if you set only the
aggregate segments here and read the streaming segments directly from the fact
table's streaming segments: during handoff, streaming segments are marked for
delete and are no longer accessible during the fact read. So also set all the
current fact segments here, along with the aggregate segments. During the fact
read, compare the current segments with the set segments and read only the
intersected segments.
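The intersection step suggested above can be sketched as follows. This is a
minimal illustration only; the object and method names are hypothetical and do
not correspond to the actual CarbonData API.

```scala
// Hypothetical sketch of the suggested fact-read segment handling.
// Assumption: segment IDs are plain strings; the real CarbonData code
// tracks segments through its own metadata classes.
object SegmentIntersection {
  // During the fact read, keep only segments present in both the current
  // fact segment list and the segments that were set at planning time.
  // Handed-off streaming segments (marked for delete) drop out of the
  // current list, so they are skipped safely instead of failing the read.
  def segmentsToRead(currentFactSegments: Seq[String],
      setSegments: Seq[String]): Seq[String] = {
    val set = setSegments.toSet
    currentFactSegments.filter(set.contains)
  }
}
```

Setting both segment lists and intersecting at read time means a segment that
was valid at planning time but handed off before the read is simply excluded,
rather than producing a read against a deleted streaming segment.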
---