This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new db3d84b73d [HotFix] Fix TaskOutputParameterParser might OOM if it meets a
bad output param expression (#15264)
db3d84b73d is described below
commit db3d84b73dc2537b39073244433b7690a7e0fed3
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Dec 4 18:38:38 2023 +0800
[HotFix] Fix TaskOutputParameterParser might OOM if it meets a bad output param
expression (#15264)
---
.../task/api/parser/TaskOutputParameterParser.java | 54 +++++++++++++++++-----
.../api/parser/TaskOutputParameterParserTest.java | 49 ++++++++++++++++----
.../{multipleVarPoll.txt => maxLengthVarPool.txt} | 5 +-
.../{multipleVarPoll.txt => maxRowsVarPool.txt} | 7 ++-
.../{multipleVarPoll.txt => multipleVarPool.txt} | 0
5 files changed, 92 insertions(+), 23 deletions(-)
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParser.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParser.java
index e79d44e4da..d96aae4307 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParser.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParser.java
@@ -36,46 +36,78 @@ import lombok.extern.slf4j.Slf4j;
@NotThreadSafe
public class TaskOutputParameterParser {
- private final Map<String, String> taskOutputParams = new HashMap<>();
+ // Used to guard against a '${setValue(' that is missing its closing ')}'
+ private final int maxOneParameterRows;
+
+ // Used to guard against a '${setValue(' whose length is too long, which may cause OOM
+ private final int maxOneParameterLength;
+
+ private final Map<String, String> taskOutputParams;
private List<String> currentTaskOutputParam;
- public void appendParseLog(String log) {
- if (log == null) {
+ private long currentTaskOutputParamLength;
+
+ public TaskOutputParameterParser() {
+ // the default max rows of one parameter is 1024, this should be enough
+ this(1024, Integer.MAX_VALUE);
+ }
+
+ public TaskOutputParameterParser(int maxOneParameterRows, int
maxOneParameterLength) {
+ this.maxOneParameterRows = maxOneParameterRows;
+ this.maxOneParameterLength = maxOneParameterLength;
+ this.taskOutputParams = new HashMap<>();
+ this.currentTaskOutputParam = null;
+ this.currentTaskOutputParamLength = 0;
+ }
+
+ public void appendParseLog(String logLine) {
+ if (logLine == null) {
return;
}
if (currentTaskOutputParam != null) {
+ if (currentTaskOutputParam.size() > maxOneParameterRows
+ || currentTaskOutputParamLength > maxOneParameterLength) {
+ log.warn(
+ "The output param expression '{}' is too long, the max
rows is {}, max length is {}, will skip this param",
+ String.join("\n", currentTaskOutputParam),
maxOneParameterLength, maxOneParameterRows);
+ currentTaskOutputParam = null;
+ currentTaskOutputParamLength = 0;
+ return;
+ }
// continue to parse the rest of line
- int i = log.indexOf(")}");
+ int i = logLine.indexOf(")}");
if (i == -1) {
// the end of var pool not found
- currentTaskOutputParam.add(log);
+ currentTaskOutputParam.add(logLine);
+ currentTaskOutputParamLength += logLine.length();
} else {
// the end of var pool found
- currentTaskOutputParam.add(log.substring(0, i + 2));
+ currentTaskOutputParam.add(logLine.substring(0, i + 2));
Pair<String, String> keyValue =
parseOutputParam(String.join("\n", currentTaskOutputParam));
if (keyValue.getKey() != null && keyValue.getValue() != null) {
taskOutputParams.put(keyValue.getKey(),
keyValue.getValue());
}
currentTaskOutputParam = null;
+ currentTaskOutputParamLength = 0;
// continue to parse the rest of line
- if (i + 2 != log.length()) {
- appendParseLog(log.substring(i + 2));
+ if (i + 2 != logLine.length()) {
+ appendParseLog(logLine.substring(i + 2));
}
}
return;
}
- int indexOfVarPoolBegin = log.indexOf("${setValue(");
+ int indexOfVarPoolBegin = logLine.indexOf("${setValue(");
if (indexOfVarPoolBegin == -1) {
- indexOfVarPoolBegin = log.indexOf("#{setValue(");
+ indexOfVarPoolBegin = logLine.indexOf("#{setValue(");
}
if (indexOfVarPoolBegin == -1) {
return;
}
currentTaskOutputParam = new ArrayList<>();
- appendParseLog(log.substring(indexOfVarPoolBegin));
+ appendParseLog(logLine.substring(indexOfVarPoolBegin));
}
public Map<String, String> getTaskOutputParams() {
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParserTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParserTest.java
index 950ed822c0..25c1b84ae1 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParserTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParserTest.java
@@ -19,14 +19,15 @@ package org.apache.dolphinscheduler.plugin.task.api.parser;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import java.io.IOException;
import java.net.URI;
-import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
+import lombok.SneakyThrows;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -35,7 +36,7 @@ import com.google.common.collect.ImmutableMap;
class TaskOutputParameterParserTest {
@Test
- void testEmptyLog() throws IOException, URISyntaxException {
+ void testEmptyLog() {
List<String> varPools = getLogs("/outputParam/emptyVarPoolLog.txt");
TaskOutputParameterParser taskOutputParameterParser = new
TaskOutputParameterParser();
varPools.forEach(taskOutputParameterParser::appendParseLog);
@@ -43,7 +44,7 @@ class TaskOutputParameterParserTest {
}
@Test
- void testOneLineLog() throws IOException, URISyntaxException {
+ void testOneLineLog() {
List<String> varPools = getLogs("/outputParam/onelineVarPoolLog.txt");
TaskOutputParameterParser taskOutputParameterParser = new
TaskOutputParameterParser();
varPools.forEach(taskOutputParameterParser::appendParseLog);
@@ -51,7 +52,7 @@ class TaskOutputParameterParserTest {
}
@Test
- void testOneVarPollInMultiLineLog() throws IOException, URISyntaxException
{
+ void testOneVarPoolInMultiLineLog() {
List<String> varPools =
getLogs("/outputParam/oneVarPollInMultiLineLog.txt");
TaskOutputParameterParser taskOutputParameterParser = new
TaskOutputParameterParser();
varPools.forEach(taskOutputParameterParser::appendParseLog);
@@ -63,14 +64,46 @@ class TaskOutputParameterParserTest {
}
@Test
- void testVarPollInMultiLineLog() throws IOException, URISyntaxException {
- List<String> varPools = getLogs("/outputParam/multipleVarPoll.txt");
+ void testVarPoolInMultiLineLog() {
+ List<String> varPools = getLogs("/outputParam/multipleVarPool.txt");
TaskOutputParameterParser taskOutputParameterParser = new
TaskOutputParameterParser();
varPools.forEach(taskOutputParameterParser::appendParseLog);
assertEquals(ImmutableMap.of("name", "tom", "age", "1"),
taskOutputParameterParser.getTaskOutputParams());
}
- private List<String> getLogs(String file) throws IOException,
URISyntaxException {
+ @Test
+ void textVarPoolExceedMaxRows() {
+ List<String> varPools = getLogs("/outputParam/maxRowsVarPool.txt");
+ TaskOutputParameterParser taskOutputParameterParser = new
TaskOutputParameterParser(2, Integer.MAX_VALUE);
+ varPools.forEach(taskOutputParameterParser::appendParseLog);
+ assertEquals(Collections.emptyMap(),
taskOutputParameterParser.getTaskOutputParams());
+
+ taskOutputParameterParser = new TaskOutputParameterParser();
+ varPools.forEach(taskOutputParameterParser::appendParseLog);
+ assertEquals(ImmutableMap.of("name", "name=tom\n" +
+ "name=name=tom\n" +
+ "name=name=tom\n" +
+ "name=name=tom\n" +
+ "name=name=tom"),
taskOutputParameterParser.getTaskOutputParams());
+
+ }
+
+ @Test
+ void textVarPoolExceedMaxLength() {
+ List<String> varPools = getLogs("/outputParam/maxLengthVarPool.txt");
+ TaskOutputParameterParser taskOutputParameterParser = new
TaskOutputParameterParser(2, 10);
+ varPools.forEach(taskOutputParameterParser::appendParseLog);
+ assertEquals(Collections.emptyMap(),
taskOutputParameterParser.getTaskOutputParams());
+
+ taskOutputParameterParser = new TaskOutputParameterParser();
+ varPools.forEach(taskOutputParameterParser::appendParseLog);
+ assertEquals(ImmutableMap.of("name", "123456789\n" +
+ "12345\n"), taskOutputParameterParser.getTaskOutputParams());
+
+ }
+
+ @SneakyThrows
+ private List<String> getLogs(String file) {
URI uri =
TaskOutputParameterParserTest.class.getResource(file).toURI();
return Files.lines(Paths.get(uri)).collect(Collectors.toList());
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPoll.txt
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/maxLengthVarPool.txt
similarity index 93%
copy from
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPoll.txt
copy to
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/maxLengthVarPool.txt
index 994cee503e..6aebb51a1d 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPoll.txt
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/maxLengthVarPool.txt
@@ -15,5 +15,6 @@
# limitations under the License.
#
-INFO: ${setValue(name=tom)}
-INFO: ${setValue(age=1)}
\ No newline at end of file
+${setValue(name=123456789
+12345
+)}
\ No newline at end of file
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPoll.txt
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/maxRowsVarPool.txt
similarity index 90%
copy from
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPoll.txt
copy to
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/maxRowsVarPool.txt
index 994cee503e..0fea43da8d 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPoll.txt
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/maxRowsVarPool.txt
@@ -15,5 +15,8 @@
# limitations under the License.
#
-INFO: ${setValue(name=tom)}
-INFO: ${setValue(age=1)}
\ No newline at end of file
+${setValue(name=name=tom
+name=name=tom
+name=name=tom
+name=name=tom
+name=name=tom)}
\ No newline at end of file
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPoll.txt
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPool.txt
similarity index 100%
rename from
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPoll.txt
rename to
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPool.txt