This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d6c3c1d630 [HUDI-6154] Introduced retry while reading hoodie.properties to deal with parallel updates. (#8609)
5d6c3c1d630 is described below

commit 5d6c3c1d630d93efa37d1d70baa51ecd056c47fc
Author: Prashant Wason <[email protected]>
AuthorDate: Thu Jun 29 23:58:22 2023 -0700

    [HUDI-6154] Introduced retry while reading hoodie.properties to deal with parallel updates. (#8609)
    
    When reading hoodie.properties, a retry with delay has been introduced.
    The retry is currently hardcoded to 5 attempts with a 1-second delay,
    which should be sufficient for a concurrent writer to finish updating
    the properties file.
    When updating hoodie.properties, redundant reads of the file were removed.
    When updating the hoodie.properties, removed redundant reads of the file.
    
    ---------
    
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../hudi/common/table/HoodieTableConfig.java       | 78 +++++++++++++++-------
 .../hudi/common/table/TestHoodieTableConfig.java   | 58 ++++++++++++++--
 .../deltastreamer/TestHoodieDeltaStreamer.java     |  5 +-
 3 files changed, 110 insertions(+), 31 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index b5087864ec3..e609fb85595 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -265,12 +265,17 @@ public class HoodieTableConfig extends HoodieConfig {
 
   private static final String TABLE_CHECKSUM_FORMAT = "%s.%s"; // 
<database_name>.<table_name>
 
+  // Number of retries while reading the properties file to deal with parallel 
updates
+  private static final int MAX_READ_RETRIES = 5;
+  // Delay between retries while reading the properties file
+  private static final int READ_RETRY_DELAY_MSEC = 1000;
+
   public HoodieTableConfig(FileSystem fs, String metaPath, String 
payloadClassName, String recordMergerStrategyId) {
     super();
     Path propertyPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
     LOG.info("Loading table properties from " + propertyPath);
     try {
-      fetchConfigs(fs, metaPath);
+      this.props = fetchConfigs(fs, metaPath);
       boolean needStore = false;
       if (contains(PAYLOAD_CLASS_NAME) && payloadClassName != null
           && !getString(PAYLOAD_CLASS_NAME).equals(payloadClassName)) {
@@ -291,8 +296,6 @@ public class HoodieTableConfig extends HoodieConfig {
     } catch (IOException e) {
       throw new HoodieIOException("Could not load Hoodie properties from " + 
propertyPath, e);
     }
-    ValidationUtils.checkArgument(contains(TYPE) && contains(NAME),
-        "hoodie.properties file seems invalid. Please check for left over 
`.updated` files if any, manually copy it to hoodie.properties and retry");
   }
 
   private static Properties getOrderedPropertiesWithTableChecksum(Properties 
props) {
@@ -334,22 +337,43 @@ public class HoodieTableConfig extends HoodieConfig {
     super();
   }
 
-  private void fetchConfigs(FileSystem fs, String metaPath) throws IOException 
{
+  private static TypedProperties fetchConfigs(FileSystem fs, String metaPath) 
throws IOException {
     Path cfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
-    try (FSDataInputStream is = fs.open(cfgPath)) {
-      props.load(is);
-    } catch (IOException ioe) {
-      if (!fs.exists(cfgPath)) {
-        LOG.warn("Run `table recover-configs` if config update/delete failed 
midway. Falling back to backed up configs.");
-        // try the backup. this way no query ever fails if update fails midway.
-        Path backupCfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE_BACKUP);
-        try (FSDataInputStream is = fs.open(backupCfgPath)) {
+    Path backupCfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE_BACKUP);
+    int readRetryCount = 0;
+    boolean found = false;
+
+    TypedProperties props = new TypedProperties();
+    while (readRetryCount++ < MAX_READ_RETRIES) {
+      for (Path path : Arrays.asList(cfgPath, backupCfgPath)) {
+        // Read the properties and validate that it is a valid file
+        try (FSDataInputStream is = fs.open(path)) {
+          props.clear();
           props.load(is);
+          found = true;
+          ValidationUtils.checkArgument(validateChecksum(props));
+          return props;
+        } catch (IOException e) {
+          LOG.warn(String.format("Could not read properties from %s: %s", 
path, e));
+        } catch (IllegalArgumentException e) {
+          LOG.warn(String.format("Invalid properties file %s: %s", path, 
props));
         }
-      } else {
-        throw ioe;
+      }
+
+      // Failed to read all files so wait before retrying. This can happen in 
cases of parallel updates to the properties.
+      try {
+        Thread.sleep(READ_RETRY_DELAY_MSEC);
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting");
       }
     }
+
+    // If we are here then after all retries either no hoodie.properties was 
found or only an invalid file was found.
+    if (found) {
+      throw new IllegalArgumentException("hoodie.properties file seems 
invalid. Please check for left over `.updated` files if any, manually copy it 
to hoodie.properties and retry");
+    } else {
+      throw new HoodieIOException("Could not load Hoodie properties from " + 
cfgPath);
+    }
   }
 
   public static void recover(FileSystem fs, Path metadataFolder) throws 
IOException {
@@ -385,25 +409,27 @@ public class HoodieTableConfig extends HoodieConfig {
       // 0. do any recovery from prior attempts.
       recoverIfNeeded(fs, cfgPath, backupCfgPath);
 
-      // 1. backup the existing properties.
-      try (FSDataInputStream in = fs.open(cfgPath);
-           FSDataOutputStream out = fs.create(backupCfgPath, false)) {
-        FileIOUtils.copy(in, out);
+      // 1. Read the existing config
+      TypedProperties props = fetchConfigs(fs, metadataFolder.toString());
+
+      // 2. backup the existing properties.
+      try (FSDataOutputStream out = fs.create(backupCfgPath, false)) {
+        storeProperties(props, out);
       }
-      /// 2. delete the properties file, reads will go to the backup, until we 
are done.
+
+      // 3. delete the properties file, reads will go to the backup, until we 
are done.
       fs.delete(cfgPath, false);
-      // 3. read current props, upsert and save back.
+
+      // 4. Upsert and save back.
       String checksum;
-      try (FSDataInputStream in = fs.open(backupCfgPath);
-           FSDataOutputStream out = fs.create(cfgPath, true)) {
-        Properties props = new TypedProperties();
-        props.load(in);
+      try (FSDataOutputStream out = fs.create(cfgPath, true)) {
         modifyFn.accept(props, modifyProps);
         checksum = storeProperties(props, out);
       }
+
       // 4. verify and remove backup.
       try (FSDataInputStream in = fs.open(cfgPath)) {
-        Properties props = new TypedProperties();
+        props.clear();
         props.load(in);
         if (!props.containsKey(TABLE_CHECKSUM.key()) || 
!props.getProperty(TABLE_CHECKSUM.key()).equals(checksum)) {
           // delete the properties file and throw exception indicating update 
failure
@@ -412,6 +438,8 @@ public class HoodieTableConfig extends HoodieConfig {
           throw new HoodieIOException("Checksum property missing or does not 
match.");
         }
       }
+
+      // 5. delete the backup properties file
       fs.delete(backupCfgPath, false);
     } catch (IOException e) {
       throw new HoodieIOException("Error updating table configs.", e);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
index 2e24526905e..f971c6fa9d2 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
@@ -18,14 +18,13 @@
 
 package org.apache.hudi.common.table;
 
-import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
-import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.exception.HoodieIOException;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.exception.HoodieIOException;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -35,6 +34,10 @@ import org.junit.jupiter.params.provider.ValueSource;
 import java.io.IOException;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -143,4 +146,51 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
     config = new HoodieTableConfig(fs, metaPath.toString(), null, null);
     assertEquals(6, config.getProps().size());
   }
+
+  @Test
+  public void testReadRetry() throws IOException {
+    // When both the hoodie.properties and hoodie.properties.backup do not 
exist then the read fails
+    fs.rename(cfgPath, new Path(cfgPath.toString() + ".bak"));
+    assertThrows(HoodieIOException.class, () -> new HoodieTableConfig(fs, 
metaPath.toString(), null, null));
+
+    // Should return the backup config if hoodie.properties is not present
+    fs.rename(new Path(cfgPath.toString() + ".bak"), backupCfgPath);
+    new HoodieTableConfig(fs, metaPath.toString(), null, null);
+
+    // Should return backup config if hoodie.properties is corrupted
+    Properties props = new Properties();
+    try (FSDataOutputStream out = fs.create(cfgPath)) {
+      props.store(out, "No checksum in file so is invalid");
+    }
+    new HoodieTableConfig(fs, metaPath.toString(), null, null);
+
+    // Should throw exception if both hoodie.properties and backup are 
corrupted
+    try (FSDataOutputStream out = fs.create(backupCfgPath)) {
+      props.store(out, "No checksum in file so is invalid");
+    }
+    assertThrows(IllegalArgumentException.class, () -> new 
HoodieTableConfig(fs, metaPath.toString(), null, null));
+  }
+
+  @Test
+  public void testConcurrentlyUpdate() throws ExecutionException, 
InterruptedException {
+    final ExecutorService executor = Executors.newFixedThreadPool(2);
+    Future updaterFuture = executor.submit(() -> {
+      for (int i = 0; i < 100; i++) {
+        Properties updatedProps = new Properties();
+        updatedProps.setProperty(HoodieTableConfig.NAME.key(), "test-table" + 
i);
+        updatedProps.setProperty(HoodieTableConfig.PRECOMBINE_FIELD.key(), 
"new_field" + i);
+        HoodieTableConfig.update(fs, metaPath, updatedProps);
+      }
+    });
+
+    Future readerFuture = executor.submit(() -> {
+      for (int i = 0; i < 100; i++) {
+        // Try to load the table properties, won't throw any exception
+        new HoodieTableConfig(fs, metaPath.toString(), null, null);
+      }
+    });
+
+    updaterFuture.get();
+    readerFuture.get();
+  }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index c365aea0049..845f7aa2120 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -59,6 +59,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieIncrementalPathNotFoundException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hive.HiveSyncConfig;
@@ -2143,7 +2144,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
         try {
           fs.delete(entry.getPath());
         } catch (IOException e) {
-          e.printStackTrace();
+          LOG.warn("Failed to delete " + entry.getPath().toString(), e);
         }
       });
     }
@@ -2159,7 +2160,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
       deltaStreamer.sync();
       TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, 
sqlContext);
     } else {
-      assertThrows(org.apache.hudi.exception.HoodieIOException.class, () -> 
new HoodieDeltaStreamer(
+      assertThrows(HoodieIOException.class, () -> new HoodieDeltaStreamer(
           TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, 
ParquetDFSSource.class.getName(),
               null, PROPS_FILENAME_TEST_PARQUET, false,
               useSchemaProvider, 100000, false, null, null, "timestamp", 
null), jsc));

Reply via email to