JingsongLi commented on a change in pull request #11735:
URL: https://github.com/apache/flink/pull/11735#discussion_r412729427



##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
##########
@@ -122,12 +131,40 @@ public void open(HiveTableInputSplit split) throws 
IOException {
                        this.reader = new HiveVectorizedParquetSplitReader(
                                        hiveVersion, jobConf, fieldNames, 
fieldTypes, selectedFields, split);
                } else {
-                       this.reader = new HiveMapredSplitReader(jobConf, 
partitionKeys, fieldTypes, selectedFields, split,
+                       JobConf clonedConf = new JobConf(jobConf);
+                       addSchemaToConf(clonedConf);
+                       this.reader = new HiveMapredSplitReader(clonedConf, 
partitionKeys, fieldTypes, selectedFields, split,
                                        
HiveShimLoader.loadHiveShim(hiveVersion));
                }
                currentReadCount = 0L;
        }
 
+       // Hive readers may rely on the schema info in configuration
+       private void addSchemaToConf(JobConf jobConf) {
+               // set columns/types -- including partition cols
+               List<String> typeStrs = Arrays.stream(fieldTypes)
+                               .map(t -> HiveTypeUtil.toHiveTypeInfo(t, 
true).toString())
+                               .collect(Collectors.toList());
+               jobConf.set(IOConstants.COLUMNS, String.join(",", fieldNames));
+               jobConf.set(IOConstants.COLUMNS_TYPES, String.join(",", 
typeStrs));
+               // set schema evolution -- excluding partition cols
+               int numPartCol = partitionKeys != null ? partitionKeys.size() : 
0;
+               int firstPartColIndex = fieldNames.length - numPartCol;
+               if (numPartCol == 0) {

Review comment:
       Do we need this `numPartCol == 0` branch?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to