nsivabalan commented on code in PR #13307:
URL: https://github.com/apache/hudi/pull/13307#discussion_r2127286498
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java:
##########
@@ -114,6 +119,10 @@ public class HoodieWriteStat extends HoodieReadStats {
@Nullable
private RuntimeStats runtimeStats;
+ @JsonIgnore
Review Comment:
The entire record stats object is only used for in-memory computations; neither
the record stats nor the column stats are serialized to HoodieCommitMetadata.
So, this should not be an issue.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -82,16 +86,65 @@ protected HoodieIndex createIndex(HoodieWriteConfig
writeConfig) {
return SparkHoodieIndexFactory.createIndex(config);
}
+ public boolean commit(String instantTime, JavaRDD<WriteStatus>
writeStatuses, Option<Map<String, String>> extraMetadata,
+ String commitActionType, Map<String, List<String>>
partitionToReplacedFileIds,
+ Option<BiConsumer<HoodieTableMetaClient,
HoodieCommitMetadata>> extraPreCommitFunc) {
+ return commit(instantTime, writeStatuses, extraMetadata, commitActionType,
partitionToReplacedFileIds, extraPreCommitFunc, Option.empty());
+ }
+
/**
* Complete changes performed at the given instantTime marker with specified
action.
*/
@Override
public boolean commit(String instantTime, JavaRDD<WriteStatus>
writeStatuses, Option<Map<String, String>> extraMetadata,
String commitActionType, Map<String, List<String>>
partitionToReplacedFileIds,
- Option<BiConsumer<HoodieTableMetaClient,
HoodieCommitMetadata>> extraPreCommitFunc) {
+ Option<BiConsumer<HoodieTableMetaClient,
HoodieCommitMetadata>> extraPreCommitFunc,
+ Option<WriteStatusValidator> writeStatusValidatorOpt) {
context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: "
+ config.getTableName());
- List<HoodieWriteStat> writeStats =
writeStatuses.map(WriteStatus::getStat).collect();
- return commitStats(instantTime, writeStats, extraMetadata,
commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
+ // Triggering the dag for writes.
+ // If streaming writes are enabled, writes to both data table and metadata
table gets triggered at this juncture.
+ // If not, writes to data table gets triggered here.
+ // When streaming writes are enabled, data table's WriteStatus is expected
to contain all stats required to generate metadata table records and so it
could be fatter.
+ // So, here we are dropping all additional stats and only retains the
information required to proceed from here on.
+ // And we are also dropping error records so that we don't unintentionally
collect the error records in the driver.
+ HoodieTable table = createTable(config);
+ boolean isMetadataStreamingWritesEnabled =
config.isMetadataStreamingWritesEnabled(table.getMetaClient().getTableConfig().getTableVersion());
+ List<Pair<Boolean, WriteStatus>> isMetadataWriteStatusPairs = writeStatuses
+ .map(writeStatus -> {
+ if (isMetadataStreamingWritesEnabled) {
+ writeStatus.removeMetadataIndexStatsAndErrorRecordsTracking();
+ } else {
+ writeStatus.dropGranularErrorRecordsTracking();
+ }
+ return Pair.of(writeStatus.isMetadataTable(), writeStatus);
+ }
+ ).collect();
+ // Compute stats for the writes and invoke callback
+ AtomicLong totalRecords = new AtomicLong(0);
+ AtomicLong totalErrorRecords = new AtomicLong(0);
+ isMetadataWriteStatusPairs.stream().filter(entry ->
table.isMetadataTable() && entry.getKey()).forEach(pair -> {
+ totalRecords.getAndAdd(pair.getValue().getTotalRecords());
+ totalErrorRecords.getAndAdd(pair.getValue().getTotalErrorRecords());
+ });
+ // reason why we are passing RDD<WriteStatus> to the writeStatusHandler
callback: At the beginning of this method, we drop all index stats and error
records before collecting in the driver.
+ // Just incase if there are errors, caller might be interested to fetch
error records in the callback. And so, we are passing the RDD<WriteStatus> as
last argument to the write status
+ // handler callback.
+ boolean canProceed = writeStatusValidatorOpt.map(callback ->
callback.validate(totalRecords.get(), totalErrorRecords.get(),
+ totalErrorRecords.get() > 0 ?
Option.of(HoodieJavaRDD.of(writeStatuses.filter(status ->
table.isMetadataTable() &&
status.isMetadataTable()).map(WriteStatus::removeMetadataStats))) :
Option.empty()))
Review Comment:
I ensured we use SparkRDDMetadataWriteClient even for table version 6. It
simplifies these code paths.
##########
hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java:
##########
@@ -65,4 +80,89 @@ public void testSetPaths() {
assertEquals(finalizeFilePath, new StoragePath(basePath,
writeStat.getPath()));
assertNull(writeStat.getTempPath());
}
+
+ @Test
+ public void testRecordsStats() {
Review Comment:
Yes — this is covered by the test here:
https://github.com/apache/hudi/blob/1eef61452f8ba83d72da6b8aeaf815934721cebd/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestColStatsRecordWithMetadataRecord.java#L249
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -82,16 +86,65 @@ protected HoodieIndex createIndex(HoodieWriteConfig
writeConfig) {
return SparkHoodieIndexFactory.createIndex(config);
}
+ public boolean commit(String instantTime, JavaRDD<WriteStatus>
writeStatuses, Option<Map<String, String>> extraMetadata,
+ String commitActionType, Map<String, List<String>>
partitionToReplacedFileIds,
+ Option<BiConsumer<HoodieTableMetaClient,
HoodieCommitMetadata>> extraPreCommitFunc) {
+ return commit(instantTime, writeStatuses, extraMetadata, commitActionType,
partitionToReplacedFileIds, extraPreCommitFunc, Option.empty());
+ }
+
/**
* Complete changes performed at the given instantTime marker with specified
action.
*/
@Override
public boolean commit(String instantTime, JavaRDD<WriteStatus>
writeStatuses, Option<Map<String, String>> extraMetadata,
String commitActionType, Map<String, List<String>>
partitionToReplacedFileIds,
- Option<BiConsumer<HoodieTableMetaClient,
HoodieCommitMetadata>> extraPreCommitFunc) {
+ Option<BiConsumer<HoodieTableMetaClient,
HoodieCommitMetadata>> extraPreCommitFunc,
+ Option<WriteStatusValidator> writeStatusValidatorOpt) {
context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: "
+ config.getTableName());
- List<HoodieWriteStat> writeStats =
writeStatuses.map(WriteStatus::getStat).collect();
- return commitStats(instantTime, writeStats, extraMetadata,
commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
+ // Triggering the dag for writes.
+ // If streaming writes are enabled, writes to both data table and metadata
table gets triggered at this juncture.
+ // If not, writes to data table gets triggered here.
+ // When streaming writes are enabled, data table's WriteStatus is expected
to contain all stats required to generate metadata table records and so it
could be fatter.
+ // So, here we are dropping all additional stats and only retains the
information required to proceed from here on.
+ // And we are also dropping error records so that we don't unintentionally
collect the error records in the driver.
+ HoodieTable table = createTable(config);
+ boolean isMetadataStreamingWritesEnabled =
config.isMetadataStreamingWritesEnabled(table.getMetaClient().getTableConfig().getTableVersion());
+ List<Pair<Boolean, WriteStatus>> isMetadataWriteStatusPairs = writeStatuses
+ .map(writeStatus -> {
+ if (isMetadataStreamingWritesEnabled) {
+ writeStatus.removeMetadataIndexStatsAndErrorRecordsTracking();
+ } else {
+ writeStatus.dropGranularErrorRecordsTracking();
+ }
+ return Pair.of(writeStatus.isMetadataTable(), writeStatus);
+ }
+ ).collect();
+ // Compute stats for the writes and invoke callback
+ AtomicLong totalRecords = new AtomicLong(0);
+ AtomicLong totalErrorRecords = new AtomicLong(0);
+ isMetadataWriteStatusPairs.stream().filter(entry ->
table.isMetadataTable() && entry.getKey()).forEach(pair -> {
Review Comment:
Again, this is simplified now.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java:
##########
@@ -181,6 +189,21 @@ private void updateStatsForFailure() {
totalErrorRecords++;
}
+ public WriteStatus removeMetadataIndexStatsAndErrorRecordsTracking() {
+ removeMetadataStats();
+ dropGranularErrorRecordsTracking();
+ return this;
+ }
+
+ public WriteStatus removeMetadataStats() {
+ this.writtenRecordDelegates.clear();
+ return this;
+ }
+
+ public void dropGranularErrorRecordsTracking() {
+ failedRecords.clear();
+ }
Review Comment:
When we want to call the WriteStatusValidator callback, we only have to
drop the metadata stats (without dropping the per-record error tracking) from
the RDD&lt;WriteStatus&gt; and send it to the callback.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/common/WriteStatusValidator.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.callback.common;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.util.Option;
+
+/**
+ * WriteStatus validator to assist caller to process errors if any. Caller can
dictate if to proceed with the commit or not by means of the return
+ * value of the method {@link WriteStatusValidator#validate}.
+ *
+ * <p>Sometimes callers invoke the dag just to process if there are any errors
before proceeding with the commit.
+ * This hook function is introduced for avoiding additional dag triggers from
the callers side.
+ */
+public interface WriteStatusValidator {
+
+ /**
+ * Validates the given write status.
+ *
+ * @param totalRecords Total records in this inflight commit.
+ * @param totalErroredRecords Total error records in this inflight commit.
+ * @param writeStatusesOpt List of {@link WriteStatus} for the data table
writes for this inflight commit.
+ *
+ * @return True if the commit can proceed
Review Comment:
We did not intend to make changes to the commit flow in this patch; we are
just retaining the old behavior.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -82,16 +86,65 @@ protected HoodieIndex createIndex(HoodieWriteConfig
writeConfig) {
return SparkHoodieIndexFactory.createIndex(config);
}
+ public boolean commit(String instantTime, JavaRDD<WriteStatus>
writeStatuses, Option<Map<String, String>> extraMetadata,
+ String commitActionType, Map<String, List<String>>
partitionToReplacedFileIds,
+ Option<BiConsumer<HoodieTableMetaClient,
HoodieCommitMetadata>> extraPreCommitFunc) {
+ return commit(instantTime, writeStatuses, extraMetadata, commitActionType,
partitionToReplacedFileIds, extraPreCommitFunc, Option.empty());
+ }
+
/**
* Complete changes performed at the given instantTime marker with specified
action.
*/
@Override
public boolean commit(String instantTime, JavaRDD<WriteStatus>
writeStatuses, Option<Map<String, String>> extraMetadata,
String commitActionType, Map<String, List<String>>
partitionToReplacedFileIds,
- Option<BiConsumer<HoodieTableMetaClient,
HoodieCommitMetadata>> extraPreCommitFunc) {
+ Option<BiConsumer<HoodieTableMetaClient,
HoodieCommitMetadata>> extraPreCommitFunc,
+ Option<WriteStatusValidator> writeStatusValidatorOpt) {
context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: "
+ config.getTableName());
- List<HoodieWriteStat> writeStats =
writeStatuses.map(WriteStatus::getStat).collect();
- return commitStats(instantTime, writeStats, extraMetadata,
commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
+ // Triggering the dag for writes.
+ // If streaming writes are enabled, writes to both data table and metadata
table gets triggered at this juncture.
+ // If not, writes to data table gets triggered here.
+ // When streaming writes are enabled, data table's WriteStatus is expected
to contain all stats required to generate metadata table records and so it
could be fatter.
+ // So, here we are dropping all additional stats and only retains the
information required to proceed from here on.
+ // And we are also dropping error records so that we don't unintentionally
collect the error records in the driver.
+ HoodieTable table = createTable(config);
+ boolean isMetadataStreamingWritesEnabled =
config.isMetadataStreamingWritesEnabled(table.getMetaClient().getTableConfig().getTableVersion());
+ List<Pair<Boolean, WriteStatus>> isMetadataWriteStatusPairs = writeStatuses
Review Comment:
Good point — we can avoid bringing `WriteStatus` into the driver. Will attend to this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]