This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ab86512e770 [HUDI-6084] Added FailOnFirstErrorWriteStatus for MDT to 
ensure that write operations fail fast on any error (#8467)
ab86512e770 is described below

commit ab86512e770531103dabb555f8f06d0a55214e5d
Author: Prashant Wason <[email protected]>
AuthorDate: Thu Apr 20 02:47:17 2023 -0700

    [HUDI-6084] Added FailOnFirstErrorWriteStatus for MDT to ensure that write 
operations fail fast on any error (#8467)
---
 .../hudi/client/FailOnFirstErrorWriteStatus.java   | 46 ++++++++++++++++++++++
 .../metadata/HoodieBackedTableMetadataWriter.java  |  9 ++++-
 .../FlinkHoodieBackedTableMetadataWriter.java      |  6 ---
 .../SparkHoodieBackedTableMetadataWriter.java      | 11 +-----
 4 files changed, 55 insertions(+), 17 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/FailOnFirstErrorWriteStatus.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/FailOnFirstErrorWriteStatus.java
new file mode 100644
index 00000000000..40ccae0f000
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/FailOnFirstErrorWriteStatus.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * This class can be used as WriteStatus when we want to fail fast and at the 
first available exception/error.
+ */
+public class FailOnFirstErrorWriteStatus extends WriteStatus {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FailOnFirstErrorWriteStatus.class);
+
+  public FailOnFirstErrorWriteStatus(Boolean trackSuccessRecords, Double 
failureFraction) {
+    super(trackSuccessRecords, failureFraction);
+  }
+
+  @Override
+  public void markFailure(HoodieRecord record, Throwable t, Option<Map<String, 
String>> optionalRecordMetadata) {
+    LOG.error(String.format("Error writing record %s with data %s and 
optionalRecordMetadata %s", record, record.getData(),
+            optionalRecordMetadata.orElse(Collections.emptyMap()), t));
+    throw new HoodieException("Error writing record " + record + ": " + 
t.getMessage());
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 9ba192d4a42..9af63b90619 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -26,6 +26,7 @@ import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.client.FailOnFirstErrorWriteStatus;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.data.HoodieData;
@@ -170,9 +171,12 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
           "Cleaning is controlled internally for Metadata table.");
       
ValidationUtils.checkArgument(!this.metadataWriteConfig.inlineCompactionEnabled(),
           "Compaction is controlled internally for metadata table.");
-      // Metadata Table cannot have metadata listing turned on. (infinite 
loop, much?)
+      // Auto commit is required
       
ValidationUtils.checkArgument(this.metadataWriteConfig.shouldAutoCommit(),
           "Auto commit is required for Metadata Table");
+      
ValidationUtils.checkArgument(this.metadataWriteConfig.getWriteStatusClassName().equals(FailOnFirstErrorWriteStatus.class.getName()),
+          "MDT should use " + FailOnFirstErrorWriteStatus.class.getName());
+      // Metadata Table cannot have metadata listing turned on. (infinite 
loop, much?)
       
ValidationUtils.checkArgument(!this.metadataWriteConfig.isMetadataTableEnabled(),
           "File listing cannot be used for Metadata Table");
 
@@ -300,6 +304,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
         .withAllowMultiWriteOnSameInstant(true)
         
.withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
         .withPopulateMetaFields(DEFAULT_METADATA_POPULATE_META_FIELDS)
+        .withWriteStatusClass(FailOnFirstErrorWriteStatus.class)
         .withReleaseResourceEnabled(writeConfig.areReleaseResourceEnabled());
 
     // RecordKey properties are needed for the metadata table records
@@ -1051,7 +1056,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
     // b. There could be DT inflights after latest delta commit in MDT and we 
are ok with it. bcoz, the contract is, latest compaction instant time in MDT 
represents
     // any instants before that is already synced with metadata table.
     // c. Do consider out of order commits. For eg, c4 from DT could complete 
before c3. and we can't trigger compaction in MDT with c4 as base instant time, 
until every
-    // instant before c4 is synced with metadata table. 
+    // instant before c4 is synced with metadata table.
     List<HoodieInstant> pendingInstants = 
dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
         
.findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants();
 
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 8d356498824..021cc8ca456 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -31,7 +31,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 
 import org.apache.avro.specific.SpecificRecordBase;
@@ -171,11 +170,6 @@ public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
       List<WriteStatus> statuses = preppedRecordList.size() > 0
           ? writeClient.upsertPreppedRecords(preppedRecordList, instantTime)
           : Collections.emptyList();
-      statuses.forEach(writeStatus -> {
-        if (writeStatus.hasErrors()) {
-          throw new HoodieMetadataException("Failed to commit metadata table 
records at instant " + instantTime);
-        }
-      });
       // flink does not support auto-commit yet, also the auto commit logic is 
not complete as BaseHoodieWriteClient now.
       writeClient.commit(instantTime, statuses, Option.empty(), 
HoodieActiveTimeline.DELTA_COMMIT_ACTION, Collections.emptyMap());
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index ae0929a1e66..eae8e6a6d56 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.metadata;
 
 import org.apache.hudi.client.SparkRDDWriteClient;
-import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -36,7 +35,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
-import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.metrics.DistributedRegistry;
 
 import org.apache.avro.specific.SpecificRecordBase;
@@ -89,7 +87,7 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
     return new SparkHoodieBackedTableMetadataWriter(
         conf, writeConfig, failedWritesCleaningPolicy, context, 
actionMetadata, inflightInstantTimestamp);
   }
-  
+
   public static HoodieTableMetadataWriter create(Configuration conf, 
HoodieWriteConfig writeConfig,
                                                  HoodieEngineContext context) {
     return create(conf, writeConfig, context, Option.empty(), Option.empty());
@@ -184,12 +182,7 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
         // files in the active timeline.
       }
 
-      List<WriteStatus> statuses = 
writeClient.upsertPreppedRecords(preppedRecordRDD, instantTime).collect();
-      statuses.forEach(writeStatus -> {
-        if (writeStatus.hasErrors()) {
-          throw new HoodieMetadataException("Failed to commit metadata table 
records at instant " + instantTime);
-        }
-      });
+      writeClient.upsertPreppedRecords(preppedRecordRDD, 
instantTime).collect();
 
       // reload timeline
       metadataMetaClient.reloadActiveTimeline();

Reply via email to