IGNITE-8697 Flink sink throws java.lang.IllegalArgumentException when running in Flink cluster mode. - Fixes #4398.
Signed-off-by: Dmitriy Pavlov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/891da2a5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/891da2a5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/891da2a5 Branch: refs/heads/ignite-8446 Commit: 891da2a5b61e2aa70a0decf5afca91f5294d94b9 Parents: 137dd06 Author: samaitra <[email protected]> Authored: Wed Aug 1 21:02:02 2018 +0300 Committer: Dmitriy Pavlov <[email protected]> Committed: Wed Aug 1 21:02:02 2018 +0300 ---------------------------------------------------------------------- .../apache/ignite/sink/flink/IgniteSink.java | 88 +++++------ .../sink/flink/FlinkIgniteSinkSelfTest.java | 154 +++---------------- .../flink/src/test/resources/example-ignite.xml | 7 +- 3 files changed, 71 insertions(+), 178 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/891da2a5/modules/flink/src/main/java/org/apache/ignite/sink/flink/IgniteSink.java ---------------------------------------------------------------------- diff --git a/modules/flink/src/main/java/org/apache/ignite/sink/flink/IgniteSink.java b/modules/flink/src/main/java/org/apache/ignite/sink/flink/IgniteSink.java index 2f18f80..ad29490 100644 --- a/modules/flink/src/main/java/org/apache/ignite/sink/flink/IgniteSink.java +++ b/modules/flink/src/main/java/org/apache/ignite/sink/flink/IgniteSink.java @@ -18,10 +18,12 @@ package org.apache.ignite.sink.flink; import java.util.Map; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteIllegalStateException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.Ignition; import 
org.apache.ignite.internal.util.typedef.internal.A; @@ -34,7 +36,7 @@ public class IgniteSink<IN> extends RichSinkFunction<IN> { private static final long DFLT_FLUSH_FREQ = 10000L; /** Logger. */ - private final transient IgniteLogger log; + private transient IgniteLogger log; /** Automatic flush frequency. */ private long autoFlushFrequency = DFLT_FLUSH_FREQ; @@ -43,13 +45,19 @@ public class IgniteSink<IN> extends RichSinkFunction<IN> { private boolean allowOverwrite = false; /** Flag for stopped state. */ - private static volatile boolean stopped = true; + private volatile boolean stopped = true; + + /** Ignite instance. */ + protected transient Ignite ignite; + + /** Ignite Data streamer instance. */ + protected transient IgniteDataStreamer streamer; /** Ignite grid configuration file. */ - private static String igniteCfgFile; + protected final String igniteCfgFile; /** Cache name. */ - private static String cacheName; + protected final String cacheName; /** * Gets the cache name. @@ -70,6 +78,15 @@ public class IgniteSink<IN> extends RichSinkFunction<IN> { } /** + * Gets the Ignite instance. + * + * @return Ignite instance. + */ + public Ignite getIgnite() { + return ignite; + } + + /** * Obtains data flush frequency. * * @return Flush frequency. @@ -109,12 +126,10 @@ public class IgniteSink<IN> extends RichSinkFunction<IN> { * Default IgniteSink constructor. * * @param cacheName Cache name. - * @param igniteCfgFile Ignite configuration file. */ public IgniteSink(String cacheName, String igniteCfgFile) { this.cacheName = cacheName; this.igniteCfgFile = igniteCfgFile; - this.log = SinkContext.getIgnite().log(); } /** @@ -122,13 +137,26 @@ public class IgniteSink<IN> extends RichSinkFunction<IN> { * * @throws IgniteException If failed. 
*/ + @Override @SuppressWarnings("unchecked") - public void start() throws IgniteException { + public void open(Configuration parameter) { A.notNull(igniteCfgFile, "Ignite config file"); A.notNull(cacheName, "Cache name"); - SinkContext.getStreamer().autoFlushFrequency(autoFlushFrequency); - SinkContext.getStreamer().allowOverwrite(allowOverwrite); + try { + // if an ignite instance is already started in same JVM then use it. + this.ignite = Ignition.ignite(); + } catch (IgniteIllegalStateException e) { + this.ignite = Ignition.start(igniteCfgFile); + } + + this.ignite.getOrCreateCache(cacheName); + + this.log = this.ignite.log(); + + this.streamer = this.ignite.dataStreamer(cacheName); + this.streamer.autoFlushFrequency(autoFlushFrequency); + this.streamer.allowOverwrite(allowOverwrite); stopped = false; } @@ -138,15 +166,14 @@ public class IgniteSink<IN> extends RichSinkFunction<IN> { * * @throws IgniteException If failed. */ - public void stop() throws IgniteException { + @Override + public void close() { if (stopped) return; stopped = true; - SinkContext.getStreamer().close(); - SinkContext.getIgnite().cache(cacheName).close(); - SinkContext.getIgnite().close(); + this.streamer.close(); } /** @@ -162,43 +189,10 @@ public class IgniteSink<IN> extends RichSinkFunction<IN> { if (!(in instanceof Map)) throw new IgniteException("Map as a streamer input is expected!"); - SinkContext.getStreamer().addData((Map)in); + this.streamer.addData((Map)in); } catch (Exception e) { log.error("Error while processing IN of " + cacheName, e); } } - - /** - * Streamer context initializing grid and data streamer instances on demand. - */ - private static class SinkContext { - /** Constructor. */ - private SinkContext() { - } - - /** Instance holder. */ - private static class Holder { - private static final Ignite IGNITE = Ignition.start(igniteCfgFile); - private static final IgniteDataStreamer STREAMER = IGNITE.dataStreamer(cacheName); - } - - /** - * Obtains grid instance. 
- * - * @return Grid instance. - */ - private static Ignite getIgnite() { - return Holder.IGNITE; - } - - /** - * Obtains data streamer instance. - * - * @return Data streamer instance. - */ - private static IgniteDataStreamer getStreamer() { - return Holder.STREAMER; - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/891da2a5/modules/flink/src/test/java/org/apache/ignite/sink/flink/FlinkIgniteSinkSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/flink/src/test/java/org/apache/ignite/sink/flink/FlinkIgniteSinkSelfTest.java b/modules/flink/src/test/java/org/apache/ignite/sink/flink/FlinkIgniteSinkSelfTest.java index 50eedb8..eb59379 100644 --- a/modules/flink/src/test/java/org/apache/ignite/sink/flink/FlinkIgniteSinkSelfTest.java +++ b/modules/flink/src/test/java/org/apache/ignite/sink/flink/FlinkIgniteSinkSelfTest.java @@ -19,19 +19,11 @@ package org.apache.ignite.sink.flink; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.CountDownLatch; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.ignite.Ignite; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.events.CacheEvent; -import org.apache.ignite.events.EventType; -import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT; - /** * Tests for {@link IgniteSink}. */ @@ -39,147 +31,51 @@ public class FlinkIgniteSinkSelfTest extends GridCommonAbstractTest { /** Cache name. */ private static final String TEST_CACHE = "testCache"; - /** Cache entries count. 
*/ - private static final int CACHE_ENTRY_COUNT = 10000; - - /** Streaming events for testing. */ - private static final long DFLT_STREAMING_EVENT = 10000; - - /** Ignite instance. */ - private Ignite ignite; - /** Ignite test configuration file. */ private static final String GRID_CONF_FILE = "modules/flink/src/test/resources/example-ignite.xml"; - /** {@inheritDoc} */ - @Override protected long getTestTimeout() { - return 20_000; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override protected void beforeTest() throws Exception { - IgniteConfiguration cfg = loadConfiguration(GRID_CONF_FILE); - - cfg.setClientMode(false); - - ignite = startGrid("igniteServerNode", cfg); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); - } - - /** - * Tests for the Flink sink. - * Ignite started in sink based on what is specified in the configuration file. - * - * @throws Exception - */ - @SuppressWarnings("unchecked") - public void testFlinkIgniteSink() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - env.getConfig().disableSysoutLogging(); + public void testIgniteSink() throws Exception { + Configuration configuration = new Configuration(); IgniteSink igniteSink = new IgniteSink(TEST_CACHE, GRID_CONF_FILE); igniteSink.setAllowOverwrite(true); - igniteSink.setAutoFlushFrequency(10); + igniteSink.setAutoFlushFrequency(1L); + + igniteSink.open(configuration); - igniteSink.start(); + Map<String, String> myData = new HashMap<>(); + myData.put("testData", "testValue"); - CacheListener listener = subscribeToPutEvents(); + igniteSink.invoke(myData); - DataStream<Map> stream = env.addSource(new SourceFunction<Map>() { + /** waiting for a small duration for the cache flush to complete */ + Thread.sleep(2000); - private boolean running = true; + assertEquals("testValue", igniteSink.getIgnite().getOrCreateCache(TEST_CACHE).get("testData")); + } - @Override 
public void run(SourceContext<Map> ctx) throws Exception { - Map testDataMap = new HashMap<>(); - long cnt = 0; + public void testIgniteSinkStreamExecution() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - while (running && (cnt < DFLT_STREAMING_EVENT)) { - testDataMap.put(cnt, "ignite-" + cnt); - cnt++; - } + IgniteSink igniteSink = new IgniteSink(TEST_CACHE, GRID_CONF_FILE); - ctx.collect(testDataMap); - } + igniteSink.setAllowOverwrite(true); - @Override public void cancel() { - running = false; - } - }).setParallelism(1); + igniteSink.setAutoFlushFrequency(1); - assertEquals(0, ignite.cache(TEST_CACHE).size()); + Map<String, String> myData = new HashMap<>(); + myData.put("testdata", "testValue"); + DataStream<Map> stream = env.fromElements(myData); - // sink data into the grid. stream.addSink(igniteSink); - try { env.execute(); - - CountDownLatch latch = listener.getLatch(); - - // Enough events was handled in 10 seconds. Limited by test's timeout. - latch.await(); - - unsubscribeToPutEvents(listener); - - assertEquals(DFLT_STREAMING_EVENT, ignite.getOrCreateCache(TEST_CACHE).size()); - - for (long i = 0; i < DFLT_STREAMING_EVENT; i++) - assertEquals("ignite-" + i, ignite.getOrCreateCache(TEST_CACHE).get(i)); - - } - finally { - igniteSink.stop(); } - } - - /** - * Sets a listener for {@link EventType#EVT_CACHE_OBJECT_PUT}. - * - * @return Cache listener. - */ - private CacheListener subscribeToPutEvents() { - // Listen to cache PUT events and expect as many as messages as test data items. - CacheListener listener = new CacheListener(); - - ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)).localListen(listener, EVT_CACHE_OBJECT_PUT); - - return listener; - } - - /** - * Removes the listener for {@link EventType#EVT_CACHE_OBJECT_PUT}. - * - * @param listener Cache listener. 
- */ - private void unsubscribeToPutEvents(CacheListener listener) { - ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)).stopLocalListen(listener, EVT_CACHE_OBJECT_PUT); - } - - /** Listener. */ - private class CacheListener implements IgnitePredicate<CacheEvent> { - private final CountDownLatch latch = new CountDownLatch(CACHE_ENTRY_COUNT); - - /** @return Latch. */ - public CountDownLatch getLatch() { - return latch; - } - - /** - * @param evt Cache Event. - * @return {@code true}. - */ - @Override public boolean apply(CacheEvent evt) { - latch.countDown(); - - return true; + catch (Exception e) { + e.printStackTrace(); + fail("Stream execution process failed."); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/891da2a5/modules/flink/src/test/resources/example-ignite.xml ---------------------------------------------------------------------- diff --git a/modules/flink/src/test/resources/example-ignite.xml b/modules/flink/src/test/resources/example-ignite.xml index b8ddc8f..d4f4dc1 100644 --- a/modules/flink/src/test/resources/example-ignite.xml +++ b/modules/flink/src/test/resources/example-ignite.xml @@ -31,7 +31,7 @@ http://www.springframework.org/schema/util/spring-util.xsd"> <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"> <!-- Enable client mode. --> - <property name="clientMode" value="true"/> + <property name="clientMode" value="false"/> <!-- Cache accessed from IgniteSink. --> <property name="cacheConfiguration"> @@ -49,6 +49,9 @@ <list> <!-- Cache events (only EVT_CACHE_OBJECT_PUT for tests). 
--> <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/> + <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/> + <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/> + </list> </property> @@ -59,7 +62,7 @@ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"> <property name="addresses"> <list> - <value>127.0.0.1:47500</value> + <value>127.0.0.1:47500..47509</value> </list> </property> </bean>
