rdblue commented on a change in pull request #1098:
URL: https://github.com/apache/iceberg/pull/1098#discussion_r436209654



##########
File path: core/src/main/java/org/apache/iceberg/ManifestMergeManager.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.ManifestEntry.Status;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.util.BinPacking.ListPacker;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+
+abstract class ManifestMergeManager<F extends ContentFile<F>> {
+  private final long targetSizeBytes;
+  private final int minCountToMerge;
+  private final boolean mergeEnabled;
+
+  // cache merge results to reuse when retrying
+  private final Map<List<ManifestFile>, ManifestFile> mergedManifests = 
Maps.newConcurrentMap();
+
+  ManifestMergeManager(long targetSizeBytes, int minCountToMerge, boolean 
mergeEnabled) {
+    this.targetSizeBytes = targetSizeBytes;
+    this.minCountToMerge = minCountToMerge;
+    this.mergeEnabled = mergeEnabled;
+  }
+
+  protected abstract long snapshotId();
+  protected abstract PartitionSpec spec(int specId);
+  protected abstract void deleteFile(String location);
+  protected abstract ManifestWriter<F> newManifestWriter(PartitionSpec spec);
+  protected abstract ManifestReader<F> newManifestReader(ManifestFile 
manifest);
+
+  Iterable<ManifestFile> mergeManifests(Iterable<ManifestFile> manifests) {
+    Iterator<ManifestFile> manifestIter = manifests.iterator();
+    if (!mergeEnabled || !manifestIter.hasNext()) {
+      return manifests;
+    }
+
+    ManifestFile first = manifestIter.next();
+
+    List<ManifestFile> merged = Lists.newArrayList();
+    ListMultimap<Integer, ManifestFile> groups = groupBySpec(first, 
manifestIter);
+    for (Integer specId : groups.keySet()) {
+      Iterables.addAll(merged, mergeGroup(first, specId, groups.get(specId)));
+    }
+
+    return merged;
+  }
+
+  void cleanUncommitted(Set<ManifestFile> committed) {
+    // iterate over a copy of entries to avoid concurrent modification
+    List<Map.Entry<List<ManifestFile>, ManifestFile>> entries =
+        Lists.newArrayList(mergedManifests.entrySet());
+
+    for (Map.Entry<List<ManifestFile>, ManifestFile> entry : entries) {
+      // delete any new merged manifests that aren't in the committed list
+      ManifestFile merged = entry.getValue();
+      if (!committed.contains(merged)) {
+        deleteFile(merged.path());
+        // remove the deleted file from the cache
+        mergedManifests.remove(entry.getKey());
+      }
+    }
+  }
+
+  private ListMultimap<Integer, ManifestFile> groupBySpec(ManifestFile first, 
Iterator<ManifestFile> remaining) {
+    ListMultimap<Integer, ManifestFile> groups = Multimaps.newListMultimap(
+        Maps.newTreeMap(Comparator.<Integer>reverseOrder()),
+        Lists::newArrayList);
+    groups.put(first.partitionSpecId(), first);
+    remaining.forEachRemaining(manifest -> 
groups.put(manifest.partitionSpecId(), manifest));
+    return groups;
+  }
+
+  @SuppressWarnings("unchecked")
+  private Iterable<ManifestFile> mergeGroup(ManifestFile first, int specId, 
List<ManifestFile> group) {
+    // use a lookback of 1 to avoid reordering the manifests. using 1 also 
means this should pack
+    // from the end so that the manifest that gets under-filled is the first 
one, which will be
+    // merged the next time.
+    ListPacker<ManifestFile> packer = new ListPacker<>(targetSizeBytes, 1, 
false);
+    List<List<ManifestFile>> bins = packer.packEnd(group, 
ManifestFile::length);
+
+    // process bins in parallel, but put results in the order of the bins into 
an array to preserve
+    // the order of manifests and contents. preserving the order helps avoid 
random deletes when
+    // data files are eventually aged off.
+    List<ManifestFile>[] binResults = (List<ManifestFile>[])
+        Array.newInstance(List.class, bins.size());
+
+    Tasks.range(bins.size())
+        .stopOnFailure().throwFailureWhenFinished()
+        .executeWith(ThreadPools.getWorkerPool())
+        .run(index -> {
+          List<ManifestFile> bin = bins.get(index);
+          List<ManifestFile> outputManifests = Lists.newArrayList();
+          binResults[index] = outputManifests;
+
+          if (bin.size() == 1) {
+            // no need to rewrite
+            outputManifests.add(bin.get(0));
+            return;
+          }
+
+          // if the bin has the first manifest (the new data files or an 
appended manifest file) then only merge it
+          // if the number of manifests is above the minimum count. this is 
applied only to bins with an in-memory
+          // manifest so that large manifests don't prevent merging older 
groups.
+          if (bin.contains(first) && bin.size() < minCountToMerge) {

Review comment:
       This introduces a slight behavior change. Before, the first new manifest 
was used to identify the starting bin to apply the minimum merge count. As a 
result, the min count had no effect if there were no added files or appended 
manifests.
   
   Now, this uses the first manifest in the manifest list passed to merge to 
identify the initial bin. As a result, manifests will not be merged for some 
operations that did not add files or manifests, such as delete operations. 
This is what required the change to the v2 sequence number transaction test.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to