This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 8694ed4  [CARBONDATA-3531] Support load and query for MV timeseries and support multiple granularity
8694ed4 is described below

commit 8694ed4068cc1b9e44c9b3ed7d3ba5f310a3e52c
Author: akashrn5 <[email protected]>
AuthorDate: Thu Nov 7 11:05:33 2019 +0530

    [CARBONDATA-3531] Support load and query for MV timeseries and support multiple granularity
    
    Why this PR?
    This PR implements load and query support for MV timeseries and includes some MV bug fixes.
    Timeseries on MV supports the following granularities:
    second, minute, thirty_minute, fifteen_minute, ten_minute, five_minute,
    hour, day, week, month, year.
    Bug Fixes:
    
    1. Fix filter queries not hitting the MV timeseries datamap.
    2. Fix BETWEEN filters not working on timeseries queries.
    
    This closes #3438
---
 .../core/constants/CarbonCommonConstants.java      |  11 +
 .../core/metadata/schema/datamap/Granularity.java  |   6 +
 ...SeriesFunctionEnum.java => DaysOfWeekEnum.java} |  45 ++-
 .../core/preagg/TimeSeriesFunctionEnum.java        |  13 +-
 .../carbondata/core/preagg/TimeSeriesUDF.java      |  56 ++++
 .../carbondata/mv/datamap/MVAnalyzerRule.scala     |  53 ++-
 .../carbondata/mv/rewrite/DefaultMatchMaker.scala  |  44 ++-
 .../org/apache/carbondata/mv/rewrite/Utils.scala   |  30 +-
 .../timeseries/TestMVTimeSeriesLoadAndQuery.scala  | 361 +++++++++++++++++++++
 docs/configuration-parameters.md                   |   1 +
 docs/datamap/mv-datamap-guide.md                   |   8 +-
 .../src/test/resources/mv_sampledata.csv           |  14 +
 .../datamap/CarbonCreateDataMapCommand.scala       |   7 +-
 .../command/datamap/CarbonDataMapShowCommand.scala |   9 +-
 .../command/timeseries/TimeSeriesUtil.scala        |  12 +-
 15 files changed, 621 insertions(+), 49 deletions(-)
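
For a quick picture of what this change adds, below is a minimal sketch of the feature in use, mirroring the new test suite in this patch (the table maintable, its columns, and the datamap name datamap_hour are simply the sample schema used by those tests):

    // assumes a CarbonData-enabled SparkSession; schema taken from the new test suite
    sql("CREATE TABLE maintable (empno int, empname string, projectcode int, " +
      "projectjoindate timestamp, salary double) STORED BY 'org.apache.carbondata.format'")
    // MV datamap that pre-aggregates salary at hour granularity
    sql("create datamap datamap_hour on table maintable using 'mv' as " +
      "select timeseries(projectjoindate,'hour'), sum(salary) from maintable " +
      "group by timeseries(projectjoindate,'hour')")
    // a query with the same timeseries UDF and granularity is expected to be rewritten to the datamap
    sql("select timeseries(projectjoindate,'hour'), sum(salary) from maintable " +
      "group by timeseries(projectjoindate,'hour')").show()
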

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index da5586e..9b12251 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2285,4 +2285,15 @@ public final class CarbonCommonConstants {
    * hive column-name maximum length
    */
   public static final int MAXIMUM_CHAR_LENGTH = 128;
+
+  /**
+   * Carbon property for timeseries MV to define the first day of week
+   */
+  public static final String CARBON_TIMESERIES_FIRST_DAY_OF_WEEK =
+      "carbon.timeseries.first.day.of.week";
+
+  /**
+   * Default first day of week
+   */
+  public static final String CARBON_TIMESERIES_FIRST_DAY_OF_WEEK_DEFAULT = "SUNDAY";
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/Granularity.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/Granularity.java
index d6aefb6..b054bdf 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/Granularity.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/Granularity.java
@@ -29,10 +29,16 @@ package org.apache.carbondata.core.metadata.schema.datamap;
 public enum Granularity {
   YEAR("year_granularity"),
   MONTH("month_granularity"),
+  WEEK("week_granularity"),
   DAY("day_granularity"),
   HOUR("hour_granularity"),
+  THIRTY_MINUTE("thirty_minute_granularity"),
+  FIFTEEN_MINUTE("fifteen_minute_granularity"),
+  TEN_MINUTE("ten_minute_granularity"),
+  FIVE_MINUTE("five_minute_granularity"),
   MINUTE("minute_granularity"),
   SECOND("second_granularity");
+
   private String name;
 
   Granularity(String name) {
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java b/core/src/main/java/org/apache/carbondata/core/preagg/DaysOfWeekEnum.java
similarity index 69%
copy from core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java
copy to core/src/main/java/org/apache/carbondata/core/preagg/DaysOfWeekEnum.java
index 5d0d2af..e73b595 100644
--- a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/DaysOfWeekEnum.java
@@ -14,40 +14,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.carbondata.core.preagg;
 
 /**
- * enum for timeseries function
+ * Enum defining the days of the week, in line with the java.util.Calendar day-of-week constants.
  */
-public enum TimeSeriesFunctionEnum {
-  SECOND("second", 0),
-  MINUTE("minute", 1),
-  HOUR("hour", 2),
-  DAY("day", 3),
-  MONTH("month", 4),
-  YEAR("year", 5);
-
-  /**
-   * name of the function
-   */
-  private String name;
-
-  /**
-   * ordinal for function
-   */
-  private int ordinal;
+public enum DaysOfWeekEnum {
 
-  TimeSeriesFunctionEnum(String name, int ordinal) {
-    this.name = name;
-    this.ordinal = ordinal;
-  }
+  SUNDAY("SUNDAY", 1),
+  MONDAY("MONDAY", 2),
+  TUESDAY("TUESDAY", 3),
+  WEDNESDAY("WEDNESDAY", 4),
+  THURSDAY("THURSDAY", 5),
+  FRIDAY("FRIDAY", 6),
+  SATURDAY("SATURDAY", 7);
 
-  public String getName() {
-    return name;
+  public String getDay() {
+    return day;
   }
 
   public int getOrdinal() {
     return ordinal;
   }
+
+  private String day;
+
+  private int ordinal;
+
+  DaysOfWeekEnum(String day, int ordinal) {
+    this.day = day;
+    this.ordinal = ordinal;
+  }
 }
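
The ordinals above line up with java.util.Calendar's day-of-week constants, which is what allows the configured value to be handed straight to Calendar.setFirstDayOfWeek in TimeSeriesUDF later in this patch; a quick sanity check:

    import java.util.Calendar
    // Calendar uses SUNDAY = 1 ... SATURDAY = 7, matching the DaysOfWeekEnum ordinals
    assert(Calendar.SUNDAY == 1 && Calendar.MONDAY == 2 && Calendar.SATURDAY == 7)
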
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java
index 5d0d2af..f30923b 100644
--- a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java
@@ -23,10 +23,15 @@ package org.apache.carbondata.core.preagg;
 public enum TimeSeriesFunctionEnum {
   SECOND("second", 0),
   MINUTE("minute", 1),
-  HOUR("hour", 2),
-  DAY("day", 3),
-  MONTH("month", 4),
-  YEAR("year", 5);
+  FIVE_MINUTE("five_minute", 2),
+  TEN_MINUTE("ten_minute", 3),
+  FIFTEEN_MINUTE("fifteen_minute", 4),
+  THIRTY_MINUTE("thirty_minute", 5),
+  HOUR("hour", 6),
+  DAY("day", 7),
+  WEEK("week", 8),
+  MONTH("month", 9),
+  YEAR("year", 10);
 
   /**
    * name of the function
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
index ce0f15d..5cfa7f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
@@ -22,11 +22,20 @@ import java.util.Calendar;
 import java.util.GregorianCalendar;
 import java.util.List;
 
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+import org.apache.log4j.Logger;
+
 /**
  * class for applying timeseries udf
  */
 public class TimeSeriesUDF {
 
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(TimeSeriesUDF.class.getName());
+
   public final List<String> TIMESERIES_FUNCTION = new ArrayList<>();
 
   // thread local for keeping calender instance
@@ -76,6 +85,25 @@ public class TimeSeriesUDF {
         calendar.set(Calendar.MILLISECOND, 0);
         calendar.set(Calendar.SECOND, 0);
         break;
+      case FIVE_MINUTE:
+        setData(calendar, 5);
+        break;
+      case TEN_MINUTE:
+        setData(calendar, 10);
+        break;
+      case FIFTEEN_MINUTE:
+        setData(calendar, 15);
+        break;
+      case THIRTY_MINUTE:
+        setData(calendar, 30);
+        break;
+      case WEEK:
+        calendar.set(Calendar.HOUR_OF_DAY, 0);
+        calendar.set(Calendar.MINUTE, 0);
+        calendar.set(Calendar.SECOND, 0);
+        calendar.set(Calendar.MILLISECOND, 0);
+        calendar.set(Calendar.DAY_OF_WEEK, calendar.getFirstDayOfWeek());
+        break;
       case HOUR:
         calendar.set(Calendar.MILLISECOND, 0);
         calendar.set(Calendar.SECOND, 0);
@@ -110,6 +138,15 @@ public class TimeSeriesUDF {
   }
 
   /**
+   * This method sets the calendar fields based on the interval time of the granularity
+   */
+  private void setData(Calendar calendar, int intervalTime) {
+    calendar.set(Calendar.MILLISECOND, 0);
+    calendar.set(Calendar.SECOND, 0);
+    calendar.set(Calendar.MINUTE, (calendar.get(Calendar.MINUTE) / intervalTime) * intervalTime);
+  }
+
+  /**
    * Below method will be used to initialize the thread local
    */
   private void initialize() {
@@ -119,10 +156,29 @@ public class TimeSeriesUDF {
     if (TIMESERIES_FUNCTION.isEmpty()) {
       TIMESERIES_FUNCTION.add("second");
       TIMESERIES_FUNCTION.add("minute");
+      TIMESERIES_FUNCTION.add("five_minute");
+      TIMESERIES_FUNCTION.add("ten_minute");
+      TIMESERIES_FUNCTION.add("fifteen_minute");
+      TIMESERIES_FUNCTION.add("thirty_minute");
       TIMESERIES_FUNCTION.add("hour");
       TIMESERIES_FUNCTION.add("day");
+      TIMESERIES_FUNCTION.add("week");
       TIMESERIES_FUNCTION.add("month");
       TIMESERIES_FUNCTION.add("year");
     }
+    int firstDayOfWeek;
+    try {
+      firstDayOfWeek = DaysOfWeekEnum.valueOf(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_TIMESERIES_FIRST_DAY_OF_WEEK,
+              CarbonCommonConstants.CARBON_TIMESERIES_FIRST_DAY_OF_WEEK_DEFAULT).toUpperCase())
+          .getOrdinal();
+    } catch (IllegalArgumentException ex) {
+      LOGGER.warn("Invalid value set for first day of the week. Considering the default value as: "
+          + CarbonCommonConstants.CARBON_TIMESERIES_FIRST_DAY_OF_WEEK_DEFAULT);
+      firstDayOfWeek = DaysOfWeekEnum
+          .valueOf(CarbonCommonConstants.CARBON_TIMESERIES_FIRST_DAY_OF_WEEK_DEFAULT)
+          .getOrdinal();
+    }
+    calanderThreadLocal.get().setFirstDayOfWeek(firstDayOfWeek);
   }
 }
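
The setData helper above floors the minute field to the start of its interval, which is how the new five/ten/fifteen/thirty-minute buckets are formed; a standalone sketch of that arithmetic (values chosen only for illustration):

    // floor a minute value to the start of its interval, as setData does for the Calendar MINUTE field
    def floorToInterval(minute: Int, intervalTime: Int): Int =
      (minute / intervalTime) * intervalTime

    floorToInterval(17, 5)   // 15 -> 09:17 falls into the 09:15 five_minute bucket
    floorToInterval(17, 15)  // 15 -> 09:17 falls into the 09:15 fifteen_minute bucket
    floorToInterval(17, 30)  // 0  -> 09:17 falls into the 09:00 thirty_minute bucket
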
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
index edd9c81..0ba6bfc 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Alias, ScalaUDF}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, ScalaUDF}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Command, DeserializeToObject, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo
 import org.apache.carbondata.datamap.DataMapManager
+import org.apache.carbondata.mv.plans.modular.{ModularPlan, Select}
 import org.apache.carbondata.mv.rewrite.{SummaryDataset, SummaryDatasetCatalog}
 
 /**
@@ -87,7 +88,8 @@ class MVAnalyzerRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
     if (needAnalysis && catalog != null && isValidPlan(plan, catalog)) {
       val modularPlan = catalog.mvSession.sessionState.rewritePlan(plan).withMVTable
       if (modularPlan.find(_.rewritten).isDefined) {
-        val compactSQL = modularPlan.asCompactSQL
+        var compactSQL = modularPlan.asCompactSQL
+        compactSQL = reWriteTheUDFInSQLWithQualifierName(modularPlan, compactSQL)
         val analyzed = sparkSession.sql(compactSQL).queryExecution.analyzed
         analyzed
       } else {
@@ -99,6 +101,53 @@ class MVAnalyzerRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   }
 
   /**
+   * This method is specially handled for timeseries on MV. The timeseries UDF is a scala UDF, so
+   * after plan matching, the rewritten query looks, for example, like below:
+   *
+   * SELECT gen_subsumer_0.`UDF:timeseries(projectjoindate, hour)` AS `UDF:timeseries(projectjoi...
+   * FROM
+   * (SELECT datamap1_table.`UDF:timeseries_projectjoindate_hour` AS `UDF:timeseries(projectjoin...
+   * FROM
+   *     default.datamap1_table
+   * GROUP BY datamap1_table.`UDF:timeseries_projectjoindate_hour`) gen_subsumer_0
+   * WHERE
+   * (UDF:timeseries(projectjoindate, hour) = TIMESTAMP('2016-02-23 09:00:00.0'))
+   *
+   * Here the WHERE filter expression is of type ScalaUDF, so when .sql() is called to prepare the
+   * SQL, it is generated without the qualifier name 'gen_subsumer_0'
+   * (refer org.apache.spark.sql.catalyst.expressions.NonSQLExpression). This function rewrites the
+   * filter with the qualifier name so that spark parsing of the rewritten MV query does not fail.
+   * @param plan Modular Plan
+   * @param compactSQL compactSQL generated from Modular plan
+   * @return Rewritten query with the qualifier names for the where clause.
+   */
+  private def reWriteTheUDFInSQLWithQualifierName(plan: ModularPlan, compactSQL: String): String = {
+    var outPutUDFColumn = ""
+    var reWrittenQuery = compactSQL
+    plan match {
+      case select: Select =>
+        select.outputList.collect {
+          case a: Alias if a.child.isInstanceOf[Attribute] =>
+            val childName = a.child.asInstanceOf[Attribute].name
+            if (childName.startsWith("UDF:timeseries")) {
+              outPutUDFColumn = childName
+            }
+        }
+        var queryArray: Array[String] = Array.empty
+        if (!outPutUDFColumn.equalsIgnoreCase("") && compactSQL.contains("WHERE")) {
+          queryArray = compactSQL.split("\n")
+          queryArray(queryArray.indexOf("WHERE") + 1) = queryArray(
+            queryArray.indexOf("WHERE") + 1).replace(outPutUDFColumn,
+            s"gen_subsumer_0.`$outPutUDFColumn`")
+          reWrittenQuery = queryArray.mkString("\n")
+        }
+        reWrittenQuery
+      case _ =>
+        compactSQL
+    }
+  }
+
+  /**
    * Whether the plan is valid for doing modular plan matching and datamap replacing.
    */
   def isValidPlan(plan: LogicalPlan, catalog: SummaryDatasetCatalog): Boolean = {
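
A minimal sketch of the string rewrite that reWriteTheUDFInSQLWithQualifierName performs on the generated compact SQL (the query text here is a simplified stand-in for the real output):

    // simplified stand-in for the compact SQL produced after plan matching
    val compactSQL =
      """SELECT gen_subsumer_0.`UDF:timeseries(projectjoindate, hour)`
        |FROM
        |(SELECT ...) gen_subsumer_0
        |WHERE
        |(UDF:timeseries(projectjoindate, hour) = TIMESTAMP('2016-02-23 09:00:00.0'))""".stripMargin
    val udfColumn = "UDF:timeseries(projectjoindate, hour)"
    val lines = compactSQL.split("\n")
    // qualify and backquote the UDF column on the line following WHERE, as the rule does
    val whereIndex = lines.indexOf("WHERE")
    lines(whereIndex + 1) = lines(whereIndex + 1).replace(udfColumn, s"gen_subsumer_0.`$udfColumn`")
    val rewrittenQuery = lines.mkString("\n")
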
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
index 5329608..616d0bd 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
@@ -448,17 +448,53 @@ object GroupbyGroupbySelectOnlyChildDelta extends DefaultMatchPattern with Predi
       subsumer: ModularPlan,
       compensation: Option[ModularPlan]) = {
     if (subsumee.asInstanceOf[GroupBy].predicateList.contains(exprE)) {
-      if (exprListR.exists(_.semanticEquals(exprE)) || canEvaluate(exprE, exprListR)) true
-      else false
+      if (exprListR.exists(_.semanticEquals(exprE)) || canEvaluate(exprE, exprListR) ||
+          isDerivableForUDF(exprE, exprListR)) {
+        true
+      } else {
+        false
+      }
     } else if (compensation.getOrElse(throw new RuntimeException("compensation cannot be None"))
       .asInstanceOf[Select].predicateList.contains(exprE)) {
-      if (canEvaluate(exprE, exprListR) || exprListR.exists(_.semanticEquals(exprE))) true
-      else false
+      if (canEvaluate(exprE, exprListR) || exprListR.exists(_.semanticEquals(exprE)) ||
+          isDerivableForUDF(exprE, exprListR)) {
+        true
+      } else {
+        false
+      }
     } else {
       false
     }
   }
 
+  /**
+   * org.apache.carbondata.mv.plans.MorePredicateHelper#canEvaluate checks whether exprE.references
+   * is a subset of AttributeSet(exprListR), which just takes the column name from the UDF, so it
+   * will always be false for ScalaUDF.
+   * This method checks whether the exprE references can be derived from the list.
+   *
+   * Example:
+   * exprE has an attribute reference for the timeseries column, like UDF:timeseries(projectjoindate, month),
+   * so exprE.references will give UDF:timeseries(projectjoindate, month).
+   * exprListR also contains a ScalaUDF, but AttributeSet(exprListR) contains just the column name,
+   * like projectjoindate, so the canEvaluate method returns false.
+   *
+   * Here we check whether the .sql of the exprListR(ScalaUDF) gives UDF:timeseries(projectjoindate, month),
+   * which can be checked against exprE.references.
+   */
+  private def isDerivableForUDF(exprE: Expression, exprListR: Seq[Expression]): Boolean = {
+    var canBeDerived = false
+    exprListR.forall {
+      case a: ScalaUDF =>
+        a.references.foreach { a =>
+          canBeDerived = exprE.sql.contains(a.name)
+        }
+        canBeDerived
+      case _ =>
+        canBeDerived
+    }
+  }
+
   def apply(
       subsumer: ModularPlan,
       subsumee: ModularPlan,
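
The essence of the isDerivableForUDF check above, reduced to plain strings (the expression SQL texts are assumed examples, not taken from a real plan):

    // exprE.sql of the filter predicate in the user query
    val filterSql = "UDF:timeseries(projectjoindate, month) = TIMESTAMP('2016-03-01 00:00:00.0')"
    // names of the attributes referenced by the ScalaUDF expressions in exprListR
    val udfReferenceNames = Seq("UDF:timeseries(projectjoindate, month)")
    // derivable when the filter text contains one of those reference names
    val derivable = udfReferenceNames.exists(name => filterSql.contains(name))
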
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala
index fa2ea10..7a8b538 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala
@@ -17,11 +17,11 @@
 
 package org.apache.carbondata.mv.rewrite
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, Cast, Divide, Expression, Multiply, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, Cast, Divide, Expression, Multiply, PredicateHelper, ScalaUDF}
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 
 import org.apache.carbondata.mv.plans.modular
-import org.apache.carbondata.mv.plans.modular.ModularPlan
+import org.apache.carbondata.mv.plans.modular.{ModularPlan, Select}
 
 /**
  * Utility functions used by mqo matcher to convert our plan to new aggregation code path
@@ -391,7 +391,7 @@ object Utils extends PredicateHelper {
           }
         } else if (sel_2c.predicateList.contains(exprE)) {
           if (sel_2a.predicateList.exists(_.semanticEquals(exprE)) ||
-              canEvaluate(exprE, subsumer)) {
+              canEvaluate(exprE, subsumer) || canBeDerived(subsumer, exprE)) {
             Some(exprE)
           } else {
             None
@@ -416,6 +416,30 @@ object Utils extends PredicateHelper {
     }
   }
 
+  private def canBeDerived(subsumer: ModularPlan, expE: Expression): Boolean = {
+    var canBeDerived = false
+    subsumer.asInstanceOf[Select].outputList.forall {
+      case Alias(s: ScalaUDF, _) =>
+        expE.children.foreach { expr =>
+          if (s.semanticEquals(expr)) {
+            canBeDerived = true
+          }
+          // When the expression is a between filter, the expr will be a Cast expression and its
+          // child will be the ScalaUDF(timeseries), so compare the child as well.
+          if (!canBeDerived && null != expr.children) {
+            expr.children.foreach { expC =>
+              if (s.semanticEquals(expC)) {
+                canBeDerived = true
+              }
+            }
+          }
+        }
+        canBeDerived
+      case _ =>
+        canBeDerived
+    }
+  }
+
   private def isSemanticEquivalent(translatedExpr: Expression, subsumer: ModularPlan) = {
     subsumer match {
       // if subsumer has where clause, even if expr can be translated into new expr based on
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
new file mode 100644
index 0000000..8815a94
--- /dev/null
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
@@ -0,0 +1,361 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.carbondata.mv.timeseries
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.mv.rewrite.TestUtil
+
+class TestMVTimeSeriesLoadAndQuery extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll(): Unit = {
+    drop()
+    createTable()
+  }
+
+  test("create MV timeseries datamap with simple projection and aggregation and filter") {
+    sql("drop datamap if exists datamap1")
+    sql("drop datamap if exists datamap2")
+    sql(
+      "create datamap datamap1 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'minute'), sum(projectcode) from maintable group by timeseries(projectjoindate,'minute')")
+    loadData("maintable")
+    val df = sql("select timeseries(projectjoindate,'minute'), sum(projectcode) from maintable group by timeseries(projectjoindate,'minute')")
+    val analyzed = df.queryExecution.analyzed
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap1"))
+    dropDataMap("datamap1")
+    sql(
+      "create datamap datamap1 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'minute'), sum(projectcode) from maintable where timeseries(projectjoindate,'minute') = '2016-02-23 09:17:00' group by timeseries(projectjoindate,'minute')")
+
+    sql("select * from datamap1_table").show(false)
+    val df1 = sql("select timeseries(projectjoindate,'minute'),sum(projectcode) from maintable where timeseries(projectjoindate,'minute') = '2016-02-23 09:17:00'" +
+                  "group by timeseries(projectjoindate,'minute')")
+    val analyzed1 = df1.queryExecution.analyzed
+    assert(TestUtil.verifyMVDataMap(analyzed1, "datamap1"))
+    dropDataMap("datamap1")
+  }
+
+  test("test mv timeseries with ctas and filter in actual query") {
+    dropDataMap("datamap1")
+    sql(
+      "create datamap datamap1 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'hour'), sum(projectcode) from maintable group by timeseries(projectjoindate,'hour')")
+    loadData("maintable")
+    val df = sql("select timeseries(projectjoindate,'hour'), sum(projectcode) from maintable where timeseries(projectjoindate,'hour') = '2016-02-23 09:00:00' " +
+                 "group by timeseries(projectjoindate,'hour')")
+    val analyzed = df.queryExecution.analyzed
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap1"))
+    dropDataMap("datamap1")
+  }
+
+  test("test mv timeseries with multiple granularity datamaps") {
+    dropDataMap("datamap1")
+    dropDataMap("datamap2")
+    dropDataMap("datamap3")
+    dropDataMap("datamap4")
+    dropDataMap("datamap5")
+    dropDataMap("datamap6")
+    loadData("maintable")
+    sql(
+      "create datamap datamap1 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'minute'), sum(salary) from maintable group by timeseries(projectjoindate,'minute')")
+    sql(
+      "create datamap datamap2 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'hour'), sum(salary) from maintable group by timeseries(projectjoindate,'hour')")
+    sql(
+      "create datamap datamap3 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'fifteen_minute'), sum(salary) from maintable group by timeseries(projectjoindate,'fifteen_minute')")
+    sql(
+      "create datamap datamap4 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'five_minute'), sum(salary) from maintable group by timeseries(projectjoindate,'five_minute')")
+    sql(
+      "create datamap datamap5 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'week'), sum(salary) from maintable group by timeseries(projectjoindate,'week')")
+    sql(
+      "create datamap datamap6 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'year'), sum(salary) from maintable group by timeseries(projectjoindate,'year')")
+    val df1 = sql("select timeseries(projectjoindate,'minute'), sum(salary) from maintable group by timeseries(projectjoindate,'minute')")
+    checkPlan("datamap1", df1)
+    val df2 = sql("select timeseries(projectjoindate,'hour'), sum(salary) from maintable group by timeseries(projectjoindate,'hour')")
+    checkPlan("datamap2", df2)
+    val df3 = sql("select timeseries(projectjoindate,'fifteen_minute'), sum(salary) from maintable group by timeseries(projectjoindate,'fifteen_minute')")
+    checkPlan("datamap3", df3)
+    val df4 = sql("select timeseries(projectjoindate,'five_minute'), sum(salary) from maintable group by timeseries(projectjoindate,'five_minute')")
+    checkPlan("datamap4", df4)
+    val df5 = sql("select timeseries(projectjoindate,'week'), sum(salary) from maintable group by timeseries(projectjoindate,'week')")
+    checkPlan("datamap5", df5)
+    val df6 = sql("select timeseries(projectjoindate,'year'), sum(salary) from maintable group by timeseries(projectjoindate,'year')")
+    checkPlan("datamap6", df6)
+    val result = sql("show datamap on table maintable").collect()
+    result.find(_.get(0).toString.contains("datamap1")) match {
+      case Some(row) => assert(row.get(4).toString.contains("ENABLED"))
+      case None => assert(false)
+    }
+    result.find(_.get(0).toString.contains("datamap2")) match {
+      case Some(row) => assert(row.get(4).toString.contains("ENABLED"))
+      case None => assert(false)
+    }
+    result.find(_.get(0).toString.contains("datamap3")) match {
+      case Some(row) => assert(row.get(4).toString.contains("ENABLED"))
+      case None => assert(false)
+    }
+    result.find(_.get(0).toString.contains("datamap4")) match {
+      case Some(row) => assert(row.get(4).toString.contains("ENABLED"))
+      case None => assert(false)
+    }
+    result.find(_.get(0).toString.contains("datamap5")) match {
+      case Some(row) => assert(row.get(4).toString.contains("ENABLED"))
+      case None => assert(false)
+    }
+    result.find(_.get(0).toString.contains("datamap6")) match {
+      case Some(row) => assert(row.get(4).toString.contains("ENABLED"))
+      case None => assert(false)
+    }
+    dropDataMap("datamap1")
+    dropDataMap("datamap2")
+    dropDataMap("datamap3")
+    dropDataMap("datamap4")
+    dropDataMap("datamap5")
+    dropDataMap("datamap6")
+  }
+
+  test("test mv timeseries with week granular select data") {
+    dropDataMap("datamap1")
+    loadData("maintable")
+    sql(
+      "create datamap datamap1 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'week'), sum(salary) from maintable group by timeseries(projectjoindate,'week')")
+/*
+    +-----------------------------------+----------+
+    |UDF:timeseries_projectjoindate_week|sum_salary|
+    +-----------------------------------+----------+
+    |2016-02-21 00:00:00                |3801      |
+    |2016-03-20 00:00:00                |400.2     |
+    |2016-04-17 00:00:00                |350.0     |
+    |2016-03-27 00:00:00                |150.6     |
+    +-----------------------------------+----------+*/
+    val df1 = sql("select timeseries(projectjoindate,'week'), sum(salary) from maintable group by timeseries(projectjoindate,'week')")
+    checkPlan("datamap1", df1)
+    checkExistence(df1, true, "2016-02-21 00:00:00.0" )
+    dropDataMap("datamap1")
+  }
+
+  test("test timeseries with different aggregations") {
+    dropDataMap("datamap1")
+    dropDataMap("datamap2")
+    loadData("maintable")
+    sql(
+      "create datamap datamap1 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'hour'), avg(salary), max(salary) from maintable group by timeseries(projectjoindate,'hour')")
+    sql(
+      "create datamap datamap2 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'day'), count(projectcode), min(salary) from maintable group by timeseries(projectjoindate,'day')")
+    val df1 = sql("select timeseries(projectjoindate,'hour'), avg(salary), max(salary) from maintable group by timeseries(projectjoindate,'hour')")
+    checkPlan("datamap1", df1)
+    val df2 = sql("select timeseries(projectjoindate,'day'), count(projectcode), min(salary) from maintable group by timeseries(projectjoindate,'day')")
+    checkPlan("datamap2", df2)
+    dropDataMap("datamap1")
+    dropDataMap("datamap2")
+  }
+
+  test("test timeseries with and and or filters") {
+    dropDataMap("datamap1")
+    dropDataMap("datamap2")
+    dropDataMap("datamap3")
+    sql(
+      "create datamap datamap1 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'month'), max(salary) from maintable where timeseries(projectjoindate,'month') = '2016-03-01 00:00:00' or  timeseries(projectjoindate,'month') = '2016-02-01 00:00:00' group by timeseries(projectjoindate,'month')")
+    loadData("maintable")
+    val df1 = sql("select timeseries(projectjoindate,'month'), max(salary) from maintable where timeseries(projectjoindate,'month') = '2016-03-01 00:00:00' or  timeseries(projectjoindate,'month') = '2016-02-01 00:00:00' group by timeseries(projectjoindate,'month')")
+    checkPlan("datamap1", df1)
+    sql(
+      "create datamap datamap2 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'month'), max(salary) from maintable where timeseries(projectjoindate,'month') = '2016-03-01 00:00:00' and  timeseries(projectjoindate,'month') = '2016-02-01 00:00:00' group by timeseries(projectjoindate,'month')")
+    val df2 = sql("select timeseries(projectjoindate,'month'), max(salary) from maintable where timeseries(projectjoindate,'month') = '2016-03-01 00:00:00' and  timeseries(projectjoindate,'month') = '2016-02-01 00:00:00' group by timeseries(projectjoindate,'month')")
+    checkPlan("datamap2", df2)
+    sql(
+      "create datamap datamap3 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'month'), max(salary) from maintable where timeseries(projectjoindate,'month') = '2016-03-01 00:00:00' and  timeseries(projectjoindate,'month') = '2016-02-01 00:00:00' or  timeseries(projectjoindate,'month') = '2016-04-01 00:00:00'  group by timeseries(projectjoindate,'month')")
+    val df3 = sql("select timeseries(projectjoindate,'month'), max(salary) from maintable where timeseries(projectjoindate,'month') = '2016-03-01 00:00:00' and  timeseries(projectjoindate,'month') = '2016-02-01 00:00:00' or  timeseries(projectjoindate,'month') = '2016-04-01 00:00:00' group by timeseries(projectjoindate,'month')")
+    checkPlan("datamap3", df3)
+    dropDataMap("datamap1")
+    dropDataMap("datamap2")
+    dropDataMap("datamap3")
+  }
+
+  test("test timeseries with simple projection of time aggregation and between filter") {
+    dropDataMap("datamap1")
+    loadData("maintable")
+    sql(
+      "create datamap datamap1 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'month') from maintable group by timeseries(projectjoindate,'month')")
+    val df1 = sql("select timeseries(projectjoindate,'month') from maintable group by timeseries(projectjoindate,'month')")
+    checkPlan("datamap1", df1)
+    val df2 = sql("select timeseries(projectjoindate,'month') from maintable where timeseries(projectjoindate,'month') = '2016-03-01 00:00:00' group by timeseries(projectjoindate,'month')")
+    checkPlan("datamap1", df2)
+    val df3 = sql("select timeseries(projectjoindate,'month') from maintable where timeseries(projectjoindate,'month') between '2016-03-01 00:00:00' and '2016-04-01 00:00:00' group by timeseries(projectjoindate,'month')")
+    checkPlan("datamap1", df3)
+    dropDataMap("datamap1")
+  }
+
+  test("test mv timeseries with distinct, cast") {
+    dropDataMap("datamap1")
+    sql(
+      "create datamap datamap1 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'month'),projectcode from maintable group by timeseries(projectjoindate,'month'),projectcode")
+    loadData("maintable")
+    val df1 = sql("select distinct(projectcode) from maintable")
+    checkPlan("datamap1", df1)
+    val df2 = sql("select distinct(timeseries(projectjoindate,'month')) from maintable")
+    checkPlan("datamap1", df2)
+    // TODO: cast expression and group by not allowing to create datamap, check later
+//    sql(
+//      "create datamap datamap2 on table maintable using 'mv' as " +
+//      "select timeseries(projectjoindate,'month'),cast(floor((projectcode + 1000) / 900) * 900 - 2000 AS INT) from maintable group by timeseries(projectjoindate,'month'),projectcode")
+//    val df3 = sql("select timeseries(projectjoindate,'month'),cast(floor((projectcode + 1000) / 900) * 900 - 2000 AS INT) from maintable group by timeseries(projectjoindate,'month'),projectcode")
+//    checkPlan("datamap2", df3)
+    dropDataMap("datamap1")
+  }
+
+  test("test mvtimeseries with alias") {
+    dropDataMap("datamap1")
+    sql(
+      "create datamap datamap1 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'month') as t,projectcode as y from maintable group by timeseries(projectjoindate,'month'),projectcode")
+    loadData("maintable")
+    val df1 = sql("select timeseries(projectjoindate,'month') as t,projectcode as y from maintable group by timeseries(projectjoindate,'month'),projectcode")
+    checkPlan("datamap1", df1)
+    // TODO: fix the base issue of alias with group by
+    //   val df2 = sql("select timeseries(projectjoindate,'month'),projectcode from maintable group by timeseries(projectjoindate,'month'),projectcode")
+    //   checkPlan("datamap1", df2)
+    dropDataMap("datamap1")
+    sql(
+      "create datamap datamap1 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'month'),projectcode from maintable group by timeseries(projectjoindate,'month'),projectcode")
+    val df3 = sql("select timeseries(projectjoindate,'month') as t,projectcode as y from maintable group by timeseries(projectjoindate,'month'),projectcode")
+    checkPlan("datamap1", df3)
+    dropDataMap("datamap1")
+  }
+
+  test("test mv timeseries with case when and Sum + Sum") {
+    dropDataMap("datamap1")
+    sql(
+      "create datamap datamap1 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'month') ,sum(CASE WHEN projectcode=5 THEN salary ELSE 0 END)  from maintable group by timeseries(projectjoindate,'month')")
+    val df = sql("select timeseries(projectjoindate,'month') ,sum(CASE WHEN projectcode=5 THEN salary ELSE 0 END)  from maintable group by timeseries(projectjoindate,'month')")
+    checkPlan("datamap1", df)
+    dropDataMap("datamap1")
+    sql(
+      "create datamap datamap1 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'hour') ,sum(projectcode) + sum(salary)  from maintable group by timeseries(projectjoindate,'hour')")
+    loadData("maintable")
+    val df1 = sql("select timeseries(projectjoindate,'hour') ,sum(projectcode) + sum(salary)  from maintable group by timeseries(projectjoindate,'hour')")
+    checkPlan("datamap1", df1)
+    dropDataMap("datamap1")
+  }
+
+  test("test mv timeseries with IN filter subquery") {
+    dropDataMap("datamap1")
+    sql(
+      "create datamap datamap1 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'hour') ,sum(projectcode)  from maintable group by timeseries(projectjoindate,'hour')")
+    val df = sql("select max(salary) from maintable where projectcode IN (select sum(projectcode)  from maintable group by timeseries(projectjoindate,'hour')) ")
+    checkPlan("datamap1", df)
+    dropDataMap("datamap1")
+  }
+
+  test("test mv timeseries duplicate columns and constant columns") {
+    dropDataMap("datamap1")
+    sql(
+      "create datamap datamap1 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'month') ,sum(projectcode),sum(projectcode)  from maintable group by timeseries(projectjoindate,'month')")
+    loadData("maintable")
+    val df1 = sql("select timeseries(projectjoindate,'month') ,sum(projectcode)  from maintable group by timeseries(projectjoindate,'month')")
+    checkPlan("datamap1", df1)
+    val df2 = sql("select timeseries(projectjoindate,'month') ,sum(projectcode),sum(projectcode)  from maintable group by timeseries(projectjoindate,'month')")
+    checkPlan("datamap1", df2)
+    dropDataMap("datamap1")
+    sql(
+      "create datamap datamap1 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'month') ,sum(1) ex  from maintable group by timeseries(projectjoindate,'month')")
+    val df3 = sql("select timeseries(projectjoindate,'month') ,sum(1)  ex from maintable group by timeseries(projectjoindate,'month')")
+    checkPlan("datamap1", df3)
+    dropDataMap("datamap1")
+  }
+
+  test("test mv timeseries with like filters") {
+    dropDataMap("datamap1")
+    sql(
+      "create datamap datamap1 on table maintable using 'mv' as " +
+      "select timeseries(projectjoindate,'month') ,sum(salary) from maintable  where salary NOT LIKE '6%' group by timeseries(projectjoindate,'month')")
+    val df1 = sql("select timeseries(projectjoindate,'month') ,sum(salary) from maintable  where salary NOT LIKE '6%' group by timeseries(projectjoindate,'month')")
+    checkPlan("datamap1", df1)
+    dropDataMap("datamap1")
+  }
+
+  test("test mv timeseries with join scenario") {
+    sql("drop table if exists secondtable")
+    sql(
+      "CREATE TABLE secondtable (empno int,empname string, projectcode int, projectjoindate " +
+      "Timestamp,salary double) STORED BY 'org.apache.carbondata.format'")
+    loadData("secondtable")
+    sql(
+      "create datamap datamap1 using 'mv' as " +
+      "select timeseries(t1.projectjoindate,'month'), sum(t1.projectcode), sum(t2.projectcode) " +
+      " from maintable t1 inner join secondtable t2 where" +
+      " t2.projectcode = t1.projectcode group by timeseries(t1.projectjoindate,'month')")
+    val df =  sql(
+      "select timeseries(t1.projectjoindate,'month'), sum(t1.projectcode), sum(t2.projectcode)" +
+      " from maintable t1 inner join secondtable t2 where" +
+      " t2.projectcode = t1.projectcode group by timeseries(t1.projectjoindate,'month')")
+    checkPlan("datamap1", df)
+  }
+
+
+  override def afterAll(): Unit = {
+    drop()
+  }
+
+  def drop(): Unit = {
+    sql("drop table if exists maintable")
+  }
+
+  def dropDataMap(datamapName: String): Unit = {
+    sql(s"drop datamap if exists $datamapName")
+  }
+
+  def createTable(): Unit = {
+    sql(
+      "CREATE TABLE maintable (empno int,empname string, projectcode int, projectjoindate " +
+      "Timestamp,salary double) STORED BY 'org.apache.carbondata.format'")
+  }
+
+  def loadData(table: String): Unit = {
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/mv_sampledata.csv' INTO TABLE $table  OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+  }
+
+  def checkPlan(dataMapName: String, df: DataFrame): Unit = {
+    val analyzed = df.queryExecution.analyzed
+    assert(TestUtil.verifyMVDataMap(analyzed, dataMapName))
+  }
+}
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 51017fe..847580c 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -49,6 +49,7 @@ This section provides the details of all the configurations required for the Car
 | carbon.lock.retries | 3 | CarbonData ensures consistency of operations by blocking certain operations from running in parallel. In order to block the operations from running in parallel, lock is obtained on the table. This configuration specifies the maximum number of retries to obtain the lock for any operations other than load. **NOTE:** Data manupulation operations like Compaction,UPDATE,DELETE  or LOADING,UPDATE,DELETE are not allowed to run in parallel. How ever data loading can h [...]
 | carbon.lock.retry.timeout.sec | 5 | Specifies the interval between the retries to obtain the lock for any operation other than load. **NOTE:** Refer to ***carbon.lock.retries*** for understanding why CarbonData uses locks for operations. |
 | carbon.fs.custom.file.provider | None | To support FileTypeInterface for configuring custom CarbonFile implementation to work with custom FileSystem. |
+| carbon.timeseries.first.day.of.week | SUNDAY | Configures which day of the week is considered the first day of the week, since this differs across regions. It is used for the week granularity of timeseries MV. |
 
 ## Data Loading Configuration
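
A sketch of setting the new property programmatically before loading or querying (CarbonProperties.addProperty is the usual way carbon.* properties are set in tests; a carbon.properties file entry works as well):

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // treat Monday as the first day of the week for 'week' granularity buckets
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESERIES_FIRST_DAY_OF_WEEK, "MONDAY")
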
 
diff --git a/docs/datamap/mv-datamap-guide.md b/docs/datamap/mv-datamap-guide.md
index dfa8618..a0c3f1a 100644
--- a/docs/datamap/mv-datamap-guide.md
+++ b/docs/datamap/mv-datamap-guide.md
@@ -211,8 +211,8 @@ release, user can do as following:
 Basically, user can manually trigger the operation by re-building the datamap.
 
 ## MV TimeSeries Support
-MV non-lazy datamap support's TimeSeries queries. Supported granularities strings are: year, month, day, week,
-hour,thirty_minute, fifteen_minute, minute and second.
+MV non-lazy datamap supports TimeSeries queries. Supported granularity strings are: year, month, week, day,
+hour, thirty_minute, fifteen_minute, ten_minute, five_minute, minute and second.
 
  User can create MV datamap with timeseries queries like the below example:
 
@@ -229,5 +229,5 @@ Supported columns that can be provided in timeseries udf should be of TimeStamp/
 Timeseries queries with Date type support's only year, month, day and week granularities.
 
  **NOTE**:
- 1. Multiple timeseries udf functions cannot be defined in Select statement with different timestamp 
- columns or different granularities.
\ No newline at end of file
+ 1. A single select statement cannot contain timeseries udf(s) with different granularities or
+ with different timestamp/date columns.
\ No newline at end of file
diff --git a/integration/spark-common-test/src/test/resources/mv_sampledata.csv b/integration/spark-common-test/src/test/resources/mv_sampledata.csv
new file mode 100644
index 0000000..a278f5e
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/mv_sampledata.csv
@@ -0,0 +1,14 @@
+empno,empname,projectcode,projectjoindate,salary
+11,joey,2,2016-02-23 09:01:30,300
+12,chandler,5,2016-02-23 09:01:50,400
+13,pheobe,1,2016-02-23 09:03:30,450
+14,monica,5,2016-02-23 09:03:50,650
+15,ross,5,2016-02-23 09:07:50,250.3
+16,rachel,1,2016-02-23 09:08:30,300
+17,gunther,8,2016-02-23 09:08:40,800.2
+18,tag,5,2016-02-23 09:16:50,200
+19,will,1,2016-03-23 09:17:30,200
+20,akash,8,2016-03-23 10:18:40,200.2
+21,smith,5,2016-03-29 10:02:50,150.6
+22,cathy,1,2016-02-25 10:03:30,450.5
+23,pablo,5,2016-04-23 11:06:50,350
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 2b62bf2..bf4106b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -181,7 +181,12 @@ case class CarbonCreateDataMapCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     if (dataMapProvider != null) {
       dataMapProvider.initData()
-      if (mainTable != null && !deferredRebuild) {
+      // TODO: remove these checks once the preaggregate and preaggregate timeseries are deprecated
+      if (mainTable != null && !deferredRebuild &&
+          (dataMapSchema.isIndexDataMap || dataMapSchema.getProviderName
+            .equalsIgnoreCase(DataMapClassProvider.PREAGGREGATE.getShortName) ||
+           dataMapSchema.getProviderName
+             .equalsIgnoreCase(DataMapClassProvider.TIMESERIES.getShortName))) {
         dataMapProvider.rebuild()
         if (dataMapSchema.isIndexDataMap) {
           val operationContext: OperationContext = new OperationContext()
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
index b44cf79..85f5068 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
@@ -125,8 +125,13 @@ case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier])
                     val iterator = segmentMaps.entrySet().iterator()
                     while (iterator.hasNext) {
                       val entry = iterator.next()
-
-                      syncInfoMap.put(entry.getKey, DataMapUtil.getMaxSegmentID(entry.getValue))
+                      // in a join scenario, when one table is loaded and another is not loaded,
+                      // put the value as NA
+                      if (entry.getValue.isEmpty) {
+                        syncInfoMap.put(entry.getKey, "NA")
+                      } else {
+                        syncInfoMap.put(entry.getKey, DataMapUtil.getMaxSegmentID(entry.getValue))
+                      }
                     }
                     val loadEndTime =
                       if (loadMetadataDetails(i).getLoadEndTime ==
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
index 5dd3a30..7ea62da 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
@@ -168,9 +168,7 @@ object TimeSeriesUtil {
     for (granularity <- Granularity.values()) {
       if (timeSeriesFunction.equalsIgnoreCase(granularity.getName
         .substring(0, granularity.getName.indexOf(CarbonCommonConstants.UNDERSCORE)))) {
-        if (!(granularity.getName.equalsIgnoreCase(Granularity.DAY.getName) ||
-              granularity.getName.equalsIgnoreCase(Granularity.MONTH.getName) ||
-              granularity.getName.equalsIgnoreCase(Granularity.YEAR.getName))) {
+        if (!supportedGranularitiesForDate.contains(granularity.getName)) {
           throw new MalformedCarbonCommandException(
             "Granularity should be DAY,MONTH or YEAR, for timeseries column of Date type")
         }
@@ -178,6 +176,12 @@ object TimeSeriesUtil {
     }
   }
 
+  val supportedGranularitiesForDate: Seq[String] = Seq(
+    Granularity.DAY.getName,
+    Granularity.WEEK.getName,
+    Granularity.MONTH.getName,
+    Granularity.YEAR.getName)
+
   /**
    * validate TimeSeries Granularity
    *
@@ -189,7 +193,7 @@ object TimeSeriesUtil {
     breakable {
       for (granularity <- Granularity.values()) {
         if (timeSeriesFunction.equalsIgnoreCase(granularity.getName
-          .substring(0, granularity.getName.indexOf(CarbonCommonConstants.UNDERSCORE)))) {
+          .substring(0, granularity.getName.lastIndexOf(CarbonCommonConstants.UNDERSCORE)))) {
           found = true
           break
         }
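
The switch from indexOf to lastIndexOf above matters because the new granularity names contain two underscores; a small illustration:

    // "thirty_minute_granularity" must map to the UDF argument 'thirty_minute'
    val name = "thirty_minute_granularity"
    name.substring(0, name.indexOf("_"))      // "thirty" - would never match the UDF argument
    name.substring(0, name.lastIndexOf("_"))  // "thirty_minute" - matches timeseries(col,'thirty_minute')
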
