andygrove commented on code in PR #1815:
URL: https://github.com/apache/datafusion-comet/pull/1815#discussion_r2141233094
##########
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##########
@@ -424,6 +425,187 @@ public void init() throws Throwable {
isInitialized = true;
}
+ private ParquetColumn getParquetColumn(MessageType schema, StructType
sparkSchema) {
+ // We use a different config from the config that is passed in.
+ // This follows the setting used in Spark's
SpecificParquetRecordReaderBase
+ Configuration config = new Configuration();
+ config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key(), false);
+ config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
+ config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
+ config.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(),
false);
+ config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
+ ParquetToSparkSchemaConverter converter = new
ParquetToSparkSchemaConverter(config);
+ return converter.convertParquetColumn(schema, Option.apply(sparkSchema));
+ }
+
+ private Map<Integer, List<Type>> getIdToParquetFieldMap(GroupType type) {
+ return type.getFields().stream()
+ .filter(f -> f.getId() != null)
+ .collect(Collectors.groupingBy(f -> f.getId().intValue()));
+ }
+
+ private Map<String, List<Type>> getCaseSensitiveParquetFieldMap(GroupType
schema) {
+ return schema.getFields().stream().collect(Collectors.toMap(Type::getName,
Arrays::asList));
+ }
+
+ private Map<String, List<Type>> getCaseInsensitiveParquetFieldMap(GroupType
schema) {
+ return schema.getFields().stream()
+ .collect(Collectors.groupingBy(f ->
f.getName().toLowerCase(Locale.ROOT)));
+ }
+
+ private Type getMatchingParquetFieldById(
+ StructField f,
+ Map<Integer, List<Type>> idToParquetFieldMap,
+ Map<String, List<Type>> nameToParquetFieldMap,
+ boolean isCaseSensitive) {
+ List<Type> matched = null;
+ int fieldId = 0;
+ if (ParquetUtils.hasFieldId(f)) {
+ fieldId = ParquetUtils.getFieldId(f);
+ matched = idToParquetFieldMap.get(fieldId);
+ } else {
+ String fieldName = isCaseSensitive ? f.name() :
f.name().toLowerCase(Locale.ROOT);
+ matched = nameToParquetFieldMap.get(fieldName);
+ }
+
+ if (matched == null || matched.isEmpty()) {
+ return null;
+ }
+ if (matched.size() > 1) {
+ // Need to fail if there is ambiguity, i.e. more than one field is
matched
+ String parquetTypesString =
+ matched.stream().map(Type::getName).collect(Collectors.joining("[",
", ", "]"));
+ throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+ fieldId, parquetTypesString);
+ } else {
+ return matched.get(0);
+ }
+ }
+
+ // Derived from CometParquetReadSupport.matchFieldId
+ private String getMatchingNameById(
+ StructField f,
+ Map<Integer, List<Type>> idToParquetFieldMap,
+ Map<String, List<Type>> nameToParquetFieldMap /*, Map<String, String>
nameMap*/,
+ boolean isCaseSensitive) {
+ Type matched =
+ getMatchingParquetFieldById(f, idToParquetFieldMap,
nameToParquetFieldMap, isCaseSensitive);
+
+ // When there is no ID match, we use a fake name to avoid a name match by
accident
+ // We need this name to be unique as well, otherwise there will be type
conflicts
+ if (matched == null /*|| matched.isEmpty()*/) {
Review Comment:
is this commented out code still needed?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]