This is an automated email from the ASF dual-hosted git repository.
indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 90841bc [CARBONDATA-4202] Fix issue when refresh main table with MV
90841bc is described below
commit 90841bc10860efd5a1417b19cc649f1d483dca07
Author: ShreelekhyaG <[email protected]>
AuthorDate: Fri Jun 4 14:55:13 2021 +0530
[CARBONDATA-4202] Fix issue when refresh main table with MV
Why is this PR needed?
When trying to register a table from an old store that has an MV, the command fails
with a parser error (a syntax issue while creating the table), because it tries to
create the table with the relatedmvtablesmap property, which is not a valid table
property.
What changes were proposed in this PR?
1. Removed relatedmvtablesmap from the table properties in RefreshCarbonTableCommand.
2. After the main table is registered, the MV is now registered by reading its schema
from the system folder (see the sketch below).
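
To make the compatibility flow concrete, here is a minimal sketch (not part of this
patch) of how an old-store table and its MV would be re-registered after this fix.
It assumes a CarbonData-enabled SparkSession named `spark` whose store location
already contains the copied table data and the MV schema file under
<dbLocation>/_system/mv_schema.mv1; the table and MV names are reused from the new test.

    import org.apache.spark.sql.SparkSession

    // Assumption: an active CarbonData-enabled SparkSession exists.
    val spark: SparkSession = SparkSession.getActiveSession.get

    // Re-register the main table. RefreshCarbonTableCommand now strips the
    // relatedmvtablesmap property before creating the table, so this no longer
    // fails with a parser error.
    spark.sql("REFRESH TABLE carbontable")
    spark.sql("REFRESH TABLE mv1")

    // Re-register the MV. CarbonRefreshMVCommand now falls back to reading the
    // MV schema from the system folder and registers it in the MV catalog.
    spark.sql("REFRESH MATERIALIZED VIEW mv1")

    // Queries on the parent table can then be rewritten to hit the MV again.
    spark.sql("SELECT avg(c2), c3 FROM carbontable GROUP BY c3").show()
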
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #4147
---
.../org/apache/carbondata/core/view/MVManager.java | 7 +++-
.../apache/carbondata/core/view/MVProvider.java | 21 ++++++++++--
.../apache/carbondata/view/MVCatalogInSpark.scala | 4 +++
.../management/RefreshCarbonTableCommand.scala | 3 ++
.../command/view/CarbonRefreshMVCommand.scala | 22 ++++++++++--
.../register/TestRegisterCarbonTable.scala | 39 ++++++++++++++++++++++
6 files changed, 90 insertions(+), 6 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/view/MVManager.java b/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
index 3892657..0618415 100644
--- a/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
@@ -142,7 +142,12 @@ public abstract class MVManager {
}
public MVSchema getSchema(String databaseName, String viewName) throws IOException {
- return schemaProvider.getSchema(this, databaseName, viewName);
+ return schemaProvider.getSchema(this, databaseName, viewName, false);
+ }
+
+ public MVSchema getSchema(String databaseName, String viewName, boolean isRegisterMV)
+ throws IOException {
+ return schemaProvider.getSchema(this, databaseName, viewName, isRegisterMV);
}
/**
diff --git a/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java b/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
index 87da842..149ff4f 100644
--- a/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
@@ -102,12 +102,17 @@ public class MVProvider {
}
public MVSchema getSchema(MVManager viewManager, String databaseName,
- String viewName) throws IOException {
+ String viewName, boolean isRegisterMV) throws IOException {
SchemaProvider schemaProvider = this.getSchemaProvider(viewManager, databaseName);
if (schemaProvider == null) {
return null;
}
- return schemaProvider.retrieveSchema(viewManager, viewName);
+ if (!isRegisterMV) {
+ return schemaProvider.retrieveSchema(viewManager, viewName);
+ } else {
+ // in case of old store, get schema by checking in system folder.
+ return schemaProvider.retrieveSchemaFromFolder(viewManager, viewName);
+ }
}
List<MVSchema> getSchemas(MVManager viewManager, String databaseName,
@@ -563,6 +568,18 @@ public class MVProvider {
return false;
}
+ public MVSchema retrieveSchemaFromFolder(MVManager viewManager, String mvName)
+ throws IOException {
+ this.schemas = this.retrieveAllSchemasInternal(viewManager);
+ for (MVSchema schema : this.schemas) {
+ if (schema.getIdentifier().getTableName().equalsIgnoreCase(mvName)) {
+ touchMDTFile();
+ return schema;
+ }
+ }
+ return null;
+ }
+
private synchronized void touchMDTFile() throws IOException {
if (!FileFactory.isFileExist(this.systemDirectory)) {
FileFactory.createDirectoryAndSetPermission(this.systemDirectory,
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala
index 8707fab..8040575 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala
@@ -82,6 +82,10 @@ case class MVCatalogInSpark(session: SparkSession)
enabledSchemas.toArray
}
+ def getAllSchemas: Array[MVSchemaWrapper] = {
+ viewSchemas.toArray
+ }
+
def isMVInSync(mvSchema: MVSchema): Boolean = {
viewManager.isMVInSyncWithParentTables(mvSchema)
}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index a535988..3f856a2 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -84,6 +84,9 @@ case class RefreshCarbonTableCommand(
if (FileFactory.isFileExist(schemaFilePath)) {
// read TableInfo
val tableInfo = SchemaReader.getTableInfo(identifier)
+ // remove mv related info from source table properties
+ tableInfo.getFactTable
+ .getTableProperties.remove(CarbonCommonConstants.RELATED_MV_TABLES_MAP)
// refresh the column schema in case of store before V3
refreshColumnSchema(tableInfo)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala
index c4eb2bb..937dd6b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala
@@ -39,10 +39,26 @@ case class CarbonRefreshMVCommand(
val databaseName =
databaseNameOption.getOrElse(session.sessionState.catalog.getCurrentDatabase)
val viewManager = MVManagerInSpark.get(session)
- val schema = viewManager.getSchema(databaseName, mvName)
+ var schema = viewManager.getSchema(databaseName, mvName)
if (schema == null) {
- throw new MalformedMVCommandException(
- s"Materialized view $databaseName.$mvName does not exist")
+ // schema can be null when MV is registered i.e. in case of compatibility scenarios
+ // with old store. So check and get schema if exists in the system folder.
+ schema = viewManager.getSchema(databaseName, mvName, true)
+ if (schema == null) {
+ throw new MalformedMVCommandException(
+ s"Materialized view $databaseName.$mvName does not exist")
+ }
+ val viewCatalog = MVManagerInSpark.getOrReloadMVCatalog(session)
+ if (!viewCatalog.getAllSchemas.exists(_.viewSchema.getIdentifier.getTableName
+ .equals(schema.getIdentifier.getTableName))) {
+ try {
+ viewCatalog.registerSchema(schema)
+ } catch {
+ case e: Exception =>
+ throw new Exception(
+ "Error while registering schema for mv: " +
schema.getIdentifier.getTableName, e)
+ }
+ }
}
// refresh table property of parent table if needed
diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala
index a55b5a4..48ddec4 100644
--- a/integration/spark/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterEach
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.view.rewrite.TestUtil
/**
*
@@ -231,6 +232,44 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterEach {
}
}
+ test("test register table with mv") {
+ sql(s"create database carbon location '$dbLocation'")
+ sql("use carbon")
+ sql(
+ """create table carbon.carbontable (
+ |c1 string,c2 int,c3 string,c5 string) STORED AS carbondata""".stripMargin)
+ sql("insert into carbontable select 'a',1,'aa','aaa'")
+ sql("create materialized view mv1 as select avg(c2),c3 from carbontable
group by c3")
+ backUpData(dbLocation, Some("carbon"), "carbontable")
+ backUpData(dbLocation, Some("carbon"), "mv1")
+ val source = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + "_system" +
+ CarbonCommonConstants.FILE_SEPARATOR + "mv_schema.mv1"
+ val destination = dbLocation + "_back" + CarbonCommonConstants.FILE_SEPARATOR + "mv_schema.mv1"
+ backUpMvSchema(source, destination)
+ sql("drop table carbontable")
+ if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetaStore.isReadFromHiveMetaStore) {
+ restoreData(dbLocation, "carbontable")
+ restoreData(dbLocation, "mv1")
+ backUpMvSchema(destination, source)
+ sql("refresh table carbontable")
+ sql("refresh table mv1")
+ sql("refresh materialized view mv1")
+ checkAnswer(sql("select count(*) from carbontable"), Row(1))
+ checkAnswer(sql("select c1 from carbontable"), Seq(Row("a")))
+ val df = sql("select avg(c2),c3 from carbontable group by c3")
+ assert(TestUtil.verifyMVHit(df.queryExecution.optimizedPlan, "mv1"))
+ }
+ }
+
+ private def backUpMvSchema(source: String, destination: String) = {
+ try {
+ FileUtils.copyFile(new File(source), new File(destination))
+ } catch {
+ case e: Exception =>
+ throw new IOException("Mv schema file backup/restore failed.")
+ }
+ }
+
override def afterEach {
sql("use default")
sql("drop database if exists carbon cascade")