nsivabalan commented on code in PR #5733:
URL: https://github.com/apache/hudi/pull/5733#discussion_r887340160


##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -58,100 +60,46 @@
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;
 
+import javax.annotation.concurrent.ThreadSafe;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema;
+import static org.apache.hudi.avro.AvroSchemaUtils.containsFieldInSchema;
 import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
 
 /**
  * Helper class to read schema from data files and log files and to convert it 
between different formats.
- *
- * TODO(HUDI-3626) cleanup
  */
+@ThreadSafe
 public class TableSchemaResolver {
 
   private static final Logger LOG = 
LogManager.getLogger(TableSchemaResolver.class);
-  private final HoodieTableMetaClient metaClient;
-  private final boolean hasOperationField;
 
-  public TableSchemaResolver(HoodieTableMetaClient metaClient) {
-    this.metaClient = metaClient;
-    this.hasOperationField = hasOperationField();
-  }
+  private final HoodieTableMetaClient metaClient;
 
   /**
-   * Gets the schema for a hoodie table. Depending on the type of table, read 
from any file written in the latest
-   * commit. We will assume that the schema has not changed within a single 
atomic write.
+   * NOTE: {@link HoodieCommitMetadata} could be of non-trivial size for large 
tables (in 100s of Mbs)
+   *       and therefore we'd want to limit amount of throw-away work being 
performed while fetching
+   *       commits' metadata
    *
-   * @return Parquet schema for this table
+   *       Please check out corresponding methods to fetch commonly used 
instances of {@link HoodieCommitMetadata}:
+   *       {@link #getLatestCommitMetadataWithValidSchema()},
+   *       {@link #getLatestCommitMetadataWithValidSchema()},
+   *       {@link #getCachedCommitMetadata(HoodieInstant)}
    */
-  private MessageType getTableParquetSchemaFromDataFile() {
-    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-    Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata 
=
-        activeTimeline.getLastCommitMetadataWithValidData();
-    try {
-      switch (metaClient.getTableType()) {
-        case COPY_ON_WRITE:
-          // For COW table, the file has data written must be in parquet or 
orc format currently.
-          if (instantAndCommitMetadata.isPresent()) {
-            HoodieCommitMetadata commitMetadata = 
instantAndCommitMetadata.get().getRight();
-            Iterator<String> filePaths = 
commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().iterator();
-            return fetchSchemaFromFiles(filePaths);
-          } else {
-            throw new IllegalArgumentException("Could not find any data file 
written for commit, "
-                + "so could not get schema for table " + 
metaClient.getBasePath());
-          }
-        case MERGE_ON_READ:
-          // For MOR table, the file has data written may be a parquet file, 
.log file, orc file or hfile.
-          // Determine the file format based on the file name, and then 
extract schema from it.
-          if (instantAndCommitMetadata.isPresent()) {
-            HoodieCommitMetadata commitMetadata = 
instantAndCommitMetadata.get().getRight();
-            Iterator<String> filePaths = 
commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().iterator();
-            return fetchSchemaFromFiles(filePaths);
-          } else {
-            throw new IllegalArgumentException("Could not find any data file 
written for commit, "
-                + "so could not get schema for table " + 
metaClient.getBasePath());
-          }
-        default:
-          LOG.error("Unknown table type " + metaClient.getTableType());
-          throw new InvalidTableException(metaClient.getBasePath());
-      }
-    } catch (IOException e) {
-      throw new HoodieException("Failed to read data schema", e);
-    }
-  }
+  private final Lazy<ConcurrentHashMap<HoodieInstant, HoodieCommitMetadata>> 
commitMetadataCache;

Review Comment:
   If there are more commits in the timeline now than when the 
commitMetadataCache was last updated, wouldn't we want to recompute it? From the 
getLatestCommitMetadataWithValidSchema impl, it looks like we set it once and 
reuse it for any further calls.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -176,86 +124,25 @@ public Schema getTableAvroSchema() throws Exception {
    * @throws Exception
    */
   public Schema getTableAvroSchema(boolean includeMetadataFields) throws 
Exception {
-    Schema schema;
-    Option<Schema> schemaFromCommitMetadata = 
getTableSchemaFromCommitMetadata(includeMetadataFields);
-    if (schemaFromCommitMetadata.isPresent()) {
-      schema = schemaFromCommitMetadata.get();
-    } else {
-      Option<Schema> schemaFromTableConfig = 
metaClient.getTableConfig().getTableCreateSchema();
-      if (schemaFromTableConfig.isPresent()) {
-        if (includeMetadataFields) {
-          schema = 
HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), 
hasOperationField);
-        } else {
-          schema = schemaFromTableConfig.get();
-        }
-      } else {
-        if (includeMetadataFields) {
-          schema = getTableAvroSchemaFromDataFile();
-        } else {
-          schema = 
HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
-        }
-      }
-    }
-
-    Option<String[]> partitionFieldsOpt = 
metaClient.getTableConfig().getPartitionFields();
-    if (metaClient.getTableConfig().shouldDropPartitionColumns()) {
-      schema = recreateSchemaWhenDropPartitionColumns(partitionFieldsOpt, 
schema);
-    }
-    return schema;
+    return getTableAvroSchemaInternal(includeMetadataFields, Option.empty());
   }
 
