This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 80fa0f5 [FLINK-18272][table-runtime-blink] Add retry logic to FileSystemLookupFunction
80fa0f5 is described below
commit 80fa0f5c5b8600f4b386487f267bde80b882bd07
Author: Rui Li <[email protected]>
AuthorDate: Thu Jun 18 11:32:21 2020 +0800
[FLINK-18272][table-runtime-blink] Add retry logic to FileSystemLookupFunction
This closes #12651
---
.../table/filesystem/FileSystemLookupFunction.java | 57 +++++++++++++++-------
1 file changed, 39 insertions(+), 18 deletions(-)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java
index e3bed30..d8b8633 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java
@@ -59,6 +59,11 @@ public class FileSystemLookupFunction<T extends InputSplit> extends TableFunctio
private static final Logger LOG = LoggerFactory.getLogger(FileSystemLookupFunction.class);
+ // the max number of retries before throwing exception, in case of failure to load the table into cache
+ private static final int MAX_RETRIES = 3;
+ // interval between retries
+ private static final Duration RETRY_INTERVAL = Duration.ofSeconds(10);
+
private final InputFormat<RowData, T> inputFormat;
// names and types of the records returned by the input format
private final String[] producedNames;
@@ -143,26 +148,42 @@ public class FileSystemLookupFunction<T extends InputSplit> extends TableFunctio
} else {
LOG.info("Populating lookup join cache");
}
- cache.clear();
- try {
- T[] inputSplits = inputFormat.createInputSplits(1);
- GenericRowData reuse = new GenericRowData(producedNames.length);
- long count = 0;
- for (T split : inputSplits) {
- inputFormat.open(split);
- while (!inputFormat.reachedEnd()) {
- RowData row = inputFormat.nextRecord(reuse);
- count++;
- Row key = extractKey(row);
- List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
- rows.add(serializer.copy(row));
+ int numRetry = 0;
+ while (true) {
+ cache.clear();
+ try {
+ T[] inputSplits = inputFormat.createInputSplits(1);
+ GenericRowData reuse = new GenericRowData(producedNames.length);
+ long count = 0;
+ for (T split : inputSplits) {
+ inputFormat.open(split);
+ while (!inputFormat.reachedEnd()) {
+ RowData row = inputFormat.nextRecord(reuse);
+ count++;
+ Row key = extractKey(row);
+ List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
+ rows.add(serializer.copy(row));
+ }
+ inputFormat.close();
+ }
+ nextLoadTime = System.currentTimeMillis() + getCacheTTL().toMillis();
+ LOG.info("Loaded {} row(s) into lookup join cache", count);
+ return;
+ } catch (IOException e) {
+ if (numRetry >= MAX_RETRIES) {
+ throw new FlinkRuntimeException(
+ String.format("Failed to load table into cache after %d retries", numRetry), e);
+ }
+ numRetry++;
+ long toSleep = numRetry * RETRY_INTERVAL.toMillis();
+ LOG.warn(String.format("Failed to load table into cache, will retry in %d seconds", toSleep / 1000), e);
+ try {
+ Thread.sleep(toSleep);
+ } catch (InterruptedException ex) {
+ LOG.warn("Interrupted while waiting to retry failed cache load, aborting");
+ throw new FlinkRuntimeException(ex);
}
- inputFormat.close();
}
- nextLoadTime = System.currentTimeMillis() + getCacheTTL().toMillis();
- LOG.info("Loaded {} row(s) into lookup join cache", count);
- } catch (IOException e) {
- throw new FlinkRuntimeException("Failed to load table into cache", e);
}
}