This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch release-0.12.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 6cf5c2447a687003338bb5ce4797ad9ce66aaf7d
Author: HunterXHunter <[email protected]>
AuthorDate: Sun Aug 7 11:40:20 2022 +0800

    [HUDI-3669] Add a remote request retry mechanism for 
'RemoteHoodieTableFileSystemView' (#5884)
    
    - Add request retry to RemoteHoodieTableFileSystemView. Users can enable it 
using the newly added hoodie.filesystem.view.remote.retry.* configs.
---
 .../client/embedded/EmbeddedTimelineService.java   |  5 ++
 .../common/table/view/FileSystemViewManager.java   |  3 +-
 .../table/view/FileSystemViewStorageConfig.java    | 76 ++++++++++++++++++++++
 .../view/RemoteHoodieTableFileSystemView.java      | 67 ++++++++++++++-----
 .../org/apache/hudi/common/util/RetryHelper.java   | 46 ++++++++-----
 .../java/org/apache/hudi/util/StreamerUtil.java    |  5 ++
 .../TestRemoteHoodieTableFileSystemView.java       | 29 +++++++++
 7 files changed, 195 insertions(+), 36 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
index 72f8e29c9f..4d5375894d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
@@ -117,6 +117,11 @@ public class EmbeddedTimelineService {
         .withRemoteServerHost(hostAddr)
         .withRemoteServerPort(serverPort)
         
.withRemoteTimelineClientTimeoutSecs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientTimeoutSecs())
+        
.withRemoteTimelineClientRetry(writeConfig.getClientSpecifiedViewStorageConfig().isRemoteTimelineClientRetryEnabled())
+        
.withRemoteTimelineClientMaxRetryNumbers(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryNumbers())
+        
.withRemoteTimelineInitialRetryIntervalMs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineInitialRetryIntervalMs())
+        
.withRemoteTimelineClientMaxRetryIntervalMs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryIntervalMs())
+        
.withRemoteTimelineClientRetryExceptions(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientRetryExceptions())
         .build();
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
index 35fda6c416..48023d5046 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
@@ -214,8 +214,7 @@ public class FileSystemViewManager {
     LOG.info("Creating remote view for basePath " + metaClient.getBasePath() + 
". Server="
         + viewConf.getRemoteViewServerHost() + ":" + 
viewConf.getRemoteViewServerPort() + ", Timeout="
         + viewConf.getRemoteTimelineClientTimeoutSecs());
-    return new 
RemoteHoodieTableFileSystemView(viewConf.getRemoteViewServerHost(), 
viewConf.getRemoteViewServerPort(),
-        metaClient, viewConf.getRemoteTimelineClientTimeoutSecs());
+    return new RemoteHoodieTableFileSystemView(metaClient, viewConf);
   }
 
   public static FileSystemViewManager createViewManager(final 
HoodieEngineContext context,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
index 63f10855ba..bc835612aa 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
@@ -110,6 +110,37 @@ public class FileSystemViewStorageConfig extends 
HoodieConfig {
       .defaultValue(5 * 60) // 5 min
       .withDocumentation("Timeout in seconds, to wait for API requests against 
a remote file system view. e.g timeline server.");
 
+  public static final ConfigProperty<String> REMOTE_RETRY_ENABLE = 
ConfigProperty
+          .key("hoodie.filesystem.view.remote.retry.enable")
+          .defaultValue("false")
+          .sinceVersion("0.12.0")
+          .withDocumentation("Whether to enable API request retry for remote 
file system view.");
+
+  public static final ConfigProperty<Integer> REMOTE_MAX_RETRY_NUMBERS = 
ConfigProperty
+      .key("hoodie.filesystem.view.remote.retry.max_numbers")
+      .defaultValue(3) // 3 times
+      .sinceVersion("0.12.0")
+      .withDocumentation("Maximum number of retry for API requests against a 
remote file system view. e.g timeline server.");
+
+  public static final ConfigProperty<Long> REMOTE_INITIAL_RETRY_INTERVAL_MS = 
ConfigProperty
+      .key("hoodie.filesystem.view.remote.retry.initial_interval_ms")
+      .defaultValue(100L)
+      .sinceVersion("0.12.0")
+      .withDocumentation("Amount of time (in ms) to wait, before retry to do 
operations on storage.");
+
+  public static final ConfigProperty<Long> REMOTE_MAX_RETRY_INTERVAL_MS = 
ConfigProperty
+      .key("hoodie.filesystem.view.remote.retry.max_interval_ms")
+      .defaultValue(2000L)
+      .sinceVersion("0.12.0")
+      .withDocumentation("Maximum amount of time (in ms), to wait for next 
retry.");
+
+  public static final ConfigProperty<String> RETRY_EXCEPTIONS = ConfigProperty
+          .key("hoodie.filesystem.view.remote.retry.exceptions")
+          .defaultValue("")
+          .sinceVersion("0.12.0")
+          .withDocumentation("The class name of the Exception that needs to be 
re-tryed, separated by commas. "
+                  + "Default is empty which means retry all the IOException 
and RuntimeException from Remote Request.");
+
   public static final ConfigProperty<String> REMOTE_BACKUP_VIEW_ENABLE = 
ConfigProperty
       .key("hoodie.filesystem.remote.backup.view.enable")
       .defaultValue("true") // Need to be disabled only for tests.
@@ -144,6 +175,26 @@ public class FileSystemViewStorageConfig extends 
HoodieConfig {
     return getInt(REMOTE_TIMEOUT_SECS);
   }
 
+  public boolean isRemoteTimelineClientRetryEnabled() {
+    return getBoolean(REMOTE_RETRY_ENABLE);
+  }
+
+  public Integer getRemoteTimelineClientMaxRetryNumbers() {
+    return getInt(REMOTE_MAX_RETRY_NUMBERS);
+  }
+
+  public Long getRemoteTimelineInitialRetryIntervalMs() {
+    return getLong(REMOTE_INITIAL_RETRY_INTERVAL_MS);
+  }
+
+  public Long getRemoteTimelineClientMaxRetryIntervalMs() {
+    return getLong(REMOTE_MAX_RETRY_INTERVAL_MS);
+  }
+
+  public String getRemoteTimelineClientRetryExceptions() {
+    return getString(RETRY_EXCEPTIONS);
+  }
+
   public long getMaxMemoryForFileGroupMap() {
     long totalMemory = getLong(SPILLABLE_MEMORY);
     return totalMemory - getMaxMemoryForPendingCompaction() - 
getMaxMemoryForBootstrapBaseFile();
@@ -245,6 +296,31 @@ public class FileSystemViewStorageConfig extends 
HoodieConfig {
       return this;
     }
 
+    public Builder withRemoteTimelineClientRetry(boolean enableRetry) {
+      fileSystemViewStorageConfig.setValue(REMOTE_RETRY_ENABLE, 
Boolean.toString(enableRetry));
+      return this;
+    }
+
+    public Builder withRemoteTimelineClientMaxRetryNumbers(Integer 
maxRetryNumbers) {
+      fileSystemViewStorageConfig.setValue(REMOTE_MAX_RETRY_NUMBERS, 
maxRetryNumbers.toString());
+      return this;
+    }
+
+    public Builder withRemoteTimelineInitialRetryIntervalMs(Long 
initialRetryIntervalMs) {
+      fileSystemViewStorageConfig.setValue(REMOTE_INITIAL_RETRY_INTERVAL_MS, 
initialRetryIntervalMs.toString());
+      return this;
+    }
+
+    public Builder withRemoteTimelineClientMaxRetryIntervalMs(Long 
maxRetryIntervalMs) {
+      fileSystemViewStorageConfig.setValue(REMOTE_MAX_RETRY_INTERVAL_MS, 
maxRetryIntervalMs.toString());
+      return this;
+    }
+
+    public Builder withRemoteTimelineClientRetryExceptions(String 
retryExceptions) {
+      fileSystemViewStorageConfig.setValue(RETRY_EXCEPTIONS, retryExceptions);
+      return this;
+    }
+
     public Builder withMemFractionForPendingCompaction(Double 
memFractionForPendingCompaction) {
       fileSystemViewStorageConfig.setValue(SPILLABLE_COMPACTION_MEM_FRACTION, 
memFractionForPendingCompaction.toString());
       return this;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index 099b79cbba..ea51732eb0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -39,6 +39,7 @@ import org.apache.hudi.common.table.timeline.dto.FileSliceDTO;
 import org.apache.hudi.common.table.timeline.dto.InstantDTO;
 import org.apache.hudi.common.table.timeline.dto.TimelineDTO;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.RetryHelper;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -132,22 +133,35 @@ public class RemoteHoodieTableFileSystemView implements 
SyncableFileSystemView,
 
   private boolean closed = false;
 
+  private RetryHelper<Response> retryHelper;
+
+  private final HttpRequestCheckedFunction urlCheckedFunc;
+
   private enum RequestMethod {
     GET, POST
   }
 
   public RemoteHoodieTableFileSystemView(String server, int port, 
HoodieTableMetaClient metaClient) {
-    this(server, port, metaClient, 300);
+    this(metaClient, 
FileSystemViewStorageConfig.newBuilder().withRemoteServerHost(server).withRemoteServerPort(port).build());
   }
 
-  public RemoteHoodieTableFileSystemView(String server, int port, 
HoodieTableMetaClient metaClient, int timeoutSecs) {
+  public RemoteHoodieTableFileSystemView(HoodieTableMetaClient metaClient, 
FileSystemViewStorageConfig viewConf) {
     this.basePath = metaClient.getBasePath();
-    this.serverHost = server;
-    this.serverPort = port;
     this.mapper = new ObjectMapper();
     this.metaClient = metaClient;
     this.timeline = 
metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
-    this.timeoutSecs = timeoutSecs;
+    this.serverHost = viewConf.getRemoteViewServerHost();
+    this.serverPort = viewConf.getRemoteViewServerPort();
+    this.timeoutSecs = viewConf.getRemoteTimelineClientTimeoutSecs();
+    this.urlCheckedFunc = new HttpRequestCheckedFunction(this.timeoutSecs * 
1000);
+    if (viewConf.isRemoteTimelineClientRetryEnabled()) {
+      retryHelper =  new RetryHelper(
+              viewConf.getRemoteTimelineClientMaxRetryIntervalMs(),
+              viewConf.getRemoteTimelineClientMaxRetryNumbers(),
+              viewConf.getRemoteTimelineInitialRetryIntervalMs(),
+              viewConf.getRemoteTimelineClientRetryExceptions(),
+              "Sending request");
+    }
   }
 
   private <T> T executeRequest(String requestPath, Map<String, String> 
queryParameters, TypeReference reference,
@@ -165,17 +179,9 @@ public class RemoteHoodieTableFileSystemView implements 
SyncableFileSystemView,
 
     String url = builder.toString();
     LOG.info("Sending request : (" + url + ")");
-    Response response;
-    int timeout = this.timeoutSecs * 1000; // msec
-    switch (method) {
-      case GET:
-        response = 
Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute();
-        break;
-      case POST:
-      default:
-        response = 
Request.Post(url).connectTimeout(timeout).socketTimeout(timeout).execute();
-        break;
-    }
+    // Reset url and method, to avoid repeatedly instantiating objects.
+    urlCheckedFunc.setUrlAndMethod(url, method);
+    Response response =  retryHelper != null ? 
retryHelper.tryWith(urlCheckedFunc).start() : urlCheckedFunc.get();
     String content = response.returnContent().asString();
     return (T) mapper.readValue(content, reference);
   }
@@ -495,4 +501,33 @@ public class RemoteHoodieTableFileSystemView implements 
SyncableFileSystemView,
       throw new HoodieRemoteException(e);
     }
   }
+
+  /**
+   * For remote HTTP requests, to avoid repeatedly instantiating objects.
+   */
+  private class HttpRequestCheckedFunction implements 
RetryHelper.CheckedFunction<Response> {
+    private String url;
+    private RequestMethod method;
+    private final int timeoutMs;
+
+    public void setUrlAndMethod(String url, RequestMethod method) {
+      this.method = method;
+      this.url = url;
+    }
+    
+    public HttpRequestCheckedFunction(int timeoutMs) {
+      this.timeoutMs = timeoutMs;
+    }
+
+    @Override
+    public Response get() throws IOException {
+      switch (method) {
+        case GET:
+          return 
Request.Get(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute();
+        case POST:
+        default:
+          return 
Request.Post(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute();
+      }
+    }
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java
index 067c5ee40d..efa10f3830 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java
@@ -18,28 +18,27 @@
 
 package org.apache.hudi.common.util;
 
+import org.apache.hudi.exception.HoodieException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import java.util.stream.Collectors;
 
-public class RetryHelper<T> {
+public class RetryHelper<T> implements Serializable {
   private static final Logger LOG = LogManager.getLogger(RetryHelper.class);
-  private CheckedFunction<T> func;
-  private int num;
-  private long maxIntervalTime;
-  private long initialIntervalTime = 100L;
+  private transient CheckedFunction<T> func;
+  private final int num;
+  private final long maxIntervalTime;
+  private final long initialIntervalTime;
   private String taskInfo = "N/A";
   private List<? extends Class<? extends Exception>> retryExceptionsClasses;
 
-  public RetryHelper() {
-  }
-
   public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long 
initialRetryIntervalMs, String retryExceptions) {
     this.num = maxRetryNumbers;
     this.initialIntervalTime = initialRetryIntervalMs;
@@ -47,18 +46,24 @@ public class RetryHelper<T> {
     if (StringUtils.isNullOrEmpty(retryExceptions)) {
       this.retryExceptionsClasses = new ArrayList<>();
     } else {
-      this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(","))
-          .map(exception -> (Exception) ReflectionUtils.loadClass(exception, 
""))
-          .map(Exception::getClass)
-          .collect(Collectors.toList());
+      try {
+        this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(","))
+                .map(exception -> (Exception) 
ReflectionUtils.loadClass(exception, ""))
+                .map(Exception::getClass)
+                .collect(Collectors.toList());
+      } catch (HoodieException e) {
+        LOG.error("Exception while loading retry exceptions classes '" + 
retryExceptions + "'.", e);
+        this.retryExceptionsClasses = new ArrayList<>();
+      }
     }
   }
 
-  public RetryHelper(String taskInfo) {
+  public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long 
initialRetryIntervalMs, String retryExceptions, String taskInfo) {
+    this(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, 
retryExceptions);
     this.taskInfo = taskInfo;
   }
 
-  public RetryHelper tryWith(CheckedFunction<T> func) {
+  public RetryHelper<T> tryWith(CheckedFunction<T> func) {
     this.func = func;
     return this;
   }
@@ -77,14 +82,18 @@ public class RetryHelper<T> {
           throw e;
         }
         if (retries++ >= num) {
-          LOG.error("Still failed to " + taskInfo + " after retried " + num + 
" times.", e);
+          String message = "Still failed to " + taskInfo + " after retried " + 
num + " times.";
+          LOG.error(message, e);
+          if (e instanceof IOException) {
+            throw new IOException(message, e);
+          }
           throw e;
         }
-        LOG.warn("Catch Exception " + taskInfo + ", will retry after " + 
waitTime + " ms.", e);
+        LOG.warn("Catch Exception for " + taskInfo + ", will retry after " + 
waitTime + " ms.", e);
         try {
           Thread.sleep(waitTime);
         } catch (InterruptedException ex) {
-            // ignore InterruptedException here
+          // ignore InterruptedException here
         }
       }
     }
@@ -92,6 +101,7 @@ public class RetryHelper<T> {
     if (retries > 0) {
       LOG.info("Success to " + taskInfo + " after retried " + retries + " 
times.");
     }
+
     return functionResult;
   }
 
@@ -123,7 +133,7 @@ public class RetryHelper<T> {
   }
 
   @FunctionalInterface
-  public interface CheckedFunction<T> {
+  public interface CheckedFunction<T> extends Serializable {
     T get() throws IOException;
   }
 }
\ No newline at end of file
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 7c7cdcc8ad..c14749c50d 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -430,6 +430,11 @@ public class StreamerUtil {
         .withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost())
         .withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort())
         
.withRemoteTimelineClientTimeoutSecs(viewStorageConfig.getRemoteTimelineClientTimeoutSecs())
+        
.withRemoteTimelineClientRetry(viewStorageConfig.isRemoteTimelineClientRetryEnabled())
+        
.withRemoteTimelineClientMaxRetryNumbers(viewStorageConfig.getRemoteTimelineClientMaxRetryNumbers())
+        
.withRemoteTimelineInitialRetryIntervalMs(viewStorageConfig.getRemoteTimelineInitialRetryIntervalMs())
+        
.withRemoteTimelineClientMaxRetryIntervalMs(viewStorageConfig.getRemoteTimelineClientMaxRetryIntervalMs())
+        
.withRemoteTimelineClientRetryExceptions(viewStorageConfig.getRemoteTimelineClientRetryExceptions())
         .build();
     ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), 
rebuilt, conf);
     return writeClient;
diff --git 
a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java
 
b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java
index f9a6172b5e..55fd9d7f16 100644
--- 
a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java
+++ 
b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java
@@ -28,12 +28,14 @@ import 
org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.table.view.TestHoodieTableFileSystemView;
+import org.apache.hudi.exception.HoodieRemoteException;
 import org.apache.hudi.timeline.service.TimelineService;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.junit.jupiter.api.Test;
 
 /**
  * Bring up a remote Timeline Server and run all test-cases of 
TestHoodieTableFileSystemView against it.
@@ -64,4 +66,31 @@ public class TestRemoteHoodieTableFileSystemView extends 
TestHoodieTableFileSyst
     view = new RemoteHoodieTableFileSystemView("localhost", 
server.getServerPort(), metaClient);
     return view;
   }
+
+  @Test
+  public void testRemoteHoodieTableFileSystemViewWithRetry() {
+    // Service is available.
+    view.getLatestBaseFiles();
+    // Shut down the service.
+    server.close();
+    try {
+      // Immediately fails and throws a connection refused exception.
+      view.getLatestBaseFiles();
+    } catch (HoodieRemoteException e) {
+      assert e.getMessage().contains("Connection refused (Connection 
refused)");
+    }
+    // Enable API request retry for remote file system view.
+    view =  new RemoteHoodieTableFileSystemView(metaClient, 
FileSystemViewStorageConfig
+            .newBuilder()
+            .withRemoteServerHost("localhost")
+            .withRemoteServerPort(server.getServerPort())
+            .withRemoteTimelineClientRetry(true)
+            .withRemoteTimelineClientMaxRetryNumbers(4)
+            .build());
+    try {
+      view.getLatestBaseFiles();
+    } catch (HoodieRemoteException e) {
+      assert e.getMessage().equalsIgnoreCase("Still failed to Sending request 
after retried 4 times.");
+    }
+  }
 }

Reply via email to