This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-terraform.git
The following commit(s) were added to refs/heads/main by this push:
new 51ca7f9 Schema + Property updates on Tables (#17)
51ca7f9 is described below
commit 51ca7f9e386e51b7c8652233c0ec06044a22be13
Author: Alex Stephen <[email protected]>
AuthorDate: Mon Mar 16 10:58:36 2026 -0700
Schema + Property updates on Tables (#17)
* Add schema update functionality
* quick refactor
* Change load table to refresh
---
internal/provider/resource_table.go | 215 ++++++++++++++++++++++++++-----
internal/provider/resource_table_test.go | 133 +++++++++++++++++++
2 files changed, 314 insertions(+), 34 deletions(-)
diff --git a/internal/provider/resource_table.go
b/internal/provider/resource_table.go
index 9f9a599..b22971b 100644
--- a/internal/provider/resource_table.go
+++ b/internal/provider/resource_table.go
@@ -20,11 +20,12 @@ import (
"errors"
"strings"
+ "github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/catalog"
+ "github.com/apache/iceberg-go/table"
"github.com/hashicorp/terraform-plugin-framework/diag"
"github.com/hashicorp/terraform-plugin-framework/resource"
rscschema
"github.com/hashicorp/terraform-plugin-framework/resource/schema"
-
"github.com/hashicorp/terraform-plugin-framework/resource/schema/objectplanmodifier"
"github.com/hashicorp/terraform-plugin-framework/resource/schema/planmodifier"
"github.com/hashicorp/terraform-plugin-framework/resource/schema/stringplanmodifier"
"github.com/hashicorp/terraform-plugin-framework/types"
@@ -80,10 +81,6 @@ func (r *icebergTableResource) Schema(_ context.Context, _
resource.SchemaReques
"schema": rscschema.SingleNestedAttribute{
Description: "The schema of the table.",
Required: true,
- // TODO: Update schema in place instead of
replacement
- PlanModifiers: []planmodifier.Object{
- objectplanmodifier.RequiresReplace(),
- },
Attributes: map[string]rscschema.Attribute{
"id": rscschema.Int64Attribute{
Description: "The schema ID.",
@@ -313,22 +310,7 @@ func (r *icebergTableResource) Create(ctx context.Context,
req resource.CreateRe
data.ID = types.StringValue(strings.Join(tableIdent, "."))
- serverProperties, diags := types.MapValueFrom(ctx, types.StringType,
tbl.Properties())
- resp.Diagnostics.Append(diags...)
- if resp.Diagnostics.HasError() {
- return
- }
- data.ServerProperties = serverProperties
-
- // Update schema from the created table to capture assigned IDs
- icebergSchema := tbl.Schema()
- var updatedSchema icebergTableSchema
- if err := updatedSchema.FromIceberg(icebergSchema); err != nil {
- resp.Diagnostics.AddError("failed to convert iceberg schema to
terraform schema", err.Error())
- return
- }
- data.Schema, diags = types.ObjectValueFrom(ctx,
icebergTableSchema{}.AttrTypes(), updatedSchema)
- resp.Diagnostics.Append(diags...)
+ r.syncTableToModel(ctx, tbl, &data, &resp.Diagnostics)
if resp.Diagnostics.HasError() {
return
}
@@ -373,34 +355,199 @@ func (r *icebergTableResource) Read(ctx context.Context,
req resource.ReadReques
return
}
- serverProperties, diags := types.MapValueFrom(ctx, types.StringType,
tbl.Properties())
+ r.syncTableToModel(ctx, tbl, &data, &resp.Diagnostics)
+ if resp.Diagnostics.HasError() {
+ return
+ }
+
+ diags = resp.State.Set(ctx, &data)
resp.Diagnostics.Append(diags...)
+}
+
+func (r *icebergTableResource) Update(ctx context.Context, req
resource.UpdateRequest, resp *resource.UpdateResponse) {
+ r.ConfigureCatalog(ctx, &resp.Diagnostics)
if resp.Diagnostics.HasError() {
return
}
- data.ServerProperties = serverProperties
- icebergSchema := tbl.Schema()
- var updatedSchema icebergTableSchema
- if err := updatedSchema.FromIceberg(icebergSchema); err != nil {
- resp.Diagnostics.AddError("failed to convert iceberg schema to
terraform schema", err.Error())
+ var plan, state icebergTableResourceModel
+
+ diags := req.Plan.Get(ctx, &plan)
+ resp.Diagnostics.Append(diags...)
+
+ diags = req.State.Get(ctx, &state)
+ resp.Diagnostics.Append(diags...)
+
+ if resp.Diagnostics.HasError() {
return
}
- data.Schema, diags = types.ObjectValueFrom(ctx,
icebergTableSchema{}.AttrTypes(), updatedSchema)
+
+ var namespaceName []string
+ diags = plan.Namespace.ElementsAs(ctx, &namespaceName, false)
resp.Diagnostics.Append(diags...)
if resp.Diagnostics.HasError() {
return
}
- diags = resp.State.Set(ctx, &data)
+ tableName := plan.Name.ValueString()
+ tableIdent := append(namespaceName, tableName)
+
+ tbl, err := r.catalog.LoadTable(ctx, tableIdent)
+ if err != nil {
+ resp.Diagnostics.AddError("failed to load table", err.Error())
+ return
+ }
+
+ requirements := []table.Requirement{
+ table.AssertTableUUID(tbl.Metadata().TableUUID()),
+ }
+
+ updates := make([]table.Update, 0)
+
+ updates = append(updates, r.calculatePropertyUpdates(ctx, &plan,
&state, &resp.Diagnostics)...)
+ updates = append(updates, r.calculateSchemaUpdates(ctx, &plan, &state,
&resp.Diagnostics)...)
+
+ if resp.Diagnostics.HasError() {
+ return
+ }
+
+ if len(updates) > 0 {
+ _, _, err = r.catalog.CommitTable(ctx, tableIdent,
requirements, updates)
+ if err != nil {
+ resp.Diagnostics.AddError("failed to commit table
updates", err.Error())
+ return
+ }
+
+ // Reload the table to get the latest state
+ err = tbl.Refresh(ctx)
+ if err != nil {
+ resp.Diagnostics.AddError("failed to refresh table
after commit", err.Error())
+ return
+ }
+ }
+
+ r.syncTableToModel(ctx, tbl, &plan, &resp.Diagnostics)
+ if resp.Diagnostics.HasError() {
+ return
+ }
+
+ diags = resp.State.Set(ctx, &plan)
resp.Diagnostics.Append(diags...)
}
-func (r *icebergTableResource) Update(ctx context.Context, req
resource.UpdateRequest, resp *resource.UpdateResponse) {
- resp.Diagnostics.AddError(
- "Not Implemented",
- "Update is not implemented for iceberg_table yet. Please use
RequiresReplace if you need to change the table.",
- )
+func (r *icebergTableResource) calculatePropertyUpdates(ctx context.Context,
plan, state *icebergTableResourceModel, diags *diag.Diagnostics) []table.Update
{
+ updates := make([]table.Update, 0)
+ userUpdates := make(iceberg.Properties)
+ removals := make([]string, 0)
+
+ stateProps := make(map[string]string)
+ if !state.UserProperties.IsNull() {
+ d := state.UserProperties.ElementsAs(ctx, &stateProps, false)
+ diags.Append(d...)
+ }
+
+ planProps := make(map[string]string)
+ if !plan.UserProperties.IsNull() {
+ d := plan.UserProperties.ElementsAs(ctx, &planProps, false)
+ diags.Append(d...)
+ }
+
+ if diags.HasError() {
+ return nil
+ }
+
+ for k, v := range planProps {
+ if oldV, ok := stateProps[k]; !ok || oldV != v {
+ userUpdates[k] = v
+ }
+ }
+
+ for k := range stateProps {
+ if _, ok := planProps[k]; !ok {
+ removals = append(removals, k)
+ }
+ }
+
+ if len(userUpdates) > 0 {
+ updates = append(updates,
table.NewSetPropertiesUpdate(userUpdates))
+ }
+ if len(removals) > 0 {
+ updates = append(updates,
table.NewRemovePropertiesUpdate(removals))
+ }
+
+ return updates
+}
+
+func (r *icebergTableResource) calculateSchemaUpdates(ctx context.Context,
plan, state *icebergTableResourceModel, diags *diag.Diagnostics) []table.Update
{
+ var planSchema, stateSchema icebergTableSchema
+ d := plan.Schema.As(ctx, &planSchema, basetypes.ObjectAsOptions{})
+ diags.Append(d...)
+ d = state.Schema.As(ctx, &stateSchema, basetypes.ObjectAsOptions{})
+ diags.Append(d...)
+
+ if diags.HasError() {
+ return nil
+ }
+
+ planJson, _ := planSchema.MarshalJSON()
+ stateJson, _ := stateSchema.MarshalJSON()
+
+ if string(planJson) != string(stateJson) {
+ newIcebergSchema, err := planSchema.ToIceberg()
+ if err != nil {
+ diags.AddError("failed to convert schema", err.Error())
+ return nil
+ }
+ return []table.Update{
+ table.NewAddSchemaUpdate(newIcebergSchema),
+ table.NewSetCurrentSchemaUpdate(-1),
+ }
+ }
+
+ return nil
+}
+
+func (r *icebergTableResource) syncTableToModel(ctx context.Context, tbl
*table.Table, model *icebergTableResourceModel, diags *diag.Diagnostics) {
+ // Update ServerProperties
+ serverProperties, d := types.MapValueFrom(ctx, types.StringType,
tbl.Properties())
+ diags.Append(d...)
+ if diags.HasError() {
+ return
+ }
+ model.ServerProperties = serverProperties
+
+ // Update Schema from the table to capture any server-assigned IDs
+ icebergSchema := tbl.Schema()
+ var updatedSchema icebergTableSchema
+ if err := updatedSchema.FromIceberg(icebergSchema); err != nil {
+ diags.AddError("failed to convert iceberg schema to terraform
schema", err.Error())
+ return
+ }
+ var d2 diag.Diagnostics
+ model.Schema, d2 = types.ObjectValueFrom(ctx,
icebergTableSchema{}.AttrTypes(), updatedSchema)
+ diags.Append(d2...)
+ if diags.HasError() {
+ return
+ }
+
+ // Update UserProperties to match reality for tracked keys
+ if !model.UserProperties.IsNull() {
+ planProps := make(map[string]string)
+ d := model.UserProperties.ElementsAs(ctx, &planProps, false)
+ diags.Append(d...)
+ if diags.HasError() {
+ return
+ }
+
+ managedProps := make(map[string]string)
+ for k := range planProps {
+ if v, ok := tbl.Properties()[k]; ok {
+ managedProps[k] = v
+ }
+ }
+ model.UserProperties, d = types.MapValueFrom(ctx,
types.StringType, managedProps)
+ diags.Append(d...)
+ }
}
func (r *icebergTableResource) Delete(ctx context.Context, req
resource.DeleteRequest, resp *resource.DeleteResponse) {
diff --git a/internal/provider/resource_table_test.go
b/internal/provider/resource_table_test.go
index 80fb746..a7c4868 100644
--- a/internal/provider/resource_table_test.go
+++ b/internal/provider/resource_table_test.go
@@ -76,6 +76,139 @@ resource "iceberg_table" "test" {
`, tableName)
}
+func testAccIcebergTableUpdateConfig(providerCfg string, tableName string)
string {
+ return providerCfg + fmt.Sprintf(`
+resource "iceberg_namespace" "db1" {
+ name = ["db1"]
+}
+
+resource "iceberg_table" "test" {
+ namespace = iceberg_namespace.db1.name
+ name = "%s"
+ schema = {
+ fields = [
+ {
+ id = 1
+ name = "id"
+ type = "long"
+ required = true
+ },
+ {
+ id = 2
+ name = "data"
+ type = "string"
+ required = false
+ },
+ {
+ id = 3
+ name = "new_field"
+ type = "int"
+ required = false
+ }
+ ]
+ }
+}
+`, tableName)
+}
+
+func TestAccIcebergTableUpdate(t *testing.T) {
+ catalogURI := os.Getenv("ICEBERG_CATALOG_URI")
+ if catalogURI == "" {
+ catalogURI = "http://localhost:8181"
+ }
+
+ providerCfg := fmt.Sprintf(providerConfig, catalogURI)
+ tableName := "update_test_table"
+
+ resource.Test(t, resource.TestCase{
+ PreCheck: func() { testAccPreCheck(t) },
+ ProtoV6ProviderFactories: testAccProtoV6ProviderFactories,
+ Steps: []resource.TestStep{
+ {
+ Config:
testAccIcebergTableResourceConfig(providerCfg, tableName),
+ Check: resource.ComposeTestCheckFunc(
+
resource.TestCheckResourceAttr("iceberg_table.test", "name", tableName),
+
resource.TestCheckResourceAttr("iceberg_table.test", "schema.fields.#", "2"),
+
resource.TestCheckResourceAttr("iceberg_table.test", "schema.id", "0"),
+ ),
+ },
+ {
+ Config:
testAccIcebergTableUpdateConfig(providerCfg, tableName),
+ Check: resource.ComposeTestCheckFunc(
+
resource.TestCheckResourceAttr("iceberg_table.test", "name", tableName),
+
resource.TestCheckResourceAttr("iceberg_table.test", "schema.fields.#", "3"),
+
resource.TestCheckResourceAttr("iceberg_table.test", "schema.fields.2.name",
"new_field"),
+
resource.TestCheckResourceAttr("iceberg_table.test", "schema.fields.2.type",
"int"),
+
resource.TestCheckResourceAttr("iceberg_table.test", "schema.id", "1"),
+ ),
+ },
+ },
+ })
+}
+
+func TestAccIcebergTablePropertiesUpdate(t *testing.T) {
+ catalogURI := os.Getenv("ICEBERG_CATALOG_URI")
+ if catalogURI == "" {
+ catalogURI = "http://localhost:8181"
+ }
+
+ providerCfg := fmt.Sprintf(providerConfig, catalogURI)
+ tableName := "prop_update_test_table"
+
+ resource.Test(t, resource.TestCase{
+ PreCheck: func() { testAccPreCheck(t) },
+ ProtoV6ProviderFactories: testAccProtoV6ProviderFactories,
+ Steps: []resource.TestStep{
+ {
+ Config:
testAccIcebergTablePropertiesConfig(providerCfg, tableName, `owner =
"initial"`),
+ Check: resource.ComposeTestCheckFunc(
+
resource.TestCheckResourceAttr("iceberg_table.test", "user_properties.owner",
"initial"),
+ ),
+ },
+ {
+ Config:
testAccIcebergTablePropertiesConfig(providerCfg, tableName, `owner = "updated",
new_prop = "added"`),
+ Check: resource.ComposeTestCheckFunc(
+
resource.TestCheckResourceAttr("iceberg_table.test", "user_properties.owner",
"updated"),
+
resource.TestCheckResourceAttr("iceberg_table.test",
"user_properties.new_prop", "added"),
+ ),
+ },
+ {
+ Config:
testAccIcebergTablePropertiesConfig(providerCfg, tableName, `owner =
"updated"`),
+ Check: resource.ComposeTestCheckFunc(
+
resource.TestCheckResourceAttr("iceberg_table.test", "user_properties.owner",
"updated"),
+
resource.TestCheckNoResourceAttr("iceberg_table.test",
"user_properties.new_prop"),
+ ),
+ },
+ },
+ })
+}
+
+func testAccIcebergTablePropertiesConfig(providerCfg string, tableName string,
props string) string {
+ return providerCfg + fmt.Sprintf(`
+resource "iceberg_namespace" "db2" {
+ name = ["db2"]
+}
+
+resource "iceberg_table" "test" {
+ namespace = iceberg_namespace.db2.name
+ name = "%s"
+ user_properties = {
+ %s
+ }
+ schema = {
+ fields = [
+ {
+ id = 1
+ name = "id"
+ type = "long"
+ required = true
+ }
+ ]
+ }
+}
+`, tableName, props)
+}
+
func TestAccIcebergTableFull(t *testing.T) {
catalogURI := os.Getenv("ICEBERG_CATALOG_URI")
if catalogURI == "" {