[
https://issues.apache.org/jira/browse/IGNITE-10861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
jeado ko updated IGNITE-10861:
------------------------------
Description:
I got the following error when I created multiple sinks in Flink 1.7.0:
{code:java}
Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite instance has already been started.
at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1141)
at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1076)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:962)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:861)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:731)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:700)
at org.apache.ignite.Ignition.start(Ignition.java:348)
{code}
And this is my Flink job code:
{code:java}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.ignite.sink.flink.IgniteSink
import scala.collection.JavaConverters._
/**
 * Reproduction job for the "Default Ignite instance has already been started"
 * error described in this report.
 *
 * Builds one Flink streaming environment, creates three `IgniteSink` instances
 * (each with a different cache name but the same "ignite-test.xml" argument),
 * and attaches each sink to a filtered branch of a single in-memory source.
 *
 * NOTE(review): the reported error presumably occurs because more than one of
 * these sinks tries to start Ignite in the same JVM — confirm against the
 * IgniteSink implementation.
 */
object IgniteSinkTestJob extends App {
val env: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// Sink 1: cache name "testCache". The second constructor argument is
// presumably the path to an Ignite configuration file — TODO confirm.
val igniteSink = new IgniteSink[java.util.Map[String, String]]("testCache",
"ignite-test.xml")
igniteSink.setAllowOverwrite(true)
igniteSink.setAutoFlushFrequency(10)
// igniteSink.open(new Configuration)
// Sink 2: same settings as sink 1, but cache name "testCache2".
val igniteSink2 = new IgniteSink[java.util.Map[String, String]]("testCache2",
"ignite-test.xml")
igniteSink2.setAllowOverwrite(true)
igniteSink2.setAutoFlushFrequency(10)
// igniteSink2.open(new Configuration)
// Sink 3: same settings as sink 1, but cache name "testCache3".
val igniteSink3 = new IgniteSink[java.util.Map[String, String]]("testCache3",
"ignite-test.xml")
igniteSink3.setAllowOverwrite(true)
igniteSink3.setAutoFlushFrequency(10)
// igniteSink3.open(new Configuration)
// Source: eight single-entry Scala maps, each converted to a java.util.Map
// via asJava so the element type matches the sinks' type parameter.
val source = env.fromCollection(
Array(
Map("key1" -> "hello1"),
Map("key1" -> "hello11"),
Map("key1" -> "hello144"),
Map("key1" -> "hello1155"),
Map("key2" -> "hello2"),
Map("key2" -> "hello3"),
Map("key3" -> "hello23"),
Map("key3" -> "hello25")
).map(_.asJava)
)
// Branch 1: records containing "key1" go to igniteSink, named "sink1".
// setParallelism(2) is called on the filter step, setParallelism(1) on the sink step.
source
.filter(v => v.containsKey("key1"))
.setParallelism(2)
.addSink(igniteSink)
.name("sink1")
.setParallelism(1)
// Branch 2: records containing "key2" go to igniteSink2, named "sink2".
source.filter(v => v.containsKey("key2"))
.setParallelism(2)
.addSink(igniteSink2)
.name("sink2")
.setParallelism(1)
// Branch 3: records containing "key3" go to igniteSink3, named "sink3".
source.filter(v => v.containsKey("key3"))
.setParallelism(2)
.addSink(igniteSink3)
.name("sink3")
.setParallelism(1)
// Runs the pipeline with all three branches attached.
env.execute("test ignite sink")
}
{code}
And if I uncomment the calls that open the Ignite sinks before executing the
pipeline, I still get the same error.
was:
I got the following error when I created multiple sinks in Flink 1.7.0:
{code:java}
Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite instance has already been started.
at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1141)
at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1076)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:962)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:861)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:731)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:700)
at org.apache.ignite.Ignition.start(Ignition.java:348)
{code}
And this is my Flink job code:
{code:java}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.ignite.sink.flink.IgniteSink
import scala.collection.JavaConverters._
/**
 * Reproduction job for the "Default Ignite instance has already been started"
 * error described in this report (copy from the previous issue description).
 *
 * Builds one Flink streaming environment, creates three `IgniteSink` instances
 * (each with a different cache name but the same "ignite-test.xml" argument),
 * and attaches each sink to a filtered branch of a single in-memory source.
 *
 * NOTE(review): the reported error presumably occurs because more than one of
 * these sinks tries to start Ignite in the same JVM — confirm against the
 * IgniteSink implementation.
 */
