[
https://issues.apache.org/jira/browse/IGNITE-8697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16553886#comment-16553886
]
Saikat Maitra commented on IGNITE-8697:
---------------------------------------
Hi [~amashenkov]
Thank you for reviewing and sharing feedback, Flink Sink can be used in
standalone mode or in cluster mode. In standalone mode we may reuse Ignite
instance but in cluster mode each jvm have separate ignite instances.
We purposefully remove the static fields as they may not guarantee for reuse of
the same Ignite instance in cluster mode. Flink by default serialize everything
and send to each cluster member.
I can make the change to use Ignition.ignite() and if no Ignite instance is
found then I can use Ignition.start() but my understanding is it will throw
IgniteIllegalStateException when no Ignite instance is running and we will need
to catch that and call Ignition.start().
Please let me know your feedback.
Regards,
Saikat
> Flink sink throws java.lang.IllegalArgumentException when running in flink
> cluster mode.
> ----------------------------------------------------------------------------------------
>
> Key: IGNITE-8697
> URL: https://issues.apache.org/jira/browse/IGNITE-8697
> Project: Ignite
> Issue Type: Bug
> Affects Versions: 2.3, 2.4, 2.5
> Reporter: Ray
> Priority: Blocker
>
> if I submit the Application to the Flink Cluster using Ignite flink sink I
> get this error
>
> java.lang.ExceptionInInitializerError
> at
> org.apache.ignite.sink.flink.IgniteSink$SinkContext.getStreamer(IgniteSink.java:201)
> at
> org.apache.ignite.sink.flink.IgniteSink$SinkContext.access$100(IgniteSink.java:175)
> at org.apache.ignite.sink.flink.IgniteSink.invoke(IgniteSink.java:165)
> at
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at
> org.myorg.quickstart.InstrumentStreamer$Splitter.flatMap(InstrumentStreamer.java:97)
> at
> org.myorg.quickstart.InstrumentStreamer$Splitter.flatMap(InstrumentStreamer.java:1)
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> at
> org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:110)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalArgumentException: Ouch! Argument is invalid:
> Cache name must not be null or empty.
> at
> org.apache.ignite.internal.util.GridArgumentCheck.ensure(GridArgumentCheck.java:109)
> at
> org.apache.ignite.internal.processors.cache.GridCacheUtils.validateCacheName(GridCacheUtils.java:1581)
> at
> org.apache.ignite.internal.IgniteKernal.dataStreamer(IgniteKernal.java:3284)
> at
> org.apache.ignite.sink.flink.IgniteSink$SinkContext$Holder.<clinit>(IgniteSink.java:183)
> ... 27 more
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)