This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 90841bc  [CARBONDATA-4202] Fix issue when refresh main table with MV
90841bc is described below

commit 90841bc10860efd5a1417b19cc649f1d483dca07
Author: ShreelekhyaG <[email protected]>
AuthorDate: Fri Jun 4 14:55:13 2021 +0530

    [CARBONDATA-4202] Fix issue when refresh main table with MV
    
    Why is this PR needed?
    When trying to register a table from an old store which has an MV, it fails
    with a parser error (syntax issue while creating the table). It tries to
    create the table with the relatedmvtablesmap property, which is not valid.
    
    What changes were proposed in this PR?
    1. Removed relatedmvtablesmap from the table properties in
       RefreshCarbonTableCommand.
    2. After the main table has been registered, the MV is registered by
       reading its schema from the system folder.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4147
---
 .../org/apache/carbondata/core/view/MVManager.java |  7 +++-
 .../apache/carbondata/core/view/MVProvider.java    | 21 ++++++++++--
 .../apache/carbondata/view/MVCatalogInSpark.scala  |  4 +++
 .../management/RefreshCarbonTableCommand.scala     |  3 ++
 .../command/view/CarbonRefreshMVCommand.scala      | 22 ++++++++++--
 .../register/TestRegisterCarbonTable.scala         | 39 ++++++++++++++++++++++
 6 files changed, 90 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/view/MVManager.java 
b/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
index 3892657..0618415 100644
--- a/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
@@ -142,7 +142,12 @@ public abstract class MVManager {
   }
 
   public MVSchema getSchema(String databaseName, String viewName) throws 
