This is an automated email from the ASF dual-hosted git repository.
samaitra pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new 5ada52a Fix ignite sink startup - Fixes #9.
5ada52a is described below
commit 5ada52ab55e5d1de08115553bfe0e0dad40bafaa
Author: Dominik Przybysz <[email protected]>
AuthorDate: Sat Mar 14 15:11:51 2020 -0500
Fix ignite sink startup - Fixes #9.
Signed-off-by: samaitra <[email protected]>
---
.../main/java/org/apache/ignite/sink/flink/IgniteSink.java | 14 +++++++++++---
1 file changed, 11 insertions(+), 3 deletions(-)
diff --git
a/modules/flink-ext/src/main/java/org/apache/ignite/sink/flink/IgniteSink.java
b/modules/flink-ext/src/main/java/org/apache/ignite/sink/flink/IgniteSink.java
index 8deb0d7..a9add62 100644
---
a/modules/flink-ext/src/main/java/org/apache/ignite/sink/flink/IgniteSink.java
+++
b/modules/flink-ext/src/main/java/org/apache/ignite/sink/flink/IgniteSink.java
@@ -143,10 +143,18 @@ public class IgniteSink<IN> extends RichSinkFunction<IN> {
A.notNull(cacheName, "Cache name");
try {
- // if an ignite instance is already started in same JVM then use
it.
- this.ignite = Ignition.ignite();
- } catch (IgniteIllegalStateException e) {
this.ignite = Ignition.start(igniteCfgFile);
+ } catch (IgniteException e) {
+ if (e.getMessage().contains("instance has already been started."))
{
+ // ignite instance is already started in same JVM then use it
+ try {
+ this.ignite = Ignition.ignite();
+ } catch(IgniteIllegalStateException illegalStateException){
+ throw new IgniteException("Cannot connect to existing
ignite instance", e);
+ }
+ } else {
+ throw e;
+ }
}
this.ignite.getOrCreateCache(cacheName);