Ameer Basha Pattan created IGNITE-13178:
-------------------------------------------
Summary: Spark job gets stuck indefinitely while trying to fetch
data from ignite cluster using thin client
Key: IGNITE-13178
URL: https://issues.apache.org/jira/browse/IGNITE-13178
Project: Ignite
Issue Type: Bug
Components: cache, clients, thin client
Reporter: Ameer Basha Pattan
Attachments: IgniteHelper.java
We are trying to use Ignite as an in-memory distributed cache and put data into
the cache using a Spark job.
We tried using the thin client to fetch data from the cache.
// We are using a ThreadLocal to stop creating too many client instances.
private static final ThreadLocal<IgniteClient> igniteClientContext = new ThreadLocal<>();

// Thin client creation
public static IgniteClient getIgniteClient(String[] address) {
    if (igniteClientContext.get() == null) {
        ClientConfiguration clientConfig = null;
        if (cfg == null) {
            clientConfig = new ClientConfiguration().setAddresses(address);
        } else {
            clientConfig = cfg;
        }
        IgniteClient igniteClient = Ignition.startClient(clientConfig);
        logger.info("igniteClient initialized ");
        igniteClientContext.set(igniteClient);
    }
    return igniteClientContext.get();
}
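The helper above never clears the ThreadLocal, so each thread keeps whichever client it created first. One possible way to let a thread drop a client after a failure is sketched below. It is a minimal, Ignite-independent illustration: `PerThreadResource`, `get()` and `invalidate()` are hypothetical names, not part of Ignite or of the attached IgniteHelper.java, and this is not a confirmed fix for the hang.

```java
import java.util.function.Supplier;

/**
 * Hypothetical helper (not an Ignite class): caches one resource per thread
 * and lets the caller discard it after a failure.
 */
final class PerThreadResource<T extends AutoCloseable> {
    private final ThreadLocal<T> holder = new ThreadLocal<>();
    private final Supplier<T> factory;

    PerThreadResource(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Returns this thread's cached resource, creating it on first use. */
    T get() {
        T resource = holder.get();
        if (resource == null) {
            resource = factory.get();
            holder.set(resource);
        }
        return resource;
    }

    /** Closes and forgets this thread's resource so the next get() creates a new one. */
    void invalidate() {
        T resource = holder.get();
        holder.remove();
        if (resource != null) {
            try {
                resource.close();
            } catch (Exception e) {
                // Best effort: the resource is being discarded anyway.
            }
        }
    }
}
```

In the helper above, the factory would presumably wrap the existing `Ignition.startClient(clientConfig)` call, and `invalidate()` could be called from a catch block for `ClientConnectionException` before retrying.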
From the Spark code, I'm trying to create an instance of the Ignite thin client
and create a cache object.
{{val address = config.igniteServers.split(",") // config.igniteServers = "10.xx.xxx.xxx:10800,10.xx.xx.xxx:10800"}}
The code below is called from a Spark executor. Each executor processes a set
of records; we only read data from the cache and compare it with the record
currently being processed. If the record is already present in the cache, we
ignore it; otherwise we consume it.
{{val cacheCfg = new ClientCacheConfiguration()
.setName(PNR_CACHE)
.setCacheMode(CacheMode.REPLICATED)
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
.setDefaultLockTimeout(30000)
val igniteClient = IgniteHelper.getIgniteClient(address)
val cache : ClientCache[Long, Boolean] =
igniteClient.getOrCreateCache(cacheCfg);}}
The job runs fine for a couple of hours and then gets stuck indefinitely with
the exception below.
{{org.apache.ignite.client.ClientConnectionException: Ignite cluster is unavailable [sock=Socket[addr=hdpct2ldap01g02.hadoop.sgdcprod.XXXX.com/10.xx.xx.xx,port=10800,localport=20214]]
    at org.apache.ignite.internal.client.thin.TcpClientChannel.handleIOError(TcpClientChannel.java:499)
    at org.apache.ignite.internal.client.thin.TcpClientChannel.handleIOError(TcpClientChannel.java:491)
    at org.apache.ignite.internal.client.thin.TcpClientChannel.access$100(TcpClientChannel.java:92)
    at org.apache.ignite.internal.client.thin.TcpClientChannel$ByteCountingDataInput.read(TcpClientChannel.java:538)
    at org.apache.ignite.internal.client.thin.TcpClientChannel$ByteCountingDataInput.readInt(TcpClientChannel.java:572)
    at org.apache.ignite.internal.client.thin.TcpClientChannel.processNextResponse(TcpClientChannel.java:272)
    at org.apache.ignite.internal.client.thin.TcpClientChannel.receive(TcpClientChannel.java:234)
    at org.apache.ignite.internal.client.thin.TcpClientChannel.service(TcpClientChannel.java:171)
    at org.apache.ignite.internal.client.thin.ReliableChannel.service(ReliableChannel.java:160)
    at org.apache.ignite.internal.client.thin.ReliableChannel.request(ReliableChannel.java:187)
    at org.apache.ignite.internal.client.thin.TcpIgniteClient.getOrCreateCache(TcpIgniteClient.java:124)
    at com.XXXX.eda.pnr.PnrApplication$$anonfun$2$$anonfun$apply$4.apply(PnrApplication.scala:305)
    at com.XXXX.eda.pnr.PnrApplication$$anonfun$2$$anonfun$apply$4.apply(PnrApplication.scala:297)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:217)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1094)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1085)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1020)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1085)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:811)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketException: Connection timed out (Read failed)
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at org.apache.ignite.internal.client.thin.TcpClientChannel$ByteCountingDataInput.read(TcpClientChannel.java:535)}}
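The innermost frames of the trace are a blocking `SocketInputStream.read`. As a general illustration only (plain `java.net`, not Ignite code), the sketch below shows how a read timeout bounds the time such a read can wait on a peer that never answers. `ReadTimeoutDemo` and `readTimesOut` are hypothetical names, and whether the thin client exposes an equivalent setting should be checked against its `ClientConfiguration` documentation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

final class ReadTimeoutDemo {
    /** Returns true if a read from a silent peer is abandoned after timeoutMillis. */
    static boolean readTimesOut(int timeoutMillis) {
        // The listening socket completes the TCP handshake but never writes any data.
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort())) {
            // Bound every blocking read; the default of 0 means wait without limit.
            client.setSoTimeout(timeoutMillis);
            try {
                client.getInputStream().read();
                return false; // the peer answered or closed the connection
            } catch (SocketTimeoutException e) {
                return true; // the read gave up after the timeout
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("read timed out: " + readTimesOut(200));
    }
}
```

Without the `setSoTimeout` call, the same `read()` would stay blocked until the operating system itself reports the connection as failed.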
Stack Overflow link:
[https://stackoverflow.com/questions/62531478/spark-job-gets-stuck-indefinitely-while-trying-to-fetch-data-from-ignite-cluster]