IOException {
-    return schemaProvider.getSchema(this, databaseName, viewName);
+    return schemaProvider.getSchema(this, databaseName, viewName, false);
+  }
+
+  public MVSchema getSchema(String databaseName, String viewName, boolean 
isRegisterMV)
+      throws IOException {
+    return schemaProvider.getSchema(this, databaseName, viewName, 
isRegisterMV);
   }
 
   /**
diff --git a/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java 
b/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
index 87da842..149ff4f 100644
--- a/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
@@ -102,12 +102,17 @@ public class MVProvider {
   }
 
   public MVSchema getSchema(MVManager viewManager, String databaseName,
-                            String viewName) throws IOException {
+                            String viewName, boolean isRegisterMV) throws 
IOException {
     SchemaProvider schemaProvider = this.getSchemaProvider(viewManager, 
databaseName);
     if (schemaProvider == null) {
       return null;
     }
-    return schemaProvider.retrieveSchema(viewManager, viewName);
+    if (!isRegisterMV) {
+      return schemaProvider.retrieveSchema(viewManager, viewName);
+    } else {
+      // in case of old store, get schema by checking in system folder.
+      return schemaProvider.retrieveSchemaFromFolder(viewManager, viewName);
+    }
   }
 
   List<MVSchema> getSchemas(MVManager viewManager, String databaseName,
@@ -563,6 +568,18 @@ public class MVProvider {
       return false;
     }
 
+    public MVSchema retrieveSchemaFromFolder(MVManager viewManager, String 
mvName)
+        throws IOException {
+      this.schemas = this.retrieveAllSchemasInternal(viewManager);
+      for (MVSchema schema : this.schemas) {
+        if (schema.getIdentifier().getTableName().equalsIgnoreCase(mvName)) {
+          touchMDTFile();
+          return schema;
+        }
+      }
+      return null;
+    }
+
     private synchronized void touchMDTFile() throws IOException {
       if (!FileFactory.isFileExist(this.systemDirectory)) {
         FileFactory.createDirectoryAndSetPermission(this.systemDirectory,
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala
index 8707fab..8040575 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala
@@ -82,6 +82,10 @@ case class MVCatalogInSpark(session: SparkSession)
     enabledSchemas.toArray
   }
 
+  def getAllSchemas: Array[MVSchemaWrapper] = {
+    viewSchemas.toArray
+  }
+
   def isMVInSync(mvSchema: MVSchema): Boolean = {
     viewManager.isMVInSyncWithParentTables(mvSchema)
   }
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index a535988..3f856a2 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -84,6 +84,9 @@ case class RefreshCarbonTableCommand(
       if (FileFactory.isFileExist(schemaFilePath)) {
         // read TableInfo
         val tableInfo = SchemaReader.getTableInfo(identifier)
+        // remove mv related info from source table properties
+        tableInfo.getFactTable
+          
.getTableProperties.remove(CarbonCommonConstants.RELATED_MV_TABLES_MAP)
         // refresh the column schema in case of store before V3
         refreshColumnSchema(tableInfo)
 
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala
index c4eb2bb..937dd6b 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala
@@ -39,10 +39,26 @@ case class CarbonRefreshMVCommand(
     val databaseName =
       
databaseNameOption.getOrElse(session.sessionState.catalog.getCurrentDatabase)
     val viewManager = MVManagerInSpark.get(session)
-    val schema = viewManager.getSchema(databaseName, mvName)
+    var schema = viewManager.getSchema(databaseName, mvName)
     if (schema == null) {
-      throw new MalformedMVCommandException(
-        s"Materialized view $databaseName.$mvName does not exist")
+      // schema can be null when MV is registered i.e. in case of 
compatibility scenarios
+      // with old store. So check and get schema if exists in the system 
folder.
+      schema = viewManager.getSchema(databaseName, mvName, true)
+      if (schema == null) {
+        throw new MalformedMVCommandException(
+          s"Materialized view $databaseName.$mvName does not exist")
+      }
+      val viewCatalog = MVManagerInSpark.getOrReloadMVCatalog(session)
+      if 
(!viewCatalog.getAllSchemas.exists(_.viewSchema.getIdentifier.getTableName
+          .equals(schema.getIdentifier.getTableName))) {
+        try {
+          viewCatalog.registerSchema(schema)
+        } catch {
+          case e: Exception =>
+            throw new Exception(
+              "Error while registering schema for mv: " + 
schema.getIdentifier.getTableName, e)
+        }
+      }
     }
 
     // refresh table property of parent table if needed
diff --git 
a/integration/spark/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala
 
b/integration/spark/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala
index a55b5a4..48ddec4 100644
--- 
a/integration/spark/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala
+++ 
b/integration/spark/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.view.rewrite.TestUtil
 
 /**
  *
@@ -231,6 +232,44 @@ class TestRegisterCarbonTable extends QueryTest with 
BeforeAndAfterEach {
     }
   }
 
+  test("test register table with mv") {
+    sql(s"create database carbon location '$dbLocation'")
+    sql("use carbon")
+    sql(
+      """create table carbon.carbontable (
+        |c1 string,c2 int,c3 string,c5 string) STORED AS 
carbondata""".stripMargin)
+    sql("insert into carbontable select 'a',1,'aa','aaa'")
+    sql("create materialized view mv1 as select avg(c2),c3 from carbontable 
group by c3")
+    backUpData(dbLocation, Some("carbon"), "carbontable")
+    backUpData(dbLocation, Some("carbon"), "mv1")
+    val source = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + "_system" 
+
+                 CarbonCommonConstants.FILE_SEPARATOR + "mv_schema.mv1"
+    val destination = dbLocation + "_back" + 
CarbonCommonConstants.FILE_SEPARATOR + "mv_schema.mv1"
+    backUpMvSchema(source, destination)
+    sql("drop table carbontable")
+    if 
(!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetaStore.isReadFromHiveMetaStore)
 {
+      restoreData(dbLocation, "carbontable")
+      restoreData(dbLocation, "mv1")
+      backUpMvSchema(destination, source)
+      sql("refresh table carbontable")
+      sql("refresh table mv1")
+      sql("refresh materialized view mv1")
+      checkAnswer(sql("select count(*) from carbontable"), Row(1))
+      checkAnswer(sql("select c1 from carbontable"), Seq(Row("a")))
+      val df = sql("select avg(c2),c3 from carbontable group by c3")
+      assert(TestUtil.verifyMVHit(df.queryExecution.optimizedPlan, "mv1"))
+    }
+  }
+
+  private def backUpMvSchema(source: String, destination: String) = {
+    try {
+      FileUtils.copyFile(new File(source), new File(destination))
+    } catch {
+      case e: Exception =>
+        throw new IOException("Mv schema file backup/restore failed.")
+    }
+  }
+
   override def afterEach {
     sql("use default")
     sql("drop database if exists carbon cascade")

Reply via email to