This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 8694ed4 [CARBONDATA-3531] Support load and query for MV timeseries and support multiple granularities
8694ed4 is described below
commit 8694ed4068cc1b9e44c9b3ed7d3ba5f310a3e52c
Author: akashrn5 <[email protected]>
AuthorDate: Thu Nov 7 11:05:33 2019 +0530
[CARBONDATA-3531] Support load and query for MV timeseries and support
multiple granularities

Why this PR?
This PR implements load and query support for MV timeseries and fixes some MV bugs.
Timeseries on MV supports the following granularities:
second, minute, five_minute, ten_minute, fifteen_minute, thirty_minute,
hour, day, week, month, year.

Bug Fixes:
1. Fix filter queries not hitting the MV timeseries datamap.
2. Fix BETWEEN filters not working on timeseries queries.
This closes #3438
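
For context, a minimal usage sketch of what this change enables. The datamap and query syntax
is taken from the new TestMVTimeSeriesLoadAndQuery suite; the SparkSession value `spark`, the
table `maintable` and its columns are the test suite's sample names, not requirements:

    // assumes a SparkSession with CarbonData extensions and a table `maintable`
    // that has a Timestamp column `projectjoindate`, as in the new test suite
    spark.sql(
      "create datamap datamap1 on table maintable using 'mv' as " +
      "select timeseries(projectjoindate, 'hour'), sum(projectcode) from maintable " +
      "group by timeseries(projectjoindate, 'hour')")

    // after loading data, equality and BETWEEN filters on the same granularity are now
    // rewritten to hit the datamap table (datamap1_table) instead of scanning maintable
    spark.sql(
      "select timeseries(projectjoindate, 'hour'), sum(projectcode) from maintable " +
      "where timeseries(projectjoindate, 'hour') = '2016-02-23 09:00:00' " +
      "group by timeseries(projectjoindate, 'hour')").show(false)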
---
.../core/constants/CarbonCommonConstants.java | 11 +
.../core/metadata/schema/datamap/Granularity.java | 6 +
...SeriesFunctionEnum.java => DaysOfWeekEnum.java} | 45 ++-
.../core/preagg/TimeSeriesFunctionEnum.java | 13 +-
.../carbondata/core/preagg/TimeSeriesUDF.java | 56 ++++
.../carbondata/mv/datamap/MVAnalyzerRule.scala | 53 ++-
.../carbondata/mv/rewrite/DefaultMatchMaker.scala | 44 ++-
.../org/apache/carbondata/mv/rewrite/Utils.scala | 30 +-
.../timeseries/TestMVTimeSeriesLoadAndQuery.scala | 361 +++++++++++++++++++++
docs/configuration-parameters.md | 1 +
docs/datamap/mv-datamap-guide.md | 8 +-
.../src/test/resources/mv_sampledata.csv | 14 +
.../datamap/CarbonCreateDataMapCommand.scala | 7 +-
.../command/datamap/CarbonDataMapShowCommand.scala | 9 +-
.../command/timeseries/TimeSeriesUtil.scala | 12 +-
15 files changed, 621 insertions(+), 49 deletions(-)
diff --git
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index da5586e..9b12251 100644
---
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2285,4 +2285,15 @@ public final class CarbonCommonConstants {
* hive column-name maximum length
*/
public static final int MAXIMUM_CHAR_LENGTH = 128;
+
+ /**
+ * Carbon property for timeseries MV to define the first day of week
+ */
+  public static final String CARBON_TIMESERIES_FIRST_DAY_OF_WEEK =
+      "carbon.timeseries.first.day.of.week";
+
+ /**
+ * Default first day of week
+ */
+  public static final String CARBON_TIMESERIES_FIRST_DAY_OF_WEEK_DEFAULT = "SUNDAY";
}
diff --git
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/Granularity.java
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/Granularity.java
index d6aefb6..b054bdf 100644
---
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/Granularity.java
+++
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/Granularity.java
@@ -29,10 +29,16 @@ package org.apache.carbondata.core.metadata.schema.datamap;
public enum Granularity {
YEAR("year_granularity"),
MONTH("month_granularity"),
+ WEEK("week_granularity"),
DAY("day_granularity"),
HOUR("hour_granularity"),
+ THIRTY_MINUTE("thirty_minute_granularity"),
+ FIFTEEN_MINUTE("fifteen_minute_granularity"),
+ TEN_MINUTE("ten_minute_granularity"),
+ FIVE_MINUTE("five_minute_granularity"),
MINUTE("minute_granularity"),
SECOND("second_granularity");
+
private String name;
Granularity(String name) {
diff --git
a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java
b/core/src/main/java/org/apache/carbondata/core/preagg/DaysOfWeekEnum.java
similarity index 69%
copy from
core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java
copy to core/src/main/java/org/apache/carbondata/core/preagg/DaysOfWeekEnum.java
index 5d0d2af..e73b595 100644
---
a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/DaysOfWeekEnum.java
@@ -14,40 +14,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.carbondata.core.preagg;
/**
- * enum for timeseries function
+ * Enum defining the days of the week, in line with the day-of-week constants of the Java Calendar class.
*/
-public enum TimeSeriesFunctionEnum {
- SECOND("second", 0),
- MINUTE("minute", 1),
- HOUR("hour", 2),
- DAY("day", 3),
- MONTH("month", 4),
- YEAR("year", 5);
-
- /**
- * name of the function
- */
- private String name;
-
- /**
- * ordinal for function
- */
- private int ordinal;
+public enum DaysOfWeekEnum {
- TimeSeriesFunctionEnum(String name, int ordinal) {
- this.name = name;
- this.ordinal = ordinal;
- }
+ SUNDAY("SUNDAY", 1),
+ MONDAY("MONDAY", 2),
+ TUESDAY("TUESDAY", 3),
+ WEDNESDAY("WEDNESDAY", 4),
+ THURSDAY("THURSDAY", 5),
+ FRIDAY("FRIDAY", 6),
+ SATURDAY("SATURDAY", 7);
- public String getName() {
- return name;
+ public String getDay() {
+ return day;
}
public int getOrdinal() {
return ordinal;
}
+
+ private String day;
+
+ private int ordinal;
+
+ DaysOfWeekEnum(String day, int ordinal) {
+ this.day = day;
+ this.ordinal = ordinal;
+ }
}
diff --git
a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java
b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java
index 5d0d2af..f30923b 100644
---
a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java
+++
b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java
@@ -23,10 +23,15 @@ package org.apache.carbondata.core.preagg;
public enum TimeSeriesFunctionEnum {
SECOND("second", 0),
MINUTE("minute", 1),
- HOUR("hour", 2),
- DAY("day", 3),
- MONTH("month", 4),
- YEAR("year", 5);
+ FIVE_MINUTE("five_minute", 2),
+ TEN_MINUTE("ten_minute", 3),
+ FIFTEEN_MINUTE("fifteen_minute", 4),
+ THIRTY_MINUTE("thirty_minute", 5),
+ HOUR("hour", 6),
+ DAY("day", 7),
+ WEEK("week", 8),
+ MONTH("month", 9),
+ YEAR("year", 10);
/**
* name of the function
diff --git
a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
index ce0f15d..5cfa7f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
@@ -22,11 +22,20 @@ import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.List;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+import org.apache.log4j.Logger;
+
/**
* class for applying timeseries udf
*/
public class TimeSeriesUDF {
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(TimeSeriesUDF.class.getName());
+
public final List<String> TIMESERIES_FUNCTION = new ArrayList<>();
// thread local for keeping calender instance
@@ -76,6 +85,25 @@ public class TimeSeriesUDF {
calendar.set(Calendar.MILLISECOND, 0);
calendar.set(Calendar.SECOND, 0);
break;
+ case FIVE_MINUTE:
+ setData(calendar, 5);
+ break;
+ case TEN_MINUTE:
+ setData(calendar, 10);
+ break;
+ case FIFTEEN_MINUTE:
+ setData(calendar, 15);
+ break;
+ case THIRTY_MINUTE:
+ setData(calendar, 30);
+ break;
+ case WEEK:
+ calendar.set(Calendar.HOUR_OF_DAY, 0);
+ calendar.set(Calendar.MINUTE, 0);
+ calendar.set(Calendar.SECOND, 0);
+ calendar.set(Calendar.MILLISECOND, 0);
+ calendar.set(Calendar.DAY_OF_WEEK, calendar.getFirstDayOfWeek());
+ break;
case HOUR:
calendar.set(Calendar.MILLISECOND, 0);
calendar.set(Calendar.SECOND, 0);
@@ -110,6 +138,15 @@ public class TimeSeriesUDF {
}
/**
+   * This method sets the calendar fields based on the interval time of the granularity
+ */
+ private void setData(Calendar calendar, int intervalTime) {
+ calendar.set(Calendar.MILLISECOND, 0);
+ calendar.set(Calendar.SECOND, 0);
+    calendar.set(Calendar.MINUTE, (calendar.get(Calendar.MINUTE) / intervalTime) * intervalTime);
+ }
+
+ /**
* Below method will be used to initialize the thread local
*/
private void initialize() {
@@ -119,10 +156,29 @@ public class TimeSeriesUDF {
if (TIMESERIES_FUNCTION.isEmpty()) {
TIMESERIES_FUNCTION.add("second");
TIMESERIES_FUNCTION.add("minute");
+ TIMESERIES_FUNCTION.add("five_minute");
+ TIMESERIES_FUNCTION.add("ten_minute");
+ TIMESERIES_FUNCTION.add("fifteen_minute");
+ TIMESERIES_FUNCTION.add("thirty_minute");
TIMESERIES_FUNCTION.add("hour");
TIMESERIES_FUNCTION.add("day");
+ TIMESERIES_FUNCTION.add("week");
TIMESERIES_FUNCTION.add("month");
TIMESERIES_FUNCTION.add("year");
}
+    int firstDayOfWeek;
+    try {
+      firstDayOfWeek = DaysOfWeekEnum.valueOf(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_TIMESERIES_FIRST_DAY_OF_WEEK,
+              CarbonCommonConstants.CARBON_TIMESERIES_FIRST_DAY_OF_WEEK_DEFAULT).toUpperCase())
+          .getOrdinal();
+    } catch (IllegalArgumentException ex) {
+      LOGGER.warn("Invalid value set for first day of the week. Considering the default value: "
+          + CarbonCommonConstants.CARBON_TIMESERIES_FIRST_DAY_OF_WEEK_DEFAULT);
+      firstDayOfWeek = DaysOfWeekEnum
+          .valueOf(CarbonCommonConstants.CARBON_TIMESERIES_FIRST_DAY_OF_WEEK_DEFAULT)
+          .getOrdinal();
+    }
+ calanderThreadLocal.get().setFirstDayOfWeek(firstDayOfWeek);
}
}
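
A small illustrative sketch (not part of the patch) of the minute-bucket rollup that setData
applies for the new five/ten/fifteen/thirty minute granularities, using the same integer
arithmetic; the sample timestamp is taken from the new mv_sampledata.csv:

    import java.text.SimpleDateFormat
    import java.util.Calendar

    // same rollup as setData: floor the minute field to the granularity interval
    def rollUpMinute(ts: String, intervalMinutes: Int): String = {
      val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val cal = Calendar.getInstance()
      cal.setTime(fmt.parse(ts))
      cal.set(Calendar.MILLISECOND, 0)
      cal.set(Calendar.SECOND, 0)
      cal.set(Calendar.MINUTE, (cal.get(Calendar.MINUTE) / intervalMinutes) * intervalMinutes)
      fmt.format(cal.getTime)
    }

    rollUpMinute("2016-02-23 09:16:50", 15)  // "2016-02-23 09:15:00" (fifteen_minute)
    rollUpMinute("2016-02-23 09:16:50", 10)  // "2016-02-23 09:10:00" (ten_minute)

For the week granularity, the UDF instead zeroes the time fields and moves the calendar to the
first day of that week, which is driven by the new carbon.timeseries.first.day.of.week property
(default SUNDAY).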
diff --git
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
index edd9c81..0ba6bfc 100644
---
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
+++
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias,
UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Alias, ScalaUDF}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, ScalaUDF}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Command,
DeserializeToObject, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -33,6 +33,7 @@ import
org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
import org.apache.carbondata.core.util.ThreadLocalSessionInfo
import org.apache.carbondata.datamap.DataMapManager
+import org.apache.carbondata.mv.plans.modular.{ModularPlan, Select}
import org.apache.carbondata.mv.rewrite.{SummaryDataset, SummaryDatasetCatalog}
/**
@@ -87,7 +88,8 @@ class MVAnalyzerRule(sparkSession: SparkSession) extends
Rule[LogicalPlan] {
if (needAnalysis && catalog != null && isValidPlan(plan, catalog)) {
val modularPlan =
catalog.mvSession.sessionState.rewritePlan(plan).withMVTable
if (modularPlan.find(_.rewritten).isDefined) {
- val compactSQL = modularPlan.asCompactSQL
+ var compactSQL = modularPlan.asCompactSQL
+ compactSQL = reWriteTheUDFInSQLWithQualifierName(modularPlan,
compactSQL)
val analyzed = sparkSession.sql(compactSQL).queryExecution.analyzed
analyzed
} else {
@@ -99,6 +101,53 @@ class MVAnalyzerRule(sparkSession: SparkSession) extends
Rule[LogicalPlan] {
}
/**
+   * This method specially handles timeseries on MV: the timeseries UDF is a Scala UDF, so after
+   * plan matching the rewritten query looks, for example, like:
+   *
+   * SELECT gen_subsumer_0.`UDF:timeseries(projectjoindate, hour)` AS `UDF:timeseries(projectjoi...
+   * FROM
+   * (SELECT datamap1_table.`UDF:timeseries_projectjoindate_hour` AS `UDF:timeseries(projectjoin...
+   * FROM
+   *   default.datamap1_table
+   * GROUP BY datamap1_table.`UDF:timeseries_projectjoindate_hour`) gen_subsumer_0
+   * WHERE
+   * (UDF:timeseries(projectjoindate, hour) = TIMESTAMP('2016-02-23 09:00:00.0'))
+   *
+   * Here the WHERE filter expression is a ScalaUDF, so when .sql() is called to prepare the SQL,
+   * it comes out without the qualifier name 'gen_subsumer_0'
+   * (refer org.apache.spark.sql.catalyst.expressions.NonSQLExpression). This function rewrites
+   * the filter with the qualifier name so that Spark does not fail to parse the rewritten MV query.
+   * @param plan Modular plan
+   * @param compactSQL compact SQL generated from the modular plan
+   * @return rewritten query with qualifier names for the WHERE clause columns
+   */
+  private def reWriteTheUDFInSQLWithQualifierName(plan: ModularPlan,
+      compactSQL: String): String = {
+ var outPutUDFColumn = ""
+ var reWrittenQuery = compactSQL
+ plan match {
+ case select: Select =>
+ select.outputList.collect {
+ case a: Alias if a.child.isInstanceOf[Attribute] =>
+ val childName = a.child.asInstanceOf[Attribute].name
+ if (childName.startsWith("UDF:timeseries")) {
+ outPutUDFColumn = childName
+ }
+ }
+ var queryArray: Array[String] = Array.empty
+      if (!outPutUDFColumn.equalsIgnoreCase("") && compactSQL.contains("WHERE")) {
+ queryArray = compactSQL.split("\n")
+ queryArray(queryArray.indexOf("WHERE") + 1) = queryArray(
+ queryArray.indexOf("WHERE") + 1).replace(outPutUDFColumn,
+ s"gen_subsumer_0.`$outPutUDFColumn`")
+ reWrittenQuery = queryArray.mkString("\n")
+ }
+ reWrittenQuery
+ case _ =>
+ compactSQL
+ }
+ }
+
+ /**
* Whether the plan is valid for doing modular plan matching and datamap
replacing.
*/
def isValidPlan(plan: LogicalPlan, catalog: SummaryDatasetCatalog): Boolean
= {
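
To make the rewrite above concrete, here is a hedged sketch (string literals only; the real
input comes from modularPlan.asCompactSQL) of what reWriteTheUDFInSQLWithQualifierName does to
the line following WHERE in the compact SQL:

    val udfColumn = "UDF:timeseries(projectjoindate, hour)"
    val whereLineBefore =
      "(UDF:timeseries(projectjoindate, hour) = TIMESTAMP('2016-02-23 09:00:00.0'))"
    // the UDF column in the WHERE line is re-qualified with the subsumer alias
    val whereLineAfter = whereLineBefore.replace(udfColumn, s"gen_subsumer_0.`$udfColumn`")
    // -> (gen_subsumer_0.`UDF:timeseries(projectjoindate, hour)` = TIMESTAMP('2016-02-23 09:00:00.0'))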
diff --git
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
index 5329608..616d0bd 100644
---
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
+++
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
@@ -448,17 +448,53 @@ object GroupbyGroupbySelectOnlyChildDelta extends
DefaultMatchPattern with Predi
subsumer: ModularPlan,
compensation: Option[ModularPlan]) = {
if (subsumee.asInstanceOf[GroupBy].predicateList.contains(exprE)) {
- if (exprListR.exists(_.semanticEquals(exprE)) || canEvaluate(exprE,
exprListR)) true
- else false
+      if (exprListR.exists(_.semanticEquals(exprE)) || canEvaluate(exprE, exprListR) ||
+ isDerivableForUDF(exprE, exprListR)) {
+ true
+ } else {
+ false
+ }
} else if (compensation.getOrElse(throw new RuntimeException("compensation
cannot be None"))
.asInstanceOf[Select].predicateList.contains(exprE)) {
- if (canEvaluate(exprE, exprListR) ||
exprListR.exists(_.semanticEquals(exprE))) true
- else false
+      if (canEvaluate(exprE, exprListR) || exprListR.exists(_.semanticEquals(exprE)) ||
+ isDerivableForUDF(exprE, exprListR)) {
+ true
+ } else {
+ false
+ }
} else {
false
}
}
+ /**
+   * org.apache.carbondata.mv.plans.MorePredicateHelper#canEvaluate checks whether
+   * exprE.references is a subset of AttributeSet(exprListR), which only carries the plain column
+   * name from the UDF, so the check is always false for a ScalaUDF.
+   * This method instead checks whether the references of exprE can be derived from the list.
+   *
+   * Example:
+   * exprE has an attribute reference for the timeseries column, like
+   * UDF:timeseries(projectjoindate, month), so exprE.references gives
+   * UDF:timeseries(projectjoindate, month).
+   * exprListR also contains the ScalaUDF, but AttributeSet(exprListR) holds just the column name
+   * projectjoindate, so canEvaluate returns false.
+   *
+   * Here we check whether the .sql of the ScalaUDF in exprListR gives
+   * UDF:timeseries(projectjoindate, month), which can be matched against exprE.references.
+ */
+  private def isDerivableForUDF(exprE: Expression, exprListR: Seq[Expression]): Boolean = {
+ var canBeDerived = false
+ exprListR.forall {
+ case a: ScalaUDF =>
+ a.references.foreach { a =>
+ canBeDerived = exprE.sql.contains(a.name)
+ }
+ canBeDerived
+ case _ =>
+ canBeDerived
+ }
+ }
+
def apply(
subsumer: ModularPlan,
subsumee: ModularPlan,
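
A plain-string illustration (simplified; real plans carry Catalyst Expression objects) of the
derivability check that isDerivableForUDF adds, using the names from the comment's example:

    // exprE comes from the user query; its SQL text names the timeseries UDF column
    val exprESql = "UDF:timeseries(projectjoindate, month)"
    // the matching ScalaUDF in exprListR only exposes the base column as a reference
    val scalaUdfReferenceName = "projectjoindate"
    // canEvaluate fails because AttributeSet(exprListR) holds only `projectjoindate`;
    // the new check accepts the expression because exprE's SQL mentions that column
    val derivable = exprESql.contains(scalaUdfReferenceName)  // true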
diff --git
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala
index fa2ea10..7a8b538 100644
---
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala
+++
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala
@@ -17,11 +17,11 @@
package org.apache.carbondata.mv.rewrite
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
AttributeMap, Cast, Divide, Expression, Multiply, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
AttributeMap, Cast, Divide, Expression, Multiply, PredicateHelper, ScalaUDF}
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.carbondata.mv.plans.modular
-import org.apache.carbondata.mv.plans.modular.ModularPlan
+import org.apache.carbondata.mv.plans.modular.{ModularPlan, Select}
/**
* Utility functions used by mqo matcher to convert our plan to new
aggregation code path
@@ -391,7 +391,7 @@ object Utils extends PredicateHelper {
}
} else if (sel_2c.predicateList.contains(exprE)) {
if (sel_2a.predicateList.exists(_.semanticEquals(exprE)) ||
- canEvaluate(exprE, subsumer)) {
+ canEvaluate(exprE, subsumer) || canBeDerived(subsumer, exprE)) {
Some(exprE)
} else {
None
@@ -416,6 +416,30 @@ object Utils extends PredicateHelper {
}
}
+  private def canBeDerived(subsumer: ModularPlan, expE: Expression): Boolean = {
+ var canBeDerived = false
+ subsumer.asInstanceOf[Select].outputList.forall {
+ case Alias(s: ScalaUDF, _) =>
+ expE.children.foreach { expr =>
+ if (s.semanticEquals(expr)) {
+ canBeDerived = true
+ }
+          // For expressions such as a between filter, the expr will be a Cast expression whose
+          // child is the ScalaUDF (timeseries), so compare the child as well.
+ if (!canBeDerived && null != expr.children) {
+ expr.children.foreach { expC =>
+ if (s.semanticEquals(expC)) {
+ canBeDerived = true
+ }
+ }
+ }
+ }
+ canBeDerived
+ case _ =>
+ canBeDerived
+ }
+ }
+
private def isSemanticEquivalent(translatedExpr: Expression, subsumer:
ModularPlan) = {
subsumer match {
// if subsumer has where clause, even if expr can be translated into new
expr based on
diff --git
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
new file mode 100644
index 0000000..8815a94
--- /dev/null
+++
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
@@ -0,0 +1,361 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.carbondata.mv.timeseries
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.mv.rewrite.TestUtil
+
+class TestMVTimeSeriesLoadAndQuery extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll(): Unit = {
+ drop()
+ createTable()
+ }
+
+ test("create MV timeseries datamap with simple projection and aggregation
and filter") {
+ sql("drop datamap if exists datamap1")
+ sql("drop datamap if exists datamap2")
+ sql(
+ "create datamap datamap1 on table maintable using 'mv' as " +
+ "select timeseries(projectjoindate,'minute'), sum(projectcode) from
maintable group by timeseries(projectjoindate,'minute')")
+ loadData("maintable")
+ val df = sql("select timeseries(projectjoindate,'minute'),
sum(projectcode) from maintable group by timeseries(projectjoindate,'minute')")
+ val analyzed = df.queryExecution.analyzed
+ assert(TestUtil.verifyMVDataMap(analyzed, "datamap1"))
+ dropDataMap("datamap1")
+ sql(
+ "create datamap datamap1 on table maintable using 'mv' as " +
+ "select timeseries(projectjoindate,'minute'), sum(projectcode) from
maintable where timeseries(projectjoindate,'minute') = '2016-02-23 09:17:00'
group by timeseries(projectjoindate,'minute')")
+
+ sql("select * from datamap1_table").show(false)
+ val df1 = sql("select
timeseries(projectjoindate,'minute'),sum(projectcode) from maintable where
timeseries(projectjoindate,'minute') = '2016-02-23 09:17:00'" +
+ "group by timeseries(projectjoindate,'minute')")
+ val analyzed1 = df1.queryExecution.analyzed
+ assert(TestUtil.verifyMVDataMap(analyzed1, "datamap1"))
+ dropDataMap("datamap1")
+ }
+
+ test("test mv timeseries with ctas and filter in actual query") {
+ dropDataMap("datamap1")
+ sql(
+ "create datamap datamap1 on table maintable using 'mv' as " +
+ "select timeseries(projectjoindate,'hour'), sum(projectcode) from
maintable group by timeseries(projectjoindate,'hour')")
+ loadData("maintable")
+ val df = sql("select timeseries(projectjoindate,'hour'), sum(projectcode)
from maintable where timeseries(projectjoindate,'hour') = '2016-02-23 09:00:00'
" +
+ "group by timeseries(projectjoindate,'hour')")
+ val analyzed = df.queryExecution.analyzed
+ assert(TestUtil.verifyMVDataMap(analyzed, "datamap1"))
+ dropDataMap("datamap1")
+ }
+
+ test("test mv timeseries with multiple granularity datamaps") {
+ dropDataMap("datamap1")
+ dropDataMap("datamap2")
+ dropDataMap("datamap3")
+ dropDataMap("datamap4")
+ dropDataMap("datamap5")
+ dropDataMap("datamap6")
+ loadData("maintable")
+ sql(
+ "create datamap datamap1 on table maintable using 'mv' as " +
+ "select timeseries(projectjoindate,'minute'), sum(salary) from maintable
group by timeseries(projectjoindate,'minute')")
+ sql(
+ "create datamap datamap2 on table maintable using 'mv' as " +
+ "select timeseries(projectjoindate,'hour'), sum(salary) from maintable
group by timeseries(projectjoindate,'hour')")
+ sql(
+ "create datamap datamap3 on table maintable using 'mv' as " +
+ "select timeseries(projectjoindate,'fifteen_minute'), sum(salary) from
maintable group by timeseries(projectjoindate,'fifteen_minute')")
+ sql(
+ "create datamap datamap4 on table maintable using 'mv' as " +
+ "select timeseries(projectjoindate,'five_minute'), sum(salary) from
maintable group by timeseries(projectjoindate,'five_minute')")
+ sql(
+ "create datamap datamap5 on table maintable using 'mv' as " +
+ "select timeseries(projectjoindate,'week'), sum(salary) from maintable
group by timeseries(projectjoindate,'week')")
+ sql(
+ "create datamap datamap6 on table maintable using 'mv' as " +
+ "select timeseries(projectjoindate,'year'), sum(salary) from maintable
group by timeseries(projectjoindate,'year')")
+ val df1 = sql("select timeseries(projectjoindate,'minute'), sum(salary)
from maintable group by timeseries(projectjoindate,'minute')")
+ checkPlan("datamap1", df1)
+ val df2 = sql("select timeseries(projectjoindate,'hour'), sum(salary) from
maintable group by timeseries(projectjoindate,'hour')")
+ checkPlan("datamap2", df2)
+ val df3 = sql("select timeseries(projectjoindate,'fifteen_minute'),
sum(salary) from maintable group by
timeseries(projectjoindate,'fifteen_minute')")
+ checkPlan("datamap3", df3)
+ val df4 = sql("select timeseries(projectjoindate,'five_minute'),
sum(salary) from maintable group by timeseries(projectjoindate,'five_minute')")
+ checkPlan("datamap4", df4)
+ val df5 = sql("select timeseries(projectjoindate,'week'), sum(salary) from
maintable group by timeseries(projectjoindate,'week')")
+ checkPlan("datamap5", df5)
+ val df6 = sql("select timeseries(projectjoindate,'year'), sum(salary) from
maintable group by timeseries(projectjoindate,'year')")
+ checkPlan("datamap6", df6)
+ val result = sql("show datamap on table maintable").collect()
+ result.find(_.get(0).toString.contains("datamap1")) match {
+ case Some(row) => assert(row.get(4).toString.contains("ENABLED"))
+ case None => assert(false)
+ }
+ result.find(_.get(0).toString.contains("datamap2")) match {
+ case Some(row) => assert(row.get(4).toString.contains("ENABLED"))
+ case None => assert(false)
+ }
+ result.find(_.get(0).toString.contains("datamap3")) match {
+ case Some(row) => assert(row.get(4).toString.contains("ENABLED"))
+ case None => assert(false)
+ }
+ result.find(_.get(0).toString.contains("datamap4")) match {
+ case Some(row) => assert(row.get(4).toString.contains("ENABLED"))
+ case None => assert(false)
+ }
+ result.find(_.get(0).toString.contains("datamap5")) match {
+ case Some(row) => assert(row.get(4).toString.contains("ENABLED"))
+ case None => assert(false)
+ }
+ result.find(_.get(0).toString.contains("datamap6")) match {
+ case Some(row) => assert(row.get(4).toString.contains("ENABLED"))
+ case None => assert(false)
+ }
+ dropDataMap("datamap1")
+ dropDataMap("datamap2")
+ dropDataMap("datamap3")
+ dropDataMap("datamap4")
+ dropDataMap("datamap5")
+ dropDataMap("datamap6")
+ }
+
+ test("test mv timeseries with week granular select data") {
+ dropDataMap("datamap1")
+ loadData("maintable")
+ sql(
+ "create datamap datamap1 on table maintable using 'mv' as " +
+ "select timeseries(projectjoindate,'week'), sum(salary) from maintable
group by timeseries(projectjoindate,'week')")
+/*
+ +-----------------------------------+----------+
+ |UDF:timeseries_projectjoindate_week|sum_salary|
+ +-----------------------------------+----------+
+ |2016-02-21 00:00:00 |3801 |
+ |2016-03-20 00:00:00 |400.2 |
+ |2016-04-17 00:00:00 |350.0 |
+ |2016-03-27 00:00:00 |150.6 |
+ +-----------------------------------+----------+*/
+ val df1 = sql("select timeseries(projectjoindate,'week'), sum(salary) from
maintable group by timeseries(projectjoindate,'week')")
+ checkPlan("datamap1", df1)
+ checkExistence(df1, true, "2016-02-21 00:00:00.0" )
+ dropDataMap("datamap1")
+ }
+
+ test("test timeseries with different aggregations") {
+ dropDataMap("datamap1")
+ dropDataMap("datamap2")
+ loadData("maintable")
+ sql(
+ "create datamap datamap1 on table maintable using 'mv' as " +
+ "select timeseries(projectjoindate,'hour'), avg(salary), max(salary)
from maintable group by timeseries(projectjoindate,'hour')")
+ sql(
+ "create datamap datamap2 on table maintable using 'mv' as " +
+ "select timeseries(projectjoindate,'day'), count(projectcode),
min(salary) from maintable group by timeseries(projectjoindate,'day')")
+ val df1 = sql("select timeseries(projectjoindate,'hour'), avg(salary),
max(salary) from maintable group by timeseries(projectjoindate,'hour')")
+ checkPlan("datamap1", df1)
+ val df2 = sql("select timeseries(projectjoindate,'day'),
count(projectcode), min(salary) from maintable group by
timeseries(projectjoindate,'day')")
+ checkPlan("datamap2", df2)
+ dropDataMap("datamap1")
+ dropDataMap("datamap2")
+ }
+
+ test("test timeseries with and and or filters") {
+ dropDataMap("datamap1")
+ dropDataMap("datamap2")
+ dropDataMap("datamap3")
+ sql(
+ "create datamap datamap1 on table maintable using 'mv' as " +
+ "select timeseries(projectjoindate,'month'), max(salary) from maintable
where timeseries(projectjoindate,'month') = '2016-03-01 00:00:00' or
timeseries(projectjoindate,'month') = '2016-02-01 00:00:00' group by
timeseries(projectjoindate,'month')")
+ loadData("maintable")
+ val df1 = sql("select timeseries(projectjoindate,'month'), max(salary)
from maintable where timeseries(projectjoindate,'month') = '2016-03-01
00:00:00' or timeseries(projectjoindate,'month') = '2016-02-01 00:00:00' group
by timeseries(projectjoindate,'month')")
+ checkPlan("datamap1", df1)
+ sql(
+ "create datamap datamap2 on table maintable using 'mv' as " +
+ "select timeseries(projectjoindate,'month'), max(salary) from maintable
where timeseries(projectjoindate,'month') = '2016-03-01 00:00:00' and
timeseries(projectjoindate,'month') = '2016-02-01 00:00:00' group by
timeseries(projectjoindate,'month')")
+ val df2 = sql("select timeseries(projectjoindate,'month'), max(salary)
from maintable where timeseries(projectjoindate,'month') = '2016-03-01
00:00:00' and timeseries(projectjoindate,'month') = '2016-02-01 00:00:00'
group by timeseries(projectjoindate,'month')")
+ checkPlan("datamap2", df2)
+ sql(
+ "create datamap datamap3 on table maintable using 'mv' as " +
+ "select timeseries(projectjoindate,'month'), max(salary) from maintable
where timeseries(projectjoindate,'month') = '2016-03-01 00:00:00' and
timeseries(projectjoindate,'month') = '2016-02-01 00:00:00' or
timeseries(projectjoindate,'month') = '2016-04-01 00:00:00' group by
timeseries(projectjoindate,'month')")
+ val df3 = sql("select timeseries(projectjoindate,'month'), max(salary)
from maintable where timeseries(projectjoindate,'month') = '2016-03-01
00:00:00' and timeseries(projectjoindate,'month') = '2016-02-01 00:00:00' or
timeseries(projectjoindate,'month') = '2016-04-01 00:00:00' group by
timeseries(projectjoindate,'month')")
+ checkPlan("datamap3", df3)
+ dropDataMap("datamap1")
+ dropDataMap("datamap2")
+ dropDataMap("datamap3")
+ }
+
+ test("test timeseries with simple projection of time aggregation and between
filter") {
+ dropDataMap("datamap1")
+ loadData("maintable")
+ sql(
+ "create datamap datamap1 on table maintable using 'mv' as " +
+ "select timeseries(projectjoindate,'month') from maintable group by
timeseries(projectjoindate,'month')")
+ val df1 = sql("select timeseries(projectjoindate,'month') from maintable
group by timeseries(projectjoindate,'month')")
+ checkPlan("datamap1", df1)
+ val df2 = sql("select timeseries(projectjoindate,'month') from maintable
where timeseries(projectjoindate,'month') = '2016-03-01 00:00:00' group by
timeseries(projectjoindate,'month')")
+ checkPlan("datamap1", df2)
+ val df3 = sql("select timeseries(projectjoindate,'month') from maintable
where timeseries(projectjoindate,'month') between '2016-03-01 00:00:00' and
'2016-04-01 00:00:00' group by timeseries(projectjoindate,'month')")
+ checkPlan("datamap1", df3)
+ dropDataMap("datamap1")
+ }
+
+  test("test mv timeseries with distinct, cast") {
+ dropDataMap("datamap1")
+ sql(
+ "create datamap datamap1 on table maintable using 'mv' as " +
+ "select timeseries(projectjoindate,'month'),projectcode from maintable
group by timeseries(projectjoindate,'month'),projectcode")
+ loadData("maintable")
+ val df1 = sql("select distinct(projectcode) from maintable")
+ checkPlan("datamap1", df1)
+ val df2 = sql("select distinct(timeseries(projectjoindate,'month')) from
maintable")
+ checkPlan("datamap1", df2)
+ // TODO: cast expression and group by not allowing to create datamap,
check later
+// sql(
+// "create datamap datamap2 on table maintable using 'mv' as " +
+// "select timeseries(projectjoindate,'month'),cast(floor((projectcode +
1000) / 900) * 900 - 2000 AS INT) from maintable group by
timeseries(projectjoindate,'month'),projectcode")
+// val df3 = sql("select
timeseries(projectjoindate,'month'),cast(floor((projectcode + 1000) / 900) *
900 - 2000 AS INT) from maintable group by
timeseries(projectjoindate,'month'),projectcode")
+// checkPlan("datamap2", df3)
+ dropDataMap("datamap1")
+ }
+
+  test("test mv timeseries with alias") {
+ dropDataMap("datamap1")
+ sql(
+ "create datamap datamap1 on table maintable using 'mv' as " +
+ "select timeseries(projectjoindate,'month') as t,projectcode as y from
maintable group by timeseries(projectjoindate,'month'),projectcode")
+ loadData("maintable")
+ val df1 = sql("select timeseries(projectjoindate,'month') as t,projectcode
as y from maintable group by timeseries(projectjoindate,'month'),projectcode")
+ checkPlan("datamap1", df1)
+ // TODO: fix the base issue of alias with group by
+ // val df2 = sql("select timeseries(projectjoindate,'month'),projectcode
from maintable group by timeseries(projectjoindate,'month'),projectcode")
+ // checkPlan("datamap1", df2)
+ dropDataMap("datamap1")
+ sql(
+ "create datamap datamap1 on table maintable using 'mv' as " +
+ "select timeseries(projectjoindate,'month'),projectcode from maintable
group by timeseries(projectjoindate,'month'),projectcode")
+ val df3 = sql("select timeseries(projectjoindate,'month') as t,projectcode
as y from maintable group by timeseries(projectjoindate,'month'),projectcode")
+ checkPlan("datamap1", df3)
+ dropDataMap("datamap1")
+ }
+
+ test("test mv timeseries with case when and Sum + Sum") {
+ dropDataMap("datamap1")
+ sql(
+ "create datamap datamap1 on table maintable using 'mv' as " +
+ "select timeseries(projectjoindate,'month') ,sum(CASE WHEN projectcode=5
THEN salary ELSE 0 END) from maintable group by
timeseries(projectjoindate,'month')")
+ val df = sql("select timeseries(projectjoindate,'month') ,sum(CASE WHEN
projectcode=5 THEN salary ELSE 0 END) from maintable group by
timeseries(projectjoindate,'month')")
+ checkPlan("datamap1", df)
+ dropDataMap("datamap1")
+ sql(
+ "create datamap datamap1 on table maintable using 'mv' as " +
+ "select timeseries(projectjoindate,'hour') ,sum(projectcode) +
sum(salary) from maintable group by timeseries(projectjoindate,'hour')")
+ loadData("maintable")
+ val df1 = sql("select timeseries(projectjoindate,'hour') ,sum(projectcode)
+ sum(salary) from maintable group by timeseries(projectjoindate,'hour')")
+ checkPlan("datamap1", df1)
+ dropDataMap("datamap1")
+ }
+
+ test("test mv timeseries with IN filter subquery") {
+ dropDataMap("datamap1")
+ sql(
+ "create datamap datamap1 on table maintable using 'mv' as " +
+ "select timeseries(projectjoindate,'hour') ,sum(projectcode) from
maintable group by timeseries(projectjoindate,'hour')")
+ val df = sql("select max(salary) from maintable where projectcode IN
(select sum(projectcode) from maintable group by
timeseries(projectjoindate,'hour')) ")
+ checkPlan("datamap1", df)
+ dropDataMap("datamap1")
+ }
+
+ test("test mv timeseries duplicate columns and constant columns") {
+ dropDataMap("datamap1")
+ sql(
+ "create datamap datamap1 on table maintable using 'mv' as " +
+ "select timeseries(projectjoindate,'month')
,sum(projectcode),sum(projectcode) from maintable group by
timeseries(projectjoindate,'month')")
+ loadData("maintable")
+ val df1 = sql("select timeseries(projectjoindate,'month')
,sum(projectcode) from maintable group by timeseries(projectjoindate,'month')")
+ checkPlan("datamap1", df1)
+ val df2 = sql("select timeseries(projectjoindate,'month')
,sum(projectcode),sum(projectcode) from maintable group by
timeseries(projectjoindate,'month')")
+ checkPlan("datamap1", df2)
+ dropDataMap("datamap1")
+ sql(
+ "create datamap datamap1 on table maintable using 'mv' as " +
+ "select timeseries(projectjoindate,'month') ,sum(1) ex from maintable
group by timeseries(projectjoindate,'month')")
+ val df3 = sql("select timeseries(projectjoindate,'month') ,sum(1) ex from
maintable group by timeseries(projectjoindate,'month')")
+ checkPlan("datamap1", df3)
+ dropDataMap("datamap1")
+ }
+
+ test("test mv timeseries with like filters") {
+ dropDataMap("datamap1")
+ sql(
+ "create datamap datamap1 on table maintable using 'mv' as " +
+ "select timeseries(projectjoindate,'month') ,sum(salary) from maintable
where salary NOT LIKE '6%' group by timeseries(projectjoindate,'month')")
+ val df1 = sql("select timeseries(projectjoindate,'month') ,sum(salary)
from maintable where salary NOT LIKE '6%' group by
timeseries(projectjoindate,'month')")
+ checkPlan("datamap1", df1)
+ dropDataMap("datamap1")
+ }
+
+ test("test mv timeseries with join scenario") {
+ sql("drop table if exists secondtable")
+ sql(
+ "CREATE TABLE secondtable (empno int,empname string, projectcode int,
projectjoindate " +
+ "Timestamp,salary double) STORED BY 'org.apache.carbondata.format'")
+ loadData("secondtable")
+ sql(
+ "create datamap datamap1 using 'mv' as " +
+ "select timeseries(t1.projectjoindate,'month'), sum(t1.projectcode),
sum(t2.projectcode) " +
+ " from maintable t1 inner join secondtable t2 where" +
+ " t2.projectcode = t1.projectcode group by
timeseries(t1.projectjoindate,'month')")
+ val df = sql(
+ "select timeseries(t1.projectjoindate,'month'), sum(t1.projectcode),
sum(t2.projectcode)" +
+ " from maintable t1 inner join secondtable t2 where" +
+ " t2.projectcode = t1.projectcode group by
timeseries(t1.projectjoindate,'month')")
+ checkPlan("datamap1", df)
+ }
+
+
+ override def afterAll(): Unit = {
+ drop()
+ }
+
+ def drop(): Unit = {
+ sql("drop table if exists maintable")
+ }
+
+ def dropDataMap(datamapName: String): Unit = {
+ sql(s"drop datamap if exists $datamapName")
+ }
+
+ def createTable(): Unit = {
+ sql(
+ "CREATE TABLE maintable (empno int,empname string, projectcode int,
projectjoindate " +
+ "Timestamp,salary double) STORED BY 'org.apache.carbondata.format'")
+ }
+
+ def loadData(table: String): Unit = {
+ sql(
+ s"""LOAD DATA local inpath '$resourcesPath/mv_sampledata.csv' INTO TABLE
$table OPTIONS
+ |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+ }
+
+ def checkPlan(dataMapName: String, df: DataFrame): Unit = {
+ val analyzed = df.queryExecution.analyzed
+ assert(TestUtil.verifyMVDataMap(analyzed, dataMapName))
+ }
+}
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 51017fe..847580c 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -49,6 +49,7 @@ This section provides the details of all the configurations
required for the Car
| carbon.lock.retries | 3 | CarbonData ensures consistency of operations by
blocking certain operations from running in parallel. In order to block the
operations from running in parallel, lock is obtained on the table. This
configuration specifies the maximum number of retries to obtain the lock for
any operations other than load. **NOTE:** Data manupulation operations like
Compaction,UPDATE,DELETE or LOADING,UPDATE,DELETE are not allowed to run in
parallel. How ever data loading can h [...]
| carbon.lock.retry.timeout.sec | 5 | Specifies the interval between the
retries to obtain the lock for any operation other than load. **NOTE:** Refer
to ***carbon.lock.retries*** for understanding why CarbonData uses locks for
operations. |
| carbon.fs.custom.file.provider | None | To support FileTypeInterface for
configuring custom CarbonFile implementation to work with custom FileSystem. |
+| carbon.timeseries.first.day.of.week | SUNDAY | Configures which day of the week is considered the first day of the week, since the first day of the week differs across regions. |
## Data Loading Configuration
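
A short sketch (assuming the standard CarbonProperties API) of setting the new
carbon.timeseries.first.day.of.week property programmatically before running week-granularity
timeseries queries; MONDAY is only an example value, and the key can equally be placed in
carbon.properties like the other parameters above:

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // accepted values follow java.util.Calendar day names: SUNDAY, MONDAY, ..., SATURDAY;
    // an invalid value is logged and the default (SUNDAY) is used instead
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESERIES_FIRST_DAY_OF_WEEK, "MONDAY")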
diff --git a/docs/datamap/mv-datamap-guide.md b/docs/datamap/mv-datamap-guide.md
index dfa8618..a0c3f1a 100644
--- a/docs/datamap/mv-datamap-guide.md
+++ b/docs/datamap/mv-datamap-guide.md
@@ -211,8 +211,8 @@ release, user can do as following:
Basically, user can manually trigger the operation by re-building the datamap.
## MV TimeSeries Support
-MV non-lazy datamap support's TimeSeries queries. Supported granularities
strings are: year, month, day, week,
-hour,thirty_minute, fifteen_minute, minute and second.
+MV non-lazy datamap supports TimeSeries queries. Supported granularity strings are: year, month, week, day,
+hour, thirty_minute, fifteen_minute, ten_minute, five_minute, minute and second.
User can create MV datamap with timeseries queries like the below example:
@@ -229,5 +229,5 @@ Supported columns that can be provided in timeseries udf
should be of TimeStamp/
Timeseries queries with Date type support's only year, month, day and week
granularities.
**NOTE**:
- 1. Multiple timeseries udf functions cannot be defined in Select statement
with different timestamp
- columns or different granularities.
\ No newline at end of file
+ 1. A single select statement cannot contain timeseries udf(s) with different granularities or
+ with different timestamp/date columns.
\ No newline at end of file
diff --git a/integration/spark-common-test/src/test/resources/mv_sampledata.csv
b/integration/spark-common-test/src/test/resources/mv_sampledata.csv
new file mode 100644
index 0000000..a278f5e
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/mv_sampledata.csv
@@ -0,0 +1,14 @@
+empno,empname,projectcode,projectjoindate,salary
+11,joey,2,2016-02-23 09:01:30,300
+12,chandler,5,2016-02-23 09:01:50,400
+13,pheobe,1,2016-02-23 09:03:30,450
+14,monica,5,2016-02-23 09:03:50,650
+15,ross,5,2016-02-23 09:07:50,250.3
+16,rachel,1,2016-02-23 09:08:30,300
+17,gunther,8,2016-02-23 09:08:40,800.2
+18,tag,5,2016-02-23 09:16:50,200
+19,will,1,2016-03-23 09:17:30,200
+20,akash,8,2016-03-23 10:18:40,200.2
+21,smith,5,2016-03-29 10:02:50,150.6
+22,cathy,1,2016-02-25 10:03:30,450.5
+23,pablo,5,2016-04-23 11:06:50,350
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 2b62bf2..bf4106b 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -181,7 +181,12 @@ case class CarbonCreateDataMapCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
if (dataMapProvider != null) {
dataMapProvider.initData()
- if (mainTable != null && !deferredRebuild) {
+      // TODO: remove these checks once preaggregate and preaggregate timeseries are deprecated
+ if (mainTable != null && !deferredRebuild &&
+          (dataMapSchema.isIndexDataMap || dataMapSchema.getProviderName
+            .equalsIgnoreCase(DataMapClassProvider.PREAGGREGATE.getShortName) ||
+          dataMapSchema.getProviderName
+            .equalsIgnoreCase(DataMapClassProvider.TIMESERIES.getShortName))) {
dataMapProvider.rebuild()
if (dataMapSchema.isIndexDataMap) {
val operationContext: OperationContext = new OperationContext()
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
index b44cf79..85f5068 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
@@ -125,8 +125,13 @@ case class CarbonDataMapShowCommand(tableIdentifier:
Option[TableIdentifier])
val iterator = segmentMaps.entrySet().iterator()
while (iterator.hasNext) {
val entry = iterator.next()
-
-            syncInfoMap.put(entry.getKey, DataMapUtil.getMaxSegmentID(entry.getValue))
+            // in a join scenario, when one table is loaded and the other is not,
+            // put the sync value as NA
+            if (entry.getValue.isEmpty) {
+              syncInfoMap.put(entry.getKey, "NA")
+            } else {
+              syncInfoMap.put(entry.getKey, DataMapUtil.getMaxSegmentID(entry.getValue))
+ }
}
val loadEndTime =
if (loadMetadataDetails(i).getLoadEndTime ==
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
index 5dd3a30..7ea62da 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
@@ -168,9 +168,7 @@ object TimeSeriesUtil {
for (granularity <- Granularity.values()) {
if (timeSeriesFunction.equalsIgnoreCase(granularity.getName
.substring(0,
granularity.getName.indexOf(CarbonCommonConstants.UNDERSCORE)))) {
- if (!(granularity.getName.equalsIgnoreCase(Granularity.DAY.getName) ||
- granularity.getName.equalsIgnoreCase(Granularity.MONTH.getName)
||
- granularity.getName.equalsIgnoreCase(Granularity.YEAR.getName)))
{
+ if (!supportedGranularitiesForDate.contains(granularity.getName)) {
throw new MalformedCarbonCommandException(
"Granularity should be DAY,MONTH or YEAR, for timeseries column of
Date type")
}
@@ -178,6 +176,12 @@ object TimeSeriesUtil {
}
}
+ val supportedGranularitiesForDate: Seq[String] = Seq(
+ Granularity.DAY.getName,
+ Granularity.WEEK.getName,
+ Granularity.MONTH.getName,
+ Granularity.YEAR.getName)
+
/**
* validate TimeSeries Granularity
*
@@ -189,7 +193,7 @@ object TimeSeriesUtil {
breakable {
for (granularity <- Granularity.values()) {
if (timeSeriesFunction.equalsIgnoreCase(granularity.getName
-          .substring(0, granularity.getName.indexOf(CarbonCommonConstants.UNDERSCORE)))) {
+          .substring(0, granularity.getName.lastIndexOf(CarbonCommonConstants.UNDERSCORE)))) {
found = true
break
}