This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 1460743  Spark: Add extensions DDL to set identifier fields (#2560)
1460743 is described below

commit 1460743422af341be997176ca5c0bf9dcff0946f
Author: Jack Ye <[email protected]>
AuthorDate: Tue Jun 22 12:33:12 2021 -0700

    Spark: Add extensions DDL to set identifier fields (#2560)
---
 .../IcebergSqlExtensions.g4                        |  11 +-
 .../IcebergSparkSqlExtensionsParser.scala          |   4 +-
 .../IcebergSqlExtensionsAstBuilder.scala           |  22 +++-
 .../plans/logical/DropIdentifierFields.scala       |  34 +++++
 .../plans/logical/SetIdentifierFields.scala        |  35 +++++
 .../datasources/v2/DropIdentifierFieldsExec.scala  |  65 ++++++++++
 .../v2/ExtendedDataSourceV2Strategy.scala          |   8 ++
 .../datasources/v2/SetIdentifierFieldsExec.scala   |  52 ++++++++
 .../spark/extensions/TestAlterTableSchema.java     | 142 +++++++++++++++++++++
 9 files changed, 370 insertions(+), 3 deletions(-)

diff --git 
a/spark3-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
 
b/spark3-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
index 40c8de8..d0b228d 100644
--- 
a/spark3-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
+++ 
b/spark3-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
@@ -71,6 +71,8 @@ statement
     | ALTER TABLE multipartIdentifier DROP PARTITION FIELD transform           
             #dropPartitionField
     | ALTER TABLE multipartIdentifier REPLACE PARTITION FIELD transform WITH 
transform (AS name=identifier)? #replacePartitionField
     | ALTER TABLE multipartIdentifier WRITE writeSpec                          
             #setWriteDistributionAndOrdering
+    | ALTER TABLE multipartIdentifier SET IDENTIFIER_KW FIELDS fieldList       
             #setIdentifierFields
+    | ALTER TABLE multipartIdentifier DROP IDENTIFIER_KW FIELDS fieldList      
             #dropIdentifierFields
     ;
 
 writeSpec
@@ -157,9 +159,13 @@ quotedIdentifier
     : BACKQUOTED_IDENTIFIER
     ;
 
+fieldList
+    : fields+=multipartIdentifier (',' fields+=multipartIdentifier)*
+    ;
+
 nonReserved
     : ADD | ALTER | AS | ASC | BY | CALL | DESC | DROP | FIELD | FIRST | LAST 
| NULLS | ORDERED | PARTITION | TABLE | WRITE
-    | DISTRIBUTED | LOCALLY | UNORDERED
+    | DISTRIBUTED | LOCALLY | UNORDERED | REPLACE | WITH | IDENTIFIER_KW | 
FIELDS | SET
     | TRUE | FALSE
     | MAP
     ;
@@ -174,6 +180,7 @@ DESC: 'DESC';
 DISTRIBUTED: 'DISTRIBUTED';
 DROP: 'DROP';
 FIELD: 'FIELD';
+FIELDS: 'FIELDS';
 FIRST: 'FIRST';
 LAST: 'LAST';
 LOCALLY: 'LOCALLY';
@@ -181,6 +188,8 @@ NULLS: 'NULLS';
 ORDERED: 'ORDERED';
 PARTITION: 'PARTITION';
 REPLACE: 'REPLACE';
+IDENTIFIER_KW: 'IDENTIFIER';
+SET: 'SET';
 TABLE: 'TABLE';
 UNORDERED: 'UNORDERED';
 WITH: 'WITH';
diff --git 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
index 11a8017..f11e30f 100644
--- 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
+++ 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
@@ -116,7 +116,9 @@ class IcebergSparkSqlExtensionsParser(delegate: 
ParserInterface) extends ParserI
             normalized.contains("write ordered by") ||
             normalized.contains("write locally ordered by") ||
             normalized.contains("write distributed by") ||
-            normalized.contains("write unordered")))
+            normalized.contains("write unordered") ||
+            normalized.contains("set identifier fields") ||
+            normalized.contains("drop identifier fields")))
   }
 
   protected def parse[T](command: String)(toResult: IcebergSqlExtensionsParser 
=> T): T = {
diff --git 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
index 8ecd1f0..678da9b 100644
--- 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
+++ 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
@@ -37,11 +37,13 @@ import 
org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParse
 import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField
 import org.apache.spark.sql.catalyst.plans.logical.CallArgument
 import org.apache.spark.sql.catalyst.plans.logical.CallStatement
+import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
 import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.NamedArgument
 import org.apache.spark.sql.catalyst.plans.logical.PositionalArgument
 import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
+import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
 import 
org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.catalyst.trees.Origin
@@ -85,7 +87,7 @@ class IcebergSqlExtensionsAstBuilder(delegate: 
ParserInterface) extends IcebergS
 
 
   /**
-   * Create an CHANGE PARTITION FIELD logical command.
+   * Create an REPLACE PARTITION FIELD logical command.
    */
   override def visitReplacePartitionField(ctx: ReplacePartitionFieldContext): 
ReplacePartitionField = withOrigin(ctx) {
     ReplacePartitionField(
@@ -96,6 +98,24 @@ class IcebergSqlExtensionsAstBuilder(delegate: 
ParserInterface) extends IcebergS
   }
 
   /**
+   * Create an SET IDENTIFIER FIELDS logical command.
+   */
+  override def visitSetIdentifierFields(ctx: SetIdentifierFieldsContext): 
SetIdentifierFields = withOrigin(ctx) {
+    SetIdentifierFields(
+      typedVisit[Seq[String]](ctx.multipartIdentifier),
+      ctx.fieldList.fields.asScala.map(_.getText))
+  }
+
+  /**
+   * Create an DROP IDENTIFIER FIELDS logical command.
+   */
+  override def visitDropIdentifierFields(ctx: DropIdentifierFieldsContext): 
DropIdentifierFields = withOrigin(ctx) {
+    DropIdentifierFields(
+      typedVisit[Seq[String]](ctx.multipartIdentifier),
+      ctx.fieldList.fields.asScala.map(_.getText))
+  }
+
+  /**
    * Create a [[SetWriteDistributionAndOrdering]] for changing the write 
distribution and ordering.
    */
   override def visitSetWriteDistributionAndOrdering(
diff --git 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DropIdentifierFields.scala
 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DropIdentifierFields.scala
new file mode 100644
index 0000000..115af15
--- /dev/null
+++ 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DropIdentifierFields.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+case class DropIdentifierFields(
+    table: Seq[String],
+    fields: Seq[String]) extends Command {
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override def simpleString(maxFields: Int): String = {
+    s"DropIdentifierFields ${table.quoted} (${fields.quoted})"
+  }
+}
diff --git 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SetIdentifierFields.scala
 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SetIdentifierFields.scala
new file mode 100644
index 0000000..2e9a34b
--- /dev/null
+++ 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SetIdentifierFields.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.expressions.Transform
+
+case class SetIdentifierFields(
+    table: Seq[String],
+    fields: Seq[String]) extends Command {
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override def simpleString(maxFields: Int): String = {
+    s"SetIdentifierFields ${table.quoted} (${fields.quoted})"
+  }
+}
diff --git 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropIdentifierFieldsExec.scala
 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropIdentifierFieldsExec.scala
new file mode 100644
index 0000000..525ed77
--- /dev/null
+++ 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropIdentifierFieldsExec.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions
+import org.apache.iceberg.relocated.com.google.common.collect.Sets
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.TableCatalog
+
+case class DropIdentifierFieldsExec(
+    catalog: TableCatalog,
+    ident: Identifier,
+    fields: Seq[String]) extends V2CommandExec {
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    catalog.loadTable(ident) match {
+      case iceberg: SparkTable =>
+        val schema = iceberg.table.schema
+        val identifierFieldNames = Sets.newHashSet(schema.identifierFieldNames)
+
+        for (name <- fields) {
+          Preconditions.checkArgument(schema.findField(name) != null,
+            "Cannot complete drop identifier fields operation: field %s not 
found", name)
+          Preconditions.checkArgument(identifierFieldNames.contains(name),
+            "Cannot complete drop identifier fields operation: %s is not an 
identifier field", name)
+          identifierFieldNames.remove(name)
+        }
+
+        iceberg.table.updateSchema()
+          .setIdentifierFields(identifierFieldNames)
+          .commit();
+      case table =>
+        throw new UnsupportedOperationException(s"Cannot drop identifier 
fields in non-Iceberg table: $table")
+    }
+
+    Nil
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"DropIdentifierFields ${catalog.name}.${ident.quoted} (${fields.quoted})";
+  }
+}
diff --git 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index d5901a8..6f0361f 100644
--- 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++ 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -33,6 +33,7 @@ import 
org.apache.spark.sql.catalyst.expressions.NamedExpression
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField
 import org.apache.spark.sql.catalyst.plans.logical.Call
+import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
 import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
 import org.apache.spark.sql.catalyst.plans.logical.DynamicFileFilter
 import 
org.apache.spark.sql.catalyst.plans.logical.DynamicFileFilterWithCardinalityCheck
@@ -40,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.MergeInto
 import org.apache.spark.sql.catalyst.plans.logical.ReplaceData
 import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
+import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
 import 
org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.TableCatalog
@@ -66,6 +68,12 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) 
extends Strategy {
     case ReplacePartitionField(IcebergCatalogAndIdentifier(catalog, ident), 
transformFrom, transformTo, name) =>
       ReplacePartitionFieldExec(catalog, ident, transformFrom, transformTo, 
name) :: Nil
 
+    case SetIdentifierFields(IcebergCatalogAndIdentifier(catalog, ident), 
fields) =>
+      SetIdentifierFieldsExec(catalog, ident, fields) :: Nil
+
+    case DropIdentifierFields(IcebergCatalogAndIdentifier(catalog, ident), 
fields) =>
+      DropIdentifierFieldsExec(catalog, ident, fields) :: Nil
+
     case SetWriteDistributionAndOrdering(
         IcebergCatalogAndIdentifier(catalog, ident), distributionMode, 
ordering) =>
       SetWriteDistributionAndOrderingExec(catalog, ident, distributionMode, 
ordering) :: Nil
diff --git 
a/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetIdentifierFieldsExec.scala
 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetIdentifierFieldsExec.scala
new file mode 100644
index 0000000..7fad2dc
--- /dev/null
+++ 
b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetIdentifierFieldsExec.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.TableCatalog
+
+case class SetIdentifierFieldsExec(
+    catalog: TableCatalog,
+    ident: Identifier,
+    fields: Seq[String]) extends V2CommandExec {
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    catalog.loadTable(ident) match {
+      case iceberg: SparkTable =>
+        iceberg.table.updateSchema()
+          
.setIdentifierFields(scala.collection.JavaConverters.seqAsJavaList(fields))
+          .commit();
+      case table =>
+        throw new UnsupportedOperationException(s"Cannot set identifier fields 
in non-Iceberg table: $table")
+    }
+
+    Nil
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"SetIdentifierFields ${catalog.name}.${ident.quoted} (${fields.quoted})";
+  }
+}
diff --git 
a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTableSchema.java
 
b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTableSchema.java
new file mode 100644
index 0000000..ac12953
--- /dev/null
+++ 
b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTableSchema.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAlterTableSchema extends SparkExtensionsTestBase {
+  public TestAlterTableSchema(String catalogName, String implementation, 
Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testSetIdentifierFields() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, " +
+        "location struct<lon:bigint NOT NULL,lat:bigint NOT NULL> NOT NULL) 
USING iceberg", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertTrue("Table should start without identifier", 
table.schema().identifierFieldIds().isEmpty());
+
+    sql("ALTER TABLE %s SET IDENTIFIER FIELDS id", tableName);
+    table.refresh();
+    Assert.assertEquals("Should have new identifier field",
+        Sets.newHashSet(table.schema().findField("id").fieldId()),
+        table.schema().identifierFieldIds());
+
+    sql("ALTER TABLE %s SET IDENTIFIER FIELDS id, location.lon", tableName);
+    table.refresh();
+    Assert.assertEquals("Should have new identifier field",
+        Sets.newHashSet(
+            table.schema().findField("id").fieldId(),
+            table.schema().findField("location.lon").fieldId()),
+        table.schema().identifierFieldIds());
+
+    sql("ALTER TABLE %s SET IDENTIFIER FIELDS location.lon", tableName);
+    table.refresh();
+    Assert.assertEquals("Should have new identifier field",
+        Sets.newHashSet(table.schema().findField("location.lon").fieldId()),
+        table.schema().identifierFieldIds());
+  }
+
+  @Test
+  public void testSetInvalidIdentifierFields() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, id2 bigint) USING iceberg", 
tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertTrue("Table should start without identifier", 
table.schema().identifierFieldIds().isEmpty());
+    AssertHelpers.assertThrows("should not allow setting unknown fields",
+        IllegalArgumentException.class,
+        "not found in current schema or added columns",
+        () -> sql("ALTER TABLE %s SET IDENTIFIER FIELDS unknown", tableName));
+
+    AssertHelpers.assertThrows("should not allow setting optional fields",
+        IllegalArgumentException.class,
+        "not a required field",
+        () -> sql("ALTER TABLE %s SET IDENTIFIER FIELDS id2", tableName));
+  }
+
+  @Test
+  public void testDropIdentifierFields() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, " +
+        "location struct<lon:bigint NOT NULL,lat:bigint NOT NULL> NOT NULL) 
USING iceberg", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertTrue("Table should start without identifier", 
table.schema().identifierFieldIds().isEmpty());
+
+    sql("ALTER TABLE %s SET IDENTIFIER FIELDS id, location.lon", tableName);
+    table.refresh();
+    Assert.assertEquals("Should have new identifier fields",
+        Sets.newHashSet(
+            table.schema().findField("id").fieldId(),
+            table.schema().findField("location.lon").fieldId()),
+        table.schema().identifierFieldIds());
+
+    sql("ALTER TABLE %s DROP IDENTIFIER FIELDS id", tableName);
+    table.refresh();
+    Assert.assertEquals("Should removed identifier field",
+        Sets.newHashSet(table.schema().findField("location.lon").fieldId()),
+        table.schema().identifierFieldIds());
+
+    sql("ALTER TABLE %s SET IDENTIFIER FIELDS id, location.lon", tableName);
+    table.refresh();
+    Assert.assertEquals("Should have new identifier fields",
+        Sets.newHashSet(
+            table.schema().findField("id").fieldId(),
+            table.schema().findField("location.lon").fieldId()),
+        table.schema().identifierFieldIds());
+
+    sql("ALTER TABLE %s DROP IDENTIFIER FIELDS id, location.lon", tableName);
+    table.refresh();
+    Assert.assertEquals("Should have no identifier field",
+        Sets.newHashSet(),
+        table.schema().identifierFieldIds());
+  }
+
+  @Test
+  public void testDropInvalidIdentifierFields() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string NOT NULL, " +
+        "location struct<lon:bigint NOT NULL,lat:bigint NOT NULL> NOT NULL) 
USING iceberg", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertTrue("Table should start without identifier", 
table.schema().identifierFieldIds().isEmpty());
+    AssertHelpers.assertThrows("should not allow dropping unknown fields",
+        IllegalArgumentException.class,
+        "field unknown not found",
+        () -> sql("ALTER TABLE %s DROP IDENTIFIER FIELDS unknown", tableName));
+
+    sql("ALTER TABLE %s SET IDENTIFIER FIELDS id", tableName);
+    AssertHelpers.assertThrows("should not allow dropping a field that is not 
an identifier",
+        IllegalArgumentException.class,
+        "data is not an identifier field",
+        () -> sql("ALTER TABLE %s DROP IDENTIFIER FIELDS data", tableName));
+
+    AssertHelpers.assertThrows("should not allow dropping a nested field that 
is not an identifier",
+        IllegalArgumentException.class,
+        "location.lon is not an identifier field",
+        () -> sql("ALTER TABLE %s DROP IDENTIFIER FIELDS location.lon", 
tableName));
+  }
+}

Reply via email to