xiarixiaoyao commented on a change in pull request #3668:
URL: https://github.com/apache/hudi/pull/3668#discussion_r717191016



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -213,6 +224,27 @@ protected void commit(HoodieTable table, String commitActionType, String instant
                       List<HoodieWriteStat> stats) throws IOException {
     LOG.info("Committing " + instantTime + " action " + commitActionType);
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+    // do save internal schema to support Implicitly add columns in write process
+    if (!metadata.getExtraMetadata().containsKey(InternalSchemaParser.LATESTSCHEMA) && metadata.getExtraMetadata().containsKey(SCHEMA_KEY)) {

Review comment:
       This only matters if we do not modify the table schema explicitly: LATESTSCHEMA is only produced when we run an ALTER SQL statement or the alter API.
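The guard quoted above can be sketched as follows. This is an illustrative stand-in, not Hudi's actual API: the key names and helper are hypothetical, but the logic mirrors the condition under review — save the internal schema implicitly only when no explicit alter has already recorded a latest schema, yet a writer schema exists.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the commit-time guard discussed above (illustrative names, not Hudi's API).
final class CommitSchemaGuard {
  static final String LATEST_SCHEMA = "latestSchema"; // produced only by ALTER SQL / alter API
  static final String SCHEMA_KEY = "schema";

  // Save the internal schema implicitly only when no explicit alter recorded it
  // but the commit does carry a writer schema.
  static boolean shouldSaveInternalSchema(Map<String, String> extraMetadata) {
    return !extraMetadata.containsKey(LATEST_SCHEMA) && extraMetadata.containsKey(SCHEMA_KEY);
  }

  public static void main(String[] args) {
    Map<String, String> plainWrite = new HashMap<>();
    plainWrite.put(SCHEMA_KEY, "{...avro...}");
    System.out.println(shouldSaveInternalSchema(plainWrite)); // true: no explicit alter happened

    Map<String, String> afterAlter = new HashMap<>(plainWrite);
    afterAlter.put(LATEST_SCHEMA, "{...internal...}");
    System.out.println(shouldSaveInternalSchema(afterAlter)); // false: the alter already recorded it
  }
}
```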

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -213,6 +224,27 @@ protected void commit(HoodieTable table, String commitActionType, String instant
                       List<HoodieWriteStat> stats) throws IOException {
     LOG.info("Committing " + instantTime + " action " + commitActionType);
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+    // do save internal schema to support Implicitly add columns in write process

Review comment:
       > We need to introduce a config to enable/disable schema evolution. Only if the config is enabled should we let any side-effects of schema evolution take effect.

   We may not need to introduce a config: schema evolution only takes effect when the user executes an ALTER SQL statement or the alter API explicitly. Otherwise, everything goes through the original code path.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -213,6 +224,27 @@ protected void commit(HoodieTable table, String commitActionType, String instant
                       List<HoodieWriteStat> stats) throws IOException {
     LOG.info("Committing " + instantTime + " action " + commitActionType);
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+    // do save internal schema to support Implicitly add columns in write process
+    if (!metadata.getExtraMetadata().containsKey(InternalSchemaParser.LATESTSCHEMA) && metadata.getExtraMetadata().containsKey(SCHEMA_KEY)) {
+      TableSchemaResolver schemaUtils = new TableSchemaResolver(table.getMetaClient());
+      String historySchemaStr = schemaUtils.getTableHistorySchemaStrFromCommitMetadata().orElse("");
+      if (!historySchemaStr.isEmpty()) {

Review comment:
       OK, will do. Thanks.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
##########
@@ -152,7 +160,7 @@ public void scan() {
       // iterate over the paths
       logFormatReaderWrapper = new HoodieLogFormatReader(fs,
           logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
-          readerSchema, readBlocksLazily, reverseReader, bufferSize);
+          internalSchema != null ? null : readerSchema, readBlocksLazily, reverseReader, bufferSize);

Review comment:
       When internalSchema is not null, we should forbid AbstractHoodieLogRecordScanner from using readerSchema (which is an Avro schema) to read Avro records, since AbstractHoodieLogRecordScanner may not be able to read the data correctly using readerSchema.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
##########
@@ -311,6 +319,12 @@ private boolean isNewInstantBlock(HoodieLogBlock logBlock) {
   private void processDataBlock(HoodieDataBlock dataBlock) throws Exception {
     // TODO (NA) - Implement getRecordItr() in HoodieAvroDataBlock and use that here
     List<IndexedRecord> recs = dataBlock.getRecords();
+    if (internalSchema != null) {
+      Long currentTime = Long.parseLong(dataBlock.getLogBlockHeader().get(INSTANT_TIME));
+      InternalSchema fileSchema = TableInternalSchemaUtils.searchSchemaAndCache(currentTime, path, fs.getConf());
+      Schema readSchema = AvroSchemaUtil.convert(InternalSchemaUtils.mergeSchema(fileSchema, internalSchema), readerSchema != null ? readerSchema.getName() : "tableName");
+      recs = recs.stream().map(rec -> AvroSchemaUtil.rewriteRecord((GenericRecord) rec, readSchema)).collect(Collectors.toList());

Review comment:
       yes
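For context, the per-block lookup in the hunk above (searchSchemaAndCache keyed by the block's INSTANT_TIME) boils down to finding the schema that was current when the block was written. A minimal sketch, assuming schemas are versioned by commit time (illustrative only; Hudi's TableInternalSchemaUtils also caches and reads from storage):

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch: look up the schema that was active at a log block's instant time.
final class SchemaByInstant {
  private final TreeMap<Long, String> schemasByCommitTime = new TreeMap<>();

  void register(long commitTime, String schema) {
    schemasByCommitTime.put(commitTime, schema);
  }

  // The file schema for a block is the newest schema committed at or before its instant.
  String schemaFor(long instantTime) {
    Map.Entry<Long, String> e = schemasByCommitTime.floorEntry(instantTime);
    return e == null ? null : e.getValue();
  }

  public static void main(String[] args) {
    SchemaByInstant s = new SchemaByInstant();
    s.register(20210901L, "v1");
    s.register(20210915L, "v2 (column added)");
    System.out.println(s.schemaFor(20210910L)); // v1
    System.out.println(s.schemaFor(20210920L)); // v2 (column added)
  }
}
```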

##########
File path: hudi-common/src/main/java/org/apache/hudi/internal/schema/action/MergeSchemaAction.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.action;
+
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * auxiliary class.
+ * help to merge file schema and query schema to produce final read schema for avro/parquet file
+ */
+public class MergeSchemaAction {

Review comment:
       ok
   

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
##########
@@ -38,7 +38,7 @@
   private HoodieUnMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
                                          String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize,
                                          LogRecordScannerCallback callback, Option<InstantRange> instantRange) {
-    super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, false);
+    super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, false, null);

Review comment:
       No, the Spark MOR/COW paths do not use HoodieUnMergedLogRecordScanner; only Hive/Presto use it. This PR is only about the Spark adaptation.

##########
File path: hudi-common/src/main/java/org/apache/hudi/internal/schema/action/MergeSchemaAction.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.action;
+
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * auxiliary class.
+ * help to merge file schema and query schema to produce final read schema for avro/parquet file
+ */
+public class MergeSchemaAction {
+  private final InternalSchema fileSchema;
+  private final InternalSchema querySchema;
+  private final Boolean mergeRequiredFiledForce;

Review comment:
       ok

##########
File path: hudi-common/src/main/java/org/apache/hudi/internal/schema/action/MergeSchemaAction.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.action;
+
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * auxiliary class.
+ * help to merge file schema and query schema to produce final read schema for avro/parquet file
+ */
+public class MergeSchemaAction {
+  private final InternalSchema fileSchema;
+  private final InternalSchema querySchema;
+  private final Boolean mergeRequiredFiledForce;
+  public MergeSchemaAction(InternalSchema fileSchema, InternalSchema querySchema, Boolean mergeRequiredFiledForce) {
+    this.fileSchema = fileSchema;
+    this.querySchema = querySchema;
+    this.mergeRequiredFiledForce = mergeRequiredFiledForce;
+  }
+
+  public List<Types.Field> buildNewFields(List<Types.Field> oldFields, List<Type> newTypes) {

Review comment:
       done

##########
File path: hudi-common/src/main/java/org/apache/hudi/internal/schema/action/TableChange.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.action;
+
+import org.apache.hudi.internal.schema.HoodieSchemaException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.hudi.internal.schema.Types;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * TableChange subclasses represent requested changes to a table.
+ * now only column changes support.
+ * to do support partition changes
+ */
+public interface TableChange {
+  /* The action Type of schema change. */
+  enum ColumnChangeID {
+    ADD, UPDATE, DELETE, PROPERTY_CHANGE;
+    private String name;
+
+    private ColumnChangeID() {
+      this.name = this.name().toLowerCase(Locale.ROOT);
+    }
+
+    public String getName() {
+      return name;
+    }
+  }
+
+  static ColumnChangeID fromValue(String value) {
+    switch (value.toLowerCase(Locale.ROOT)) {
+      case "add":
+        return ColumnChangeID.ADD;
+      case "change":
+        return ColumnChangeID.UPDATE;
+      case "delete":
+        return ColumnChangeID.DELETE;
+      case "property":
+        return ColumnChangeID.PROPERTY_CHANGE;
+      default:
+        throw new IllegalArgumentException("Invalid value of Type.");
+    }
+  }
+
+  ColumnChangeID columnChangeId();
+
+  default boolean withPositionChange() {
+    return false;
+  }
+
+  abstract class BaseColumnChange implements TableChange {
+    protected final InternalSchema internalSchema;
+    protected final Map<Integer, Integer> id2parent;
+    protected final Map<Integer, ArrayList<ColumnPositionChange>> positionChangeMap = new HashMap<>();
+
+    BaseColumnChange(InternalSchema schema) {
+      this.internalSchema = schema;
+      this.id2parent = InternalSchemaUtils.index2Parents(schema.getRecord());
+    }
+
+    /**
+     * add position change.
+     *
+     * @param srcName column which need to be reordered
+     * @param dsrName reference position
+     * @param orderType change types
+     * @return this
+     */
+    public BaseColumnChange addPositionChange(String srcName, String dsrName, String orderType) {
+      Integer srcId = findIdByFullName(srcName);
+      Integer dsrId = findIdByFullName(dsrName);
+      Integer srcParentId = id2parent.get(srcId);
+      Integer dsrParentId = id2parent.get(dsrId);
+      int parentId;
+      if (srcParentId != null && dsrParentId != null && srcParentId.equals(dsrParentId)) {
+        Types.Field parentField = internalSchema.findField(srcParentId);
+        if (!(parentField.type() instanceof Types.RecordType)) {
+          throw new HoodieSchemaException(String.format("only support reorder fields in struct type, but find: %s", parentField.type()));
+        }
+        parentId = parentField.fieldId();
+      } else if (srcParentId == null &&  dsrParentId == null) {
+        parentId = -1;
+      } else if (srcParentId != null && dsrParentId == null && orderType.equals("first")) {
+        parentId = srcParentId;
+      } else {
+        throw new HoodieSchemaException("cannot order position from different parent");
+      }
+
+      ArrayList<ColumnPositionChange> changes = positionChangeMap.getOrDefault(parentId, new ArrayList<>());
+      changes.add(ColumnPositionChange.get(srcId, dsrId, orderType));
+      positionChangeMap.put(parentId, changes);
+      return this;
+    }
+
+    /**
+     * abstract method.
+     * give a column fullName and return the field id
+     *
+     * @param fullName column fullName
+     * @return field id of current column
+     */
+    public abstract Integer findIdByFullName(String fullName);

Review comment:
       Yes, changed it to protected. This method cannot be part of InternalSchema, since it differs across implementation classes.
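To illustrate why findIdByFullName stays abstract: each schema implementation indexes its fields differently, but every implementation ultimately maps a (possibly dotted, nested) full name to a field id. A minimal sketch under that assumption — the field names, ids, and lookup table here are hypothetical, not the PR's actual code:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: resolve a column's full (dotted) name to its field id.
// Different schema implementations build this index differently, which is
// why the method is abstract on BaseColumnChange.
final class NameToIdIndex {
  private final Map<String, Integer> index = new HashMap<>();

  NameToIdIndex() {
    // two top-level columns and one nested struct field
    index.put("id", 1);
    index.put("address", 2);
    index.put("address.city", 3);
  }

  Integer findIdByFullName(String fullName) {
    Integer id = index.get(fullName.toLowerCase());
    if (id == null) {
      throw new IllegalArgumentException("cannot find column: " + fullName);
    }
    return id;
  }

  public static void main(String[] args) {
    NameToIdIndex idx = new NameToIdIndex();
    System.out.println(idx.findIdByFullName("address.city")); // 3
  }
}
```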

##########
File path: hudi-common/src/main/java/org/apache/hudi/internal/schema/action/TableChange.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.action;
+
+import org.apache.hudi.internal.schema.HoodieSchemaException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.hudi.internal.schema.Types;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * TableChange subclasses represent requested changes to a table.
+ * now only column changes support.
+ * to do support partition changes
+ */
+public interface TableChange {
+  /* The action Type of schema change. */
+  enum ColumnChangeID {
+    ADD, UPDATE, DELETE, PROPERTY_CHANGE;
+    private String name;
+
+    private ColumnChangeID() {
+      this.name = this.name().toLowerCase(Locale.ROOT);
+    }
+
+    public String getName() {
+      return name;
+    }
+  }
+
+  static ColumnChangeID fromValue(String value) {
+    switch (value.toLowerCase(Locale.ROOT)) {
+      case "add":
+        return ColumnChangeID.ADD;
+      case "change":
+        return ColumnChangeID.UPDATE;
+      case "delete":
+        return ColumnChangeID.DELETE;
+      case "property":
+        return ColumnChangeID.PROPERTY_CHANGE;
+      default:
+        throw new IllegalArgumentException("Invalid value of Type.");
+    }
+  }
+
+  ColumnChangeID columnChangeId();
+
+  default boolean withPositionChange() {
+    return false;
+  }
+
+  abstract class BaseColumnChange implements TableChange {
+    protected final InternalSchema internalSchema;
+    protected final Map<Integer, Integer> id2parent;
+    protected final Map<Integer, ArrayList<ColumnPositionChange>> positionChangeMap = new HashMap<>();
+
+    BaseColumnChange(InternalSchema schema) {
+      this.internalSchema = schema;
+      this.id2parent = InternalSchemaUtils.index2Parents(schema.getRecord());
+    }
+
+    /**
+     * add position change.
+     *
+     * @param srcName column which need to be reordered
+     * @param dsrName reference position
+     * @param orderType change types
+     * @return this
+     */
+    public BaseColumnChange addPositionChange(String srcName, String dsrName, String orderType) {
+      Integer srcId = findIdByFullName(srcName);
+      Integer dsrId = findIdByFullName(dsrName);
+      Integer srcParentId = id2parent.get(srcId);
+      Integer dsrParentId = id2parent.get(dsrId);
+      int parentId;
+      if (srcParentId != null && dsrParentId != null && srcParentId.equals(dsrParentId)) {
+        Types.Field parentField = internalSchema.findField(srcParentId);
+        if (!(parentField.type() instanceof Types.RecordType)) {
+          throw new HoodieSchemaException(String.format("only support reorder fields in struct type, but find: %s", parentField.type()));
+        }
+        parentId = parentField.fieldId();
+      } else if (srcParentId == null &&  dsrParentId == null) {
+        parentId = -1;
+      } else if (srcParentId != null && dsrParentId == null && orderType.equals("first")) {
+        parentId = srcParentId;
+      } else {
+        throw new HoodieSchemaException("cannot order position from different parent");
+      }
+
+      ArrayList<ColumnPositionChange> changes = positionChangeMap.getOrDefault(parentId, new ArrayList<>());
+      changes.add(ColumnPositionChange.get(srcId, dsrId, orderType));
+      positionChangeMap.put(parentId, changes);
+      return this;
+    }
+
+    /**
+     * abstract method.
+     * give a column fullName and return the field id
+     *
+     * @param fullName column fullName
+     * @return field id of current column
+     */
+    public abstract Integer findIdByFullName(String fullName);
+
+    @Override
+    public boolean withPositionChange() {
+      return false;
+    }
+  }
+
+  /**
+   * Column position change.
+   * now support three change types: FIRST/AFTER/BEFORE
+   * FIRST means the specified column should be the first column.
+   * AFTER means the specified column should be put after the given column.
+   * BEFORE means the specified column should be put before the given column.
+   * Note that, the specified column may be a nested field:
+   * AFTER/BEFORE means the given columns should in the same struct;
+   * FIRST means this field should be the first one within the struct.
+   */
+  class ColumnPositionChange {
+    public enum ColumnPositionType {

Review comment:
       already done
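The FIRST/AFTER/BEFORE semantics documented on ColumnPositionChange above can be illustrated with a plain list reorder. This is a sketch only — the real change operates on field ids within a struct, not on a flat name list:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of the three position-change types applied to a flat field list.
final class PositionDemo {
  static List<String> move(List<String> fields, String src, String type, String dsr) {
    List<String> out = new ArrayList<>(fields);
    out.remove(src); // remove the field being repositioned
    switch (type) {
      case "first":
        out.add(0, src);                      // src becomes the first field
        break;
      case "before":
        out.add(out.indexOf(dsr), src);       // src goes just before dsr
        break;
      case "after":
        out.add(out.indexOf(dsr) + 1, src);   // src goes just after dsr
        break;
      default:
        throw new IllegalArgumentException("unknown position type: " + type);
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> fields = Arrays.asList("a", "b", "c");
    System.out.println(move(fields, "c", "first", null));  // [c, a, b]
    System.out.println(move(fields, "a", "after", "b"));   // [b, a, c]
    System.out.println(move(fields, "c", "before", "a"));  // [c, a, b]
  }
}
```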

##########
File path: hudi-common/src/main/java/org/apache/hudi/internal/schema/action/TableChange.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.action;
+
+import org.apache.hudi.internal.schema.HoodieSchemaException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.hudi.internal.schema.Types;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * TableChange subclasses represent requested changes to a table.
+ * now only column changes support.
+ * to do support partition changes
+ */
+public interface TableChange {
+  /* The action Type of schema change. */
+  enum ColumnChangeID {
+    ADD, UPDATE, DELETE, PROPERTY_CHANGE;
+    private String name;
+
+    private ColumnChangeID() {
+      this.name = this.name().toLowerCase(Locale.ROOT);
+    }
+
+    public String getName() {
+      return name;
+    }
+  }
+
+  static ColumnChangeID fromValue(String value) {
+    switch (value.toLowerCase(Locale.ROOT)) {
+      case "add":
+        return ColumnChangeID.ADD;
+      case "change":
+        return ColumnChangeID.UPDATE;
+      case "delete":
+        return ColumnChangeID.DELETE;
+      case "property":
+        return ColumnChangeID.PROPERTY_CHANGE;
+      default:
+        throw new IllegalArgumentException("Invalid value of Type.");
+    }
+  }
+
+  ColumnChangeID columnChangeId();
+
+  default boolean withPositionChange() {
+    return false;
+  }
+
+  abstract class BaseColumnChange implements TableChange {
+    protected final InternalSchema internalSchema;
+    protected final Map<Integer, Integer> id2parent;
+    protected final Map<Integer, ArrayList<ColumnPositionChange>> positionChangeMap = new HashMap<>();
+
+    BaseColumnChange(InternalSchema schema) {
+      this.internalSchema = schema;
+      this.id2parent = InternalSchemaUtils.index2Parents(schema.getRecord());
+    }
+
+    /**
+     * add position change.
+     *
+     * @param srcName column which need to be reordered
+     * @param dsrName reference position
+     * @param orderType change types
+     * @return this
+     */
+    public BaseColumnChange addPositionChange(String srcName, String dsrName, String orderType) {
+      Integer srcId = findIdByFullName(srcName);
+      Integer dsrId = findIdByFullName(dsrName);
+      Integer srcParentId = id2parent.get(srcId);
+      Integer dsrParentId = id2parent.get(dsrId);
+      int parentId;
+      if (srcParentId != null && dsrParentId != null && srcParentId.equals(dsrParentId)) {
+        Types.Field parentField = internalSchema.findField(srcParentId);
+        if (!(parentField.type() instanceof Types.RecordType)) {
+          throw new HoodieSchemaException(String.format("only support reorder fields in struct type, but find: %s", parentField.type()));
+        }
+        parentId = parentField.fieldId();
+      } else if (srcParentId == null &&  dsrParentId == null) {
+        parentId = -1;
+      } else if (srcParentId != null && dsrParentId == null && orderType.equals("first")) {
+        parentId = srcParentId;
+      } else {
+        throw new HoodieSchemaException("cannot order position from different parent");
+      }
+
+      ArrayList<ColumnPositionChange> changes = positionChangeMap.getOrDefault(parentId, new ArrayList<>());
+      changes.add(ColumnPositionChange.get(srcId, dsrId, orderType));
+      positionChangeMap.put(parentId, changes);
+      return this;
+    }
+
+    /**
+     * abstract method.
+     * give a column fullName and return the field id
+     *
+     * @param fullName column fullName
+     * @return field id of current column
+     */
+    public abstract Integer findIdByFullName(String fullName);
+
+    @Override
+    public boolean withPositionChange() {
+      return false;
+    }
+  }
+
+  /**
+   * Column position change.
+   * now support three change types: FIRST/AFTER/BEFORE
+   * FIRST means the specified column should be the first column.
+   * AFTER means the specified column should be put after the given column.
+   * BEFORE means the specified column should be put before the given column.
+   * Note that, the specified column may be a nested field:
+   * AFTER/BEFORE means the given columns should in the same struct;
+   * FIRST means this field should be the first one within the struct.
+   */
+  class ColumnPositionChange {
+    public enum ColumnPositionType {
+      FIRST,
+      BEFORE,
+      AFTER
+    }
+
+    private final int srcId;
+    private final int dsrId;
+    private final ColumnPositionType type;
+
+    static ColumnPositionChange first(int srcId) {
+      return new ColumnPositionChange(srcId, -1, ColumnPositionType.FIRST);
+    }
+
+    static ColumnPositionChange before(int srcId, int dsrId) {
+      return new ColumnPositionChange(srcId, dsrId, ColumnPositionType.BEFORE);
+    }
+
+    static ColumnPositionChange after(int srcId, int dsrId) {
+      return new ColumnPositionChange(srcId, dsrId, ColumnPositionType.AFTER);
+    }
+
+    static ColumnPositionChange get(int srcId, int dsrId, String type) {
+      switch (type) {

Review comment:
       done

##########
File path: hudi-common/src/main/java/org/apache/hudi/internal/schema/action/TableChanges.java
##########
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.action;
+
+import org.apache.hudi.internal.schema.HoodieSchemaException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.utils.SchemaChangeUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TableChanges {

Review comment:
       remove helper methods to a helper class.  now TableChanges is ok.

##########
File path: hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaUtil.java
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import static org.apache.avro.Schema.Type.UNION;
+
+import org.apache.avro.JsonProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.action.TableChanges;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public class AvroSchemaUtil {

Review comment:
       ok

##########
File path: hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaParser.java
##########
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.util.TokenBuffer;
+
+import org.apache.avro.JsonProperties;
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class InternalSchemaParser {
+  private InternalSchemaParser() {
+
+  }
+
+  public static final String LATESTSCHEMA = "latestSchema";
+  public static final String SCHEMAS = "schemas";
+  private static final String MAX_COLUMN_ID = "max_column_id";
+  private static final String VERSION_ID = "version-id";
+  private static final String TYPE = "type";
+  private static final String RECORD = "record";
+  private static final String ARRAY = "array";
+  private static final String MAP = "map";
+  private static final String FIELDS = "fields";
+  private static final String ELEMENT = "element";
+  private static final String KEY = "key";
+  private static final String VALUE = "value";
+  private static final String DOC = "doc";
+  private static final String NAME = "name";
+  private static final String ID = "id";
+  private static final String ELEMENT_ID = "element-id";
+  private static final String KEY_ID = "key-id";
+  private static final String VALUE_ID = "value-id";
+  private static final String OPTIONAL = "optional";
+  private static final String ELEMENT_OPTIONAL = "element_optional";
+  private static final String VALUE_OPTIONAL = "value_optional";
+
+  private static final Pattern FIXED = Pattern.compile("fixed\\[(\\d+)\\]");
+  private static final Pattern DECIMAL = Pattern.compile("decimal\\((\\d+),\\s+(\\d+)\\)");
+
+  public static JsonNode toJsonNode(Object datum) {

Review comment:
       done.

##########
File path: hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaParser.java
##########
+  public static JsonNode toJsonNode(Object datum) {

Review comment:
       done; docs added, and renamed InternalSchemaParser to SerDeHelper.

##########
File path: hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
##########
@@ -0,0 +1,779 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.Types.Field;
+import org.apache.hudi.internal.schema.Types.RecordType;
+import org.apache.hudi.internal.schema.action.MergeSchemaAction;
+
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+public class InternalSchemaUtils {
+
+  private InternalSchemaUtils() {
+  }
+
+  /**
+   * Builds a mapping from id to full field name for an internal Type.
+   * If a field y belongs to a struct field x, then the full name of y is x.y.
+   *
+   * @param type hoodie internal type
+   * @return a mapping from id to full field name
+   */
+  public static Map<Integer, String> buildIdToName(Type type) {
+    Map<Integer, String> result = new HashMap<>();
+    buildNameToId(type).forEach((k, v) -> result.put(v, k));
+    return result;
+  }
+
+  /**
+   * Builds a mapping from full field name to id for an internal Type.
+   * If a field y belongs to a struct field x, then the full name of y is x.y.
+   *
+   * @param type hoodie internal type
+   * @return a mapping from full field name to id
+   */
+  public static Map<String, Integer> buildNameToId(Type type) {
+    Deque<String> fieldNames = new LinkedList<>();
+    Map<String, Integer> nameToId = new HashMap<>();
+    visitNameToId(type, fieldNames, nameToId);
+    return nameToId;
+  }
+
+  /**
+   * Builds a mapping from id to field for an internal Type.
+   *
+   * @param type hoodie internal type
+   * @return a mapping from id to field
+   */
+  public static Map<Integer, Field> buildIdToField(Type type) {
+    Map<Integer, Field> idToField = new HashMap<>();
+    visitIdToField(type, idToField);
+    return idToField;
+  }
+
+  private static void visitIdToField(Type type, Map<Integer, Field> index) {
+    switch (type.typeId()) {
+      case RECORD:
+        RecordType record = (RecordType) type;
+        for (Field field : record.fields()) {
+          visitIdToField(field.type(), index);
+          index.put(field.fieldId(), field);
+        }
+        return;
+      case ARRAY:
+        Types.ArrayType array = (Types.ArrayType) type;
+        visitIdToField(array.elementType(), index);
+        for (Field field : array.fields()) {
+          index.put(field.fieldId(), field);
+        }
+        return;
+      case MAP:
+        Types.MapType map = (Types.MapType) type;
+        visitIdToField(map.keyType(), index);
+        visitIdToField(map.valueType(), index);
+        for (Field field : map.fields()) {
+          index.put(field.fieldId(), field);
+        }
+        return;
+      default:
+        return;
+    }
+  }
+
+  private static void visitNameToId(Type type, Deque<String> fieldNames, Map<String, Integer> nameToId) {

Review comment:
       ok
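The full-name mapping described in the Javadoc above (a nested field y under struct x gets the full name "x.y") can be sketched outside Hudi's type system. In this minimal Java sketch, `Field` and `NameToIdDemo` are hypothetical stand-ins for Hudi's `Types.Field`/`Types.RecordType`, not the actual API:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy stand-in for Hudi's Types.Field (hypothetical, for illustration only).
class Field {
  final int id;
  final String name;
  final List<Field> children; // empty for primitive fields

  Field(int id, String name, List<Field> children) {
    this.id = id;
    this.name = name;
    this.children = children;
  }
}

public class NameToIdDemo {
  // Depth-first visit that builds "x.y"-style full names, mirroring the
  // intent of buildNameToId: the Deque tracks the current nesting path.
  static void visit(List<Field> fields, Deque<String> path, Map<String, Integer> nameToId) {
    for (Field f : fields) {
      path.addLast(f.name);
      nameToId.put(String.join(".", path), f.id);
      visit(f.children, path, nameToId);
      path.removeLast();
    }
  }

  public static void main(String[] args) {
    List<Field> schema = List.of(
        new Field(1, "x", List.of(new Field(2, "y", List.of()))),
        new Field(3, "z", List.of()));
    Map<String, Integer> nameToId = new HashMap<>();
    visit(schema, new ArrayDeque<>(), nameToId);
    System.out.println(nameToId.get("x.y")); // nested field y is registered under full name "x.y"
  }
}
```

Inverting this map, as `buildIdToName` does, then gives the id-to-full-name lookup used elsewhere in the PR.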

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java
##########
@@ -45,7 +45,7 @@ private HoodieMetadataMergedLogRecordScanner(FileSystem fs, String basePath, Lis
                                               String spillableMapBasePath, Set<String> mergeKeyFilter,
                                               ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled) {
     super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, false, false, bufferSize,
-        spillableMapBasePath, Option.empty(), false, diskMapType, isBitCaskDiskMapCompressionEnabled, false);
+        spillableMapBasePath, Option.empty(), false, diskMapType, isBitCaskDiskMapCompressionEnabled, false, null);

Review comment:
        Spark MOR/COW does not use HoodieUnmergedLogRecordScanner; only Hive/Presto use it. This PR covers only the Spark adaptation.

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
##########
@@ -52,26 +54,33 @@ case class HoodieMergeOnReadTableState(tableStructSchema: StructType,
                                        requiredAvroSchema: String,
                                        hoodieRealtimeFileSplits: List[HoodieMergeOnReadFileSplit],
                                        preCombineField: Option[String],
-                                       recordKeyFieldOpt: Option[String])
+                                       recordKeyFieldOpt: Option[String],
+                                       internalSchema: Option[InternalSchema] = None,
+                                       requiredInternalSchema: Option[InternalSchema] = None)
 
 class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
                                   val optParams: Map[String, String],
                                   val userSchema: StructType,
                                   val globPaths: Option[Seq[Path]],
                                   val metaClient: HoodieTableMetaClient)
-  extends BaseRelation with PrunedFilteredScan with Logging {
+  extends BaseRelation with PrunedFilteredScan with Logging with SparkAdapterSupport {
 
   private val conf = sqlContext.sparkContext.hadoopConfiguration
   private val jobConf = new JobConf(conf)
   // use schema from latest metadata, if not present, read schema from the data file
   private val schemaUtil = new TableSchemaResolver(metaClient)
-  private lazy val tableAvroSchema = {
+  private lazy val (tableAvroSchema, internalSchema) = {
     try {
-      schemaUtil.getTableAvroSchema
      val internalSchemaOpt = schemaUtil.getTableInternalSchemaFromCommitMetadata
+      if (internalSchemaOpt.isPresent) {

Review comment:
       done
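The change above makes the relation prefer the internal schema persisted in commit metadata and fall back to reading the schema from the data file otherwise. A minimal Java sketch of that fallback pattern follows; `SchemaResolveDemo` and its methods are hypothetical stand-ins for illustration, not Hudi's `TableSchemaResolver` API:

```java
import java.util.Optional;

public class SchemaResolveDemo {
  // Pretend no alter SQL / alter API call has happened yet, so no internal
  // schema was written to commit metadata (hypothetical stand-in).
  static Optional<String> internalSchemaFromCommitMetadata() {
    return Optional.empty();
  }

  // Fallback source, mirroring "read schema from the data file".
  static String avroSchemaFromDataFile() {
    return "avro-schema-from-data-file";
  }

  // Prefer the persisted internal schema; otherwise fall back lazily.
  static String resolveSchema() {
    return internalSchemaFromCommitMetadata()
        .orElseGet(SchemaResolveDemo::avroSchemaFromDataFile);
  }

  public static void main(String[] args) {
    System.out.println(resolveSchema()); // falls back to the data-file schema
  }
}
```

`orElseGet` keeps the fallback lazy, so the (potentially expensive) data-file read only happens when no internal schema is present, matching the reviewer's point that schema evolution side effects occur only after an explicit alter.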




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

