bvaradar commented on a change in pull request #3808:
URL: https://github.com/apache/hudi/pull/3808#discussion_r746264487



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBaseInternalSchemasManager.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public class FileBaseInternalSchemasManager extends InternalSchemasManager {

Review comment:
       Rename to FileBasedInternalSchemaStorageManager

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBaseInternalSchemasManager.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public class FileBaseInternalSchemasManager extends InternalSchemasManager {
+  public static final String SCHEMA_NAME = ".schema";
+  private final Path baseSchemaPath;
+  private Configuration conf;
+  private HoodieTableMetaClient metaClient;
+
+  public FileBaseInternalSchemasManager(Configuration conf, Path baseTablePath) {
+    Path metaPath = new Path(baseTablePath, ".hoodie");
+    this.baseSchemaPath = new Path(metaPath, SCHEMA_NAME);
+    this.conf = conf;
+    this.metaClient = HoodieTableMetaClient.builder().setBasePath(metaPath.getParent().toString()).setConf(conf).build();
+  }
+
+  public FileBaseInternalSchemasManager(HoodieTableMetaClient metaClient) {
+    Path metaPath = new Path(metaClient.getBasePath(), ".hoodie");
+    this.baseSchemaPath = new Path(metaPath, SCHEMA_NAME);
+    this.conf = metaClient.getHadoopConf();
+    this.metaClient = metaClient;
+  }
+
+  @Override
+  public void persistHistorySchemaStr(String instantTime, String historySchemaStr) {
+    Path savePath = new Path(baseSchemaPath, instantTime);
+    Path saveTempPath = new Path(baseSchemaPath, instantTime + java.util.UUID.randomUUID().toString());
+    try {
+      cleanOldFiles();
+      byte[] writeContent = historySchemaStr.getBytes(StandardCharsets.UTF_8);
+      if (!metaClient.getFs().exists(saveTempPath)) {
+        if (!metaClient.getFs().createNewFile(saveTempPath)) {
+          throw new HoodieIOException("Failed to create file " + saveTempPath);
+        }
+      }
+      FSDataOutputStream fsout = metaClient.getFs().create(saveTempPath, true);
+      fsout.write(writeContent);
+      fsout.close();
+      // try to rename
+      boolean success = metaClient.getFs().rename(saveTempPath, savePath);
+      if (!success) {
+        throw new HoodieIOException(String.format("cannot rename %s to %s", saveTempPath, savePath));
+      }
+    } catch (IOException e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  private void cleanOldFiles() {
+    List<String> validateCommits = getValidateCommits();
+    try {
+      FileSystem fs = baseSchemaPath.getFileSystem(conf);
+      if (fs.exists(baseSchemaPath)) {
+        List<String> candidateSchemaFiles = Arrays.stream(fs.listStatus(baseSchemaPath)).filter(f -> f.isFile())
+            .map(file -> file.getPath().getName().split("\\.")[0]).collect(Collectors.toList());
+        List<String> validateSchemaFiles = candidateSchemaFiles.stream().filter(f -> validateCommits.contains(f)).collect(Collectors.toList());
+        List<String> residualSchemaFiles = candidateSchemaFiles.stream().filter(f -> !validateCommits.contains(f)).collect(Collectors.toList());
+        // clean residual files
+        residualSchemaFiles.forEach(f -> {
+          try {
+            fs.delete(new Path(f));
+          } catch (IOException o) {
+            throw new HoodieException(o);
+          }
+        });
+        // clean old files, keep at most ten schema files
+        if (validateSchemaFiles.size() > 10) {
+          for (int i = 0; i < validateSchemaFiles.size(); i++) {
+            if (i >= 10) {
+              break;
+            }
+            fs.delete(new Path(validateSchemaFiles.get(i)));
+          }
+        }
+      }
+    } catch (IOException e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  private List getValidateCommits() {

Review comment:
       nit: Rename to getValidInstants

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaUtil.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import static org.apache.avro.Schema.Type.UNION;
+
+import org.apache.avro.JsonProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.action.TableChanges;
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public class AvroSchemaUtil {
+  private AvroSchemaUtil() {
+  }
+
+  /**
+   * Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the new schema.
+   * support deep rewrite for nested record.
+   * This particular method does the following things :
+   * a) Create a new empty GenericRecord with the new schema.
+   * b) For GenericRecord, copy over the data from the old schema to the new schema or set default values for all fields of this transformed schema
+   *
+   * @param oldRecord oldRecord to be rewrite
+   * @param newSchema newSchema used to rewrite oldRecord
+   * @return newRecord for new Schema
+   */
+  public static GenericRecord rewriteRecord(GenericRecord oldRecord, Schema newSchema) {
+    Object newRecord = rewriteRecord(oldRecord, oldRecord.getSchema(), newSchema);
+    return (GenericData.Record) newRecord;
+  }
+
+  private static Object rewriteRecord(Object oldRecord, Schema oldSchema, Schema newSchema) {
+    if (oldRecord == null) {
+      return null;
+    }
+    switch (newSchema.getType()) {
+      case RECORD:
+        if (!(oldRecord instanceof IndexedRecord)) {
+          throw new IllegalArgumentException("cannot rewrite record with different type");
+        }
+        IndexedRecord indexedRecord = (IndexedRecord) oldRecord;
+        List<Schema.Field> fields = newSchema.getFields();
+        Map<Integer, Object> helper = new HashMap<>();
+
+        for (int i = 0; i < fields.size(); i++) {
+          Schema.Field field = fields.get(i);
+          if (oldSchema.getField(field.name()) != null) {
+            Schema.Field oldField = oldSchema.getField(field.name());

Review comment:
       Looks like we are resolving the schema by name. How will rename work here? Is it because you will be addressing it in a subsequent diff?
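
       To illustrate the concern (a purely hypothetical sketch, not this PR's API): if both schemas carried stable field ids, a renamed column could still be mapped back to its old value by id rather than by name, along these lines:

           import java.util.Map;
           import java.util.Objects;

           // Hypothetical helper: resolve the old field name for a (possibly renamed) new field via a stable id.
           final class FieldIdResolver {
             private final Map<String, Integer> oldNameToId; // name -> id captured with the old schema
             private final Map<String, Integer> newNameToId; // name -> id captured with the new schema

             FieldIdResolver(Map<String, Integer> oldNameToId, Map<String, Integer> newNameToId) {
               this.oldNameToId = oldNameToId;
               this.newNameToId = newNameToId;
             }

             /** Returns the old field name carrying the same id, or null when the field is new. */
             String resolveOldName(String newFieldName) {
               Integer id = newNameToId.get(newFieldName);
               if (id == null) {
                 return null; // field added in the new schema; caller falls back to a default value
               }
               return oldNameToId.entrySet().stream()
                   .filter(e -> Objects.equals(e.getValue(), id))
                   .map(Map.Entry::getKey)
                   .findFirst()
                   .orElse(null);
             }
           }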

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBaseInternalSchemasManager.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public class FileBaseInternalSchemasManager extends InternalSchemasManager {
+  public static final String SCHEMA_NAME = ".schema";
+  private final Path baseSchemaPath;
+  private Configuration conf;
+  private HoodieTableMetaClient metaClient;
+
+  public FileBaseInternalSchemasManager(Configuration conf, Path baseTablePath) {
+    Path metaPath = new Path(baseTablePath, ".hoodie");
+    this.baseSchemaPath = new Path(metaPath, SCHEMA_NAME);
+    this.conf = conf;
+    this.metaClient = HoodieTableMetaClient.builder().setBasePath(metaPath.getParent().toString()).setConf(conf).build();
+  }
+
+  public FileBaseInternalSchemasManager(HoodieTableMetaClient metaClient) {
+    Path metaPath = new Path(metaClient.getBasePath(), ".hoodie");
+    this.baseSchemaPath = new Path(metaPath, SCHEMA_NAME);
+    this.conf = metaClient.getHadoopConf();
+    this.metaClient = metaClient;
+  }
+
+  @Override
+  public void persistHistorySchemaStr(String instantTime, String historySchemaStr) {
+    Path savePath = new Path(baseSchemaPath, instantTime);
+    Path saveTempPath = new Path(baseSchemaPath, instantTime + java.util.UUID.randomUUID().toString());
+    try {
+      cleanOldFiles();
+      byte[] writeContent = historySchemaStr.getBytes(StandardCharsets.UTF_8);
+      if (!metaClient.getFs().exists(saveTempPath)) {
+        if (!metaClient.getFs().createNewFile(saveTempPath)) {
+          throw new HoodieIOException("Failed to create file " + saveTempPath);
+        }
+      }
+      FSDataOutputStream fsout = metaClient.getFs().create(saveTempPath, true);
+      fsout.write(writeContent);
+      fsout.close();
+      // try to rename
+      boolean success = metaClient.getFs().rename(saveTempPath, savePath);
+      if (!success) {
+        throw new HoodieIOException(String.format("cannot rename %s to %s", saveTempPath, savePath));
+      }
+    } catch (IOException e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  private void cleanOldFiles() {

Review comment:
       We should do the cleanup as part of archiving commit metadata

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBaseInternalSchemasManager.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public class FileBaseInternalSchemasManager extends InternalSchemasManager {
+  public static final String SCHEMA_NAME = ".schema";
+  private final Path baseSchemaPath;
+  private Configuration conf;
+  private HoodieTableMetaClient metaClient;
+
+  public FileBaseInternalSchemasManager(Configuration conf, Path baseTablePath) {
+    Path metaPath = new Path(baseTablePath, ".hoodie");
+    this.baseSchemaPath = new Path(metaPath, SCHEMA_NAME);
+    this.conf = conf;
+    this.metaClient = HoodieTableMetaClient.builder().setBasePath(metaPath.getParent().toString()).setConf(conf).build();
+  }
+
+  public FileBaseInternalSchemasManager(HoodieTableMetaClient metaClient) {
+    Path metaPath = new Path(metaClient.getBasePath(), ".hoodie");
+    this.baseSchemaPath = new Path(metaPath, SCHEMA_NAME);
+    this.conf = metaClient.getHadoopConf();
+    this.metaClient = metaClient;
+  }
+
+  @Override
+  public void persistHistorySchemaStr(String instantTime, String historySchemaStr) {
+    Path savePath = new Path(baseSchemaPath, instantTime);
+    Path saveTempPath = new Path(baseSchemaPath, instantTime + java.util.UUID.randomUUID().toString());
+    try {
+      cleanOldFiles();
+      byte[] writeContent = historySchemaStr.getBytes(StandardCharsets.UTF_8);
+      if (!metaClient.getFs().exists(saveTempPath)) {
+        if (!metaClient.getFs().createNewFile(saveTempPath)) {
+          throw new HoodieIOException("Failed to create file " + saveTempPath);
+        }
+      }
+      FSDataOutputStream fsout = metaClient.getFs().create(saveTempPath, true);
+      fsout.write(writeContent);
+      fsout.close();
+      // try to rename
+      boolean success = metaClient.getFs().rename(saveTempPath, savePath);
+      if (!success) {
+        throw new HoodieIOException(String.format("cannot rename %s to %s", saveTempPath, savePath));
+      }
+    } catch (IOException e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  private void cleanOldFiles() {
+    List<String> validateCommits = getValidateCommits();
+    try {
+      FileSystem fs = baseSchemaPath.getFileSystem(conf);
+      if (fs.exists(baseSchemaPath)) {
+        List<String> candidateSchemaFiles = Arrays.stream(fs.listStatus(baseSchemaPath)).filter(f -> f.isFile())
+            .map(file -> file.getPath().getName().split("\\.")[0]).collect(Collectors.toList());
+        List<String> validateSchemaFiles = candidateSchemaFiles.stream().filter(f -> validateCommits.contains(f)).collect(Collectors.toList());
+        List<String> residualSchemaFiles = candidateSchemaFiles.stream().filter(f -> !validateCommits.contains(f)).collect(Collectors.toList());
+        // clean residual files
+        residualSchemaFiles.forEach(f -> {
+          try {
+            fs.delete(new Path(f));
+          } catch (IOException o) {
+            throw new HoodieException(o);
+          }
+        });
+        // clean old files, keep at most ten schema files
+        if (validateSchemaFiles.size() > 10) {

Review comment:
       Can you make this value (10) configurable?
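
       For instance (a hedged sketch; the property key, class name and placement here are made up, not from this PR), the retention count could be exposed through a ConfigProperty:

           import org.apache.hudi.common.config.ConfigProperty;

           public class InternalSchemaCleanConfig {
             // Hypothetical key/default; the real name and location would be decided as part of the PR.
             public static final ConfigProperty<Integer> SCHEMA_FILES_TO_RETAIN = ConfigProperty
                 .key("hoodie.schema.files.to.retain")
                 .defaultValue(10)
                 .withDocumentation("Number of history schema files to keep under .hoodie/.schema before older ones are cleaned up.");
           }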

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/io/InternalSchemasManager.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.io;
+
+import org.apache.hudi.common.util.Option;
+
+abstract class InternalSchemasManager {

Review comment:
       Rename to AbstractInternalSchemaStorageManager

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBaseInternalSchemasManager.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public class FileBaseInternalSchemasManager extends InternalSchemasManager {
+  public static final String SCHEMA_NAME = ".schema";
+  private final Path baseSchemaPath;
+  private Configuration conf;
+  private HoodieTableMetaClient metaClient;
+
+  public FileBaseInternalSchemasManager(Configuration conf, Path baseTablePath) {
+    Path metaPath = new Path(baseTablePath, ".hoodie");
+    this.baseSchemaPath = new Path(metaPath, SCHEMA_NAME);
+    this.conf = conf;
+    this.metaClient = HoodieTableMetaClient.builder().setBasePath(metaPath.getParent().toString()).setConf(conf).build();
+  }
+
+  public FileBaseInternalSchemasManager(HoodieTableMetaClient metaClient) {
+    Path metaPath = new Path(metaClient.getBasePath(), ".hoodie");
+    this.baseSchemaPath = new Path(metaPath, SCHEMA_NAME);
+    this.conf = metaClient.getHadoopConf();
+    this.metaClient = metaClient;
+  }
+
+  @Override
+  public void persistHistorySchemaStr(String instantTime, String historySchemaStr) {
+    Path savePath = new Path(baseSchemaPath, instantTime);
+    Path saveTempPath = new Path(baseSchemaPath, instantTime + java.util.UUID.randomUUID().toString());
+    try {
+      cleanOldFiles();
+      byte[] writeContent = historySchemaStr.getBytes(StandardCharsets.UTF_8);

Review comment:
       We had earlier removed the usage of renames when handling state transitions because renames are not atomic on all storage types. Why do we need to write a temp file here? Can we avoid renames and instead use the same state transitions as commits, e.g. <instant.requested>, <instant.inflight> and <instant.commit>? This way, we can reuse the HoodieTimeline logic for scanning and constructing valid commits.
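
       For illustration (a rough, hypothetical sketch of the idea, not Hudi's actual timeline code): the schema file could go through commit-style states so that readers only trust instants whose completed file exists, removing the need for an atomic rename:

           import java.io.IOException;
           import java.nio.charset.StandardCharsets;
           import org.apache.hadoop.fs.FSDataOutputStream;
           import org.apache.hadoop.fs.FileSystem;
           import org.apache.hadoop.fs.Path;

           // Hypothetical state-transition style write: <instant>.schemacommit.inflight marks work in
           // progress, <instant>.schemacommit is the completed file that readers actually scan for.
           final class SchemaCommitWriter {
             static final String INFLIGHT_SUFFIX = ".schemacommit.inflight";
             static final String COMPLETED_SUFFIX = ".schemacommit";

             static void persist(FileSystem fs, Path schemaDir, String instantTime, String content) throws IOException {
               // 1. create an empty inflight marker so the write is visible as "in progress"
               fs.create(new Path(schemaDir, instantTime + INFLIGHT_SUFFIX), true).close();
               // 2. write the payload to the completed file; readers ignore instants that still have an inflight marker
               try (FSDataOutputStream out = fs.create(new Path(schemaDir, instantTime + COMPLETED_SUFFIX), true)) {
                 out.write(content.getBytes(StandardCharsets.UTF_8));
               }
               // 3. drop the inflight marker to finish the transition; a leftover marker flags a partial write
               fs.delete(new Path(schemaDir, instantTime + INFLIGHT_SUFFIX), false);
             }
           }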

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaUtil.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import static org.apache.avro.Schema.Type.UNION;
+
+import org.apache.avro.JsonProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.action.TableChanges;
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public class AvroSchemaUtil {
+  private AvroSchemaUtil() {
+  }
+
+  /**
+   * Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the new schema.
+   * support deep rewrite for nested record.
+   * This particular method does the following things :
+   * a) Create a new empty GenericRecord with the new schema.
+   * b) For GenericRecord, copy over the data from the old schema to the new schema or set default values for all fields of this transformed schema
+   *
+   * @param oldRecord oldRecord to be rewrite
+   * @param newSchema newSchema used to rewrite oldRecord
+   * @return newRecord for new Schema
+   */
+  public static GenericRecord rewriteRecord(GenericRecord oldRecord, Schema newSchema) {
+    Object newRecord = rewriteRecord(oldRecord, oldRecord.getSchema(), newSchema);
+    return (GenericData.Record) newRecord;
+  }
+
+  private static Object rewriteRecord(Object oldRecord, Schema oldSchema, Schema newSchema) {
+    if (oldRecord == null) {
+      return null;
+    }
+    switch (newSchema.getType()) {
+      case RECORD:
+        if (!(oldRecord instanceof IndexedRecord)) {
+          throw new IllegalArgumentException("cannot rewrite record with different type");
+        }
+        IndexedRecord indexedRecord = (IndexedRecord) oldRecord;
+        List<Schema.Field> fields = newSchema.getFields();
+        Map<Integer, Object> helper = new HashMap<>();

Review comment:
       Rename helper to recordBuffer




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

