[ 
https://issues.apache.org/jira/browse/IGNITE-10861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739954#comment-16739954
 ] 

Saikat Maitra commented on IGNITE-10861:
----------------------------------------

[~jeado]

Thank you for sharing the issue. The open method check if an ignite instance is 
already created in the same jvm then it reuses it otherwise it create a new 
ignite instance. 

1) Can you please confirm if this issue is reproducible in Flink 1.5 version.

2) I am thinking looking into the shared example that if one instance is 
starting the second one is also getting started and finding that since no 
ignite instance is available and trying to start another ignite instance but 
then during ignite.start() process it is getting exception since by then first 
ignite instance would have already started. 

3. Can you also check in the ignite-test.xml if you are using port range or 
single port on a node?

 

Regards,

Saikat

> Using multiple Ignite Sink got Ignite instance has already been started Error
> -----------------------------------------------------------------------------
>
>                 Key: IGNITE-10861
>                 URL: https://issues.apache.org/jira/browse/IGNITE-10861
>             Project: Ignite
>          Issue Type: Bug
>          Components: streaming
>    Affects Versions: 2.7
>            Reporter: jeado ko
>            Priority: Major
>
> I got following error when I create multiple sink in Flink 1.7.0
> {code:java}
> Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite 
> instance has already been started.
> at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1141)
> at 
> org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1076)
> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:962)
> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:861)
> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:731)
> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:700)
> at org.apache.ignite.Ignition.start(Ignition.java:348)
> {code}
> and this is my flink job code
> {code:java}
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.scala._
> import org.apache.ignite.sink.flink.IgniteSink
> import scala.collection.JavaConverters._
> object IgniteSinkTestJob extends App {
>   val env: StreamExecutionEnvironment = 
> StreamExecutionEnvironment.getExecutionEnvironment
> //  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>   val igniteSink = new IgniteSink[java.util.Map[String, String]]("testCache", 
> "ignite-test.xml")
>   igniteSink.setAllowOverwrite(true)
>   igniteSink.setAutoFlushFrequency(10)
> //  igniteSink.open(new Configuration)
>   val igniteSink2 = new IgniteSink[java.util.Map[String, 
> String]]("testCache2", "ignite-test.xml")
>   igniteSink2.setAllowOverwrite(true)
>   igniteSink2.setAutoFlushFrequency(10)
> //  igniteSink2.open(new Configuration)
>   val igniteSink3 = new IgniteSink[java.util.Map[String, 
> String]]("testCache3", "ignite-test.xml")
>   igniteSink3.setAllowOverwrite(true)
>   igniteSink3.setAutoFlushFrequency(10)
> //  igniteSink3.open(new Configuration)
>   val source = env.fromCollection(
>     Array(
>       Map("key1" -> "hello1"),
>       Map("key1" -> "hello11"),
>       Map("key1" -> "hello144"),
>       Map("key1" -> "hello1155"),
>       Map("key2" -> "hello2"),
>       Map("key2" -> "hello3"),
>       Map("key3" -> "hello23"),
>       Map("key3" -> "hello25")
>     ).map(_.asJava)
>   )
>   source
>     .filter(v => v.containsKey("key1"))
>     .setParallelism(2)
>     .addSink(igniteSink)
>       .name("sink1")
>       .setParallelism(1)
>   source.filter(v => v.containsKey("key2"))
>     .setParallelism(2)
>     .addSink(igniteSink2)
>       .name("sink2")
>       .setParallelism(1)
>   source.filter(v => v.containsKey("key3"))
>     .setParallelism(2)
>     .addSink(igniteSink3)
>       .name("sink3")
>       .setParallelism(1)
>   env.execute("test ignite sink")
> }
> {code}
> And if I remove comment to open Ignite sink before execute pipeline, I still 
> got a same error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to