borlandor edited a comment on issue #5099: "ConnectException: Connection refused: /127.0.0.1:49695" when running python example function URL: https://github.com/apache/pulsar/issues/5099#issuecomment-528240374 My Java function also has this problem: ``` 15:12:04.466 [function-timer-thread-90-1] ERROR org.apache.pulsar.functions.runtime.ProcessRuntime - Health check failed for ContextWindowFunction-0 java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_222] at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ~[?:1.8.0_222] at org.apache.pulsar.functions.runtime.ProcessRuntime.lambda$start$1(ProcessRuntime.java:164) ~[org.apache.pulsar-pulsar-functions-runtime-2.4.0.jar:2.4.0] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_222] at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_222] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_222] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_222] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_222] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_222] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222] Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception at io.grpc.Status.asRuntimeException(Status.java:530) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0] at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:482) ~[io.grpc-grpc-stub-1.18.0.jar:1.18.0] at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0] at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0] at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0] at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:699) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0] at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0] at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0] at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0] at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:397) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0] at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:459) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0] at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:63) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0] at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:546) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0] at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:467) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0] at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:584) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0] at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0] at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[io.grpc-grpc-core-1.18.0.jar:1.18.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_222] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_222] ... 1 more Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:42455 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_222] at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_222] at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final] at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final] ... 2 more Caused by: java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_222] at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_222] at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final] at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) ~[io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final] ... 2 more 15:12:04.524 [function-timer-thread-90-1] ERROR org.apache.pulsar.functions.runtime.ProcessRuntime - Extracted Process death exception java.lang.RuntimeException: at org.apache.pulsar.functions.runtime.ProcessRuntime.tryExtractingDeathException(ProcessRuntime.java:380) ~[org.apache.pulsar-pulsar-functions-runtime-2.4.0.jar:2.4.0] at org.apache.pulsar.functions.runtime.ProcessRuntime.isAlive(ProcessRuntime.java:367) ~[org.apache.pulsar-pulsar-functions-runtime-2.4.0.jar:2.4.0] at org.apache.pulsar.functions.runtime.RuntimeSpawner.lambda$start$0(RuntimeSpawner.java:88) ~[org.apache.pulsar-pulsar-functions-runtime-2.4.0.jar:2.4.0] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_222] at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_222] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_222] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_222] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_222] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_222] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222] ``` My modified ContextWindowFunction.java : ``` package org.apache.pulsar.functions.api.examples; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.api.WindowContext; import org.apache.pulsar.functions.api.WindowFunction; import java.util.Collection; import java.util.Set; import java.util.Map; import java.util.HashMap; import java.util.Optional; //import java.util.concurrent.CompletableFuture; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.pulsar.functions.api.examples.pojo.MoteStatValue; /** * Example Function that acts on a window of tuples at a time rather than per tuple basis. */ @Slf4j public class ContextWindowFunction implements WindowFunction<String, String> { @Override public String process(Collection<Record<String>> LogItems, WindowContext context) { //JSONArray jsonArray = new JSONArray(); Map<String, MoteStatValue > mapMoteStat = new HashMap<String, MoteStatValue>(); for (Record<String> record : LogItems) { context.publish("test_debug", record.getValue()); JSONObject jsonObject = JSON.parseObject(record.getValue()); context.publish("test_debug", "JSONObject.parseObject success!"); String s = jsonObject.getString("pktype"); /* if (s.equals("motetx")) { String eui = jsonObject.getString("eui"); context.publish("test_debug", eui); MoteStatValue statValue = mapMoteStat.get(eui); if (statValue == null) { statValue = new MoteStatValue(0,0); } statValue.incUpNbs(); statValue.incUpThroughput(jsonObject.getIntValue("payloadlen")); mapMoteStat.put(eui,statValue); } */ } //Set setMoteStat = mapMoteStat.entrySet(); //context.publish("test_debug", setMoteStat.size() ); //return setMoteStat.toString(); return "abacdefg"; } } ```
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
