This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5d6c3c1d630 [HUDI-6154] Introduced retry while reading
hoodie.properties to deal with parallel updates. (#8609)
5d6c3c1d630 is described below
commit 5d6c3c1d630d93efa37d1d70baa51ecd056c47fc
Author: Prashant Wason <[email protected]>
AuthorDate: Thu Jun 29 23:58:22 2023 -0700
[HUDI-6154] Introduced retry while reading hoodie.properties to deal with
parallel updates. (#8609)
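When reading hoodie.properties, introduced a retry with a delay. Each attempt
tries hoodie.properties and then hoodie.properties.backup, and accepts a file
only if its checksum validates.
Currently, the retry is hardcoded to 5 attempts with a 1-second delay after
each failed attempt, which should be sufficient for a concurrent writer to
finish updating the properties file.
When updating hoodie.properties, removed redundant reads of the file.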
---------
Co-authored-by: Sagar Sumit <[email protected]>
---
.../hudi/common/table/HoodieTableConfig.java | 78 +++++++++++++++-------
.../hudi/common/table/TestHoodieTableConfig.java | 58 ++++++++++++++--
.../deltastreamer/TestHoodieDeltaStreamer.java | 5 +-
3 files changed, 110 insertions(+), 31 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index b5087864ec3..e609fb85595 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -265,12 +265,17 @@ public class HoodieTableConfig extends HoodieConfig {
private static final String TABLE_CHECKSUM_FORMAT = "%s.%s"; //
<database_name>.<table_name>
+ // Number of retries while reading the properties file to deal with parallel
updates
+ private static final int MAX_READ_RETRIES = 5;
+ // Delay between retries while reading the properties file
+ private static final int READ_RETRY_DELAY_MSEC = 1000;
+
public HoodieTableConfig(FileSystem fs, String metaPath, String
payloadClassName, String recordMergerStrategyId) {
super();
Path propertyPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
LOG.info("Loading table properties from " + propertyPath);
try {
- fetchConfigs(fs, metaPath);
+ this.props = fetchConfigs(fs, metaPath);
boolean needStore = false;
if (contains(PAYLOAD_CLASS_NAME) && payloadClassName != null
&& !getString(PAYLOAD_CLASS_NAME).equals(payloadClassName)) {
@@ -291,8 +296,6 @@ public class HoodieTableConfig extends HoodieConfig {
} catch (IOException e) {
throw new HoodieIOException("Could not load Hoodie properties from " +
propertyPath, e);
}
- ValidationUtils.checkArgument(contains(TYPE) && contains(NAME),
- "hoodie.properties file seems invalid. Please check for left over
`.updated` files if any, manually copy it to hoodie.properties and retry");
}
private static Properties getOrderedPropertiesWithTableChecksum(Properties
props) {
@@ -334,22 +337,43 @@ public class HoodieTableConfig extends HoodieConfig {
super();
}
- private void fetchConfigs(FileSystem fs, String metaPath) throws IOException
{
+ private static TypedProperties fetchConfigs(FileSystem fs, String metaPath)
throws IOException {
Path cfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
- try (FSDataInputStream is = fs.open(cfgPath)) {
- props.load(is);
- } catch (IOException ioe) {
- if (!fs.exists(cfgPath)) {
- LOG.warn("Run `table recover-configs` if config update/delete failed
midway. Falling back to backed up configs.");
- // try the backup. this way no query ever fails if update fails midway.
- Path backupCfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE_BACKUP);
- try (FSDataInputStream is = fs.open(backupCfgPath)) {
+ Path backupCfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE_BACKUP);
+ int readRetryCount = 0;
+ boolean found = false;
+
+ TypedProperties props = new TypedProperties();
+ while (readRetryCount++ < MAX_READ_RETRIES) {
+ for (Path path : Arrays.asList(cfgPath, backupCfgPath)) {
+ // Read the properties and validate that it is a valid file
+ try (FSDataInputStream is = fs.open(path)) {
+ props.clear();
props.load(is);
+ found = true;
+ ValidationUtils.checkArgument(validateChecksum(props));
+ return props;
+ } catch (IOException e) {
+ LOG.warn(String.format("Could not read properties from %s: %s",
path, e));
+ } catch (IllegalArgumentException e) {
+ LOG.warn(String.format("Invalid properties file %s: %s", path,
props));
}
- } else {
- throw ioe;
+ }
+
+ // Failed to read all files so wait before retrying. This can happen in
cases of parallel updates to the properties.
+ try {
+ Thread.sleep(READ_RETRY_DELAY_MSEC);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting");
}
}
+
+ // If we are here then after all retries either no hoodie.properties was
found or only an invalid file was found.
+ if (found) {
+ throw new IllegalArgumentException("hoodie.properties file seems
invalid. Please check for left over `.updated` files if any, manually copy it
to hoodie.properties and retry");
+ } else {
+ throw new HoodieIOException("Could not load Hoodie properties from " +
cfgPath);
+ }
}
public static void recover(FileSystem fs, Path metadataFolder) throws
IOException {
@@ -385,25 +409,27 @@ public class HoodieTableConfig extends HoodieConfig {
// 0. do any recovery from prior attempts.
recoverIfNeeded(fs, cfgPath, backupCfgPath);
- // 1. backup the existing properties.
- try (FSDataInputStream in = fs.open(cfgPath);
- FSDataOutputStream out = fs.create(backupCfgPath, false)) {
- FileIOUtils.copy(in, out);
+ // 1. Read the existing config
+ TypedProperties props = fetchConfigs(fs, metadataFolder.toString());
+
+ // 2. backup the existing properties.
+ try (FSDataOutputStream out = fs.create(backupCfgPath, false)) {
+ storeProperties(props, out);
}
- /// 2. delete the properties file, reads will go to the backup, until we
are done.
+
+ // 3. delete the properties file, reads will go to the backup, until we
are done.
fs.delete(cfgPath, false);
- // 3. read current props, upsert and save back.
+
+ // 4. Upsert and save back.
String checksum;
- try (FSDataInputStream in = fs.open(backupCfgPath);
- FSDataOutputStream out = fs.create(cfgPath, true)) {
- Properties props = new TypedProperties();
- props.load(in);
+ try (FSDataOutputStream out = fs.create(cfgPath, true)) {
modifyFn.accept(props, modifyProps);
checksum = storeProperties(props, out);
}
+
// 4. verify and remove backup.
try (FSDataInputStream in = fs.open(cfgPath)) {
- Properties props = new TypedProperties();
+ props.clear();
props.load(in);
if (!props.containsKey(TABLE_CHECKSUM.key()) ||
!props.getProperty(TABLE_CHECKSUM.key()).equals(checksum)) {
// delete the properties file and throw exception indicating update
failure
@@ -412,6 +438,8 @@ public class HoodieTableConfig extends HoodieConfig {
throw new HoodieIOException("Checksum property missing or does not
match.");
}
}
+
+ // 5. delete the backup properties file
fs.delete(backupCfgPath, false);
} catch (IOException e) {
throw new HoodieIOException("Error updating table configs.", e);
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
index 2e24526905e..f971c6fa9d2 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
@@ -18,14 +18,13 @@
package org.apache.hudi.common.table;
-import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
-import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.exception.HoodieIOException;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.exception.HoodieIOException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -35,6 +34,10 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -143,4 +146,51 @@ public class TestHoodieTableConfig extends
HoodieCommonTestHarness {
config = new HoodieTableConfig(fs, metaPath.toString(), null, null);
assertEquals(6, config.getProps().size());
}
+
+ @Test
+ public void testReadRetry() throws IOException {
+ // When both the hoodie.properties and hoodie.properties.backup do not
exist then the read fails
+ fs.rename(cfgPath, new Path(cfgPath.toString() + ".bak"));
+ assertThrows(HoodieIOException.class, () -> new HoodieTableConfig(fs,
metaPath.toString(), null, null));
+
+ // Should return the backup config if hoodie.properties is not present
+ fs.rename(new Path(cfgPath.toString() + ".bak"), backupCfgPath);
+ new HoodieTableConfig(fs, metaPath.toString(), null, null);
+
+ // Should return backup config if hoodie.properties is corrupted
+ Properties props = new Properties();
+ try (FSDataOutputStream out = fs.create(cfgPath)) {
+ props.store(out, "No checksum in file so is invalid");
+ }
+ new HoodieTableConfig(fs, metaPath.toString(), null, null);
+
+ // Should throw exception if both hoodie.properties and backup are
corrupted
+ try (FSDataOutputStream out = fs.create(backupCfgPath)) {
+ props.store(out, "No checksum in file so is invalid");
+ }
+ assertThrows(IllegalArgumentException.class, () -> new
HoodieTableConfig(fs, metaPath.toString(), null, null));
+ }
+
+ @Test
+ public void testConcurrentlyUpdate() throws ExecutionException,
InterruptedException {
+ final ExecutorService executor = Executors.newFixedThreadPool(2);
+ Future updaterFuture = executor.submit(() -> {
+ for (int i = 0; i < 100; i++) {
+ Properties updatedProps = new Properties();
+ updatedProps.setProperty(HoodieTableConfig.NAME.key(), "test-table" +
i);
+ updatedProps.setProperty(HoodieTableConfig.PRECOMBINE_FIELD.key(),
"new_field" + i);
+ HoodieTableConfig.update(fs, metaPath, updatedProps);
+ }
+ });
+
+ Future readerFuture = executor.submit(() -> {
+ for (int i = 0; i < 100; i++) {
+ // Try to load the table properties, won't throw any exception
+ new HoodieTableConfig(fs, metaPath.toString(), null, null);
+ }
+ });
+
+ updaterFuture.get();
+ readerFuture.get();
+ }
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index c365aea0049..845f7aa2120 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -59,6 +59,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIncrementalPathNotFoundException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hive.HiveSyncConfig;
@@ -2143,7 +2144,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
try {
fs.delete(entry.getPath());
} catch (IOException e) {
- e.printStackTrace();
+ LOG.warn("Failed to delete " + entry.getPath().toString(), e);
}
});
}
@@ -2159,7 +2160,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
deltaStreamer.sync();
TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath,
sqlContext);
} else {
- assertThrows(org.apache.hudi.exception.HoodieIOException.class, () ->
new HoodieDeltaStreamer(
+ assertThrows(HoodieIOException.class, () -> new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT,
ParquetDFSSource.class.getName(),
null, PROPS_FILENAME_TEST_PARQUET, false,
useSchemaProvider, 100000, false, null, null, "timestamp",
null), jsc));