nsivabalan commented on code in PR #7243:
URL: https://github.com/apache/hudi/pull/7243#discussion_r1029801201
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -243,9 +243,20 @@ public boolean commitStats(String instantTime,
List<HoodieWriteStat> stats, Opti
} finally {
this.txnManager.endTransaction(Option.of(inflightInstant));
}
- // do this outside of lock since compaction, clustering can be time taking
and we don't need a lock for the entire execution period
- runTableServicesInline(table, metadata, extraMetadata);
+
+ // We don't want to fail the commit if
hoodie.deltastreamer.fail.writes.on.inline.table.service.exceptions is false.
We catch warn if false
+ try {
+ // do this outside of lock since compaction, clustering can be time
taking and we don't need a lock for the entire execution period
+ runTableServicesInline(table, metadata, extraMetadata);
+ } catch (Exception e) {
+ if (config.isFailOnTableServiceExceptionEnabled()) {
+ throw e;
+ }
+ LOG.warn("Inline compaction or clustering failed with exception: " +
e.getMessage() + ". Attempting to finish commit. ");
Review Comment:
`Attempting to finish commit.` -> `Moving further since
"hoodie.fail.writes.on.inline.table.service.exceptions" is set to false`.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -1754,30 +1755,53 @@ private List<HoodieRecord>
testInsertAndClustering(HoodieClusteringConfig cluste
return allRecords.getLeft().getLeft();
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testClusteringFailure(boolean shouldFail) throws IOException {
+ try {
+ Properties properties = new Properties();
+
properties.setProperty("hoodie.deltastreamer.fail.writes.on.inline.table.service.exceptions",
String.valueOf(shouldFail));
+ properties.setProperty("hoodie.auto.commit", "false");
+ properties.setProperty("hoodie.clustering.inline.max.commits", "1");
+ properties.setProperty("hoodie.clustering.inline", "true");
+ testInsertTwoBatches(true, "2015/03/16", properties, true);
Review Comment:
after L1767
if ( !shouldFail ) {
assertFail()
}
catch () {
}
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/WriteClientBrokenClustering.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.Map;
+
+public class WriteClientBrokenClustering<T extends HoodieRecordPayload>
extends org.apache.hudi.client.SparkRDDWriteClient<T> {
   Review Comment:
     Is it not possible to inline this class within the test file only?
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -1754,30 +1755,53 @@ private List<HoodieRecord>
testInsertAndClustering(HoodieClusteringConfig cluste
return allRecords.getLeft().getLeft();
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testClusteringFailure(boolean shouldFail) throws IOException {
+ try {
+ Properties properties = new Properties();
+
properties.setProperty("hoodie.deltastreamer.fail.writes.on.inline.table.service.exceptions",
String.valueOf(shouldFail));
+ properties.setProperty("hoodie.auto.commit", "false");
+ properties.setProperty("hoodie.clustering.inline.max.commits", "1");
+ properties.setProperty("hoodie.clustering.inline", "true");
+ testInsertTwoBatches(true, "2015/03/16", properties, true);
+ } catch (HoodieException e) {
+ assertEquals("CLUSTERING IS BROKEN", e.getMessage());
+ assertTrue(shouldFail);
+ return;
+ }
+ assertFalse(shouldFail);
+ }
+
private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>>
testInsertTwoBatches(boolean populateMetaFields) throws IOException {
return testInsertTwoBatches(populateMetaFields, "2015/03/16");
}
+ private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>>
testInsertTwoBatches(boolean populateMetaFields, String partitionPath) throws
IOException {
+ return testInsertTwoBatches(populateMetaFields, partitionPath, new
Properties(), false);
+ }
+
/**
* This method returns following three items:
* 1. List of all HoodieRecord written in the two batches of insert.
* 2. Commit instants of the two batches.
* 3. List of new file group ids that were written in the two batches.
*/
- private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>>
testInsertTwoBatches(boolean populateMetaFields, String partitionPath) throws
IOException {
+ private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>>
testInsertTwoBatches(boolean populateMetaFields, String partitionPath,
Properties props,
+
boolean makeClusteringBroken) throws IOException {
   Review Comment:
     Rename `makeClusteringBroken` to `failInlineClustering`.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -2075,11 +2099,19 @@ private void
verifyRecordsWrittenWithPreservedMetadata(Set<String> commitTimes,
}
private List<WriteStatus> writeAndVerifyBatch(SparkRDDWriteClient client,
List<HoodieRecord> inserts, String commitTime, boolean populateMetaFields)
throws IOException {
+ return writeAndVerifyBatch(client, inserts, commitTime,
populateMetaFields, false);
+ }
+
+ private List<WriteStatus> writeAndVerifyBatch(SparkRDDWriteClient client,
List<HoodieRecord> inserts, String commitTime, boolean populateMetaFields,
boolean autoCommitOff) throws IOException {
client.startCommitWithTime(commitTime);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts, 2);
- List<WriteStatus> statuses = client.upsert(insertRecordsRDD1,
commitTime).collect();
+ JavaRDD<WriteStatus> statusRDD = client.upsert(insertRecordsRDD1,
commitTime);
+ List<WriteStatus> statuses = statusRDD.collect();
assertNoWriteErrors(statuses);
verifyRecordsWritten(commitTime, populateMetaFields, inserts, statuses,
client.getConfig());
+ if (autoCommitOff) {
+ client.commit(commitTime, statusRDD);
+ }
Review Comment:
you need to move this to L 2109
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java:
##########
@@ -489,12 +490,20 @@ public SparkRDDReadClient getHoodieReadClient(String
basePath) {
}
public SparkRDDWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
+ return getHoodieWriteClient(cfg, false);
+ }
+
+ public SparkRDDWriteClient getHoodieWriteClient(HoodieWriteConfig cfg,
Boolean makeClusteringBroken) {
if (null != writeClient) {
writeClient.close();
writeClient = null;
}
- writeClient = new SparkRDDWriteClient(context, cfg);
- return writeClient;
+ if (makeClusteringBroken) {
   Review Comment:
     Can we avoid introducing `makeClusteringBroken` to this test base? I don't see
     many usages of it for now, so let's keep it within the test class only.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -386,6 +386,12 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Timeline archiving removes older instants from the
timeline, after each write operation, to minimize metadata overhead. "
+ "Controls whether or not, the write should be failed as well, if
such archiving fails.");
+ public static final ConfigProperty<String>
FAIL_ON_TABLE_SERVICE_EXCEPTION_ENABLE = ConfigProperty
+
.key("hoodie.deltastreamer.fail.writes.on.inline.table.service.exceptions")
   Review Comment:
     Let's not add "deltastreamer" to the config key; this is applicable to any writes.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -1754,30 +1755,53 @@ private List<HoodieRecord>
testInsertAndClustering(HoodieClusteringConfig cluste
return allRecords.getLeft().getLeft();
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testClusteringFailure(boolean shouldFail) throws IOException {
+ try {
+ Properties properties = new Properties();
+
properties.setProperty("hoodie.deltastreamer.fail.writes.on.inline.table.service.exceptions",
String.valueOf(shouldFail));
+ properties.setProperty("hoodie.auto.commit", "false");
+ properties.setProperty("hoodie.clustering.inline.max.commits", "1");
+ properties.setProperty("hoodie.clustering.inline", "true");
+ testInsertTwoBatches(true, "2015/03/16", properties, true);
+ } catch (HoodieException e) {
+ assertEquals("CLUSTERING IS BROKEN", e.getMessage());
   Review Comment:
     Use static constants for these expected strings instead of duplicating the literals.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -1754,30 +1755,53 @@ private List<HoodieRecord>
testInsertAndClustering(HoodieClusteringConfig cluste
return allRecords.getLeft().getLeft();
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testClusteringFailure(boolean shouldFail) throws IOException {
Review Comment:
testFailWritesOnInlineTableServiceExceptions
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]