Hi guys,

Thanks for helping with the fix!

As this is a development topic now and not a usage one, I’m BCC’ing the 
user-list and replacing it with dev-list.
Please continue the discussion there.

Andrey, Dmitry, please help with the review.

Thanks,
Stan

From: Saikat Maitra
Sent: 22 июля 2018 г. 8:28
To: u...@ignite.apache.org; ray...@cisco.com
Subject: Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Hi Ray, Andrew

As discussed I have fixed the issue with IgniteSink when running in cluster 
mode.

Please review the below PR and share feedback.

PR : https://github.com/apache/ignite/pull/4398
Review : https://reviews.ignite.apache.org/ignite/review/IGNT-CR-695

Regards,
Saikat




On Mon, Jul 16, 2018 at 10:47 PM, Saikat Maitra <saikat.mai...@gmail.com> wrote:
Hi Ray,

Thank you for validating the changes, I see that in cluster mode when I am 
checking the IgniteSink it is working as desired. In stand alone mode I can see 
we are getting the exception class org.apache.ignite.IgniteException: Default 
Ignite instance has already been started.

Please take a look into this sample application 
https://github.com/samaitra/streamers which I used to run it with flink in 
cluster mode.

I am considering if I should make changes to run the IgniteSink in client mode 
similar to the ways flink connector for redis and flume were implemented in 
Apache Bahir

https://github.com/apache/bahir-flink

I will share update soon.

Regards,
Saikat



On Sun, Jul 15, 2018 at 10:07 PM, Ray <ray...@cisco.com> wrote:
Hello Saikat,

I tried your newest code and wrote a simple word count application to test
the sink.
It appears there's still problems.
Here's my code.



import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.extensions._
import org.apache.flink.configuration.Configuration
import org.apache.ignite.Ignition
import org.apache.ignite.configuration.CacheConfiguration

import scala.collection.JavaConverters._


object WordCount {

        def main(args: Array[String]) {

                val ignite = Ignition.start("ignite.xml")
                val cacheConfig = new CacheConfiguration[Any, Any]()
                ignite.destroyCache("aaa")
                cacheConfig.setName("aaa")
                cacheConfig.setSqlSchema("PUBLIC")
                ignite.createCache(cacheConfig)
                ignite.close()


                // set up the execution environment
                val env = StreamExecutionEnvironment.getExecutionEnvironment

                val igniteSink = new IgniteSink[java.util.Map[String, 
Int]]("aaa",
"ignite.xml")

                igniteSink.setAllowOverwrite(false)
                igniteSink.setAutoFlushFrequency(1)

                igniteSink.open(new Configuration)


                // get input data
                val text = env.fromElements(
                        "To be, or not to be,--that is the question:--",
                        "Whether 'tis nobler in the mind to suffer",
                        "The slings and arrows of outrageous fortune",
                        "Or to take arms against a sea of troubles,")


                val counts = text
                        // split up the lines in pairs (2-tuples) containing: 
(word,1)
                        .flatMap(_.toLowerCase.split("\\W+"))
                        .filter(_.nonEmpty)
                        .map((_, 1))
                        // group by the tuple field "0" and sum up tuple field 
"1"
                        .keyBy(0)
                        .sum(1)
                        // Convert to key/value format before ingesting to 
Ignite
                        .mapWith { case (k: String, v: Int) => Map(k -> 
v).asJava }
                        .addSink(igniteSink)

                try
                        env.execute("Streaming WordCount1")
                catch {
                        case e: Exception =>

                        // Exception handling.
                } finally igniteSink.close()

        }
}

I tried running this application in Idea and the error log snippet is as
follows

07/16/2018 11:05:30     aggregation -> Map -> Sink: Unnamed(4/8) switched to
FAILED 
class org.apache.ignite.IgniteException: Default Ignite instance has already
been started.
        at
org.apache.ignite.internal.util.IgniteUtils.convertException(IgniteUtils.java:990)
        at org.apache.ignite.Ignition.start(Ignition.java:355)
        at IgniteSink.open(IgniteSink.java:135)
        at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite
instance has already been started.
        at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1134)
        at
org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1069)
        at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:955)
        at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:854)
        at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:724)
        at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:693)
        at org.apache.ignite.Ignition.start(Ignition.java:352)
        ... 7 more

07/16/2018 11:05:30     Job execution switched to status FAILING.



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/



Reply via email to