This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new fe23584fc3 Core: Add validation for table commit properties (#11437)
fe23584fc3 is described below
commit fe23584fc3af9f0ea1371989030b0a99affb233f
Author: Hongyue/Steve Zhang <[email protected]>
AuthorDate: Fri Nov 1 17:22:44 2024 -0700
Core: Add validation for table commit properties (#11437)
---
.../java/org/apache/iceberg/PropertiesUpdate.java | 9 ++--
.../java/org/apache/iceberg/TableMetadata.java | 6 +++
.../java/org/apache/iceberg/util/PropertyUtil.java | 50 ++++++++++++++++++++++
.../java/org/apache/iceberg/TestTransaction.java | 18 ++++++++
.../apache/iceberg/spark/sql/TestCreateTable.java | 42 ++++++++++++++++++
5 files changed, 121 insertions(+), 4 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java
index 35338a6892..9389aec50c 100644
--- a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java
@@ -98,12 +98,13 @@ class PropertiesUpdate implements UpdateProperties {
@Override
public void commit() {
+ // If existing table commit properties in base are corrupted, allow rectification
Tasks.foreach(ops)
- .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+ .retry(base.propertyTryAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
.exponentialBackoff(
- base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
- base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
- base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+ base.propertyTryAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+ base.propertyTryAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+ base.propertyTryAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
2.0 /* exponential */)
.onlyRetryOn(CommitFailedException.class)
.run(
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index d20dd59d2b..3cdc53995d 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -134,6 +134,8 @@ public class TableMetadata implements Serializable {
// break existing tables.
MetricsConfig.fromProperties(properties).validateReferencedColumns(schema);
+ PropertyUtil.validateCommitProperties(properties);
+
return new Builder()
.setInitialFormatVersion(formatVersion)
.setCurrentSchema(freshSchema, lastColumnId.get())
@@ -486,6 +488,10 @@ public class TableMetadata implements Serializable {
return PropertyUtil.propertyAsInt(properties, property, defaultValue);
}
+ public int propertyTryAsInt(String property, int defaultValue) {
+ return PropertyUtil.propertyTryAsInt(properties, property, defaultValue);
+ }
+
public long propertyAsLong(String property, long defaultValue) {
return PropertyUtil.propertyAsLong(properties, property, defaultValue);
}
diff --git a/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java b/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
index 68c8f3e9ef..633b0a6ae7 100644
--- a/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
@@ -24,10 +24,23 @@ import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class PropertyUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(PropertyUtil.class);
+
+ private static final Set<String> COMMIT_PROPERTIES =
+ ImmutableSet.of(
+ TableProperties.COMMIT_NUM_RETRIES,
+ TableProperties.COMMIT_MIN_RETRY_WAIT_MS,
+ TableProperties.COMMIT_MAX_RETRY_WAIT_MS,
+ TableProperties.COMMIT_TOTAL_RETRY_TIME_MS);
private PropertyUtil() {}
@@ -57,6 +70,20 @@ public class PropertyUtil {
return defaultValue;
}
+ public static int propertyTryAsInt(
+ Map<String, String> properties, String property, int defaultValue) {
+ String value = properties.get(property);
+ if (value == null) {
+ return defaultValue;
+ }
+ try {
+ return Integer.parseInt(value);
+ } catch (NumberFormatException e) {
+ LOG.warn("Failed to parse value of {} as integer, default to {}", property, defaultValue, e);
+ return defaultValue;
+ }
+ }
+
public static int propertyAsInt(
Map<String, String> properties, String property, int defaultValue) {
String value = properties.get(property);
@@ -100,6 +127,29 @@ public class PropertyUtil {
return defaultValue;
}
+ /**
+ * Validate the table commit related properties to have non-negative integer on table creation to
+ * prevent commit failure
+ */
+ public static void validateCommitProperties(Map<String, String> properties) {
+ for (String commitProperty : COMMIT_PROPERTIES) {
+ String value = properties.get(commitProperty);
+ if (value != null) {
+ int parsedValue;
+ try {
+ parsedValue = Integer.parseInt(value);
+ } catch (NumberFormatException e) {
+ throw new ValidationException(
+ "Table property %s must have integer value", commitProperty);
+ }
+ ValidationException.check(
+ parsedValue >= 0,
+ "Table property %s must have non negative integer value",
+ commitProperty);
+ }
+ }
+ }
+
/**
* Returns subset of provided map with keys matching the provided prefix. Matching is
* case-sensitive and the matching prefix is removed from the keys in returned map.
diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java b/core/src/test/java/org/apache/iceberg/TestTransaction.java
index 8fed7134fa..8770e24f8e 100644
--- a/core/src/test/java/org/apache/iceberg/TestTransaction.java
+++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java
@@ -714,4 +714,22 @@ public class TestTransaction extends TestBase {
assertThat(paths).isEqualTo(expectedPaths);
assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(2);
}
+
+ @TestTemplate
+ public void testCommitProperties() {
+ table
+ .updateProperties()
+ .set(TableProperties.COMMIT_MAX_RETRY_WAIT_MS, "foo")
+ .set(TableProperties.COMMIT_NUM_RETRIES, "bar")
+ .set(TableProperties.COMMIT_TOTAL_RETRY_TIME_MS, Integer.toString(60 * 60 * 1000))
+ .commit();
+
table.updateProperties().remove(TableProperties.COMMIT_MAX_RETRY_WAIT_MS).commit();
+
table.updateProperties().remove(TableProperties.COMMIT_NUM_RETRIES).commit();
+
+ assertThat(table.properties())
+ .doesNotContainKey(TableProperties.COMMIT_NUM_RETRIES)
+ .doesNotContainKey(TableProperties.COMMIT_MAX_RETRY_WAIT_MS)
+ .containsEntry(
+ TableProperties.COMMIT_TOTAL_RETRY_TIME_MS, Integer.toString(60 * 60 * 1000));
+ }
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
index ae0aa2cda4..11d4cfebfe 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.iceberg.types.Types;
@@ -348,6 +349,47 @@ public class TestCreateTable extends CatalogTestBase {
assertThat(table.properties()).containsEntry("p1", "2").containsEntry("p2", "x");
}
+ @TestTemplate
+ public void testCreateTableCommitProperties() {
+ assertThat(validationCatalog.tableExists(tableIdent))
+ .as("Table should not already exist")
+ .isFalse();
+
+ assertThatThrownBy(
+ () ->
+ sql(
+ "CREATE TABLE %s "
+ + "(id BIGINT NOT NULL, data STRING) "
+ + "USING iceberg "
+ + "TBLPROPERTIES ('commit.retry.num-retries'='x', p2='x')",
+ tableName))
+ .isInstanceOf(ValidationException.class)
+ .hasMessage("Table property commit.retry.num-retries must have integer value");
+
+ assertThatThrownBy(
+ () ->
+ sql(
+ "CREATE TABLE %s "
+ + "(id BIGINT NOT NULL, data STRING) "
+ + "USING iceberg "
+ + "TBLPROPERTIES ('commit.retry.max-wait-ms'='-1')",
+ tableName))
+ .isInstanceOf(ValidationException.class)
+ .hasMessage("Table property commit.retry.max-wait-ms must have non negative integer value");
+
+ sql(
+ "CREATE TABLE %s "
+ + "(id BIGINT NOT NULL, data STRING) "
+ + "USING iceberg "
+ + "TBLPROPERTIES ('commit.retry.num-retries'='1', 'commit.retry.max-wait-ms'='3000')",
+ tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ assertThat(table.properties())
+ .containsEntry(TableProperties.COMMIT_NUM_RETRIES, "1")
+ .containsEntry(TableProperties.COMMIT_MAX_RETRY_WAIT_MS, "3000");
+ }
+
@TestTemplate
public void testCreateTableWithFormatV2ThroughTableProperty() {
assertThat(validationCatalog.tableExists(tableIdent))