-  public static Schema recreateSchemaWhenDropPartitionColumns(Option<String[]> 
partitionFieldsOpt, Schema originSchema) {
-    // when hoodie.datasource.write.drop.partition.columns is true, partition 
columns can't be persisted in data files.
-    // And there are no partition schema if the schema is parsed from data 
files.
-    // Here we create partition Fields for this case, and use StringType as 
the data type.
-    Schema schema = originSchema;
-    if (partitionFieldsOpt.isPresent() && partitionFieldsOpt.get().length != 
0) {
-      List<String> partitionFields = Arrays.asList(partitionFieldsOpt.get());
-
-      final Schema schema0 = originSchema;
-      boolean hasPartitionColNotInSchema = partitionFields.stream().anyMatch(
-          pt -> !HoodieAvroUtils.containsFieldInSchema(schema0, pt)
-      );
-      boolean hasPartitionColInSchema = partitionFields.stream().anyMatch(
-          pt -> HoodieAvroUtils.containsFieldInSchema(schema0, pt)
-      );
-      if (hasPartitionColNotInSchema && hasPartitionColInSchema) {
-        throw new HoodieIncompatibleSchemaException(
-            "Not support: Partial partition fields are still in the schema "
-                + "when enable 
hoodie.datasource.write.drop.partition.columns");
-      }
-
-      if (hasPartitionColNotInSchema) {
-        // when hasPartitionColNotInSchema is true and hasPartitionColInSchema 
is false, all partition columns
-        // are not in originSchema. So we create and add them.
-        List<Field> newFields = new ArrayList<>();
-        for (String partitionField: partitionFields) {
-          newFields.add(new Schema.Field(
-              partitionField, createNullableSchema(Schema.Type.STRING), "", 
JsonProperties.NULL_VALUE));
-        }
-        schema = appendFieldsToSchema(schema, newFields);
-      }
-    }
-    return schema;
+  /**
+   * Fetches tables schema in Avro format as of the given instant
+   *
+   * @param instant as of which table's schema will be fetched
+   */
+  public Schema getTableAvroSchema(HoodieInstant instant, boolean 
includeMetadataFields) throws Exception {
+    return getTableAvroSchemaInternal(includeMetadataFields, 
Option.of(instant));
   }
 
   /**
    * Gets full schema (user + metadata) for a hoodie table in Parquet format.
    *
    * @return Parquet schema for the table
-   * @throws Exception
    */
   public MessageType getTableParquetSchema() throws Exception {
-    Option<Schema> schemaFromCommitMetadata = 
getTableSchemaFromCommitMetadata(true);
-    if (schemaFromCommitMetadata.isPresent()) {
-      return convertAvroSchemaToParquet(schemaFromCommitMetadata.get());
-    }
-    Option<Schema> schemaFromTableConfig = 
metaClient.getTableConfig().getTableCreateSchema();
-    if (schemaFromTableConfig.isPresent()) {
-      Schema schema = 
HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), 
hasOperationField);
-      return convertAvroSchemaToParquet(schema);
-    }
-    return getTableParquetSchemaFromDataFile();
+    return convertAvroSchemaToParquet(getTableAvroSchema(true));

Review Comment:
   I see this new code path also handles shouldDropPartitionColumns, whereas 
the old code did not. Was it a bug that we are fixing, or was it unintentional 
with the refactoring?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -176,86 +124,25 @@ public Schema getTableAvroSchema() throws Exception {
    * @throws Exception
    */
   public Schema getTableAvroSchema(boolean includeMetadataFields) throws 
Exception {
-    Schema schema;
-    Option<Schema> schemaFromCommitMetadata = 
getTableSchemaFromCommitMetadata(includeMetadataFields);
-    if (schemaFromCommitMetadata.isPresent()) {
-      schema = schemaFromCommitMetadata.get();
-    } else {
-      Option<Schema> schemaFromTableConfig = 
metaClient.getTableConfig().getTableCreateSchema();
-      if (schemaFromTableConfig.isPresent()) {
-        if (includeMetadataFields) {
-          schema = 
HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), 
hasOperationField);
-        } else {
-          schema = schemaFromTableConfig.get();
-        }
-      } else {
-        if (includeMetadataFields) {
-          schema = getTableAvroSchemaFromDataFile();
-        } else {
-          schema = 
HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
-        }
-      }
-    }
-
-    Option<String[]> partitionFieldsOpt = 
metaClient.getTableConfig().getPartitionFields();
-    if (metaClient.getTableConfig().shouldDropPartitionColumns()) {
-      schema = recreateSchemaWhenDropPartitionColumns(partitionFieldsOpt, 
schema);
-    }
-    return schema;
+    return getTableAvroSchemaInternal(includeMetadataFields, Option.empty());
   }
 
