xiarixiaoyao commented on a change in pull request #3668:
URL: https://github.com/apache/hudi/pull/3668#discussion_r711563248



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
##########
@@ -0,0 +1,642 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.Types.Field;
+import org.apache.hudi.internal.schema.Types.RecordType;
+import org.apache.hudi.internal.schema.action.MergeSchemaAction;
+
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
/**
 * Static helper methods for Hudi's internal schema: conversion to/from Avro schemas,
 * id/name indexing, schema merging and column pruning.
 */
public class InternalSchemaUtils {

  // Utility class: prevent instantiation.
  private InternalSchemaUtils() {
  }
+
+  public static Map<Integer, String> buildIdToName(Type type) {
+    Map<Integer, String> result = new HashMap<>();
+    buildNameToId(type).forEach((k, v) -> result.put(v, k));
+    return result;
+  }
+
+  public static Map<String, Integer> buildNameToId(Type type) {
+    Deque<String> fieldNames = new LinkedList<>();
+    Map<String, Integer> nameToId = new HashMap<>();
+    visitNameToId(type, fieldNames, nameToId);
+    return nameToId;
+  }
+
+  public static Map<Integer, Field> buildIdToField(Type type) {
+    Map<Integer, Field> idToField = new HashMap<>();
+    visitIdToField(type, idToField);
+    return idToField;
+  }
+
+  private static void visitIdToField(Type type, Map<Integer, Field> index) {
+    switch (type.typeId()) {
+      case RECORD:
+        RecordType record = (RecordType) type;
+        for (Field field : record.fields()) {
+          visitIdToField(field.type(), index);
+          index.put(field.fieldId(), field);
+        }
+        return;
+      case ARRAY:
+        Types.ArrayType array = (Types.ArrayType) type;
+        visitIdToField(array.elementType(), index);
+        for (Field field : array.fields()) {
+          index.put(field.fieldId(), field);
+        }
+        return;
+      case MAP:
+        Types.MapType map = (Types.MapType) type;
+        visitIdToField(map.keyType(), index);
+        visitIdToField(map.valueType(), index);
+        for (Field field : map.fields()) {
+          index.put(field.fieldId(), field);
+        }
+        return;
+      default:
+        return;
+    }
+  }
+
  /**
   * Recursively walks {@code type}, recording the full dotted name of every nested
   * field into {@code nameToId} with its field id as the value. {@code fieldNames} is
   * the stack of ancestor field names (top of stack = innermost ancestor).
   */
  private static void visitNameToId(Type type, Deque<String> fieldNames, Map<String, Integer> nameToId) {
    switch (type.typeId()) {
      case RECORD:
        RecordType record = (RecordType) type;
        for (Field field : record.fields()) {
          // Recurse with this field on the ancestor stack, then register the field
          // itself against only its ancestors (it is popped before createFullName).
          fieldNames.push(field.name());
          visitNameToId(field.type(), fieldNames, nameToId);
          fieldNames.pop();
          nameToId.put(createFullName(field.name(), fieldNames), field.fieldId());
        }
        return;
      case ARRAY:
        Types.ArrayType array = (Types.ArrayType) type;
        Field elementField = array.field(array.elementId());
        fieldNames.push(elementField.name());
        visitNameToId(array.elementType(), fieldNames, nameToId);
        fieldNames.pop();
        // NOTE(review): the recursion pushes elementField.name() but the registered key
        // uses the literal "element" — confirm these are always the same string.
        nameToId.put(createFullName("element", fieldNames), array.elementId());
        return;
      case MAP:
        Types.MapType map = (Types.MapType) type;
        Field keyField = map.field(map.keyId());
        Field valueField = map.field(map.valueId());
        // Visit key subtree.
        fieldNames.push(keyField.name());
        visitNameToId(map.keyType(), fieldNames, nameToId);
        fieldNames.pop();
        // Visit value subtree.
        fieldNames.push(valueField.name());
        visitNameToId(map.valueType(), fieldNames, nameToId);
        fieldNames.pop();
        nameToId.put(createFullName("key", fieldNames), map.keyId());
        nameToId.put(createFullName("value", fieldNames), map.valueId());
        return;
      default:
        // Primitive types contribute no names.
    }
  }
+
+  public static String createFullName(String name, Deque<String> fieldNames) {
+    String result = name;
+    if (!fieldNames.isEmpty()) {
+      List<String> parentNames = new ArrayList<>();
+      fieldNames.descendingIterator().forEachRemaining(parentNames::add);
+      result = parentNames.stream().collect(Collectors.joining(".")) + "." + 
result;
+    }
+    return result;
+  }
+
+  public static Map<Integer, Integer> index2Parents(Types.RecordType record) {
+    Map<Integer, Integer> result = new HashMap<>();
+    Deque<Integer> parentIds = new LinkedList<>();
+    index2Parents(record, parentIds, result);
+    return result;
+  }
+
  /**
   * Recursive worker for {@link #index2Parents(Types.RecordType)}. {@code pids} is the
   * stack of ancestor field ids (top = nearest enclosing field); {@code id2p} collects
   * field id -> parent field id.
   */
  private static void index2Parents(Type type, Deque<Integer> pids, Map<Integer, Integer> id2p) {
    switch (type.typeId()) {
      case RECORD:
        Types.RecordType record = (Types.RecordType)type;
        // First descend into each child with its own id on the ancestor stack ...
        for (Field f : record.fields()) {
          pids.push(f.fieldId());
          index2Parents(f.type(), pids, id2p);
          pids.pop();
        }

        // ... then map each direct child to the field currently enclosing this record.
        for (Field f : record.fields()) {
          // root record has no parent id.
          if (!pids.isEmpty()) {
            Integer pid = pids.peek();
            id2p.put(f.fieldId(), pid);
          }
        }
        return;
      case ARRAY:
        Types.ArrayType array = (Types.ArrayType) type;
        Types.Field elementField = array.field(array.elementId());
        pids.push(elementField.fieldId());
        index2Parents(elementField.type(), pids, id2p);
        pids.pop();
        // The element's parent is whatever field encloses this array.
        id2p.put(array.elementId(), pids.peek());
        return;
      case MAP:
        Types.MapType map = (Types.MapType) type;
        Types.Field keyField = map.field(map.keyId());
        Types.Field valueField = map.field(map.valueId());
        // visit key
        pids.push(map.keyId());
        index2Parents(keyField.type(), pids, id2p);
        pids.pop();
        // visit value
        pids.push(map.valueId());
        index2Parents(valueField.type(), pids, id2p);
        pids.pop();
        // Key and value share the field enclosing this map as their parent.
        id2p.put(map.keyId(), pids.peek());
        id2p.put(map.valueId(), pids.peek());
        return;
      default:
        // Primitives have no children.
    }
  }
+
  /**
   * Returns a structural copy of {@code type} with every field id reassigned starting
   * from {@code nextId}. For a record, a contiguous id block is reserved for its direct
   * fields before the children are renumbered recursively; for a map, the key id
   * immediately precedes the value id. Primitives are returned unchanged.
   */
  public static Type refreshNewId(Type type, AtomicInteger nextId) {
    switch (type.typeId()) {
      case RECORD:
        RecordType record = (RecordType) type;
        List<Field> oldFields = record.fields();
        List<Type> fieldTypes = new ArrayList<>();
        int currentId = nextId.get();
        // Reserve ids for all direct fields before descending into children.
        nextId.set(currentId + record.fields().size());
        for (Types.Field f : oldFields) {
          fieldTypes.add(refreshNewId(f.type(), nextId));
        }
        List<Types.Field> internalFields = new ArrayList<>();
        for (int i = 0; i < oldFields.size(); i++) {
          Field oldField = oldFields.get(i);
          internalFields.add(Types.Field.get(currentId++, oldField.isOptional(), oldField.name(), fieldTypes.get(i), oldField.doc()));
        }
        return Types.RecordType.get(internalFields);
      case ARRAY:
        Types.ArrayType array = (Types.ArrayType) type;
        int elementId = nextId.get();
        nextId.set(elementId + 1);
        Type elementType = refreshNewId(array.elementType(), nextId);
        return Types.ArrayType.get(elementId, array.isElementOptional(), elementType);
      case MAP:
        Types.MapType map = (Types.MapType) type;
        int keyId = nextId.get();
        int valueId = keyId + 1;
        nextId.set(keyId + 2);
        Type keyType = refreshNewId(map.keyType(), nextId);
        Type valueType = refreshNewId(map.valueType(), nextId);
        return Types.MapType.get(keyId, valueId, keyType, valueType, map.isValueOptional());
      default:
        // Primitives carry no ids of their own.
        return type;
    }
  }
+
+  public static Type buildTypeFromAvroSchema(Schema schema) {
+    // set flag to check this has not been visited.
+    Deque<String> visited = new LinkedList();
+    Deque<String> fieldNames = new LinkedList();
+    AtomicInteger nextId = new AtomicInteger(1);
+    return visitAvroSchemaToBuildType(schema, visited, fieldNames, true, 
nextId);
+  }
+
+  private static Type visitAvroSchemaToBuildType(Schema schema, Deque<String> 
visited, Deque<String> fieldNames, Boolean firstVisitRoot, AtomicInteger 
nextId) {
+    switch (schema.getType()) {
+      case RECORD:
+        String name = schema.getFullName();
+        if (visited.contains(name)) {
+          throw new HoodieSchemaException(String.format("cannot convert 
recursive avro record %s", name));
+        }
+        visited.push(name);
+        List<Schema.Field> fields = schema.getFields();
+        List<Type> fieldTypes = new ArrayList<>(fields.size());
+        int nextAssignId = nextId.get();
+        // when first visit root record, set nextAssignId = 0;
+        if (firstVisitRoot) {
+          nextAssignId = 0;
+        }
+        nextId.set(nextAssignId + fields.size());
+        fields.stream().forEach(field -> {
+          fieldNames.addLast(name);
+          fieldTypes.add(visitAvroSchemaToBuildType(field.schema(), visited, 
fieldNames, false, nextId));
+        });
+        visited.pop();
+        List<Types.Field> internalFields = new ArrayList<>(fields.size());
+
+        for (int i  = 0; i < fields.size(); i++) {
+          Schema.Field field = fields.get(i);
+          Type fieldType = fieldTypes.get(i);
+          internalFields.add(Types.Field.get(nextAssignId, 
AvroSchemaUtil.isOptional(field.schema()), field.name(), fieldType, 
field.doc()));
+          nextAssignId += 1;
+        }
+        return Types.RecordType.get(internalFields);
+      case UNION:
+        List<Type> fTypes = new ArrayList<>();
+        schema.getTypes().stream().forEach(t -> {
+          fTypes.add(visitAvroSchemaToBuildType(t, visited, fieldNames, false, 
nextId));
+        });
+        return fTypes.get(0) == null ? fTypes.get(1) : fTypes.get(0);
+      case ARRAY:
+        Schema elementSchema = schema.getElementType();
+        fieldNames.addLast("element");
+        int elementId = nextId.get();
+        nextId.set(elementId + 1);
+        Type elementType = visitAvroSchemaToBuildType(elementSchema, visited, 
fieldNames, false, nextId);
+        fieldNames.pop();
+        return Types.ArrayType.get(elementId, 
AvroSchemaUtil.isOptional(schema.getElementType()), elementType);
+      case MAP:
+        fieldNames.addLast("value");
+        int keyId = nextId.get();
+        int valueId = keyId + 1;
+        nextId.set(valueId + 1);
+        Type valueType = visitAvroSchemaToBuildType(schema.getValueType(),  
visited, fieldNames, false, nextId);
+        return Types.MapType.get(keyId, valueId, Types.StringType.get(), 
valueType, AvroSchemaUtil.isOptional(schema.getValueType()));
+      default:
+        return visitAvroPrimitiveToBuildInternalType(schema);
+    }
+  }
+
+  private static Type visitAvroPrimitiveToBuildInternalType(Schema primitive) {
+    LogicalType logical = primitive.getLogicalType();
+    if (logical != null) {
+      String name = logical.getName();
+      if (logical instanceof LogicalTypes.Decimal) {
+        return Types.DecimalType.get(
+            ((LogicalTypes.Decimal) logical).getPrecision(),
+            ((LogicalTypes.Decimal) logical).getScale());
+
+      } else if (logical instanceof LogicalTypes.Date) {
+        return Types.DateType.get();
+
+      } else if (
+          logical instanceof LogicalTypes.TimeMillis
+              || logical instanceof LogicalTypes.TimeMicros) {
+        return Types.TimeType.get();
+
+      } else if (
+          logical instanceof LogicalTypes.TimestampMillis
+              || logical instanceof LogicalTypes.TimestampMicros) {
+        return Types.TimestampType.get();
+      } else if (LogicalTypes.uuid().getName().equals(name)) {
+        return Types.UUIDType.get();
+      }
+    }
+
+    switch (primitive.getType()) {
+      case BOOLEAN:
+        return Types.BooleanType.get();
+      case INT:
+        return Types.IntType.get();
+      case LONG:
+        return Types.LongType.get();
+      case FLOAT:
+        return Types.FloatType.get();
+      case DOUBLE:
+        return Types.DoubleType.get();
+      case STRING:
+      case ENUM:
+        return Types.StringType.get();
+      case FIXED:
+        return Types.FixedType.getFixed(primitive.getFixedSize());
+      case BYTES:
+        return Types.BinaryType.get();
+      case NULL:
+        return null;
+      default:
+        throw new UnsupportedOperationException("Unsupported primitive type: " 
+ primitive);
+    }
+  }
+
+  public static Schema buildAvroSchemaFromType(Type type, String name) {
+    Map<Type, Schema> cache = new HashMap<>();
+    return visitInternalSchemaToBuildAvroSchema(type, cache, name);
+  }
+
+  public static Schema buildAvroSchemaFromInternalSchema(InternalSchema 
schema, String recordName) {
+    Map<Type, Schema> cache = new HashMap<>();
+    return visitInternalSchemaToBuildAvroSchema(schema.getRecord(), cache, 
recordName);
+  }
+
+  private static Schema visitInternalSchemaToBuildAvroSchema(Type type, 
Map<Type, Schema> cache, String recordName) {
+    switch (type.typeId()) {
+      case RECORD:
+        Types.RecordType record = (Types.RecordType) type;
+        List<Schema> schemas = new ArrayList<>();
+        record.fields().forEach(f -> {
+          Schema tempSchema = visitInternalSchemaToBuildAvroSchema(f.type(), 
cache, recordName + "_" + f.name());
+          // convert tempSchema
+          Schema result = f.isOptional() ? 
AvroSchemaUtil.nullableSchema(tempSchema) : tempSchema;
+          schemas.add(result);
+        });
+        // check visited
+        Schema recordSchema;
+        recordSchema = cache.get(record);
+        if (recordSchema != null) {
+          return recordSchema;
+        }
+        recordSchema = visitInternalRecordToBuildAvroRecord(record, schemas, 
recordName);
+        cache.put(record, recordSchema);
+        return recordSchema;
+      case ARRAY:
+        Types.ArrayType array = (Types.ArrayType) type;
+        Schema elementSchema;
+        elementSchema = 
visitInternalSchemaToBuildAvroSchema(array.elementType(), cache, recordName);
+        Schema arraySchema;
+        arraySchema = cache.get(array);
+        if (arraySchema != null) {
+          return arraySchema;
+        }
+        arraySchema = visitInternalArrayToBuildAvroArray(array, elementSchema);
+        cache.put(array, arraySchema);
+        return arraySchema;
+      case MAP:
+        Types.MapType map = (Types.MapType) type;
+        Schema keySchema;
+        Schema valueSchema;
+        keySchema = visitInternalSchemaToBuildAvroSchema(map.keyType(), cache, 
recordName);
+        valueSchema = visitInternalSchemaToBuildAvroSchema(map.valueType(), 
cache, recordName);
+        Schema mapSchema;
+        mapSchema = cache.get(map);
+        if (mapSchema != null) {
+          return mapSchema;
+        }
+        mapSchema = visitInternalMapToBuildAvroMap(map, keySchema, 
valueSchema);
+        cache.put(map, mapSchema);
+        return mapSchema;
+      default:
+        Schema primitiveSchema = 
visitInternalPrimitiveToBuildAvroMap((Type.PrimitiveType) type);
+        cache.put(type, primitiveSchema);
+        return primitiveSchema;
+    }
+  }
+
+  private static Schema visitInternalRecordToBuildAvroRecord(Types.RecordType 
record, List<Schema> fieldSchemas, String recordName) {
+    List<Field> fields = record.fields();
+    List<Schema.Field> avroFields = new ArrayList<>();
+    for (int i = 0; i < fields.size(); i++) {
+      Field f = fields.get(i);
+      Schema.Field field = new Schema.Field(f.name(), fieldSchemas.get(i), 
f.doc(), f.isOptional() ? JsonProperties.NULL_VALUE : null);
+      avroFields.add(field);
+    }
+    return Schema.createRecord(recordName, null, null, false, avroFields);
+  }
+
+  private static Schema visitInternalArrayToBuildAvroArray(Types.ArrayType 
array, Schema elementSchema) {
+    Schema result;
+    if (array.isElementOptional()) {
+      result = 
Schema.createArray(AvroSchemaUtil.nullableSchema(elementSchema));
+    } else {
+      result = Schema.createArray(elementSchema);
+    }
+    return result;
+  }
+
+  private static Schema visitInternalMapToBuildAvroMap(Types.MapType map, 
Schema keySchema, Schema valueSchema) {
+    Schema mapSchema;
+    if (keySchema.getType() == Schema.Type.STRING) {
+      mapSchema = Schema.createMap(map.isValueOptional() ? 
AvroSchemaUtil.nullableSchema(valueSchema) : valueSchema);
+    } else {
+      throw new HoodieSchemaException("only support StringType key for avro 
MapType");
+    }
+    return mapSchema;
+  }
+
  /**
   * Converts an internal primitive type to the corresponding Avro schema, attaching
   * Avro logical types where needed (date, time-micros, timestamp-micros, uuid,
   * decimal-on-fixed).
   *
   * NOTE(review): despite the name ("...ToBuildAvroMap") this builds primitive
   * schemas, not maps — consider renaming (would require updating the caller).
   */
  private static Schema visitInternalPrimitiveToBuildAvroMap(Type.PrimitiveType primitive) {
    Schema primitiveSchema;
    switch (primitive.typeId()) {
      case BOOLEAN:
        primitiveSchema = Schema.create(Schema.Type.BOOLEAN);
        break;
      case INT:
        primitiveSchema = Schema.create(Schema.Type.INT);
        break;
      case LONG:
        primitiveSchema = Schema.create(Schema.Type.LONG);
        break;
      case FLOAT:
        primitiveSchema = Schema.create(Schema.Type.FLOAT);
        break;
      case DOUBLE:
        primitiveSchema = Schema.create(Schema.Type.DOUBLE);
        break;
      case DATE:
        // Days since epoch on INT.
        primitiveSchema = LogicalTypes.date()
            .addToSchema(Schema.create(Schema.Type.INT));
        break;
      case TIME:
        primitiveSchema = LogicalTypes.timeMicros()
            .addToSchema(Schema.create(Schema.Type.LONG));
        break;
      case TIMESTAMP:
        primitiveSchema = LogicalTypes.timestampMicros()
            .addToSchema(Schema.create(Schema.Type.LONG));
        break;
      case STRING:
        primitiveSchema = Schema.create(Schema.Type.STRING);
        break;
      case UUID:
        // UUIDs are stored as a 16-byte fixed.
        primitiveSchema = LogicalTypes.uuid()
            .addToSchema(Schema.createFixed("uuid_fixed", null, null, 16));
        break;
      case FIXED:
        Types.FixedType fixed = (Types.FixedType) primitive;
        primitiveSchema = Schema.createFixed("fixed_" + fixed.getFixedSize(), null, null, fixed.getFixedSize());
        break;
      case BINARY:
        primitiveSchema = Schema.create(Schema.Type.BYTES);
        break;
      case DECIMAL:
        // Decimal on a fixed sized to the minimum bytes for the precision.
        Types.DecimalType decimal = (Types.DecimalType) primitive;
        primitiveSchema = LogicalTypes.decimal(decimal.precision(), decimal.scale())
            .addToSchema(Schema.createFixed(
                "decimal_" + decimal.precision() + "_" + decimal.scale(),
                null, null, computeMinBytesForPrecision(decimal.precision())));
        break;
      default:
        throw new UnsupportedOperationException(
            "Unsupported type ID: " + primitive.typeId());
    }
    return primitiveSchema;
  }
+
+  private static int computeMinBytesForPrecision(int precision) {
+    int numBytes = 1;
+    while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, precision)) {
+      numBytes += 1;
+    }
+    return numBytes;
+  }
+
+  public static InternalSchema mergeSchema(InternalSchema fileSchema, 
InternalSchema querySchema) {
+    return mergeSchema(fileSchema, querySchema, true);
+  }
+
+  public static InternalSchema mergeSchema(InternalSchema fileSchema, 
InternalSchema querySchema, Boolean mergeRequiredFiledForce) {
+    MergeSchemaAction mergeSchemaAction = new MergeSchemaAction(fileSchema, 
querySchema, mergeRequiredFiledForce);
+    Types.RecordType record = (Types.RecordType) mergeType(mergeSchemaAction, 
querySchema.getRecord());
+    return new InternalSchema(record.fields());
+  }
+
+  private static Type mergeType(MergeSchemaAction mergeSchemaAction, Type 
type) {
+    switch (type.typeId()) {
+      case RECORD:
+        Types.RecordType record = (Types.RecordType) type;
+        List<Type> newTypes = new ArrayList<>();
+        for (Types.Field f : record.fields()) {
+          Type newType = mergeType(mergeSchemaAction, f.type());
+          newTypes.add(newType);
+        }
+        return 
Types.RecordType.get(mergeSchemaAction.buildNewFields(record.fields(), 
newTypes));
+      case ARRAY:
+        Types.ArrayType array = (Types.ArrayType) type;
+        Type newElementType;
+        Types.Field elementField = array.fields().get(0);
+        newElementType = mergeType(mergeSchemaAction, elementField.type());
+        return mergeSchemaAction.buildArrayType(array, newElementType);
+      case MAP:
+        Types.MapType map = (Types.MapType) type;
+        Type newValueType = mergeType(mergeSchemaAction, map.valueType());
+        return mergeSchemaAction.buildMapType(map, newValueType);
+      default:
+        return type;
+    }
+  }
+
+  public static InternalSchema pruneInternalSchema(InternalSchema schema, 
List<String> names) {
+    // do check
+    HashSet prunedIds = names.stream().map(name -> 
schema.findIdByName(name.toLowerCase(Locale.ROOT)))
+        .collect(Collectors.toCollection(HashSet::new));
+    if (prunedIds.contains(-1)) {
+      throw new IllegalArgumentException("cannot prune col which not exisit in 
hudi table");
+    }
+    return pruneInternalSchema(schema, prunedIds);
+  }
+
+  public static InternalSchema pruneInternalSchema(InternalSchema schema, Set 
fieldIds) {
+    Types.RecordType recordType = 
(Types.RecordType)pruneType(schema.getRecord(), fieldIds);
+    return new InternalSchema(recordType.fields());
+  }
+
  /**
   * Recursive pruning worker: a field survives if its id is in {@code fieldIds} (its
   * whole subtree is then kept unpruned) or if any descendant survives (a pruned
   * subtree is kept). Returns null when nothing under {@code type} survives; original
   * instances are reused whenever nothing below them changed.
   */
  private static Type pruneType(Type type, Set<Integer> fieldIds) {
    switch (type.typeId()) {
      case RECORD:
        Types.RecordType record = (Types.RecordType) type;
        List<Types.Field> fields = record.fields();
        List<Type> newTypes = new ArrayList<>();
        for (Types.Field f : fields) {
          Type newType = pruneType(f.type(), fieldIds);
          if (fieldIds.contains(f.fieldId())) {
            // Field selected directly: keep its original, unpruned subtree.
            newTypes.add(f.type());
          } else if (newType != null) {
            // A descendant survived: keep the pruned subtree.
            newTypes.add(newType);
          } else {
            // NOTE(review): redundant branch — equivalent to adding newType (null).
            newTypes.add(null);
          }
        }
        boolean changed = false;
        List<Field> newFields = new ArrayList<>();
        for (int i = 0; i < fields.size(); i++) {
          Types.Field oldField = fields.get(i);
          Type newType = newTypes.get(i);
          if (oldField.type() == newType) {
            // Subtree untouched: reuse the original field instance.
            newFields.add(oldField);
          } else if (newType != null) {
            changed = true;
            newFields.add(Types.Field.get(oldField.fieldId(), oldField.isOptional(), oldField.name(), newType, oldField.doc()));
          }
          // newType == null: the field is dropped entirely.
        }
        if (newFields.isEmpty()) {
          return null;
        }
        if (newFields.size() == fields.size() && !changed) {
          // Nothing pruned below this record: reuse the original instance.
          return record;
        } else {
          return Types.RecordType.get(newFields);
        }
      case ARRAY:
        Types.ArrayType array = (Types.ArrayType) type;
        Type newElementType = pruneType(array.elementType(), fieldIds);
        if (fieldIds.contains(array.elementId())) {
          return array;
        } else if (newElementType != null) {
          if (array.elementType() == newElementType) {
            return array;
          }
          return Types.ArrayType.get(array.elementId(), array.isElementOptional(), newElementType);
        }
        return null;
      case MAP:
        Types.MapType map = (Types.MapType) type;
        Type newValueType = pruneType(map.valueType(), fieldIds);
        if (fieldIds.contains(map.valueId())) {
          return map;
        } else if (newValueType != null) {
          if (map.valueType() == newValueType) {
            return map;
          }
          return Types.MapType.get(map.keyId(), map.valueId(), map.keyType(), newValueType, map.isValueOptional());
        }
        return null;
      default:
        // Primitives survive only via an ancestor's id (handled by the caller).
        return null;
    }
  }
+
+  public static String reBuildFilterName(String name, InternalSchema 
fileSchema, InternalSchema querySchema) {
+    Integer nameId = querySchema.findIdByName(name);

Review comment:
       agree




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to