This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f12b85667 Core: Fixes OOM caused by Avro decoder caching (#7791)
9f12b85667 is described below

commit 9f12b85667362dbf0cc1bbb16972602a103938a8
Author: Xianyang Liu <[email protected]>
AuthorDate: Mon Jun 26 17:25:29 2023 +0800

    Core: Fixes OOM caused by Avro decoder caching (#7791)
---
 .../org/apache/iceberg/ManifestReadBenchmark.java  | 163 ++++++++++++++++++++
 .../apache/iceberg/data/avro/DecoderResolver.java  |  11 +-
 .../iceberg/data/avro/TestDecoderResolver.java     | 167 +++++++++++++++++++++
 3 files changed, 337 insertions(+), 4 deletions(-)

diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java 
b/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java
new file mode 100644
index 0000000000..82d618fe46
--- /dev/null
+++ b/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.io.Files;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Timeout;
+
+@Fork(1)
+@State(Scope.Benchmark)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+@Timeout(time = 1000, timeUnit = TimeUnit.HOURS)
+public class ManifestReadBenchmark {
+
+  private static final int NUM_FILES = 10;
+  private static final int NUM_ROWS = 100000;
+  private static final int NUM_COLS = 10;
+
+  private String baseDir;
+  private String manifestListFile;
+
+  @Setup
+  public void before() {
+    baseDir = Files.createTempDir().getAbsolutePath();
+    manifestListFile = String.format("%s/%s.avro", baseDir, UUID.randomUUID());
+
+    Random random = new Random(System.currentTimeMillis());
+    ManifestListWriter listWriter =
+        ManifestLists.write(1, 
org.apache.iceberg.Files.localOutput(manifestListFile), 0, 1L, 0);
+
+    try {
+      for (int i = 0; i < NUM_FILES; i++) {
+        OutputFile manifestFile =
+            org.apache.iceberg.Files.localOutput(
+                String.format("%s/%s.avro", baseDir, UUID.randomUUID()));
+
+        ManifestWriter<DataFile> writer =
+            ManifestFiles.write(1, PartitionSpec.unpartitioned(), 
manifestFile, 1L);
+        try (ManifestWriter<DataFile> finalWriter = writer) {
+          for (int j = 0; j < NUM_ROWS; j++) {
+            DataFile dataFile =
+                DataFiles.builder(PartitionSpec.unpartitioned())
+                    .withFormat(FileFormat.PARQUET)
+                    .withPath(String.format("/path/to/data-%s-%s.parquet", i, 
j))
+                    .withFileSizeInBytes(j)
+                    .withRecordCount(j)
+                    .withMetrics(randomMetrics(random))
+                    .build();
+            finalWriter.add(dataFile);
+          }
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+
+        listWriter.add(writer.toManifestFile());
+      }
+
+      listWriter.close();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @TearDown
+  public void after() {
+    if (baseDir != null) {
+      FileUtils.deleteQuietly(new File(baseDir));
+      baseDir = null;
+    }
+
+    manifestListFile = null;
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readManifestFile() throws IOException {
+    List<ManifestFile> manifests =
+        
ManifestLists.read(org.apache.iceberg.Files.localInput(manifestListFile));
+    TestTables.LocalFileIO fileIO = new TestTables.LocalFileIO();
+    Map<Integer, PartitionSpec> specs =
+        ImmutableMap.of(PartitionSpec.unpartitioned().specId(), 
PartitionSpec.unpartitioned());
+    long recordCount = 0L;
+    for (ManifestFile manifestFile : manifests) {
+      ManifestReader<DataFile> reader = ManifestFiles.read(manifestFile, 
fileIO, specs);
+      try (CloseableIterator<DataFile> it = reader.iterator()) {
+        while (it.hasNext()) {
+          recordCount += it.next().recordCount();
+        }
+      }
+    }
+  }
+
+  private Metrics randomMetrics(Random random) {
+    long rowCount = 100000L + random.nextInt(1000);
+    Map<Integer, Long> columnSizes = Maps.newHashMap();
+    Map<Integer, Long> valueCounts = Maps.newHashMap();
+    Map<Integer, Long> nullValueCounts = Maps.newHashMap();
+    Map<Integer, Long> nanValueCounts = Maps.newHashMap();
+    Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
+    Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();
+    for (int i = 0; i < NUM_COLS; i++) {
+      columnSizes.put(i, 1000000L + random.nextInt(100000));
+      valueCounts.put(i, 100000L + random.nextInt(100));
+      nullValueCounts.put(i, (long) random.nextInt(5));
+      nanValueCounts.put(i, (long) random.nextInt(5));
+      byte[] lower = new byte[8];
+      random.nextBytes(lower);
+      lowerBounds.put(i, ByteBuffer.wrap(lower));
+      byte[] upper = new byte[8];
+      random.nextBytes(upper);
+      upperBounds.put(i, ByteBuffer.wrap(upper));
+    }
+
+    return new Metrics(
+        rowCount,
+        columnSizes,
+        valueCounts,
+        nullValueCounts,
+        nanValueCounts,
+        lowerBounds,
+        upperBounds);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/iceberg/data/avro/DecoderResolver.java 
b/core/src/main/java/org/apache/iceberg/data/avro/DecoderResolver.java
index 97b90755d9..3cf3d1bcde 100644
--- a/core/src/main/java/org/apache/iceberg/data/avro/DecoderResolver.java
+++ b/core/src/main/java/org/apache/iceberg/data/avro/DecoderResolver.java
@@ -20,14 +20,15 @@ package org.apache.iceberg.data.avro;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.WeakHashMap;
 import org.apache.avro.Schema;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.ResolvingDecoder;
 import org.apache.iceberg.avro.ValueReader;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.collect.MapMaker;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 
 /**
  * Resolver to resolve {@link Decoder} to a {@link ResolvingDecoder}. This 
class uses a {@link
@@ -35,7 +36,8 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Maps;
  */
 public class DecoderResolver {
 
-  private static final ThreadLocal<Map<Schema, Map<Schema, ResolvingDecoder>>> 
DECODER_CACHES =
+  @VisibleForTesting
+  static final ThreadLocal<Map<Schema, Map<Schema, ResolvingDecoder>>> 
DECODER_CACHES =
       ThreadLocal.withInitial(() -> new MapMaker().weakKeys().makeMap());
 
   private DecoderResolver() {}
@@ -49,11 +51,12 @@ public class DecoderResolver {
     return value;
   }
 
-  private static ResolvingDecoder resolve(Decoder decoder, Schema readSchema, 
Schema fileSchema)
+  @VisibleForTesting
+  static ResolvingDecoder resolve(Decoder decoder, Schema readSchema, Schema 
fileSchema)
       throws IOException {
     Map<Schema, Map<Schema, ResolvingDecoder>> cache = DECODER_CACHES.get();
     Map<Schema, ResolvingDecoder> fileSchemaToResolver =
-        cache.computeIfAbsent(readSchema, k -> Maps.newHashMap());
+        cache.computeIfAbsent(readSchema, k -> new WeakHashMap<>());
 
     ResolvingDecoder resolver =
         fileSchemaToResolver.computeIfAbsent(fileSchema, schema -> 
newResolver(readSchema, schema));
diff --git 
a/core/src/test/java/org/apache/iceberg/data/avro/TestDecoderResolver.java 
b/core/src/test/java/org/apache/iceberg/data/avro/TestDecoderResolver.java
new file mode 100644
index 0000000000..5855e80998
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/data/avro/TestDecoderResolver.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data.avro;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.avro.Schema;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.ResolvingDecoder;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.awaitility.Awaitility;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDecoderResolver {
+
+  @Before
+  public void before() {
+    DecoderResolver.DECODER_CACHES.get().clear();
+  }
+
+  @Test
+  public void testDecoderCachingReadSchemaSameAsFileSchema() throws Exception {
+    Decoder dummyDecoder = DecoderFactory.get().binaryDecoder(new byte[] {}, 
null);
+    Schema fileSchema = avroSchema();
+    ResolvingDecoder resolvingDecoder =
+        DecoderResolver.resolve(dummyDecoder, fileSchema, fileSchema);
+
+    assertThat(DecoderResolver.DECODER_CACHES.get().size()).isEqualTo(1);
+    
assertThat(DecoderResolver.DECODER_CACHES.get().get(fileSchema).size()).isEqualTo(1);
+    checkCached(fileSchema, fileSchema);
+
+    // Equal but new one
+    Schema fileSchema1 = avroSchema();
+    assertThat(fileSchema1).isEqualTo(fileSchema);
+    ResolvingDecoder resolvingDecoder1 =
+        DecoderResolver.resolve(dummyDecoder, fileSchema1, fileSchema1);
+    assertThat(resolvingDecoder1).isNotSameAs(resolvingDecoder);
+
+    assertThat(DecoderResolver.DECODER_CACHES.get().size()).isEqualTo(2);
+    
assertThat(DecoderResolver.DECODER_CACHES.get().get(fileSchema1).size()).isEqualTo(1);
+    checkCached(fileSchema1, fileSchema1);
+
+    // New one
+    Schema fileSchema2 = avroSchema("manifest_path", "manifest_length");
+    ResolvingDecoder resolvingDecoder2 =
+        DecoderResolver.resolve(dummyDecoder, fileSchema2, fileSchema2);
+    assertThat(resolvingDecoder2).isNotSameAs(resolvingDecoder);
+
+    assertThat(DecoderResolver.DECODER_CACHES.get().size()).isEqualTo(3);
+    
assertThat(DecoderResolver.DECODER_CACHES.get().get(fileSchema2).size()).isEqualTo(1);
+    checkCached(fileSchema2, fileSchema2);
+
+    checkCachedSize(3);
+
+    fileSchema = null;
+    checkCachedSize(2);
+
+    fileSchema1 = null;
+    checkCachedSize(1);
+
+    fileSchema2 = null;
+    checkCachedSize(0);
+  }
+
+  @Test
+  public void testDecoderCachingReadSchemaNotSameAsFileSchema() throws 
Exception {
+    Decoder dummyDecoder = DecoderFactory.get().binaryDecoder(new byte[] {}, 
null);
+    Schema fileSchema = avroSchema();
+    Schema readSchema = avroSchema("manifest_path", "manifest_length", 
"partition_spec_id");
+    ResolvingDecoder resolvingDecoder =
+        DecoderResolver.resolve(dummyDecoder, readSchema, fileSchema);
+
+    assertThat(DecoderResolver.DECODER_CACHES.get().size()).isEqualTo(1);
+    
assertThat(DecoderResolver.DECODER_CACHES.get().get(readSchema).size()).isEqualTo(1);
+    checkCached(readSchema, fileSchema);
+
+    // Equal but new one
+    Schema fileSchema1 = avroSchema();
+    Schema readSchema1 = avroSchema("manifest_path", "manifest_length", 
"partition_spec_id");
+    assertThat(fileSchema1).isEqualTo(fileSchema);
+    assertThat(readSchema1).isEqualTo(readSchema);
+    ResolvingDecoder resolvingDecoder1 =
+        DecoderResolver.resolve(dummyDecoder, readSchema1, fileSchema1);
+    assertThat(resolvingDecoder1).isNotSameAs(resolvingDecoder);
+
+    assertThat(DecoderResolver.DECODER_CACHES.get().size()).isEqualTo(2);
+    
assertThat(DecoderResolver.DECODER_CACHES.get().get(readSchema1).size()).isEqualTo(1);
+    checkCached(readSchema1, fileSchema1);
+
+    // New read schema
+    Schema readSchema2 = avroSchema("manifest_path", "manifest_length");
+    ResolvingDecoder resolvingDecoder2 =
+        DecoderResolver.resolve(dummyDecoder, readSchema2, fileSchema);
+    assertThat(resolvingDecoder2).isNotSameAs(resolvingDecoder);
+
+    assertThat(DecoderResolver.DECODER_CACHES.get().size()).isEqualTo(3);
+    
assertThat(DecoderResolver.DECODER_CACHES.get().get(readSchema2).size()).isEqualTo(1);
+    checkCached(readSchema2, fileSchema);
+
+    checkCachedSize(3);
+
+    readSchema = null;
+    checkCachedSize(2);
+
+    readSchema1 = null;
+    checkCachedSize(1);
+
+    readSchema2 = null;
+    checkCachedSize(0);
+  }
+
+  private Schema avroSchema(String... columns) {
+    if (columns.length == 0) {
+      return AvroSchemaUtil.convert(ManifestFile.schema(), "manifest_file");
+    } else {
+      return AvroSchemaUtil.convert(ManifestFile.schema().select(columns), 
"manifest_file");
+    }
+  }
+
+  private void checkCached(Schema readSchema, Schema fileSchema) {
+    assertThat(DecoderResolver.DECODER_CACHES.get()).containsKey(readSchema);
+    
assertThat(DecoderResolver.DECODER_CACHES.get().get(readSchema)).containsKey(fileSchema);
+  }
+
+  private int getActualSize() {
+    // The size of keys included the GCed keys
+    Set<Schema> keys = DecoderResolver.DECODER_CACHES.get().keySet();
+    Set<Schema> identityKeys = Sets.newIdentityHashSet();
+    // Forcefully remove keys that have been garbage collected
+    identityKeys.addAll(keys);
+    return identityKeys.size();
+  }
+
+  private void checkCachedSize(int expected) {
+    System.gc();
+    // Wait the weak reference keys are GCed
+    Awaitility.await()
+        .atMost(5, TimeUnit.SECONDS)
+        .pollInSameThread()
+        .untilAsserted(
+            () -> {
+              assertThat(getActualSize()).isEqualTo(expected);
+            });
+  }
+}

Reply via email to