alexeykudinkin commented on code in PR #5201:
URL: https://github.com/apache/hudi/pull/5201#discussion_r849916089


##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -159,23 +167,67 @@ public Schema getTableAvroSchema() throws Exception {
    * @throws Exception
    */
   public Schema getTableAvroSchema(boolean includeMetadataFields) throws 
Exception {
+    Schema schema;
     Option<Schema> schemaFromCommitMetadata = 
getTableSchemaFromCommitMetadata(includeMetadataFields);
     if (schemaFromCommitMetadata.isPresent()) {
-      return schemaFromCommitMetadata.get();
-    }
-    Option<Schema> schemaFromTableConfig = 
metaClient.getTableConfig().getTableCreateSchema();
-    if (schemaFromTableConfig.isPresent()) {
-      if (includeMetadataFields) {
-        return HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), 
hasOperationField);
+      schema = schemaFromCommitMetadata.get();
+    } else {
+      Option<Schema> schemaFromTableConfig = 
metaClient.getTableConfig().getTableCreateSchema();
+      if (schemaFromTableConfig.isPresent()) {
+        if (includeMetadataFields) {
+          schema = 
HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), 
hasOperationField);
+        } else {
+          schema = schemaFromTableConfig.get();
+        }
       } else {
-        return schemaFromTableConfig.get();
+        if (includeMetadataFields) {
+          schema = getTableAvroSchemaFromDataFile();
+        } else {
+          schema = 
HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
+        }
       }
     }
-    if (includeMetadataFields) {
-      return getTableAvroSchemaFromDataFile();
-    } else {
-      return 
HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
+
+    Option<String[]> partitionFieldsOpt = 
metaClient.getTableConfig().getPartitionFields();
+    if (metaClient.getTableConfig().isDropPartitionColumns()) {
+      schema = recreateSchemaWhenDropPartitionColumns(partitionFieldsOpt, 
schema);
+    }
+    return schema;
+  }
+
+  public static Schema recreateSchemaWhenDropPartitionColumns(Option<String[]> 
partitionFieldsOpt, Schema originSchema) {

Review Comment:
   Fair enough, since it's called table schema it has to contain all the 
columns including partition ones. 



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -140,6 +144,12 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
 
   protected val partitionColumns: Array[String] = 
tableConfig.getPartitionFields.orElse(Array.empty)
 
+  /**
+   * if true, need to deal with schema for creating file reader.
+   */
+  protected val dropPartitionColumnsWhenWrite: Boolean =

Review Comment:
   nit: consider renaming this to `shouldDropPartitionColumns`



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -209,14 +219,37 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
 
     val fileSplits = collectFileSplits(partitionFilters, dataFilters)
 
-    val partitionSchema = StructType(Nil)
-    val tableSchema = HoodieTableSchema(tableStructSchema, if 
(internalSchema.isEmptySchema) tableAvroSchema.toString else 
AvroInternalSchemaConverter.convert(internalSchema, 
tableAvroSchema.getName).toString, internalSchema)
-    val requiredSchema = HoodieTableSchema(requiredStructSchema, 
requiredAvroSchema.toString, requiredInternalSchema)
+    val partitionSchema = if (dropPartitionColumnsWhenWrite) {
+      // when hoodie.datasource.write.drop.partition.columns is true, 
partition columns can't be persisted in
+      // data files.
+      StructType(partitionColumns.map(StructField(_, StringType)))
+    } else {
+      StructType(Nil)
+    }
 
+    val tableSchema = HoodieTableSchema(tableStructSchema, if 
(internalSchema.isEmptySchema) tableAvroSchema.toString else 
AvroInternalSchemaConverter.convert(internalSchema, 
tableAvroSchema.getName).toString, internalSchema)
+    val dataSchema = if (dropPartitionColumnsWhenWrite) {
+      val dataStructType = StructType(tableStructSchema.filterNot(f => 
partitionColumns.contains(f.name)))
+      HoodieTableSchema(
+        dataStructType,
+        sparkAdapter.getAvroSchemaConverters.toAvroType(dataStructType, 
nullable = false, "record").toString()
+      )
+    } else {
+      tableSchema
+    }
+    val requiredSchema = if (dropPartitionColumnsWhenWrite) {

Review Comment:
   Ok, I see that we have to drop partition columns to make sure we're not 
looking for them when we're reading from Parquet. 
   
   This ties back to my other comment: instead of duplicating this, let's 
extract the filtering of partition columns into a standalone utility, apply it 
to both `tableSchema` and `requiredSchema`, and add a comment explaining why 
we need this filtering.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -159,23 +167,67 @@ public Schema getTableAvroSchema() throws Exception {
    * @throws Exception
    */
   public Schema getTableAvroSchema(boolean includeMetadataFields) throws 
Exception {
+    Schema schema;
     Option<Schema> schemaFromCommitMetadata = 
getTableSchemaFromCommitMetadata(includeMetadataFields);
     if (schemaFromCommitMetadata.isPresent()) {
-      return schemaFromCommitMetadata.get();
-    }
-    Option<Schema> schemaFromTableConfig = 
metaClient.getTableConfig().getTableCreateSchema();
-    if (schemaFromTableConfig.isPresent()) {
-      if (includeMetadataFields) {
-        return HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), 
hasOperationField);
+      schema = schemaFromCommitMetadata.get();
+    } else {
+      Option<Schema> schemaFromTableConfig = 
metaClient.getTableConfig().getTableCreateSchema();
+      if (schemaFromTableConfig.isPresent()) {
+        if (includeMetadataFields) {
+          schema = 
HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), 
hasOperationField);
+        } else {
+          schema = schemaFromTableConfig.get();
+        }
       } else {
-        return schemaFromTableConfig.get();
+        if (includeMetadataFields) {
+          schema = getTableAvroSchemaFromDataFile();
+        } else {
+          schema = 
HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
+        }
       }
     }
-    if (includeMetadataFields) {
-      return getTableAvroSchemaFromDataFile();
-    } else {
-      return 
HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
+
+    Option<String[]> partitionFieldsOpt = 
metaClient.getTableConfig().getPartitionFields();
+    if (metaClient.getTableConfig().isDropPartitionColumns()) {
+      schema = recreateSchemaWhenDropPartitionColumns(partitionFieldsOpt, 
schema);
+    }
+    return schema;
+  }
+
+  public static Schema recreateSchemaWhenDropPartitionColumns(Option<String[]> 
partitionFieldsOpt, Schema originSchema) {
+    // when hoodie.datasource.write.drop.partition.columns is true, partition 
columns can't be persisted in data files.
+    // And there are no partition schema if the schema is parsed from data 
files.
+    // Here we create partition Fields for this case, and use StringType as 
the data type.
+    Schema schema = originSchema;
+    if (partitionFieldsOpt.isPresent() && partitionFieldsOpt.get().length != 
0) {
+      List<String> partitionFields = Arrays.asList(partitionFieldsOpt.get());
+
+      final Schema schema0 = originSchema;
+      boolean hasPartitionColNotInSchema = partitionFields.stream().anyMatch(

Review Comment:
   These 2 iterations are not needed: we can just collect the partition 
fields that are already contained in the schema and then make sure that the 
result is either an empty list or the whole list.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to