the-other-tim-brown commented on code in PR #13307:
URL: https://github.com/apache/hudi/pull/13307#discussion_r2127120058
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/common/WriteStatusValidator.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.callback.common;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.util.Option;
+
+/**
+ * WriteStatus validator to assist caller to process errors if any. Caller can dictate if to proceed with the commit or not by means of the return
+ * value of the method {@link WriteStatusValidator#validate}.
+ *
+ * <p>Sometimes callers invoke the dag just to process if there are any errors before proceeding with the commit.
+ * This hook function is introduced for avoiding additional dag triggers from the callers side.
+ */
+public interface WriteStatusValidator {
+
+  /**
+   * Validates the given write status.
+   *
+   * @param totalRecords Total records in this inflight commit.
+   * @param totalErroredRecords Total error records in this inflight commit.
+   * @param writeStatusesOpt List of {@link WriteStatus} for the data table writes for this inflight commit.
+   *
+   * @return True if the commit can proceed

Review Comment:
   Should the implementation return false or throw an exception if the commit cannot proceed?

##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -82,16 +86,65 @@ protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) {
     return SparkHoodieIndexFactory.createIndex(config);
   }
 
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
+                        String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
+                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
+    return commit(instantTime, writeStatuses, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc, Option.empty());
+  }
+
   /**
    * Complete changes performed at the given instantTime marker with specified action.
    */
   @Override
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
                         String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
-                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
+                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc,
+                        Option<WriteStatusValidator> writeStatusValidatorOpt) {
     context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " + config.getTableName());
-    List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
-    return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
+    // Triggering the dag for writes.
+    // If streaming writes are enabled, writes to both data table and metadata table gets triggered at this juncture.
+    // If not, writes to data table gets triggered here.
+    // When streaming writes are enabled, data table's WriteStatus is expected to contain all stats required to generate metadata table records and so it could be fatter.
+    // So, here we are dropping all additional stats and only retains the information required to proceed from here on.
+    // And we are also dropping error records so that we don't unintentionally collect the error records in the driver.

Review Comment:
   ```suggestion
       // So, here we are dropping all additional stats and error records to retain only the required information and prevent collecting large objects on the driver.
   ```

##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java:
##########
@@ -263,6 +272,28 @@ public void setPath(StoragePath basePath, StoragePath path) {
     this.path = path.toString().replace(basePath + "/", "");
   }
 
+  public void putRecordsStats(Map<String, HoodieColumnRangeMetadata<Comparable>> stats) {
+    if (!recordsStats.isPresent()) {
+      recordsStats = Option.of(stats);
+    } else {
+      // in case there are multiple log blocks for one write process.
+      recordsStats = Option.of(mergeRecordsStats(recordsStats.get(), stats));
+    }
+  }
+
+  // keep for serialization efficiency
+  public void setRecordsStats(Map<String, HoodieColumnRangeMetadata<Comparable>> stats) {
+    recordsStats = Option.of(stats);
+  }

Review Comment:
   Why expose two separate APIs here? Will this cause confusion/bugs in the future?

##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java:
##########
@@ -114,6 +119,10 @@ public class HoodieWriteStat extends HoodieReadStats {
   @Nullable
   private RuntimeStats runtimeStats;
 
+  @JsonIgnore

Review Comment:
   It used to have this field before though, right?
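[Editor's note] The "two separate APIs" concern above can be made concrete with a small standalone sketch. This is a hypothetical class, not the actual `HoodieWriteStat`: plain `Integer` counts stand in for `HoodieColumnRangeMetadata<Comparable>`, and a simple sum stands in for `mergeRecordsStats`. It shows how `putRecordsStats` (accumulate) and `setRecordsStats` (overwrite) silently diverge once both have been called, which is exactly where a future caller could be surprised.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the two recordsStats APIs discussed in the review.
public class RecordsStatsSketch {
  // Stand-in for Option<Map<String, HoodieColumnRangeMetadata<Comparable>>>.
  private Map<String, Integer> recordsStats;

  // Merge semantics: accumulates stats across multiple log blocks of one write.
  public void putRecordsStats(Map<String, Integer> stats) {
    if (recordsStats == null) {
      recordsStats = new HashMap<>(stats);
    } else {
      // Stand-in for mergeRecordsStats: sum per-column values.
      stats.forEach((col, v) -> recordsStats.merge(col, v, Integer::sum));
    }
  }

  // Overwrite semantics: kept only for serialization efficiency per the PR.
  public void setRecordsStats(Map<String, Integer> stats) {
    recordsStats = new HashMap<>(stats);
  }

  public Map<String, Integer> getRecordsStats() {
    return recordsStats;
  }

  public static void main(String[] args) {
    RecordsStatsSketch merged = new RecordsStatsSketch();
    merged.putRecordsStats(Map.of("col1", 1));
    merged.putRecordsStats(Map.of("col1", 2));
    // put + put merges: col1 -> 3
    System.out.println(merged.getRecordsStats().get("col1"));

    RecordsStatsSketch overwritten = new RecordsStatsSketch();
    overwritten.putRecordsStats(Map.of("col1", 1));
    overwritten.setRecordsStats(Map.of("col1", 2));
    // put + set overwrites: col1 -> 2, earlier stats lost
    System.out.println(overwritten.getRecordsStats().get("col1"));
  }
}
```

With identical call sequences apart from the final method, the two paths end with different stats, which supports the reviewer's question about confusion if both stay public.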
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -82,16 +86,65 @@ protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) {
    return SparkHoodieIndexFactory.createIndex(config);
  }
 
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
+                        String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
+                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
+    return commit(instantTime, writeStatuses, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc, Option.empty());
+  }
+
   /**
    * Complete changes performed at the given instantTime marker with specified action.
    */
   @Override
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
                         String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
-                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
+                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc,
+                        Option<WriteStatusValidator> writeStatusValidatorOpt) {
     context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " + config.getTableName());
-    List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
-    return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
+    // Triggering the dag for writes.
+    // If streaming writes are enabled, writes to both data table and metadata table gets triggered at this juncture.
+    // If not, writes to data table gets triggered here.
+    // When streaming writes are enabled, data table's WriteStatus is expected to contain all stats required to generate metadata table records and so it could be fatter.
+    // So, here we are dropping all additional stats and only retains the information required to proceed from here on.
+    // And we are also dropping error records so that we don't unintentionally collect the error records in the driver.
+    HoodieTable table = createTable(config);
+    boolean isMetadataStreamingWritesEnabled = config.isMetadataStreamingWritesEnabled(table.getMetaClient().getTableConfig().getTableVersion());
+    List<Pair<Boolean, WriteStatus>> isMetadataWriteStatusPairs = writeStatuses

Review Comment:
   Previously this was only returning the `WriteStat`. Even with setting fields to null/empty, there is still serialization overhead when comparing the `WriteStat` to `WriteStatus` due to the other fields - is this of any concern?

##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java:
##########
@@ -925,8 +925,11 @@ public void ingestOnce() {
 
   @Override
   protected boolean requestShutdownIfNeeded(Option<HoodieData<WriteStatus>> lastWriteStatuses) {
-    Option<JavaRDD<WriteStatus>> lastWriteStatusRDD = Option.ofNullable(lastWriteStatuses.isPresent() ? HoodieJavaRDD.getJavaRDD(lastWriteStatuses.get()) : null);
-    return postWriteTerminationStrategy.isPresent() && postWriteTerminationStrategy.get().shouldShutdown(lastWriteStatusRDD);
+    if (postWriteTerminationStrategy.isPresent()) {
+      Option<JavaRDD<WriteStatus>> lastWriteStatusRDD = Option.ofNullable(lastWriteStatuses.isPresent() ?
+          HoodieJavaRDD.getJavaRDD(lastWriteStatuses.get()) : null);

Review Comment:
   ```suggestion
         Option<JavaRDD<WriteStatus>> lastWriteStatusRDD = lastWriteStatuses.map(HoodieJavaRDD::getJavaRDD);
   ```

##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -82,16 +86,65 @@ protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) {
    return SparkHoodieIndexFactory.createIndex(config);
  }
 
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
+                        String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
+                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
+    return commit(instantTime, writeStatuses, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc, Option.empty());
+  }
+
   /**
    * Complete changes performed at the given instantTime marker with specified action.
    */
   @Override
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
                         String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
-                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
+                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc,
+                        Option<WriteStatusValidator> writeStatusValidatorOpt) {
     context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " + config.getTableName());
-    List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
-    return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
+    // Triggering the dag for writes.
+    // If streaming writes are enabled, writes to both data table and metadata table gets triggered at this juncture.
+    // If not, writes to data table gets triggered here.
+    // When streaming writes are enabled, data table's WriteStatus is expected to contain all stats required to generate metadata table records and so it could be fatter.

Review Comment:
   ```suggestion
       // When streaming writes are enabled, data table's WriteStatus is expected to contain all stats required to generate metadata table records and so each object will be larger.
   ```

##########
hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java:
##########
@@ -65,4 +80,89 @@ public void testSetPaths() {
     assertEquals(finalizeFilePath, new StoragePath(basePath, writeStat.getPath()));
     assertNull(writeStat.getTempPath());
   }
+
+  @Test
+  public void testRecordsStats() {

Review Comment:
   Is there any existing testing around the merging of stats?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]