[ 
https://issues.apache.org/jira/browse/HUDI-3543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17499790#comment-17499790
 ] 

sivabalan narayanan commented on HUDI-3543:
-------------------------------------------

[~xleesf]: Do you happen to know if we can remove these? It looks like this 
code was written by you. 

> Clean up HoodieIncrSource for commented out code
> ------------------------------------------------
>
>                 Key: HUDI-3543
>                 URL: https://issues.apache.org/jira/browse/HUDI-3543
>             Project: Apache Hudi
>          Issue Type: Task
>          Components: deltastreamer
>            Reporter: sivabalan narayanan
>            Assignee: leesf
>            Priority: Major
>
> We found some commented-out code in HoodieIncrSource. Clean it up if it is 
> not required. 
>  
> {code:java}
> /*
>  * DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.HOODIE_SRC_BASE_PATH,
>  * Config.HOODIE_SRC_PARTITION_FIELDS)); List<String> partitionFields =
>  * props.getStringList(Config.HOODIE_SRC_PARTITION_FIELDS, ",", new ArrayList<>()); PartitionValueExtractor
>  * extractor = DataSourceUtils.createPartitionExtractor(props.getString( Config.HOODIE_SRC_PARTITION_EXTRACTORCLASS,
>  * Config.DEFAULT_HOODIE_SRC_PARTITION_EXTRACTORCLASS));
>  */ {code}
> {code:java}
> /*
>  * log.info("Partition Fields are : (" + partitionFields + "). Initial Source Schema :" + source.schema());
>  *
>  * StructType newSchema = new StructType(source.schema().fields()); for (String field : partitionFields) { newSchema
>  * = newSchema.add(field, DataTypes.StringType, true); }
>  *
>  * /** Validates if the commit time is sane and also generates Partition fields from _hoodie_partition_path if
>  * configured
>  *
>  * Dataset<Row> validated = source.map((MapFunction<Row, Row>) (Row row) -> { // _hoodie_instant_time String
>  * instantTime = row.getString(0); IncrSourceHelper.validateInstantTime(row, instantTime, instantEndpts.getKey(),
>  * instantEndpts.getValue()); if (!partitionFields.isEmpty()) { // _hoodie_partition_path String hoodiePartitionPath
>  * = row.getString(3); List<Object> partitionVals =
>  * extractor.extractPartitionValuesInPath(hoodiePartitionPath).stream() .map(o -> (Object)
>  * o).collect(Collectors.toList()); ValidationUtils.checkArgument(partitionVals.size() == partitionFields.size(),
>  * "#partition-fields != #partition-values-extracted"); List<Object> rowObjs = new
>  * ArrayList<>(scala.collection.JavaConversions.seqAsJavaList(row.toSeq())); rowObjs.addAll(partitionVals); return
>  * RowFactory.create(rowObjs.toArray()); } return row; }, RowEncoder.apply(newSchema));
>  *
>  * log.info("Validated Source Schema :" + validated.schema());
>  */ {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
