[
https://issues.apache.org/jira/browse/HUDI-3543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17499790#comment-17499790
]
sivabalan narayanan commented on HUDI-3543:
-------------------------------------------
[~xleesf]: Do you happen to know if we can remove these? It seems they were
written by you.
> Clean up HoodieIncrSource for commented out code
> ------------------------------------------------
>
> Key: HUDI-3543
> URL: https://issues.apache.org/jira/browse/HUDI-3543
> Project: Apache Hudi
> Issue Type: Task
> Components: deltastreamer
> Reporter: sivabalan narayanan
> Assignee: leesf
> Priority: Major
>
> We found some commented-out code in HoodieIncrSource. Clean it up if it is
> not required.
>
> {code:java}
> /*
> * DataSourceUtils.checkRequiredProperties(props,
> Arrays.asList(Config.HOODIE_SRC_BASE_PATH,
> * Config.HOODIE_SRC_PARTITION_FIELDS)); List<String> partitionFields =
> * props.getStringList(Config.HOODIE_SRC_PARTITION_FIELDS, ",", new
> ArrayList<>()); PartitionValueExtractor
> * extractor = DataSourceUtils.createPartitionExtractor(props.getString(
> Config.HOODIE_SRC_PARTITION_EXTRACTORCLASS,
> * Config.DEFAULT_HOODIE_SRC_PARTITION_EXTRACTORCLASS));
> */ {code}
> {code:java}
> /*
> * log.info("Partition Fields are : (" + partitionFields + "). Initial Source
> Schema :" + source.schema());
> *
> * StructType newSchema = new StructType(source.schema().fields()); for
> (String field : partitionFields) { newSchema
> * = newSchema.add(field, DataTypes.StringType, true); }
> *
> * /** Validates if the commit time is sane and also generates Partition
> fields from _hoodie_partition_path if
> * configured
> *
> * Dataset<Row> validated = source.map((MapFunction<Row, Row>) (Row row) -> {
> // _hoodie_instant_time String
> * instantTime = row.getString(0); IncrSourceHelper.validateInstantTime(row,
> instantTime, instantEndpts.getKey(),
> * instantEndpts.getValue()); if (!partitionFields.isEmpty()) { //
> _hoodie_partition_path String hoodiePartitionPath
> * = row.getString(3); List<Object> partitionVals =
> * extractor.extractPartitionValuesInPath(hoodiePartitionPath).stream()
> .map(o -> (Object)
> * o).collect(Collectors.toList());
> ValidationUtils.checkArgument(partitionVals.size() == partitionFields.size(),
> * "#partition-fields != #partition-values-extracted"); List<Object> rowObjs
> = new
> * ArrayList<>(scala.collection.JavaConversions.seqAsJavaList(row.toSeq()));
> rowObjs.addAll(partitionVals); return
> * RowFactory.create(rowObjs.toArray()); } return row; },
> RowEncoder.apply(newSchema));
> *
> * log.info("Validated Source Schema :" + validated.schema());
> */ {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)