This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ab86512e770 [HUDI-6084] Added FailOnFirstErrorWriteStatus for MDT to
ensure that write operations fail fast on any error (#8467)
ab86512e770 is described below
commit ab86512e770531103dabb555f8f06d0a55214e5d
Author: Prashant Wason <[email protected]>
AuthorDate: Thu Apr 20 02:47:17 2023 -0700
[HUDI-6084] Added FailOnFirstErrorWriteStatus for MDT to ensure that write
operations fail fast on any error (#8467)
---
.../hudi/client/FailOnFirstErrorWriteStatus.java | 46 ++++++++++++++++++++++
.../metadata/HoodieBackedTableMetadataWriter.java | 9 ++++-
.../FlinkHoodieBackedTableMetadataWriter.java | 6 ---
.../SparkHoodieBackedTableMetadataWriter.java | 11 +-----
4 files changed, 55 insertions(+), 17 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/FailOnFirstErrorWriteStatus.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/FailOnFirstErrorWriteStatus.java
new file mode 100644
index 00000000000..40ccae0f000
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/FailOnFirstErrorWriteStatus.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * A {@link WriteStatus} for writers that must fail fast: the first record failure is logged and rethrown as a {@link HoodieException} without delegating to the superclass.
+ */
+public class FailOnFirstErrorWriteStatus extends WriteStatus {
+  private static final Logger LOG = LoggerFactory.getLogger(FailOnFirstErrorWriteStatus.class);
+
+  public FailOnFirstErrorWriteStatus(Boolean trackSuccessRecords, Double failureFraction) {
+    super(trackSuccessRecords, failureFraction);
+  }
+
+  @Override
+  public void markFailure(HoodieRecord record, Throwable t, Option<Map<String, String>> optionalRecordMetadata) {
+    LOG.error(String.format("Error writing record %s with data %s and optionalRecordMetadata %s", record, record.getData(),
+        optionalRecordMetadata.orElse(Collections.emptyMap())), t); // t is the logger's Throwable argument (not a format argument) so the stack trace is logged
+    throw new HoodieException("Error writing record " + record + ": " + t.getMessage(), t); // chain t as the cause so callers keep the original stack trace
+  }
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 9ba192d4a42..9af63b90619 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -26,6 +26,7 @@ import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.client.FailOnFirstErrorWriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieData;
@@ -170,9 +171,12 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
"Cleaning is controlled internally for Metadata table.");
ValidationUtils.checkArgument(!this.metadataWriteConfig.inlineCompactionEnabled(),
"Compaction is controlled internally for metadata table.");
- // Metadata Table cannot have metadata listing turned on. (infinite
loop, much?)
+ // Auto commit is required
ValidationUtils.checkArgument(this.metadataWriteConfig.shouldAutoCommit(),
"Auto commit is required for Metadata Table");
+
ValidationUtils.checkArgument(this.metadataWriteConfig.getWriteStatusClassName().equals(FailOnFirstErrorWriteStatus.class.getName()),
+ "MDT should use " + FailOnFirstErrorWriteStatus.class.getName());
+ // Metadata Table cannot have metadata listing turned on. (infinite
loop, much?)
ValidationUtils.checkArgument(!this.metadataWriteConfig.isMetadataTableEnabled(),
"File listing cannot be used for Metadata Table");
@@ -300,6 +304,7 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
.withAllowMultiWriteOnSameInstant(true)
.withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
.withPopulateMetaFields(DEFAULT_METADATA_POPULATE_META_FIELDS)
+ .withWriteStatusClass(FailOnFirstErrorWriteStatus.class)
.withReleaseResourceEnabled(writeConfig.areReleaseResourceEnabled());
// RecordKey properties are needed for the metadata table records
@@ -1051,7 +1056,7 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
// b. There could be DT inflights after latest delta commit in MDT and we
are ok with it. bcoz, the contract is, latest compaction instant time in MDT
represents
// any instants before that is already synced with metadata table.
// c. Do consider out of order commits. For eg, c4 from DT could complete
before c3. and we can't trigger compaction in MDT with c4 as base instant time,
until every
- // instant before c4 is synced with metadata table.
+ // instant before c4 is synced with metadata table.
List<HoodieInstant> pendingInstants =
dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
.findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants();
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 8d356498824..021cc8ca456 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -31,7 +31,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.avro.specific.SpecificRecordBase;
@@ -171,11 +170,6 @@ public class FlinkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
List<WriteStatus> statuses = preppedRecordList.size() > 0
? writeClient.upsertPreppedRecords(preppedRecordList, instantTime)
: Collections.emptyList();
- statuses.forEach(writeStatus -> {
- if (writeStatus.hasErrors()) {
- throw new HoodieMetadataException("Failed to commit metadata table
records at instant " + instantTime);
- }
- });
// flink does not support auto-commit yet, also the auto commit logic is
not complete as BaseHoodieWriteClient now.
writeClient.commit(instantTime, statuses, Option.empty(),
HoodieActiveTimeline.DELTA_COMMIT_ACTION, Collections.emptyMap());
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index ae0929a1e66..eae8e6a6d56 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -19,7 +19,6 @@
package org.apache.hudi.metadata;
import org.apache.hudi.client.SparkRDDWriteClient;
-import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -36,7 +35,6 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
-import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.metrics.DistributedRegistry;
import org.apache.avro.specific.SpecificRecordBase;
@@ -89,7 +87,7 @@ public class SparkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
return new SparkHoodieBackedTableMetadataWriter(
conf, writeConfig, failedWritesCleaningPolicy, context,
actionMetadata, inflightInstantTimestamp);
}
-
+
public static HoodieTableMetadataWriter create(Configuration conf,
HoodieWriteConfig writeConfig,
HoodieEngineContext context) {
return create(conf, writeConfig, context, Option.empty(), Option.empty());
@@ -184,12 +182,7 @@ public class SparkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
// files in the active timeline.
}
- List<WriteStatus> statuses =
writeClient.upsertPreppedRecords(preppedRecordRDD, instantTime).collect();
- statuses.forEach(writeStatus -> {
- if (writeStatus.hasErrors()) {
- throw new HoodieMetadataException("Failed to commit metadata table
records at instant " + instantTime);
- }
- });
+ writeClient.upsertPreppedRecords(preppedRecordRDD,
instantTime).collect();
// reload timeline
metadataMetaClient.reloadActiveTimeline();