nsivabalan commented on code in PR #7243:
URL: https://github.com/apache/hudi/pull/7243#discussion_r1029801201


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -243,9 +243,20 @@ public boolean commitStats(String instantTime, 
List<HoodieWriteStat> stats, Opti
     } finally {
       this.txnManager.endTransaction(Option.of(inflightInstant));
     }
-    // do this outside of lock since compaction, clustering can be time taking 
and we don't need a lock for the entire execution period
-    runTableServicesInline(table, metadata, extraMetadata);
+
+    // We don't want to fail the commit if 
hoodie.deltastreamer.fail.writes.on.inline.table.service.exceptions is false. 
We catch warn if false
+    try {
+      // do this outside of lock since compaction, clustering can be time 
taking and we don't need a lock for the entire execution period
+      runTableServicesInline(table, metadata, extraMetadata);
+    } catch (Exception e) {
+      if (config.isFailOnTableServiceExceptionEnabled()) {
+        throw e;
+      }
+      LOG.warn("Inline compaction or clustering failed with exception: " + 
e.getMessage() + ". Attempting to finish commit. ");

Review Comment:
   `Attempting to finish commit.` -> `Moving further since 
"hoodie.fail.writes.on.inline.table.service.exceptions" is set to false`.  



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -1754,30 +1755,53 @@ private List<HoodieRecord> 
testInsertAndClustering(HoodieClusteringConfig cluste
     return allRecords.getLeft().getLeft();
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testClusteringFailure(boolean shouldFail) throws IOException {
+    try {
+      Properties properties = new Properties();
+      
properties.setProperty("hoodie.deltastreamer.fail.writes.on.inline.table.service.exceptions",
 String.valueOf(shouldFail));
+      properties.setProperty("hoodie.auto.commit", "false");
+      properties.setProperty("hoodie.clustering.inline.max.commits", "1");
+      properties.setProperty("hoodie.clustering.inline", "true");
+      testInsertTwoBatches(true, "2015/03/16", properties, true);

Review Comment:
   after L1767
   Something like:
   ```
   try {
     ...
     if (!shouldFail) { /* reached only when no exception was thrown */ }
   } catch (HoodieException e) {
     if (!shouldFail) { fail("should not have thrown"); }
   }
   ```



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/WriteClientBrokenClustering.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.Map;
+
+public class WriteClientBrokenClustering<T extends HoodieRecordPayload> 
extends org.apache.hudi.client.SparkRDDWriteClient<T>   {

Review Comment:
   Is it not possible to inline this class within the test file only?



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -1754,30 +1755,53 @@ private List<HoodieRecord> 
testInsertAndClustering(HoodieClusteringConfig cluste
     return allRecords.getLeft().getLeft();
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testClusteringFailure(boolean shouldFail) throws IOException {
+    try {
+      Properties properties = new Properties();
+      
properties.setProperty("hoodie.deltastreamer.fail.writes.on.inline.table.service.exceptions",
 String.valueOf(shouldFail));
+      properties.setProperty("hoodie.auto.commit", "false");
+      properties.setProperty("hoodie.clustering.inline.max.commits", "1");
+      properties.setProperty("hoodie.clustering.inline", "true");
+      testInsertTwoBatches(true, "2015/03/16", properties, true);
+    } catch (HoodieException e) {
+      assertEquals("CLUSTERING IS BROKEN", e.getMessage());
+      assertTrue(shouldFail);
+      return;
+    }
+    assertFalse(shouldFail);
+  }
+
   private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> 
testInsertTwoBatches(boolean populateMetaFields) throws IOException {
     return testInsertTwoBatches(populateMetaFields, "2015/03/16");
   }
 
+  private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> 
testInsertTwoBatches(boolean populateMetaFields, String partitionPath) throws 
IOException {
+    return testInsertTwoBatches(populateMetaFields, partitionPath, new 
Properties(), false);
+  }
+
   /**
    * This method returns following three items:
    * 1. List of all HoodieRecord written in the two batches of insert.
    * 2. Commit instants of the two batches.
    * 3. List of new file group ids that were written in the two batches.
    */
-  private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> 
testInsertTwoBatches(boolean populateMetaFields, String partitionPath) throws 
IOException {
+  private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> 
testInsertTwoBatches(boolean populateMetaFields, String partitionPath, 
Properties props,
+                                                                               
                     boolean makeClusteringBroken) throws IOException {

Review Comment:
   Can we rename `makeClusteringBroken` to `failInlineClustering`?



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -2075,11 +2099,19 @@ private void 
verifyRecordsWrittenWithPreservedMetadata(Set<String> commitTimes,
   }
 
   private List<WriteStatus> writeAndVerifyBatch(SparkRDDWriteClient client, 
List<HoodieRecord> inserts, String commitTime, boolean populateMetaFields) 
throws IOException {
+    return writeAndVerifyBatch(client, inserts, commitTime, 
populateMetaFields, false);
+  }
+
+  private List<WriteStatus> writeAndVerifyBatch(SparkRDDWriteClient client, 
List<HoodieRecord> inserts, String commitTime, boolean populateMetaFields, 
boolean autoCommitOff) throws IOException {
     client.startCommitWithTime(commitTime);
     JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts, 2);
-    List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, 
commitTime).collect();
+    JavaRDD<WriteStatus> statusRDD = client.upsert(insertRecordsRDD1, 
commitTime);
+    List<WriteStatus> statuses = statusRDD.collect();
     assertNoWriteErrors(statuses);
     verifyRecordsWritten(commitTime, populateMetaFields, inserts, statuses, 
client.getConfig());
+    if (autoCommitOff) {
+      client.commit(commitTime, statusRDD);
+    }

Review Comment:
   You need to move this to L2109.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java:
##########
@@ -489,12 +490,20 @@ public SparkRDDReadClient getHoodieReadClient(String 
basePath) {
   }
 
   public SparkRDDWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
+    return getHoodieWriteClient(cfg, false);
+  }
+
+  public SparkRDDWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, 
Boolean makeClusteringBroken) {
     if (null != writeClient) {
       writeClient.close();
       writeClient = null;
     }
-    writeClient = new SparkRDDWriteClient(context, cfg);
-    return writeClient;
+    if (makeClusteringBroken) {

Review Comment:
   Can we avoid introducing `makeClusteringBroken` to this test base? I don't see
many usages of it for now, so let's keep it within the test class only.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -386,6 +386,12 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Timeline archiving removes older instants from the 
timeline, after each write operation, to minimize metadata overhead. "
           + "Controls whether or not, the write should be failed as well, if 
such archiving fails.");
 
+  public static final ConfigProperty<String> 
FAIL_ON_TABLE_SERVICE_EXCEPTION_ENABLE = ConfigProperty
+      
.key("hoodie.deltastreamer.fail.writes.on.inline.table.service.exceptions")

Review Comment:
   Let's not add "deltastreamer" to the config key; this is applicable to any writes.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -1754,30 +1755,53 @@ private List<HoodieRecord> 
testInsertAndClustering(HoodieClusteringConfig cluste
     return allRecords.getLeft().getLeft();
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testClusteringFailure(boolean shouldFail) throws IOException {
+    try {
+      Properties properties = new Properties();
+      
properties.setProperty("hoodie.deltastreamer.fail.writes.on.inline.table.service.exceptions",
 String.valueOf(shouldFail));
+      properties.setProperty("hoodie.auto.commit", "false");
+      properties.setProperty("hoodie.clustering.inline.max.commits", "1");
+      properties.setProperty("hoodie.clustering.inline", "true");
+      testInsertTwoBatches(true, "2015/03/16", properties, true);
+    } catch (HoodieException e) {
+      assertEquals("CLUSTERING IS BROKEN", e.getMessage());

Review Comment:
   Use a static constant for the expected message ("CLUSTERING IS BROKEN") instead of repeating the string literal.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -1754,30 +1755,53 @@ private List<HoodieRecord> 
testInsertAndClustering(HoodieClusteringConfig cluste
     return allRecords.getLeft().getLeft();
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testClusteringFailure(boolean shouldFail) throws IOException {

Review Comment:
   Can we rename this test to `testFailWritesOnInlineTableServiceExceptions`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to