[
https://issues.apache.org/jira/browse/IGNITE-8697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16545977#comment-16545977
]
Ray commented on IGNITE-8697:
-----------------------------
[~samaitra]
I tried your newest code and wrote a simple word count application to test
the sink.
It appears there are still problems.
Here's my code.
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.extensions._
import org.apache.flink.configuration.Configuration
import org.apache.ignite.Ignition
import org.apache.ignite.configuration.CacheConfiguration
import scala.collection.JavaConverters._
/**
 * Minimal streaming word count used to exercise the Ignite Flink sink.
 *
 * Recreates the "aaa" cache through a short-lived Ignite node, then runs a
 * Flink job that counts the words of a few fixed lines of text and hands each
 * running count to an `IgniteSink` targeting that cache.
 */
object WordCount {

  /**
   * Program entry point.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Recreate the target cache, then stop the node that was started for it.
    val ignite = Ignition.start("ignite.xml")
    val cacheConfig = new CacheConfiguration[Any, Any]()
    ignite.destroyCache("aaa")
    cacheConfig.setName("aaa")
    cacheConfig.setSqlSchema("PUBLIC")
    ignite.createCache(cacheConfig)
    ignite.close()

    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val igniteSink = new IgniteSink[java.util.Map[String, Int]]("aaa", "ignite.xml")
    igniteSink.setAllowOverwrite(false)
    igniteSink.setAutoFlushFrequency(1)
    // NOTE(review): the sink is opened explicitly here, before env.execute.
    // The failure log quoted after this program shows IgniteSink.open also
    // being invoked through Flink's FunctionUtils.openFunction -- confirm
    // whether this explicit call is needed.
    igniteSink.open(new Configuration)

    // get input data
    val text = env.fromElements(
      "To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer",
      "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,")

    val counts = text
      // split up the lines in pairs (2-tuples) containing: (word,1)
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      // group by the tuple field "0" and sum up tuple field "1"
      .keyBy(0)
      .sum(1)
      // Convert to key/value format before ingesting to Ignite
      .mapWith { case (k: String, v: Int) => Map(k -> v).asJava }
      .addSink(igniteSink)

    try
      env.execute("Streaming WordCount1")
    catch {
      case e: Exception =>
      // Exception handling.
    } finally igniteSink.close()
  }
}
I tried running this application in IntelliJ IDEA, and the error log snippet
is as follows:
07/16/2018 11:05:30 aggregation -> Map -> Sink: Unnamed(4/8) switched to
FAILED
class org.apache.ignite.IgniteException: Default Ignite instance has already
been started.
at
org.apache.ignite.internal.util.IgniteUtils.convertException(IgniteUtils.java:990)
at org.apache.ignite.Ignition.start(Ignition.java:355)
at IgniteSink.open(IgniteSink.java:135)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite
instance has already been started.
at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1134)
at
org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1069)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:955)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:854)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:724)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:693)
at org.apache.ignite.Ignition.start(Ignition.java:352)
... 7 more
07/16/2018 11:05:30 Job execution switched to status FAILING.
> Flink sink throws java.lang.IllegalArgumentException when running in flink
> cluster mode.
> ----------------------------------------------------------------------------------------
>
> Key: IGNITE-8697
> URL: https://issues.apache.org/jira/browse/IGNITE-8697
> Project: Ignite
> Issue Type: Bug
> Affects Versions: 2.3, 2.4, 2.5
> Reporter: Ray
> Priority: Blocker
>
> if I submit the Application to the Flink Cluster using Ignite flink sink I
> get this error
>
> java.lang.ExceptionInInitializerError
> at
> org.apache.ignite.sink.flink.IgniteSink$SinkContext.getStreamer(IgniteSink.java:201)
> at
> org.apache.ignite.sink.flink.IgniteSink$SinkContext.access$100(IgniteSink.java:175)
> at org.apache.ignite.sink.flink.IgniteSink.invoke(IgniteSink.java:165)
> at
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at
> org.myorg.quickstart.InstrumentStreamer$Splitter.flatMap(InstrumentStreamer.java:97)
> at
> org.myorg.quickstart.InstrumentStreamer$Splitter.flatMap(InstrumentStreamer.java:1)
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> at
> org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:110)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalArgumentException: Ouch! Argument is invalid:
> Cache name must not be null or empty.
> at
> org.apache.ignite.internal.util.GridArgumentCheck.ensure(GridArgumentCheck.java:109)
> at
> org.apache.ignite.internal.processors.cache.GridCacheUtils.validateCacheName(GridCacheUtils.java:1581)
> at
> org.apache.ignite.internal.IgniteKernal.dataStreamer(IgniteKernal.java:3284)
> at
> org.apache.ignite.sink.flink.IgniteSink$SinkContext$Holder.<clinit>(IgniteSink.java:183)
> ... 27 more
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)