This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 5b76062 [CARBONDATA-3501] Fix update table with varchar column
5b76062 is described below
commit 5b7606259d1b3dd3f25223118039dc79e6216909
Author: Manhua <[email protected]>
AuthorDate: Wed Sep 25 11:47:01 2019 +0800
[CARBONDATA-3501] Fix update table with varchar column
Problem
Update on table with varchar column will throw exception
Analysis
In the loading part of the update operation, it gets the isVarcharTypeMapping
for each column in the order defined when the table was created, and this gives
a hint for checking string length. It does not allow a column that is not of
varchar type to exceed 32000 characters.
However, when changing the plan for updating in CarbonIUDRule, it first
deletes the old expression and appends the new one, which makes the column order
differ from the order at table creation, so the string length check fails.
Solution
Keep the column order as defined at table creation when modifying the update plan
This closes #3398
---
.../longstring/VarcharDataTypesBasicTestCase.scala | 10 ++++++++++
.../command/management/CarbonLoadDataCommand.scala | 2 +-
.../org/apache/spark/sql/hive/CarbonAnalysisRules.scala | 4 ++--
.../org/apache/spark/sql/optimizer/CarbonIUDRule.scala | 17 ++++++++++++++---
4 files changed, 27 insertions(+), 6 deletions(-)
diff --git
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
index 4fd2cc0..9719cfc 100644
---
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
+++
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
@@ -389,6 +389,16 @@ class VarcharDataTypesBasicTestCase extends QueryTest with
BeforeAndAfterEach wi
sql("DROP TABLE IF EXISTS varchar_complex_table")
}
+
+ test("update table with long string column") {
+ prepareTable()
+ // update non-varchar column
+ sql(s"update $longStringTable set(id)=(0) where name is not null").show()
+ // update varchar column
+ sql(s"update $longStringTable set(description)=('empty') where name is not
null").show()
+ // update non-varchar&varchar column
+ sql(s"update $longStringTable set(description, id)=('sth.', 1) where name
is not null").show()
+ }
// ignore this test in CI, because it will need at least 4GB memory to run
successfully
ignore("Exceed 2GB per column page for varchar datatype") {
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 6a03eab..b2f9a1e 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -1060,7 +1060,7 @@ case class CarbonLoadDataCommand(
val dropAttributes = df.logicalPlan.output.dropRight(1)
val finalOutput = catalogTable.schema.map { attr =>
dropAttributes.find { d =>
- val index = d.name.lastIndexOf("-updatedColumn")
+ val index =
d.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION)
if (index > 0) {
d.name.substring(0, index).equalsIgnoreCase(attr.name)
} else {
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 9b923b0..d11bf1e 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -122,9 +122,9 @@ case class CarbonIUDAnalysisRule(sparkSession:
SparkSession) extends Rule[Logica
val renamedProjectList = projectList.zip(columns).map { case (attr,
col) =>
attr match {
case UnresolvedAlias(child22, _) =>
- UnresolvedAlias(Alias(child22, col + "-updatedColumn")())
+ UnresolvedAlias(Alias(child22, col +
CarbonCommonConstants.UPDATED_COL_EXTENSION)())
case UnresolvedAttribute(_) =>
- UnresolvedAlias(Alias(attr, col + "-updatedColumn")())
+ UnresolvedAlias(Alias(attr, col +
CarbonCommonConstants.UPDATED_COL_EXTENSION)())
case _ => attr
}
}
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala
index ae5825d..da1ca55 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala
@@ -23,6 +23,8 @@ import
org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import
org.apache.spark.sql.execution.command.mutation.CarbonProjectForUpdateCommand
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
/**
* Rule specific for IUD operations
*/
@@ -37,9 +39,9 @@ class CarbonIUDRule extends Rule[LogicalPlan] with
PredicateHelper {
var isTransformed = false
val newPlan = updatePlan transform {
case Project(pList, child) if !isTransformed =>
- val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) =
pList
+ var (dest: Seq[NamedExpression], source: Seq[NamedExpression]) =
pList
.splitAt(pList.size - cols.size)
- val diff = cols.diff(dest.map(_.name.toLowerCase))
+ // check complex column
cols.foreach { col =>
val complexExists = "\"name\":\"" + col + "\""
if (dest.exists(m => m.dataType.json.contains(complexExists))) {
@@ -47,11 +49,20 @@ class CarbonIUDRule extends Rule[LogicalPlan] with
PredicateHelper {
"Unsupported operation on Complex data type")
}
}
+ // check updated columns exists in table
+ val diff = cols.diff(dest.map(_.name.toLowerCase))
if (diff.nonEmpty) {
sys.error(s"Unknown column(s) ${ diff.mkString(",") } in table
${ table.tableName }")
}
+ // modify plan for updated column *in place*
isTransformed = true
- Project(dest.filter(a => !cols.contains(a.name.toLowerCase)) ++
source, child)
+ source.foreach { col =>
+ val colName = col.name.substring(0,
+
col.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION))
+ val updateIdx = dest.indexWhere(_.name.equalsIgnoreCase(colName))
+ dest = dest.updated(updateIdx, col)
+ }
+ Project(dest, child)
}
CarbonProjectForUpdateCommand(
newPlan, table.tableIdentifier.database,
table.tableIdentifier.table, cols)