object IgniteSinkTestJob extends App {
val env: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// Sink 1: cache name "testCache". The second constructor argument is
// presumably the path to an Ignite configuration file — TODO confirm.
val igniteSink = new IgniteSink[java.util.Map[String, String]]("testCache",
"ignite-test.xml")
igniteSink.setAllowOverwrite(true)
igniteSink.setAutoFlushFrequency(10)
// igniteSink.open(new Configuration)
// Sink 2: same settings as sink 1, but cache name "testCache2".
val igniteSink2 = new IgniteSink[java.util.Map[String, String]]("testCache2",
"ignite-test.xml")
igniteSink2.setAllowOverwrite(true)
igniteSink2.setAutoFlushFrequency(10)
// igniteSink2.open(new Configuration)
// Sink 3: same settings as sink 1, but cache name "testCache3".
val igniteSink3 = new IgniteSink[java.util.Map[String, String]]("testCache3",
"ignite-test.xml")
igniteSink3.setAllowOverwrite(true)
igniteSink3.setAutoFlushFrequency(10)
// igniteSink3.open(new Configuration)
// Source: eight single-entry Scala maps, each converted to a java.util.Map
// via asJava so the element type matches the sinks' type parameter.
val source = env.fromCollection(
Array(
Map("key1" -> "hello1"),
Map("key1" -> "hello11"),
Map("key1" -> "hello144"),
Map("key1" -> "hello1155"),
Map("key2" -> "hello2"),
Map("key2" -> "hello3"),
Map("key3" -> "hello23"),
Map("key3" -> "hello25")
).map(_.asJava)
)
// Branch 1: records containing "key1" go to igniteSink, named "sink1".
// setParallelism(2) is called on the filter step, setParallelism(1) on the sink step.
source
.filter(v => v.containsKey("key1"))
.setParallelism(2)
.addSink(igniteSink)
.name("sink1")
.setParallelism(1)
// Branch 2: records containing "key2" go to igniteSink2, named "sink2".
source.filter(v => v.containsKey("key2"))
.setParallelism(2)
.addSink(igniteSink2)
.name("sink2")
.setParallelism(1)
// Branch 3: records containing "key3" go to igniteSink3, named "sink3".
source.filter(v => v.containsKey("key3"))
.setParallelism(2)
.addSink(igniteSink3)
.name("sink3")
.setParallelism(1)
// Runs the pipeline with all three branches attached.
env.execute("test ignite sink")
}
{code}
> Using multiple Ignite Sink got Ignite instance has already been started Error
> -----------------------------------------------------------------------------
>
> Key: IGNITE-10861
> URL: https://issues.apache.org/jira/browse/IGNITE-10861
> Project: Ignite
> Issue Type: Bug
> Components: streaming
> Affects Versions: 2.7
> Reporter: jeado ko
> Priority: Major
>
> I got the following error when I created multiple sinks in Flink 1.7.0:
> {code:java}
> Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite instance has already been started.
> at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1141)
> at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1076)
> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:962)
> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:861)
> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:731)
> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:700)
> at org.apache.ignite.Ignition.start(Ignition.java:348)
> {code}
> And this is my Flink job code:
> {code:java}
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.scala._
> import org.apache.ignite.sink.flink.IgniteSink
> import scala.collection.JavaConverters._
> object IgniteSinkTestJob extends App {
> val env: StreamExecutionEnvironment =
> StreamExecutionEnvironment.getExecutionEnvironment
> // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val igniteSink = new IgniteSink[java.util.Map[String, String]]("testCache",
> "ignite-test.xml")
> igniteSink.setAllowOverwrite(true)
> igniteSink.setAutoFlushFrequency(10)
> // igniteSink.open(new Configuration)
> val igniteSink2 = new IgniteSink[java.util.Map[String,
> String]]("testCache2", "ignite-test.xml")
> igniteSink2.setAllowOverwrite(true)
> igniteSink2.setAutoFlushFrequency(10)
> // igniteSink2.open(new Configuration)
> val igniteSink3 = new IgniteSink[java.util.Map[String,
> String]]("testCache3", "ignite-test.xml")
> igniteSink3.setAllowOverwrite(true)
> igniteSink3.setAutoFlushFrequency(10)
> // igniteSink3.open(new Configuration)
> val source = env.fromCollection(
> Array(
> Map("key1" -> "hello1"),
> Map("key1" -> "hello11"),
> Map("key1" -> "hello144"),
> Map("key1" -> "hello1155"),
> Map("key2" -> "hello2"),
> Map("key2" -> "hello3"),
> Map("key3" -> "hello23"),
> Map("key3" -> "hello25")
> ).map(_.asJava)
> )
> source
> .filter(v => v.containsKey("key1"))
> .setParallelism(2)
> .addSink(igniteSink)
> .name("sink1")
> .setParallelism(1)
> source.filter(v => v.containsKey("key2"))
> .setParallelism(2)
> .addSink(igniteSink2)
> .name("sink2")
> .setParallelism(1)
> source.filter(v => v.containsKey("key3"))
> .setParallelism(2)
> .addSink(igniteSink3)
> .name("sink3")
> .setParallelism(1)
> env.execute("test ignite sink")
> }
> {code}
> And if I uncomment the calls that open the Ignite sinks before executing the
> pipeline, I still get the same error.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)