This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 9f12b85667 Core: Fixes OOM caused by Avro decoder caching (#7791)
9f12b85667 is described below
commit 9f12b85667362dbf0cc1bbb16972602a103938a8
Author: Xianyang Liu <[email protected]>
AuthorDate: Mon Jun 26 17:25:29 2023 +0800
Core: Fixes OOM caused by Avro decoder caching (#7791)
---
.../org/apache/iceberg/ManifestReadBenchmark.java | 163 ++++++++++++++++++++
.../apache/iceberg/data/avro/DecoderResolver.java | 11 +-
.../iceberg/data/avro/TestDecoderResolver.java | 167 +++++++++++++++++++++
3 files changed, 337 insertions(+), 4 deletions(-)
diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java b/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java
new file mode 100644
index 0000000000..82d618fe46
--- /dev/null
+++ b/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.io.Files;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Timeout;
+
+/**
+ * Benchmarks manifest reading: {@link #before()} writes {@code NUM_FILES} manifests holding {@code
+ * NUM_ROWS} data-file entries each, plus a manifest list that references them, and {@link
+ * #readManifestFile()} reads every entry back.
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+@Timeout(time = 1000, timeUnit = TimeUnit.HOURS)
+public class ManifestReadBenchmark {
+
+  private static final int NUM_FILES = 10;
+  private static final int NUM_ROWS = 100000;
+  private static final int NUM_COLS = 10;
+
+  private String baseDir;
+  private String manifestListFile;
+
+  /**
+   * Creates a temporary directory containing {@code NUM_FILES} unpartitioned manifests and a
+   * manifest list pointing at all of them.
+   *
+   * @throws UncheckedIOException if writing any manifest or the manifest list fails
+   */
+  @Setup
+  public void before() {
+    baseDir = Files.createTempDir().getAbsolutePath();
+    manifestListFile = String.format("%s/%s.avro", baseDir, UUID.randomUUID());
+
+    Random random = new Random(System.currentTimeMillis());
+    // try-with-resources: the list writer is closed even if writing one of the manifests fails
+    try (ManifestListWriter listWriter =
+        ManifestLists.write(1, org.apache.iceberg.Files.localOutput(manifestListFile), 0, 1L, 0)) {
+      for (int i = 0; i < NUM_FILES; i++) {
+        OutputFile manifestFile =
+            org.apache.iceberg.Files.localOutput(
+                String.format("%s/%s.avro", baseDir, UUID.randomUUID()));
+
+        ManifestWriter<DataFile> writer =
+            ManifestFiles.write(1, PartitionSpec.unpartitioned(), manifestFile, 1L);
+        // close the manifest writer before asking it for the resulting ManifestFile below
+        try (ManifestWriter<DataFile> finalWriter = writer) {
+          for (int j = 0; j < NUM_ROWS; j++) {
+            DataFile dataFile =
+                DataFiles.builder(PartitionSpec.unpartitioned())
+                    .withFormat(FileFormat.PARQUET)
+                    .withPath(String.format("/path/to/data-%s-%s.parquet", i, j))
+                    .withFileSizeInBytes(j)
+                    .withRecordCount(j)
+                    .withMetrics(randomMetrics(random))
+                    .build();
+            finalWriter.add(dataFile);
+          }
+        }
+
+        listWriter.add(writer.toManifestFile());
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /** Deletes the temporary directory created by {@link #before()} and resets the paths. */
+  @TearDown
+  public void after() {
+    if (baseDir != null) {
+      FileUtils.deleteQuietly(new File(baseDir));
+      baseDir = null;
+    }
+
+    manifestListFile = null;
+  }
+
+  /**
+   * Reads every manifest referenced by the manifest list and sums the record counts of its entries.
+   *
+   * @return the summed record count; it is returned so JMH consumes the value and the read loop
+   *     cannot be eliminated as dead code
+   * @throws IOException if a manifest iterator fails to close
+   */
+  @Benchmark
+  @Threads(1)
+  public long readManifestFile() throws IOException {
+    List<ManifestFile> manifests =
+        ManifestLists.read(org.apache.iceberg.Files.localInput(manifestListFile));
+    TestTables.LocalFileIO fileIO = new TestTables.LocalFileIO();
+    Map<Integer, PartitionSpec> specs =
+        ImmutableMap.of(PartitionSpec.unpartitioned().specId(), PartitionSpec.unpartitioned());
+    long recordCount = 0L;
+    for (ManifestFile manifestFile : manifests) {
+      ManifestReader<DataFile> reader = ManifestFiles.read(manifestFile, fileIO, specs);
+      try (CloseableIterator<DataFile> it = reader.iterator()) {
+        while (it.hasNext()) {
+          recordCount += it.next().recordCount();
+        }
+      }
+    }
+
+    return recordCount;
+  }
+
+  /**
+   * Builds {@link Metrics} with random column sizes, value/null/NaN counts and 8-byte lower/upper
+   * bounds for column ids {@code 0} to {@code NUM_COLS - 1}.
+   */
+  private Metrics randomMetrics(Random random) {
+    long rowCount = 100000L + random.nextInt(1000);
+    Map<Integer, Long> columnSizes = Maps.newHashMap();
+    Map<Integer, Long> valueCounts = Maps.newHashMap();
+    Map<Integer, Long> nullValueCounts = Maps.newHashMap();
+    Map<Integer, Long> nanValueCounts = Maps.newHashMap();
+    Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
+    Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();
+    for (int i = 0; i < NUM_COLS; i++) {
+      columnSizes.put(i, 1000000L + random.nextInt(100000));
+      valueCounts.put(i, 100000L + random.nextInt(100));
+      nullValueCounts.put(i, (long) random.nextInt(5));
+      nanValueCounts.put(i, (long) random.nextInt(5));
+      byte[] lower = new byte[8];
+      random.nextBytes(lower);
+      lowerBounds.put(i, ByteBuffer.wrap(lower));
+      byte[] upper = new byte[8];
+      random.nextBytes(upper);
+      upperBounds.put(i, ByteBuffer.wrap(upper));
+    }
+
+    return new Metrics(
+        rowCount,
+        columnSizes,
+        valueCounts,
+        nullValueCounts,
+        nanValueCounts,
+        lowerBounds,
+        upperBounds);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/data/avro/DecoderResolver.java b/core/src/main/java/org/apache/iceberg/data/avro/DecoderResolver.java
index 97b90755d9..3cf3d1bcde 100644
--- a/core/src/main/java/org/apache/iceberg/data/avro/DecoderResolver.java
+++ b/core/src/main/java/org/apache/iceberg/data/avro/DecoderResolver.java
@@ -20,14 +20,15 @@ package org.apache.iceberg.data.avro;
import java.io.IOException;
import java.util.Map;
+import java.util.WeakHashMap;
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.ResolvingDecoder;
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.exceptions.RuntimeIOException;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.MapMaker;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
/**
* Resolver to resolve {@link Decoder} to a {@link ResolvingDecoder}. This
class uses a {@link
@@ -35,7 +36,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
*/
public class DecoderResolver {
- private static final ThreadLocal<Map<Schema, Map<Schema, ResolvingDecoder>>>
DECODER_CACHES =
+ @VisibleForTesting
+ static final ThreadLocal<Map<Schema, Map<Schema, ResolvingDecoder>>>
DECODER_CACHES =
ThreadLocal.withInitial(() -> new MapMaker().weakKeys().makeMap());
private DecoderResolver() {}
@@ -49,11 +51,12 @@ public class DecoderResolver {
return value;
}
- private static ResolvingDecoder resolve(Decoder decoder, Schema readSchema,
Schema fileSchema)
+ @VisibleForTesting
+ static ResolvingDecoder resolve(Decoder decoder, Schema readSchema, Schema
fileSchema)
throws IOException {
Map<Schema, Map<Schema, ResolvingDecoder>> cache = DECODER_CACHES.get();
Map<Schema, ResolvingDecoder> fileSchemaToResolver =
- cache.computeIfAbsent(readSchema, k -> Maps.newHashMap());
+ cache.computeIfAbsent(readSchema, k -> new WeakHashMap<>());
ResolvingDecoder resolver =
fileSchemaToResolver.computeIfAbsent(fileSchema, schema ->
newResolver(readSchema, schema));
diff --git a/core/src/test/java/org/apache/iceberg/data/avro/TestDecoderResolver.java b/core/src/test/java/org/apache/iceberg/data/avro/TestDecoderResolver.java
new file mode 100644
index 0000000000..5855e80998
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/data/avro/TestDecoderResolver.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data.avro;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.avro.Schema;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.ResolvingDecoder;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.awaitility.Awaitility;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for the thread-local decoder cache in {@link DecoderResolver}. Each {@code resolve} call
+ * should cache a {@link ResolvingDecoder} under its (read schema, file schema) pair. Distinct but
+ * equal {@link Schema} instances should get separate read-schema entries. Entries should disappear
+ * once their read schema is no longer strongly reachable.
+ */
+public class TestDecoderResolver {
+
+  @Before
+  public void before() {
+    // DECODER_CACHES is a ThreadLocal, so it persists between tests run on the same thread
+    DecoderResolver.DECODER_CACHES.get().clear();
+  }
+
+  @Test
+  public void testDecoderCachingReadSchemaSameAsFileSchema() throws Exception {
+    Decoder dummyDecoder = DecoderFactory.get().binaryDecoder(new byte[] {}, null);
+    Schema fileSchema = avroSchema();
+    ResolvingDecoder resolvingDecoder =
+        DecoderResolver.resolve(dummyDecoder, fileSchema, fileSchema);
+
+    assertThat(DecoderResolver.DECODER_CACHES.get().size()).isEqualTo(1);
+    assertThat(DecoderResolver.DECODER_CACHES.get().get(fileSchema).size()).isEqualTo(1);
+    checkCached(fileSchema, fileSchema);
+
+    // Equal but new one
+    Schema fileSchema1 = avroSchema();
+    assertThat(fileSchema1).isEqualTo(fileSchema);
+    ResolvingDecoder resolvingDecoder1 =
+        DecoderResolver.resolve(dummyDecoder, fileSchema1, fileSchema1);
+    assertThat(resolvingDecoder1).isNotSameAs(resolvingDecoder);
+
+    assertThat(DecoderResolver.DECODER_CACHES.get().size()).isEqualTo(2);
+    assertThat(DecoderResolver.DECODER_CACHES.get().get(fileSchema1).size()).isEqualTo(1);
+    checkCached(fileSchema1, fileSchema1);
+
+    // New one
+    Schema fileSchema2 = avroSchema("manifest_path", "manifest_length");
+    ResolvingDecoder resolvingDecoder2 =
+        DecoderResolver.resolve(dummyDecoder, fileSchema2, fileSchema2);
+    assertThat(resolvingDecoder2).isNotSameAs(resolvingDecoder);
+
+    assertThat(DecoderResolver.DECODER_CACHES.get().size()).isEqualTo(3);
+    assertThat(DecoderResolver.DECODER_CACHES.get().get(fileSchema2).size()).isEqualTo(1);
+    checkCached(fileSchema2, fileSchema2);
+
+    checkCachedSize(3);
+
+    // Drop the only strong references one at a time; each cached entry should then be collected
+    fileSchema = null;
+    checkCachedSize(2);
+
+    fileSchema1 = null;
+    checkCachedSize(1);
+
+    fileSchema2 = null;
+    checkCachedSize(0);
+  }
+
+  @Test
+  public void testDecoderCachingReadSchemaNotSameAsFileSchema() throws Exception {
+    Decoder dummyDecoder = DecoderFactory.get().binaryDecoder(new byte[] {}, null);
+    Schema fileSchema = avroSchema();
+    Schema readSchema = avroSchema("manifest_path", "manifest_length", "partition_spec_id");
+    ResolvingDecoder resolvingDecoder =
+        DecoderResolver.resolve(dummyDecoder, readSchema, fileSchema);
+
+    assertThat(DecoderResolver.DECODER_CACHES.get().size()).isEqualTo(1);
+    assertThat(DecoderResolver.DECODER_CACHES.get().get(readSchema).size()).isEqualTo(1);
+    checkCached(readSchema, fileSchema);
+
+    // Equal but new one
+    Schema fileSchema1 = avroSchema();
+    Schema readSchema1 = avroSchema("manifest_path", "manifest_length", "partition_spec_id");
+    assertThat(fileSchema1).isEqualTo(fileSchema);
+    assertThat(readSchema1).isEqualTo(readSchema);
+    ResolvingDecoder resolvingDecoder1 =
+        DecoderResolver.resolve(dummyDecoder, readSchema1, fileSchema1);
+    assertThat(resolvingDecoder1).isNotSameAs(resolvingDecoder);
+
+    assertThat(DecoderResolver.DECODER_CACHES.get().size()).isEqualTo(2);
+    assertThat(DecoderResolver.DECODER_CACHES.get().get(readSchema1).size()).isEqualTo(1);
+    checkCached(readSchema1, fileSchema1);
+
+    // New read schema
+    Schema readSchema2 = avroSchema("manifest_path", "manifest_length");
+    ResolvingDecoder resolvingDecoder2 =
+        DecoderResolver.resolve(dummyDecoder, readSchema2, fileSchema);
+    assertThat(resolvingDecoder2).isNotSameAs(resolvingDecoder);
+
+    assertThat(DecoderResolver.DECODER_CACHES.get().size()).isEqualTo(3);
+    assertThat(DecoderResolver.DECODER_CACHES.get().get(readSchema2).size()).isEqualTo(1);
+    checkCached(readSchema2, fileSchema);
+
+    checkCachedSize(3);
+
+    // Drop the only strong references one at a time; each cached entry should then be collected
+    readSchema = null;
+    checkCachedSize(2);
+
+    readSchema1 = null;
+    checkCachedSize(1);
+
+    readSchema2 = null;
+    checkCachedSize(0);
+  }
+
+  /** Avro schema for {@link ManifestFile}, projected to {@code columns} when any are given. */
+  private Schema avroSchema(String... columns) {
+    if (columns.length == 0) {
+      return AvroSchemaUtil.convert(ManifestFile.schema(), "manifest_file");
+    } else {
+      return AvroSchemaUtil.convert(ManifestFile.schema().select(columns), "manifest_file");
+    }
+  }
+
+  /** Asserts that a decoder is cached for {@code readSchema} and, under it, {@code fileSchema}. */
+  private void checkCached(Schema readSchema, Schema fileSchema) {
+    assertThat(DecoderResolver.DECODER_CACHES.get()).containsKey(readSchema);
+    assertThat(DecoderResolver.DECODER_CACHES.get().get(readSchema)).containsKey(fileSchema);
+  }
+
+  /** Counts the read-schema keys of the current thread's cache that are still alive. */
+  private int getActualSize() {
+    // size() can still include entries whose keys were collected but not yet purged, so count the
+    // keys reachable through keySet() instead. An identity set is used because distinct-but-equal
+    // Schema instances are separate cache keys (the "Equal but new one" cases in the tests above)
+    // and must not be merged.
+    Set<Schema> keys = DecoderResolver.DECODER_CACHES.get().keySet();
+    Set<Schema> identityKeys = Sets.newIdentityHashSet();
+    identityKeys.addAll(keys);
+    return identityKeys.size();
+  }
+
+  /**
+   * Asserts that, after garbage collection, exactly {@code expected} read schemas remain cached.
+   *
+   * <p>{@link System#gc()} is only a request to the JVM, so it is re-issued on every poll instead
+   * of once up front. Otherwise, a single ignored request would make the assertion time out.
+   * Polling stays on the calling thread because {@code DECODER_CACHES} is a {@link ThreadLocal}.
+   */
+  private void checkCachedSize(int expected) {
+    Awaitility.await()
+        .atMost(5, TimeUnit.SECONDS)
+        .pollInSameThread()
+        .untilAsserted(
+            () -> {
+              System.gc();
+              assertThat(getActualSize()).isEqualTo(expected);
+            });
+  }
+}