sijie commented on a change in pull request #4174: [go function] support 
localrun and cluster mode for go function
URL: https://github.com/apache/pulsar/pull/4174#discussion_r279609942
 
 

 ##########
 File path: 
pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
 ##########
 @@ -89,31 +91,180 @@
             if (StringUtils.isNotEmpty(extraDependenciesDir)) {
                 args.add("PYTHONPATH=${PYTHONPATH}:" + extraDependenciesDir);
             }
+        } else if (instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.GO) {
+            //no-op
         }
 
         return args;
     }
 
+    /**
+     *
+     * Different from python and java function, Go function uploads a complete 
executable file(including:
+     * instance file + user code file). Its parameter list is provided to the 
broker in the form of a yaml file,
+     * the advantage of this approach is that backward compatibility is 
guaranteed.
+     *
+     * When we run the go function, we only need to specify the location of 
the go-function file and the yaml file.
+     * The content of the yaml file will be automatically generated according 
to the content provided by instanceConfig.
+     *
+     */
+
+    public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
+                                                String originalCodeFileName,
+                                                String pulsarServiceUrl) {
+        final List<String> args = new LinkedList<>();
+        FunctionConverGoConfig functionConverGoConfig = new 
FunctionConverGoConfig();
+
+        if (instanceConfig.getClusterName() != null) {
+            
functionConverGoConfig.setClusterName(instanceConfig.getClusterName());
+        }
+
+        if (instanceConfig.getInstanceId() != 0) {
+            
functionConverGoConfig.setInstanceID(instanceConfig.getInstanceId());
+        }
+
+        if (instanceConfig.getFunctionId() != null) {
+            functionConverGoConfig.setFuncID(instanceConfig.getFunctionId());
+        }
+
+        if (instanceConfig.getFunctionVersion() != null) {
+            
functionConverGoConfig.setFuncVersion(instanceConfig.getFunctionVersion());
+        }
+
+        if (instanceConfig.getFunctionDetails().getAutoAck()) {
+            
functionConverGoConfig.setAutoAck(instanceConfig.getFunctionDetails().getAutoAck());
+        }
+
+        if (instanceConfig.getFunctionDetails().getTenant() != null) {
+            
functionConverGoConfig.setTenant(instanceConfig.getFunctionDetails().getTenant());
+        }
+
+        if (instanceConfig.getFunctionDetails().getNamespace() != null) {
+            
functionConverGoConfig.setNameSpace(instanceConfig.getFunctionDetails().getNamespace());
+        }
+
+        if (instanceConfig.getFunctionDetails().getName() != null) {
+            
functionConverGoConfig.setName(instanceConfig.getFunctionDetails().getName());
+        }
+
+        if (instanceConfig.getFunctionDetails().getClassName() != null) {
+            
functionConverGoConfig.setClassName(instanceConfig.getFunctionDetails().getClassName());
+        }
+        if (instanceConfig.getFunctionDetails().getLogTopic() != null) {
+            
functionConverGoConfig.setLogTopic(instanceConfig.getFunctionDetails().getLogTopic());
+        }
+        if (instanceConfig.getFunctionDetails().getProcessingGuarantees() != 
null) {
+            
functionConverGoConfig.setProcessingGuarantees(instanceConfig.getFunctionDetails().getProcessingGuaranteesValue());
+        }
+        if (instanceConfig.getFunctionDetails().getSecretsMap() != null) {
+            
functionConverGoConfig.setSecretsMap(instanceConfig.getFunctionDetails().getSecretsMap());
+        }
+        if (instanceConfig.getFunctionDetails().getParallelism() != 0) {
+            
functionConverGoConfig.setParallelism(instanceConfig.getFunctionDetails().getParallelism());
+        }
+
+        if (instanceConfig.getMaxBufferedTuples() != 0) {
+            
functionConverGoConfig.setMaxBufTuples(instanceConfig.getMaxBufferedTuples());
+        }
+
+        if (pulsarServiceUrl != null) {
+            functionConverGoConfig.setPulsarServiceURL(pulsarServiceUrl);
+        }
+        if 
(instanceConfig.getFunctionDetails().getSource().getCleanupSubscription()) {
+            
functionConverGoConfig.setCleanupSubscription(instanceConfig.getFunctionDetails().getSource().getCleanupSubscription());
+        }
+        if 
(instanceConfig.getFunctionDetails().getSource().getSubscriptionName() != null) 
{
+            
functionConverGoConfig.setSubscriptionName(instanceConfig.getFunctionDetails().getSource().getSubscriptionName());
+        }
+
+        if (instanceConfig.getFunctionDetails().getSource().getInputSpecsMap() 
!= null) {
+            for (String inputTopic : 
instanceConfig.getFunctionDetails().getSource().getInputSpecsMap().keySet()) {
+                functionConverGoConfig.setSourceSpecsTopic(inputTopic);
+            }
+        }
+
+        if (instanceConfig.getFunctionDetails().getSource().getTimeoutMs() != 
0) {
+            
functionConverGoConfig.setTimeoutMs(instanceConfig.getFunctionDetails().getSource().getTimeoutMs());
+        }
+
+        if (instanceConfig.getFunctionDetails().getSink().getTopic() != null) {
+            
functionConverGoConfig.setSinkSpecsTopic(instanceConfig.getFunctionDetails().getSink().getTopic());
+        }
+
+        if (instanceConfig.getFunctionDetails().getResources().getCpu() != 0) {
+            
functionConverGoConfig.setCpu(instanceConfig.getFunctionDetails().getResources().getCpu());
+        }
+
+        if (instanceConfig.getFunctionDetails().getResources().getRam() != 0) {
+            
functionConverGoConfig.setRam(instanceConfig.getFunctionDetails().getResources().getRam());
+        }
+
+        if (instanceConfig.getFunctionDetails().getResources().getDisk() != 0) 
{
+            
functionConverGoConfig.setDisk(instanceConfig.getFunctionDetails().getResources().getDisk());
+        }
+
+        if 
(instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic() != 
null) {
+            
functionConverGoConfig.setDeadLetterTopic(instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
+        }
+
+        if 
(instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries() 
!= 0) {
+            
functionConverGoConfig.setMaxMessageRetries(instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
+        }
+
+        functionConverGoConfig.setKillAfterIdleMs(0);
+
+        DumperOptions dumperOptions = new DumperOptions();
+        dumperOptions.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK);
+        Yaml yaml = new Yaml(dumperOptions);
+        String output = yaml.dumpAs(functionConverGoConfig, Tag.MAP, null);
+        String pathName = "pulsar-functions/runtime/src/main/resources";
+        String fileName = String.format("%s_%s_%s.yml", 
functionConverGoConfig.getTenant(), functionConverGoConfig.getNameSpace(),
+                functionConverGoConfig.getName());
+        File ymlFile = new File(pathName + "/" + fileName);
+        try {
+            FileWriter fileWriter = new FileWriter(ymlFile);
+            fileWriter.write(output);
+            fileWriter.flush();
+            fileWriter.close();
+        } catch (IOException e) {
+            e.printStackTrace();
 
 Review comment:
   It is bad practice to use `e.printStackTrace()`.
   
   Also, I don't think you should catch the exception here. You should let the 
function throw the exception and fail the operation, because a Go instance 
cannot run correctly without the instance config.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to