fqaiser94 commented on code in PR #6513:
URL: https://github.com/apache/iceberg/pull/6513#discussion_r1098754537


##########
api/src/main/java/org/apache/iceberg/ValidatablePendingUpdate.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.function.Predicate;
+import org.apache.iceberg.exceptions.ValidationException;
+
+public interface ValidatablePendingUpdate<ThisT, T> extends PendingUpdate<T> {
+
+  /**
+   * A user provided test for determining whether it is safe to commit a {@link PendingUpdate} to a
+   * given table. This will be used in addition to whatever validations iceberg performs internally.
+   * For example this could be used for providing a restriction to only commit if a given Snapshot
+   * property is present in the current version of the table.
+   *
+   * @param test The predicate which will be used to determine if it is safe to commit to this
+   *     table.
+   * @param exception The {@link ValidationException} to throw if the test fails.
+   * @return this for method chaining
+   */
+  ThisT addValidation(Predicate<Table> test, ValidationException exception);
+
+  /**
+   * Called prior to commit with the current version of the table.
+   *
+   * <p>It is guaranteed that a {@link ValidatablePendingUpdate} will only ever be committed as long
+   * as this method does not throw an exception against the current version of the table.
+   *
+   * @param current The current version of the table. This is the same as the version of the table
+   *     over which the changes are being applied as part of this {@link PendingUpdate}. This may be
+   *     null if there is no table i.e. the table is being created as part of this {@link
+   *     PendingUpdate}. Note: attempting to modify this table in any way will throw an exception as
+   *     this table is read-only.
+   * @throws ValidationException If the pending changes cannot be applied to the current metadata

Review Comment:
   I specifically chose to throw a `ValidationException` because it makes the most sense contextually and it doesn't cause retries (IMO it's pretty unlikely that a failed validation would pass after a retry).
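The "no retries" point rests on the commit loop retrying only on commit conflicts. As far as I know, Iceberg's own commit paths do this via `Tasks...onlyRetryOn(CommitFailedException.class)`. The sketch below illustrates that behavior in isolation. `RetryingCommitter` is a hypothetical name, and the two exception classes are local stand-ins that only mirror the names of Iceberg's `org.apache.iceberg.exceptions` classes so the example compiles without Iceberg.

```java
import java.util.function.Supplier;

// Hypothetical stand-ins mirroring the names of Iceberg's
// CommitFailedException and ValidationException (not the real classes).
class CommitFailedException extends RuntimeException {
  CommitFailedException(String message) {
    super(message);
  }
}

class ValidationException extends RuntimeException {
  ValidationException(String message) {
    super(message);
  }
}

final class RetryingCommitter {
  private RetryingCommitter() {}

  // Runs `attempt` up to maxAttempts times and retries ONLY on
  // CommitFailedException (another writer won the race). Any other
  // RuntimeException, including ValidationException, escapes on the first try.
  static <R> R commitWithRetry(Supplier<R> attempt, int maxAttempts) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    CommitFailedException lastFailure = null;
    for (int i = 0; i < maxAttempts; i++) {
      try {
        return attempt.get();
      } catch (CommitFailedException e) {
        lastFailure = e; // a real implementation would refresh and back off here
      }
    }
    throw lastFailure;
  }
}
```

With this split, a failed user validation surfaces immediately as a `ValidationException`, while a genuine concurrent-commit conflict is still retried.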



##########
build.gradle:
##########
@@ -698,6 +698,11 @@ project(':iceberg-dell') {
     implementation project(':iceberg-common')
     implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
     compileOnly 'com.emc.ecs:object-client-bundle'
+    compileOnly("org.apache.hadoop:hadoop-common") {
+      exclude group: 'commons-beanutils'
+      exclude group: 'org.apache.avro', module: 'avro'
+      exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+    }

Review Comment:
   Without this change, I see the following test failure: 
   ```
   org.apache.iceberg.dell.ecs.TestEcsTableOperations > testConcurrentCommit FAILED
       java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configurable
           at java.base/java.lang.ClassLoader.defineClass1(Native Method)
           at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1017)
           at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
           at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(BuiltinClassLoader.java:800)
           at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(BuiltinClassLoader.java:698)
           at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(BuiltinClassLoader.java:621)
           at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:579)
           at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
           at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
           at org.apache.iceberg.SerializableTable.fileIO(SerializableTable.java:112)
           at org.apache.iceberg.SerializableTable.<init>(SerializableTable.java:82)
           at org.apache.iceberg.SerializableTable.copyOf(SerializableTable.java:98)
           at org.apache.iceberg.PropertiesUpdate.lambda$commit$0(PropertiesUpdate.java:121)
           at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
           at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
           at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
           at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
           at org.apache.iceberg.PropertiesUpdate.commit(PropertiesUpdate.java:117)
           at org.apache.iceberg.dell.ecs.TestEcsTableOperations.testConcurrentCommit(TestEcsTableOperations.java:55)
           Caused by:
           java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configurable
               at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
               at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
               at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
               ... 19 more
   ```
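The trace fails inside `ClassLoader.defineClass` while `SerializableTable.fileIO` is running. This suggests that some class loaded there has `org.apache.hadoop.conf.Configurable` as a supertype, and that the new `SerializableTable.copyOf` call inside `PropertiesUpdate.commit` is what now reaches it from the Dell tests. To check which classes actually resolve inside a test JVM, a small classpath probe can help. `ClasspathProbe` is a hypothetical helper that uses only the JDK.

```java
// Hypothetical diagnostic helper: reports whether a class can be loaded on
// the current classpath, without running its static initializers.
final class ClasspathProbe {
  private ClasspathProbe() {}

  static boolean isOnClasspath(String className) {
    try {
      // initialize=false: load and define the class, but do not initialize it.
      Class.forName(className, false, ClasspathProbe.class.getClassLoader());
      return true;
    } catch (ClassNotFoundException | LinkageError e) {
      // ClassNotFoundException: the class itself is missing.
      // LinkageError (e.g. NoClassDefFoundError): the class was found but one
      // of its supertypes is missing, which is the situation in the trace.
      return false;
    }
  }
}
```

For example, `ClasspathProbe.isOnClasspath("org.apache.hadoop.conf.Configurable")` run from the Dell test JVM should return `false` without the `hadoop-common` dependency and `true` with it.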



##########
api/src/main/java/org/apache/iceberg/ValidatablePendingUpdate.java:
##########
@@ -0,0 +1,53 @@
+package org.apache.iceberg;
+
+import java.util.function.Predicate;
+import org.apache.iceberg.exceptions.ValidationException;
+
+public interface ValidatablePendingUpdate<ThisT, T> extends PendingUpdate<T> {
+
+  /**
+   * A user provided test for determining whether it is safe to commit a {@link PendingUpdate} to a
+   * given table. This will be used in addition to whatever validations iceberg performs internally.
+   * For example this could be used for providing a restriction to only commit if a given Snapshot
+   * property is present in the current version of the table.
+   *
+   * @param test The predicate which will be used to determine if it is safe to commit to this
+   *     table.
+   * @param exception The {@link ValidationException} to throw if the test fails.
+   * @return this for method chaining
+   */
+  ThisT addValidation(Predicate<Table> test, ValidationException exception);
+
+  /**
+   * Called prior to commit with the current version of the table.
+   *
+   * <p>It is guaranteed that a {@link ValidatablePendingUpdate} will only ever be committed as long
+   * as this method does not throw an exception against the current version of the table.
+   *
+   * @param current The current version of the table. This is the same as the version of the table
+   *     over which the changes are being applied as part of this {@link PendingUpdate}. This may be
+   *     null if there is no table i.e. the table is being created as part of this {@link
+   *     PendingUpdate}. Note: attempting to modify this table in any way will throw an exception as
+   *     this table is read-only.
+   * @throws ValidationException If the pending changes cannot be applied to the current metadata
+   */
+  void validateCurrent(Table current);

Review Comment:
   I would have preferred to name this `validate` rather than `validateCurrent`, but the `./gradlew :iceberg-core:revapi` task reports some breaking changes:
   
   ```
   > There were Java public API/ABI breaks reported by revapi:

     java.method.parameterTypeChanged: The type of the parameter changed from 'org.apache.iceberg.TableMetadata' to 'org.apache.iceberg.Table'.

     old: parameter void org.apache.iceberg.SnapshotProducer<ThisT>::validate(===org.apache.iceberg.TableMetadata===) @ org.apache.iceberg.BaseOverwriteFiles
     new: parameter void org.apache.iceberg.BaseValidatablePendingUpdate<ThisT, T>::validate(===org.apache.iceberg.Table===) @ org.apache.iceberg.BaseOverwriteFiles

     SOURCE: POTENTIALLY_BREAKING, BINARY: BREAKING

     ----------------------------------------------------------------------------------------------------
     java.method.parameterTypeChanged: The type of the parameter changed from 'org.apache.iceberg.TableMetadata' to 'org.apache.iceberg.Table'.

     old: parameter void org.apache.iceberg.BaseReplacePartitions::validate(===org.apache.iceberg.TableMetadata===)
     new: parameter void org.apache.iceberg.BaseValidatablePendingUpdate<ThisT, T>::validate(===org.apache.iceberg.Table===) @ org.apache.iceberg.BaseReplacePartitions

     SOURCE: POTENTIALLY_BREAKING, BINARY: BREAKING

     ----------------------------------------------------------------------------------------------------
     java.method.parameterTypeChanged: The type of the parameter changed from 'org.apache.iceberg.TableMetadata' to 'org.apache.iceberg.Table'.

     old: parameter void org.apache.iceberg.SnapshotProducer<ThisT>::validate(===org.apache.iceberg.TableMetadata===) @ org.apache.iceberg.BaseRewriteManifests
     new: parameter void org.apache.iceberg.BaseValidatablePendingUpdate<ThisT, T>::validate(===org.apache.iceberg.Table===) @ org.apache.iceberg.BaseRewriteManifests

     SOURCE: POTENTIALLY_BREAKING, BINARY: BREAKING

     ----------------------------------------------------------------------------------------------------
     java.method.parameterTypeChanged: The type of the parameter changed from 'org.apache.iceberg.TableMetadata' to 'org.apache.iceberg.Table'.

     old: parameter void org.apache.iceberg.SnapshotProducer<ThisT>::validate(===org.apache.iceberg.TableMetadata===) @ org.apache.iceberg.StreamingDelete
     new: parameter void org.apache.iceberg.BaseValidatablePendingUpdate<ThisT, T>::validate(===org.apache.iceberg.Table===) @ org.apache.iceberg.StreamingDelete

     SOURCE: POTENTIALLY_BREAKING, BINARY: BREAKING
   ```
   
   I don't know how serious these are, but the simplest solution for me right now was to give the new method a different name. Open to other suggestions.

   If we decide to stick with `validateCurrent`, we may want to consider renaming `addValidation` to `addCurrentValidation` for consistency.



##########
api/src/main/java/org/apache/iceberg/ValidatablePendingUpdate.java:
##########
@@ -0,0 +1,53 @@
+package org.apache.iceberg;
+
+import java.util.function.Predicate;
+import org.apache.iceberg.exceptions.ValidationException;
+
+public interface ValidatablePendingUpdate<ThisT, T> extends PendingUpdate<T> {
+
+  /**
+   * A user provided test for determining whether it is safe to commit a {@link PendingUpdate} to a
+   * given table. This will be used in addition to whatever validations iceberg performs internally.
+   * For example this could be used for providing a restriction to only commit if a given Snapshot
+   * property is present in the current version of the table.
+   *
+   * @param test The predicate which will be used to determine if it is safe to commit to this
+   *     table.
+   * @param exception The {@link ValidationException} to throw if the test fails.
+   * @return this for method chaining
+   */
+  ThisT addValidation(Predicate<Table> test, ValidationException exception);
+
+  /**
+   * Called prior to commit with the current version of the table.
+   *
+   * <p>It is guaranteed that a {@link ValidatablePendingUpdate} will only ever be committed as long
+   * as this method does not throw an exception against the current version of the table.
+   *
+   * @param current The current version of the table. This is the same as the version of the table
+   *     over which the changes are being applied as part of this {@link PendingUpdate}. This may be
+   *     null if there is no table i.e. the table is being created as part of this {@link
+   *     PendingUpdate}. Note: attempting to modify this table in any way will throw an exception as
+   *     this table is read-only.
+   * @throws ValidationException If the pending changes cannot be applied to the current metadata

Review Comment:
   This is a pretty important detail: implementations must call `validateCurrent` on the same version of the table that is passed as the `TableMetadata base` argument to `taskOps.commit`. Put another way, we want to avoid situations like this (in pseudo-code):
   ```
   validateCurrent(baseTable);
   baseTable = current.refresh(); // <- refresh might pull in a new version of the table which may or may not pass validateCurrent!
   taskOps.commit(baseTable, updated);
   ```
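The safe ordering can be sketched as: read the metadata once, validate that exact object, and pass that same object as the commit base. If another writer commits in between, the commit itself is expected to reject the stale base (the optimistic-concurrency contract of `TableOperations.commit`), rather than an unvalidated newer version being committed on top of. `Metadata`, `Ops`, and `SafeCommit` are hypothetical, simplified stand-ins for `TableMetadata` and `TableOperations`; in the PR the validator receives a read-only `Table` rather than raw metadata.

```java
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for TableMetadata: just a version number.
final class Metadata {
  final int version;

  Metadata(int version) {
    this.version = version;
  }
}

// Hypothetical, simplified stand-in for TableOperations.
interface Ops {
  // Returns the latest metadata; may be newer than anything seen before.
  Metadata refresh();

  // Expected to fail if `base` is no longer the current metadata.
  void commit(Metadata base, Metadata updated);
}

final class SafeCommit {
  private SafeCommit() {}

  // Reads the metadata ONCE and uses that single object both for validation
  // and as the commit base, so there is no refresh() between the two.
  static Metadata commit(Ops ops, Consumer<Metadata> validateCurrent) {
    Metadata base = ops.refresh();
    validateCurrent.accept(base); // throwing here aborts before anything is committed
    Metadata updated = new Metadata(base.version + 1);
    ops.commit(base, updated);
    return updated;
  }
}
```

On a commit conflict, retrying re-runs the whole method, so every attempt validates exactly the version it commits on top of.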



##########
api/src/main/java/org/apache/iceberg/ValidatablePendingUpdate.java:
##########
@@ -0,0 +1,53 @@
+package org.apache.iceberg;
+
+import java.util.function.Predicate;
+import org.apache.iceberg.exceptions.ValidationException;
+
+public interface ValidatablePendingUpdate<ThisT, T> extends PendingUpdate<T> {

Review Comment:
   I chose to create a new interface instead of adding these methods directly to the `PendingUpdate` interface:
   - The `validateCurrent` method could be added to the `PendingUpdate` interface directly; I would be fine with that.
   - `addValidation` doesn't feel like it belongs on the `PendingUpdate` interface, hence the new interface for it.

   Happy to hear opinions on this design decision.
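The first bullet could be realized with a Java `default` method. `validateCurrent` then lives on the base interface as a no-op, so existing implementers keep compiling, while `addValidation` stays on the opt-in sub-interface. The following minimal sketch uses the javadoc's example of only committing when a snapshot property is present. All names (`PendingUpdateSketch`, `ValidatablePendingUpdateSketch`, `TableView`, `SetPropertyUpdate`, `LegacyUpdate`, the `source-commit` property) are hypothetical stand-ins, not Iceberg API, and `RuntimeException` stands in for `ValidationException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical read-only view of the current table (stand-in for Table).
interface TableView {
  // Returns the current snapshot's property value, or null if it is absent.
  String snapshotProperty(String key);
}

// Hypothetical stand-in for PendingUpdate, with validateCurrent added as a
// default no-op so existing implementations keep compiling unchanged.
interface PendingUpdateSketch<T> {
  T apply();

  default void validateCurrent(TableView current) {}
}

// addValidation stays on an opt-in sub-interface.
interface ValidatablePendingUpdateSketch<ThisT, T> extends PendingUpdateSketch<T> {
  ThisT addValidation(Predicate<TableView> test, RuntimeException exception);
}

final class SetPropertyUpdate
    implements ValidatablePendingUpdateSketch<SetPropertyUpdate, String> {
  private final List<Predicate<TableView>> tests = new ArrayList<>();
  private final List<RuntimeException> failures = new ArrayList<>();

  @Override
  public SetPropertyUpdate addValidation(Predicate<TableView> test, RuntimeException exception) {
    tests.add(test);
    failures.add(exception);
    return this;
  }

  @Override
  public void validateCurrent(TableView current) {
    // Throw the exception paired with the first test that rejects the table.
    for (int i = 0; i < tests.size(); i++) {
      if (!tests.get(i).test(current)) {
        throw failures.get(i);
      }
    }
  }

  @Override
  public String apply() {
    return "updated";
  }
}

// An update written before validations existed: compiles without changes
// and inherits the no-op validateCurrent.
final class LegacyUpdate implements PendingUpdateSketch<String> {
  @Override
  public String apply() {
    return "legacy";
  }
}
```

The trade-off is that every `PendingUpdate` then advertises `validateCurrent` even when it can never fail, whereas the separate interface keeps the capability visible in the type.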



##########
api/src/main/java/org/apache/iceberg/BaseValidatablePendingUpdate.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.function.Predicate;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+abstract class BaseValidatablePendingUpdate<ThisT, T>
+    implements ValidatablePendingUpdate<ThisT, T> {
+
+  private static class Validation {
+    private final Predicate<Table> predicate;
+    private final ValidationException exception;
+
+    Validation(Predicate<Table> predicate, ValidationException exception) {
+      this.predicate = predicate;
+      this.exception = exception;
+    }
+  }
+
+  private final List<Validation> validations = Lists.newArrayList();
+
+  @Override
+  public ThisT addValidation(Predicate<Table> predicate, ValidationException exception) {
+    this.validations.add(new Validation(predicate, exception));
+    return returnThis();
+  }
+
+  @Override
+  public final void validateCurrent(Table current) {
+    validations.forEach(
+        validation -> {
+          if (!validation.predicate.test(current)) {
+            throw validation.exception;
+          }
+        });
+  }
+
+  protected abstract ThisT returnThis();

Review Comment:
   This has to be implemented by every class that extends this `abstract class`.
   It's easy to implement: `return this;`
   It's a bit boilerplate-y, but I felt the trade-off was worth it to enable method chaining.
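The chaining benefit comes from the "self type" generic pattern. Because the shared method returns `ThisT` instead of the base type, a caller can continue with subclass-specific methods after calling it. Below is a minimal sketch of the pattern. `BaseUpdate`, `RenameUpdate`, `rename`, and `allPass` are hypothetical names, and `String` stands in for `Table`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified illustration of the self-type pattern used by
// BaseValidatablePendingUpdate; names are illustrative, not Iceberg API.
abstract class BaseUpdate<ThisT> {
  private final List<Predicate<String>> validations = new ArrayList<>();

  // Returns ThisT rather than BaseUpdate<ThisT>, so the caller keeps the
  // concrete subtype and can keep chaining subclass-specific methods.
  public ThisT addValidation(Predicate<String> test) {
    validations.add(test);
    return returnThis();
  }

  // True if every registered validation accepts the given "current table".
  public final boolean allPass(String current) {
    return validations.stream().allMatch(test -> test.test(current));
  }

  // Each concrete subclass implements this as `return this;`.
  protected abstract ThisT returnThis();
}

final class RenameUpdate extends BaseUpdate<RenameUpdate> {
  private String newName;

  RenameUpdate rename(String name) {
    this.newName = name;
    return this;
  }

  String newName() {
    return newName;
  }

  @Override
  protected RenameUpdate returnThis() {
    return this;
  }
}
```

A common alternative avoids the per-subclass method by using an unchecked cast in the base class (`@SuppressWarnings("unchecked") ThisT self = (ThisT) this;`). That cast is only safe as long as every subclass binds `ThisT` to itself, whereas an abstract `returnThis()` keeps the return type checked by the compiler in each subclass.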



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
