This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a48f99  [HUDI-3064][HUDI-3054] FileSystemBasedLockProviderTestClass 
tryLock fix and TestHoodieClientMultiWriter test fixes (#4384)
4a48f99 is described below

commit 4a48f99a596b5596fb654b56a6533b908bb9d8f3
Author: Manoj Govindassamy <[email protected]>
AuthorDate: Sun Dec 19 10:31:02 2021 -0800

    [HUDI-3064][HUDI-3054] FileSystemBasedLockProviderTestClass tryLock fix and 
TestHoodieClientMultiWriter test fixes (#4384)
    
     - Made FileSystemBasedLockProviderTestClass thread safe and fixed the
       tryLock retry logic.
    
     - Made TestHoodieClientMultiWriter.testHoodieClientBasicMultiWriter
       deterministic in verifying the HoodieWriteConflictException.
---
 .../FileSystemBasedLockProviderTestClass.java      |  77 ++++++++-------
 .../hudi/client/TestHoodieClientMultiWriter.java   | 106 ++++++++++++++++-----
 2 files changed, 121 insertions(+), 62 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java
index 2fc6ba4..97ad050 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java
@@ -36,38 +36,37 @@ import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_R
 import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
 
 /**
- * This lock provider is used for testing purposes only. It provides a simple 
file system based lock using HDFS atomic
- * create operation. This lock does not support cleaning/expiring the lock 
after a failed write hence cannot be used
- * in production environments.
+ * This lock provider is used for testing purposes only. It provides a simple 
file system based lock
+ * using filesystem's atomic create operation. This lock does not support 
cleaning/expiring the lock
+ * after a failed write. Must not be used in production environments.
  */
 public class FileSystemBasedLockProviderTestClass implements 
LockProvider<String>, Serializable {
 
-  private static final String LOCK_NAME = "acquired";
+  private static final String LOCK = "lock";
 
-  private String lockPath;
+  private final int retryMaxCount;
+  private final int retryWaitTimeMs;
   private transient FileSystem fs;
+  private transient Path lockFile;
   protected LockConfiguration lockConfiguration;
 
   public FileSystemBasedLockProviderTestClass(final LockConfiguration 
lockConfiguration, final Configuration configuration) {
     this.lockConfiguration = lockConfiguration;
-    this.lockPath = 
lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY);
-    this.fs = FSUtils.getFs(this.lockPath, configuration);
-  }
-
-  public void acquireLock() {
-    try {
-      fs.create(new Path(lockPath + "/" + LOCK_NAME), false).close();
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to acquire lock", e);
-    }
+    final String lockDirectory = 
lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY);
+    this.retryWaitTimeMs = 
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY);
+    this.retryMaxCount = 
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY);
+    this.lockFile = new Path(lockDirectory + "/" + LOCK);
+    this.fs = FSUtils.getFs(this.lockFile.toString(), configuration);
   }
 
   @Override
   public void close() {
-    try {
-      fs.delete(new Path(lockPath + "/" + LOCK_NAME), true);
-    } catch (IOException e) {
-      throw new HoodieLockException("Unable to release lock", e);
+    synchronized (LOCK) {
+      try {
+        fs.delete(this.lockFile, true);
+      } catch (IOException e) {
+        throw new HoodieLockException("Unable to release lock: " + getLock(), 
e);
+      }
     }
   }
 
@@ -75,39 +74,45 @@ public class FileSystemBasedLockProviderTestClass 
implements LockProvider<String
   public boolean tryLock(long time, TimeUnit unit) {
     try {
       int numRetries = 0;
-      while (fs.exists(new Path(lockPath + "/" + LOCK_NAME))
-          && (numRetries++ <= 
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY))) {
-        
Thread.sleep(lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY));
-      }
-      synchronized (LOCK_NAME) {
-        if (fs.exists(new Path(lockPath + "/" + LOCK_NAME))) {
-          return false;
+      synchronized (LOCK) {
+        while (fs.exists(this.lockFile)) {
+          LOCK.wait(retryWaitTimeMs);
+          numRetries++;
+          if (numRetries > retryMaxCount) {
+            return false;
+          }
         }
         acquireLock();
+        return fs.exists(this.lockFile);
       }
-      return true;
     } catch (IOException | InterruptedException e) {
-      throw new HoodieLockException("Failed to acquire lock", e);
+      throw new HoodieLockException("Failed to acquire lock: " + getLock(), e);
     }
   }
 
   @Override
   public void unlock() {
-    try {
-      if (fs.exists(new Path(lockPath + "/" + LOCK_NAME))) {
-        fs.delete(new Path(lockPath + "/" + LOCK_NAME), true);
+    synchronized (LOCK) {
+      try {
+        if (fs.exists(this.lockFile)) {
+          fs.delete(this.lockFile, true);
+        }
+      } catch (IOException io) {
+        throw new HoodieIOException("Unable to delete lock " + getLock() + "on 
disk", io);
       }
-    } catch (IOException io) {
-      throw new HoodieIOException("Unable to delete lock on disk", io);
     }
   }
 
   @Override
   public String getLock() {
+    return this.lockFile.toString();
+  }
+
+  private void acquireLock() {
     try {
-      return fs.listStatus(new Path(lockPath))[0].getPath().toString();
-    } catch (Exception e) {
-      throw new HoodieLockException("Failed to retrieve lock status from lock 
path " + lockPath);
+      fs.create(this.lockFile, false).close();
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to acquire lock: " + getLock(), e);
     }
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index 69fc401..5f8b26b 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -23,6 +23,7 @@ import 
org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass;
 import org.apache.hudi.common.config.LockConfiguration;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.TableServiceType;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
@@ -40,7 +41,6 @@ import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -53,6 +53,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -61,6 +62,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -96,7 +98,7 @@ public class TestHoodieClientMultiWriter extends 
HoodieClientTestBase {
     
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
 "250");
     
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,"250");
     
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY,"10");
-    HoodieWriteConfig cfg = getConfigBuilder()
+    HoodieWriteConfig writeConfig = getConfigBuilder()
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
         
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
@@ -104,41 +106,64 @@ public class TestHoodieClientMultiWriter extends 
HoodieClientTestBase {
         .withMarkersType(MarkerType.DIRECT.name())
         
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class)
             .build()).withAutoCommit(false).withProperties(properties).build();
+
     // Create the first commit
-    createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200);
-    ExecutorService executors = Executors.newFixedThreadPool(2);
-    SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
-    SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
-    AtomicBoolean writer1Conflict = new AtomicBoolean(false);
-    AtomicBoolean writer2Conflict = new AtomicBoolean(false);
+    createCommitWithInserts(writeConfig, getHoodieWriteClient(writeConfig), 
"000", "001", 200);
+
+    final int threadCount = 2;
+    final ExecutorService executors = Executors.newFixedThreadPool(2);
+    final SparkRDDWriteClient client1 = getHoodieWriteClient(writeConfig);
+    final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig);
+
+    final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
+    final AtomicBoolean writer1Completed = new AtomicBoolean(false);
+    final AtomicBoolean writer2Completed = new AtomicBoolean(false);
+
     Future future1 = executors.submit(() -> {
-      String newCommitTime = "004";
-      int numRecords = 100;
-      String commitTimeBetweenPrevAndNew = "002";
       try {
-        createCommitWithUpserts(cfg, client1, "002", 
commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
-      } catch (Exception e1) {
-        assertTrue(e1 instanceof HoodieWriteConflictException);
-        writer1Conflict.set(true);
+        final String nextCommitTime = "002";
+        final JavaRDD<WriteStatus> writeStatusList = 
startCommitForUpdate(writeConfig, client1, nextCommitTime, 100);
+
+        // Wait for the 2nd writer to start the commit
+        cyclicBarrier.await(60, TimeUnit.SECONDS);
+
+        // Commit the update before the 2nd writer
+        assertDoesNotThrow(() -> {
+          client1.commit(nextCommitTime, writeStatusList);
+        });
+
+        // Signal the 2nd writer to go ahead for his commit
+        cyclicBarrier.await(60, TimeUnit.SECONDS);
+        writer1Completed.set(true);
+      } catch (Exception e) {
+        writer1Completed.set(false);
       }
     });
+
     Future future2 = executors.submit(() -> {
-      String newCommitTime = "005";
-      int numRecords = 100;
-      String commitTimeBetweenPrevAndNew = "002";
       try {
-        createCommitWithUpserts(cfg, client2, "002", 
commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
-      } catch (Exception e2) {
-        assertTrue(e2 instanceof HoodieWriteConflictException);
-        writer2Conflict.set(true);
+        final String nextCommitTime = "003";
+
+        // Wait for the 1st writer to make progress with the commit
+        cyclicBarrier.await(60, TimeUnit.SECONDS);
+        final JavaRDD<WriteStatus> writeStatusList = 
startCommitForUpdate(writeConfig, client2, nextCommitTime, 100);
+
+        // Wait for the 1st writer to complete the commit
+        cyclicBarrier.await(60, TimeUnit.SECONDS);
+        assertThrows(HoodieWriteConflictException.class, () -> {
+          client2.commit(nextCommitTime, writeStatusList);
+        });
+        writer2Completed.set(true);
+      } catch (Exception e) {
+        writer2Completed.set(false);
       }
     });
+
     future1.get();
     future2.get();
-    Assertions.assertTrue(writer1Conflict.get() || writer2Conflict.get(), 
"Either of writer1 or writer2 should have failed "
-        + "with conflict");
-    Assertions.assertFalse(writer1Conflict.get() && writer2Conflict.get(), 
"Both writer1 and writer2 should not result "
-        + "in conflict");
+
+    // both should have been completed successfully. I mean, we already assert 
for conflict for writer2 at L155.
+    assertTrue(writer1Completed.get() && writer2Completed.get());
   }
 
   @Test
@@ -443,4 +468,33 @@ public class TestHoodieClientMultiWriter extends 
HoodieClientTestBase {
         numRecords, 200, 2);
     client.commit(newCommitTime, result);
   }
+
+  /**
+   * Start the commit for an update operation with given number of records
+   *
+   * @param writeConfig   - Write config
+   * @param writeClient   - Write client for starting the commit
+   * @param newCommitTime - Commit time for the update
+   * @param numRecords    - Number of records to update
+   * @return RDD of write status from the update
+   * @throws Exception
+   */
+  private JavaRDD<WriteStatus> startCommitForUpdate(HoodieWriteConfig 
writeConfig, SparkRDDWriteClient writeClient,
+                                                    String newCommitTime, int 
numRecords) throws Exception {
+    // Start the new commit
+    writeClient.startCommitWithTime(newCommitTime);
+
+    // Prepare update records
+    final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
+        generateWrapRecordsFn(false, writeConfig, 
dataGen::generateUniqueUpdates);
+    final List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, 
numRecords);
+    final JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+    // Write updates
+    Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, 
JavaRDD<HoodieRecord>, String> writeFn = SparkRDDWriteClient::upsert;
+    JavaRDD<WriteStatus> result = writeFn.apply(writeClient, writeRecords, 
newCommitTime);
+    List<WriteStatus> statuses = result.collect();
+    assertNoWriteErrors(statuses);
+    return result;
+  }
 }

Reply via email to