This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new fe23584fc3 Core: Add validation for table commit properties (#11437)
fe23584fc3 is described below

commit fe23584fc3af9f0ea1371989030b0a99affb233f
Author: Hongyue/Steve Zhang <[email protected]>
AuthorDate: Fri Nov 1 17:22:44 2024 -0700

    Core: Add validation for table commit properties (#11437)
---
 .../java/org/apache/iceberg/PropertiesUpdate.java  |  9 ++--
 .../java/org/apache/iceberg/TableMetadata.java     |  6 +++
 .../java/org/apache/iceberg/util/PropertyUtil.java | 50 ++++++++++++++++++++++
 .../java/org/apache/iceberg/TestTransaction.java   | 18 ++++++++
 .../apache/iceberg/spark/sql/TestCreateTable.java  | 42 ++++++++++++++++++
 5 files changed, 121 insertions(+), 4 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java
index 35338a6892..9389aec50c 100644
--- a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java
@@ -98,12 +98,13 @@ class PropertiesUpdate implements UpdateProperties {
 
   @Override
   public void commit() {
+    // If existing table commit properties in base are corrupted, allow rectification
     Tasks.foreach(ops)
-        .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+        .retry(base.propertyTryAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
         .exponentialBackoff(
-            base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
-            base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
-            base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+            base.propertyTryAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+            base.propertyTryAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+            base.propertyTryAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
             2.0 /* exponential */)
         .onlyRetryOn(CommitFailedException.class)
         .run(
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index d20dd59d2b..3cdc53995d 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -134,6 +134,8 @@ public class TableMetadata implements Serializable {
     // break existing tables.
     MetricsConfig.fromProperties(properties).validateReferencedColumns(schema);
 
+    PropertyUtil.validateCommitProperties(properties);
+
     return new Builder()
         .setInitialFormatVersion(formatVersion)
         .setCurrentSchema(freshSchema, lastColumnId.get())
@@ -486,6 +488,10 @@ public class TableMetadata implements Serializable {
     return PropertyUtil.propertyAsInt(properties, property, defaultValue);
   }
 
+  public int propertyTryAsInt(String property, int defaultValue) {
+    return PropertyUtil.propertyTryAsInt(properties, property, defaultValue);
+  }
+
   public long propertyAsLong(String property, long defaultValue) {
     return PropertyUtil.propertyAsLong(properties, property, defaultValue);
   }
diff --git a/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java b/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
index 68c8f3e9ef..633b0a6ae7 100644
--- a/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
@@ -24,10 +24,23 @@ import java.util.Map;
 import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class PropertyUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(PropertyUtil.class);
+
+  private static final Set<String> COMMIT_PROPERTIES =
+      ImmutableSet.of(
+          TableProperties.COMMIT_NUM_RETRIES,
+          TableProperties.COMMIT_MIN_RETRY_WAIT_MS,
+          TableProperties.COMMIT_MAX_RETRY_WAIT_MS,
+          TableProperties.COMMIT_TOTAL_RETRY_TIME_MS);
 
   private PropertyUtil() {}
 
@@ -57,6 +70,20 @@ public class PropertyUtil {
     return defaultValue;
   }
 
+  public static int propertyTryAsInt(
+      Map<String, String> properties, String property, int defaultValue) {
+    String value = properties.get(property);
+    if (value == null) {
+      return defaultValue;
+    }
+    try {
+      return Integer.parseInt(value);
+    } catch (NumberFormatException e) {
+      LOG.warn("Failed to parse value of {} as integer, default to {}", property, defaultValue, e);
+      return defaultValue;
+    }
+  }
+
   public static int propertyAsInt(
       Map<String, String> properties, String property, int defaultValue) {
     String value = properties.get(property);
@@ -100,6 +127,29 @@ public class PropertyUtil {
     return defaultValue;
   }
 
+  /**
+   * Validate the table commit related properties to have non-negative integer on table creation to
+   * prevent commit failure
+   */
+  public static void validateCommitProperties(Map<String, String> properties) {
+    for (String commitProperty : COMMIT_PROPERTIES) {
+      String value = properties.get(commitProperty);
+      if (value != null) {
+        int parsedValue;
+        try {
+          parsedValue = Integer.parseInt(value);
+        } catch (NumberFormatException e) {
+          throw new ValidationException(
+              "Table property %s must have integer value", commitProperty);
+        }
+        ValidationException.check(
+            parsedValue >= 0,
+            "Table property %s must have non negative integer value",
+            commitProperty);
+      }
+    }
+  }
+
   /**
    * Returns subset of provided map with keys matching the provided prefix. Matching is
    * case-sensitive and the matching prefix is removed from the keys in returned map.
diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java b/core/src/test/java/org/apache/iceberg/TestTransaction.java
index 8fed7134fa..8770e24f8e 100644
--- a/core/src/test/java/org/apache/iceberg/TestTransaction.java
+++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java
@@ -714,4 +714,22 @@ public class TestTransaction extends TestBase {
     assertThat(paths).isEqualTo(expectedPaths);
     assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2);
   }
+
+  @TestTemplate
+  public void testCommitProperties() {
+    table
+        .updateProperties()
+        .set(TableProperties.COMMIT_MAX_RETRY_WAIT_MS, "foo")
+        .set(TableProperties.COMMIT_NUM_RETRIES, "bar")
+        .set(TableProperties.COMMIT_TOTAL_RETRY_TIME_MS, Integer.toString(60 * 60 * 1000))
+        .commit();
+    table.updateProperties().remove(TableProperties.COMMIT_MAX_RETRY_WAIT_MS).commit();
+    table.updateProperties().remove(TableProperties.COMMIT_NUM_RETRIES).commit();
+
+    assertThat(table.properties())
+        .doesNotContainKey(TableProperties.COMMIT_NUM_RETRIES)
+        .doesNotContainKey(TableProperties.COMMIT_MAX_RETRY_WAIT_MS)
+        .containsEntry(
+            TableProperties.COMMIT_TOTAL_RETRY_TIME_MS, Integer.toString(60 * 60 * 1000));
+  }
 }
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
index ae0aa2cda4..11d4cfebfe 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.spark.CatalogTestBase;
 import org.apache.iceberg.types.Types;
@@ -348,6 +349,47 @@ public class TestCreateTable extends CatalogTestBase {
     assertThat(table.properties()).containsEntry("p1", "2").containsEntry("p2", "x");
   }
 
+  @TestTemplate
+  public void testCreateTableCommitProperties() {
+    assertThat(validationCatalog.tableExists(tableIdent))
+        .as("Table should not already exist")
+        .isFalse();
+
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CREATE TABLE %s "
+                        + "(id BIGINT NOT NULL, data STRING) "
+                        + "USING iceberg "
+                        + "TBLPROPERTIES ('commit.retry.num-retries'='x', p2='x')",
+                    tableName))
+        .isInstanceOf(ValidationException.class)
+        .hasMessage("Table property commit.retry.num-retries must have integer value");
+
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CREATE TABLE %s "
+                        + "(id BIGINT NOT NULL, data STRING) "
+                        + "USING iceberg "
+                        + "TBLPROPERTIES ('commit.retry.max-wait-ms'='-1')",
+                    tableName))
+        .isInstanceOf(ValidationException.class)
+        .hasMessage("Table property commit.retry.max-wait-ms must have non negative integer value");
+
+    sql(
+        "CREATE TABLE %s "
+            + "(id BIGINT NOT NULL, data STRING) "
+            + "USING iceberg "
+            + "TBLPROPERTIES ('commit.retry.num-retries'='1', 'commit.retry.max-wait-ms'='3000')",
+        tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(table.properties())
+        .containsEntry(TableProperties.COMMIT_NUM_RETRIES, "1")
+        .containsEntry(TableProperties.COMMIT_MAX_RETRY_WAIT_MS, "3000");
+  }
+
   @TestTemplate
   public void testCreateTableWithFormatV2ThroughTableProperty() {
     assertThat(validationCatalog.tableExists(tableIdent))

Reply via email to