alexeykudinkin commented on code in PR #5201:
URL: https://github.com/apache/hudi/pull/5201#discussion_r848892463


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -209,14 +219,37 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
 
     val fileSplits = collectFileSplits(partitionFilters, dataFilters)
 
-    val partitionSchema = StructType(Nil)
-    val tableSchema = HoodieTableSchema(tableStructSchema, if 
(internalSchema.isEmptySchema) tableAvroSchema.toString else 
AvroInternalSchemaConverter.convert(internalSchema, 
tableAvroSchema.getName).toString, internalSchema)
-    val requiredSchema = HoodieTableSchema(requiredStructSchema, 
requiredAvroSchema.toString, requiredInternalSchema)
+    val partitionSchema = if (dropPartitionColumnsWhenWrite) {
+      // when hoodie.datasource.write.drop.partition.columns is true, 
partition columns can't be persisted in
+      // data files.
+      StructType(partitionColumns.map(StructField(_, StringType)))
+    } else {
+      StructType(Nil)
+    }
 
+    val tableSchema = HoodieTableSchema(tableStructSchema, if 
(internalSchema.isEmptySchema) tableAvroSchema.toString else 
AvroInternalSchemaConverter.convert(internalSchema, 
tableAvroSchema.getName).toString, internalSchema)

Review Comment:
   Let's extract the conditional into a utility method to make it more readable



##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -159,23 +167,67 @@ public Schema getTableAvroSchema() throws Exception {
    * @throws Exception
    */
   public Schema getTableAvroSchema(boolean includeMetadataFields) throws 
Exception {
+    Schema schema;
     Option<Schema> schemaFromCommitMetadata = 
getTableSchemaFromCommitMetadata(includeMetadataFields);
     if (schemaFromCommitMetadata.isPresent()) {
-      return schemaFromCommitMetadata.get();
-    }
-    Option<Schema> schemaFromTableConfig = 
metaClient.getTableConfig().getTableCreateSchema();
-    if (schemaFromTableConfig.isPresent()) {
-      if (includeMetadataFields) {
-        return HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), 
hasOperationField);
+      schema = schemaFromCommitMetadata.get();
+    } else {
+      Option<Schema> schemaFromTableConfig = 
metaClient.getTableConfig().getTableCreateSchema();
+      if (schemaFromTableConfig.isPresent()) {
+        if (includeMetadataFields) {
+          schema = 
HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), 
hasOperationField);
+        } else {
+          schema = schemaFromTableConfig.get();
+        }
       } else {
-        return schemaFromTableConfig.get();
+        if (includeMetadataFields) {
+          schema = getTableAvroSchemaFromDataFile();
+        } else {
+          schema = 
HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
+        }
       }
     }
