alexeykudinkin commented on a change in pull request #4026:
URL: https://github.com/apache/hudi/pull/4026#discussion_r756319651
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/spark/ZCurveOptimizeHelper.java
##########
@@ -299,50 +298,54 @@ public static void saveStatisticsInfo(Dataset<Row> df,
String cols, String index
Dataset<Row> statisticsDF = ZCurveOptimizeHelper.getMinMaxValue(df, cols);
// try to find last validate index table from index path
try {
- if (fs.exists(new Path(indexPath))) {
- List<String> allIndexTables = Arrays
- .stream(fs.listStatus(new Path(indexPath))).filter(f ->
f.isDirectory()).map(f -> f.getPath().getName()).collect(Collectors.toList());
- List<String> candidateIndexTables = allIndexTables.stream().filter(f
-> validateCommits.contains(f)).sorted().collect(Collectors.toList());
- List<String> residualTables = allIndexTables.stream().filter(f ->
!validateCommits.contains(f)).collect(Collectors.toList());
- Option<Dataset> latestIndexData = Option.empty();
- if (!candidateIndexTables.isEmpty()) {
- latestIndexData = Option.of(spark.read().load(new Path(indexPath,
candidateIndexTables.get(candidateIndexTables.size() - 1)).toString()));
- // clean old index table, keep at most 1 index table.
- candidateIndexTables.remove(candidateIndexTables.size() - 1);
- candidateIndexTables.forEach(f -> {
- try {
- fs.delete(new Path(indexPath, f));
- } catch (IOException ie) {
- throw new HoodieException(ie);
- }
- });
- }
+ // If there's currently no index, create one
+ if (!fs.exists(new Path(indexPath))) {
+
statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
+ return;
+ }
- // clean residualTables
- // retried cluster operations at the same instant time is also
considered,
- // the residual files produced by retried are cleaned up before save
statistics
- // save statistics info to index table which named commitTime
- residualTables.forEach(f -> {
+ // Otherwise, clean up all indexes but the most recent one
+
+ List<String> allIndexTables = Arrays
+ .stream(fs.listStatus(new Path(indexPath))).filter(f ->
f.isDirectory()).map(f -> f.getPath().getName()).collect(Collectors.toList());
+ List<String> candidateIndexTables = allIndexTables.stream().filter(f ->
validateCommits.contains(f)).sorted().collect(Collectors.toList());
+ List<String> residualTables = allIndexTables.stream().filter(f ->
!validateCommits.contains(f)).collect(Collectors.toList());
+ Option<Dataset> latestIndexData = Option.empty();
+ if (!candidateIndexTables.isEmpty()) {
+ latestIndexData = Option.of(spark.read().load(new Path(indexPath,
candidateIndexTables.get(candidateIndexTables.size() - 1)).toString()));
+ // clean old index table, keep at most 1 index table.
+ candidateIndexTables.remove(candidateIndexTables.size() - 1);
+ candidateIndexTables.forEach(f -> {
try {
fs.delete(new Path(indexPath, f));
} catch (IOException ie) {
throw new HoodieException(ie);
}
});
+ }
- if (latestIndexData.isPresent() &&
latestIndexData.get().schema().equals(statisticsDF.schema())) {
- // update the statistics info
- String originalTable = "indexTable_" +
java.util.UUID.randomUUID().toString().replace("-", "");
- String updateTable = "updateTable_" +
java.util.UUID.randomUUID().toString().replace("-", "");
- latestIndexData.get().registerTempTable(originalTable);
- statisticsDF.registerTempTable(updateTable);
- // update table by full out join
- List columns = Arrays.asList(statisticsDF.schema().fieldNames());
- spark.sql(HoodieSparkUtils$
- .MODULE$.createMergeSql(originalTable, updateTable,
JavaConversions.asScalaBuffer(columns))).repartition(1).write().save(savePath.toString());
- } else {
-
statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
+ // clean residualTables
+ // retried cluster operations at the same instant time is also
considered,
+ // the residual files produced by retried are cleaned up before save
statistics
+ // save statistics info to index table which named commitTime
+ residualTables.forEach(f -> {
+ try {
+ fs.delete(new Path(indexPath, f));
+ } catch (IOException ie) {
+ throw new HoodieException(ie);
Review comment:
I will actually tackle this on as part of #4060
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]