This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 0c24adfda59 [SPARK-42398][SQL] Refine default column value DS v2
interface
0c24adfda59 is described below
commit 0c24adfda5945f78aa19539c62d21a7efc265719
Author: Wenchen Fan <[email protected]>
AuthorDate: Mon Feb 20 16:30:50 2023 +0800
[SPARK-42398][SQL] Refine default column value DS v2 interface
### What changes were proposed in this pull request?
The current default value DS V2 API is a bit inconsistent. The
`createTable` API only takes `StructType`, so implementations must know the
special metadata key of the default value to access it. The `TableChange` API
has the default value as an individual field.
This PR adds a new `Column` interface, which holds both the current default
(as a SQL string) and the exist default (as a v2 literal). The `createTable` API
now takes `Column[]`. This avoids the need for a special metadata key and is also
more extensible when adding more special columns such as generated columns. It is
also type-safe and makes sure the exist default is a literal. The implementation
is free to decide how to encode and store default values. Note: backward
compatibility is taken care of.
### Why are the changes needed?
To provide a better DS v2 API for column default values.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing tests
Closes #40049 from cloud-fan/table2.
Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 70a098c83da4cff2bdc8d15a5a8b513a32564dbc)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/connect/ProtoToParsedPlanTestSuite.scala | 8 +-
.../spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala | 3 +-
.../apache/spark/sql/connector/catalog/Column.java | 90 ++++++++++++++++++
.../sql/connector/catalog/ColumnDefaultValue.java | 84 +++++++++++++++++
.../sql/connector/catalog/StagingTableCatalog.java | 67 ++++++++++++--
.../apache/spark/sql/connector/catalog/Table.java | 11 +++
.../spark/sql/connector/catalog/TableCatalog.java | 23 ++++-
.../spark/sql/connector/catalog/TableChange.java | 20 ++--
.../spark/sql/catalyst/analysis/Analyzer.scala | 4 +-
.../sql/catalyst/analysis/v2ResolutionPlans.scala | 2 +-
.../sql/catalyst/plans/logical/statements.scala | 13 +++
.../plans/logical/v2AlterTableCommands.scala | 4 +-
.../catalyst/util/ResolveDefaultColumnsUtil.scala | 30 ++++--
.../sql/connector/catalog/CatalogV2Implicits.scala | 7 ++
.../sql/connector/catalog/CatalogV2Util.scala | 89 +++++++++++++++++-
.../connector/write/RowLevelOperationTable.scala | 3 +-
.../datasources/v2/DataSourceV2Relation.scala | 3 +-
.../spark/sql/internal/connector/ColumnImpl.scala | 30 ++++++
.../internal/connector/SimpleTableProvider.scala | 3 +-
.../spark/sql/connector/catalog/CatalogSuite.scala | 103 +++++++++++----------
.../sql/connector/catalog/CatalogV2UtilSuite.scala | 4 +-
.../connector/catalog/InMemoryTableCatalog.scala | 10 ++
.../SupportsAtomicPartitionManagementSuite.scala | 4 +-
.../catalog/SupportsPartitionManagementSuite.scala | 7 +-
.../spark/sql/execution/datasources/rules.scala | 1 -
.../execution/datasources/v2/CreateTableExec.scala | 7 +-
.../datasources/v2/DataSourceV2Strategy.scala | 8 +-
.../datasources/v2/FileDataSourceV2.scala | 3 +-
.../datasources/v2/ReplaceTableExec.scala | 13 ++-
.../datasources/v2/ShowCreateTableExec.scala | 3 +-
.../datasources/v2/V2SessionCatalog.scala | 11 ++-
.../datasources/v2/WriteToDataSourceV2Exec.scala | 22 +++--
.../sources/TextSocketSourceProvider.scala | 5 +-
.../spark/sql/streaming/DataStreamReader.scala | 3 +-
.../sql/connector/DataSourceV2DataFrameSuite.scala | 3 +-
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 20 ++--
.../sql/connector/DeleteFromTableSuiteBase.scala | 4 +-
.../sql/connector/TestV2SessionCatalogBase.scala | 11 ++-
.../WriteDistributionAndOrderingSuite.scala | 2 +-
.../execution/command/PlanResolutionSuite.scala | 56 ++++++-----
.../datasources/InMemoryTableMetricSuite.scala | 3 +-
.../datasources/v2/V2SessionCatalogSuite.scala | 96 +++++++++----------
.../org/apache/spark/sql/hive/InsertSuite.scala | 6 +-
43 files changed, 670 insertions(+), 229 deletions(-)
diff --git
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
index 18f656748ac..841017ae6c0 100644
---
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
+++
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
@@ -30,6 +30,7 @@ import
org.apache.spark.sql.catalyst.analysis.{caseSensitiveResolution, Analyzer
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier,
InMemoryCatalog}
+import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -67,16 +68,17 @@ class ProtoToParsedPlanTestSuite extends SparkFunSuite with
SharedSparkSession {
protected val inputFilePath: Path = baseResourcePath.resolve("queries")
protected val goldenFilePath: Path =
baseResourcePath.resolve("explain-results")
+ private val emptyProps: util.Map[String, String] =
util.Collections.emptyMap()
private val analyzer = {
val inMemoryCatalog = new InMemoryCatalog
inMemoryCatalog.initialize("primary", CaseInsensitiveStringMap.empty())
- inMemoryCatalog.createNamespace(Array("tempdb"),
util.Collections.emptyMap())
+ inMemoryCatalog.createNamespace(Array("tempdb"), emptyProps)
inMemoryCatalog.createTable(
Identifier.of(Array("tempdb"), "myTable"),
new StructType().add("id", "long"),
- Array.empty,
- util.Collections.emptyMap())
+ Array.empty[Transform],
+ emptyProps)
val catalogManager = new CatalogManager(
inMemoryCatalog,
diff --git
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
index b2500a2dbf2..d3f17187a37 100644
---
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
+++
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
@@ -27,6 +27,7 @@ import org.apache.logging.log4j.Level
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException
import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange}
+import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
import org.apache.spark.sql.jdbc.DockerIntegrationFunSuite
import org.apache.spark.sql.test.SharedSparkSession
@@ -118,7 +119,7 @@ private[v2] trait V2JDBCNamespaceTest extends
SharedSparkSession with DockerInte
// Drop non empty namespace without cascade
catalog.createNamespace(Array("foo"), commentMap.asJava)
assert(catalog.namespaceExists(Array("foo")) === true)
- catalog.createTable(ident1, schema, Array.empty, emptyProps)
+ catalog.createTable(ident1, schema, Array.empty[Transform], emptyProps)
if (supportsDropSchemaRestrict) {
intercept[NonEmptyNamespaceException] {
catalog.dropNamespace(Array("foo"), cascade = false)
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java
new file mode 100644
index 00000000000..d2c8f25e739
--- /dev/null
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+import javax.annotation.Nullable;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.internal.connector.ColumnImpl;
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * An interface representing a column of a {@link Table}. It defines basic
properties of a column,
+ * such as name and data type, as well as some advanced ones like default
column value.
+ * <p>
+ * Data Sources do not need to implement it. They should consume it in APIs
like
+ * {@link TableCatalog#createTable(Identifier, Column[], Transform[], Map)},
and report it in
+ * {@link Table#columns()} by calling the static {@code create} functions of
this interface to
+ * create it.
+ */
+@Evolving
+public interface Column {
+
+ static Column create(String name, DataType dataType) {
+ return create(name, dataType, true);
+ }
+
+ static Column create(String name, DataType dataType, boolean nullable) {
+ return create(name, dataType, nullable, null, null, null);
+ }
+
+ static Column create(
+ String name,
+ DataType dataType,
+ boolean nullable,
+ String comment,
+ ColumnDefaultValue defaultValue,
+ String metadataInJSON) {
+ return new ColumnImpl(name, dataType, nullable, comment, defaultValue,
metadataInJSON);
+ }
+
+ /**
+ * Returns the name of this table column.
+ */
+ String name();
+
+ /**
+ * Returns the data type of this table column.
+ */
+ DataType dataType();
+
+ /**
+ * Returns true if this column may produce null values.
+ */
+ boolean nullable();
+
+ /**
+ * Returns the comment of this table column. Null means no comment.
+ */
+ @Nullable
+ String comment();
+
+ /**
+ * Returns the default value of this table column. Null means no default
value.
+ */
+ @Nullable
+ ColumnDefaultValue defaultValue();
+
+ /**
+ * Returns the column metadata in JSON format.
+ */
+ @Nullable
+ String metadataInJSON();
+}
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ColumnDefaultValue.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ColumnDefaultValue.java
new file mode 100644
index 00000000000..b8e75c11c81
--- /dev/null
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ColumnDefaultValue.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Objects;
+import javax.annotation.Nonnull;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.Literal;
+
+/**
+ * A class representing the default value of a column. It contains both the
SQL string and literal
+ * value of the user-specified default value expression. The SQL string should
be re-evaluated for
+ * each table writing command, which may produce different values if the
default value expression is
+ * something like {@code CURRENT_DATE()}. The literal value is used to
back-fill existing data if
+ * new columns with default value are added. Note: the back-fill can be lazy.
The data sources can
+ * remember the column default value and let the reader fill the column value
when reading existing
+ * data that do not have these new columns.
+ */
+@Evolving
+public class ColumnDefaultValue {
+ private String sql;
+ private Literal<?> value;
+
+ public ColumnDefaultValue(String sql, Literal<?> value) {
+ this.sql = sql;
+ this.value = value;
+ }
+
+ /**
+ * Returns the SQL string (Spark SQL dialect) of the default value
expression. This is the
+ * original string contents of the SQL expression specified at the time the
column was created in
+ * a CREATE TABLE, REPLACE TABLE, or ADD COLUMN command. For example, for
+ * "CREATE TABLE t (col INT DEFAULT 40 + 2)", this returns the string
literal "40 + 2" (without
+ * quotation marks).
+ */
+ @Nonnull
+ public String getSql() {
+ return sql;
+ }
+
+ /**
+ * Returns the default value literal. This is the literal value
corresponding to
+ * {@link #getSql()}. For the example in the doc of {@link #getSql()}, this
returns a literal
+ * integer with a value of 42.
+ */
+ @Nonnull
+ public Literal<?> getValue() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof ColumnDefaultValue)) return false;
+ ColumnDefaultValue that = (ColumnDefaultValue) o;
+ return sql.equals(that.sql) && value.equals(that.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(sql, value);
+ }
+
+ @Override
+ public String toString() {
+ return "ColumnDefaultValue{sql='" + sql + "\', value=" + value + '}';
+ }
+}
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
index 35455a0ed99..4337a7c6152 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
@@ -54,6 +54,19 @@ import org.apache.spark.sql.types.StructType;
@Evolving
public interface StagingTableCatalog extends TableCatalog {
+ /**
+ * Stage the creation of a table, preparing it to be committed into the
metastore.
+ * <p>
+ * This is deprecated. Please override
+ * {@link #stageCreate(Identifier, Column[], Transform[], Map)} instead.
+ */
+ @Deprecated
+ StagedTable stageCreate(
+ Identifier ident,
+ StructType schema,
+ Transform[] partitions,
+ Map<String, String> properties) throws TableAlreadyExistsException,
NoSuchNamespaceException;
+
/**
* Stage the creation of a table, preparing it to be committed into the
metastore.
* <p>
@@ -64,7 +77,7 @@ public interface StagingTableCatalog extends TableCatalog {
* committed, an exception should be thrown by {@link
StagedTable#commitStagedChanges()}.
*
* @param ident a table identifier
- * @param schema the schema of the new table, as a struct type
+ * @param columns the columns of the new table
* @param partitions transforms to use for partitioning data in the table
* @param properties a string map of table properties
* @return metadata for the new table
@@ -72,11 +85,26 @@ public interface StagingTableCatalog extends TableCatalog {
* @throws UnsupportedOperationException If a requested partition transform
is not supported
* @throws NoSuchNamespaceException If the identifier namespace does not
exist (optional)
*/
- StagedTable stageCreate(
+ default StagedTable stageCreate(
+ Identifier ident,
+ Column[] columns,
+ Transform[] partitions,
+ Map<String, String> properties) throws TableAlreadyExistsException,
NoSuchNamespaceException {
+ return stageCreate(ident, CatalogV2Util.v2ColumnsToStructType(columns),
partitions, properties);
+ }
+
+ /**
+ * Stage the replacement of a table, preparing it to be committed into the
metastore when the
+ * returned table's {@link StagedTable#commitStagedChanges()} is called.
+ * <p>
+ * This is deprecated, please override
+ * {@link #stageReplace(Identifier, Column[], Transform[], Map)} instead.
+ */
+ StagedTable stageReplace(
Identifier ident,
StructType schema,
Transform[] partitions,
- Map<String, String> properties) throws TableAlreadyExistsException,
NoSuchNamespaceException;
+ Map<String, String> properties) throws NoSuchNamespaceException,
NoSuchTableException;
/**
* Stage the replacement of a table, preparing it to be committed into the
metastore when the
@@ -97,7 +125,7 @@ public interface StagingTableCatalog extends TableCatalog {
* operation.
*
* @param ident a table identifier
- * @param schema the schema of the new table, as a struct type
+ * @param columns the columns of the new table
* @param partitions transforms to use for partitioning data in the table
* @param properties a string map of table properties
* @return metadata for the new table
@@ -105,11 +133,27 @@ public interface StagingTableCatalog extends TableCatalog
{
* @throws NoSuchNamespaceException If the identifier namespace does not
exist (optional)
* @throws NoSuchTableException If the table does not exist
*/
- StagedTable stageReplace(
+ default StagedTable stageReplace(
+ Identifier ident,
+ Column[] columns,
+ Transform[] partitions,
+ Map<String, String> properties) throws NoSuchNamespaceException,
NoSuchTableException {
+ return stageReplace(
+ ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions,
properties);
+ }
+
+ /**
+ * Stage the creation or replacement of a table, preparing it to be
committed into the metastore
+ * when the returned table's {@link StagedTable#commitStagedChanges()} is
called.
+ * <p>
+ * This is deprecated, please override
+ * {@link #stageCreateOrReplace(Identifier, Column[], Transform[], Map)}
instead.
+ */
+ StagedTable stageCreateOrReplace(
Identifier ident,
StructType schema,
Transform[] partitions,
- Map<String, String> properties) throws NoSuchNamespaceException,
NoSuchTableException;
+ Map<String, String> properties) throws NoSuchNamespaceException;
/**
* Stage the creation or replacement of a table, preparing it to be
committed into the metastore
@@ -129,16 +173,19 @@ public interface StagingTableCatalog extends TableCatalog
{
* the staged changes are committed but the table doesn't exist at commit
time.
*
* @param ident a table identifier
- * @param schema the schema of the new table, as a struct type
+ * @param columns the columns of the new table
* @param partitions transforms to use for partitioning data in the table
* @param properties a string map of table properties
* @return metadata for the new table
* @throws UnsupportedOperationException If a requested partition transform
is not supported
* @throws NoSuchNamespaceException If the identifier namespace does not
exist (optional)
*/
- StagedTable stageCreateOrReplace(
+ default StagedTable stageCreateOrReplace(
Identifier ident,
- StructType schema,
+ Column[] columns,
Transform[] partitions,
- Map<String, String> properties) throws NoSuchNamespaceException;
+ Map<String, String> properties) throws NoSuchNamespaceException {
+ return stageCreateOrReplace(
+ ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions,
properties);
+ }
}
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java
index 8f7a8740483..b9753a08aba 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java
@@ -51,9 +51,20 @@ public interface Table {
/**
* Returns the schema of this table. If the table is not readable and
doesn't have a schema, an
* empty schema can be returned here.
+ * <p>
+ * This is deprecated. Please override {@link #columns} instead.
*/
+ @Deprecated
StructType schema();
+ /**
+ * Returns the columns of this table. If the table is not readable and
doesn't have a schema, an
+ * empty array can be returned here.
+ */
+ default Column[] columns() {
+ return CatalogV2Util.structTypeToV2Columns(schema());
+ }
+
/**
* Returns the physical partitioning of this table.
*/
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
index b04c7e55138..82622d65205 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
@@ -159,11 +159,24 @@ public interface TableCatalog extends CatalogPlugin {
}
}
+ /**
+ * Create a table in the catalog.
+ * <p>
+ * This is deprecated. Please override
+ * {@link #createTable(Identifier, Column[], Transform[], Map)} instead.
+ */
+ @Deprecated
+ Table createTable(
+ Identifier ident,
+ StructType schema,
+ Transform[] partitions,
+ Map<String, String> properties) throws TableAlreadyExistsException,
NoSuchNamespaceException;
+
/**
* Create a table in the catalog.
*
* @param ident a table identifier
- * @param schema the schema of the new table, as a struct type
+ * @param columns the columns of the new table.
* @param partitions transforms to use for partitioning data in the table
* @param properties a string map of table properties
* @return metadata for the new table
@@ -171,11 +184,13 @@ public interface TableCatalog extends CatalogPlugin {
* @throws UnsupportedOperationException If a requested partition transform
is not supported
* @throws NoSuchNamespaceException If the identifier namespace does not
exist (optional)
*/
- Table createTable(
+ default Table createTable(
Identifier ident,
- StructType schema,
+ Column[] columns,
Transform[] partitions,
- Map<String, String> properties) throws TableAlreadyExistsException,
NoSuchNamespaceException;
+ Map<String, String> properties) throws TableAlreadyExistsException,
NoSuchNamespaceException {
+ return createTable(ident, CatalogV2Util.v2ColumnsToStructType(columns),
partitions, properties);
+ }
/**
* Apply a set of {@link TableChange changes} to a table in the catalog.
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
index cf735ed9452..609cfab2d56 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
@@ -140,7 +140,7 @@ public interface TableChange {
boolean isNullable,
String comment,
ColumnPosition position,
- String defaultValue) {
+ ColumnDefaultValue defaultValue) {
return new AddColumn(fieldNames, dataType, isNullable, comment, position,
defaultValue);
}
@@ -228,7 +228,7 @@ public interface TableChange {
* If the field does not exist, the change will result in an {@link
IllegalArgumentException}.
*
* @param fieldNames field names of the column to update
- * @param newDefaultValue the new default value
+ * @param newDefaultValue the new default value SQL string (Spark SQL
dialect).
* @return a TableChange for the update
*/
static TableChange updateColumnDefaultValue(String[] fieldNames, String
newDefaultValue) {
@@ -383,7 +383,9 @@ public interface TableChange {
}
/**
- * A TableChange to add a field.
+ * A TableChange to add a field. The implementation may need to back-fill
all the existing data
+ * to add this new column, or remember the column default value specified
here and let the reader
+ * fill the column value when reading existing data that do not have this
new column.
* <p>
* If the field already exists, the change must result in an {@link
IllegalArgumentException}.
* If the new field is nested and its parent does not exist or is not a
struct, the change must
@@ -395,7 +397,7 @@ public interface TableChange {
private final boolean isNullable;
private final String comment;
private final ColumnPosition position;
- private final String defaultValue;
+ private final ColumnDefaultValue defaultValue;
private AddColumn(
String[] fieldNames,
@@ -403,7 +405,7 @@ public interface TableChange {
boolean isNullable,
String comment,
ColumnPosition position,
- String defaultValue) {
+ ColumnDefaultValue defaultValue) {
this.fieldNames = fieldNames;
this.dataType = dataType;
this.isNullable = isNullable;
@@ -436,7 +438,7 @@ public interface TableChange {
}
@Nullable
- public String defaultValue() { return defaultValue; }
+ public ColumnDefaultValue defaultValue() { return defaultValue; }
@Override
public boolean equals(Object o) {
@@ -691,6 +693,12 @@ public interface TableChange {
return fieldNames;
}
+ /**
+ * Returns the column default value SQL string (Spark SQL dialect). The
default value literal
+ * is not provided as updating column default values does not need to
back-fill existing data.
+ * Null means dropping the column default value.
+ */
+ @Nullable
public String newDefaultValue() { return newDefaultValue; }
@Override
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 46cc0b0fbf0..d7cc34d6f15 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1249,7 +1249,7 @@ class Analyzer(override val catalogManager:
CatalogManager) extends RuleExecutor
}
SubqueryAlias(
catalog.name +: ident.asMultipartIdentifier,
- StreamingRelationV2(None, table.name, table, options,
table.schema.toAttributes,
+ StreamingRelationV2(None, table.name, table, options,
table.columns.toAttributes,
Some(catalog), Some(ident), v1Fallback))
} else {
SubqueryAlias(
@@ -3722,7 +3722,7 @@ class Analyzer(override val catalogManager:
CatalogManager) extends RuleExecutor
colsToAdd(resolvedParentName) = fieldsAdded :+ col.colName
resolvedPosition
}
- val schema = r.table.schema
+ val schema = r.table.columns.asSchema
val resolvedCols = cols.map { col =>
col.path match {
case Some(parent: UnresolvedFieldName) =>
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
index e6be5c23955..2d26e281607 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
@@ -181,7 +181,7 @@ object ResolvedTable {
catalog: TableCatalog,
identifier: Identifier,
table: Table): ResolvedTable = {
- val schema =
CharVarcharUtils.replaceCharVarcharWithStringInSchema(table.schema)
+ val schema =
CharVarcharUtils.replaceCharVarcharWithStringInSchema(table.columns.asSchema)
ResolvedTable(catalog, identifier, table, schema.toAttributes)
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index af70f07bc87..9c639a4bce6 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -20,6 +20,9 @@ package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.trees.{LeafLike, UnaryLike}
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
+import org.apache.spark.sql.connector.catalog.ColumnDefaultValue
+import org.apache.spark.sql.connector.expressions.LiteralValue
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types.DataType
@@ -135,6 +138,16 @@ case class QualifiedColType(
def name: Seq[String] = path.map(_.name).getOrElse(Nil) :+ colName
def resolved: Boolean = path.forall(_.resolved) &&
position.forall(_.resolved)
+
+ def getV2Default: ColumnDefaultValue = {
+ default.map { sql =>
+ val e = ResolveDefaultColumns.analyze(colName, dataType, sql, "ALTER
TABLE")
+ assert(e.resolved && e.foldable,
+ "The existence default value must be a simple SQL string that is
resolved and foldable, " +
+ "but got: " + sql)
+ new ColumnDefaultValue(sql, LiteralValue(e.eval(), dataType))
+ }.orNull
+ }
}
/**
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
index 94f2a570663..eb9d45f06ec 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
@@ -120,7 +120,7 @@ case class AddColumns(
col.nullable,
col.comment.orNull,
col.position.map(_.position).orNull,
- col.default.orNull)
+ col.getV2Default)
}
}
@@ -156,7 +156,7 @@ case class ReplaceColumns(
col.nullable,
col.comment.orNull,
null,
- col.default.orNull)
+ col.getV2Default)
}
deleteChanges ++ addChanges
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index 667c0988d0c..be7d74b0782 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -151,16 +151,28 @@ object ResolveDefaultColumns {
field: StructField,
statementType: String,
metadataKey: String = CURRENT_DEFAULT_COLUMN_METADATA_KEY): Expression =
{
+ analyze(field.name, field.dataType, field.metadata.getString(metadataKey),
statementType)
+ }
+
+ /**
+ * Parses and analyzes the DEFAULT column SQL string, throwing an
AnalysisException upon failure.
+ *
+ * @return Result of the analysis and constant-folding operation.
+ */
+ def analyze(
+ colName: String,
+ dataType: DataType,
+ defaultSQL: String,
+ statementType: String): Expression = {
// Parse the expression.
- val colText: String = field.metadata.getString(metadataKey)
lazy val parser = new CatalystSqlParser()
val parsed: Expression = try {
- parser.parseExpression(colText)
+ parser.parseExpression(defaultSQL)
} catch {
case ex: ParseException =>
throw new AnalysisException(
s"Failed to execute $statementType command because the destination
table column " +
- s"${field.name} has a DEFAULT value of $colText which fails to
parse as a valid " +
+ s"$colName has a DEFAULT value of $defaultSQL which fails to parse
as a valid " +
s"expression: ${ex.getMessage}")
}
// Check invariants before moving on to analysis.
@@ -170,28 +182,28 @@ object ResolveDefaultColumns {
// Analyze the parse result.
val plan = try {
val analyzer: Analyzer = DefaultColumnAnalyzer
- val analyzed = analyzer.execute(Project(Seq(Alias(parsed,
field.name)()), OneRowRelation()))
+ val analyzed = analyzer.execute(Project(Seq(Alias(parsed, colName)()),
OneRowRelation()))
analyzer.checkAnalysis(analyzed)
ConstantFolding(analyzed)
} catch {
case ex: AnalysisException =>
throw new AnalysisException(
s"Failed to execute $statementType command because the destination
table column " +
- s"${field.name} has a DEFAULT value of $colText which fails to
resolve as a valid " +
+ s"$colName has a DEFAULT value of $defaultSQL which fails to
resolve as a valid " +
s"expression: ${ex.getMessage}")
}
val analyzed: Expression = plan.collectFirst {
case Project(Seq(a: Alias), OneRowRelation()) => a.child
}.get
// Perform implicit coercion from the provided expression type to the
required column type.
- if (field.dataType == analyzed.dataType) {
+ if (dataType == analyzed.dataType) {
analyzed
- } else if (Cast.canUpCast(analyzed.dataType, field.dataType)) {
- Cast(analyzed, field.dataType)
+ } else if (Cast.canUpCast(analyzed.dataType, dataType)) {
+ Cast(analyzed, dataType)
} else {
throw new AnalysisException(
s"Failed to execute $statementType command because the destination
table column " +
- s"${field.name} has a DEFAULT value with type ${field.dataType}, but
the " +
+ s"$colName has a DEFAULT value with type $dataType, but the " +
s"statement provided a value of incompatible type
${analyzed.dataType}")
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
index 0c9282f9675..12858887bb5 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
@@ -21,10 +21,12 @@ import scala.collection.mutable
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.quoteIfNeeded
import org.apache.spark.sql.connector.expressions.{BucketTransform,
FieldReference, IdentityTransform, LogicalExpressions, Transform}
import org.apache.spark.sql.errors.{QueryCompilationErrors,
QueryExecutionErrors}
+import org.apache.spark.sql.types.StructType
/**
* Conversion helpers for working with v2 [[CatalogPlugin]].
@@ -183,6 +185,11 @@ private[sql] object CatalogV2Implicits {
}
}
+ implicit class ColumnsHelper(columns: Array[Column]) {
+ def asSchema: StructType = CatalogV2Util.v2ColumnsToStructType(columns)
+ def toAttributes: Seq[AttributeReference] = asSchema.toAttributes
+ }
+
def parseColumnPath(name: String): Seq[String] = {
CatalystSqlParser.parseMultipartIdentifier(name)
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 72c557c8d77..9b481356fa6 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -23,12 +23,14 @@ import java.util.Collections
import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion,
NamedRelation, NoSuchDatabaseException, NoSuchFunctionException,
NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
+import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
+import org.apache.spark.sql.connector.expressions.LiteralValue
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, MapType, Metadata,
MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils
@@ -142,8 +144,7 @@ private[sql] object CatalogV2Util {
add.fieldNames match {
case Array(name) =>
val field = StructField(name, add.dataType, nullable =
add.isNullable)
- val fieldWithDefault: StructField =
-
Option(add.defaultValue).map(field.withCurrentDefaultValue).getOrElse(field)
+ val fieldWithDefault: StructField =
encodeDefaultValue(add.defaultValue(), field)
val fieldWithComment: StructField =
Option(add.comment).map(fieldWithDefault.withComment).getOrElse(fieldWithDefault)
addField(schema, fieldWithComment, add.position(),
tableProvider, statementType, true)
@@ -151,8 +152,7 @@ private[sql] object CatalogV2Util {
replace(schema, names.init, parent => parent.dataType match {
case parentType: StructType =>
val field = StructField(names.last, add.dataType, nullable =
add.isNullable)
- val fieldWithDefault: StructField =
-
Option(add.defaultValue).map(field.withCurrentDefaultValue).getOrElse(field)
+ val fieldWithDefault: StructField =
encodeDefaultValue(add.defaultValue(), field)
val fieldWithComment: StructField =
Option(add.comment).map(fieldWithDefault.withComment)
.getOrElse(fieldWithDefault)
@@ -431,4 +431,83 @@ private[sql] object CatalogV2Util {
.getOrElse(catalogManager.v2SessionCatalog)
.asTableCatalog
}
+
+ /**
+ * Converts DS v2 columns to StructType, which encodes column comment and
default value to
+ * StructField metadata. This is mainly used to define the schema of v2
scan, w.r.t. the columns
+ * of the v2 table.
+ */
+ def v2ColumnsToStructType(columns: Array[Column]): StructType = {
+ StructType(columns.map(v2ColumnToStructField))
+ }
+
+ private def v2ColumnToStructField(col: Column): StructField = {
+ val metadata =
Option(col.metadataInJSON()).map(Metadata.fromJson).getOrElse(Metadata.empty)
+ var f = StructField(col.name(), col.dataType(), col.nullable(), metadata)
+ Option(col.comment()).foreach { comment =>
+ f = f.withComment(comment)
+ }
+ Option(col.defaultValue()).foreach { default =>
+ f = encodeDefaultValue(default, f)
+ }
+ f
+ }
+
+ // For built-in file sources, we encode the default value in StructField
metadata. An analyzer
+ // rule will check the special metadata and change the DML input plan to
fill the default value.
+ private def encodeDefaultValue(defaultValue: ColumnDefaultValue, f:
StructField): StructField = {
+ Option(defaultValue).map { default =>
+ // The "exist default" is used to back-fill the existing data when new
columns are added, and
+ // should be a fixed value which was evaluated at the definition time.
For example, if the
+ // default value is `current_date()`, the "exist default" should be the
value of
+ // `current_date()` when the column was defined/altered, instead of when
back-fill happens.
+ // Note: the back-fill here is a logical concept. The data source can
keep the existing
+ // data unchanged and let the data reader return "exist
default" for missing
+ // columns.
+ val existingDefault = Literal(default.getValue.value(),
default.getValue.dataType()).sql
+
f.withExistenceDefaultValue(existingDefault).withCurrentDefaultValue(default.getSql)
+ }.getOrElse(f)
+ }
+
+ /**
+ * Converts a StructType to DS v2 columns, which decodes the StructField
metadata to v2 column
+ * comment and default value. This is mainly used to generate DS v2 columns
from table schema in
+ * DDL commands, so that Spark can pass DS v2 columns to DS v2 createTable
and related APIs.
+ */
+ def structTypeToV2Columns(schema: StructType): Array[Column] = {
+ schema.fields.map(structFieldToV2Column)
+ }
+
+ private def structFieldToV2Column(f: StructField): Column = {
+ def createV2Column(defaultValue: ColumnDefaultValue, metadata: Metadata):
Column = {
+ val metadataJSON = if (metadata == Metadata.empty) {
+ null
+ } else {
+ metadata.json
+ }
+ Column.create(
+ f.name, f.dataType, f.nullable, f.getComment().orNull, defaultValue,
metadataJSON)
+ }
+ if (f.getCurrentDefaultValue().isDefined &&
f.getExistenceDefaultValue().isDefined) {
+ val e = analyze(f, EXISTS_DEFAULT_COLUMN_METADATA_KEY)
+ assert(e.resolved && e.foldable,
+ "The existence default value must be a simple SQL string that is
resolved and foldable, " +
+ "but got: " + f.getExistenceDefaultValue().get)
+ val defaultValue = new ColumnDefaultValue(
+ f.getCurrentDefaultValue().get, LiteralValue(e.eval(), f.dataType))
+ val cleanedMetadata = new MetadataBuilder()
+ .withMetadata(f.metadata)
+ .remove("comment")
+ .remove(CURRENT_DEFAULT_COLUMN_METADATA_KEY)
+ .remove(EXISTS_DEFAULT_COLUMN_METADATA_KEY)
+ .build()
+ createV2Column(defaultValue, cleanedMetadata)
+ } else {
+ val cleanedMetadata = new MetadataBuilder()
+ .withMetadata(f.metadata)
+ .remove("comment")
+ .build()
+ createV2Column(null, cleanedMetadata)
+ }
+ }
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/RowLevelOperationTable.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/RowLevelOperationTable.scala
index d1f7ba000c6..07acacd9a35 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/RowLevelOperationTable.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/RowLevelOperationTable.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.connector.write
import java.util
-import org.apache.spark.sql.connector.catalog.{SupportsRead,
SupportsRowLevelOperations, SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.connector.catalog.{Column, SupportsRead,
SupportsRowLevelOperations, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -38,6 +38,7 @@ private[sql] case class RowLevelOperationTable(
override def name: String = table.name
override def schema: StructType = table.schema
+ override def columns: Array[Column] = table.columns()
override def capabilities: util.Set[TableCapability] = table.capabilities
override def toString: String = table.toString
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 51ef3dda817..c170b7ae672 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -189,9 +189,10 @@ object DataSourceV2Relation {
catalog: Option[CatalogPlugin],
identifier: Option[Identifier],
options: CaseInsensitiveStringMap): DataSourceV2Relation = {
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
// The v2 source may return schema containing char/varchar type. We
replace char/varchar
// with "annotated" string type here as the query engine doesn't support
char/varchar yet.
- val schema =
CharVarcharUtils.replaceCharVarcharWithStringInSchema(table.schema)
+ val schema =
CharVarcharUtils.replaceCharVarcharWithStringInSchema(table.columns.asSchema)
DataSourceV2Relation(table, schema.toAttributes, catalog, identifier,
options)
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala
new file mode 100644
index 00000000000..5ab3f83eeae
--- /dev/null
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal.connector
+
+import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue}
+import org.apache.spark.sql.types.DataType
+
+// The standard concrete implementation of data source V2 column.
+case class ColumnImpl(
+ name: String,
+ dataType: DataType,
+ nullable: Boolean,
+ comment: String,
+ defaultValue: ColumnDefaultValue,
+ metadataInJSON: String) extends Column
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala
index 7bfe1df1117..f8b237195fa 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala
@@ -37,7 +37,8 @@ trait SimpleTableProvider extends TableProvider {
}
override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
- getOrLoadTable(options).schema()
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+ getOrLoadTable(options).columns.asSchema
}
override def getTable(
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
index 032b04bb887..6be50f36c84 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
@@ -28,7 +28,7 @@ import
org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException,
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.connector.catalog.functions.{BoundFunction,
ScalarFunction, UnboundFunction}
-import org.apache.spark.sql.connector.expressions.LogicalExpressions
+import org.apache.spark.sql.connector.expressions.{LogicalExpressions,
Transform}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType,
LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -37,6 +37,7 @@ class CatalogSuite extends SparkFunSuite {
import CatalogV2Implicits._
private val emptyProps: util.Map[String, String] =
Collections.emptyMap[String, String]
+ private val emptyTrans: Array[Transform] = Array.empty
private val schema: StructType = new StructType()
.add("id", IntegerType)
.add("data", StringType)
@@ -74,13 +75,13 @@ class CatalogSuite extends SparkFunSuite {
intercept[NoSuchNamespaceException](catalog.listTables(Array("ns")))
- catalog.createTable(ident1, schema, Array.empty, emptyProps)
+ catalog.createTable(ident1, schema, emptyTrans, emptyProps)
assert(catalog.listTables(Array("ns")).toSet == Set(ident1))
intercept[NoSuchNamespaceException](catalog.listTables(Array("ns2")))
- catalog.createTable(ident3, schema, Array.empty, emptyProps)
- catalog.createTable(ident2, schema, Array.empty, emptyProps)
+ catalog.createTable(ident3, schema, emptyTrans, emptyProps)
+ catalog.createTable(ident2, schema, emptyTrans, emptyProps)
assert(catalog.listTables(Array("ns")).toSet == Set(ident1, ident2))
assert(catalog.listTables(Array("ns2")).toSet == Set(ident3))
@@ -100,7 +101,7 @@ class CatalogSuite extends SparkFunSuite {
assert(!catalog.tableExists(testIdent))
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
assert(parsed == Seq("test", "`", ".", "test_table"))
@@ -118,7 +119,7 @@ class CatalogSuite extends SparkFunSuite {
assert(!catalog.tableExists(testIdent))
- val table = catalog.createTable(testIdent, schema, Array.empty, properties)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, properties)
val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
assert(parsed == Seq("test", "`", ".", "test_table"))
@@ -133,10 +134,10 @@ class CatalogSuite extends SparkFunSuite {
assert(!catalog.tableExists(testIdent))
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
val exc = intercept[TableAlreadyExistsException] {
- catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
}
checkErrorTableAlreadyExists(exc, testIdentQuoted)
@@ -149,7 +150,7 @@ class CatalogSuite extends SparkFunSuite {
assert(!catalog.tableExists(testIdent))
- catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(catalog.tableExists(testIdent))
@@ -161,7 +162,7 @@ class CatalogSuite extends SparkFunSuite {
test("loadTable") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
val loaded = catalog.loadTable(testIdent)
assert(table.name == loaded.name)
@@ -182,7 +183,7 @@ class CatalogSuite extends SparkFunSuite {
test("invalidateTable") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
catalog.invalidateTable(testIdent)
val loaded = catalog.loadTable(testIdent)
@@ -203,7 +204,7 @@ class CatalogSuite extends SparkFunSuite {
test("alterTable: add property") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.properties.asScala == Map())
@@ -222,7 +223,7 @@ class CatalogSuite extends SparkFunSuite {
val properties = new util.HashMap[String, String]()
properties.put("prop-1", "1")
- val table = catalog.createTable(testIdent, schema, Array.empty, properties)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, properties)
assert(table.properties.asScala == Map("prop-1" -> "1"))
@@ -241,7 +242,7 @@ class CatalogSuite extends SparkFunSuite {
val properties = new util.HashMap[String, String]()
properties.put("prop-1", "1")
- val table = catalog.createTable(testIdent, schema, Array.empty, properties)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, properties)
assert(table.properties.asScala == Map("prop-1" -> "1"))
@@ -257,7 +258,7 @@ class CatalogSuite extends SparkFunSuite {
test("alterTable: remove missing property") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.properties.asScala == Map())
@@ -273,7 +274,7 @@ class CatalogSuite extends SparkFunSuite {
test("alterTable: add top-level column") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -285,7 +286,7 @@ class CatalogSuite extends SparkFunSuite {
test("alterTable: add required column") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -298,7 +299,7 @@ class CatalogSuite extends SparkFunSuite {
test("alterTable: add column with comment") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -315,7 +316,7 @@ class CatalogSuite extends SparkFunSuite {
val pointStruct = new StructType().add("x", DoubleType).add("y",
DoubleType)
val tableSchema = schema.add("point", pointStruct)
- val table = catalog.createTable(testIdent, tableSchema, Array.empty,
emptyProps)
+ val table = catalog.createTable(testIdent, tableSchema, emptyTrans,
emptyProps)
assert(table.schema == tableSchema)
@@ -330,7 +331,7 @@ class CatalogSuite extends SparkFunSuite {
test("alterTable: add column to primitive field fails") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -348,7 +349,7 @@ class CatalogSuite extends SparkFunSuite {
test("alterTable: add field to missing column fails") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -364,7 +365,7 @@ class CatalogSuite extends SparkFunSuite {
test("alterTable: update column data type") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -380,7 +381,7 @@ class CatalogSuite extends SparkFunSuite {
val originalSchema = new StructType()
.add("id", IntegerType, nullable = false)
.add("data", StringType)
- val table = catalog.createTable(testIdent, originalSchema, Array.empty,
emptyProps)
+ val table = catalog.createTable(testIdent, originalSchema, emptyTrans,
emptyProps)
assert(table.schema == originalSchema)
@@ -394,7 +395,7 @@ class CatalogSuite extends SparkFunSuite {
test("alterTable: update missing column fails") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -410,7 +411,7 @@ class CatalogSuite extends SparkFunSuite {
test("alterTable: add comment") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -426,7 +427,7 @@ class CatalogSuite extends SparkFunSuite {
test("alterTable: replace comment") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -445,7 +446,7 @@ class CatalogSuite extends SparkFunSuite {
test("alterTable: add comment to missing column fails") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -461,7 +462,7 @@ class CatalogSuite extends SparkFunSuite {
test("alterTable: rename top-level column") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -478,7 +479,7 @@ class CatalogSuite extends SparkFunSuite {
val pointStruct = new StructType().add("x", DoubleType).add("y",
DoubleType)
val tableSchema = schema.add("point", pointStruct)
- val table = catalog.createTable(testIdent, tableSchema, Array.empty,
emptyProps)
+ val table = catalog.createTable(testIdent, tableSchema, emptyTrans,
emptyProps)
assert(table.schema == tableSchema)
@@ -497,7 +498,7 @@ class CatalogSuite extends SparkFunSuite {
val pointStruct = new StructType().add("x", DoubleType).add("y",
DoubleType)
val tableSchema = schema.add("point", pointStruct)
- val table = catalog.createTable(testIdent, tableSchema, Array.empty,
emptyProps)
+ val table = catalog.createTable(testIdent, tableSchema, emptyTrans,
emptyProps)
assert(table.schema == tableSchema)
@@ -513,7 +514,7 @@ class CatalogSuite extends SparkFunSuite {
test("alterTable: rename missing column fails") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -532,7 +533,7 @@ class CatalogSuite extends SparkFunSuite {
val pointStruct = new StructType().add("x", DoubleType).add("y",
DoubleType)
val tableSchema = schema.add("point", pointStruct)
- val table = catalog.createTable(testIdent, tableSchema, Array.empty,
emptyProps)
+ val table = catalog.createTable(testIdent, tableSchema, emptyTrans,
emptyProps)
assert(table.schema == tableSchema)
@@ -549,7 +550,7 @@ class CatalogSuite extends SparkFunSuite {
test("alterTable: delete top-level column") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -566,7 +567,7 @@ class CatalogSuite extends SparkFunSuite {
val pointStruct = new StructType().add("x", DoubleType).add("y",
DoubleType)
val tableSchema = schema.add("point", pointStruct)
- val table = catalog.createTable(testIdent, tableSchema, Array.empty,
emptyProps)
+ val table = catalog.createTable(testIdent, tableSchema, emptyTrans,
emptyProps)
assert(table.schema == tableSchema)
@@ -582,7 +583,7 @@ class CatalogSuite extends SparkFunSuite {
test("alterTable: delete missing column fails") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -604,7 +605,7 @@ class CatalogSuite extends SparkFunSuite {
val pointStruct = new StructType().add("x", DoubleType).add("y",
DoubleType)
val tableSchema = schema.add("point", pointStruct)
- val table = catalog.createTable(testIdent, tableSchema, Array.empty,
emptyProps)
+ val table = catalog.createTable(testIdent, tableSchema, emptyTrans,
emptyProps)
assert(table.schema == tableSchema)
@@ -635,7 +636,7 @@ class CatalogSuite extends SparkFunSuite {
assert(!catalog.tableExists(testIdent))
- catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(catalog.tableExists(testIdent))
@@ -667,7 +668,7 @@ class CatalogSuite extends SparkFunSuite {
assert(!catalog.tableExists(testIdent))
assert(!catalog.tableExists(testIdentNew))
- catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(catalog.tableExists(testIdent))
catalog.renameTable(testIdent, testIdentNew)
@@ -692,8 +693,8 @@ class CatalogSuite extends SparkFunSuite {
assert(!catalog.tableExists(testIdent))
assert(!catalog.tableExists(testIdentNew))
- catalog.createTable(testIdent, schema, Array.empty, emptyProps)
- catalog.createTable(testIdentNew, schema, Array.empty, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ catalog.createTable(testIdentNew, schema, emptyTrans, emptyProps)
assert(catalog.tableExists(testIdent))
assert(catalog.tableExists(testIdentNew))
@@ -719,8 +720,8 @@ class CatalogSuite extends SparkFunSuite {
val ident1 = Identifier.of(Array("ns1", "ns2"), "test_table_1")
val ident2 = Identifier.of(Array("ns1", "ns2"), "test_table_2")
- catalog.createTable(ident1, schema, Array.empty, emptyProps)
- catalog.createTable(ident2, schema, Array.empty, emptyProps)
+ catalog.createTable(ident1, schema, emptyTrans, emptyProps)
+ catalog.createTable(ident2, schema, emptyTrans, emptyProps)
assert(catalog.listNamespaces === Array(Array("ns1")))
assert(catalog.listNamespaces(Array()) === Array(Array("ns1")))
@@ -734,8 +735,8 @@ class CatalogSuite extends SparkFunSuite {
val ident2 = Identifier.of(Array("ns1", "ns2"), "test_table_2")
catalog.createNamespace(Array("ns1"), Map("property" -> "value").asJava)
- catalog.createTable(ident1, schema, Array.empty, emptyProps)
- catalog.createTable(ident2, schema, Array.empty, emptyProps)
+ catalog.createTable(ident1, schema, emptyTrans, emptyProps)
+ catalog.createTable(ident2, schema, emptyTrans, emptyProps)
assert(catalog.listNamespaces === Array(Array("ns1")))
assert(catalog.listNamespaces(Array()) === Array(Array("ns1")))
@@ -756,7 +757,7 @@ class CatalogSuite extends SparkFunSuite {
test("loadNamespaceMetadata: no metadata, table exists") {
val catalog = newCatalog()
- catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
val metadata = catalog.loadNamespaceMetadata(testNs)
@@ -777,7 +778,7 @@ class CatalogSuite extends SparkFunSuite {
val catalog = newCatalog()
catalog.createNamespace(testNs, Map("property" -> "value").asJava)
- catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
val metadata = catalog.loadNamespaceMetadata(testNs)
@@ -810,7 +811,7 @@ class CatalogSuite extends SparkFunSuite {
test("createNamespace: fail if namespace already exists from table") {
val catalog = newCatalog()
- catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(catalog.namespaceExists(testNs) === true)
assert(catalog.loadNamespaceMetadata(testNs).asScala === Map.empty)
@@ -852,7 +853,7 @@ class CatalogSuite extends SparkFunSuite {
val catalog = newCatalog()
catalog.createNamespace(testNs, Map("property" -> "value").asJava)
- catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(catalog.dropNamespace(testNs, cascade = true))
@@ -882,7 +883,7 @@ class CatalogSuite extends SparkFunSuite {
test("alterNamespace: create metadata if missing and table exists") {
val catalog = newCatalog()
- catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
catalog.alterNamespace(testNs, NamespaceChange.setProperty("property",
"value"))
@@ -902,7 +903,7 @@ class CatalogSuite extends SparkFunSuite {
test("truncate non-partitioned table") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
.asInstanceOf[InMemoryTable]
table.withData(Array(
new BufferedRows("3").withRow(InternalRow(0, "abc", "3")),
@@ -920,7 +921,7 @@ class CatalogSuite extends SparkFunSuite {
new StructType()
.add("col0", IntegerType)
.add("part0", IntegerType),
-
Array(LogicalExpressions.identity(LogicalExpressions.parseReference("part0"))),
+
Array[Transform](LogicalExpressions.identity(LogicalExpressions.parseReference("part0"))),
util.Collections.emptyMap[String, String])
val partTable = table.asInstanceOf[InMemoryPartitionTable]
val partIdent = InternalRow.apply(0)
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogV2UtilSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogV2UtilSuite.scala
index da5cfab8be3..eda401ceb6b 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogV2UtilSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogV2UtilSuite.scala
@@ -21,14 +21,14 @@ import org.mockito.Mockito.{mock, when}
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.IntegerType
class CatalogV2UtilSuite extends SparkFunSuite {
test("Load relation should encode the identifiers for V2Relations") {
val testCatalog = mock(classOf[TableCatalog])
val ident = mock(classOf[Identifier])
val table = mock(classOf[Table])
- when(table.schema()).thenReturn(new StructType().add("i", "int"))
+ when(table.columns()).thenReturn(Array(Column.create("i", IntegerType)))
when(testCatalog.loadTable(ident)).thenReturn(table)
val r = CatalogV2Util.loadRelation(testCatalog, ident)
assert(r.isDefined)
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
index 06ee588329c..50bea2b8d2f 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
@@ -84,6 +84,7 @@ class BasicInMemoryTableCatalog extends TableCatalog {
invalidatedTables.add(ident)
}
+ // TODO: remove it when no tests calling this deprecated method.
override def createTable(
ident: Identifier,
schema: StructType,
@@ -93,6 +94,15 @@ class BasicInMemoryTableCatalog extends TableCatalog {
Array.empty, None)
}
+ override def createTable(
+ ident: Identifier,
+ columns: Array[Column],
+ partitions: Array[Transform],
+ properties: util.Map[String, String]): Table = {
+ val schema = CatalogV2Util.v2ColumnsToStructType(columns)
+ createTable(ident, schema, partitions, properties)
+ }
+
def createTable(
ident: Identifier,
schema: StructType,
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala
index 0590ca721cc..90ed106d8ed 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala
@@ -22,7 +22,7 @@ import java.util
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException,
PartitionsAlreadyExistException}
-import org.apache.spark.sql.connector.expressions.{LogicalExpressions,
NamedReference}
+import org.apache.spark.sql.connector.expressions.{LogicalExpressions,
NamedReference, Transform}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -41,7 +41,7 @@ class SupportsAtomicPartitionManagementSuite extends
SparkFunSuite {
.add("id", IntegerType)
.add("data", StringType)
.add("dt", StringType),
- Array(LogicalExpressions.identity(ref("dt"))),
+ Array[Transform](LogicalExpressions.identity(ref("dt"))),
util.Collections.emptyMap[String, String])
newCatalog
}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala
index ddd08185527..40114d063aa 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException,
PartitionsAlreadyExistException}
-import org.apache.spark.sql.connector.expressions.{LogicalExpressions,
NamedReference}
+import org.apache.spark.sql.connector.expressions.{LogicalExpressions,
NamedReference, Transform}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -43,7 +43,7 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
.add("id", IntegerType)
.add("data", StringType)
.add("dt", StringType),
- Array(LogicalExpressions.identity(ref("dt"))),
+ Array[Transform](LogicalExpressions.identity(ref("dt"))),
util.Collections.emptyMap[String, String])
newCatalog
}
@@ -164,7 +164,8 @@ class SupportsPartitionManagementSuite extends
SparkFunSuite {
.add("col0", IntegerType)
.add("part0", IntegerType)
.add("part1", StringType),
- Array(LogicalExpressions.identity(ref("part0")),
LogicalExpressions.identity(ref("part1"))),
+ Array[Transform](
+ LogicalExpressions.identity(ref("part0")),
LogicalExpressions.identity(ref("part1"))),
util.Collections.emptyMap[String, String])
val partTable = table.asInstanceOf[InMemoryPartitionTable]
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index dc4fed49c1c..6fd47a534e9 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -244,7 +244,6 @@ case class PreprocessTableCreation(sparkSession:
SparkSession) extends Rule[Logi
case create: V2CreateTablePlan if create.childrenResolved =>
val schema = create.tableSchema
val partitioning = create.partitioning
- val identifier = create.tableName
val isCaseSensitive = conf.caseSensitiveAnalysis
// Check that columns are not duplicated in the schema
val flattenedSchema = SchemaUtils.explodeNestedFieldNames(schema)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala
index abc6bc60d96..55057844328 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala
@@ -23,15 +23,14 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.TableSpec
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier,
TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column,
Identifier, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.types.StructType
case class CreateTableExec(
catalog: TableCatalog,
identifier: Identifier,
- tableSchema: StructType,
+ columns: Array[Column],
partitioning: Seq[Transform],
tableSpec: TableSpec,
ignoreIfExists: Boolean) extends LeafV2CommandExec {
@@ -42,7 +41,7 @@ case class CreateTableExec(
override protected def run(): Seq[InternalRow] = {
if (!catalog.tableExists(identifier)) {
try {
- catalog.createTable(identifier, tableSchema, partitioning.toArray,
tableProperties.asJava)
+ catalog.createTable(identifier, columns, partitioning.toArray,
tableProperties.asJava)
} catch {
case _: TableAlreadyExistsException if ignoreIfExists =>
logWarning(s"Table ${identifier.quoted} was created concurrently.
Ignoring.")
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 757b66e1534..b45de06371c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -33,6 +33,7 @@ import
org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{toPrettySQL, ResolveDefaultColumns,
V2ExpressionBuilder}
import org.apache.spark.sql.connector.catalog.{Identifier,
StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces,
SupportsPartitionManagement, SupportsWrite, Table, TableCapability,
TableCatalog, TruncatableTable}
+import
org.apache.spark.sql.connector.catalog.CatalogV2Util.structTypeToV2Columns
import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.{FieldReference,
LiteralValue}
import org.apache.spark.sql.connector.expressions.filter.{And => V2And, Not =>
V2Not, Or => V2Or, Predicate}
@@ -177,7 +178,7 @@ class DataSourceV2Strategy(session: SparkSession) extends
Strategy with Predicat
val newSchema: StructType =
ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
schema, tableSpec.provider, "CREATE TABLE", false)
- CreateTableExec(catalog.asTableCatalog, ident, newSchema,
+ CreateTableExec(catalog.asTableCatalog, ident,
structTypeToV2Columns(newSchema),
partitioning, qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil
case CreateTableAsSelect(ResolvedIdentifier(catalog, ident), parts, query,
tableSpec,
@@ -200,12 +201,13 @@ class DataSourceV2Strategy(session: SparkSession) extends
Strategy with Predicat
val newSchema: StructType =
ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
schema, tableSpec.provider, "CREATE TABLE", false)
+ val v2Columns = structTypeToV2Columns(newSchema)
catalog match {
case staging: StagingTableCatalog =>
- AtomicReplaceTableExec(staging, ident, newSchema, parts,
+ AtomicReplaceTableExec(staging, ident, v2Columns, parts,
qualifyLocInTableSpec(tableSpec), orCreate = orCreate,
invalidateCache) :: Nil
case _ =>
- ReplaceTableExec(catalog.asTableCatalog, ident, newSchema, parts,
+ ReplaceTableExec(catalog.asTableCatalog, ident, v2Columns, parts,
qualifyLocInTableSpec(tableSpec), orCreate = orCreate,
invalidateCache) :: Nil
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
index 0bd25064e35..3cb1a74417d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -90,8 +90,9 @@ trait FileDataSourceV2 extends TableProvider with
DataSourceRegister {
private var t: Table = null
override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
if (t == null) t = getTable(options)
- t.schema()
+ t.columns.asSchema
}
// TODO: implement a light-weight partition inference which only looks at
the path of one leaf
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
index ea221980fed..55d97577d57 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
@@ -23,16 +23,15 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.TableSpec
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier,
StagedTable, StagingTableCatalog, Table, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column,
Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
case class ReplaceTableExec(
catalog: TableCatalog,
ident: Identifier,
- tableSchema: StructType,
+ columns: Array[Column],
partitioning: Seq[Transform],
tableSpec: TableSpec,
orCreate: Boolean,
@@ -48,7 +47,7 @@ case class ReplaceTableExec(
} else if (!orCreate) {
throw QueryCompilationErrors.cannotReplaceMissingTableError(ident)
}
- catalog.createTable(ident, tableSchema, partitioning.toArray,
tableProperties.asJava)
+ catalog.createTable(ident, columns, partitioning.toArray,
tableProperties.asJava)
Seq.empty
}
@@ -58,7 +57,7 @@ case class ReplaceTableExec(
case class AtomicReplaceTableExec(
catalog: StagingTableCatalog,
identifier: Identifier,
- tableSchema: StructType,
+ columns: Array[Column],
partitioning: Seq[Transform],
tableSpec: TableSpec,
orCreate: Boolean,
@@ -73,11 +72,11 @@ case class AtomicReplaceTableExec(
}
val staged = if (orCreate) {
catalog.stageCreateOrReplace(
- identifier, tableSchema, partitioning.toArray, tableProperties.asJava)
+ identifier, columns, partitioning.toArray, tableProperties.asJava)
} else if (catalog.tableExists(identifier)) {
try {
catalog.stageReplace(
- identifier, tableSchema, partitioning.toArray,
tableProperties.asJava)
+ identifier, columns, partitioning.toArray, tableProperties.asJava)
} catch {
case e: NoSuchTableException =>
throw
QueryCompilationErrors.cannotReplaceMissingTableError(identifier, Some(e))
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala
index ec40ad70b79..5712159ddc8 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala
@@ -59,7 +59,8 @@ case class ShowCreateTableExec(
}
private def showTableDataColumns(table: Table, builder: StringBuilder): Unit
= {
- val columns = CharVarcharUtils.getRawSchema(table.schema(),
conf).fields.map(_.toDDL)
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+ val columns = CharVarcharUtils.getRawSchema(table.columns.asSchema,
conf).fields.map(_.toDDL)
builder ++= concatByMultiLines(columns)
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index b9afe71d243..461e948b029 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper,
TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException,
NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable,
CatalogTableType, CatalogUtils, SessionCatalog}
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util,
FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table,
TableCatalog, TableChange, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util,
Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces,
Table, TableCatalog, TableChange, V1Table}
import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.expressions.Transform
@@ -92,6 +92,15 @@ class V2SessionCatalog(catalog: SessionCatalog)
catalog.refreshTable(ident.asTableIdentifier)
}
+ override def createTable(
+ ident: Identifier,
+ columns: Array[Column],
+ partitions: Array[Transform],
+ properties: util.Map[String, String]): Table = {
+ createTable(ident, CatalogV2Util.v2ColumnsToStructType(columns),
partitions, properties)
+ }
+
+ // TODO: remove it when no tests calling this deprecated method.
override def createTable(
ident: Identifier,
schema: StructType,
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 490b7082223..c53c603ffaa 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -86,8 +86,9 @@ case class CreateTableAsSelectExec(
throw QueryCompilationErrors.tableAlreadyExistsError(ident)
}
- val schema = CharVarcharUtils.getRawSchema(query.schema, conf).asNullable
- val table = catalog.createTable(ident, schema,
+ val columns = CatalogV2Util.structTypeToV2Columns(
+ CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
+ val table = catalog.createTable(ident, columns,
partitioning.toArray, properties.asJava)
writeToTable(catalog, table, writeOptions, ident)
}
@@ -125,9 +126,10 @@ case class AtomicCreateTableAsSelectExec(
throw QueryCompilationErrors.tableAlreadyExistsError(ident)
}
- val schema = CharVarcharUtils.getRawSchema(query.schema, conf).asNullable
+ val columns = CatalogV2Util.structTypeToV2Columns(
+ CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
val stagedTable = catalog.stageCreate(
- ident, schema, partitioning.toArray, properties.asJava)
+ ident, columns, partitioning.toArray, properties.asJava)
writeToTable(catalog, stagedTable, writeOptions, ident)
}
@@ -174,9 +176,10 @@ case class ReplaceTableAsSelectExec(
} else if (!orCreate) {
throw QueryCompilationErrors.cannotReplaceMissingTableError(ident)
}
- val schema = CharVarcharUtils.getRawSchema(query.schema, conf).asNullable
+ val columns = CatalogV2Util.structTypeToV2Columns(
+ CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
val table = catalog.createTable(
- ident, schema, partitioning.toArray, properties.asJava)
+ ident, columns, partitioning.toArray, properties.asJava)
writeToTable(catalog, table, writeOptions, ident)
}
@@ -210,18 +213,19 @@ case class AtomicReplaceTableAsSelectExec(
val properties = CatalogV2Util.convertTableProperties(tableSpec)
override protected def run(): Seq[InternalRow] = {
- val schema = CharVarcharUtils.getRawSchema(query.schema, conf).asNullable
+ val columns = CatalogV2Util.structTypeToV2Columns(
+ CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
if (catalog.tableExists(ident)) {
val table = catalog.loadTable(ident)
invalidateCache(catalog, table, ident)
}
val staged = if (orCreate) {
catalog.stageCreateOrReplace(
- ident, schema, partitioning.toArray, properties.asJava)
+ ident, columns, partitioning.toArray, properties.asJava)
} else if (catalog.tableExists(ident)) {
try {
catalog.stageReplace(
- ident, schema, partitioning.toArray, properties.asJava)
+ ident, columns, partitioning.toArray, properties.asJava)
} catch {
case e: NoSuchTableException =>
throw QueryCompilationErrors.cannotReplaceMissingTableError(ident,
Some(e))
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
index d9a3a074ce6..1ab88cd41d8 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
@@ -86,7 +86,10 @@ class TextSocketTable(host: String, port: Int,
numPartitions: Int, includeTimest
}
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder
= () => new Scan {
- override def readSchema(): StructType = schema()
+ override def readSchema(): StructType = {
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+ columns.asSchema
+ }
override def toMicroBatchStream(checkpointLocation: String):
MicroBatchStream = {
new TextSocketMicroBatchStream(host, port, numPartitions)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index d4621468f84..13f7695947e 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -180,11 +180,12 @@ final class DataStreamReader private[sql](sparkSession:
SparkSession) extends Lo
import
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
table match {
case _: SupportsRead if table.supportsAny(MICRO_BATCH_READ,
CONTINUOUS_READ) =>
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
Dataset.ofRows(
sparkSession,
StreamingRelationV2(
Some(provider), source, table, dsOptions,
- table.schema.toAttributes, None, None, v1Relation))
+ table.columns.asSchema.toAttributes, None, None, v1Relation))
// fallback to v1
// TODO (SPARK-27483): we should move this fallback logic to an
analyzer rule.
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index 158e1634d58..3678f29ab49 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame,
Row, SaveMode}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.plans.logical.{AppendData,
CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
import org.apache.spark.sql.connector.catalog.{Identifier,
InMemoryTableCatalog}
+import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf
@@ -178,7 +179,7 @@ class DataSourceV2DataFrameSuite
testCatalog.createTable(
Identifier.of(Array(), "table_name"),
new StructType().add("i", "interval"),
- Array.empty, Collections.emptyMap[String, String])
+ Array.empty[Transform], Collections.emptyMap[String, String])
val df = sql(s"select interval 1 millisecond as i")
val v2Writer = df.writeTo("testcat.table_name")
val e1 = intercept[AnalysisException](v2Writer.append())
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 38bd24356f1..83fe2ba51b5 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -31,10 +31,11 @@ import
org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableExceptio
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
import org.apache.spark.sql.catalyst.statsEstimation.StatsEstimationTestBase
-import org.apache.spark.sql.catalyst.util.{DateTimeUtils,
ResolveDefaultColumns}
-import org.apache.spark.sql.connector.catalog._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.connector.catalog.{Column => ColumnV2, _}
import
org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import
org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
+import org.apache.spark.sql.connector.expressions.LiteralValue
import org.apache.spark.sql.errors.QueryErrorsBase
import org.apache.spark.sql.execution.FilterExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -45,7 +46,7 @@ import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE,
PartitionOverwriteMode, V2_SESSION_CATALOG_IMPLEMENTATION}
import org.apache.spark.sql.internal.connector.SimpleTableProvider
import org.apache.spark.sql.sources.SimpleScanSource
-import org.apache.spark.sql.types.{LongType, MetadataBuilder, StringType,
StructField, StructType}
+import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.unsafe.types.UTF8String
@@ -585,7 +586,7 @@ class DataSourceV2SQLSuiteV1Filter
assert(maybeReplacedTable === table, "Table should not have changed.")
}
- test("ReplaceTable: Erases the table contents and changes the metadata.") {
+ test("ReplaceTable: Erases the table contents and changes the metadata") {
spark.sql(s"CREATE TABLE testcat.table_name USING $v2Source AS SELECT id,
data FROM source")
val testCatalog = catalog("testcat").asTableCatalog
@@ -598,14 +599,11 @@ class DataSourceV2SQLSuiteV1Filter
assert(replaced.asInstanceOf[InMemoryTable].rows.isEmpty,
"Replaced table should have no rows after committing.")
- assert(replaced.schema().fields.length === 1,
+ assert(replaced.columns.length === 1,
"Replaced table should have new schema.")
- val actual = replaced.schema().fields(0)
- val expected = StructField("id", LongType, nullable = false,
- new MetadataBuilder().putString(
- ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "41 + 1")
- .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY,
"CAST(42 AS BIGINT)")
- .build())
+ val actual = replaced.columns.head
+ val expected = ColumnV2.create("id", LongType, false, null,
+ new ColumnDefaultValue("41 + 1", LiteralValue(42L, LongType)), null)
assert(actual === expected,
"Replaced table should have new schema with DEFAULT column metadata.")
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
index 14b951e66db..781e0f96eaf 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
@@ -24,6 +24,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.sql.{AnalysisException, DataFrame, Encoders,
QueryTest, Row}
import org.apache.spark.sql.connector.catalog.{Identifier,
InMemoryRowLevelOperationTable, InMemoryRowLevelOperationTableCatalog}
import org.apache.spark.sql.connector.expressions.LogicalExpressions._
+import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.v2.{DeleteFromTableExec,
ReplaceDataExec, WriteDeltaExec}
@@ -564,7 +565,8 @@ abstract class DeleteFromTableSuiteBase
protected def createTable(schemaString: String): Unit = {
val schema = StructType.fromDDL(schemaString)
- catalog.createTable(ident, schema, Array(identity(reference(Seq("dep")))),
extraTableProps)
+ catalog.createTable(
+ ident, schema, Array[Transform](identity(reference(Seq("dep")))),
extraTableProps)
}
protected def createAndInitTable(schemaString: String, jsonData: String):
Unit = {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
index 0a0aaa80219..46586c622db 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
-import org.apache.spark.sql.connector.catalog.{DelegatingCatalogExtension,
Identifier, Table, TableCatalog, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column,
DelegatingCatalogExtension, Identifier, Table, TableCatalog, V1Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
@@ -64,6 +64,15 @@ private[connector] trait TestV2SessionCatalogBase[T <:
Table] extends Delegating
}
}
+ override def createTable(
+ ident: Identifier,
+ columns: Array[Column],
+ partitions: Array[Transform],
+ properties: java.util.Map[String, String]): Table = {
+ createTable(ident, CatalogV2Util.v2ColumnsToStructType(columns),
partitions, properties)
+ }
+
+ // TODO: remove it when no tests calling this deprecated method.
override def createTable(
ident: Identifier,
schema: StructType,
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
index b262e405d4e..f7905daa20a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
@@ -974,7 +974,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
}
test("continuous mode allows unspecified distribution and empty ordering") {
- catalog.createTable(ident, schema, Array.empty, emptyProps)
+ catalog.createTable(ident, schema, Array.empty[Transform], emptyProps)
withTempDir { checkpointDir =>
val inputData = ContinuousMemoryStream[(Long, String)]
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 44b4166c07a..2cf4792b8c1 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -34,17 +34,17 @@ import
org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn,
AnalysisOnlyCommand, AppendData, Assignment, CreateTable, CreateTableAsSelect,
DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction,
InsertIntoStatement, LocalRelation, LogicalPlan, MergeIntoTable,
OneRowRelation, OverwriteByExpression, OverwritePartitionsDynamic, Project,
SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias,
UnsetTableProperties, UpdateAction, UpdateTable}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
import org.apache.spark.sql.connector.FakeV2Provider
-import org.apache.spark.sql.connector.catalog.{CatalogManager,
CatalogNotFoundException, Identifier, SupportsDelete, Table, TableCapability,
TableCatalog, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager,
CatalogNotFoundException, Column, ColumnDefaultValue, Identifier,
SupportsDelete, Table, TableCapability, TableCatalog, V1Table}
import
org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
-import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.expressions.{LiteralValue, Transform}
import org.apache.spark.sql.execution.datasources.{CreateTable =>
CreateTableV1}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE,
PartitionOverwriteMode}
import org.apache.spark.sql.sources.SimpleScanSource
-import org.apache.spark.sql.types.{BooleanType, CharType, DoubleType,
IntegerType, LongType, MetadataBuilder, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{BooleanType, CharType, DoubleType,
IntegerType, LongType, StringType, StructField, StructType, VarcharType}
+import org.apache.spark.unsafe.types.UTF8String
class PlanResolutionSuite extends AnalysisTest {
import CatalystSqlParser._
@@ -54,21 +54,24 @@ class PlanResolutionSuite extends AnalysisTest {
private val table: Table = {
val t = mock(classOf[SupportsDelete])
- when(t.schema()).thenReturn(new StructType().add("i", "int").add("s",
"string"))
+ when(t.columns()).thenReturn(
+ Array(Column.create("i", IntegerType), Column.create("s", StringType)))
when(t.partitioning()).thenReturn(Array.empty[Transform])
t
}
private val table1: Table = {
val t = mock(classOf[Table])
- when(t.schema()).thenReturn(new StructType().add("s", "string").add("i",
"int"))
+ when(t.columns()).thenReturn(
+ Array(Column.create("s", StringType), Column.create("i", IntegerType)))
when(t.partitioning()).thenReturn(Array.empty[Transform])
t
}
private val table2: Table = {
val t = mock(classOf[Table])
- when(t.schema()).thenReturn(new StructType().add("i", "int").add("x",
"string"))
+ when(t.columns()).thenReturn(
+ Array(Column.create("i", IntegerType), Column.create("x", StringType)))
when(t.partitioning()).thenReturn(Array.empty[Transform])
t
}
@@ -76,53 +79,46 @@ class PlanResolutionSuite extends AnalysisTest {
private val tableWithAcceptAnySchemaCapability: Table = {
val t = mock(classOf[Table])
when(t.name()).thenReturn("v2TableWithAcceptAnySchemaCapability")
- when(t.schema()).thenReturn(new StructType().add("i", "int"))
+ when(t.columns()).thenReturn(Array(Column.create("i", IntegerType)))
when(t.capabilities()).thenReturn(Collections.singleton(TableCapability.ACCEPT_ANY_SCHEMA))
t
}
private val charVarcharTable: Table = {
val t = mock(classOf[Table])
- when(t.schema()).thenReturn(new StructType().add("c1",
"char(5)").add("c2", "varchar(5)"))
+ when(t.columns()).thenReturn(
+ Array(Column.create("c1", CharType(5)), Column.create("c2",
VarcharType(5))))
when(t.partitioning()).thenReturn(Array.empty[Transform])
t
}
private val defaultValues: Table = {
val t = mock(classOf[Table])
- when(t.schema()).thenReturn(
- new StructType()
- .add("i", BooleanType, true,
- new MetadataBuilder()
-
.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "true")
-
.putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY,
"true").build())
- .add("s", IntegerType, true,
- new MetadataBuilder()
-
.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "42")
-
.putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY,
"42").build()))
+ val default1 = new ColumnDefaultValue("true", LiteralValue(true,
BooleanType))
+ val default2 = new ColumnDefaultValue("42", LiteralValue(42, IntegerType))
+ when(t.columns()).thenReturn(Array(
+ Column.create("i", BooleanType, true, null, default1, null),
+ Column.create("s", IntegerType, true, null, default2, null)))
when(t.partitioning()).thenReturn(Array.empty[Transform])
t
}
private val defaultValues2: Table = {
val t = mock(classOf[Table])
- when(t.schema()).thenReturn(
- new StructType()
- .add("i", StringType)
- .add("e", StringType, true,
- new MetadataBuilder()
-
.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "'abc'")
-
.putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY,
"'abc'").build()))
+ val default = new ColumnDefaultValue(
+ "'abc'", LiteralValue(UTF8String.fromString("abc"), StringType))
+ when(t.columns()).thenReturn(Array(
+ Column.create("i", StringType),
+ Column.create("e", StringType, true, null, default, null)))
when(t.partitioning()).thenReturn(Array.empty[Transform])
t
}
private val tableWithColumnNamedDefault: Table = {
val t = mock(classOf[Table])
- when(t.schema()).thenReturn(
- new StructType()
- .add("s", StringType)
- .add("default", StringType))
+ when(t.columns()).thenReturn(Array(
+ Column.create("s", StringType),
+ Column.create("default", StringType)))
when(t.partitioning()).thenReturn(Array.empty[Transform])
t
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/InMemoryTableMetricSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/InMemoryTableMetricSuite.scala
index d0169bde40f..33e2fc46ccb 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/InMemoryTableMetricSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/InMemoryTableMetricSuite.scala
@@ -23,6 +23,7 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.connector.catalog.{Identifier,
InMemoryTableCatalog}
+import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
@@ -51,7 +52,7 @@ class InMemoryTableMetricSuite
testCatalog.createTable(
Identifier.of(Array(), "table_name"),
new StructType().add("i", "int"),
- Array.empty, Collections.emptyMap[String, String])
+ Array.empty[Transform], Collections.emptyMap[String, String])
func("testcat.table_name")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
index 2a441157f9d..8f5996438e2 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
@@ -31,6 +31,7 @@ import
org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException,
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier,
NamespaceChange, SupportsNamespaces, TableCatalog, TableChange, V1Table}
+import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType,
StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -38,6 +39,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
abstract class V2SessionCatalogBaseSuite extends SharedSparkSession with
BeforeAndAfter {
val emptyProps: util.Map[String, String] = Collections.emptyMap[String,
String]
+ val emptyTrans: Array[Transform] = Array.empty
val schema: StructType = new StructType()
.add("id", IntegerType)
.add("data", StringType)
@@ -95,13 +97,13 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
assert(catalog.listTables(Array("ns")).isEmpty)
- catalog.createTable(ident1, schema, Array.empty, emptyProps)
+ catalog.createTable(ident1, schema, emptyTrans, emptyProps)
assert(catalog.listTables(Array("ns")).toSet == Set(ident1))
assert(catalog.listTables(Array("ns2")).isEmpty)
- catalog.createTable(ident3, schema, Array.empty, emptyProps)
- catalog.createTable(ident2, schema, Array.empty, emptyProps)
+ catalog.createTable(ident3, schema, emptyTrans, emptyProps)
+ catalog.createTable(ident2, schema, emptyTrans, emptyProps)
assert(catalog.listTables(Array("ns")).toSet == Set(ident1, ident2))
assert(catalog.listTables(Array("ns2")).toSet == Set(ident3))
@@ -123,7 +125,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
assert(!catalog.tableExists(testIdent))
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
assert(parsed == Seq("db", "test_table"))
@@ -141,7 +143,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
assert(!catalog.tableExists(testIdent))
- val table = catalog.createTable(testIdent, schema, Array.empty, properties)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, properties)
val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
assert(parsed == Seq("db", "test_table"))
@@ -156,13 +158,13 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
assert(!catalog.tableExists(testIdent))
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
.map(part => quoteIdentifier(part)).mkString(".")
val exc = intercept[TableAlreadyExistsException] {
- catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
}
checkErrorTableAlreadyExists(exc, parsed)
@@ -183,26 +185,26 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
assert(!catalog.tableExists(testIdent))
// default location
- val t1 = catalog.createTable(testIdent, schema, Array.empty,
properties).asInstanceOf[V1Table]
+ val t1 = catalog.createTable(testIdent, schema, emptyTrans,
properties).asInstanceOf[V1Table]
assert(t1.catalogTable.location ===
spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier))
catalog.dropTable(testIdent)
// relative path
properties.put(TableCatalog.PROP_LOCATION, "relative/path")
- val t2 = catalog.createTable(testIdent, schema, Array.empty,
properties).asInstanceOf[V1Table]
+ val t2 = catalog.createTable(testIdent, schema, emptyTrans,
properties).asInstanceOf[V1Table]
assert(t2.catalogTable.location ===
makeQualifiedPathWithWarehouse("db.db/relative/path"))
catalog.dropTable(testIdent)
// absolute path without scheme
properties.put(TableCatalog.PROP_LOCATION, "/absolute/path")
- val t3 = catalog.createTable(testIdent, schema, Array.empty,
properties).asInstanceOf[V1Table]
+ val t3 = catalog.createTable(testIdent, schema, emptyTrans,
properties).asInstanceOf[V1Table]
assert(t3.catalogTable.location.toString === "file:///absolute/path")
catalog.dropTable(testIdent)
// absolute path with scheme
properties.put(TableCatalog.PROP_LOCATION, "file:/absolute/path")
- val t4 = catalog.createTable(testIdent, schema, Array.empty,
properties).asInstanceOf[V1Table]
+ val t4 = catalog.createTable(testIdent, schema, emptyTrans,
properties).asInstanceOf[V1Table]
assert(t4.catalogTable.location.toString === "file:/absolute/path")
catalog.dropTable(testIdent)
}
@@ -212,7 +214,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
assert(!catalog.tableExists(testIdent))
- catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(catalog.tableExists(testIdent))
@@ -224,7 +226,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
test("loadTable") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
val loaded = catalog.loadTable(testIdent)
assert(table.name == loaded.name)
@@ -245,7 +247,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
test("invalidateTable") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
catalog.invalidateTable(testIdent)
val loaded = catalog.loadTable(testIdent)
@@ -266,7 +268,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
test("alterTable: add property") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(filterV2TableProperties(table.properties) == Map())
@@ -285,7 +287,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
val properties = new util.HashMap[String, String]()
properties.put("prop-1", "1")
- val table = catalog.createTable(testIdent, schema, Array.empty, properties)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, properties)
assert(filterV2TableProperties(table.properties) == Map("prop-1" -> "1"))
@@ -304,7 +306,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
val properties = new util.HashMap[String, String]()
properties.put("prop-1", "1")
- val table = catalog.createTable(testIdent, schema, Array.empty, properties)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, properties)
assert(filterV2TableProperties(table.properties) == Map("prop-1" -> "1"))
@@ -320,7 +322,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
test("alterTable: remove missing property") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(filterV2TableProperties(table.properties) == Map())
@@ -336,7 +338,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
test("alterTable: add top-level column") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -348,7 +350,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
test("alterTable: add required column") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -361,7 +363,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
test("alterTable: add column with comment") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -378,7 +380,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
val pointStruct = new StructType().add("x", DoubleType).add("y",
DoubleType)
val tableSchema = schema.add("point", pointStruct)
- val table = catalog.createTable(testIdent, tableSchema, Array.empty,
emptyProps)
+ val table = catalog.createTable(testIdent, tableSchema, emptyTrans,
emptyProps)
assert(table.schema == tableSchema)
@@ -393,7 +395,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
test("alterTable: add column to primitive field fails") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -411,7 +413,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
test("alterTable: add field to missing column fails") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -427,7 +429,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
test("alterTable: update column data type") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -443,7 +445,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
val originalSchema = new StructType()
.add("id", IntegerType, nullable = false)
.add("data", StringType)
- val table = catalog.createTable(testIdent, originalSchema, Array.empty,
emptyProps)
+ val table = catalog.createTable(testIdent, originalSchema, emptyTrans,
emptyProps)
assert(table.schema == originalSchema)
@@ -457,7 +459,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
test("alterTable: update missing column fails") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -473,7 +475,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
test("alterTable: add comment") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -489,7 +491,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
test("alterTable: replace comment") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -508,7 +510,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
test("alterTable: add comment to missing column fails") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -524,7 +526,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
test("alterTable: rename top-level column") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -541,7 +543,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
val pointStruct = new StructType().add("x", DoubleType).add("y",
DoubleType)
val tableSchema = schema.add("point", pointStruct)
- val table = catalog.createTable(testIdent, tableSchema, Array.empty,
emptyProps)
+ val table = catalog.createTable(testIdent, tableSchema, emptyTrans,
emptyProps)
assert(table.schema == tableSchema)
@@ -560,7 +562,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
val pointStruct = new StructType().add("x", DoubleType).add("y",
DoubleType)
val tableSchema = schema.add("point", pointStruct)
- val table = catalog.createTable(testIdent, tableSchema, Array.empty,
emptyProps)
+ val table = catalog.createTable(testIdent, tableSchema, emptyTrans,
emptyProps)
assert(table.schema == tableSchema)
@@ -576,7 +578,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
test("alterTable: rename missing column fails") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -595,7 +597,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
val pointStruct = new StructType().add("x", DoubleType).add("y",
DoubleType)
val tableSchema = schema.add("point", pointStruct)
- val table = catalog.createTable(testIdent, tableSchema, Array.empty,
emptyProps)
+ val table = catalog.createTable(testIdent, tableSchema, emptyTrans,
emptyProps)
assert(table.schema == tableSchema)
@@ -612,7 +614,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
test("alterTable: delete top-level column") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -629,7 +631,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
val pointStruct = new StructType().add("x", DoubleType).add("y",
DoubleType)
val tableSchema = schema.add("point", pointStruct)
- val table = catalog.createTable(testIdent, tableSchema, Array.empty,
emptyProps)
+ val table = catalog.createTable(testIdent, tableSchema, emptyTrans,
emptyProps)
assert(table.schema == tableSchema)
@@ -645,7 +647,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
test("alterTable: delete missing column fails") {
val catalog = newCatalog()
- val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(table.schema == schema)
@@ -667,7 +669,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
val pointStruct = new StructType().add("x", DoubleType).add("y",
DoubleType)
val tableSchema = schema.add("point", pointStruct)
- val table = catalog.createTable(testIdent, tableSchema, Array.empty,
emptyProps)
+ val table = catalog.createTable(testIdent, tableSchema, emptyTrans,
emptyProps)
assert(table.schema == tableSchema)
@@ -698,7 +700,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
assert(!catalog.tableExists(testIdent))
// default location
- val t1 = catalog.createTable(testIdent, schema, Array.empty,
emptyProps).asInstanceOf[V1Table]
+ val t1 = catalog.createTable(testIdent, schema, emptyTrans,
emptyProps).asInstanceOf[V1Table]
assert(t1.catalogTable.location ===
spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier))
@@ -723,7 +725,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
assert(!catalog.tableExists(testIdent))
- catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(catalog.tableExists(testIdent))
@@ -750,7 +752,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
assert(!catalog.tableExists(testIdent))
assert(!catalog.tableExists(testIdentNew))
- catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(catalog.tableExists(testIdent))
catalog.renameTable(testIdent, testIdentNew)
@@ -775,8 +777,8 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
assert(!catalog.tableExists(testIdent))
assert(!catalog.tableExists(testIdentNew))
- catalog.createTable(testIdent, schema, Array.empty, emptyProps)
- catalog.createTable(testIdentNew, schema, Array.empty, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+ catalog.createTable(testIdentNew, schema, emptyTrans, emptyProps)
assert(catalog.tableExists(testIdent))
assert(catalog.tableExists(testIdentNew))
@@ -795,7 +797,7 @@ class V2SessionCatalogTableSuite extends
V2SessionCatalogBaseSuite {
assert(!catalog.tableExists(testIdent))
assert(!catalog.tableExists(testIdentNewOtherDb))
- catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
assert(catalog.tableExists(testIdent))
@@ -982,7 +984,7 @@ class V2SessionCatalogNamespaceSuite extends
V2SessionCatalogBaseSuite {
assert(catalog.namespaceExists(testNs) === false)
val exc = intercept[NoSuchDatabaseException] {
- catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
}
assert(exc.getMessage.contains(testNs.quoted))
@@ -1016,7 +1018,7 @@ class V2SessionCatalogNamespaceSuite extends
V2SessionCatalogBaseSuite {
val catalog = newCatalog()
catalog.createNamespace(testNs, Map("property" -> "value").asJava)
- catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
val exc = intercept[AnalysisException] {
catalog.dropNamespace(testNs, cascade = false)
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index 13f2a865936..ea1e9a7e048 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -869,7 +869,7 @@ class InsertSuite extends QueryTest with TestHiveSingleton
with BeforeAndAfter
|SORTED BY (s1)
|INTO 200 BUCKETS
|STORED AS PARQUET
- """.stripMargin
+ |""".stripMargin
} else {
"""
|CREATE TABLE test1(
@@ -880,14 +880,14 @@ class InsertSuite extends QueryTest with
TestHiveSingleton with BeforeAndAfter
|CLUSTERED BY (v1)
|SORTED BY (s1)
|INTO 200 BUCKETS
- """.stripMargin
+ |""".stripMargin
}
val insertString =
"""
|INSERT INTO test1
|SELECT * FROM VALUES(1,1,1)
- """.stripMargin
+ |""".stripMargin
val dropString = "DROP TABLE IF EXISTS test1"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]