This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8220d23be19 [HUDI-6676] Add command for CreateHoodieTableLike (#9412)
8220d23be19 is described below
commit 8220d23be19af4783a9a776dfffa48167975a6a2
Author: Rex(Hui) An <[email protected]>
AuthorDate: Tue Aug 15 09:02:04 2023 +0800
[HUDI-6676] Add command for CreateHoodieTableLike (#9412)
* add command for CreateHoodieTableLike
* don't support spark2
---
.../spark/sql/HoodieCatalystPlansUtils.scala | 7 ++
.../org/apache/spark/sql/hudi/SparkAdapter.scala | 8 +-
.../apache/spark/sql/hudi/HoodieOptionConfig.scala | 8 ++
.../command/CreateHoodieTableLikeCommand.scala | 110 ++++++++++++++++
.../spark/sql/hudi/analysis/HoodieAnalysis.scala | 13 +-
.../apache/spark/sql/hudi/TestCreateTable.scala | 139 +++++++++++++++++++++
.../spark/sql/HoodieSpark2CatalystPlanUtils.scala | 9 ++
.../spark/sql/HoodieSpark3CatalystPlanUtils.scala | 13 +-
8 files changed, 302 insertions(+), 5 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
index 58789681c54..9cfe23f86cc 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
@@ -93,6 +94,12 @@ trait HoodieCatalystPlansUtils {
*/
def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan,
Map[String, Option[String]], LogicalPlan, Boolean, Boolean)]
+ /**
+ * Decomposes [[CreateTableLikeCommand]] into its arguments, which allows accommodating
+ * API changes in Spark 3
+ */
+ def unapplyCreateTableLikeCommand(plan: LogicalPlan):
Option[(TableIdentifier, TableIdentifier, CatalogStorageFormat, Option[String],
Map[String, String], Boolean)]
+
/**
* Rebases instance of {@code InsertIntoStatement} onto provided instance of
{@code targetTable} and {@code query}
*/
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index 041beba95df..1c6111afe47 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -150,11 +150,11 @@ trait SparkAdapter extends Serializable {
}
def isHoodieTable(map: java.util.Map[String, String]): Boolean = {
- map.getOrDefault("provider", "").equals("hudi")
+ isHoodieTable(map.getOrDefault("provider", ""))
}
def isHoodieTable(table: CatalogTable): Boolean = {
- table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi"
+ isHoodieTable(table.provider.map(_.toLowerCase(Locale.ROOT)).orNull)
}
def isHoodieTable(tableId: TableIdentifier, spark: SparkSession): Boolean = {
@@ -162,6 +162,10 @@ trait SparkAdapter extends Serializable {
isHoodieTable(table)
}
+ def isHoodieTable(provider: String): Boolean = {
+ "hudi".equalsIgnoreCase(provider)
+ }
+
/**
* Create instance of [[ParquetFileFormat]]
*/
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index d715a108d62..abe98bb46cf 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -182,6 +182,14 @@ object HoodieOptionConfig {
options.filterNot(_._1.startsWith("hoodie.")).filterNot(kv =>
sqlOptionKeyToWriteConfigKey.contains(kv._1))
}
+ /**
+ * The opposite of `deleteHoodieOptions`: this method extracts all hoodie-related
+ * options (those whose key starts with `hoodie.`, plus all SQL options)
+ */
+ def extractHoodieOptions(options: Map[String, String]): Map[String, String]
= {
+ options.filter(_._1.startsWith("hoodie.")) ++ extractSqlOptions(options)
+ }
+
// extract primaryKey, preCombineField, type options
def extractSqlOptions(options: Map[String, String]): Map[String, String] = {
val sqlOptions = mapTableConfigsToSqlOptions(options)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableLikeCommand.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableLikeCommand.scala
new file mode 100644
index 00000000000..dc4458d8ad1
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableLikeCommand.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.util.ConfigUtils
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat,
CatalogTable, CatalogTableType, HoodieCatalogTable}
+import org.apache.spark.sql.hudi.HoodieOptionConfig
+
+import scala.util.control.NonFatal
+
+case class CreateHoodieTableLikeCommand(targetTable: TableIdentifier,
+ sourceTable: TableIdentifier,
+ fileFormat: CatalogStorageFormat,
+ properties: Map[String, String] =
Map.empty,
+ ignoreIfExists: Boolean)
+ extends HoodieLeafRunnableCommand with SparkAdapterSupport {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
+
+ val tableIsExists = catalog.tableExists(targetTable)
+ if (tableIsExists) {
+ if (ignoreIfExists) {
+ // scalastyle:off
+ return Seq.empty[Row]
+ // scalastyle:on
+ } else {
+ throw new IllegalArgumentException(s"Table $targetTable already
exists.")
+ }
+ }
+
+ val sourceTableDesc =
catalog.getTempViewOrPermanentTableMetadata(sourceTable)
+
+ val newStorage = if (fileFormat.inputFormat.isDefined) {
+ fileFormat
+ } else {
+ sourceTableDesc.storage.copy(locationUri = fileFormat.locationUri)
+ }
+
+ // If the location is specified, we create an external table internally.
+ // Otherwise create a managed table.
+ val tblType = if (newStorage.locationUri.isEmpty) {
+ CatalogTableType.MANAGED
+ } else {
+ CatalogTableType.EXTERNAL
+ }
+
+ val targetTableProperties = if
(sparkAdapter.isHoodieTable(sourceTableDesc)) {
+ HoodieOptionConfig.extractHoodieOptions(sourceTableDesc.properties) ++
properties
+ } else {
+ properties
+ }
+
+ val newTableDesc = CatalogTable(
+ identifier = targetTable,
+ tableType = tblType,
+ storage = newStorage,
+ schema = sourceTableDesc.schema,
+ provider = Some("hudi"),
+ partitionColumnNames = sourceTableDesc.partitionColumnNames,
+ bucketSpec = sourceTableDesc.bucketSpec,
+ properties = targetTableProperties,
+ tracksPartitionsInCatalog = sourceTableDesc.tracksPartitionsInCatalog)
+
+ val hoodieCatalogTable = HoodieCatalogTable(sparkSession, newTableDesc)
+ // check whether there are conflicts between the table configs defined in the hoodie table and the properties defined in the catalog.
+ CreateHoodieTableCommand.validateTblProperties(hoodieCatalogTable)
+
+ val queryAsProp =
hoodieCatalogTable.catalogProperties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)
+ if (queryAsProp.isEmpty) {
+ // init hoodie table for a normal table (not a ro/rt table)
+ hoodieCatalogTable.initHoodieTable()
+ } else {
+ if (!hoodieCatalogTable.hoodieTableExists) {
+ throw new AnalysisException("Creating ro/rt table need the existence
of the base table.")
+ }
+ if (HoodieTableType.MERGE_ON_READ != hoodieCatalogTable.tableType) {
+ throw new AnalysisException("Creating ro/rt table should only apply to
a mor table.")
+ }
+ }
+
+ try {
+ // create catalog table for this hoodie table
+ CreateHoodieTableCommand.createTableInCatalog(sparkSession,
hoodieCatalogTable, ignoreIfExists, queryAsProp)
+ } catch {
+ case NonFatal(e) =>
+ logWarning("Failed to create catalog table in metastore", e)
+ }
+ Seq.empty[Row]
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 3c2d41aa582..24820c1c032 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -20,8 +20,9 @@ package org.apache.spark.sql.hudi.analysis
import org.apache.hudi.common.util.ReflectionUtils
import org.apache.hudi.common.util.ReflectionUtils.loadClass
import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat,
CatalogTable}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSeq,
Expression, GenericInternalRow}
import org.apache.spark.sql.catalyst.optimizer.ReplaceExpressions
import org.apache.spark.sql.catalyst.plans.logical._
@@ -29,7 +30,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTable,
LogicalRelation}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isMetaField,
removeMetaFields}
-import
org.apache.spark.sql.hudi.analysis.HoodieAnalysis.{MatchInsertIntoStatement,
MatchMergeIntoTable, ResolvesToHudiTable, sparkAdapter}
+import
org.apache.spark.sql.hudi.analysis.HoodieAnalysis.{MatchCreateTableLike,
MatchInsertIntoStatement, MatchMergeIntoTable, ResolvesToHudiTable,
sparkAdapter}
import org.apache.spark.sql.hudi.command._
import org.apache.spark.sql.hudi.command.procedures.{HoodieProcedures,
Procedure, ProcedureArgs}
import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -348,6 +349,11 @@ object HoodieAnalysis extends SparkAdapterSupport {
sparkAdapter.resolveHoodieTable(plan)
}
+ private[sql] object MatchCreateTableLike {
+ def unapply(plan: LogicalPlan): Option[(TableIdentifier, TableIdentifier,
CatalogStorageFormat, Option[String], Map[String, String], Boolean)] =
+ sparkAdapter.getCatalystPlanUtils.unapplyCreateTableLikeCommand(plan)
+ }
+
private[sql] def failAnalysis(msg: String): Nothing = {
throw new AnalysisException(msg)
}
@@ -504,6 +510,9 @@ case class HoodiePostAnalysisRule(sparkSession:
SparkSession) extends Rule[Logic
case CreateDataSourceTableCommand(table, ignoreIfExists)
if sparkAdapter.isHoodieTable(table) =>
CreateHoodieTableCommand(table, ignoreIfExists)
+ case MatchCreateTableLike(targetTable, sourceTable, fileFormat,
provider, properties, ifNotExists)
+ if sparkAdapter.isHoodieTable(provider.orNull) =>
+ CreateHoodieTableLikeCommand(targetTable, sourceTable, fileFormat,
properties, ifNotExists)
// Rewrite the DropTableCommand to DropHoodieTableCommand
case DropTableCommand(tableName, ifExists, false, purge)
if sparkSession.sessionState.catalog.tableExists(tableName)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index a5ddd7ca854..bc3540ebf50 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -405,6 +405,145 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
}
}
+ test("Test create table like") {
+ if (HoodieSparkUtils.gteqSpark3_1) {
+ // 1. Test create table from an existing HUDI table
+ withTempDir { tmp =>
+ Seq("cow", "mor").foreach { tableType =>
+ withTable(generateTableName) { sourceTable =>
+ spark.sql(
+ s"""
+ |create table $sourceTable (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | tblproperties (
+ | primaryKey = 'id,name',
+ | type = '$tableType'
+ | )
+ | location
'${tmp.getCanonicalPath}/$sourceTable'""".stripMargin)
+
+ // 1.1 Test Managed table
+ withTable(generateTableName) { targetTable =>
+ spark.sql(
+ s"""
+ |create table $targetTable
+ |like $sourceTable
+ |using hudi""".stripMargin)
+
+ val table =
spark.sessionState.catalog.getTableMetadata(TableIdentifier(targetTable))
+
+ assertResult(targetTable)(table.identifier.table)
+ assertResult("hudi")(table.provider.get)
+ assertResult(CatalogTableType.MANAGED)(table.tableType)
+ assertResult(
+ HoodieRecord.HOODIE_META_COLUMNS.asScala.map(StructField(_,
StringType))
+ ++ Seq(
+ StructField("id", IntegerType),
+ StructField("name", StringType),
+ StructField("price", DoubleType),
+ StructField("ts", LongType))
+ )(table.schema.fields)
+ assertResult(tableType)(table.properties("type"))
+ assertResult("id,name")(table.properties("primaryKey"))
+
+ // target table already exists
+ assertThrows[IllegalArgumentException] {
+ spark.sql(
+ s"""
+ |create table $targetTable
+ |like $sourceTable
+ |using hudi""".stripMargin)
+ }
+
+ // should be ignored if the table already exists
+ spark.sql(
+ s"""
+ |create table if not exists $targetTable
+ |like $sourceTable
+ |using hudi""".stripMargin)
+ }
+
+ // 1.2 Test External table
+ withTable(generateTableName) { targetTable =>
+ spark.sql(
+ s"""
+ |create table $targetTable
+ |like $sourceTable
+ |using hudi
+ |location
'${tmp.getCanonicalPath}/$targetTable'""".stripMargin)
+ val table =
spark.sessionState.catalog.getTableMetadata(TableIdentifier(targetTable))
+ assertResult(CatalogTableType.EXTERNAL)(table.tableType)
+ }
+
+
+ // 1.3 New target table options should override source table's
+ withTable(generateTableName) { targetTable =>
+ spark.sql(
+ s"""
+ |create table $targetTable
+ |like $sourceTable
+ |using hudi
+ |tblproperties (primaryKey = 'id')""".stripMargin)
+ val table =
spark.sessionState.catalog.getTableMetadata(TableIdentifier(targetTable))
+ assertResult("id")(table.properties("primaryKey"))
+ }
+ }
+ }
+ }
+
+ // 2. Test create table from an existing non-HUDI table
+ withTempDir { tmp =>
+ withTable(generateTableName) { sourceTable =>
+ spark.sql(
+ s"""
+ |create table $sourceTable (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using parquet
+ | tblproperties (
+ | non.hoodie.property='value'
+ | )
+ | location
'${tmp.getCanonicalPath}/$sourceTable'""".stripMargin)
+
+ withTable(generateTableName) { targetTable =>
+ spark.sql(
+ s"""
+ |create table $targetTable
+ |like $sourceTable
+ |using hudi
+ |tblproperties (
+ | primaryKey = 'id,name',
+ | type = 'cow'
+ |)""".stripMargin)
+ val table =
spark.sessionState.catalog.getTableMetadata(TableIdentifier(targetTable))
+
+ assertResult(targetTable)(table.identifier.table)
+ assertResult("hudi")(table.provider.get)
+ assertResult(CatalogTableType.MANAGED)(table.tableType)
+ assertResult(
+ HoodieRecord.HOODIE_META_COLUMNS.asScala.map(StructField(_,
StringType))
+ ++ Seq(
+ StructField("id", IntegerType),
+ StructField("name", StringType),
+ StructField("price", DoubleType),
+ StructField("ts", LongType))
+ )(table.schema.fields)
+
+ // Should not include non.hoodie.property
+ assertResult(2)(table.properties.size)
+ assertResult("cow")(table.properties("type"))
+ assertResult("id,name")(table.properties("primaryKey"))
+ }
+ }
+ }
+ }
+ }
+
test("Test Create Table As Select With Auto record key gen") {
withTempDir { tmp =>
// Create Non-Partitioned table
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
index cdb4c5226a6..6fb1719cede 100644
---
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
import org.apache.hudi.SparkHoodieTableFileIndex
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.optimizer.SimplifyCasts
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -68,6 +69,14 @@ object HoodieSpark2CatalystPlanUtils extends
HoodieCatalystPlansUtils {
}
}
+ /**
+ * CreateTableLike is not supported in Spark 2: Spark 2 doesn't support passing a
+ * provider, so HUDI can't identify whether the targetTable is a HUDI table or not.
+ */
+ override def unapplyCreateTableLikeCommand(plan: LogicalPlan):
Option[(TableIdentifier, TableIdentifier, CatalogStorageFormat, Option[String],
Map[String, String], Boolean)] = {
+ None
+ }
+
def rebaseInsertIntoStatement(iis: LogicalPlan, targetTable: LogicalPlan,
query: LogicalPlan): LogicalPlan =
iis.asInstanceOf[InsertIntoTable].copy(table = targetTable, query = query)
diff --git
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
index cd8d0ca6a70..a01cce70c1f 100644
---
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
@@ -18,12 +18,14 @@
package org.apache.spark.sql
import org.apache.hudi.SparkAdapterSupport
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableOutputResolver
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet,
Expression, ProjectionOverSchema}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join,
JoinHint, LeafNode, LogicalPlan}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
-import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.execution.command.{CreateTableLikeCommand,
ExplainCommand}
import org.apache.spark.sql.execution.{ExtendedMode, SimpleMode}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
@@ -63,6 +65,15 @@ trait HoodieSpark3CatalystPlanUtils extends
HoodieCatalystPlansUtils {
}
}
+
+ override def unapplyCreateTableLikeCommand(plan: LogicalPlan):
Option[(TableIdentifier, TableIdentifier, CatalogStorageFormat, Option[String],
Map[String, String], Boolean)] = {
+ plan match {
+ case CreateTableLikeCommand(targetTable, sourceTable, fileFormat,
provider, properties, ifNotExists) =>
+ Some(targetTable, sourceTable, fileFormat, provider, properties,
ifNotExists)
+ case _ => None
+ }
+ }
+
def rebaseInsertIntoStatement(iis: LogicalPlan, targetTable: LogicalPlan,
query: LogicalPlan): LogicalPlan =
iis.asInstanceOf[InsertIntoStatement].copy(table = targetTable, query =
query)