nastra commented on code in PR #6513:
URL: https://github.com/apache/iceberg/pull/6513#discussion_r1135588175
##########
api/src/main/java/org/apache/iceberg/PendingUpdate.java:
##########
@@ -29,6 +31,28 @@
*/
public interface PendingUpdate<T> {
+ /**
+ * Accepts a predicate which will be used to validate whether it is safe to commit the pending
+ * changes to the current version of the table at commit time.
+ *
+ * <p>For example, this method could be used to ensure the pending changes are only committed if a
+ * given snapshot property is present in the current version of the table.
+ *
+ * <p>This method can be called multiple times to add multiple predicates if necessary.
+ *
+ * @param test The predicate which will be used to validate whether it is safe to commit the
+ * pending changes to the current version of the table at commit time. Any attempts to modify
+ * the table passed to the predicate will throw an exception as this table is read-only.
+ * @param message The message that will be included in the {@link ValidationException} that will
+ * be thrown if the test returns false.
+ * @param args The args that will be included in the {@link ValidationException} that will be
+ * thrown if the test returns false.
+ */
+ @FormatMethod
+ default void validateCurrentTable(Predicate<Table> test, String message, Object... args) {
Review Comment:
```suggestion
default void validateCurrentTable(Predicate<Table> predicate, String message, Object... args) {
```
##########
core/src/main/java/org/apache/iceberg/BasePendingUpdate.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.google.errorprone.annotations.FormatMethod;
+import java.util.List;
+import java.util.function.Predicate;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+abstract class BasePendingUpdate<T> implements PendingUpdate<T> {
+
+ private static class Validation {
+ private final Predicate<Table> predicate;
+ private final String message;
+ private final Object[] args;
+
+ @FormatMethod
+ Validation(Predicate<Table> predicate, String message, Object... args) {
+ this.predicate = predicate;
+ this.message = message;
+ this.args = args;
+ }
+ }
+
+ private final List<Validation> validations = Lists.newArrayList();
+
+ @Override
+ @FormatMethod
+ public final void validateCurrentTable(
+ Predicate<Table> predicate, String message, Object... args) {
+ this.validations.add(new Validation(predicate, message, args));
+ }
+
+ @SuppressWarnings("FormatStringAnnotation")
+ protected final void validateCurrentTableMetadata(TableMetadata base) {
Review Comment:
Why not just name this `validate(..)`? The RevAPI failure you were seeing
occurs because your branch is based on quite an old Iceberg version. The method
it was conflicting with was removed in
https://github.com/apache/iceberg/commit/5a9eb3c20a3867d6ca5ba0d4bea87e1760bab84c
##########
api/src/main/java/org/apache/iceberg/PendingUpdate.java:
##########
@@ -41,13 +65,15 @@
T apply();
/**
- * Apply the pending changes and commit.
+ * Apply the pending changes, validate the current version of the table, and commit.
*
* <p>Changes are committed by calling the underlying table's commit method.
*
* <p>Once the commit is successful, the updated table will be refreshed.
*
* @throws ValidationException If the update cannot be applied to the current table metadata.
+ * @throws UnsupportedOperationException If any predicate supplied through {@link
Review Comment:
Where is this verified?
##########
core/src/test/java/org/apache/iceberg/TestCustomValidations.java:
##########
@@ -0,0 +1,643 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestCustomValidations extends TableTestBase {
+
+ @Parameterized.Parameters(name = "formatVersion = {0}")
Review Comment:
Do we really need to test with different format versions?
##########
core/src/test/java/org/apache/iceberg/TestCustomValidations.java:
##########
@@ -0,0 +1,643 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestCustomValidations extends TableTestBase {
+
+ @Parameterized.Parameters(name = "formatVersion = {0}")
+ public static Object[] parameters() {
+ return new Object[] {1, 2};
+ }
+
+ public TestCustomValidations(int formatVersion) {
+ super(formatVersion);
+ }
+
+ private <E> E setupTableAndEnv(Supplier<E> setupEnv) throws Exception {
+ cleanupTables();
+ setupTable();
+ return setupEnv.get();
+ }
+
+ private <E, T> void testValidationPasses(
+ Supplier<E> setupEnv,
+ Function<E, PendingUpdate<T>> pendingUpdateSupplier,
+ Consumer<E> assertSuccess)
+ throws Exception {
+ E env = setupTableAndEnv(setupEnv);
+
+ PendingUpdate<T> pendingUpdate = pendingUpdateSupplier.apply(env);
+ pendingUpdate.validateCurrentTable(currentTable -> true, "Never fails.");
+ pendingUpdate.commit();
+
+ assertSuccess.accept(env);
+ }
+
+ private <E, T> void testValidationFails(
+ Supplier<E> setupEnv,
+ Function<E, PendingUpdate<T>> pendingUpdateSupplier,
+ Consumer<E> assertFailure)
+ throws Exception {
+ E env = setupTableAndEnv(setupEnv);
+
+ AssertHelpers.assertThrows(
+ "Should throw a ValidationException if the given predicate returns false",
+ ValidationException.class,
+ "Test returned: false",
+ () -> {
+ PendingUpdate<T> pendingUpdate = pendingUpdateSupplier.apply(env);
+ pendingUpdate.validateCurrentTable(currentTable -> false, "Test returned: %b", false);
+ pendingUpdate.commit();
+ });
+
+ assertFailure.accept(env);
+ }
+
+ private <E, T> void testFirstValidationFails(
+ Supplier<E> setupEnv,
+ Function<E, PendingUpdate<T>> pendingUpdateSupplier,
+ Consumer<E> assertFailure)
+ throws Exception {
+ E env = setupTableAndEnv(setupEnv);
+
+ AssertHelpers.assertThrows(
+ "Should throw a ValidationException if the first predicate returns false",
+ ValidationException.class,
+ "First test returned: false",
+ () -> {
+ PendingUpdate<T> pendingUpdate = pendingUpdateSupplier.apply(env);
+ pendingUpdate.validateCurrentTable(
+ currentTable -> false, "First test returned: %b", false);
+ pendingUpdate.validateCurrentTable(
+ currentTable -> true, "Second test returned: %b", true);
+ pendingUpdate.commit();
+ });
+
+ assertFailure.accept(env);
+ }
+
+ private <E, T> void testSecondValidationFails(
+ Supplier<E> setupEnv,
+ Function<E, PendingUpdate<T>> pendingUpdateSupplier,
+ Consumer<E> assertFailure)
+ throws Exception {
+ E env = setupTableAndEnv(setupEnv);
+
+ AssertHelpers.assertThrows(
+ "Should throw a ValidationException if the second predicate returns false",
+ ValidationException.class,
+ "Second test returned: false",
+ () -> {
+ PendingUpdate<T> pendingUpdate = pendingUpdateSupplier.apply(env);
+ pendingUpdate.validateCurrentTable(currentTable -> true, "First test returned: %b", true);
+ pendingUpdate.validateCurrentTable(
+ currentTable -> false, "Second test returned: %b", false);
+ pendingUpdate.commit();
+ });
+
+ assertFailure.accept(env);
+ }
+
+ private <E, T> void testValidationFailsDueToConcurrentCommit(
+ Supplier<E> setupEnv,
+ Function<E, PendingUpdate<T>> pendingUpdateSupplier,
+ Consumer<E> assertFailure)
+ throws Exception {
+ E env = setupTableAndEnv(setupEnv);
+
+ String customWatermarkKey = "custom_watermark";
+ String currentCustomWatermarkValue = "1";
+ String nextCustomWatermarkValue = "2";
+ table.updateProperties().set(customWatermarkKey, currentCustomWatermarkValue).commit();
+
+ PendingUpdate<T> uncommitted = pendingUpdateSupplier.apply(env);
+ String failMessage = "Test failed";
+ uncommitted.validateCurrentTable(
+ currentTable ->
+ Objects.equals(
+ currentTable.properties().get(customWatermarkKey), currentCustomWatermarkValue),
+ failMessage);
+
+ // concurrent update to the table which advances our custom_watermark value
+ table.updateProperties().set(customWatermarkKey, nextCustomWatermarkValue).commit();
+
+ if (uncommitted instanceof UpdateSchema
+ || uncommitted instanceof UpdatePartitionSpec
+ || uncommitted instanceof UpdateSnapshotReferencesOperation) {
+ // The implementations of the above interfaces do not refresh to get the latest
+ // TableMetadata before calling the underlying table's commit method.
+ // As a result, no ValidationException is thrown because they do not see the concurrent
+ // modifications until the underlying table's commit method is called which is when they
+ // detect the TableMetadata is out-of-date and the commit attempt fails at that point.
+ // Either way, we are able to ensure that we never commit to the table unless we are assured
+ // that the validations hold for the current version of the table.
+ AssertHelpers.assertThrows(
+ "Should throw a CommitFailedException on commit due to concurrent update causing metadata to become stale.",
+ CommitFailedException.class,
+ "Cannot commit changes based on stale metadata",
+ uncommitted::commit);
+ } else {
+ AssertHelpers.assertThrows(
+ "Should throw a ValidationException on commit due to concurrent update causing the given predicate to return false",
+ ValidationException.class,
+ failMessage,
+ uncommitted::commit);
+ }
+
+ assertFailure.accept(env);
+ }
+
+ private <E, T> void testModifyingTableInsideValidationThrowsException(
+ Supplier<E> setupEnv,
+ Function<E, PendingUpdate<T>> pendingUpdateSupplier,
+ Consumer<E> assertFailure)
+ throws Exception {
+ E env = setupTableAndEnv(setupEnv);
+
+ AssertHelpers.assertThrows(
+ "Any attempts to modify a table inside a validation should throw an exception",
+ java.lang.UnsupportedOperationException.class,
+ "Cannot modify a static table",
+ () -> {
+ PendingUpdate<T> pendingUpdate = pendingUpdateSupplier.apply(env);
+ pendingUpdate.validateCurrentTable(
+ currentTable -> {
+ // illegal action
+ currentTable.updateProperties().set("custom_watermark", "2").commit();
+ return true;
+ },
+ "Test failed.");
+ pendingUpdate.commit();
+ });
+
+ assertFailure.accept(env);
+ }
+
+ private <E, T> void testValidationBehaviours(
+ Supplier<E> setup,
+ Function<E, PendingUpdate<T>> pendingUpdateSupplier,
+ Consumer<E> assertSuccess,
+ Consumer<E> assertFailure)
+ throws Exception {
+ testValidationPasses(setup, pendingUpdateSupplier, assertSuccess);
+ testValidationFails(setup, pendingUpdateSupplier, assertFailure);
+ testFirstValidationFails(setup, pendingUpdateSupplier, assertFailure);
+ testSecondValidationFails(setup, pendingUpdateSupplier, assertFailure);
+ testValidationFailsDueToConcurrentCommit(setup, pendingUpdateSupplier, assertFailure);
+ testModifyingTableInsideValidationThrowsException(setup, pendingUpdateSupplier, assertFailure);
+ }
+
+ private <T> void testValidationBehaviours(
+ Supplier<PendingUpdate<T>> pendingUpdateSupplier,
+ Runnable assertSuccess,
+ Runnable assertFailure)
+ throws Exception {
+ testValidationBehaviours(
+ () -> null,
+ (__) -> pendingUpdateSupplier.get(),
+ (__) -> assertSuccess.run(),
+ (__) -> assertFailure.run());
+ }
+
+ @Test
+ public void testCherryPickOperation() throws Exception {
+ class Setup {
+ final long firstSnapshotId;
+ final long overwriteSnapshotId;
+
+ Setup(long firstSnapshotId, long overwriteSnapshotId) {
+ this.firstSnapshotId = firstSnapshotId;
+ this.overwriteSnapshotId = overwriteSnapshotId;
+ }
+ }
+
+ testValidationBehaviours(
Review Comment:
I understand that we don't want to repeat code, but this test is quite
difficult to read: it is hard to reason about exactly what is being tested and
expected, because we're just calling a bunch of lambdas.
I'm curious what others think, but I would vote for clear and readable code
over completely de-duplicated code.
##########
api/src/main/java/org/apache/iceberg/PendingUpdate.java:
##########
@@ -29,6 +31,28 @@
*/
public interface PendingUpdate<T> {
+ /**
+ * Accepts a predicate which will be used to validate whether it is safe to commit the pending
+ * changes to the current version of the table at commit time.
+ *
+ * <p>For example, this method could be used to ensure the pending changes are only committed if a
+ * given snapshot property is present in the current version of the table.
+ *
+ * <p>This method can be called multiple times to add multiple predicates if necessary.
+ *
+ * @param test The predicate which will be used to validate whether it is safe to commit the
+ * pending changes to the current version of the table at commit time. Any attempts to modify
+ * the table passed to the predicate will throw an exception as this table is read-only.
+ * @param message The message that will be included in the {@link ValidationException} that will
+ * be thrown if the test returns false.
+ * @param args The args that will be included in the {@link ValidationException} that will be
+ * thrown if the test returns false.
+ */
+ @FormatMethod
+ default void validateCurrentTable(Predicate<Table> test, String message, Object... args) {
Review Comment:
Having a separate `validateCurrentTable(...)` call leads to an awkward,
non-fluent API. What about a `commitIf(Validation)` method that adds the given
validation and then calls `commit()`?
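A rough sketch of that shape, using simplified stand-in types rather than the real Iceberg interfaces. `SketchTable`, `SketchPendingUpdate`, `SetPropertyUpdate`, and the fields of `Validation` are all hypothetical names for illustration, and `IllegalStateException` stands in for `ValidationException` so the example has no dependencies:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical read-only snapshot of table state; not the Iceberg Table interface.
final class SketchTable {
  private final Map<String, String> properties;

  SketchTable(Map<String, String> properties) {
    this.properties = new HashMap<>(properties);
  }

  String property(String key) {
    return properties.get(key);
  }
}

// Hypothetical value type: a predicate plus the message reported when it returns false.
final class Validation {
  final Predicate<SketchTable> predicate;
  final String message;
  final Object[] args;

  Validation(Predicate<SketchTable> predicate, String message, Object... args) {
    this.predicate = predicate;
    this.message = message;
    this.args = args;
  }
}

// Simplified stand-in for PendingUpdate, showing only the commitIf idea.
abstract class SketchPendingUpdate {
  private final List<Validation> validations = new ArrayList<>();

  // Returns the table state that the validations are checked against.
  protected abstract SketchTable current();

  // Applies the pending change; only reached if every validation passed.
  protected abstract void doCommit();

  final void commit() {
    SketchTable table = current();
    for (Validation validation : validations) {
      if (!validation.predicate.test(table)) {
        // Assumption: the PR throws ValidationException here instead.
        throw new IllegalStateException(String.format(validation.message, validation.args));
      }
    }
    doCommit();
  }

  // Adds the given validation and commits in a single call.
  final void commitIf(Validation validation) {
    validations.add(validation);
    commit();
  }
}

// In-memory update used only to exercise the sketch.
final class SetPropertyUpdate extends SketchPendingUpdate {
  private final Map<String, String> tableProperties;
  private final String key;
  private final String value;

  SetPropertyUpdate(Map<String, String> tableProperties, String key, String value) {
    this.tableProperties = tableProperties;
    this.key = key;
    this.value = value;
  }

  @Override
  protected SketchTable current() {
    return new SketchTable(tableProperties);
  }

  @Override
  protected void doCommit() {
    tableProperties.put(key, value);
  }
}
```

With something like this, a caller would write `update.commitIf(new Validation(t -> "1".equals(t.property("custom_watermark")), "Watermark moved"))` instead of two separate statements.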
##########
core/src/test/java/org/apache/iceberg/TestTransaction.java:
##########
@@ -771,4 +773,88 @@ public void testSimpleTransactionNotDeletingMetadataOnUnknownSate() throws IOExc
Assert.assertTrue("Manifest file should exist", new File(manifests.get(0).path()).exists());
Assert.assertEquals("Should have 2 files in metadata", 2, countAllMetadataFiles(tableDir));
}
+
+ @Test
+ public void testValidationPasses() {
+ Assert.assertEquals("Table should be on version 0", 0, (int) version());
+
+ Transaction txn = table.newTransaction();
+ AppendFiles appendFiles = txn.newAppend().appendFile(FILE_A);
+ appendFiles.validateCurrentTable(currentTable -> true, "Custom validation failed.");
+ appendFiles.commit();
+ txn.commitTransaction();
+
+ Assert.assertEquals("Table should be on version 1 after commit", 1, (int) version());
+ }
+
+ @Test
+ public void testValidationFails() {
+ Assert.assertEquals("Table should be on version 0", 0, (int) version());
+
+ Transaction txn = table.newTransaction();
+ AssertHelpers.assertThrows(
Review Comment:
Usages of `AssertHelpers` are deprecated now. Please use AssertJ's
`Assertions.assertThatThrownBy(..).isInstanceOf(..).hasMessage(..)` instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]