This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e814a7f fix updating function/source/sink runtimeFlags and specifying
mutiple flags (#4572)
e814a7f is described below
commit e814a7ff8fbfc1fa55dfc8b82b9aed093e8bc1bb
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Thu Jun 20 16:13:56 2019 -0700
fix updating function/source/sink runtimeFlags and specifying mutiple flags
(#4572)
---
.../apache/pulsar/functions/runtime/RuntimeUtils.java | 15 +++++++++++++--
.../pulsar/functions/utils/FunctionConfigUtils.java | 3 +++
.../apache/pulsar/functions/utils/SinkConfigUtils.java | 3 +++
.../apache/pulsar/functions/utils/SourceConfigUtils.java | 3 +++
.../pulsar/functions/utils/FunctionConfigUtilsTest.java | 16 ++++++++++++++++
.../pulsar/functions/utils/SinkConfigUtilsTest.java | 15 +++++++++++++++
.../pulsar/functions/utils/SourceConfigUtilsTest.java | 15 +++++++++++++++
7 files changed, 68 insertions(+), 2 deletions(-)
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 86e9bdf..fca3e22 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -273,7 +273,9 @@ public class RuntimeUtils {
instanceConfig.getFunctionDetails().getName(),
shardId));
if
(!isEmpty(instanceConfig.getFunctionDetails().getRuntimeFlags())) {
-
args.add(instanceConfig.getFunctionDetails().getRuntimeFlags());
+ for (String runtimeFlagArg :
splitRuntimeArgs(instanceConfig.getFunctionDetails().getRuntimeFlags())) {
+ args.add(runtimeFlagArg);
+ }
}
if (instanceConfig.getFunctionDetails().getResources() != null) {
Function.Resources resources =
instanceConfig.getFunctionDetails().getResources();
@@ -287,7 +289,9 @@ public class RuntimeUtils {
} else if (instanceConfig.getFunctionDetails().getRuntime() ==
Function.FunctionDetails.Runtime.PYTHON) {
args.add("python");
if
(!isEmpty(instanceConfig.getFunctionDetails().getRuntimeFlags())) {
-
args.add(instanceConfig.getFunctionDetails().getRuntimeFlags());
+ for (String runtimeFlagArg :
splitRuntimeArgs(instanceConfig.getFunctionDetails().getRuntimeFlags())) {
+ args.add(runtimeFlagArg);
+ }
}
args.add(instanceFile);
args.add("--py");
@@ -396,4 +400,11 @@ public class RuntimeUtils {
return result.toString();
}
+ /**
+ * Regex for splitting a string using space when not surrounded by single
or double quotes
+ */
+ public static String[] splitRuntimeArgs(String input) {
+ return input.split("\\s(?=([^\"]*\"[^\"]*\")*[^\"]*$)");
+ }
+
}
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index b96c7c4..824c1db 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -756,6 +756,9 @@ public class FunctionConfigUtils {
if (newConfig.getCleanupSubscription() != null) {
mergedConfig.setCleanupSubscription(newConfig.getCleanupSubscription());
}
+ if (!StringUtils.isEmpty(newConfig.getRuntimeFlags())) {
+ mergedConfig.setRuntimeFlags(newConfig.getRuntimeFlags());
+ }
return mergedConfig;
}
}
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index e59bf46..42eedc3 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -520,6 +520,9 @@ public class SinkConfigUtils {
if (!StringUtils.isEmpty(newConfig.getArchive())) {
mergedConfig.setArchive(newConfig.getArchive());
}
+ if (!StringUtils.isEmpty(newConfig.getRuntimeFlags())) {
+ mergedConfig.setRuntimeFlags(newConfig.getRuntimeFlags());
+ }
return mergedConfig;
}
}
\ No newline at end of file
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index e02bb8b..edbb6d7 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -336,6 +336,9 @@ public class SourceConfigUtils {
if (!StringUtils.isEmpty(newConfig.getArchive())) {
mergedConfig.setArchive(newConfig.getArchive());
}
+ if (!StringUtils.isEmpty(newConfig.getRuntimeFlags())) {
+ mergedConfig.setRuntimeFlags(newConfig.getRuntimeFlags());
+ }
return mergedConfig;
}
diff --git
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index 9c3f706..29c5e55 100644
---
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -389,6 +389,21 @@ public class FunctionConfigUtilsTest {
);
}
+ @Test
+ public void testMergeRuntimeFlags() {
+ FunctionConfig functionConfig = createFunctionConfig();
+ FunctionConfig newFunctionConfig =
createUpdatedFunctionConfig("runtimeFlags", "-Dfoo=bar2");
+ FunctionConfig mergedConfig =
FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+ assertEquals(
+ mergedConfig.getRuntimeFlags(), "-Dfoo=bar2"
+ );
+ mergedConfig.setRuntimeFlags(functionConfig.getRuntimeFlags());
+ assertEquals(
+ new Gson().toJson(functionConfig),
+ new Gson().toJson(mergedConfig)
+ );
+ }
+
private FunctionConfig createFunctionConfig() {
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setTenant("test-tenant");
@@ -409,6 +424,7 @@ public class FunctionConfigUtilsTest {
functionConfig.setTimeoutMs(2000l);
functionConfig.setWindowConfig(new
WindowConfig().setWindowLengthCount(10));
functionConfig.setCleanupSubscription(true);
+ functionConfig.setRuntimeFlags("-Dfoo=bar");
return functionConfig;
}
diff --git
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index 446deb3..eb6ab39 100644
---
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -264,6 +264,21 @@ public class SinkConfigUtilsTest {
);
}
+ @Test
+ public void testMergeRuntimeFlags() {
+ SinkConfig sinkConfig = createSinkConfig();
+ SinkConfig newFunctionConfig = createUpdatedSinkConfig("runtimeFlags",
"-Dfoo=bar2");
+ SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig,
newFunctionConfig);
+ assertEquals(
+ mergedConfig.getRuntimeFlags(), "-Dfoo=bar2"
+ );
+ mergedConfig.setRuntimeFlags(sinkConfig.getRuntimeFlags());
+ assertEquals(
+ new Gson().toJson(sinkConfig),
+ new Gson().toJson(mergedConfig)
+ );
+ }
+
private SinkConfig createSinkConfig() {
SinkConfig sinkConfig = new SinkConfig();
sinkConfig.setTenant("test-tenant");
diff --git
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index 85911ed..2005902 100644
---
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -198,6 +198,21 @@ public class SourceConfigUtilsTest {
);
}
+ @Test
+ public void testMergeRuntimeFlags() {
+ SourceConfig sourceConfig = createSourceConfig();
+ SourceConfig newFunctionConfig =
createUpdatedSourceConfig("runtimeFlags", "-Dfoo=bar2");
+ SourceConfig mergedConfig =
SourceConfigUtils.validateUpdate(sourceConfig, newFunctionConfig);
+ assertEquals(
+ mergedConfig.getRuntimeFlags(), "-Dfoo=bar2"
+ );
+ mergedConfig.setRuntimeFlags(sourceConfig.getRuntimeFlags());
+ assertEquals(
+ new Gson().toJson(sourceConfig),
+ new Gson().toJson(mergedConfig)
+ );
+ }
+
private SourceConfig createSourceConfig() {
SourceConfig sourceConfig = new SourceConfig();
sourceConfig.setTenant("test-tenant");