-    if (includeMetadataFields) {
-      return getTableAvroSchemaFromDataFile();
-    } else {
-      return 
HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
+
+    Option<String[]> partitionFieldsOpt = 
metaClient.getTableConfig().getPartitionFields();
+    if (metaClient.getTableConfig().isDropPartitionColumns()) {
+      schema = recreateSchemaWhenDropPartitionColumns(partitionFieldsOpt, 
schema);
+    }
+    return schema;
+  }
+
+  public static Schema recreateSchemaWhenDropPartitionColumns(Option<String[]> 
partitionFieldsOpt, Schema originSchema) {

Review Comment:
   `TableSchemaResolver` should not even be aware of such a setting.
   
   Instead, let's abstract such utilities in a generic way within 
`AvroSchemaUtils`.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -209,14 +219,37 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
 
     val fileSplits = collectFileSplits(partitionFilters, dataFilters)
 
-    val partitionSchema = StructType(Nil)
-    val tableSchema = HoodieTableSchema(tableStructSchema, if 
(internalSchema.isEmptySchema) tableAvroSchema.toString else 
AvroInternalSchemaConverter.convert(internalSchema, 
tableAvroSchema.getName).toString, internalSchema)
-    val requiredSchema = HoodieTableSchema(requiredStructSchema, 
requiredAvroSchema.toString, requiredInternalSchema)
+    val partitionSchema = if (dropPartitionColumnsWhenWrite) {
+      // when hoodie.datasource.write.drop.partition.columns is true, 
partition columns can't be persisted in
+      // data files.
+      StructType(partitionColumns.map(StructField(_, StringType)))
+    } else {
+      StructType(Nil)
+    }
 
+    val tableSchema = HoodieTableSchema(tableStructSchema, if 
(internalSchema.isEmptySchema) tableAvroSchema.toString else 
AvroInternalSchemaConverter.convert(internalSchema, 
tableAvroSchema.getName).toString, internalSchema)
+    val dataSchema = if (dropPartitionColumnsWhenWrite) {
+      val dataStructType = StructType(tableStructSchema.filterNot(f => 
partitionColumns.contains(f.name)))
+      HoodieTableSchema(
+        dataStructType,
+        sparkAdapter.getAvroSchemaConverters.toAvroType(dataStructType, 
nullable = false, "record").toString()
+      )
+    } else {
+      tableSchema
+    }
+    val requiredSchema = if (dropPartitionColumnsWhenWrite) {

Review Comment:
   Why are we changing `requiredSchema`? 



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -286,8 +319,16 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
   }
 
   protected final def appendMandatoryColumns(requestedColumns: Array[String]): 
Array[String] = {
-    val missing = mandatoryColumns.filter(col => 
!requestedColumns.contains(col))
-    requestedColumns ++ missing
+    if (dropPartitionColumnsWhenWrite) {
+      if (requestedColumns.isEmpty) {
+        mandatoryColumns.toArray
+      } else {
+        requestedColumns

Review Comment:
   @YannByron let's make sure we carefully vet the changes we're making at 
that level, since I'd imagine we didn't do exhaustive testing with and without 
this new config.
   
   A few notes here: 
   
   1. The behavior of selecting mandatory columns should not be predicated on 
this config
   2. This change will break MOR table semantics (when the new config is on), as 
it omits the columns required for MOR to do merging properly



##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -159,23 +167,67 @@ public Schema getTableAvroSchema() throws Exception {
    * @throws Exception
    */
   public Schema getTableAvroSchema(boolean includeMetadataFields) throws 
Exception {
+    Schema schema;
     Option<Schema> schemaFromCommitMetadata = 
getTableSchemaFromCommitMetadata(includeMetadataFields);
     if (schemaFromCommitMetadata.isPresent()) {
-      return schemaFromCommitMetadata.get();
-    }
-    Option<Schema> schemaFromTableConfig = 
metaClient.getTableConfig().getTableCreateSchema();
-    if (schemaFromTableConfig.isPresent()) {
-      if (includeMetadataFields) {
-        return HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), 
hasOperationField);
+      schema = schemaFromCommitMetadata.get();
+    } else {
+      Option<Schema> schemaFromTableConfig = 
metaClient.getTableConfig().getTableCreateSchema();
+      if (schemaFromTableConfig.isPresent()) {
+        if (includeMetadataFields) {
+          schema = 
HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), 
hasOperationField);
+        } else {
+          schema = schemaFromTableConfig.get();
+        }
       } else {
-        return schemaFromTableConfig.get();
+        if (includeMetadataFields) {
+          schema = getTableAvroSchemaFromDataFile();
+        } else {
+          schema = 
HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
+        }
       }
     }
-    if (includeMetadataFields) {
-      return getTableAvroSchemaFromDataFile();
-    } else {
-      return 
HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
+
+    Option<String[]> partitionFieldsOpt = 
metaClient.getTableConfig().getPartitionFields();
+    if (metaClient.getTableConfig().isDropPartitionColumns()) {
+      schema = recreateSchemaWhenDropPartitionColumns(partitionFieldsOpt, 
schema);
+    }
+    return schema;
+  }
+
+  public static Schema recreateSchemaWhenDropPartitionColumns(Option<String[]> 
partitionFieldsOpt, Schema originSchema) {
+    // when hoodie.datasource.write.drop.partition.columns is true, partition 
columns can't be persisted in data files.
+    // And there are no partition schema if the schema is parsed from data 
files.
+    // Here we create partition Fields for this case, and use StringType as 
the data type.
+    Schema schema = originSchema;
+    if (partitionFieldsOpt.isPresent() && partitionFieldsOpt.get().length != 
0) {
+      List<String> partitionFields = Arrays.asList(partitionFieldsOpt.get());
+
+      final Schema schema0 = originSchema;
+      boolean hasPartitionColNotInSchema = partitionFields.stream().anyMatch(
+          pt -> !HoodieAvroUtils.containsFieldInSchema(schema0, pt)
+      );
+      boolean hasPartitionColInSchema = partitionFields.stream().anyMatch(
+          pt -> HoodieAvroUtils.containsFieldInSchema(schema0, pt)
+      );
+      if (hasPartitionColNotInSchema && hasPartitionColInSchema) {
+        throw new HoodieIncompatibleSchemaException(
+            "Not support: Partial partition fields are still in the schema "
+                + "when enable 
hoodie.datasource.write.drop.partition.columns");
+      }
+
+      if (hasPartitionColNotInSchema) {
+        // when hasPartitionColNotInSchema is true and hasPartitionColInSchema 
is false, all partition columns
+        // are not in originSchema. So we create and add them.
+        List<Field> newFields = new ArrayList<>();
+        for (String partitionField: partitionFields) {
+          newFields.add(new Schema.Field(
+              partitionField, Schema.create(Schema.Type.STRING), "", 
JsonProperties.NULL_VALUE));

Review Comment:
   Why do we assume that those partition fields would be strings? 



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -286,8 +319,16 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
   }
 
   protected final def appendMandatoryColumns(requestedColumns: Array[String]): 
Array[String] = {
-    val missing = mandatoryColumns.filter(col => 
!requestedColumns.contains(col))
-    requestedColumns ++ missing
+    if (dropPartitionColumnsWhenWrite) {

Review Comment:
   Why are we changing this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to