sijie commented on a change in pull request #4174: [go function] support 
localrun and cluster mode for go function
URL: https://github.com/apache/pulsar/pull/4174#discussion_r279609785
 
 

 ##########
 File path: 
pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
 ##########
 @@ -89,31 +91,180 @@
             if (StringUtils.isNotEmpty(extraDependenciesDir)) {
                 args.add("PYTHONPATH=${PYTHONPATH}:" + extraDependenciesDir);
             }
+        } else if (instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.GO) {
+            //no-op
         }
 
         return args;
     }
 
+    /**
+     *
+     * Different from python and java function, Go function uploads a complete 
executable file(including:
+     * instance file + user code file). Its parameter list is provided to the 
broker in the form of a yaml file,
+     * the advantage of this approach is that backward compatibility is 
guaranteed.
+     *
+     * When we run the go function, we only need to specify the location of 
the go-function file and the yaml file.
+     * The content of the yaml file will be automatically generated according 
to the content provided by instanceConfig.
+     *
+     */
+
+    public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
+                                                String originalCodeFileName,
+                                                String pulsarServiceUrl) {
+        final List<String> args = new LinkedList<>();
+        FunctionConverGoConfig functionConverGoConfig = new 
FunctionConverGoConfig();
+
+        if (instanceConfig.getClusterName() != null) {
+            
functionConverGoConfig.setClusterName(instanceConfig.getClusterName());
+        }
+
+        if (instanceConfig.getInstanceId() != 0) {
+            
functionConverGoConfig.setInstanceID(instanceConfig.getInstanceId());
+        }
+
+        if (instanceConfig.getFunctionId() != null) {
+            functionConverGoConfig.setFuncID(instanceConfig.getFunctionId());
+        }
+
+        if (instanceConfig.getFunctionVersion() != null) {
+            
functionConverGoConfig.setFuncVersion(instanceConfig.getFunctionVersion());
+        }
+
+        if (instanceConfig.getFunctionDetails().getAutoAck()) {
+            
functionConverGoConfig.setAutoAck(instanceConfig.getFunctionDetails().getAutoAck());
+        }
+
+        if (instanceConfig.getFunctionDetails().getTenant() != null) {
+            
functionConverGoConfig.setTenant(instanceConfig.getFunctionDetails().getTenant());
+        }
+
+        if (instanceConfig.getFunctionDetails().getNamespace() != null) {
+            
functionConverGoConfig.setNameSpace(instanceConfig.getFunctionDetails().getNamespace());
+        }
+
+        if (instanceConfig.getFunctionDetails().getName() != null) {
+            
functionConverGoConfig.setName(instanceConfig.getFunctionDetails().getName());
+        }
+
+        if (instanceConfig.getFunctionDetails().getClassName() != null) {
+            
functionConverGoConfig.setClassName(instanceConfig.getFunctionDetails().getClassName());
+        }
+        if (instanceConfig.getFunctionDetails().getLogTopic() != null) {
+            
functionConverGoConfig.setLogTopic(instanceConfig.getFunctionDetails().getLogTopic());
+        }
+        if (instanceConfig.getFunctionDetails().getProcessingGuarantees() != 
null) {
+            
functionConverGoConfig.setProcessingGuarantees(instanceConfig.getFunctionDetails().getProcessingGuaranteesValue());
+        }
+        if (instanceConfig.getFunctionDetails().getSecretsMap() != null) {
+            
functionConverGoConfig.setSecretsMap(instanceConfig.getFunctionDetails().getSecretsMap());
+        }
+        if (instanceConfig.getFunctionDetails().getParallelism() != 0) {
+            
functionConverGoConfig.setParallelism(instanceConfig.getFunctionDetails().getParallelism());
+        }
+
+        if (instanceConfig.getMaxBufferedTuples() != 0) {
+            
functionConverGoConfig.setMaxBufTuples(instanceConfig.getMaxBufferedTuples());
+        }
+
+        if (pulsarServiceUrl != null) {
+            functionConverGoConfig.setPulsarServiceURL(pulsarServiceUrl);
+        }
+        if 
(instanceConfig.getFunctionDetails().getSource().getCleanupSubscription()) {
+            
functionConverGoConfig.setCleanupSubscription(instanceConfig.getFunctionDetails().getSource().getCleanupSubscription());
+        }
+        if 
(instanceConfig.getFunctionDetails().getSource().getSubscriptionName() != null) 
{
+            
functionConverGoConfig.setSubscriptionName(instanceConfig.getFunctionDetails().getSource().getSubscriptionName());
+        }
+
+        if (instanceConfig.getFunctionDetails().getSource().getInputSpecsMap() 
!= null) {
+            for (String inputTopic : 
instanceConfig.getFunctionDetails().getSource().getInputSpecsMap().keySet()) {
+                functionConverGoConfig.setSourceSpecsTopic(inputTopic);
+            }
+        }
+
+        if (instanceConfig.getFunctionDetails().getSource().getTimeoutMs() != 
0) {
+            
functionConverGoConfig.setTimeoutMs(instanceConfig.getFunctionDetails().getSource().getTimeoutMs());
+        }
+
+        if (instanceConfig.getFunctionDetails().getSink().getTopic() != null) {
+            
functionConverGoConfig.setSinkSpecsTopic(instanceConfig.getFunctionDetails().getSink().getTopic());
+        }
+
+        if (instanceConfig.getFunctionDetails().getResources().getCpu() != 0) {
+            
functionConverGoConfig.setCpu(instanceConfig.getFunctionDetails().getResources().getCpu());
+        }
+
+        if (instanceConfig.getFunctionDetails().getResources().getRam() != 0) {
+            
functionConverGoConfig.setRam(instanceConfig.getFunctionDetails().getResources().getRam());
+        }
+
+        if (instanceConfig.getFunctionDetails().getResources().getDisk() != 0) 
{
+            
functionConverGoConfig.setDisk(instanceConfig.getFunctionDetails().getResources().getDisk());
+        }
+
+        if 
(instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic() != 
null) {
+            
functionConverGoConfig.setDeadLetterTopic(instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
+        }
+
+        if 
(instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries() 
!= 0) {
+            
functionConverGoConfig.setMaxMessageRetries(instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
+        }
+
+        functionConverGoConfig.setKillAfterIdleMs(0);
+
+        DumperOptions dumperOptions = new DumperOptions();
+        dumperOptions.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK);
+        Yaml yaml = new Yaml(dumperOptions);
+        String output = yaml.dumpAs(functionConverGoConfig, Tag.MAP, null);
+        String pathName = "pulsar-functions/runtime/src/main/resources";
+        String fileName = String.format("%s_%s_%s.yml", 
functionConverGoConfig.getTenant(), functionConverGoConfig.getNameSpace(),
 
 Review comment:
   I would suggest using a temp file for now, so that the config file is deleted 
when the instance dies and we don't need to manage those configuration files.
   
   ```
   Path path = Files.createTempFile(...);
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to