[
https://issues.apache.org/jira/browse/IGNITE-8697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518869#comment-16518869
]
Saikat Maitra commented on IGNITE-8697:
---------------------------------------
[~ldz] [~amashenkov]
I will take a look. I am working on a similar story IgniteSource and facing
similar issues to get handler to sources from StreamExecutionEnvironment. The
AbstractStreamOperatorTestHarness would make it easier to write tests for Flink
Sources and Sinks but it is not yet available in public API.
> Flink sink throws java.lang.IllegalArgumentException when running in flink
> cluster mode.
> ----------------------------------------------------------------------------------------
>
> Key: IGNITE-8697
> URL: https://issues.apache.org/jira/browse/IGNITE-8697
> Project: Ignite
> Issue Type: Bug
> Affects Versions: 2.3, 2.4, 2.5
> Reporter: Ray
> Priority: Blocker
>
> if I submit the Application to the Flink Cluster using Ignite flink sink I
> get this error
>
> java.lang.ExceptionInInitializerError
> at
> org.apache.ignite.sink.flink.IgniteSink$SinkContext.getStreamer(IgniteSink.java:201)
> at
> org.apache.ignite.sink.flink.IgniteSink$SinkContext.access$100(IgniteSink.java:175)
> at org.apache.ignite.sink.flink.IgniteSink.invoke(IgniteSink.java:165)
> at
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at
> org.myorg.quickstart.InstrumentStreamer$Splitter.flatMap(InstrumentStreamer.java:97)
> at
> org.myorg.quickstart.InstrumentStreamer$Splitter.flatMap(InstrumentStreamer.java:1)
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> at
> org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:110)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalArgumentException: Ouch! Argument is invalid:
> Cache name must not be null or empty.
> at
> org.apache.ignite.internal.util.GridArgumentCheck.ensure(GridArgumentCheck.java:109)
> at
> org.apache.ignite.internal.processors.cache.GridCacheUtils.validateCacheName(GridCacheUtils.java:1581)
> at
> org.apache.ignite.internal.IgniteKernal.dataStreamer(IgniteKernal.java:3284)
> at
> org.apache.ignite.sink.flink.IgniteSink$SinkContext$Holder.<clinit>(IgniteSink.java:183)
> ... 27 more
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)