alexeykudinkin commented on code in PR #5201:
URL: https://github.com/apache/hudi/pull/5201#discussion_r849916089
##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -159,23 +167,67 @@ public Schema getTableAvroSchema() throws Exception {
    * @throws Exception
    */
   public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception {
+    Schema schema;
     Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(includeMetadataFields);
     if (schemaFromCommitMetadata.isPresent()) {
-      return schemaFromCommitMetadata.get();
-    }
-    Option<Schema> schemaFromTableConfig = metaClient.getTableConfig().getTableCreateSchema();
-    if (schemaFromTableConfig.isPresent()) {
-      if (includeMetadataFields) {
-        return HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), hasOperationField);
+      schema = schemaFromCommitMetadata.get();
+    } else {
+      Option<Schema> schemaFromTableConfig = metaClient.getTableConfig().getTableCreateSchema();
+      if (schemaFromTableConfig.isPresent()) {
+        if (includeMetadataFields) {
+          schema = HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), hasOperationField);
+        } else {
+          schema = schemaFromTableConfig.get();
+        }
       } else {
-        return schemaFromTableConfig.get();
+        if (includeMetadataFields) {
+          schema = getTableAvroSchemaFromDataFile();
+        } else {
+          schema = HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
+        }
       }
     }
-    if (includeMetadataFields) {
-      return getTableAvroSchemaFromDataFile();
-    } else {
-      return HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
+
+    Option<String[]> partitionFieldsOpt = metaClient.getTableConfig().getPartitionFields();
+    if (metaClient.getTableConfig().isDropPartitionColumns()) {
+      schema = recreateSchemaWhenDropPartitionColumns(partitionFieldsOpt, schema);
+    }
+    return schema;
+  }
+
+  public static Schema recreateSchemaWhenDropPartitionColumns(Option<String[]> partitionFieldsOpt, Schema originSchema) {
Review Comment:
Fair enough. Since it's called the table schema, it has to contain all the
columns, including the partition ones.
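
For illustration only, here is a minimal sketch of that idea: re-adding dropped partition columns to an Avro schema as string fields, since once they are dropped from the data files the partition values only survive in the file paths. The class and method names below are hypothetical, not code from this PR.

```java
// Hypothetical sketch: re-add dropped partition columns to a table schema.
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;

public class PartitionFieldSchemaSketch {
  public static Schema appendPartitionFields(Schema origin, String[] partitionFields) {
    List<Schema.Field> fields = new ArrayList<>();
    // Avro Field instances can't be shared between schemas, so copy them.
    for (Schema.Field f : origin.getFields()) {
      fields.add(new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal()));
    }
    for (String p : partitionFields) {
      if (origin.getField(p) == null) {
        // Partition values are only recoverable from the path, hence string type.
        fields.add(new Schema.Field(p, Schema.create(Schema.Type.STRING), null, (Object) null));
      }
    }
    return Schema.createRecord(origin.getName(), origin.getDoc(), origin.getNamespace(), false, fields);
  }
}
```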
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -140,6 +144,12 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   protected val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty)
+  /**
+   * If true, the schema needs special handling when creating the file reader.
+   */
+  protected val dropPartitionColumnsWhenWrite: Boolean =
Review Comment:
nit: `shouldDropPartitionColumns`
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -209,14 +219,37 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     val fileSplits = collectFileSplits(partitionFilters, dataFilters)
-    val partitionSchema = StructType(Nil)
-    val tableSchema = HoodieTableSchema(tableStructSchema, if (internalSchema.isEmptySchema) tableAvroSchema.toString else AvroInternalSchemaConverter.convert(internalSchema, tableAvroSchema.getName).toString, internalSchema)
-    val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, requiredInternalSchema)
+    val partitionSchema = if (dropPartitionColumnsWhenWrite) {
+      // when hoodie.datasource.write.drop.partition.columns is true, partition columns can't be persisted in
+      // data files.
+      StructType(partitionColumns.map(StructField(_, StringType)))
+    } else {
+      StructType(Nil)
+    }
+    val tableSchema = HoodieTableSchema(tableStructSchema, if (internalSchema.isEmptySchema) tableAvroSchema.toString else AvroInternalSchemaConverter.convert(internalSchema, tableAvroSchema.getName).toString, internalSchema)
+    val dataSchema = if (dropPartitionColumnsWhenWrite) {
+      val dataStructType = StructType(tableStructSchema.filterNot(f => partitionColumns.contains(f.name)))
+      HoodieTableSchema(
+        dataStructType,
+        sparkAdapter.getAvroSchemaConverters.toAvroType(dataStructType, nullable = false, "record").toString()
+      )
+    } else {
+      tableSchema
+    }
+    val requiredSchema = if (dropPartitionColumnsWhenWrite) {
Review Comment:
Ok, I see that we have to drop the partition columns to make sure we're not
looking for them when reading from Parquet.
This ties back to my other comment: instead of duplicating this logic, let's
extract the filtering of partition columns into a standalone utility, apply it
to both `tableSchema` and `requiredSchema`, and add a comment explaining why
this filtering is needed.
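
For illustration, the extracted helper could look roughly like the sketch below. The class and method names are hypothetical and this assumes the Spark `StructType` API; it is not code from this PR.

```java
// Hypothetical helper: drop partition columns from a Spark schema once,
// so tableSchema and requiredSchema reuse the same filtering logic.
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SchemaFilterSketch {
  public static StructType excludePartitionColumns(StructType schema, String[] partitionColumns) {
    // Partition values live in the file paths, not in the Parquet footers,
    // so the reader schema must not request those columns from Parquet.
    Set<String> partitionCols = Arrays.stream(partitionColumns).collect(Collectors.toSet());
    StructField[] dataFields = Arrays.stream(schema.fields())
        .filter(f -> !partitionCols.contains(f.name()))
        .toArray(StructField[]::new);
    return new StructType(dataFields);
  }
}
```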
##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -159,23 +167,67 @@ public Schema getTableAvroSchema() throws Exception {
    * @throws Exception
    */
   public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception {
+    Schema schema;
     Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(includeMetadataFields);
     if (schemaFromCommitMetadata.isPresent()) {
-      return schemaFromCommitMetadata.get();
-    }
-    Option<Schema> schemaFromTableConfig = metaClient.getTableConfig().getTableCreateSchema();
-    if (schemaFromTableConfig.isPresent()) {
-      if (includeMetadataFields) {
-        return HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), hasOperationField);
+      schema = schemaFromCommitMetadata.get();
+    } else {
+      Option<Schema> schemaFromTableConfig = metaClient.getTableConfig().getTableCreateSchema();
+      if (schemaFromTableConfig.isPresent()) {
+        if (includeMetadataFields) {
+          schema = HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), hasOperationField);
+        } else {
+          schema = schemaFromTableConfig.get();
+        }
       } else {
-        return schemaFromTableConfig.get();
+        if (includeMetadataFields) {
+          schema = getTableAvroSchemaFromDataFile();
+        } else {
+          schema = HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
+        }
       }
     }
-    if (includeMetadataFields) {
-      return getTableAvroSchemaFromDataFile();
-    } else {
-      return HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
+
+    Option<String[]> partitionFieldsOpt = metaClient.getTableConfig().getPartitionFields();
+    if (metaClient.getTableConfig().isDropPartitionColumns()) {
+      schema = recreateSchemaWhenDropPartitionColumns(partitionFieldsOpt, schema);
+    }
+    return schema;
+  }
+
+  public static Schema recreateSchemaWhenDropPartitionColumns(Option<String[]> partitionFieldsOpt, Schema originSchema) {
+    // when hoodie.datasource.write.drop.partition.columns is true, partition columns can't be persisted in data files,
+    // and there is no partition schema if the schema is parsed from data files.
+    // Here we create the partition fields for this case, using StringType as the data type.
+    Schema schema = originSchema;
+    if (partitionFieldsOpt.isPresent() && partitionFieldsOpt.get().length != 0) {
+      List<String> partitionFields = Arrays.asList(partitionFieldsOpt.get());
+
+      final Schema schema0 = originSchema;
+      boolean hasPartitionColNotInSchema = partitionFields.stream().anyMatch(
Review Comment:
These 2 iterations are not needed: we can just filter the list of partition
fields down to the ones not already contained in the schema, and then make
sure the result is either the empty list or the whole list.
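
A minimal sketch of that single pass (hypothetical code, not from this PR) could look like:

```java
// Hypothetical sketch of the suggested check: collect the partition fields
// missing from the schema, then require all-present or all-absent.
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.Schema;

public class PartitionFieldCheckSketch {
  public static List<String> missingPartitionFields(Schema schema, String[] partitionFields) {
    List<String> missing = Arrays.stream(partitionFields)
        .filter(f -> schema.getField(f) == null)
        .collect(Collectors.toList());
    // A partial overlap means the schema disagrees with the table config.
    if (!missing.isEmpty() && missing.size() != partitionFields.length) {
      throw new IllegalStateException("Partition fields only partially present in schema: " + missing);
    }
    return missing; // empty: nothing to add; full list: append all as string fields
  }
}
```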
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]