alexeykudinkin commented on code in PR #6561:
URL: https://github.com/apache/hudi/pull/6561#discussion_r969039968
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -356,6 +360,16 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstan
     LOG.info("Starting clustering at " + clusteringInstant);
     HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.cluster(context, clusteringInstant);
     HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
+    if (clusteringMetadata.getWriteStatuses().isEmpty()) {
Review Comment:
Let's also extract this as a `validate` method to keep the actual write logic clean and clear.
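Something along these lines, purely as a sketch (the helper name and exception message are illustrative, not taken from this PR):
```
  // Sketch only: validation extracted out of cluster(); name and message are placeholders.
  private void validateClusteringCommit(HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata, String clusteringCommitTime) {
    if (clusteringMetadata.getWriteStatuses().isEmpty()) {
      throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + clusteringCommitTime);
    }
  }
```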
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -356,6 +360,16 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstan
     LOG.info("Starting clustering at " + clusteringInstant);
     HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.cluster(context, clusteringInstant);
     HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
+    if (clusteringMetadata.getWriteStatuses().isEmpty()) {
Review Comment:
Why are we only checking the case when no files have been written? Shouldn't we also check that the number of created file groups matches the number expected by the clustering plan?
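Roughly along these lines, as a sketch (assuming the clustering plan is in scope at this point; `HoodieClusteringPlan#getInputGroups` / `HoodieClusteringGroup#getNumOutputFileGroups` and the helper name are my assumptions, not code from this PR):
```
  // Sketch only: compare distinct file groups actually written against what the plan expects.
  private void validateClusteringOutput(HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata,
                                        HoodieClusteringPlan plan, String clusteringCommitTime) {
    int expectedFileGroups = plan.getInputGroups().stream()
        .mapToInt(HoodieClusteringGroup::getNumOutputFileGroups)
        .sum();
    long writtenFileGroups = clusteringMetadata.getWriteStatuses()
        .map(WriteStatus::getFileId)
        .distinct()
        .count();
    if (writtenFileGroups != expectedFileGroups) {
      throw new HoodieClusteringException("Clustering at " + clusteringCommitTime + " produced "
          + writtenFileGroups + " file groups, but the plan expected " + expectedFileGroups);
    }
  }
```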
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -1456,6 +1458,41 @@ public void testSimpleClustering(boolean populateMetaFields, boolean preserveCom
     testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
   }
+  @Test
+  public void testAndValidateClusteringOutputFiles() throws IOException {
+    String partitionPath = "2015/03/16";
+    testInsertTwoBatches(true, partitionPath);
+
+    // Trigger clustering
+    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withEmbeddedTimelineServerEnabled(false).withAutoCommit(false)
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(2).build());
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfgBuilder.build())) {
+      int numRecords = 200;
+      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> records1 = dataGen.generateInserts(newCommitTime, numRecords);
+      client.startCommitWithTime(newCommitTime);
+      JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(records1, 2);
+      JavaRDD<WriteStatus> statuses = client.insert(insertRecordsRDD1, newCommitTime);
+      client.commit(newCommitTime, statuses);
+      List<WriteStatus> statusList = statuses.collect();
+      assertNoWriteErrors(statusList);
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieInstant replaceCommitInstant = metaClient.getActiveTimeline().getCompletedReplaceTimeline().firstInstant().get();
+      HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
+          .fromBytes(metaClient.getActiveTimeline().getInstantDetails(replaceCommitInstant).get(), HoodieReplaceCommitMetadata.class);
+
+      List<String> filesFromReplaceCommit = new ArrayList<>();
+      replaceCommitMetadata.getPartitionToWriteStats().forEach((k, v) -> v.forEach(entry -> filesFromReplaceCommit.add(entry.getPath())));
Review Comment:
nit: streams are easier to comprehend when reading the code (since it's purely a waterfall-like chain of transformations, as opposed to cognitively tracking the state of objects):
```
metadata.getPartitionToWriteStats().values()
    .stream()
    .flatMap(List::stream)
    .map(HoodieWriteStat::getPath)
    .collect(Collectors.toList())
```
(note that `values()` yields lists of `HoodieWriteStat`, hence the `flatMap`)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]