This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch parquet-1.15.x
in repository https://gitbox.apache.org/repos/asf/parquet-java.git


The following commit(s) were added to refs/heads/parquet-1.15.x by this push:
     new 2fef79bb5 GH-3198: Allow specifying trusted classes by class name 
(#3199)
2fef79bb5 is described below

commit 2fef79bb53ea0a894e4bae861faa8105ca9da15b
Author: Gabor Szadovszky <[email protected]>
AuthorDate: Mon Apr 28 10:56:15 2025 +0200

    GH-3198: Allow specifying trusted classes by class name (#3199)
---
 parquet-avro/README.md                             |   3 +-
 .../org/apache/parquet/avro/AvroConverters.java    |  44 ++---
 .../org/apache/parquet/avro/AvroParquetReader.java |  22 +++
 .../org/apache/parquet/avro/AvroReadSupport.java   |  36 ++++-
 .../apache/parquet/avro/AvroRecordConverter.java   | 177 ++++++++++++++-------
 .../parquet/avro/AvroRecordMaterializer.java       |   5 +-
 .../apache/parquet/avro/ReflectClassValidator.java |  94 +++++++++++
 .../apache/parquet/UntrustedStringableClass.java   |  32 ----
 .../apache/parquet/avro/TestReflectReadWrite.java  |  35 ----
 .../apache/parquet/avro/TestStringBehavior.java    |  57 ++++++-
 10 files changed, 335 insertions(+), 170 deletions(-)

diff --git a/parquet-avro/README.md b/parquet-avro/README.md
index 4f6749158..644be9bfa 100644
--- a/parquet-avro/README.md
+++ b/parquet-avro/README.md
@@ -32,7 +32,8 @@ Apache Avro integration
 | `parquet.avro.read.schema`              | `String`  | The Avro schema to be 
used for reading. It shall be compatible with the file schema. The file schema 
will be used directly if not set. |
 | `parquet.avro.projection`               | `String`  | The Avro schema to be 
used for projection.                           |
 | `parquet.avro.compatible`               | `boolean` | Flag for compatibility 
mode. `true` for materializing Avro `IndexedRecord` objects, `false` for 
materializing the related objects for either generic, specific, or reflect 
records.<br/>The default value is `true`. |
-| `parquet.avro.readInt96AsFixed`        | `boolean` | Flag for handling the 
`INT96` Parquet types. `true` for converting it to the `fixed` Avro type, 
`false` for not handling `INT96` types (throwing exception).<br/>The default 
value is `false`.<br/>**NOTE: The `INT96` Parquet type is deprecated. This 
option is only to support old data.** |
+| `parquet.avro.readInt96AsFixed`         | `boolean` | Flag for handling the 
`INT96` Parquet types. `true` for converting it to the `fixed` Avro type, 
`false` for not handling `INT96` types (throwing exception).<br/>The default 
value is `false`.<br/>**NOTE: The `INT96` Parquet type is deprecated. This 
option is only to support old data.** |
+| `parquet.avro.serializable.classes`     | `String`  | List of the fully 
qualified class names separated by ',' that may be referenced from the Avro 
schema by "java-class" or "java-key-class" and are allowed to be loaded. |
 
 ### Configuration for writing
 
diff --git 
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java 
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java
index 459449085..f3e8c2148 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroConverters.java
@@ -21,8 +21,6 @@ package org.apache.parquet.avro;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.util.Utf8;
@@ -36,13 +34,19 @@ import org.apache.parquet.schema.PrimitiveType;
 
 public class AvroConverters {
 
+  /**
+   * Contains the packages which classes are allowed to be loaded that may be 
referenced from the Avro schema by
+   * "java-class" or "java-key-class". It contains the packages parsed from 
system variable
+   * "org.apache.parquet.avro.SERIALIZABLE_PACKAGES".
+   *
+   * @deprecated will be removed in 2.0.0
+   */
+  @Deprecated
   public static final String[] SERIALIZABLE_PACKAGES;
 
   static {
-    SERIALIZABLE_PACKAGES = System.getProperty(
-            "org.apache.parquet.avro.SERIALIZABLE_PACKAGES",
-            "java.lang,java.math,java.io,java.net,org.apache.parquet.avro")
-        .split(",");
+    String prop = 
System.getProperty("org.apache.parquet.avro.SERIALIZABLE_PACKAGES");
+    SERIALIZABLE_PACKAGES = prop == null ? new String[0] : prop.split(",");
   }
 
   public abstract static class AvroGroupConverter extends GroupConverter {
@@ -272,7 +276,6 @@ public class AvroConverters {
 
     public FieldStringableConverter(ParentValueContainer parent, Class<?> 
stringableClass) {
       super(parent);
-      checkSecurity(stringableClass);
       stringableName = stringableClass.getName();
       try {
         this.ctor = stringableClass.getConstructor(String.class);
@@ -289,33 +292,6 @@ public class AvroConverters {
         throw new ParquetDecodingException("Cannot convert binary to " + 
stringableName, e);
       }
     }
-
-    private void checkSecurity(Class<?> clazz) throws SecurityException {
-      List<String> trustedPackages = Arrays.asList(SERIALIZABLE_PACKAGES);
-
-      boolean trustAllPackages = trustedPackages.size() == 1 && 
"*".equals(trustedPackages.get(0));
-      if (trustAllPackages || clazz.isPrimitive()) {
-        return;
-      }
-
-      boolean found = false;
-      Package thePackage = clazz.getPackage();
-      if (thePackage != null) {
-        for (String trustedPackage : trustedPackages) {
-          if (thePackage.getName().equals(trustedPackage)
-              || thePackage.getName().startsWith(trustedPackage + ".")) {
-            found = true;
-            break;
-          }
-        }
-        if (!found) {
-          throw new SecurityException("Forbidden " + clazz
-              + "! This class is not trusted to be included in Avro schema 
using java-class."
-              + " Please set org.apache.parquet.avro.SERIALIZABLE_PACKAGES 
system property"
-              + " with the packages you trust.");
-        }
-      }
-    }
   }
 
   static final class FieldEnumConverter extends BinaryConverter<Object> {
diff --git 
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java 
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java
index 805d7579c..17a944d60 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java
@@ -19,6 +19,8 @@
 package org.apache.parquet.avro;
 
 import java.io.IOException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.specific.SpecificData;
@@ -179,6 +181,26 @@ public class AvroParquetReader<T> extends ParquetReader<T> 
{
       return this;
     }
 
+    public Builder<T> withSerializableClasses(String... classNames) {
+      if (classNames.length == 0) {
+        configuration.set(AvroReadSupport.SERIALIZABLE_CLASSES, null);
+      } else {
+        configuration.set(AvroReadSupport.SERIALIZABLE_CLASSES, 
String.join(",", classNames));
+      }
+      return this;
+    }
+
+    public Builder<T> withSerializableClasses(Class<?>... classes) {
+      if (classes.length == 0) {
+        configuration.set(AvroReadSupport.SERIALIZABLE_CLASSES, null);
+      } else {
+        configuration.set(
+            AvroReadSupport.SERIALIZABLE_CLASSES,
+            
Stream.of(classes).map(Class::getName).collect(Collectors.joining(",")));
+      }
+      return this;
+    }
+
     public Builder<T> disableCompatibility() {
       this.enableCompatibility = false;
       return this;
diff --git 
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java 
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
index 58a28bfeb..6d7ca398a 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
@@ -20,6 +20,8 @@ package org.apache.parquet.avro;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.hadoop.conf.Configuration;
@@ -61,6 +63,12 @@ public class AvroReadSupport<T> extends ReadSupport<T> {
   public static final String READ_INT96_AS_FIXED = 
"parquet.avro.readInt96AsFixed";
   public static final boolean READ_INT96_AS_FIXED_DEFAULT = false;
 
+  /**
+   * List of the fully qualified class names separated by ',' that may be 
referenced from the Avro schema by
+   * "java-class" or "java-key-class" and are allowed to be loaded.
+   */
+  public static final String SERIALIZABLE_CLASSES = 
"parquet.avro.serializable.classes";
+
   /**
    * @param configuration       a configuration
    * @param requestedProjection the requested projection schema
@@ -83,6 +91,24 @@ public class AvroReadSupport<T> extends ReadSupport<T> {
     configuration.set(AVRO_DATA_SUPPLIER, clazz.getName());
   }
 
+  public static void setSerializableClasses(Configuration configuration, 
String... classNames) {
+    if (classNames.length == 0) {
+      configuration.set(AvroReadSupport.SERIALIZABLE_CLASSES, null);
+    } else {
+      configuration.set(AvroReadSupport.SERIALIZABLE_CLASSES, String.join(",", 
classNames));
+    }
+  }
+
+  public static void setSerializableClasses(Configuration configuration, 
Class<?>... classes) {
+    if (classes.length == 0) {
+      configuration.set(AvroReadSupport.SERIALIZABLE_CLASSES, null);
+    } else {
+      configuration.set(
+          AvroReadSupport.SERIALIZABLE_CLASSES,
+          
Stream.of(classes).map(Class::getName).collect(Collectors.joining(",")));
+    }
+  }
+
   private GenericData model = null;
 
   public AvroReadSupport() {}
@@ -158,7 +184,15 @@ public class AvroReadSupport<T> extends ReadSupport<T> {
     if (Boolean.parseBoolean(compatEnabled)) {
       return newCompatMaterializer(parquetSchema, avroSchema, model);
     }
-    return new AvroRecordMaterializer<T>(parquetSchema, avroSchema, model);
+    String[] serializableClasses = 
configuration.getStrings(SERIALIZABLE_CLASSES, null);
+
+    return new AvroRecordMaterializer<T>(
+        parquetSchema,
+        avroSchema,
+        model,
+        serializableClasses == null
+            ? new ReflectClassValidator.PackageValidator()
+            : new ReflectClassValidator.ClassValidator(serializableClasses));
   }
 
   @SuppressWarnings("unchecked")
diff --git 
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java 
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
index 441428bfa..895bada73 100644
--- 
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
+++ 
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
@@ -92,8 +92,9 @@ class AvroRecordConverter<T> extends 
AvroConverters.AvroGroupConverter {
   private final GenericData model;
   private final Map<Schema.Field, Object> recordDefaults = new 
HashMap<Schema.Field, Object>();
 
-  public AvroRecordConverter(MessageType parquetSchema, Schema avroSchema, 
GenericData baseModel) {
-    this(null, parquetSchema, avroSchema, baseModel);
+  AvroRecordConverter(
+      MessageType parquetSchema, Schema avroSchema, GenericData baseModel, 
ReflectClassValidator validator) {
+    this(null, parquetSchema, avroSchema, baseModel, validator);
     LogicalType logicalType = avroSchema.getLogicalType();
     Conversion<?> conversion = baseModel.getConversionFor(logicalType);
     this.rootContainer = ParentValueContainer.getConversionContainer(
@@ -108,8 +109,12 @@ class AvroRecordConverter<T> extends 
AvroConverters.AvroGroupConverter {
         avroSchema);
   }
 
-  public AvroRecordConverter(
-      ParentValueContainer parent, GroupType parquetSchema, Schema avroSchema, 
GenericData model) {
+  AvroRecordConverter(
+      ParentValueContainer parent,
+      GroupType parquetSchema,
+      Schema avroSchema,
+      GenericData model,
+      ReflectClassValidator validator) {
     super(parent);
     this.avroSchema = avroSchema;
     this.model = (model == null ? ReflectData.get() : model);
@@ -142,7 +147,7 @@ class AvroRecordConverter<T> extends 
AvroConverters.AvroGroupConverter {
 
       Class<?> fieldClass = fields.get(avroField.name());
       converters[parquetFieldIndex] =
-          newConverter(nonNullSchema, parquetField, this.model, fieldClass, 
container);
+          newConverter(nonNullSchema, parquetField, this.model, fieldClass, 
container, validator);
 
       // @Stringable doesn't affect the reflected schema; must be enforced here
       if (recordClass != null && converters[parquetFieldIndex] instanceof 
FieldStringConverter) {
@@ -326,12 +331,18 @@ class AvroRecordConverter<T> extends 
AvroConverters.AvroGroupConverter {
         String.format("Parquet/Avro schema mismatch: Avro field '%s' not 
found", parquetFieldName));
   }
 
-  private static Converter newConverter(Schema schema, Type type, GenericData 
model, ParentValueContainer setter) {
-    return newConverter(schema, type, model, null, setter);
+  private static Converter newConverter(
+      Schema schema, Type type, GenericData model, ParentValueContainer 
setter, ReflectClassValidator validator) {
+    return newConverter(schema, type, model, null, setter, validator);
   }
 
   private static Converter newConverter(
-      Schema schema, Type type, GenericData model, Class<?> knownClass, 
ParentValueContainer setter) {
+      Schema schema,
+      Type type,
+      GenericData model,
+      Class<?> knownClass,
+      ParentValueContainer setter,
+      ReflectClassValidator validator) {
     LogicalType logicalType = schema.getLogicalType();
     Conversion<?> conversion;
 
@@ -381,21 +392,23 @@ class AvroRecordConverter<T> extends 
AvroConverters.AvroGroupConverter {
             && logicalType.getName().equals(LogicalTypes.uuid().getName())) {
           return new AvroConverters.FieldUUIDConverter(parent, 
type.asPrimitiveType());
         }
-        return newStringConverter(schema, model, parent);
+        return newStringConverter(schema, model, parent, validator);
       case RECORD:
-        return new AvroRecordConverter(parent, type.asGroupType(), schema, 
model);
+        return new AvroRecordConverter(parent, type.asGroupType(), schema, 
model, validator);
       case ENUM:
         return new AvroConverters.FieldEnumConverter(parent, schema, model);
       case ARRAY:
         Class<?> arrayDatumClass = getDatumClass(conversion, knownClass, 
schema, model);
         if (arrayDatumClass != null && arrayDatumClass.isArray()) {
-          return new AvroArrayConverter(parent, type.asGroupType(), schema, 
model, arrayDatumClass);
+          return new AvroArrayConverter(
+              parent, type.asGroupType(), schema, model, arrayDatumClass, 
validator);
         }
-        return new AvroCollectionConverter(parent, type.asGroupType(), schema, 
model, arrayDatumClass);
+        return new AvroCollectionConverter(
+            parent, type.asGroupType(), schema, model, arrayDatumClass, 
validator);
       case MAP:
-        return new MapConverter(parent, type.asGroupType(), schema, model);
+        return new MapConverter(parent, type.asGroupType(), schema, model, 
validator);
       case UNION:
-        return new AvroUnionConverter(parent, type, schema, model);
+        return new AvroUnionConverter(parent, type, schema, model, validator);
       case FIXED:
         return new AvroConverters.FieldFixedConverter(parent, schema, model);
       default:
@@ -404,8 +417,9 @@ class AvroRecordConverter<T> extends 
AvroConverters.AvroGroupConverter {
     }
   }
 
-  private static Converter newStringConverter(Schema schema, GenericData 
model, ParentValueContainer parent) {
-    Class<?> stringableClass = getStringableClass(schema, model);
+  private static Converter newStringConverter(
+      Schema schema, GenericData model, ParentValueContainer parent, 
ReflectClassValidator validator) {
+    Class<?> stringableClass = getStringableClass(schema, model, validator);
     if (stringableClass == String.class) {
       return new FieldStringConverter(parent);
     } else if (stringableClass == CharSequence.class) {
@@ -414,12 +428,13 @@ class AvroRecordConverter<T> extends 
AvroConverters.AvroGroupConverter {
     return new FieldStringableConverter(parent, stringableClass);
   }
 
-  private static Class<?> getStringableClass(Schema schema, GenericData model) 
{
+  private static Class<?> getStringableClass(Schema schema, GenericData model, 
ReflectClassValidator validator) {
     if (model instanceof SpecificData) {
       // both specific and reflect (and any subclasses) use this logic
       boolean isMap = (schema.getType() == Schema.Type.MAP);
       String stringableClass = schema.getProp(isMap ? JAVA_KEY_CLASS_PROP : 
JAVA_CLASS_PROP);
       if (stringableClass != null) {
+        validator.validate(stringableClass);
         try {
           return ClassUtils.forName(model.getClassLoader(), stringableClass);
         } catch (ClassNotFoundException e) {
@@ -560,12 +575,13 @@ class AvroRecordConverter<T> extends 
AvroConverters.AvroGroupConverter {
     private Class<?> containerClass;
     private Collection<Object> container;
 
-    public AvroCollectionConverter(
+    AvroCollectionConverter(
         ParentValueContainer parent,
         GroupType type,
         Schema avroSchema,
         GenericData model,
-        Class<?> containerClass) {
+        Class<?> containerClass,
+        ReflectClassValidator validator) {
       this.parent = parent;
       this.avroSchema = avroSchema;
       this.containerClass = containerClass;
@@ -575,16 +591,21 @@ class AvroRecordConverter<T> extends 
AvroConverters.AvroGroupConverter {
       // matching it against the element schema.
       if (isElementType(repeatedType, elementSchema)) {
         // the element type is the repeated type (and required)
-        converter = newConverter(elementSchema, repeatedType, model, new 
ParentValueContainer() {
-          @Override
-          @SuppressWarnings("unchecked")
-          public void add(Object value) {
-            container.add(value);
-          }
-        });
+        converter = newConverter(
+            elementSchema,
+            repeatedType,
+            model,
+            new ParentValueContainer() {
+              @Override
+              @SuppressWarnings("unchecked")
+              public void add(Object value) {
+                container.add(value);
+              }
+            },
+            validator);
       } else {
         // the element is wrapped in a synthetic group and may be optional
-        converter = new ElementConverter(repeatedType.asGroupType(), 
elementSchema, model);
+        converter = new ElementConverter(repeatedType.asGroupType(), 
elementSchema, model, validator);
       }
     }
 
@@ -631,17 +652,22 @@ class AvroRecordConverter<T> extends 
AvroConverters.AvroGroupConverter {
       private Object element;
       private final Converter elementConverter;
 
-      public ElementConverter(GroupType repeatedType, Schema elementSchema, 
GenericData model) {
+      ElementConverter(
+          GroupType repeatedType, Schema elementSchema, GenericData model, 
ReflectClassValidator validator) {
         Type elementType = repeatedType.getType(0);
         Schema nonNullElementSchema = 
AvroSchemaConverter.getNonNull(elementSchema);
-        this.elementConverter =
-            newConverter(nonNullElementSchema, elementType, model, new 
ParentValueContainer() {
+        this.elementConverter = newConverter(
+            nonNullElementSchema,
+            elementType,
+            model,
+            new ParentValueContainer() {
               @Override
               @SuppressWarnings("unchecked")
               public void add(Object value) {
                 ElementConverter.this.element = value;
               }
-            });
+            },
+            validator);
       }
 
       @Override
@@ -683,12 +709,13 @@ class AvroRecordConverter<T> extends 
AvroConverters.AvroGroupConverter {
     private Class<?> elementClass;
     private Collection<?> container;
 
-    public AvroArrayConverter(
+    AvroArrayConverter(
         ParentValueContainer parent,
         GroupType type,
         Schema avroSchema,
         GenericData model,
-        Class<?> arrayClass) {
+        Class<?> arrayClass,
+        ReflectClassValidator validator) {
       this.parent = parent;
       this.avroSchema = avroSchema;
 
@@ -703,10 +730,11 @@ class AvroRecordConverter<T> extends 
AvroConverters.AvroGroupConverter {
       // matching it against the element schema.
       if (isElementType(repeatedType, elementSchema)) {
         // the element type is the repeated type (and required)
-        converter = newConverter(elementSchema, repeatedType, model, 
elementClass, setter);
+        converter = newConverter(elementSchema, repeatedType, model, 
elementClass, setter, validator);
       } else {
         // the element is wrapped in a synthetic group and may be optional
-        converter = new ArrayElementConverter(repeatedType.asGroupType(), 
elementSchema, model, setter);
+        converter =
+            new ArrayElementConverter(repeatedType.asGroupType(), 
elementSchema, model, setter, validator);
       }
     }
 
@@ -846,18 +874,23 @@ class AvroRecordConverter<T> extends 
AvroConverters.AvroGroupConverter {
       private boolean isSet;
       private final Converter elementConverter;
 
-      public ArrayElementConverter(
+      ArrayElementConverter(
           GroupType repeatedType,
           Schema elementSchema,
           GenericData model,
-          final ParentValueContainer setter) {
+          final ParentValueContainer setter,
+          ReflectClassValidator validator) {
         Type elementType = repeatedType.getType(0);
         Preconditions.checkArgument(
             !elementClass.isPrimitive() || elementType.isRepetition(REQUIRED),
             "Cannot convert list of optional elements to primitive array");
         Schema nonNullElementSchema = 
AvroSchemaConverter.getNonNull(elementSchema);
         this.elementConverter = newConverter(
-            nonNullElementSchema, elementType, model, elementClass, new 
ParentValueContainer() {
+            nonNullElementSchema,
+            elementType,
+            model,
+            elementClass,
+            new ParentValueContainer() {
               @Override
               public void add(Object value) {
                 isSet = true;
@@ -911,7 +944,8 @@ class AvroRecordConverter<T> extends 
AvroConverters.AvroGroupConverter {
                 isSet = true;
                 setter.addDouble(value);
               }
-            });
+            },
+            validator);
       }
 
       @Override
@@ -978,7 +1012,11 @@ class AvroRecordConverter<T> extends 
AvroConverters.AvroGroupConverter {
     private Object memberValue = null;
 
     public AvroUnionConverter(
-        ParentValueContainer parent, Type parquetSchema, Schema avroSchema, 
GenericData model) {
+        ParentValueContainer parent,
+        Type parquetSchema,
+        Schema avroSchema,
+        GenericData model,
+        ReflectClassValidator validator) {
       super(parent);
       GroupType parquetGroup = parquetSchema.asGroupType();
       this.memberConverters = new Converter[parquetGroup.getFieldCount()];
@@ -988,8 +1026,11 @@ class AvroRecordConverter<T> extends 
AvroConverters.AvroGroupConverter {
         Schema memberSchema = avroSchema.getTypes().get(index);
         if (!memberSchema.getType().equals(Schema.Type.NULL)) {
           Type memberType = parquetGroup.getType(parquetIndex);
-          memberConverters[parquetIndex] =
-              newConverter(memberSchema, memberType, model, new 
ParentValueContainer() {
+          memberConverters[parquetIndex] = newConverter(
+              memberSchema,
+              memberType,
+              model,
+              new ParentValueContainer() {
                 @Override
                 public void add(Object value) {
                   Preconditions.checkArgument(
@@ -997,7 +1038,8 @@ class AvroRecordConverter<T> extends 
AvroConverters.AvroGroupConverter {
                       "Union is resolving to more than one type");
                   memberValue = value;
                 }
-              });
+              },
+              validator);
           parquetIndex++; // Note for nulls the parquetIndex id not increased
         }
       }
@@ -1027,10 +1069,15 @@ class AvroRecordConverter<T> extends 
AvroConverters.AvroGroupConverter {
     private final Class<?> mapClass;
     private Map<K, V> map;
 
-    public MapConverter(ParentValueContainer parent, GroupType mapType, Schema 
mapSchema, GenericData model) {
+    MapConverter(
+        ParentValueContainer parent,
+        GroupType mapType,
+        Schema mapSchema,
+        GenericData model,
+        ReflectClassValidator validator) {
       this.parent = parent;
       GroupType repeatedKeyValueType = mapType.getType(0).asGroupType();
-      this.keyValueConverter = new MapKeyValueConverter(repeatedKeyValueType, 
mapSchema, model);
+      this.keyValueConverter = new MapKeyValueConverter(repeatedKeyValueType, 
mapSchema, model, validator);
       this.schema = mapSchema;
       this.mapClass = getDatumClass(mapSchema, model);
     }
@@ -1066,24 +1113,34 @@ class AvroRecordConverter<T> extends 
AvroConverters.AvroGroupConverter {
       private final Converter keyConverter;
       private final Converter valueConverter;
 
-      public MapKeyValueConverter(GroupType keyValueType, Schema mapSchema, 
GenericData model) {
-        keyConverter = newStringConverter(mapSchema, model, new 
ParentValueContainer() {
-          @Override
-          @SuppressWarnings("unchecked")
-          public void add(Object value) {
-            MapKeyValueConverter.this.key = (K) value;
-          }
-        });
+      MapKeyValueConverter(
+          GroupType keyValueType, Schema mapSchema, GenericData model, 
ReflectClassValidator validator) {
+        keyConverter = newStringConverter(
+            mapSchema,
+            model,
+            new ParentValueContainer() {
+              @Override
+              @SuppressWarnings("unchecked")
+              public void add(Object value) {
+                MapKeyValueConverter.this.key = (K) value;
+              }
+            },
+            validator);
 
         Type valueType = keyValueType.getType(1);
         Schema nonNullValueSchema = 
AvroSchemaConverter.getNonNull(mapSchema.getValueType());
-        valueConverter = newConverter(nonNullValueSchema, valueType, model, 
new ParentValueContainer() {
-          @Override
-          @SuppressWarnings("unchecked")
-          public void add(Object value) {
-            MapKeyValueConverter.this.value = (V) value;
-          }
-        });
+        valueConverter = newConverter(
+            nonNullValueSchema,
+            valueType,
+            model,
+            new ParentValueContainer() {
+              @Override
+              @SuppressWarnings("unchecked")
+              public void add(Object value) {
+                MapKeyValueConverter.this.value = (V) value;
+              }
+            },
+            validator);
       }
 
       @Override
diff --git 
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java
 
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java
index 0034149e1..b2e2ddc2b 100644
--- 
a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java
+++ 
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java
@@ -28,8 +28,9 @@ class AvroRecordMaterializer<T> extends RecordMaterializer<T> 
{
 
   private AvroRecordConverter<T> root;
 
-  public AvroRecordMaterializer(MessageType requestedSchema, Schema 
avroSchema, GenericData baseModel) {
-    this.root = new AvroRecordConverter<T>(requestedSchema, avroSchema, 
baseModel);
+  AvroRecordMaterializer(
+      MessageType requestedSchema, Schema avroSchema, GenericData baseModel, 
ReflectClassValidator validator) {
+    this.root = new AvroRecordConverter<T>(requestedSchema, avroSchema, 
baseModel, validator);
   }
 
   @Override
diff --git 
a/parquet-avro/src/main/java/org/apache/parquet/avro/ReflectClassValidator.java 
b/parquet-avro/src/main/java/org/apache/parquet/avro/ReflectClassValidator.java
new file mode 100644
index 000000000..bcc6807ac
--- /dev/null
+++ 
b/parquet-avro/src/main/java/org/apache/parquet/avro/ReflectClassValidator.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+abstract class ReflectClassValidator {
+
+  abstract void validate(String className);
+
+  static class PackageValidator extends ReflectClassValidator {
+
+    private final List<String> trustedPackagePrefixes = 
Stream.of(AvroConverters.SERIALIZABLE_PACKAGES)
+        .map(p -> p.endsWith(".") ? p : p + ".")
+        .collect(Collectors.toList());
+    private final boolean trustAllPackages =
+        trustedPackagePrefixes.size() == 1 && 
"*".equals(trustedPackagePrefixes.get(0));
+    // The primitive "class names" based on Class.isPrimitive()
+    private static final Set<String> PRIMITIVES = new HashSet<>(Arrays.asList(
+        Boolean.TYPE.getName(),
+        Character.TYPE.getName(),
+        Byte.TYPE.getName(),
+        Short.TYPE.getName(),
+        Integer.TYPE.getName(),
+        Long.TYPE.getName(),
+        Float.TYPE.getName(),
+        Double.TYPE.getName(),
+        Void.TYPE.getName()));
+
+    @Override
+    public void validate(String className) {
+      if (trustAllPackages || PRIMITIVES.contains(className)) {
+        return;
+      }
+
+      for (String packagePrefix : trustedPackagePrefixes) {
+        if (className.startsWith(packagePrefix)) {
+          return;
+        }
+      }
+
+      forbiddenClass(className);
+    }
+  }
+
+  static class ClassValidator extends ReflectClassValidator {
+    private final Set<String> trustedClassNames = new HashSet<>();
+
+    ClassValidator(String... classNames) {
+      for (String className : classNames) {
+        addTrustedClassName(className);
+      }
+    }
+
+    void addTrustedClassName(String className) {
+      trustedClassNames.add(className);
+    }
+
+    @Override
+    void validate(String className) {
+      if (!trustedClassNames.contains(className)) {
+        forbiddenClass(className);
+      }
+    }
+  }
+
+  private static void forbiddenClass(String className) {
+    throw new SecurityException("Forbidden " + className
+        + "! This class is not trusted to be included in Avro schema using 
java-class or java-key-class."
+        + " Please set the Parquet/Hadoop configuration 
parquet.avro.serializable.classes"
+        + " with the classes you trust.");
+  }
+}
diff --git 
a/parquet-avro/src/test/java/org/apache/parquet/UntrustedStringableClass.java 
b/parquet-avro/src/test/java/org/apache/parquet/UntrustedStringableClass.java
deleted file mode 100644
index abdeeb129..000000000
--- 
a/parquet-avro/src/test/java/org/apache/parquet/UntrustedStringableClass.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet;
-
-public class UntrustedStringableClass {
-  private final String value;
-
-  public UntrustedStringableClass(String value) {
-    this.value = value;
-  }
-
-  @Override
-  public String toString() {
-    return this.value;
-  }
-}
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectReadWrite.java
index 2acd827ad..5b6260e11 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectReadWrite.java
@@ -37,7 +37,6 @@ import org.apache.avro.reflect.Stringable;
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.UntrustedStringableClass;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -77,40 +76,6 @@ public class TestReflectReadWrite {
     }
   }
 
-  @Test(expected = SecurityException.class)
-  public void testUntrustedStringableClass() {
-    new AvroConverters.FieldStringableConverter(
-        new ParentValueContainer() {
-          @Override
-          public void add(Object value) {}
-
-          @Override
-          public void addBoolean(boolean value) {}
-
-          @Override
-          public void addInt(int value) {}
-
-          @Override
-          public void addLong(long value) {}
-
-          @Override
-          public void addFloat(float value) {}
-
-          @Override
-          public void addDouble(double value) {}
-
-          @Override
-          public void addChar(char value) {}
-
-          @Override
-          public void addByte(byte value) {}
-
-          @Override
-          public void addShort(short value) {}
-        },
-        UntrustedStringableClass.class);
-  }
-
   private GenericRecord getGenericPojoUtf8() {
     Schema schema = ReflectData.get().getSchema(Pojo.class);
     GenericData.Record record = new GenericData.Record(schema);
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestStringBehavior.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestStringBehavior.java
index 39f3a2272..51509dc17 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestStringBehavior.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestStringBehavior.java
@@ -37,6 +37,7 @@ import org.apache.avro.reflect.AvroSchema;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.Stringable;
+import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.conf.Configuration;
@@ -177,13 +178,11 @@ public class TestStringBehavior {
     }
 
     org.apache.parquet.avro.StringBehaviorTest parquetRecord;
-    Configuration conf = new Configuration();
-    conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
-    AvroReadSupport.setAvroDataSupplier(conf, SpecificDataSupplier.class);
-    AvroReadSupport.setAvroReadSchema(conf, org.apache.parquet.avro.StringBehaviorTest.getClassSchema());
     try (ParquetReader<org.apache.parquet.avro.StringBehaviorTest> parquet =
         AvroParquetReader.<org.apache.parquet.avro.StringBehaviorTest>builder(parquetFile)
-            .withConf(conf)
+            .withDataModel(SpecificData.get())
+            .withSerializableClasses(BigDecimal.class)
+            .withCompatibility(false)
             .build()) {
       parquetRecord = parquet.read();
     }
@@ -258,6 +257,7 @@ public class TestStringBehavior {
     conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
     AvroReadSupport.setAvroDataSupplier(conf, ReflectDataSupplier.class);
     AvroReadSupport.setAvroReadSchema(conf, reflectSchema);
+    AvroReadSupport.setSerializableClasses(conf, BigDecimal.class);
     try (ParquetReader<ReflectRecord> parquet = AvroParquetReader.<ReflectRecord>builder(parquetFile)
         .withConf(conf)
         .build()) {
@@ -326,6 +326,7 @@ public class TestStringBehavior {
     AvroReadSupport.setAvroDataSupplier(conf, ReflectDataSupplier.class);
     AvroReadSupport.setAvroReadSchema(conf, reflectSchema);
     AvroReadSupport.setRequestedProjection(conf, reflectSchema);
+    AvroReadSupport.setSerializableClasses(conf, BigDecimal.class);
     try (ParquetReader<ReflectRecordJavaClass> parquet = AvroParquetReader.<ReflectRecordJavaClass>builder(
             parquetFile)
         .withConf(conf)
@@ -348,6 +349,52 @@ public class TestStringBehavior {
     Assert.assertEquals("Should have the correct BigDecimal value", 
BIG_DECIMAL, parquetRecord.stringable_class);
   }
 
+  @Test(expected = SecurityException.class)
+  public void testSpecificValidationFail() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
+    AvroReadSupport.setAvroDataSupplier(conf, SpecificDataSupplier.class);
+    AvroReadSupport.setAvroReadSchema(conf, org.apache.parquet.avro.StringBehaviorTest.getClassSchema());
+    try (ParquetReader<org.apache.parquet.avro.StringBehaviorTest> parquet =
+        AvroParquetReader.<org.apache.parquet.avro.StringBehaviorTest>builder(parquetFile)
+            .withConf(conf)
+            .build()) {
+      parquet.read();
+    }
+  }
+
+  @Test(expected = SecurityException.class)
+  public void testReflectValidationFail() throws IOException {
+    Schema reflectSchema = ReflectData.get().getSchema(ReflectRecord.class);
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
+    AvroReadSupport.setAvroDataSupplier(conf, ReflectDataSupplier.class);
+    AvroReadSupport.setAvroReadSchema(conf, reflectSchema);
+    try (ParquetReader<ReflectRecord> parquet = AvroParquetReader.<ReflectRecord>builder(parquetFile)
+        .withConf(conf)
+        .build()) {
+      parquet.read();
+    }
+  }
+
+  @Test(expected = SecurityException.class)
+  public void testReflectJavaClassValidationFail() throws IOException {
+    Schema reflectSchema = ReflectData.get().getSchema(ReflectRecordJavaClass.class);
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
+    AvroReadSupport.setAvroDataSupplier(conf, ReflectDataSupplier.class);
+    AvroReadSupport.setAvroReadSchema(conf, reflectSchema);
+    AvroReadSupport.setRequestedProjection(conf, reflectSchema);
+    try (ParquetReader<ReflectRecordJavaClass> parquet = AvroParquetReader.<ReflectRecordJavaClass>builder(
+            parquetFile)
+        .withConf(conf)
+        .build()) {
+      parquet.read();
+    }
+  }
+
   public static class ReflectRecord {
     private String default_class;
     private String string_class;


Reply via email to