This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new c9a5231 [CARBONDATA-4204][CARBONDATA-4231] Fix add segment error message, index server failed testcases and dataload fail error on update
c9a5231 is described below
commit c9a5231ab73d64a0966a45a755ff7e9031c0201e
Author: ShreelekhyaG <[email protected]>
AuthorDate: Mon Jun 21 20:44:31 2021 +0530
[CARBONDATA-4204][CARBONDATA-4231] Fix add segment error message, index server failed testcases and dataload fail error on update
Why is this PR needed?
1. When the path given to Carbon add segment is empty, a
StringIndexOutOfBoundsException is thrown (see the repro sketch after
this list).
2. Index server UT failures need to be fixed.
3. Update fails with a data load failure error when the bad records
action is set to FORCE with Spark 3.1.
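A minimal repro sketch (a hypothetical standalone object, not part of
the patch) of why the empty path fails: the trailing-separator trimming
in CarbonAddLoadCommand indexes the last character of the path, which
is index -1 for an empty string.

    object EmptyPathRepro {
      def main(args: Array[String]): Unit = {
        val givenPath = "" // empty 'path' option from ADD SEGMENT
        // charAt(givenPath.length - 1) is charAt(-1) for an empty string
        // and throws java.lang.StringIndexOutOfBoundsException before any
        // meaningful validation can run.
        if (givenPath.charAt(givenPath.length - 1) == '/') {
          println(givenPath.substring(0, givenPath.length - 1))
        }
      }
    }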
What changes were proposed in this PR?
1. Added a check for an empty path that throws a meaningful error
message.
2. Used checkAnswer instead of assert in the test cases so that they
pass regardless of the order in which rows are returned, with or
without the index server (see the checkAnswer sketch after this list).
Excluded 2 test cases that use explain with query statistics, as no
pruning info is set from the index server.
3. On the update command, dataframe.persist is called, and with the
latest Spark 3.1 changes, Spark returns a cloned SparkSession from
cacheManager with all specified configurations disabled. Because this
different SparkSession is not initialized in CarbonEnv, CarbonEnv.init
is called and creates a new CarbonSessionInfo with no sessionParams,
so the properties set earlier were not accessible. Changed the code to
copy the existing sessionParams from currentThreadSessionInfo when the
new CarbonSessionInfo object is created (see the CarbonEnv sketch
after this list).
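A hedged sketch of the assertion change, assuming the enclosing
QueryTest context that provides sql and checkAnswer (table and column
names reused from the tests below): checkAnswer compares result sets
ignoring row order, whereas asserting on positions of collect()ed rows
breaks when the index server returns rows in a different order.

    import org.apache.spark.sql.Row

    // Old, order-sensitive style: breaks if rows come back reordered.
    // val rows = sql("select str22.c from test_rename").collect()
    // assert(rows(0).equals(Row(12)) && rows(1).equals(Row(24)))

    // New, order-insensitive style: passes for (12, 24) or (24, 12).
    checkAnswer(sql("select str22.c from test_rename"), Seq(Row(12), Row(24)))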
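And a sketch of the CarbonEnv fix itself, mirroring the one-line diff
below (imports assumed from org.apache.carbondata.core.util): when the
thread-level CarbonSessionInfo is created, session params are now
copied along with thread params, so properties set before the update
remain visible to the cloned SparkSession.

    import org.apache.carbondata.core.util.{CarbonSessionInfo, ThreadLocalSessionInfo}

    val currentThreadSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
    val threadLevelCarbonSessionInfo = new CarbonSessionInfo()
    if (currentThreadSessionInfo != null) {
      threadLevelCarbonSessionInfo.setThreadParams(currentThreadSessionInfo.getThreadParams)
      // New: carry over session params as well, so SET properties (e.g.
      // the bad records action) survive the cloned session on update.
      threadLevelCarbonSessionInfo.setSessionParams(currentThreadSessionInfo.getSessionParams)
    }
    ThreadLocalSessionInfo.setCarbonSessionInfo(threadLevelCarbonSessionInfo)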
This closes #4157
---
.../main/scala/org/apache/spark/sql/CarbonEnv.scala | 1 +
.../command/management/CarbonAddLoadCommand.scala | 3 +++
.../testsuite/addsegment/AddSegmentTestCase.scala | 9 +++++++++
.../alterTable/TestAlterTableAddColumns.scala | 21 ++++++++++-----------
.../createTable/TestRenameTableWithIndex.scala | 8 ++++++--
.../AlterTableColumnRenameTestCase.scala | 19 ++++++++-----------
6 files changed, 37 insertions(+), 24 deletions(-)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index bb6e2ef..98c18ac 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -119,6 +119,7 @@ class CarbonEnv {
val threadLevelCarbonSessionInfo = new CarbonSessionInfo()
if (currentThreadSessionInfo != null) {
threadLevelCarbonSessionInfo.setThreadParams(currentThreadSessionInfo.getThreadParams)
+ threadLevelCarbonSessionInfo.setSessionParams(currentThreadSessionInfo.getSessionParams)
}
ThreadLocalSessionInfo.setCarbonSessionInfo(threadLevelCarbonSessionInfo)
ThreadLocalSessionInfo.setConfigurationToCurrentThread(
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
index e319185..ed40d9a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
@@ -89,6 +89,9 @@ case class CarbonAddLoadCommand(
var givenPath = options.getOrElse(
"path", throw new UnsupportedOperationException("PATH is mandatory"))
+ if (givenPath.length == 0) {
+ throw new UnsupportedOperationException("PATH cannot be empty")
+ }
// remove file separator if already present
if (givenPath.charAt(givenPath.length - 1) == '/') {
givenPath = givenPath.substring(0, givenPath.length - 1)
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
index cc5a373..72b8134 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -1120,6 +1120,15 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
assert(ex.getMessage.contains("can not add same segment path repeatedly"))
}
+ test("Test add segment with empty path") {
+ createCarbonTable()
+ val ex = intercept[Exception] {
+ sql("alter table addsegment1 add segment " +
+ s"options('path'='', 'format'='carbon')").collect()
+ }
+ assert(ex.getMessage.contains("PATH cannot be empty"))
+ }
+
def getDataSize(path: String): String = {
val allFiles = FileFactory.getCarbonFile(path).listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = {
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
index f248d72..9a512f6 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
@@ -277,8 +277,7 @@ class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll {
}
}
- // Exclude when running with index server as the returned rows order may vary
- test("Test alter add for arrays enabling local dictionary", true) {
+ test("Test alter add for arrays enabling local dictionary") {
import scala.collection.mutable.WrappedArray.make
createTableForComplexTypes("LOCAL_DICTIONARY_INCLUDE", "ARRAY")
// For the previous segments the default value for newly added array column is null
@@ -289,15 +288,15 @@ class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll {
sql(
"insert into alter_com
values(2,array(9,0),array(1,2,3),array('hello','world'),array(6,7)," +
"array(8,9), named_struct('a',1,'b','abcde') )")
- val rows = sql("select * from alter_com").collect()
- assert(rows(6)(0) == 2)
- assert(rows(6)(1) == make(Array(9, 0)))
- assert(rows(6)(2) == make(Array(1, 2, 3)))
- assert(rows(6)(3) == make(Array("hello", "world")))
- assert(rows(6)(4) == make(Array(6, 7)))
- assert(rows(6)(5) == make(Array(8, 9)))
- assert(rows(6)(6) == Row(1, "abcde"))
- assert(rows.size == 7)
+ sql("select * from alter_com").show(false)
+ checkAnswer(sql("select * from alter_com where array_contains(arr4,6)"),
+ Seq(Row(2,
+ make(Array(9, 0)),
+ make(Array(1, 2, 3)),
+ make(Array("hello", "world")),
+ make(Array(6, 7)),
+ make(Array(8, 9)),
+ Row(1, "abcde"))))
val addedColumns = addedColumnsInSchemaEvolutionEntry("alter_com")
assert(addedColumns.size == 5)
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestRenameTableWithIndex.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestRenameTableWithIndex.scala
index b416353..b522d3b 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestRenameTableWithIndex.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestRenameTableWithIndex.scala
@@ -121,8 +121,10 @@ class TestRenameTableWithIndex extends QueryTest with BeforeAndAfterAll {
true, "dm_carbon_si")
}
+ // Exclude when running with index server, as pruning info for the explain
+ // command is not set with the index server.
test("rename index table success, insert new record success" +
- " and query hit new index table") {
+ " and query hit new index table", true) {
sql("create table if not exists x1 (imei string, mac string) stored as
carbondata")
sql("create index idx_x1_mac on table x1(mac) as 'carbondata'")
sql("alter table idx_x1_mac rename to idx_x1_mac1")
@@ -135,8 +137,10 @@ class TestRenameTableWithIndex extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS x1")
}
+ // Exclude when running with index server, as pruning info for the explain
+ // command is not set with the index server.
test("rename index table fail, revert success, insert new record success" +
- " and query hit old index table") {
+ " and query hit old index table", true) {
val mock: MockUp[MockClassForAlterRevertTests] = new MockUp[MockClassForAlterRevertTests]() {
@Mock
@throws[ProcessMetaDataException]
diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
index ecabd6c..c54eb9a 100644
--- a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
@@ -60,7 +60,7 @@ class AlterTableColumnRenameTestCase extends QueryTest with BeforeAndAfterAll {
sql("insert into test_rename
values(named_struct('a11',named_struct('b2',24,'d',24), 'c', 24))")
val rows = sql("select str22.a11.b2 from test_rename").collect()
- assert(rows(0).equals(Row(12)) && rows(1).equals(Row(24)))
+ checkAnswer(sql("select str22.a11.b2 from test_rename"), Seq(Row(12), Row(24)))
// check if old column names are still present
val ex1 = intercept[AnalysisException] {
sql("select str from test_rename").show(false)
@@ -73,10 +73,8 @@ class AlterTableColumnRenameTestCase extends QueryTest with BeforeAndAfterAll {
assert(ex2.getMessage.contains("cannot resolve '`str.a`'"))
// check un-altered columns
- val rows1 = sql("select str22.c from test_rename").collect()
- val rows2 = sql("select str22.a11.d from test_rename").collect()
- assert(rows1.sameElements(Array(Row(12), Row(24))))
- assert(rows2.sameElements(Array(Row(12), Row(24))))
+ checkAnswer(sql("select str22.c from test_rename"), Seq(Row(12), Row(24)))
+ checkAnswer(sql("select str22.a11.d from test_rename"), Seq(Row(12), Row(24)))
}
test("rename complex columns with invalid
structure/duplicate-names/Map-type") {
@@ -302,12 +300,11 @@ class AlterTableColumnRenameTestCase extends QueryTest with BeforeAndAfterAll {
sql(
"insert into test_rename values (array(11,22,33),
array(array(11,22),array(33,44)), array" +
"('hello11', 'world11'), array(named_struct('a',4555)))")
- val rows = sql("select arr11, arr22, arr33, arr44.a11 from test_rename").collect
- assert(rows.size == 2)
- val secondRow = rows(1)
- assert(secondRow(0).equals(make(Array(11, 22, 33))) &&
- secondRow(1).equals(make(Array(make(Array(11, 22)), make(Array(33, 44))))) &&
- secondRow(2).equals(make(Array("hello11", "world11"))))
+ checkAnswer(sql("select arr11, arr22, arr33, arr44.a11 from test_rename"),
+ Seq(Row(make(Array(1, 2, 3)), make(Array(make(Array(1, 2)), make(Array(3, 4)))),
+ make(Array("hello", "world")), make(Array(45))),
+ Row(make(Array(11, 22, 33)), make(Array(make(Array(11, 22)), make(Array(33, 44)))),
+ make(Array("hello11", "world11")), make(Array(4555)))))
}
test("validate alter change datatype for complex children columns") {