This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 1460743 Spark: Add extensions DDL to set identifier fields (#2560)
1460743 is described below
commit 1460743422af341be997176ca5c0bf9dcff0946f
Author: Jack Ye <[email protected]>
AuthorDate: Tue Jun 22 12:33:12 2021 -0700
Spark: Add extensions DDL to set identifier fields (#2560)
---
.../IcebergSqlExtensions.g4 | 11 +-
.../IcebergSparkSqlExtensionsParser.scala | 4 +-
.../IcebergSqlExtensionsAstBuilder.scala | 22 +++-
.../plans/logical/DropIdentifierFields.scala | 34 +++++
.../plans/logical/SetIdentifierFields.scala | 35 +++++
.../datasources/v2/DropIdentifierFieldsExec.scala | 65 ++++++++++
.../v2/ExtendedDataSourceV2Strategy.scala | 8 ++
.../datasources/v2/SetIdentifierFieldsExec.scala | 52 ++++++++
.../spark/extensions/TestAlterTableSchema.java | 142 +++++++++++++++++++++
9 files changed, 370 insertions(+), 3 deletions(-)
diff --git
a/spark3-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
b/spark3-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
index 40c8de8..d0b228d 100644
---
a/spark3-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
+++
b/spark3-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
@@ -71,6 +71,8 @@ statement
| ALTER TABLE multipartIdentifier DROP PARTITION FIELD transform
#dropPartitionField
| ALTER TABLE multipartIdentifier REPLACE PARTITION FIELD transform WITH
transform (AS name=identifier)? #replacePartitionField
| ALTER TABLE multipartIdentifier WRITE writeSpec
#setWriteDistributionAndOrdering
+ | ALTER TABLE multipartIdentifier SET IDENTIFIER_KW FIELDS fieldList
#setIdentifierFields
+ | ALTER TABLE multipartIdentifier DROP IDENTIFIER_KW FIELDS fieldList
#dropIdentifierFields
;
writeSpec
@@ -157,9 +159,13 @@ quotedIdentifier
: BACKQUOTED_IDENTIFIER
;
+fieldList
+ : fields+=multipartIdentifier (',' fields+=multipartIdentifier)*
+ ;
+
nonReserved
: ADD | ALTER | AS | ASC | BY | CALL | DESC | DROP | FIELD | FIRST | LAST
| NULLS | ORDERED | PARTITION | TABLE | WRITE
- | DISTRIBUTED | LOCALLY | UNORDERED
+ | DISTRIBUTED | LOCALLY | UNORDERED | REPLACE | WITH | IDENTIFIER_KW |
FIELDS | SET
| TRUE | FALSE
| MAP
;
@@ -174,6 +180,7 @@ DESC: 'DESC';
DISTRIBUTED: 'DISTRIBUTED';
DROP: 'DROP';
FIELD: 'FIELD';
+FIELDS: 'FIELDS';
FIRST: 'FIRST';
LAST: 'LAST';
LOCALLY: 'LOCALLY';
@@ -181,6 +188,8 @@ NULLS: 'NULLS';
ORDERED: 'ORDERED';
PARTITION: 'PARTITION';
REPLACE: 'REPLACE';
+IDENTIFIER_KW: 'IDENTIFIER';
+SET: 'SET';
TABLE: 'TABLE';
UNORDERED: 'UNORDERED';
WITH: 'WITH';
diff --git
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
index 11a8017..f11e30f 100644
---
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
+++
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
@@ -116,7 +116,9 @@ class IcebergSparkSqlExtensionsParser(delegate:
ParserInterface) extends ParserI
normalized.contains("write ordered by") ||
normalized.contains("write locally ordered by") ||
normalized.contains("write distributed by") ||
- normalized.contains("write unordered")))
+ normalized.contains("write unordered") ||
+ normalized.contains("set identifier fields") ||
+ normalized.contains("drop identifier fields")))
}
protected def parse[T](command: String)(toResult: IcebergSqlExtensionsParser
=> T): T = {
diff --git
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
index 8ecd1f0..678da9b 100644
---
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
+++
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
@@ -37,11 +37,13 @@ import
org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParse
import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField
import org.apache.spark.sql.catalyst.plans.logical.CallArgument
import org.apache.spark.sql.catalyst.plans.logical.CallStatement
+import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.NamedArgument
import org.apache.spark.sql.catalyst.plans.logical.PositionalArgument
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
+import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
import
org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.catalyst.trees.Origin
@@ -85,7 +87,7 @@ class IcebergSqlExtensionsAstBuilder(delegate:
ParserInterface) extends IcebergS
/**
- * Create an CHANGE PARTITION FIELD logical command.
+ * Create a REPLACE PARTITION FIELD logical command.
*/
override def visitReplacePartitionField(ctx: ReplacePartitionFieldContext):
ReplacePartitionField = withOrigin(ctx) {
ReplacePartitionField(
@@ -96,6 +98,24 @@ class IcebergSqlExtensionsAstBuilder(delegate:
ParserInterface) extends IcebergS
}
/**
+ * Create a SET IDENTIFIER FIELDS logical command.
+ */
+ override def visitSetIdentifierFields(ctx: SetIdentifierFieldsContext):
SetIdentifierFields = withOrigin(ctx) {
+ SetIdentifierFields(
+ typedVisit[Seq[String]](ctx.multipartIdentifier),
+ ctx.fieldList.fields.asScala.map(_.getText))
+ }
+
+ /**
+ * Create a DROP IDENTIFIER FIELDS logical command.
+ */
+ override def visitDropIdentifierFields(ctx: DropIdentifierFieldsContext):
DropIdentifierFields = withOrigin(ctx) {
+ DropIdentifierFields(
+ typedVisit[Seq[String]](ctx.multipartIdentifier),
+ ctx.fieldList.fields.asScala.map(_.getText))
+ }
+
+ /**
* Create a [[SetWriteDistributionAndOrdering]] for changing the write
distribution and ordering.
*/
override def visitSetWriteDistributionAndOrdering(
diff --git
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DropIdentifierFields.scala
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DropIdentifierFields.scala
new file mode 100644
index 0000000..115af15
--- /dev/null
+++
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DropIdentifierFields.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+case class DropIdentifierFields(
+ table: Seq[String],
+ fields: Seq[String]) extends Command {
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+ override lazy val output: Seq[Attribute] = Nil
+
+ override def simpleString(maxFields: Int): String = {
+ s"DropIdentifierFields ${table.quoted} (${fields.quoted})"
+ }
+}
diff --git
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SetIdentifierFields.scala
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SetIdentifierFields.scala
new file mode 100644
index 0000000..2e9a34b
--- /dev/null
+++
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SetIdentifierFields.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.expressions.Transform
+
+case class SetIdentifierFields(
+ table: Seq[String],
+ fields: Seq[String]) extends Command {
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+ override lazy val output: Seq[Attribute] = Nil
+
+ override def simpleString(maxFields: Int): String = {
+ s"SetIdentifierFields ${table.quoted} (${fields.quoted})"
+ }
+}
diff --git
a/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropIdentifierFieldsExec.scala
b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropIdentifierFieldsExec.scala
new file mode 100644
index 0000000..525ed77
--- /dev/null
+++
b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropIdentifierFieldsExec.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions
+import org.apache.iceberg.relocated.com.google.common.collect.Sets
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.TableCatalog
+
+case class DropIdentifierFieldsExec(
+ catalog: TableCatalog,
+ ident: Identifier,
+ fields: Seq[String]) extends V2CommandExec {
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+ override lazy val output: Seq[Attribute] = Nil
+
+ override protected def run(): Seq[InternalRow] = {
+ catalog.loadTable(ident) match {
+ case iceberg: SparkTable =>
+ val schema = iceberg.table.schema
+ val identifierFieldNames = Sets.newHashSet(schema.identifierFieldNames)
+
+ for (name <- fields) {
+ Preconditions.checkArgument(schema.findField(name) != null,
+ "Cannot complete drop identifier fields operation: field %s not
found", name)
+ Preconditions.checkArgument(identifierFieldNames.contains(name),
+ "Cannot complete drop identifier fields operation: %s is not an
identifier field", name)
+ identifierFieldNames.remove(name)
+ }
+
+ iceberg.table.updateSchema()
+ .setIdentifierFields(identifierFieldNames)
+ .commit();
+ case table =>
+ throw new UnsupportedOperationException(s"Cannot drop identifier
fields in non-Iceberg table: $table")
+ }
+
+ Nil
+ }
+
+ override def simpleString(maxFields: Int): String = {
+ s"DropIdentifierFields ${catalog.name}.${ident.quoted} (${fields.quoted})";
+ }
+}
diff --git
a/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index d5901a8..6f0361f 100644
---
a/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++
b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -33,6 +33,7 @@ import
org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField
import org.apache.spark.sql.catalyst.plans.logical.Call
+import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
import org.apache.spark.sql.catalyst.plans.logical.DynamicFileFilter
import
org.apache.spark.sql.catalyst.plans.logical.DynamicFileFilterWithCardinalityCheck
@@ -40,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.MergeInto
import org.apache.spark.sql.catalyst.plans.logical.ReplaceData
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
+import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
import
org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.TableCatalog
@@ -66,6 +68,12 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession)
extends Strategy {
case ReplacePartitionField(IcebergCatalogAndIdentifier(catalog, ident),
transformFrom, transformTo, name) =>
ReplacePartitionFieldExec(catalog, ident, transformFrom, transformTo,
name) :: Nil
+ case SetIdentifierFields(IcebergCatalogAndIdentifier(catalog, ident),
fields) =>
+ SetIdentifierFieldsExec(catalog, ident, fields) :: Nil
+
+ case DropIdentifierFields(IcebergCatalogAndIdentifier(catalog, ident),
fields) =>
+ DropIdentifierFieldsExec(catalog, ident, fields) :: Nil
+
case SetWriteDistributionAndOrdering(
IcebergCatalogAndIdentifier(catalog, ident), distributionMode,
ordering) =>
SetWriteDistributionAndOrderingExec(catalog, ident, distributionMode,
ordering) :: Nil
diff --git
a/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetIdentifierFieldsExec.scala
b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetIdentifierFieldsExec.scala
new file mode 100644
index 0000000..7fad2dc
--- /dev/null
+++
b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetIdentifierFieldsExec.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.TableCatalog
+
+case class SetIdentifierFieldsExec(
+ catalog: TableCatalog,
+ ident: Identifier,
+ fields: Seq[String]) extends V2CommandExec {
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+ override lazy val output: Seq[Attribute] = Nil
+
+ override protected def run(): Seq[InternalRow] = {
+ catalog.loadTable(ident) match {
+ case iceberg: SparkTable =>
+ iceberg.table.updateSchema()
+
.setIdentifierFields(scala.collection.JavaConverters.seqAsJavaList(fields))
+ .commit();
+ case table =>
+ throw new UnsupportedOperationException(s"Cannot set identifier fields
in non-Iceberg table: $table")
+ }
+
+ Nil
+ }
+
+ override def simpleString(maxFields: Int): String = {
+ s"SetIdentifierFields ${catalog.name}.${ident.quoted} (${fields.quoted})";
+ }
+}
diff --git
a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTableSchema.java
b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTableSchema.java
new file mode 100644
index 0000000..ac12953
--- /dev/null
+++
b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTableSchema.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAlterTableSchema extends SparkExtensionsTestBase {
+ public TestAlterTableSchema(String catalogName, String implementation,
Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+
+ @After
+ public void removeTable() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ @Test
+ public void testSetIdentifierFields() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, " +
+ "location struct<lon:bigint NOT NULL,lat:bigint NOT NULL> NOT NULL)
USING iceberg", tableName);
+ Table table = validationCatalog.loadTable(tableIdent);
+ Assert.assertTrue("Table should start without identifier",
table.schema().identifierFieldIds().isEmpty());
+
+ sql("ALTER TABLE %s SET IDENTIFIER FIELDS id", tableName);
+ table.refresh();
+ Assert.assertEquals("Should have new identifier field",
+ Sets.newHashSet(table.schema().findField("id").fieldId()),
+ table.schema().identifierFieldIds());
+
+ sql("ALTER TABLE %s SET IDENTIFIER FIELDS id, location.lon", tableName);
+ table.refresh();
+ Assert.assertEquals("Should have new identifier field",
+ Sets.newHashSet(
+ table.schema().findField("id").fieldId(),
+ table.schema().findField("location.lon").fieldId()),
+ table.schema().identifierFieldIds());
+
+ sql("ALTER TABLE %s SET IDENTIFIER FIELDS location.lon", tableName);
+ table.refresh();
+ Assert.assertEquals("Should have new identifier field",
+ Sets.newHashSet(table.schema().findField("location.lon").fieldId()),
+ table.schema().identifierFieldIds());
+ }
+
+ @Test
+ public void testSetInvalidIdentifierFields() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, id2 bigint) USING iceberg",
tableName);
+ Table table = validationCatalog.loadTable(tableIdent);
+ Assert.assertTrue("Table should start without identifier",
table.schema().identifierFieldIds().isEmpty());
+ AssertHelpers.assertThrows("should not allow setting unknown fields",
+ IllegalArgumentException.class,
+ "not found in current schema or added columns",
+ () -> sql("ALTER TABLE %s SET IDENTIFIER FIELDS unknown", tableName));
+
+ AssertHelpers.assertThrows("should not allow setting optional fields",
+ IllegalArgumentException.class,
+ "not a required field",
+ () -> sql("ALTER TABLE %s SET IDENTIFIER FIELDS id2", tableName));
+ }
+
+ @Test
+ public void testDropIdentifierFields() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, " +
+ "location struct<lon:bigint NOT NULL,lat:bigint NOT NULL> NOT NULL)
USING iceberg", tableName);
+ Table table = validationCatalog.loadTable(tableIdent);
+ Assert.assertTrue("Table should start without identifier",
table.schema().identifierFieldIds().isEmpty());
+
+ sql("ALTER TABLE %s SET IDENTIFIER FIELDS id, location.lon", tableName);
+ table.refresh();
+ Assert.assertEquals("Should have new identifier fields",
+ Sets.newHashSet(
+ table.schema().findField("id").fieldId(),
+ table.schema().findField("location.lon").fieldId()),
+ table.schema().identifierFieldIds());
+
+ sql("ALTER TABLE %s DROP IDENTIFIER FIELDS id", tableName);
+ table.refresh();
+ Assert.assertEquals("Should removed identifier field",
+ Sets.newHashSet(table.schema().findField("location.lon").fieldId()),
+ table.schema().identifierFieldIds());
+
+ sql("ALTER TABLE %s SET IDENTIFIER FIELDS id, location.lon", tableName);
+ table.refresh();
+ Assert.assertEquals("Should have new identifier fields",
+ Sets.newHashSet(
+ table.schema().findField("id").fieldId(),
+ table.schema().findField("location.lon").fieldId()),
+ table.schema().identifierFieldIds());
+
+ sql("ALTER TABLE %s DROP IDENTIFIER FIELDS id, location.lon", tableName);
+ table.refresh();
+ Assert.assertEquals("Should have no identifier field",
+ Sets.newHashSet(),
+ table.schema().identifierFieldIds());
+ }
+
+ @Test
+ public void testDropInvalidIdentifierFields() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string NOT NULL, " +
+ "location struct<lon:bigint NOT NULL,lat:bigint NOT NULL> NOT NULL)
USING iceberg", tableName);
+ Table table = validationCatalog.loadTable(tableIdent);
+ Assert.assertTrue("Table should start without identifier",
table.schema().identifierFieldIds().isEmpty());
+ AssertHelpers.assertThrows("should not allow dropping unknown fields",
+ IllegalArgumentException.class,
+ "field unknown not found",
+ () -> sql("ALTER TABLE %s DROP IDENTIFIER FIELDS unknown", tableName));
+
+ sql("ALTER TABLE %s SET IDENTIFIER FIELDS id", tableName);
+ AssertHelpers.assertThrows("should not allow dropping a field that is not
an identifier",
+ IllegalArgumentException.class,
+ "data is not an identifier field",
+ () -> sql("ALTER TABLE %s DROP IDENTIFIER FIELDS data", tableName));
+
+ AssertHelpers.assertThrows("should not allow dropping a nested field that
is not an identifier",
+ IllegalArgumentException.class,
+ "location.lon is not an identifier field",
+ () -> sql("ALTER TABLE %s DROP IDENTIFIER FIELDS location.lon",
tableName));
+ }
+}