aokolnychyi commented on a change in pull request #1098: URL: https://github.com/apache/iceberg/pull/1098#discussion_r437756769
########## File path: core/src/main/java/org/apache/iceberg/ManifestFilterManager.java ########## @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Evaluator; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.ManifestEvaluator; +import org.apache.iceberg.expressions.Projections; +import org.apache.iceberg.expressions.StrictMetricsEvaluator; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import 
org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.util.CharSequenceWrapper; +import org.apache.iceberg.util.ManifestFileUtil; +import org.apache.iceberg.util.StructLikeWrapper; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +abstract class ManifestFilterManager<F extends ContentFile<F>> { + private static final Logger LOG = LoggerFactory.getLogger(ManifestFilterManager.class); + private static final Joiner COMMA = Joiner.on(","); + + protected static class DeleteException extends ValidationException { + private final String partition; + + private DeleteException(String partition) { + super("Operation would delete existing data"); + this.partition = partition; + } + + public String partition() { + return partition; + } + } + + private final Set<CharSequence> deletePaths = CharSequenceSet.empty(); Review comment: Did we migrate from `CharSequenceWrapper` on purpose here? ########## File path: core/src/main/java/org/apache/iceberg/ManifestFilterManager.java ########## @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Evaluator; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.ManifestEvaluator; +import org.apache.iceberg.expressions.Projections; +import org.apache.iceberg.expressions.StrictMetricsEvaluator; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.util.CharSequenceWrapper; +import org.apache.iceberg.util.ManifestFileUtil; +import org.apache.iceberg.util.StructLikeWrapper; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +abstract class ManifestFilterManager<F extends ContentFile<F>> { + private static final Logger LOG = LoggerFactory.getLogger(ManifestFilterManager.class); + private static final Joiner COMMA = Joiner.on(","); + + protected static class DeleteException extends ValidationException { + private final String partition; + + private DeleteException(String partition) { + super("Operation would delete existing data"); + this.partition = partition; + } + + public String partition() { + return partition; + } + } + + private final 
Set<CharSequence> deletePaths = CharSequenceSet.empty(); Review comment: Oh, we actually switched to our own `CharSequenceSet`. +1 ########## File path: core/src/main/java/org/apache/iceberg/ManifestMergeManager.java ########## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg; + +import java.io.IOException; +import java.lang.reflect.Array; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.ManifestEntry.Status; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Multimaps; +import org.apache.iceberg.util.BinPacking.ListPacker; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; + +abstract class ManifestMergeManager<F extends ContentFile<F>> { + private final long targetSizeBytes; + private final int minCountToMerge; + private final boolean mergeEnabled; + + // cache merge results to reuse when retrying + private final Map<List<ManifestFile>, ManifestFile> mergedManifests = Maps.newConcurrentMap(); + + ManifestMergeManager(long targetSizeBytes, int minCountToMerge, boolean mergeEnabled) { + this.targetSizeBytes = targetSizeBytes; + this.minCountToMerge = minCountToMerge; + this.mergeEnabled = mergeEnabled; + } + + protected abstract long snapshotId(); + protected abstract PartitionSpec spec(int specId); + protected abstract void deleteFile(String location); + protected abstract ManifestWriter<F> newManifestWriter(PartitionSpec spec); + protected abstract ManifestReader<F> newManifestReader(ManifestFile manifest); + + Iterable<ManifestFile> mergeManifests(Iterable<ManifestFile> manifests) { + Iterator<ManifestFile> manifestIter = manifests.iterator(); + if (!mergeEnabled || !manifestIter.hasNext()) { + return manifests; + } + + ManifestFile first = manifestIter.next(); + + List<ManifestFile> merged = 
Lists.newArrayList(); + ListMultimap<Integer, ManifestFile> groups = groupBySpec(first, manifestIter); + for (Integer specId : groups.keySet()) { + Iterables.addAll(merged, mergeGroup(first, specId, groups.get(specId))); + } + + return merged; + } + + void cleanUncommitted(Set<ManifestFile> committed) { + // iterate over a copy of entries to avoid concurrent modification + List<Map.Entry<List<ManifestFile>, ManifestFile>> entries = + Lists.newArrayList(mergedManifests.entrySet()); + + for (Map.Entry<List<ManifestFile>, ManifestFile> entry : entries) { + // delete any new merged manifests that aren't in the committed list + ManifestFile merged = entry.getValue(); + if (!committed.contains(merged)) { + deleteFile(merged.path()); + // remove the deleted file from the cache + mergedManifests.remove(entry.getKey()); + } + } + } + + private ListMultimap<Integer, ManifestFile> groupBySpec(ManifestFile first, Iterator<ManifestFile> remaining) { + ListMultimap<Integer, ManifestFile> groups = Multimaps.newListMultimap( + Maps.newTreeMap(Comparator.<Integer>reverseOrder()), + Lists::newArrayList); + groups.put(first.partitionSpecId(), first); + remaining.forEachRemaining(manifest -> groups.put(manifest.partitionSpecId(), manifest)); + return groups; + } + + @SuppressWarnings("unchecked") + private Iterable<ManifestFile> mergeGroup(ManifestFile first, int specId, List<ManifestFile> group) { + // use a lookback of 1 to avoid reordering the manifests. using 1 also means this should pack + // from the end so that the manifest that gets under-filled is the first one, which will be + // merged the next time. + ListPacker<ManifestFile> packer = new ListPacker<>(targetSizeBytes, 1, false); + List<List<ManifestFile>> bins = packer.packEnd(group, ManifestFile::length); + + // process bins in parallel, but put results in the order of the bins into an array to preserve + // the order of manifests and contents. 
preserving the order helps avoid random deletes when + // data files are eventually aged off. + List<ManifestFile>[] binResults = (List<ManifestFile>[]) + Array.newInstance(List.class, bins.size()); + + Tasks.range(bins.size()) + .stopOnFailure().throwFailureWhenFinished() + .executeWith(ThreadPools.getWorkerPool()) + .run(index -> { + List<ManifestFile> bin = bins.get(index); + List<ManifestFile> outputManifests = Lists.newArrayList(); + binResults[index] = outputManifests; + + if (bin.size() == 1) { + // no need to rewrite + outputManifests.add(bin.get(0)); + return; + } + + // if the bin has the first manifest (the new data files or an appended manifest file) then only merge it + // if the number of manifests is above the minimum count. this is applied only to bins with an in-memory + // manifest so that large manifests don't prevent merging older groups. + if (bin.contains(first) && bin.size() < minCountToMerge) { Review comment: Seems reasonable! ########## File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java ########## @@ -62,73 +41,91 @@ import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT; abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> { - private static final Logger LOG = LoggerFactory.getLogger(MergingSnapshotProducer.class); + private final String tableName; + private final TableOperations ops; + private final PartitionSpec spec; + private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder(); + private final ManifestMergeManager<DataFile> mergeManager; + private final ManifestFilterManager<DataFile> filterManager; + private final boolean snapshotIdInheritanceEnabled; - private static final Joiner COMMA = Joiner.on(","); + private class DataFileFilterManager extends ManifestFilterManager<DataFile> { Review comment: nit: Would it make sense to move private classes to the bottom of the file so that people will focus on the main logic? 
########## File path: core/src/main/java/org/apache/iceberg/ManifestFilterManager.java ########## @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Evaluator; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.ManifestEvaluator; +import org.apache.iceberg.expressions.Projections; +import org.apache.iceberg.expressions.StrictMetricsEvaluator; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import 
org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.util.CharSequenceWrapper; +import org.apache.iceberg.util.ManifestFileUtil; +import org.apache.iceberg.util.StructLikeWrapper; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +abstract class ManifestFilterManager<F extends ContentFile<F>> { + private static final Logger LOG = LoggerFactory.getLogger(ManifestFilterManager.class); + private static final Joiner COMMA = Joiner.on(","); + + protected static class DeleteException extends ValidationException { + private final String partition; + + private DeleteException(String partition) { + super("Operation would delete existing data"); + this.partition = partition; + } + + public String partition() { + return partition; + } + } + + private final Set<CharSequence> deletePaths = CharSequenceSet.empty(); + private final Set<StructLikeWrapper> deleteFilePartitions = Sets.newHashSet(); + private final Set<StructLikeWrapper> dropPartitions = Sets.newHashSet(); + private Expression deleteExpression = Expressions.alwaysFalse(); + private boolean hasPathOnlyDeletes = false; + private boolean failAnyDelete = false; + private boolean failMissingDeletePaths = false; + private int duplicateDeleteCount = 0; + + // cache filtered manifests to avoid extra work when commits fail. 
+ private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap(); + + // tracking where files were deleted to validate retries quickly + private final Map<ManifestFile, Iterable<F>> filteredManifestToDeletedFiles = + Maps.newConcurrentMap(); + + protected abstract PartitionSpec spec(int specId); + protected abstract void deleteFile(String location); + protected abstract ManifestWriter<F> newManifestWriter(PartitionSpec spec); + protected abstract ManifestReader<F> newManifestReader(ManifestFile manifest); + + protected void failAnyDelete() { + this.failAnyDelete = true; + } + + protected void failMissingDeletePaths() { + this.failMissingDeletePaths = true; + } + + /** + * Add a filter to match files to delete. A file will be deleted if all of the rows it contains + * match this or any other filter passed to this method. + * + * @param expr an expression to match rows. + */ + protected void deleteByRowFilter(Expression expr) { + Preconditions.checkNotNull(expr, "Cannot delete files using filter: null"); + invalidateFilteredCache(); Review comment: We used to have a flag `filterUpdated` and take it into account in `apply`. It seems okay to invalidate eagerly too. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
