This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new cd283d8  Avro: Extract ResolvingDecoder caching into DecoderResolver 
(#1234)
cd283d8 is described below

commit cd283d8b9baa874caa0b4c08981b189e80fd7378
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jul 27 04:32:43 2020 +0800

    Avro: Extract ResolvingDecoder caching into DecoderResolver (#1234)
---
 .../org/apache/iceberg/avro/GenericAvroReader.java | 39 +-----------
 .../org/apache/iceberg/data/avro/DataReader.java   | 37 +----------
 .../apache/iceberg/data/avro/DecoderResolver.java  | 73 ++++++++++++++++++++++
 .../apache/iceberg/spark/data/SparkAvroReader.java | 38 +----------
 4 files changed, 78 insertions(+), 109 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java 
b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
index 05e0508..c8a81ca 100644
--- a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
+++ b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
@@ -20,26 +20,18 @@
 package org.apache.iceberg.avro;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.ResolvingDecoder;
 import org.apache.iceberg.common.DynClasses;
-import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.relocated.com.google.common.collect.MapMaker;
+import org.apache.iceberg.data.avro.DecoderResolver;
 
 class GenericAvroReader<T> implements DatumReader<T> {
 
-  private static final ThreadLocal<Map<Schema, Map<Schema, ResolvingDecoder>>> 
DECODER_CACHES =
-      ThreadLocal.withInitial(() -> new MapMaker().weakKeys().makeMap());
-
   private final Schema readSchema;
   private ClassLoader loader = Thread.currentThread().getContextClassLoader();
   private Schema fileSchema = null;
@@ -66,34 +58,7 @@ class GenericAvroReader<T> implements DatumReader<T> {
 
   @Override
   public T read(T reuse, Decoder decoder) throws IOException {
-    ResolvingDecoder resolver = resolve(decoder);
-    T value = reader.read(resolver, reuse);
-    resolver.drain();
-    return value;
-  }
-
-  private ResolvingDecoder resolve(Decoder decoder) throws IOException {
-    Map<Schema, Map<Schema, ResolvingDecoder>> cache = DECODER_CACHES.get();
-    Map<Schema, ResolvingDecoder> fileSchemaToResolver = cache
-        .computeIfAbsent(readSchema, k -> new HashMap<>());
-
-    ResolvingDecoder resolver = fileSchemaToResolver.get(fileSchema);
-    if (resolver == null) {
-      resolver = newResolver();
-      fileSchemaToResolver.put(fileSchema, resolver);
-    }
-
-    resolver.configure(decoder);
-
-    return resolver;
-  }
-
-  private ResolvingDecoder newResolver() {
-    try {
-      return DecoderFactory.get().resolvingDecoder(fileSchema, readSchema, 
null);
-    } catch (IOException e) {
-      throw new RuntimeIOException(e);
-    }
+    return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema, 
reader, reuse);
   }
 
   private static class ReadBuilder extends AvroSchemaVisitor<ValueReader<?>> {
diff --git a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java 
b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
index d75acf9..7c8cabc 100644
--- a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
+++ b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
@@ -20,7 +20,6 @@
 package org.apache.iceberg.data.avro;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
@@ -29,24 +28,17 @@ import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.ResolvingDecoder;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor;
 import org.apache.iceberg.avro.SupportsRowPosition;
 import org.apache.iceberg.avro.ValueReader;
 import org.apache.iceberg.avro.ValueReaders;
-import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.MapMaker;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
 public class DataReader<T> implements DatumReader<T>, SupportsRowPosition {
 
-  private static final ThreadLocal<Map<Schema, Map<Schema, ResolvingDecoder>>> 
DECODER_CACHES =
-      ThreadLocal.withInitial(() -> new MapMaker().weakKeys().makeMap());
-
   public static <D> DataReader<D> create(org.apache.iceberg.Schema 
expectedSchema, Schema readSchema) {
     return create(expectedSchema, readSchema, ImmutableMap.of());
   }
@@ -74,10 +66,7 @@ public class DataReader<T> implements DatumReader<T>, 
SupportsRowPosition {
 
   @Override
   public T read(T reuse, Decoder decoder) throws IOException {
-    ResolvingDecoder resolver = resolve(decoder);
-    T value = reader.read(resolver, reuse);
-    resolver.drain();
-    return value;
+    return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema, 
reader, reuse);
   }
 
   @Override
@@ -87,30 +76,6 @@ public class DataReader<T> implements DatumReader<T>, 
SupportsRowPosition {
     }
   }
 
-  private ResolvingDecoder resolve(Decoder decoder) throws IOException {
-    Map<Schema, Map<Schema, ResolvingDecoder>> cache = DECODER_CACHES.get();
-    Map<Schema, ResolvingDecoder> fileSchemaToResolver = cache
-        .computeIfAbsent(readSchema, k -> new HashMap<>());
-
-    ResolvingDecoder resolver = fileSchemaToResolver.get(fileSchema);
-    if (resolver == null) {
-      resolver = newResolver();
-      fileSchemaToResolver.put(fileSchema, resolver);
-    }
-
-    resolver.configure(decoder);
-
-    return resolver;
-  }
-
-  private ResolvingDecoder newResolver() {
-    try {
-      return DecoderFactory.get().resolvingDecoder(fileSchema, readSchema, 
null);
-    } catch (IOException e) {
-      throw new RuntimeIOException(e);
-    }
-  }
-
   protected ValueReader<?> createStructReader(Types.StructType struct,
                                               List<ValueReader<?>> fields, 
Map<Integer, ?> idToConstant) {
     return GenericReaders.struct(struct, fields, idToConstant);
diff --git 
a/core/src/main/java/org/apache/iceberg/data/avro/DecoderResolver.java 
b/core/src/main/java/org/apache/iceberg/data/avro/DecoderResolver.java
new file mode 100644
index 0000000..8c89f3f
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/data/avro/DecoderResolver.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.avro;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.ResolvingDecoder;
+import org.apache.iceberg.avro.ValueReader;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.relocated.com.google.common.collect.MapMaker;
+
+/**
+ * Resolves a {@link Decoder} to a {@link ResolvingDecoder} for a (read schema, file schema) pair and reads a value
+ * through it.
+ * <p>
+ * Resolvers are cached per thread in a {@link ThreadLocal}. The cache is a two-level map: the outer map is keyed by
+ * read schema and is built with {@code MapMaker().weakKeys()}; each inner map is a {@link HashMap} keyed by file
+ * schema. A cached resolver is re-configured with the caller's decoder on every call.
+ * <p>
+ * {@link #resolveAndRead} is the only public entry point: it looks up or creates the resolver, calls
+ * {@code reader.read(resolver, reuse)}, drains the resolver, and returns the value that was read.
+ */
+public class DecoderResolver {
+
+  private static final ThreadLocal<Map<Schema, Map<Schema, ResolvingDecoder>>> 
DECODER_CACHES =
+      ThreadLocal.withInitial(() -> new MapMaker().weakKeys().makeMap());
+
+  private DecoderResolver() {} // static helpers only; no instances
+
+  public static <T> T resolveAndRead(
+      Decoder decoder, Schema readSchema, Schema fileSchema, ValueReader<T> 
reader, T reuse) throws IOException {
+    ResolvingDecoder resolver = DecoderResolver.resolve(decoder, readSchema, 
fileSchema);
+    T value = reader.read(resolver, reuse);
+    resolver.drain(); // skipped if read() throws (no try/finally here)
+    return value;
+  }
+
+  private static ResolvingDecoder resolve(Decoder decoder, Schema readSchema, 
Schema fileSchema) throws IOException {
+    Map<Schema, Map<Schema, ResolvingDecoder>> cache = DECODER_CACHES.get(); // this thread's cache
+    Map<Schema, ResolvingDecoder> fileSchemaToResolver = cache
+        .computeIfAbsent(readSchema, k -> new HashMap<>());
+
+    ResolvingDecoder resolver = fileSchemaToResolver.computeIfAbsent(
+        fileSchema,
+        schema -> newResolver(readSchema, schema)); // created once per (readSchema, fileSchema) per thread
+
+    resolver.configure(decoder); // rebind the (possibly cached) resolver to this call's decoder
+
+    return resolver;
+  }
+
+  private static ResolvingDecoder newResolver(Schema readSchema, Schema 
fileSchema) {
+    try {
+      return DecoderFactory.get().resolvingDecoder(fileSchema, readSchema, 
null); // file schema first, read schema second; decoder is attached later via configure()
+    } catch (IOException e) {
+      throw new RuntimeIOException(e); // rethrown as the project's unchecked wrapper
+    }
+  }
+}
diff --git 
a/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java 
b/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java
index baecc25..46c594e 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java
@@ -20,7 +20,6 @@
 package org.apache.iceberg.spark.data;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.avro.LogicalType;
@@ -28,14 +27,11 @@ import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.ResolvingDecoder;
 import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor;
 import org.apache.iceberg.avro.ValueReader;
 import org.apache.iceberg.avro.ValueReaders;
-import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.data.avro.DecoderResolver;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.MapMaker;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -43,9 +39,6 @@ import org.apache.spark.sql.catalyst.InternalRow;
 
 public class SparkAvroReader implements DatumReader<InternalRow> {
 
-  private static final ThreadLocal<Map<Schema, Map<Schema, ResolvingDecoder>>> 
DECODER_CACHES =
-      ThreadLocal.withInitial(() -> new MapMaker().weakKeys().makeMap());
-
   private final Schema readSchema;
   private final ValueReader<InternalRow> reader;
   private Schema fileSchema = null;
@@ -68,34 +61,7 @@ public class SparkAvroReader implements 
DatumReader<InternalRow> {
 
   @Override
   public InternalRow read(InternalRow reuse, Decoder decoder) throws 
IOException {
-    ResolvingDecoder resolver = resolve(decoder);
-    InternalRow row = reader.read(resolver, reuse);
-    resolver.drain();
-    return row;
-  }
-
-  private ResolvingDecoder resolve(Decoder decoder) throws IOException {
-    Map<Schema, Map<Schema, ResolvingDecoder>> cache = DECODER_CACHES.get();
-    Map<Schema, ResolvingDecoder> fileSchemaToResolver = cache
-        .computeIfAbsent(readSchema, k -> new HashMap<>());
-
-    ResolvingDecoder resolver = fileSchemaToResolver.get(fileSchema);
-    if (resolver == null) {
-      resolver = newResolver();
-      fileSchemaToResolver.put(fileSchema, resolver);
-    }
-
-    resolver.configure(decoder);
-
-    return resolver;
-  }
-
-  private ResolvingDecoder newResolver() {
-    try {
-      return DecoderFactory.get().resolvingDecoder(fileSchema, readSchema, 
null);
-    } catch (IOException e) {
-      throw new RuntimeIOException(e);
-    }
+    return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema, 
reader, reuse);
   }
 
   private static class ReadBuilder extends 
AvroSchemaWithTypeVisitor<ValueReader<?>> {

Reply via email to