borlandor edited a comment on issue #5099: "ConnectException: Connection 
refused: /127.0.0.1:49695"  when running python example function
URL: https://github.com/apache/pulsar/issues/5099#issuecomment-528240374
 
 
   My Java function also has this problem:
   ```
   15:12:04.466 [function-timer-thread-90-1] ERROR 
org.apache.pulsar.functions.runtime.ProcessRuntime - Health check failed for 
ContextWindowFunction-0
   java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: 
UNAVAILABLE: io exception
           at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
~[?:1.8.0_222]
           at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) 
~[?:1.8.0_222]
           at 
org.apache.pulsar.functions.runtime.ProcessRuntime.lambda$start$1(ProcessRuntime.java:164)
 ~[org.apache.pulsar-pulsar-functions-runtime-2.4.0.jar:2.4.0]
           at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_222]
           at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
[?:1.8.0_222]
           at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 [?:1.8.0_222]
           at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 [?:1.8.0_222]
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_222]
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_222]
           at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
   Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
           at io.grpc.Status.asRuntimeException(Status.java:530) 
~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at 
io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:482) 
~[io.grpc-grpc-stub-1.18.0.jar:1.18.0]
           at 
io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
 ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at 
io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
 ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at 
io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
 ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at 
io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:699)
 ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at 
io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
 ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at 
io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
 ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at 
io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
 ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at 
io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:397)
 ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at 
io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:459) 
~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at 
io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:63) 
~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at 
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:546)
 ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at 
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:467)
 ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at 
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:584)
 ~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) 
~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at 
io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) 
~[io.grpc-grpc-core-1.18.0.jar:1.18.0]
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_222]
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_222]
           ... 1 more
   Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: 
Connection refused: /127.0.0.1:42455
           at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 
~[?:1.8.0_222]
           at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) 
~[?:1.8.0_222]
           at 
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
 ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
 ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644) 
~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
 ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508) 
~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470) 
~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at 
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
 ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           ... 2 more
   Caused by: java.net.ConnectException: Connection refused
           at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 
~[?:1.8.0_222]
           at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) 
~[?:1.8.0_222]
           at 
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
 ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
 ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644) 
~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
 ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508) 
~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470) 
~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at 
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
 ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           ... 2 more
   15:12:04.524 [function-timer-thread-90-1] ERROR 
org.apache.pulsar.functions.runtime.ProcessRuntime - Extracted Process death 
exception
   java.lang.RuntimeException: 
           at 
org.apache.pulsar.functions.runtime.ProcessRuntime.tryExtractingDeathException(ProcessRuntime.java:380)
 ~[org.apache.pulsar-pulsar-functions-runtime-2.4.0.jar:2.4.0]
           at 
org.apache.pulsar.functions.runtime.ProcessRuntime.isAlive(ProcessRuntime.java:367)
 ~[org.apache.pulsar-pulsar-functions-runtime-2.4.0.jar:2.4.0]
           at 
org.apache.pulsar.functions.runtime.RuntimeSpawner.lambda$start$0(RuntimeSpawner.java:88)
 ~[org.apache.pulsar-pulsar-functions-runtime-2.4.0.jar:2.4.0]
           at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_222]
           at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
[?:1.8.0_222]
           at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 [?:1.8.0_222]
           at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 [?:1.8.0_222]
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_222]
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_222]
           at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
           at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
   ```
   
   My modified ContextWindowFunction.java:
   ```java
   package org.apache.pulsar.functions.api.examples;
   import lombok.extern.slf4j.Slf4j;
   import org.apache.pulsar.functions.api.Record;
   import org.apache.pulsar.functions.api.WindowContext;
   import org.apache.pulsar.functions.api.WindowFunction;
   
   import java.util.Collection;
   import java.util.Set;
   import java.util.Map;
   import java.util.HashMap;
   import java.util.Optional;
   //import java.util.concurrent.CompletableFuture;
   
   import com.alibaba.fastjson.JSON;
   import com.alibaba.fastjson.JSONObject;
   import org.apache.pulsar.functions.api.examples.pojo.MoteStatValue;
   
   /**
    * Debugging variant of the window-function example: it is handed a whole
    * window of records at once instead of a single record per call.
    *
    * <p>For every record in the window the raw value is echoed to the
    * {@code test_debug} topic, the value is parsed as a JSON object, and a
    * marker message is published once parsing has returned. The per-mote
    * statistics aggregation is currently disabled (kept below as a
    * commented-out sketch), so a fixed placeholder string is returned.
    */
   @Slf4j
   public class ContextWindowFunction implements WindowFunction<String, String> {

       /** Name passed to {@code context.publish} for every debug message. */
       private static final String DEBUG_TOPIC = "test_debug";

       /**
        * Echoes and parses every record of the window for debugging purposes.
        *
        * @param LogItems records that make up the current window; each value
        *                 is handed to {@code JSON.parseObject}
        * @param context  window context used to publish the debug messages
        * @return the constant placeholder {@code "abacdefg"}
        */
       @Override
       public String process(Collection<Record<String>> LogItems, WindowContext context) {
           // Reserved for the disabled per-mote aggregation below; never read yet.
           Map<String, MoteStatValue> statsByEui = new HashMap<>();

           for (Record<String> item : LogItems) {
               // 1) Echo the raw payload so it can be inspected on the debug topic.
               context.publish(DEBUG_TOPIC, item.getValue());

               // 2) Parse it; the marker below is only reached if parsing returned.
               JSONObject parsed = JSON.parseObject(item.getValue());
               context.publish(DEBUG_TOPIC, "JSONObject.parseObject success!");

               // 3) Looked up for the disabled aggregation; currently unused.
               String packetType = parsed.getString("pktype");

               // Disabled aggregation sketch (per-mote uplink count and throughput):
               //
               // if (packetType.equals("motetx")) {
               //     String eui = parsed.getString("eui");
               //     context.publish(DEBUG_TOPIC, eui);
               //     MoteStatValue statValue = statsByEui.get(eui);
               //     if (statValue == null) {
               //         statValue = new MoteStatValue(0, 0);
               //     }
               //     statValue.incUpNbs();
               //     statValue.incUpThroughput(parsed.getIntValue("payloadlen"));
               //     statsByEui.put(eui, statValue);
               // }
           }

           // Disabled: report the aggregated statistics instead of the placeholder.
           // Set setMoteStat = statsByEui.entrySet();
           // context.publish(DEBUG_TOPIC, setMoteStat.size());
           // return setMoteStat.toString();
           return "abacdefg";
       }
   }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to