Hi guys, Thanks for helping with the fix!
As this is now a development topic rather than a usage one, I'm BCC'ing the user list and replacing it with the dev list. Please continue the discussion there.

Andrey, Dmitry, please help with the review.

Thanks,
Stan

From: Saikat Maitra
Sent: July 22, 2018 8:28
To: u...@ignite.apache.org; ray...@cisco.com
Subject: Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Hi Ray, Andrew,

As discussed, I have fixed the issue with IgniteSink when running in cluster mode. Please review the PR below and share your feedback.

PR: https://github.com/apache/ignite/pull/4398
Review: https://reviews.ignite.apache.org/ignite/review/IGNT-CR-695

Regards,
Saikat

On Mon, Jul 16, 2018 at 10:47 PM, Saikat Maitra <saikat.mai...@gmail.com> wrote:

Hi Ray,

Thank you for validating the changes. When I check IgniteSink in cluster mode, it works as desired. In standalone mode, I can see that we get the exception

class org.apache.ignite.IgniteException: Default Ignite instance has already been started.

Please take a look at this sample application, https://github.com/samaitra/streamers, which I used to run it with Flink in cluster mode.

I am considering whether I should change IgniteSink to run in client mode, similar to the way the Flink connectors for Redis and Flume were implemented in Apache Bahir: https://github.com/apache/bahir-flink

I will share an update soon.

Regards,
Saikat

On Sun, Jul 15, 2018 at 10:07 PM, Ray <ray...@cisco.com> wrote:

Hello Saikat,

I tried your newest code and wrote a simple word count application to test the sink. It appears there are still problems. Here's my code.

    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala.extensions._
    import org.apache.flink.configuration.Configuration
    import org.apache.ignite.Ignition
    import org.apache.ignite.configuration.CacheConfiguration

    import scala.collection.JavaConverters._

    object WordCount {

      def main(args: Array[String]) {

        val ignite = Ignition.start("ignite.xml")
        val cacheConfig = new CacheConfiguration[Any, Any]()
        ignite.destroyCache("aaa")
        cacheConfig.setName("aaa")
        cacheConfig.setSqlSchema("PUBLIC")
        ignite.createCache(cacheConfig)
        ignite.close()

        // set up the execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        val igniteSink = new IgniteSink[java.util.Map[String, Int]]("aaa", "ignite.xml")

        igniteSink.setAllowOverwrite(false)
        igniteSink.setAutoFlushFrequency(1)

        igniteSink.open(new Configuration)

        // get input data
        val text = env.fromElements(
          "To be, or not to be,--that is the question:--",
          "Whether 'tis nobler in the mind to suffer",
          "The slings and arrows of outrageous fortune",
          "Or to take arms against a sea of troubles,")

        val counts = text
          // split up the lines in pairs (2-tuples) containing: (word,1)
          .flatMap(_.toLowerCase.split("\\W+"))
          .filter(_.nonEmpty)
          .map((_, 1))
          // group by the tuple field "0" and sum up tuple field "1"
          .keyBy(0)
          .sum(1)
          // Convert to key/value format before ingesting to Ignite
          .mapWith { case (k: String, v: Int) => Map(k -> v).asJava }
          .addSink(igniteSink)

        try
          env.execute("Streaming WordCount1")
        catch {
          case e: Exception =>
            // Exception handling.
        } finally igniteSink.close()
      }
    }

I tried running this application in Idea and the error log snippet is as follows

07/16/2018 11:05:30 aggregation -> Map -> Sink: Unnamed(4/8) switched to FAILED
class org.apache.ignite.IgniteException: Default Ignite instance has already been started.
        at org.apache.ignite.internal.util.IgniteUtils.convertException(IgniteUtils.java:990)
        at org.apache.ignite.Ignition.start(Ignition.java:355)
        at IgniteSink.open(IgniteSink.java:135)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite instance has already been started.
        at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1134)
        at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1069)
        at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:955)
        at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:854)
        at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:724)
        at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:693)
        at org.apache.ignite.Ignition.start(Ignition.java:352)
        ... 7 more

07/16/2018 11:05:30 Job execution switched to status FAILING.

--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
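
A note on the trace above: it shows Ignition.start being called from IgniteSink.open in sink subtask 4 of 8. One plausible reading is that, when the job runs inside the IDE, all eight parallel sink subtasks share a single JVM, so every open() after the first one finds the default instance already running. The toy model below is only a sketch of that failure mode and does not use the Ignite API: NodeRegistry, Node, SinkOpenDemo and openAll are hypothetical names invented for illustration. It contrasts a strict start with an idempotent get-or-start (Ignite offers Ignition.getOrStart for this purpose, though whether that is the right fix for IgniteSink is for the PR review to decide).

```scala
import scala.collection.mutable

// Toy stand-in for a per-JVM node registry. This is NOT the Ignite API;
// every name here is hypothetical and exists only for illustration.
final class Node(val name: String)

object NodeRegistry {
  private val nodes = mutable.Map.empty[String, Node]

  // Strict start: a second start under the same name in this JVM throws,
  // which models the "already been started" failure in the trace.
  def start(name: String): Node = synchronized {
    if (nodes.contains(name))
      throw new IllegalStateException(s"Instance '$name' has already been started.")
    val node = new Node(name)
    nodes(name) = node
    node
  }

  // Idempotent alternative: reuse the node if one is already registered.
  def getOrStart(name: String): Node = synchronized {
    nodes.getOrElseUpdate(name, new Node(name))
  }

  // Forget all nodes so the demo can be rerun from a clean state.
  def stopAll(): Unit = synchronized { nodes.clear() }
}

object SinkOpenDemo {
  // Each iteration plays the role of one parallel sink subtask calling open()
  // in the same JVM. Failures are captured as Left(message).
  def openAll(parallelism: Int)(open: String => Node): Seq[Either[String, Node]] =
    (1 to parallelism).map { _ =>
      try Right(open("default"))
      catch { case e: IllegalStateException => Left(e.getMessage) }
    }

  def main(args: Array[String]): Unit = {
    val strict = openAll(8)(NodeRegistry.start)
    println(s"start: ${strict.count(_.isRight)} ok, ${strict.count(_.isLeft)} failed")

    NodeRegistry.stopAll()

    val shared = openAll(8)(NodeRegistry.getOrStart)
    println(s"getOrStart: ${shared.count(_.isRight)} ok, ${shared.count(_.isLeft)} failed")
  }
}
```

With a parallelism of 8, as in the log, the strict variant succeeds once and fails seven times, while the get-or-start variant succeeds for all eight subtasks because they share one node. In the word count program above, the explicit igniteSink.open(new Configuration) call in main would count as one more start in the same JVM.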