bbl opened a new issue #7267:
URL: https://github.com/apache/pulsar/issues/7267


   **Describe the bug**
   Whenever a golang function is run locally or created - the configured 
`InstanceCommunication` port is set to 0. This leads to health checks errors.
   
   **To Reproduce**
   Steps to reproduce the behavior:
   1. Create a sample function from 
[example](https://github.com/apache/pulsar/blob/master/pulsar-function-go/examples/exclamationFunc/exclamationFunc.go).
   2. Compile and run  locally:
   
   ```bash
   go build
   ./apache-pulsar-2.5.2/bin/pulsar-admin functions localrun \
        --go $(ROOT_DIR)/pulsar-go \
        --name "test-funtion" \
        --inputs persistent://public/default/input-1 \
        --output persistent://public/default/output-1
   ```
   3. Observe in logs that instance config has port is set to `0`.
   ```
   ...
   17:08:17.997 [main] INFO  
org.apache.pulsar.functions.runtime.process.ProcessRuntime - Created or found 
function log directory 
/work/projects/rockos/pulsar-sample-go/apache-pulsar-2.5.2/logs/functions/public/default/test-funtion
   17:08:17.999 [main] INFO  
org.apache.pulsar.functions.runtime.process.ProcessRuntime - ProcessBuilder 
starting the process with args /work/projects/rockos/pulsar-sample-go/pulsar-go 
-instance-conf 
{"pulsarServiceURL":"pulsar://localhost:6650","instanceID":0,"funcID":"f53f28c0-7338-46b2-beb9-9d3126954213","funcVersion":"83775c36-7618-489e-83a3-82f54355a832","maxBufTuples":1024,"port":0,"clusterName":"local","killAfterIdleMs":0,"tenant":"public","nameSpace":"default","name":"test-funtion","className":"","logTopic":"","processingGuarantees":0,"secretsMap":"","runtime":0,"autoAck":true,"parallelism":1,"subscriptionType":0,"timeoutMs":0,"subscriptionName":"","cleanupSubscription":true,"sourceSpecsTopic":"persistent://public/default/input-1","sourceSchemaType":"","receiverQueueSize":0,"sinkSpecsTopic":"persistent://public/default/output-1","sinkSchemaType":"","cpu":1.0,"ram":1073741824,"disk":10737418240,"maxMessageRetries":0,"deadLetterTopic":"","regexPatternSubscription":false}
   2020/06/13 17:08:18.005 log.go:46: [info] The default config file path is: 
conf/conf.yaml
   17:08:18.006 [main] INFO  
org.apache.pulsar.functions.runtime.process.ProcessRuntime - Started process 
successfully
   2020/06/13 17:08:18.006  [info] Connecting to broker 
remote_addr=pulsar://localhost:6650
   2020/06/13 17:08:18.007  [info] TCP connection established 
remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:36010
   2020/06/13 17:08:18.008  [info] Connection is ready 
remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:36010
   2020/06/13 17:08:18.011  [info] Created producer 
producer_name=standalone-6-16 topic=persistent://public/default/output-1 
cnx=127.0.0.1:36010 -> 127.0.0.1:6650
   2020/06/13 17:08:18.014 asm_amd64.s:1357: [info] Connected consumer 
name=qtptl subscription=public/default/test-funtion 
topic=persistent://public/default/input-1
   2020/06/13 17:08:18.014 asm_amd64.s:1357: [info] Created consumer name=qtptl 
subscription=public/default/test-funtion 
topic=persistent://public/default/input-1
   2020/06/13 17:08:18.014 log.go:46: [info] Serving InstanceCommunication on 
port 0
   ...
   ```
   4. Observe in logs that runner actually expects a particular port to be 
served by function (`39915` in our case):
   ```
   17:08:48.462 [function-timer-thread-3-1] ERROR 
org.apache.pulsar.functions.runtime.process.ProcessRuntime - Health check 
failed for test-funtion-0
   java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: 
UNAVAILABLE: io exception
        at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) 
~[?:?]
        at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
        at 
org.apache.pulsar.functions.runtime.process.ProcessRuntime.lambda$start$1(ProcessRuntime.java:167)
 [org.apache.pulsar-pulsar-functions-runtime-2.5.2.jar:2.5.2]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) 
[?:?]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
 [?:?]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
[?:?]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
[?:?]
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
        at java.lang.Thread.run(Thread.java:834) [?:?]
   Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
        at io.grpc.Status.asRuntimeException(Status.java:530) 
~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
        at 
io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:482) 
~[io.grpc-grpc-stub-1.18.0.jar:1.18.0]
        at 
io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
 ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
        at 
io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
 ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
        at 
io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
 ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
        at 
io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:699)
 ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
        at 
io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
 ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
        at 
io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
 ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
        at 
io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
 ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
        at 
io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:397)
 ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
        at 
io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:459) 
~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
        at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:63) 
~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
        at 
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:546)
 ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
        at 
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:467)
 ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
        at 
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:584)
 ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) 
~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
        at 
io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) 
~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
~[?:?]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
~[?:?]
        ... 1 more
   Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: 
Connection refused: /127.0.0.1:39915
   ...
   ```
   
   **Expected behavior**
   I assume that the port wasn't passed at some point to the ProcessRunner.
   Expected ProcessRunner to set the correct port in instance config.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to