[CARBONDATA-1611][Streaming] Reject Update and Delete operations for streaming tables
This closes #1447 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/74bd52b6 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/74bd52b6 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/74bd52b6 Branch: refs/heads/pre-aggregate Commit: 74bd52b66f7dae938c1d993e5dc3a7a225227866 Parents: ae280e2 Author: Jacky Li <[email protected]> Authored: Sun Oct 29 21:31:21 2017 +0530 Committer: QiangCai <[email protected]> Committed: Tue Nov 7 21:58:48 2017 +0800 ---------------------------------------------------------------------- .../core/metadata/schema/table/CarbonTable.java | 8 +++ .../mutation/ProjectForUpdateCommand.scala | 12 +--- .../strategy/StreamingTableStrategy.scala | 62 ++++++++++++++++++++ .../spark/sql/hive/CarbonSessionState.scala | 8 ++- .../TestStreamingTableOperation.scala | 59 +++++++++++++++++++ 5 files changed, 136 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/74bd52b6/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index a6738a3..e1a7143 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -657,4 +657,12 @@ public class CarbonTable implements Serializable { public TableInfo getTableInfo() { return tableInfo; } + + /** + * Return true if this is a streaming table (table with property "streaming"="true") + */ + public boolean isStreamingTable() { + String streaming = 
getTableInfo().getFactTable().getTableProperties().get("streaming"); + return streaming != null && streaming.equalsIgnoreCase("true"); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/74bd52b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala index 5e9d31f..2088396 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala @@ -43,13 +43,6 @@ private[sql] case class ProjectForUpdateCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { val LOGGER = LogServiceFactory.getLogService(ProjectForUpdateCommand.getClass.getName) - - // sqlContext.sparkContext.setLocalProperty(org.apache.spark.sql.execution.SQLExecution - // .EXECUTION_ID_KEY, null) - // DataFrame(sqlContext, plan).show(truncate = false) - // return Seq.empty - - val res = plan find { case relation: LogicalRelation if relation.relation .isInstanceOf[CarbonDatasourceHadoopRelation] => @@ -63,9 +56,6 @@ private[sql] case class ProjectForUpdateCommand( val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore .lookupRelation(DeleteExecution.getTableIdentifier(tableIdentifier))(sparkSession). asInstanceOf[CarbonRelation] - // val relation = CarbonEnv.get.carbonMetastore - // .lookupRelation1(deleteExecution.getTableIdentifier(tableIdentifier))(sqlContext). 
- // asInstanceOf[CarbonRelation] val carbonTable = relation.tableMeta.carbonTable val metadataLock = CarbonLockFactory .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier, @@ -75,7 +65,7 @@ private[sql] case class ProjectForUpdateCommand( val currentTime = CarbonUpdateUtil.readCurrentTime // var dataFrame: DataFrame = null var dataSet: DataFrame = null - var isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset() + val isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset() try { lockStatus = metadataLock.lockWithRetries() if (lockStatus) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/74bd52b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala new file mode 100644 index 0000000..0f0bc24 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.strategy
+
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.command.mutation.{DeleteExecution, ProjectForDeleteCommand, ProjectForUpdateCommand}
+import org.apache.spark.sql.hive.CarbonRelation
+
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
+/**
+ * Planning strategy that rejects update and delete commands on streaming tables
+ */
+private[sql] class StreamingTableStrategy(sparkSession: SparkSession) extends SparkStrategy {
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+    plan match {
+      case update@ProjectForUpdateCommand(_, tableIdentifier) =>
+        rejectIfStreamingTable(DeleteExecution.getTableIdentifier(tableIdentifier), "Data update")
+        ExecutedCommandExec(update) :: Nil
+      case delete@ProjectForDeleteCommand(_, tableIdentifier, _) =>
+        rejectIfStreamingTable(DeleteExecution.getTableIdentifier(tableIdentifier), "Data delete")
+        ExecutedCommandExec(delete) :: Nil
+      case _ => Nil
+    }
+  }
+
+  /**
+   * Throw MalformedCarbonCommandException if `tableIdentifier` refers to a streaming table
+   */
+  private def rejectIfStreamingTable(tableIdentifier: TableIdentifier, operation: String): Unit = {
+    val streaming = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .lookupRelation(tableIdentifier)(sparkSession)
+      .asInstanceOf[CarbonRelation]
+      .tableMeta
+      .carbonTable
+      .isStreamingTable
+    if (streaming) {
+      throw new MalformedCarbonCommandException(
+        s"$operation is not allowed for streaming table")
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74bd52b6/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala index 6892dad..9cad7b0 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias} import org.apache.spark.sql.execution.SparkOptimizer import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy} +import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.optimizer.CarbonLateDecodeRule import org.apache.spark.sql.parser.CarbonSparkSqlParser @@ -133,7 +133,11 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession) experimentalMethods.extraStrategies = - Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession)) + Seq( + new StreamingTableStrategy(sparkSession), + new CarbonLateDecodeStrategy, + new DDLStrategy(sparkSession) + ) experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule) override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods) http://git-wip-us.apache.org/repos/asf/carbondata/blob/74bd52b6/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala new file mode 100644 index 0000000..2c1c6b8 --- /dev/null +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.carbondata
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
+/** Tests that update and delete are rejected on a table created with 'streaming' = 'true'. */
+class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
+  override def beforeAll {
+    sql("DROP DATABASE IF EXISTS streaming CASCADE")
+    sql("CREATE DATABASE streaming")
+    sql("USE streaming")
+    sql(
+      """
+        | create table source(
+        |    c1 string,
+        |    c2 int,
+        |    c3 string,
+        |    c5 string
+        | ) STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES ('streaming' = 'true')
+      """.stripMargin)
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO TABLE source""")
+  }
+
+  test("test blocking update and delete operation on streaming table") {
+    val updateException = intercept[MalformedCarbonCommandException] {
+      sql("""UPDATE source d SET (d.c2) = (d.c2 + 1) WHERE d.c1 = 'a'""").show()
+    }
+    assert(updateException.getMessage.contains("is not allowed for streaming table"))
+    val deleteException = intercept[MalformedCarbonCommandException] {
+      sql("""DELETE FROM source d WHERE d.c1 = 'a'""").show()
+    }
+    assert(deleteException.getMessage.contains("is not allowed for streaming table"))
+  }
+
+  override def afterAll {
+    sql("USE default")
+    sql("DROP DATABASE IF EXISTS streaming CASCADE")
+  }
+}
