satishkotha commented on a change in pull request #3970:
URL: https://github.com/apache/hudi/pull/3970#discussion_r748646037
##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
##########

@@ -205,12 +207,26 @@ public MultipleSparkJobExecutionStrategy(HoodieTable table, HoodieEngineContext
           .withSpillableMapBasePath(config.getSpillableMapBasePath())
           .build();
-      HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
-      recordIterators.add(getFileSliceReader(baseFileReader, scanner, readerSchema,
-          tableConfig.getPayloadClass(),
-          tableConfig.getPreCombineField(),
-          tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
-              tableConfig.getPartitionFieldProp()))));
+      if (!StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())) {
+        HoodieFileReader<? extends IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
+        HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+        recordIterators.add(getFileSliceReader(baseFileReader, scanner, readerSchema,
+            tableConfig.getPayloadClass(),
+            tableConfig.getPreCombineField(),
+            tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
+                tableConfig.getPartitionFieldProp()))));
+      } else {
+        // Since there is no base file, fall back to reading log files
+        Iterable<HoodieRecord<? extends HoodieRecordPayload>> iterable = () -> scanner.iterator();
+        recordIterators.add(StreamSupport.stream(iterable.spliterator(), false)
+            .map(e -> {
+              try {
+                return transform((IndexedRecord) e.getData().getInsertValue(readerSchema).get());
+              } catch (IOException io) {
+                throw new UncheckedIOException(io);
Review comment:
Minor: we use HoodieIOException in the rest of the codebase; consider using it here for consistency.
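A minimal sketch of the suggested swap, with the catch block rethrowing HoodieIOException instead of UncheckedIOException. The `HoodieIOException` class here is a stand-in defined locally only so the sketch compiles on its own (the real one lives in `org.apache.hudi.exception`), and `readRecord` is a hypothetical helper standing in for the lambda body in the diff:

```java
import java.io.IOException;

public class RethrowSketch {
  // Stand-in for org.apache.hudi.exception.HoodieIOException, defined here only
  // so this sketch is self-contained; use the real class in the PR.
  static class HoodieIOException extends RuntimeException {
    HoodieIOException(String message, IOException cause) {
      super(message, cause);
    }
  }

  // Hypothetical helper mirroring the reviewed lambda body: an IOException from
  // the record read is wrapped in HoodieIOException rather than UncheckedIOException.
  static String readRecord(boolean fail) {
    try {
      if (fail) {
        throw new IOException("failed to deserialize record");
      }
      return "record";
    } catch (IOException io) {
      throw new HoodieIOException("IOException while reading log record", io);
    }
  }
}
```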
##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
##########

@@ -205,12 +207,26 @@ public MultipleSparkJobExecutionStrategy(HoodieTable table, HoodieEngineContext
           .withSpillableMapBasePath(config.getSpillableMapBasePath())
           .build();
-      HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
-      recordIterators.add(getFileSliceReader(baseFileReader, scanner, readerSchema,
-          tableConfig.getPayloadClass(),
-          tableConfig.getPreCombineField(),
-          tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
-              tableConfig.getPartitionFieldProp()))));
+      if (!StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())) {
+        HoodieFileReader<? extends IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
+        HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+        recordIterators.add(getFileSliceReader(baseFileReader, scanner, readerSchema,
+            tableConfig.getPayloadClass(),
+            tableConfig.getPreCombineField(),
+            tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
+                tableConfig.getPartitionFieldProp()))));
+      } else {
+        // Since there is no base file, fall back to reading log files
+        Iterable<HoodieRecord<? extends HoodieRecordPayload>> iterable = () -> scanner.iterator();
Review comment:
The functionality looks good, but what do you think about reorganizing this a little? Here is what I'm thinking: change the HoodieFileSliceReader#getFileSliceReader method to take an Option<HoodieBaseFileReader>. This whole branching logic can then be embedded inside that method (introduce new helper methods if needed). That makes the code easy to reuse if there are other places that need to read FileSlices. Please try it and let me know if you think this is reasonable.
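To make the suggested reorganization concrete, here is a self-contained sketch of the Option-based shape: the base-file reader becomes optional, and the fall-back to log-only reading lives inside the one reader method. All types here (`BaseFileReader`, `LogScanner`, `String` records, and `java.util.Optional` in place of Hudi's `Option`) are simplified stand-ins for illustration, not Hudi's real API:

```java
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

public class FileSliceReaderSketch {

  // Stand-in for HoodieFileReader: yields records from the base file.
  interface BaseFileReader {
    Iterator<String> getRecordIterator();
  }

  // Stand-in for the merged log record scanner.
  interface LogScanner {
    Iterator<String> iterator();
  }

  // The suggested signature: the caller passes Optional.empty() when the
  // clustering operation has no base file, and the branching is hidden here.
  static Iterator<String> getFileSliceReader(Optional<BaseFileReader> baseFileReader,
                                             LogScanner scanner) {
    if (baseFileReader.isPresent()) {
      // Base file present: read it (merging with log records elided in this sketch).
      return baseFileReader.get().getRecordIterator();
    }
    // No base file: fall back to reading records from the log scanner alone.
    return scanner.iterator();
  }

  public static void main(String[] args) {
    LogScanner scanner = () -> List.of("log-1", "log-2").iterator();
    BaseFileReader base = () -> List.of("base-1").iterator();

    // With a base file, records come from the base reader.
    System.out.println(getFileSliceReader(Optional.of(base), scanner).next());
    // Without one, the method falls back to the log scanner.
    System.out.println(getFileSliceReader(Optional.empty(), scanner).next());
  }
}
```

The payoff is that every call site that reads a FileSlice goes through one method, so the "no base file" case cannot be forgotten at a new call site.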
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]