-  public static Schema recreateSchemaWhenDropPartitionColumns(Option<String[]> 
partitionFieldsOpt, Schema originSchema) {
-    // when hoodie.datasource.write.drop.partition.columns is true, partition 
columns can't be persisted in data files.
-    // And there are no partition schema if the schema is parsed from data 
files.
-    // Here we create partition Fields for this case, and use StringType as 
the data type.
-    Schema schema = originSchema;
-    if (partitionFieldsOpt.isPresent() && partitionFieldsOpt.get().length != 
0) {
-      List<String> partitionFields = Arrays.asList(partitionFieldsOpt.get());
-
-      final Schema schema0 = originSchema;
-      boolean hasPartitionColNotInSchema = partitionFields.stream().anyMatch(
-          pt -> !HoodieAvroUtils.containsFieldInSchema(schema0, pt)
-      );
-      boolean hasPartitionColInSchema = partitionFields.stream().anyMatch(
-          pt -> HoodieAvroUtils.containsFieldInSchema(schema0, pt)
-      );
-      if (hasPartitionColNotInSchema && hasPartitionColInSchema) {
-        throw new HoodieIncompatibleSchemaException(
-            "Not support: Partial partition fields are still in the schema "
-                + "when enable 
hoodie.datasource.write.drop.partition.columns");
-      }
-
-      if (hasPartitionColNotInSchema) {
-        // when hasPartitionColNotInSchema is true and hasPartitionColInSchema 
is false, all partition columns
-        // are not in originSchema. So we create and add them.
-        List<Field> newFields = new ArrayList<>();
-        for (String partitionField: partitionFields) {
-          newFields.add(new Schema.Field(
-              partitionField, createNullableSchema(Schema.Type.STRING), "", 
JsonProperties.NULL_VALUE));
-        }
-        schema = appendFieldsToSchema(schema, newFields);
-      }
-    }
-    return schema;
+  /**
+   * Fetches tables schema in Avro format as of the given instant
+   *
+   * @param instant as of which table's schema will be fetched
+   */
+  public Schema getTableAvroSchema(HoodieInstant instant, boolean 
includeMetadataFields) throws Exception {
+    return getTableAvroSchemaInternal(includeMetadataFields, 
Option.of(instant));
   }
 
   /**
    * Gets full schema (user + metadata) for a hoodie table in Parquet format.
    *
    * @return Parquet schema for the table
-   * @throws Exception
    */
   public MessageType getTableParquetSchema() throws Exception {
-    Option<Schema> schemaFromCommitMetadata = 
getTableSchemaFromCommitMetadata(true);
-    if (schemaFromCommitMetadata.isPresent()) {
-      return convertAvroSchemaToParquet(schemaFromCommitMetadata.get());
-    }
-    Option<Schema> schemaFromTableConfig = 
metaClient.getTableConfig().getTableCreateSchema();
-    if (schemaFromTableConfig.isPresent()) {
-      Schema schema = 
HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), 
hasOperationField);
-      return convertAvroSchemaToParquet(schema);
-    }
-    return getTableParquetSchemaFromDataFile();
+    return convertAvroSchemaToParquet(getTableAvroSchema(true));

Review Comment:
   getTableAvroSchema, in one of its code paths (equivalent to L258), fetches the 
schema from a file as a Parquet schema, and here we are converting that to Avro 
and then back to Parquet. Can we avoid the back-and-forth conversion if possible?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to