This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new a4700af [CARBONDATA-3602]Fix MV issues with session level operations
a4700af is described below
commit a4700afe40c1b10d018bc20eab5805def115337c
Author: akashrn5 <[email protected]>
AuthorDate: Wed Dec 4 13:46:40 2019 +0530
[CARBONDATA-3602]Fix MV issues with session level operations
Problems:
1. When the same timeseries query is executed from a different session,
the query from that session does not hit the datamap table. The semantic
equals check on the ScalaUDF expression fails because each session wraps
its own instance of the UDF function, and comparing two distinct object
references falls back to reference equality (see the sketch after this
list).
2. When the same datamap is created on the same table in another
database, queries fail with a table-not-found error. This happens
because the datamap catalog map captures session info when it is first
initialized, and that info is not cleared when the session gets
refreshed.
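
To make problem 1 concrete, here is a minimal sketch of the
reference-equality behaviour (only the TimeSeriesFunction signature
matches this patch; the stub body and demo object are illustrative):

    import java.sql.Timestamp

    // Before the fix: a plain class, so equals() falls back to the
    // reference equality inherited from AnyRef.
    class TimeSeriesFunction extends ((Timestamp, String) => Timestamp)
      with Serializable {
      // stub body for illustration; the real class delegates to TimeSeriesUDF
      override def apply(v1: Timestamp, v2: String): Timestamp = v1
    }

    object ReferenceEqualityDemo {
      def main(args: Array[String]): Unit = {
        // each session wraps its own instance in a ScalaUDF expression
        val fromSessionOne = new TimeSeriesFunction
        val fromSessionTwo = new TimeSeriesFunction
        // two distinct references of a plain class never compare equal,
        // so the semantic equals of the wrapping expressions fails
        println(fromSessionOne == fromSessionTwo) // prints false
      }
    }
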
Solution:
1. Make TimeSeriesFunction a case class, so that Scala generates an
equals method which checks value equality whenever the object
references differ (see the sketch after this list).
2. When the session info is updated, clear the stale entry from the map
and put it back with the new session info.
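
A matching sketch of the fix in solution 1: turning the function into a
case class makes the compiler generate a value-based equals, so
instances created in different sessions compare equal (the demo object
is illustrative; the class signature matches the patch below):

    import java.sql.Timestamp

    // After the fix: a case class, for which Scala generates equals and
    // hashCode, so equality no longer depends on the object reference.
    case class TimeSeriesFunction() extends ((Timestamp, String) => Timestamp)
      with Serializable {
      // stub body for illustration; the real class delegates to TimeSeriesUDF
      override def apply(v1: Timestamp, v2: String): Timestamp = v1
    }

    object ValueEqualityDemo {
      def main(args: Array[String]): Unit = {
        val fromSessionOne = TimeSeriesFunction()
        val fromSessionTwo = TimeSeriesFunction()
        // the generated equals compares the (empty) constructor arguments
        println(fromSessionOne == fromSessionTwo) // prints true
      }
    }
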
This closes #3497
---
.../core/datamap/DataMapStoreManager.java | 8 +++++++
.../carbondata/mv/datamap/MVAnalyzerRule.scala | 10 ++++++++-
.../mv/rewrite/SummaryDatasetCatalog.scala | 26 +++++++++++++++++++++-
.../command/timeseries/TimeSeriesFunction.scala | 2 +-
4 files changed, 43 insertions(+), 3 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index c01a1ad..50c7d6b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -277,6 +277,14 @@ public final class DataMapStoreManager {
   }
 
   /**
+   * This method removes the datamapCatalog for the corresponding provider if the session gets
+   * refreshed or updated
+   */
+  public void clearDataMapCatalog() {
+    dataMapCatalogs = null;
+  }
+
+  /**
    * Initialize by reading all datamaps from store and re register it
    * @param dataMapProvider
    */
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
index da38bae..be292be 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
@@ -83,8 +83,16 @@ class MVAnalyzerRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
       }
       Aggregate(grp, aExp, child)
     }
-    val catalog = DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider,
+    var catalog = DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider,
       DataMapClassProvider.MV.getShortName).asInstanceOf[SummaryDatasetCatalog]
+    // when first time DataMapCatalogs are initialized, it stores session info also, but when carbon
+    // session is newly created, catalog map will not be cleared, so if session info is different,
+    // remove the entry from map.
+    if (catalog != null && !catalog.mvSession.sparkSession.equals(sparkSession)) {
+      DataMapStoreManager.getInstance().clearDataMapCatalog()
+      catalog = DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider,
+        DataMapClassProvider.MV.getShortName).asInstanceOf[SummaryDatasetCatalog]
+    }
     if (needAnalysis && catalog != null && isValidPlan(plan, catalog)) {
       val modularPlan = catalog.mvSession.sessionState.rewritePlan(plan).withMVTable
       if (modularPlan.find(_.rewritten).isDefined) {
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
index 645b304..19e42a6 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
@@ -19,7 +19,7 @@ package org.apache.carbondata.mv.rewrite
 
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.{CarbonSession, DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -108,7 +108,22 @@ private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession)
   private[mv] def registerSchema(dataMapSchema: DataMapSchema): Unit = {
     writeLock {
       val updatedQuery = parser.addMVSkipFunction(dataMapSchema.getCtasQuery)
+      val currentDatabase = sparkSession match {
+        case carbonSession: CarbonSession =>
+          carbonSession.sessionState.catalog.getCurrentDatabase
+        case _ =>
+          sparkSession.catalog.currentDatabase
+      }
+      // This is required because datamap schemas are across databases, so while loading the
+      // catalog, if the datamap is in database other than sparkSession.currentDataBase(), then it
+      // fails to register, so set the database present in the dataMapSchema Object
+      setCurrentDataBase(dataMapSchema.getRelationIdentifier.getDatabaseName)
       val query = sparkSession.sql(updatedQuery)
+      // here setting back to current database of current session, because if the actual query
+      // contains db name in query like, select db1.column1 from table and current database is
+      // default and if we drop the db1, still the session has current db as db1.
+      // So setting back to current database.
+      setCurrentDataBase(currentDatabase)
       val planToRegister = MVHelper.dropDummFuc(query.queryExecution.analyzed)
       val modularPlan =
         mvSession.sessionState.modularizer.modularize(
@@ -143,6 +158,15 @@ private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession)
     }
   }
 
+  private def setCurrentDataBase(dataBaseName: String): Unit = {
+    sparkSession match {
+      case carbonSession: CarbonSession =>
+        carbonSession.sessionState.catalog.setCurrentDatabase(dataBaseName)
+      case _ =>
+        sparkSession.catalog.setCurrentDatabase(dataBaseName)
+    }
+  }
+
   /** Removes the given [[DataFrame]] from the catalog */
   private[mv] def unregisterSchema(dataMapName: String): Unit = {
     writeLock {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesFunction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesFunction.scala
index 3ec02c9..ddcc787 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesFunction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesFunction.scala
@@ -24,7 +24,7 @@ import org.apache.carbondata.core.preagg.TimeSeriesUDF
 
 /**
  * Time series udf class
  */
-class TimeSeriesFunction extends ((Timestamp, String) => Timestamp) with Serializable{
+case class TimeSeriesFunction() extends ((Timestamp, String) => Timestamp) with Serializable{
   override def apply(v1: Timestamp, v2: String): Timestamp = {
     TimeSeriesUDF.INSTANCE.applyUDF(v1, v2)