danny0405 commented on code in PR #10846:
URL: https://github.com/apache/hudi/pull/10846#discussion_r1520743894


##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieAvroParquetReader.java:
##########
@@ -38,24 +42,61 @@
 import org.apache.parquet.hadoop.ParquetRecordReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getTableMetaClientForBasePathUnchecked;
+import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.addPartitionFields;
+import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.avroToArrayWritable;
+import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.constructHiveOrderedSchema;
+import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getNameToFieldMap;
 import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
 
 public class HoodieAvroParquetReader extends RecordReader<Void, ArrayWritable> 
{
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieAvroParquetReader.class);
   private final ParquetRecordReader<GenericData.Record> parquetRecordReader;
   private Schema baseSchema;
 
   public HoodieAvroParquetReader(InputSplit inputSplit, Configuration conf) 
throws IOException {
-    // get base schema
-    ParquetMetadata fileFooter =
-        ParquetFileReader.readFooter(conf, ((ParquetInputSplit) 
inputSplit).getPath(), ParquetMetadataConverter.NO_FILTER);
-    MessageType messageType = fileFooter.getFileMetaData().getSchema();
-    baseSchema = new AvroSchemaConverter(conf).convert(messageType);
+    Path filePath = ((ParquetInputSplit) inputSplit).getPath();
+    Schema writerSchema;
+    try {
+      HoodieTableMetaClient metaClient = 
getTableMetaClientForBasePathUnchecked(conf, filePath);
+      writerSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+      LOG.warn("Got writer schema from table schema: {}", writerSchema);
+    } catch (Exception e) {
+      LOG.error("Failed to get writer schema from table schema", e);
+      LOG.warn("Falling back to reading writer schema from parquet file");
+      ParquetMetadata fileFooter = ParquetFileReader.readFooter(conf, 
filePath, ParquetMetadataConverter.NO_FILTER);
+      MessageType messageType = fileFooter.getFileMetaData().getSchema();
+      writerSchema = new AvroSchemaConverter(conf).convert(messageType);
+    }
+    JobConf jobConf = new JobConf(conf);
+    try {
+      // Add partitioning fields to writer schema for resulting row to contain 
null values for these fields
+      String partitionFields = 
jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
+      List<String> partitioningFields =
+          partitionFields.length() > 0 ? 
Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
+              : new ArrayList<>();
+      LOG.warn("Got partitioning fields: {}", partitioningFields);
+      writerSchema = addPartitionFields(writerSchema, partitioningFields);
+      LOG.warn("Added partition fields to writer schema: {}", writerSchema);
+    } catch (Exception e) {
+      LOG.error("Failed to add partition fields to writer schema", e);
+    }
+    Map<String, Schema.Field> schemaFieldsMap = 
getNameToFieldMap(writerSchema);
+    LOG.warn("Got schema fields map: {}", schemaFieldsMap);
+    baseSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap, 
jobConf.get(hive_metastoreConstants.META_TABLE_COLUMNS, EMPTY_STRING));
+    LOG.warn("Got hive ordered schema: {}", baseSchema);

Review Comment:
   Do we have any specific test cases covering this change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to