alexeykudinkin commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1091455819
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java:
##########
@@ -159,35 +153,8 @@ public Pair<Option<Dataset<Row>>, String>
fetchNextBatch(Option<String> lastCkpt
queryTypeAndInstantEndpts.getRight().getRight()));
}
- /*
- * log.info("Partition Fields are : (" + partitionFields + "). Initial
Source Schema :" + source.schema());
- *
- * StructType newSchema = new StructType(source.schema().fields()); for
(String field : partitionFields) { newSchema
- * = newSchema.add(field, DataTypes.StringType, true); }
- *
- * /** Validates if the commit time is sane and also generates Partition
fields from _hoodie_partition_path if
- * configured
- *
- * Dataset<Row> validated = source.map((MapFunction<Row, Row>) (Row row)
-> { // _hoodie_instant_time String
- * instantTime = row.getString(0);
IncrSourceHelper.validateInstantTime(row, instantTime, instantEndpts.getKey(),
- * instantEndpts.getValue()); if (!partitionFields.isEmpty()) { //
_hoodie_partition_path String hoodiePartitionPath
- * = row.getString(3); List<Object> partitionVals =
- * extractor.extractPartitionValuesInPath(hoodiePartitionPath).stream()
.map(o -> (Object)
- * o).collect(Collectors.toList());
ValidationUtils.checkArgument(partitionVals.size() == partitionFields.size(),
- * "#partition-fields != #partition-values-extracted"); List<Object>
rowObjs = new
- *
ArrayList<>(scala.collection.JavaConversions.seqAsJavaList(row.toSeq()));
rowObjs.addAll(partitionVals); return
- * RowFactory.create(rowObjs.toArray()); } return row; },
RowEncoder.apply(newSchema));
- *
- * log.info("Validated Source Schema :" + validated.schema());
- */
- boolean dropAllMetaFields =
props.getBoolean(Config.HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE,
- Config.DEFAULT_HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE);
-
- // Remove Hoodie meta columns except partition path from input source
- String[] colsToDrop = dropAllMetaFields ?
HoodieRecord.HOODIE_META_COLUMNS.stream().toArray(String[]::new) :
- HoodieRecord.HOODIE_META_COLUMNS.stream().filter(x ->
!x.equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toArray(String[]::new);
- final Dataset<Row> src = source.drop(colsToDrop);
- // log.info("Final Schema from Source is :" + src.schema());
+ // Remove Hoodie meta columns
+ final Dataset<Row> src =
source.drop(HoodieRecord.HOODIE_META_COLUMNS.stream().toArray(String[]::new));
Review Comment:
The change here is to avoid keeping the partition path, as keeping it would make
`HoodieSparkSqlWriter` treat it as a data column, which is not compatible with
`SparkRecordMerger`.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java:
##########
@@ -159,35 +153,8 @@ public Pair<Option<Dataset<Row>>, String>
fetchNextBatch(Option<String> lastCkpt
queryTypeAndInstantEndpts.getRight().getRight()));
}
- /*
- * log.info("Partition Fields are : (" + partitionFields + "). Initial
Source Schema :" + source.schema());
Review Comment:
Cleaning up dead commented-out code (not updated since 2018).
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java:
##########
@@ -159,35 +153,8 @@ public Pair<Option<Dataset<Row>>, String>
fetchNextBatch(Option<String> lastCkpt
queryTypeAndInstantEndpts.getRight().getRight()));
}
- /*
- * log.info("Partition Fields are : (" + partitionFields + "). Initial
Source Schema :" + source.schema());
- *
- * StructType newSchema = new StructType(source.schema().fields()); for
(String field : partitionFields) { newSchema
- * = newSchema.add(field, DataTypes.StringType, true); }
- *
- * /** Validates if the commit time is sane and also generates Partition
fields from _hoodie_partition_path if
- * configured
- *
- * Dataset<Row> validated = source.map((MapFunction<Row, Row>) (Row row)
-> { // _hoodie_instant_time String
- * instantTime = row.getString(0);
IncrSourceHelper.validateInstantTime(row, instantTime, instantEndpts.getKey(),
- * instantEndpts.getValue()); if (!partitionFields.isEmpty()) { //
_hoodie_partition_path String hoodiePartitionPath
- * = row.getString(3); List<Object> partitionVals =
- * extractor.extractPartitionValuesInPath(hoodiePartitionPath).stream()
.map(o -> (Object)
- * o).collect(Collectors.toList());
ValidationUtils.checkArgument(partitionVals.size() == partitionFields.size(),
- * "#partition-fields != #partition-values-extracted"); List<Object>
rowObjs = new
- *
ArrayList<>(scala.collection.JavaConversions.seqAsJavaList(row.toSeq()));
rowObjs.addAll(partitionVals); return
- * RowFactory.create(rowObjs.toArray()); } return row; },
RowEncoder.apply(newSchema));
- *
- * log.info("Validated Source Schema :" + validated.schema());
- */
- boolean dropAllMetaFields =
props.getBoolean(Config.HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE,
- Config.DEFAULT_HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE);
-
- // Remove Hoodie meta columns except partition path from input source
- String[] colsToDrop = dropAllMetaFields ?
HoodieRecord.HOODIE_META_COLUMNS.stream().toArray(String[]::new) :
- HoodieRecord.HOODIE_META_COLUMNS.stream().filter(x ->
!x.equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toArray(String[]::new);
- final Dataset<Row> src = source.drop(colsToDrop);
- // log.info("Final Schema from Source is :" + src.schema());
+ // Remove Hoodie meta columns
+ final Dataset<Row> src =
source.drop(HoodieRecord.HOODIE_META_COLUMNS.stream().toArray(String[]::new));
Review Comment:
`_hoodie_partition_path` isn't used in either the source or the DS; according
to the commented-out code it was used previously, but it is not used anymore.
#7132 recently added a config that forces all of the meta-fields to be cleaned
up, but it is false by default.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]