fqaiser94 commented on code in PR #6513: URL: https://github.com/apache/iceberg/pull/6513#discussion_r1098754537
########## api/src/main/java/org/apache/iceberg/ValidatablePendingUpdate.java: ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.function.Predicate; +import org.apache.iceberg.exceptions.ValidationException; + +public interface ValidatablePendingUpdate<ThisT, T> extends PendingUpdate<T> { + + /** + * A user provided test for determining whether it is safe to commit a {@link PendingUpdate} to a + * given table. This will be used in addition to whatever validations iceberg performs internally. + * For example this could be used for providing a restriction to only commit if a given Snapshot + * property is present in the current version of the table. + * + * @param test The predicate which will be used to determine if it is safe to commit to this + * table. + * @param exception The {@link ValidationException} to throw if the test fails. + * @return this for method chaining + */ + ThisT addValidation(Predicate<Table> test, ValidationException exception); + + /** + * Called prior to commit with the current version of the table. 
+ * + * <p>It is guaranteed that a {@link ValidatablePendingUpdate} will only ever be committed as long + * as this method does not throw an exception against the current version of the table. + * + * @param current The current version of the table. This is the same as the version of the table + * over which the changes are being applied as part of this {@link PendingUpdate}. This may be + * null if there is no table i.e. the table is being created as part of this {@link + * PendingUpdate}. Note: attempting to modify this table in any way will throw an exception as + * this table is read-only. + * @throws ValidationException If the pending changes cannot be applied to the current metadata Review Comment: Chose specifically to throw a `ValidationException` as they make the most sense contextually and they don't cause retries (IMO it's pretty unlikely a validation would pass after a retry). ########## build.gradle: ########## @@ -698,6 +698,11 @@ project(':iceberg-dell') { implementation project(':iceberg-common') implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') compileOnly 'com.emc.ecs:object-client-bundle' + compileOnly("org.apache.hadoop:hadoop-common") { + exclude group: 'commons-beanutils' + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + } Review Comment: Without this change, I see the following test failure: ``` org.apache.iceberg.dell.ecs.TestEcsTableOperations > testConcurrentCommit FAILED java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configurable at java.base/java.lang.ClassLoader.defineClass1(Native Method) at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1017) at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174) at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(BuiltinClassLoader.java:800) at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(BuiltinClassLoader.java:698) at 
java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(BuiltinClassLoader.java:621) at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:579) at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) at org.apache.iceberg.SerializableTable.fileIO(SerializableTable.java:112) at org.apache.iceberg.SerializableTable.<init>(SerializableTable.java:82) at org.apache.iceberg.SerializableTable.copyOf(SerializableTable.java:98) at org.apache.iceberg.PropertiesUpdate.lambda$commit$0(PropertiesUpdate.java:121) at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413) at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219) at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203) at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196) at org.apache.iceberg.PropertiesUpdate.commit(PropertiesUpdate.java:117) at org.apache.iceberg.dell.ecs.TestEcsTableOperations.testConcurrentCommit(TestEcsTableOperations.java:55) Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configurable at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) ... 19 more ``` ########## api/src/main/java/org/apache/iceberg/ValidatablePendingUpdate.java: ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.function.Predicate; +import org.apache.iceberg.exceptions.ValidationException; + +public interface ValidatablePendingUpdate<ThisT, T> extends PendingUpdate<T> { + + /** + * A user provided test for determining whether it is safe to commit a {@link PendingUpdate} to a + * given table. This will be used in addition to whatever validations iceberg performs internally. + * For example this could be used for providing a restriction to only commit if a given Snapshot + * property is present in the current version of the table. + * + * @param test The predicate which will be used to determine if it is safe to commit to this + * table. + * @param exception The {@link ValidationException} to throw if the test fails. + * @return this for method chaining + */ + ThisT addValidation(Predicate<Table> test, ValidationException exception); + + /** + * Called prior to commit with the current version of the table. + * + * <p>It is guaranteed that a {@link ValidatablePendingUpdate} will only ever be committed as long + * as this method does not throw an exception against the current version of the table. + * + * @param current The current version of the table. This is the same as the version of the table + * over which the changes are being applied as part of this {@link PendingUpdate}. This may be + * null if there is no table i.e. the table is being created as part of this {@link + * PendingUpdate}. Note: attempting to modify this table in any way will throw an exception as + * this table is read-only. 
+ * @throws ValidationException If the pending changes cannot be applied to the current metadata + */ + void validateCurrent(Table current); Review Comment: I would have preferred to name this `validate` rather than `validateCurrent` but the `./gradlew :iceberg-core:revapi` task reports some breaking changes: ``` > There were Java public API/ABI breaks reported by revapi: java.method.parameterTypeChanged: The type of the parameter changed from 'org.apache.iceberg.TableMetadata' to 'org.apache.iceberg.Table'. old: parameter void org.apache.iceberg.SnapshotProducer<ThisT>::validate(===org.apache.iceberg.TableMetadata===) @ org.apache.iceberg.BaseOverwriteFiles new: parameter void org.apache.iceberg.BaseValidatablePendingUpdate<ThisT, T>::validate(===org.apache.iceberg.Table===) @ org.apache.iceberg.BaseOverwriteFiles SOURCE: POTENTIALLY_BREAKING, BINARY: BREAKING ---------------------------------------------------------------------------------------------------- java.method.parameterTypeChanged: The type of the parameter changed from 'org.apache.iceberg.TableMetadata' to 'org.apache.iceberg.Table'. old: parameter void org.apache.iceberg.BaseReplacePartitions::validate(===org.apache.iceberg.TableMetadata===) new: parameter void org.apache.iceberg.BaseValidatablePendingUpdate<ThisT, T>::validate(===org.apache.iceberg.Table===) @ org.apache.iceberg.BaseReplacePartitions SOURCE: POTENTIALLY_BREAKING, BINARY: BREAKING ---------------------------------------------------------------------------------------------------- java.method.parameterTypeChanged: The type of the parameter changed from 'org.apache.iceberg.TableMetadata' to 'org.apache.iceberg.Table'. 
old: parameter void org.apache.iceberg.SnapshotProducer<ThisT>::validate(===org.apache.iceberg.TableMetadata===) @ org.apache.iceberg.BaseRewriteManifests new: parameter void org.apache.iceberg.BaseValidatablePendingUpdate<ThisT, T>::validate(===org.apache.iceberg.Table===) @ org.apache.iceberg.BaseRewriteManifests SOURCE: POTENTIALLY_BREAKING, BINARY: BREAKING ---------------------------------------------------------------------------------------------------- java.method.parameterTypeChanged: The type of the parameter changed from 'org.apache.iceberg.TableMetadata' to 'org.apache.iceberg.Table'. old: parameter void org.apache.iceberg.SnapshotProducer<ThisT>::validate(===org.apache.iceberg.TableMetadata===) @ org.apache.iceberg.StreamingDelete new: parameter void org.apache.iceberg.BaseValidatablePendingUpdate<ThisT, T>::validate(===org.apache.iceberg.Table===) @ org.apache.iceberg.StreamingDelete SOURCE: POTENTIALLY_BREAKING, BINARY: BREAKING ``` I don't know how serious these are but the simplest solution for me right now was to just call our new method something different. Open to other suggestions. If we decide to stick with `validateCurrent`, we may consider renaming `addValidation` to `addCurrentValidation`. ########## api/src/main/java/org/apache/iceberg/ValidatablePendingUpdate.java: ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.function.Predicate; +import org.apache.iceberg.exceptions.ValidationException; + +public interface ValidatablePendingUpdate<ThisT, T> extends PendingUpdate<T> { + + /** + * A user provided test for determining whether it is safe to commit a {@link PendingUpdate} to a + * given table. This will be used in addition to whatever validations iceberg performs internally. + * For example this could be used for providing a restriction to only commit if a given Snapshot + * property is present in the current version of the table. + * + * @param test The predicate which will be used to determine if it is safe to commit to this + * table. + * @param exception The {@link ValidationException} to throw if the test fails. + * @return this for method chaining + */ + ThisT addValidation(Predicate<Table> test, ValidationException exception); + + /** + * Called prior to commit with the current version of the table. + * + * <p>It is guaranteed that a {@link ValidatablePendingUpdate} will only ever be committed as long + * as this method does not throw an exception against the current version of the table. + * + * @param current The current version of the table. This is the same as the version of the table + * over which the changes are being applied as part of this {@link PendingUpdate}. This may be + * null if there is no table i.e. the table is being created as part of this {@link + * PendingUpdate}. Note: attempting to modify this table in any way will throw an exception as + * this table is read-only. 
+ * @throws ValidationException If the pending changes cannot be applied to the current metadata Review Comment: This is a pretty important detail; implementations have to make sure that they call the `validateCurrent` method on the version of the table that is being used as the `TableMetadata base` argument for the `taskOps.commit` method. Put another way, we want to avoid situations like this (in pseudo-code): ``` validateCurrent(baseTable); baseTable = current.refresh(); // <- refresh might pull in a new version of the table which may or may not pass validateCurrent! taskOps.commit(baseTable, updated); ``` ########## api/src/main/java/org/apache/iceberg/ValidatablePendingUpdate.java: ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.function.Predicate; +import org.apache.iceberg.exceptions.ValidationException; + +public interface ValidatablePendingUpdate<ThisT, T> extends PendingUpdate<T> { Review Comment: I chose to create a new interface instead of adding these methods directly to the `PendingUpdate` interface: - The `validateCurrent` method could be added to the `PendingUpdate` interface directly, I would be fine with that. 
- `addValidation` doesn't feel like it belongs on the `PendingUpdate` interface, hence the new interface for it. Happy to take opinions on this design decision. ########## api/src/main/java/org/apache/iceberg/BaseValidatablePendingUpdate.java: ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import java.util.List; +import java.util.function.Predicate; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +abstract class BaseValidatablePendingUpdate<ThisT, T> + implements ValidatablePendingUpdate<ThisT, T> { + + private static class Validation { + private final Predicate<Table> predicate; + private final ValidationException exception; + + Validation(Predicate<Table> predicate, ValidationException exception) { + this.predicate = predicate; + this.exception = exception; + } + } + + private final List<Validation> validations = Lists.newArrayList(); + + @Override + public ThisT addValidation(Predicate<Table> predicate, ValidationException exception) { + this.validations.add(new Validation(predicate, exception)); + return returnThis(); + } + + @Override + public final void validateCurrent(Table current) { + validations.forEach( + validation -> { + if (!validation.predicate.test(current)) { + throw validation.exception; + } + }); + } + + protected abstract ThisT returnThis(); Review Comment: This has to be implemented for every class that extends this `abstract class`. It's pretty easy to implement: `return this;` A bit boilerplate-y but I felt the trade-off was worth it to enable method chaining. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
