sijie commented on a change in pull request #4174: [go function] support
localrun and cluster mode for go function
URL: https://github.com/apache/pulsar/pull/4174#discussion_r279609785
##########
File path:
pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
##########
@@ -89,31 +91,180 @@
if (StringUtils.isNotEmpty(extraDependenciesDir)) {
args.add("PYTHONPATH=${PYTHONPATH}:" + extraDependenciesDir);
}
+ } else if (instanceConfig.getFunctionDetails().getRuntime() ==
Function.FunctionDetails.Runtime.GO) {
+ //no-op
}
return args;
}
+ /**
+ *
+ * Different from python and java function, Go function uploads a complete
executable file(including:
+ * instance file + user code file). Its parameter list is provided to the
broker in the form of a yaml file,
+ * the advantage of this approach is that backward compatibility is
guaranteed.
+ *
+ * When we run the go function, we only need to specify the location of
the go-function file and the yaml file.
+ * The content of the yaml file will be automatically generated according
to the content provided by instanceConfig.
+ *
+ */
+
+ public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
+ String originalCodeFileName,
+ String pulsarServiceUrl) {
+ final List<String> args = new LinkedList<>();
+ FunctionConverGoConfig functionConverGoConfig = new
FunctionConverGoConfig();
+
+ if (instanceConfig.getClusterName() != null) {
+
functionConverGoConfig.setClusterName(instanceConfig.getClusterName());
+ }
+
+ if (instanceConfig.getInstanceId() != 0) {
+
functionConverGoConfig.setInstanceID(instanceConfig.getInstanceId());
+ }
+
+ if (instanceConfig.getFunctionId() != null) {
+ functionConverGoConfig.setFuncID(instanceConfig.getFunctionId());
+ }
+
+ if (instanceConfig.getFunctionVersion() != null) {
+
functionConverGoConfig.setFuncVersion(instanceConfig.getFunctionVersion());
+ }
+
+ if (instanceConfig.getFunctionDetails().getAutoAck()) {
+
functionConverGoConfig.setAutoAck(instanceConfig.getFunctionDetails().getAutoAck());
+ }
+
+ if (instanceConfig.getFunctionDetails().getTenant() != null) {
+
functionConverGoConfig.setTenant(instanceConfig.getFunctionDetails().getTenant());
+ }
+
+ if (instanceConfig.getFunctionDetails().getNamespace() != null) {
+
functionConverGoConfig.setNameSpace(instanceConfig.getFunctionDetails().getNamespace());
+ }
+
+ if (instanceConfig.getFunctionDetails().getName() != null) {
+
functionConverGoConfig.setName(instanceConfig.getFunctionDetails().getName());
+ }
+
+ if (instanceConfig.getFunctionDetails().getClassName() != null) {
+
functionConverGoConfig.setClassName(instanceConfig.getFunctionDetails().getClassName());
+ }
+ if (instanceConfig.getFunctionDetails().getLogTopic() != null) {
+
functionConverGoConfig.setLogTopic(instanceConfig.getFunctionDetails().getLogTopic());
+ }
+ if (instanceConfig.getFunctionDetails().getProcessingGuarantees() !=
null) {
+
functionConverGoConfig.setProcessingGuarantees(instanceConfig.getFunctionDetails().getProcessingGuaranteesValue());
+ }
+ if (instanceConfig.getFunctionDetails().getSecretsMap() != null) {
+
functionConverGoConfig.setSecretsMap(instanceConfig.getFunctionDetails().getSecretsMap());
+ }
+ if (instanceConfig.getFunctionDetails().getParallelism() != 0) {
+
functionConverGoConfig.setParallelism(instanceConfig.getFunctionDetails().getParallelism());
+ }
+
+ if (instanceConfig.getMaxBufferedTuples() != 0) {
+
functionConverGoConfig.setMaxBufTuples(instanceConfig.getMaxBufferedTuples());
+ }
+
+ if (pulsarServiceUrl != null) {
+ functionConverGoConfig.setPulsarServiceURL(pulsarServiceUrl);
+ }
+ if
(instanceConfig.getFunctionDetails().getSource().getCleanupSubscription()) {
+
functionConverGoConfig.setCleanupSubscription(instanceConfig.getFunctionDetails().getSource().getCleanupSubscription());
+ }
+ if
(instanceConfig.getFunctionDetails().getSource().getSubscriptionName() != null)
{
+
functionConverGoConfig.setSubscriptionName(instanceConfig.getFunctionDetails().getSource().getSubscriptionName());
+ }
+
+ if (instanceConfig.getFunctionDetails().getSource().getInputSpecsMap()
!= null) {
+ for (String inputTopic :
instanceConfig.getFunctionDetails().getSource().getInputSpecsMap().keySet()) {
+ functionConverGoConfig.setSourceSpecsTopic(inputTopic);
+ }
+ }
+
+ if (instanceConfig.getFunctionDetails().getSource().getTimeoutMs() !=
0) {
+
functionConverGoConfig.setTimeoutMs(instanceConfig.getFunctionDetails().getSource().getTimeoutMs());
+ }
+
+ if (instanceConfig.getFunctionDetails().getSink().getTopic() != null) {
+
functionConverGoConfig.setSinkSpecsTopic(instanceConfig.getFunctionDetails().getSink().getTopic());
+ }
+
+ if (instanceConfig.getFunctionDetails().getResources().getCpu() != 0) {
+
functionConverGoConfig.setCpu(instanceConfig.getFunctionDetails().getResources().getCpu());
+ }
+
+ if (instanceConfig.getFunctionDetails().getResources().getRam() != 0) {
+
functionConverGoConfig.setRam(instanceConfig.getFunctionDetails().getResources().getRam());
+ }
+
+ if (instanceConfig.getFunctionDetails().getResources().getDisk() != 0)
{
+
functionConverGoConfig.setDisk(instanceConfig.getFunctionDetails().getResources().getDisk());
+ }
+
+ if
(instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic() !=
null) {
+
functionConverGoConfig.setDeadLetterTopic(instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
+ }
+
+ if
(instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries()
!= 0) {
+
functionConverGoConfig.setMaxMessageRetries(instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
+ }
+
+ functionConverGoConfig.setKillAfterIdleMs(0);
+
+ DumperOptions dumperOptions = new DumperOptions();
+ dumperOptions.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK);
+ Yaml yaml = new Yaml(dumperOptions);
+ String output = yaml.dumpAs(functionConverGoConfig, Tag.MAP, null);
+ String pathName = "pulsar-functions/runtime/src/main/resources";
+ String fileName = String.format("%s_%s_%s.yml",
functionConverGoConfig.getTenant(), functionConverGoConfig.getNameSpace(),
Review comment:
I would suggest using TempFile for now. So the config file is deleted when
the instance dies so we don't need to manage those configuration files.
```
Path path = Files.createTempFile(...);
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services