[CARBONDATA-1214] Change the segment deletion syntax to follow Hive's DELETE syntax
This closes #1078 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/05de7fda Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/05de7fda Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/05de7fda Branch: refs/heads/streaming_ingest Commit: 05de7fdae5f02f172321af614532bbc331309fcb Parents: 26d2f1c Author: ravikiran23 <[email protected]> Authored: Thu Jun 22 18:18:19 2017 +0530 Committer: Venkata Ramana G <[email protected]> Committed: Thu Jul 6 16:03:29 2017 +0530 ---------------------------------------------------------------------- .../examples/DataManagementExample.scala | 4 ++-- .../MajorCompactionIgnoreInMinorTest.scala | 4 ++-- .../dataload/TestLoadTblNameIsKeyword.scala | 2 +- .../DataRetentionConcurrencyTestCase.scala | 4 ++-- .../dataretention/DataRetentionTestCase.scala | 24 ++++++++++---------- .../iud/HorizontalCompactionTestCase.scala | 2 +- .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 1 + .../org/apache/spark/sql/CarbonSqlParser.scala | 21 ++++++++--------- .../DataCompactionNoDictionaryTest.scala | 2 +- .../sql/parser/CarbonSpark2SqlParser.scala | 18 +++++++-------- 10 files changed, 39 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/05de7fda/examples/spark/src/main/scala/org/apache/carbondata/examples/DataManagementExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/DataManagementExample.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/DataManagementExample.scala index 4552e06..551a008 100644 --- a/examples/spark/src/main/scala/org/apache/carbondata/examples/DataManagementExample.scala +++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/DataManagementExample.scala @@ -45,7 +45,7 @@ object 
DataManagementExample { cc.sql("SHOW SEGMENTS FOR TABLE t3 ").show // delete the first segment - cc.sql("DELETE SEGMENT 0 FROM TABLE t3") + cc.sql("DELETE FROM TABLE T3 WHERE SEGMENT.ID IN (0)") cc.sql("SHOW SEGMENTS FOR TABLE t3 LIMIT 10").show // this query will be executed on last 4 segments, it should return 4000 rows @@ -63,7 +63,7 @@ object DataManagementExample { cc.sql("SELECT count(*) AS amount FROM t3").show // delete all segments whose loading time is before '2099-01-01 01:00:00' - cc.sql("DELETE SEGMENTS FROM TABLE t3 WHERE STARTTIME BEFORE '2099-01-01 01:00:00'") + cc.sql("DELETE FROM TABLE T3 WHERE SEGMENT.STARTTIME BEFORE '2099-01-01 01:00:00'") cc.sql("SHOW SEGMENTS FOR TABLE t3 ").show // this query will be executed on 0 segments, it should return 0 rows http://git-wip-us.apache.org/repos/asf/carbondata/blob/05de7fda/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala index 9d2cf96..b66e37b 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala @@ -103,7 +103,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll */ test("delete compacted segment and check status") { try { - sql("delete segment 2 from table ignoremajor") + sql("delete from table ignoremajor where segment.id in (2)") assert(false) } catch { @@ -128,7 +128,7 @@ class 
MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll */ test("delete compacted segment by date and check status") { sql( - "DELETE SEGMENTS FROM TABLE ignoremajor where STARTTIME before" + + "delete from table ignoremajor where segment.starttime before " + " '2222-01-01 19:35:01'" ) val carbontablePath = CarbonStorePath http://git-wip-us.apache.org/repos/asf/carbondata/blob/05de7fda/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadTblNameIsKeyword.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadTblNameIsKeyword.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadTblNameIsKeyword.scala index cadaa6e..71aeb99 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadTblNameIsKeyword.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadTblNameIsKeyword.scala @@ -78,7 +78,7 @@ class TestLoadTblNameIsKeyword extends QueryTest with BeforeAndAfterAll { LOAD DATA LOCAL INPATH '$testData' into table timestamp """) sql("show segments for table timestamp") - sql("delete segments from table timestamp where starttime before '2099-10-01 18:00:00'") + sql("delete from table timestamp where segment.starttime before '2099-10-01 18:00:00'") sql("clean files for table timestamp") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/05de7fda/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionConcurrencyTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionConcurrencyTestCase.scala 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionConcurrencyTestCase.scala index 784382b..79350eb 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionConcurrencyTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionConcurrencyTestCase.scala @@ -57,7 +57,7 @@ class DataRetentionConcurrencyTestCase extends QueryTest with BeforeAndAfterAll val tasks = new util.ArrayList[Callable[String]]() tasks .add(new QueryTask(s"LOAD DATA LOCAL INPATH '$resourcesPath/dataretention1.csv' INTO TABLE concurrent OPTIONS('DELIMITER' = ',')")) - tasks.add(new QueryTask("Delete segment 0 from table concurrent")) + tasks.add(new QueryTask("delete from table concurrent where segment.id in (0)")) tasks.add(new QueryTask("clean files for table concurrent")) val results = executorService.invokeAll(tasks) for (i <- 0 until tasks.size()) { @@ -77,7 +77,7 @@ class DataRetentionConcurrencyTestCase extends QueryTest with BeforeAndAfterAll .add(new QueryTask(s"LOAD DATA LOCAL INPATH '$resourcesPath/dataretention1.csv' INTO TABLE concurrent OPTIONS('DELIMITER' = ',')")) tasks .add(new QueryTask( - "DELETE SEGMENTS FROM TABLE concurrent where STARTTIME before '2099-01-01 00:00:00'")) + "delete from table concurrent where segment.starttime before '2099-01-01 00:00:00'")) tasks.add(new QueryTask("clean files for table concurrent")) val results = executorService.invokeAll(tasks) for (i <- 0 until tasks.size()) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/05de7fda/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala index 684ed8e..b255099 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala @@ -131,7 +131,7 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll { val actualValue: String = getSegmentStartTime(segments, 1) // delete segments (0,1) which contains ind, aus sql( - "DELETE SEGMENTS FROM TABLE DataRetentionTable where STARTTIME before '" + actualValue + "'") + "delete from table DataRetentionTable where segment.starttime before '" + actualValue + "'") // load segment 2 which contains eng sql( @@ -147,7 +147,7 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll { test("RetentionTest3_DeleteByLoadId") { // delete segment 2 and load ind segment - sql("DELETE SEGMENT 2 FROM TABLE DataRetentionTable") + sql("delete from table DataRetentionTable where segment.id in (2)") sql( s"LOAD DATA LOCAL INPATH '$resourcesPath/dataretention1.csv' INTO TABLE DataRetentionTable " + "OPTIONS('DELIMITER' = ',')") @@ -166,7 +166,7 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll { test("RetentionTest4_DeleteByInvalidLoadId") { try { // delete segment with no id - sql("DELETE SEGMENT FROM TABLE DataRetentionTable") + sql("delete from table DataRetentionTable where segment.id in ()") assert(false) } catch { case e: MalformedCarbonCommandException => @@ -191,8 +191,8 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll { checkAnswer( sql("select count(*) from carbon_table_1"), Seq(Row(20))) - sql("delete segments from table carbon_table_1 " + - "where 
starttime before '2099-07-28 11:00:00'") + sql("delete from table carbon_table_1 where segment.starttime " + + " before '2099-07-28 11:00:00'") checkAnswer( sql("select count(*) from carbon_table_1"), Seq(Row(0))) @@ -204,7 +204,7 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll { try { sql( - "DELETE SEGMENTS FROM TABLE DataRetentionTable where STARTTIME before" + + "delete from table DataRetentionTable where segment.starttime before" + " 'abcd-01-01 00:00:00'") assert(false) } catch { @@ -215,7 +215,7 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll { try { sql( - "DELETE SEGMENTS FROM TABLE DataRetentionTable where STARTTIME before" + + "delete from table DataRetentionTable where segment.starttime before" + " '2099:01:01 00:00:00'") assert(false) } catch { @@ -230,7 +230,7 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll { ), Seq(Row("ind", 9)) ) - sql("DELETE SEGMENTS FROM TABLE DataRetentionTable where STARTTIME before '2099-01-01'") + sql("delete from table DataRetentionTable where segment.starttime before '2099-01-01'") checkAnswer( sql("SELECT country, count(salary) AS amount FROM DataRetentionTable WHERE country" + " IN ('china','ind','aus','eng') GROUP BY country"), Seq()) @@ -280,7 +280,7 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll { // delete segment 0 it should fail try { - sql("DELETE SEGMENT 0 FROM TABLE retentionlock") + sql("delete from table retentionlock where segment.id in (0)") throw new MalformedCarbonCommandException("Invalid") } catch { case me: MalformedCarbonCommandException => @@ -291,7 +291,7 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll { // it should fail try { - sql("DELETE SEGMENTS FROM TABLE retentionlock where STARTTIME before " + + sql("delete from table retentionlock where segment.starttime before " + "'2099-01-01 00:00:00.0'") throw new MalformedCarbonCommandException("Invalid") } catch { @@ -317,10 +317,10 
@@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll { carbonCleanFilesLock.unlock() carbonDeleteSegmentLock.unlock() - sql("DELETE SEGMENT 0 FROM TABLE retentionlock") + sql("delete from table retentionlock where segment.id in (0)") //load and delete should execute parallely carbonMetadataLock.lockWithRetries() - sql("DELETE SEGMENT 1 FROM TABLE retentionlock") + sql("delete from table retentionlock where segment.id in (1)") carbonMetadataLock.unlock() } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/05de7fda/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala index d8310da..0d30333 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala @@ -321,7 +321,7 @@ class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll { sql(s"""load data local inpath '$resourcesPath/IUD/comp4.csv' INTO table dest2""") sql( """delete from dest2 where (c2 < 3) or (c2 > 10 and c2 < 13) or (c2 > 20 and c2 < 23) or (c2 > 30 and c2 < 33)""").show() - sql("""DELETE SEGMENT 0 FROM TABLE dest2""") + sql("""delete from table dest2 where segment.id in (0) """) sql("""clean files for table dest2""") sql( """update dest2 set (c5) = ('8RAM size') where (c2 > 3 and c2 < 5) or (c2 > 13 and c2 < 15) or (c2 > 23 and c2 < 25) or (c2 > 33 and c2 < 35)""") 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/05de7fda/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index 8207a9d..4dbdc8d 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -164,6 +164,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { protected val SMALLINT = carbonKeyWord("SMALLINT") protected val CHANGE = carbonKeyWord("CHANGE") protected val TBLPROPERTIES = carbonKeyWord("TBLPROPERTIES") + protected val ID = carbonKeyWord("ID") protected val doubleQuotedString = "\"([^\"]+)\"".r protected val singleQuotedString = "'([^']+)'".r http://git-wip-us.apache.org/repos/asf/carbondata/blob/05de7fda/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala index a664104..9dc9ee2 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala @@ -417,23 +417,20 @@ class CarbonSqlParser() extends CarbonDDLSqlParser { } protected lazy val deleteLoadsByID: Parser[LogicalPlan] = - DELETE ~> SEGMENT ~> repsep(segmentId, ",") ~ (FROM ~> TABLE ~> - (ident <~ ".").? 
~ ident) <~ - opt(";") ^^ { - case loadids ~ table => table match { - case databaseName ~ tableName => - DeleteLoadsById(loadids, convertDbNameToLowerCase(databaseName), tableName.toLowerCase()) - } - } + DELETE ~> FROM ~ TABLE ~> (ident <~ ".").? ~ ident ~ + (WHERE ~> (SEGMENT ~ "." ~ ID) ~> IN ~> "(" ~> repsep(segmentId, ",")) <~ ")" ~ opt(";") ^^ { + case dbName ~ tableName ~ loadids => + DeleteLoadsById(loadids, convertDbNameToLowerCase(dbName), tableName.toLowerCase()) + } protected lazy val deleteLoadsByLoadDate: Parser[LogicalPlan] = - DELETE ~> SEGMENTS ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~ - (WHERE ~> (STARTTIME <~ BEFORE) ~ stringLit) <~ + DELETE ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~ + (WHERE ~> (SEGMENT ~ "." ~ STARTTIME ~> BEFORE) ~ stringLit) <~ opt(";") ^^ { - case schema ~ table ~ condition => + case database ~ table ~ condition => condition match { case dateField ~ dateValue => - DeleteLoadsByLoadDate(convertDbNameToLowerCase(schema), + DeleteLoadsByLoadDate(convertDbNameToLowerCase(database), table.toLowerCase(), dateField, dateValue) http://git-wip-us.apache.org/repos/asf/carbondata/blob/05de7fda/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala index dda2a88..5897681 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala @@ -156,7 +156,7 @@ class DataCompactionNoDictionaryTest extends QueryTest with BeforeAndAfterAll { 
sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE nodictionaryCompaction " + "OPTIONS('DELIMITER' = ',')" ) - sql("DELETE segment 0.1,3 FROM TABLE nodictionaryCompaction") + sql("delete from table nodictionaryCompaction where segment.id in (0.1,3)") checkAnswer( sql("select country from nodictionaryCompaction"), Seq() http://git-wip-us.apache.org/repos/asf/carbondata/blob/05de7fda/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index af286eb..511a61c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -220,23 +220,21 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { } protected lazy val deleteLoadsByID: Parser[LogicalPlan] = - DELETE ~> SEGMENT ~> repsep(segmentId, ",") ~ (FROM ~> TABLE ~> - (ident <~ ".").? ~ ident) <~ + DELETE ~> FROM ~ TABLE ~> (ident <~ ".").? ~ ident ~ + (WHERE ~> (SEGMENT ~ "." ~ ID) ~> IN ~> "(" ~> repsep(segmentId, ",")) <~ ")" ~ opt(";") ^^ { - case loadids ~ table => table match { - case databaseName ~ tableName => - DeleteLoadsById(loadids, convertDbNameToLowerCase(databaseName), tableName.toLowerCase()) - } + case dbName ~ tableName ~ loadids => + DeleteLoadsById(loadids, dbName, tableName.toLowerCase()) } protected lazy val deleteLoadsByLoadDate: Parser[LogicalPlan] = - DELETE ~> SEGMENTS ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~ - (WHERE ~> (STARTTIME <~ BEFORE) ~ stringLit) <~ + DELETE ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~ + (WHERE ~> (SEGMENT ~ "." 
~ STARTTIME ~> BEFORE) ~ stringLit) <~ opt(";") ^^ { - case schema ~ table ~ condition => + case database ~ table ~ condition => condition match { case dateField ~ dateValue => - DeleteLoadsByLoadDate(convertDbNameToLowerCase(schema), + DeleteLoadsByLoadDate(convertDbNameToLowerCase(database), table.toLowerCase(), dateField, dateValue)
