This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 80fa0f5  [FLINK-18272][table-runtime-blink] Add retry logic to FileSystemLookupFunction
80fa0f5 is described below

commit 80fa0f5c5b8600f4b386487f267bde80b882bd07
Author: Rui Li <[email protected]>
AuthorDate: Thu Jun 18 11:32:21 2020 +0800

    [FLINK-18272][table-runtime-blink] Add retry logic to FileSystemLookupFunction
    
    This closes #12651
---
 .../table/filesystem/FileSystemLookupFunction.java | 57 +++++++++++++++-------
 1 file changed, 39 insertions(+), 18 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java
index e3bed30..d8b8633 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java
@@ -59,6 +59,11 @@ public class FileSystemLookupFunction<T extends InputSplit> extends TableFunctio
 
        private static final Logger LOG = 
LoggerFactory.getLogger(FileSystemLookupFunction.class);
 
+       // the max number of retries before throwing exception, in case of failure to load the table into cache
+       private static final int MAX_RETRIES = 3;
+       // interval between retries
+       private static final Duration RETRY_INTERVAL = Duration.ofSeconds(10);
+
        private final InputFormat<RowData, T> inputFormat;
        // names and types of the records returned by the input format
        private final String[] producedNames;
@@ -143,26 +148,42 @@ public class FileSystemLookupFunction<T extends InputSplit> extends TableFunctio
                } else {
                        LOG.info("Populating lookup join cache");
                }
-               cache.clear();
-               try {
-                       T[] inputSplits = inputFormat.createInputSplits(1);
-                       GenericRowData reuse = new 
GenericRowData(producedNames.length);
-                       long count = 0;
-                       for (T split : inputSplits) {
-                               inputFormat.open(split);
-                               while (!inputFormat.reachedEnd()) {
-                                       RowData row = 
inputFormat.nextRecord(reuse);
-                                       count++;
-                                       Row key = extractKey(row);
-                                       List<RowData> rows = 
cache.computeIfAbsent(key, k -> new ArrayList<>());
-                                       rows.add(serializer.copy(row));
+               int numRetry = 0;
+               while (true) {
+                       cache.clear();
+                       try {
+                               T[] inputSplits = 
inputFormat.createInputSplits(1);
+                               GenericRowData reuse = new 
GenericRowData(producedNames.length);
+                               long count = 0;
+                               for (T split : inputSplits) {
+                                       inputFormat.open(split);
+                                       while (!inputFormat.reachedEnd()) {
+                                               RowData row = 
inputFormat.nextRecord(reuse);
+                                               count++;
+                                               Row key = extractKey(row);
+                                               List<RowData> rows = 
cache.computeIfAbsent(key, k -> new ArrayList<>());
+                                               rows.add(serializer.copy(row));
+                                       }
+                                       inputFormat.close();
+                               }
+                               nextLoadTime = System.currentTimeMillis() + 
getCacheTTL().toMillis();
+                               LOG.info("Loaded {} row(s) into lookup join 
cache", count);
+                               return;
+                       } catch (IOException e) {
+                               if (numRetry >= MAX_RETRIES) {
+                                       throw new FlinkRuntimeException(
+                                                       String.format("Failed 
to load table into cache after %d retries", numRetry), e);
+                               }
+                               numRetry++;
+                               long toSleep = numRetry * 
RETRY_INTERVAL.toMillis();
+                               LOG.warn(String.format("Failed to load table 
into cache, will retry in %d seconds", toSleep / 1000), e);
+                               try {
+                                       Thread.sleep(toSleep);
+                               } catch (InterruptedException ex) {
+                                       LOG.warn("Interrupted while waiting to 
retry failed cache load, aborting");
+                                       throw new FlinkRuntimeException(ex);
                                }
-                               inputFormat.close();
                        }
-                       nextLoadTime = System.currentTimeMillis() + 
getCacheTTL().toMillis();
-                       LOG.info("Loaded {} row(s) into lookup join cache", 
count);
-               } catch (IOException e) {
-                       throw new FlinkRuntimeException("Failed to load table 
into cache", e);
                }
        }
 

Reply via email to