jerrypeng commented on a change in pull request #4174: [go function] support 
localrun and cluster mode for go function
URL: https://github.com/apache/pulsar/pull/4174#discussion_r279905483
 
 

 ##########
 File path: 
pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
 ##########
 @@ -82,38 +84,183 @@
     public static List<String> getArgsBeforeCmd(InstanceConfig instanceConfig, 
String extraDependenciesDir) {
 
         final List<String> args = new LinkedList<>();
-        if (instanceConfig.getFunctionDetails().getRuntime() ==  
Function.FunctionDetails.Runtime.JAVA) {
+        if (instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.JAVA) {
             //no-op
         } else if (instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.PYTHON) {
             // add `extraDependenciesDir` to python package searching path
             if (StringUtils.isNotEmpty(extraDependenciesDir)) {
                 args.add("PYTHONPATH=${PYTHONPATH}:" + extraDependenciesDir);
             }
+        } else if (instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.GO) {
+            //no-op
         }
 
         return args;
     }
 
+    /**
+     * Unlike Python and Java functions, a Go function is uploaded as one complete executable file
+     * (instance file + user code file). Its parameter list is provided to the broker in the form of
+     * a YAML file; the advantage of this approach is that backward compatibility is guaranteed.
+     *
+     * In Java and Python the instance is managed by the broker (or function worker), so changes to
+     * the command line are under control. In Go, however, the instance is compiled together with the
+     * user function, so Pulsar has no control over which instance is used by the function. Hence, in
+     * order to support backward compatibility (BC) for Go functions, we can't dynamically add more
+     * command-line arguments. Using an instance config to pass the parameters from the function
+     * worker to the Go instance is the best way of maintaining BC.
+     * <p>
+     * When we run the Go function, we only need to specify the location of the go-function file and
+     * the YAML file. The content of the YAML file is generated automatically from the content
+     * provided by instanceConfig.
+     */
+
+    public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
+                                                String originalCodeFileName,
+                                                String pulsarServiceUrl) 
throws IOException {
+        final List<String> args = new LinkedList<>();
+        GoInstanceConfig goInstanceConfig = new GoInstanceConfig();
+
+        if (instanceConfig.getClusterName() != null) {
+            goInstanceConfig.setClusterName(instanceConfig.getClusterName());
+        }
+
+        if (instanceConfig.getInstanceId() != 0) {
+            goInstanceConfig.setInstanceID(instanceConfig.getInstanceId());
+        }
+
+        if (instanceConfig.getFunctionId() != null) {
+            goInstanceConfig.setFuncID(instanceConfig.getFunctionId());
+        }
+
+        if (instanceConfig.getFunctionVersion() != null) {
+            
goInstanceConfig.setFuncVersion(instanceConfig.getFunctionVersion());
+        }
+
+        if (instanceConfig.getFunctionDetails().getAutoAck()) {
+            
goInstanceConfig.setAutoAck(instanceConfig.getFunctionDetails().getAutoAck());
+        }
+
+        if (instanceConfig.getFunctionDetails().getTenant() != null) {
+            
goInstanceConfig.setTenant(instanceConfig.getFunctionDetails().getTenant());
+        }
+
+        if (instanceConfig.getFunctionDetails().getNamespace() != null) {
+            
goInstanceConfig.setNameSpace(instanceConfig.getFunctionDetails().getNamespace());
+        }
+
+        if (instanceConfig.getFunctionDetails().getName() != null) {
+            
goInstanceConfig.setName(instanceConfig.getFunctionDetails().getName());
+        }
+
+        if (instanceConfig.getFunctionDetails().getLogTopic() != null) {
+            
goInstanceConfig.setLogTopic(instanceConfig.getFunctionDetails().getLogTopic());
+        }
+        if (instanceConfig.getFunctionDetails().getProcessingGuarantees() != 
null) {
+            
goInstanceConfig.setProcessingGuarantees(instanceConfig.getFunctionDetails().getProcessingGuaranteesValue());
+        }
+        if (instanceConfig.getFunctionDetails().getSecretsMap() != null) {
+            
goInstanceConfig.setSecretsMap(instanceConfig.getFunctionDetails().getSecretsMap());
+        }
+        if (instanceConfig.getFunctionDetails().getParallelism() != 0) {
+            
goInstanceConfig.setParallelism(instanceConfig.getFunctionDetails().getParallelism());
+        }
+
+        if (instanceConfig.getMaxBufferedTuples() != 0) {
+            
goInstanceConfig.setMaxBufTuples(instanceConfig.getMaxBufferedTuples());
+        }
+
+        if (pulsarServiceUrl != null) {
+            goInstanceConfig.setPulsarServiceURL(pulsarServiceUrl);
+        }
+        if 
(instanceConfig.getFunctionDetails().getSource().getCleanupSubscription()) {
+            
goInstanceConfig.setCleanupSubscription(instanceConfig.getFunctionDetails().getSource().getCleanupSubscription());
+        }
+        if 
(instanceConfig.getFunctionDetails().getSource().getSubscriptionName() != null) 
{
+            
goInstanceConfig.setSubscriptionName(instanceConfig.getFunctionDetails().getSource().getSubscriptionName());
+        }
+
+        if (instanceConfig.getFunctionDetails().getSource().getInputSpecsMap() 
!= null) {
+            for (String inputTopic : 
instanceConfig.getFunctionDetails().getSource().getInputSpecsMap().keySet()) {
+                goInstanceConfig.setSourceSpecsTopic(inputTopic);
+            }
+        }
+
+        if (instanceConfig.getFunctionDetails().getSource().getTimeoutMs() != 
0) {
+            
goInstanceConfig.setTimeoutMs(instanceConfig.getFunctionDetails().getSource().getTimeoutMs());
+        }
+
+        if (instanceConfig.getFunctionDetails().getSink().getTopic() != null) {
+            
goInstanceConfig.setSinkSpecsTopic(instanceConfig.getFunctionDetails().getSink().getTopic());
+        }
+
+        if (instanceConfig.getFunctionDetails().getResources().getCpu() != 0) {
+            
goInstanceConfig.setCpu(instanceConfig.getFunctionDetails().getResources().getCpu());
+        }
+
+        if (instanceConfig.getFunctionDetails().getResources().getRam() != 0) {
+            
goInstanceConfig.setRam(instanceConfig.getFunctionDetails().getResources().getRam());
+        }
+
+        if (instanceConfig.getFunctionDetails().getResources().getDisk() != 0) 
{
+            
goInstanceConfig.setDisk(instanceConfig.getFunctionDetails().getResources().getDisk());
+        }
+
+        if 
(instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic() != 
null) {
+            
goInstanceConfig.setDeadLetterTopic(instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
+        }
+
+        if 
(instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries() 
!= 0) {
+            
goInstanceConfig.setMaxMessageRetries(instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
+        }
+
+        goInstanceConfig.setKillAfterIdleMs(0);
+
+        DumperOptions dumperOptions = new DumperOptions();
+        dumperOptions.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK);
+        Yaml yaml = new Yaml(dumperOptions);
+        String output = yaml.dumpAs(goInstanceConfig, Tag.MAP, null);
+        String fileName = String.format("%s_%s_%s", 
goInstanceConfig.getTenant(), goInstanceConfig.getNameSpace(),
+                goInstanceConfig.getName());
+        File ymlFile = File.createTempFile(fileName, ".yml");
 
 Review comment:
   This temporary file needs to be cleaned up somehow.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to