ignite-1.6 Fixed race in IgniteSourceTask initialization, fixed code style issues.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d549daf7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d549daf7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d549daf7 Branch: refs/heads/master Commit: d549daf7b35496398844e47e4bf1bdacab6e814e Parents: fa197d2 Author: sboikov <[email protected]> Authored: Tue May 17 10:56:02 2016 +0300 Committer: sboikov <[email protected]> Committed: Tue May 17 10:56:02 2016 +0300 ---------------------------------------------------------------------- .../stream/kafka/connect/IgniteSourceTask.java | 122 ++++++++++++------- .../kafka/connect/IgniteSinkConnectorTest.java | 8 +- .../connect/IgniteSourceConnectorMock.java | 1 - .../connect/IgniteSourceConnectorTest.java | 16 +-- .../kafka/connect/IgniteSourceTaskMock.java | 4 +- .../kafka/connect/TestCacheEventFilter.java | 6 +- 6 files changed, 93 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d549daf7/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java ---------------------------------------------------------------------- diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java index 0d312ca..2f6a728 100644 --- a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java +++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java @@ -50,6 +50,9 @@ public class IgniteSourceTask extends SourceTask { /** Logger. */ private static final Logger log = LoggerFactory.getLogger(IgniteSourceTask.class); + /** Tasks static monitor. */ + private static final Object lock = new Object(); + /** Event buffer size. */ private static int evtBufSize = 100000; @@ -63,7 +66,7 @@ public class IgniteSourceTask extends SourceTask { private static volatile boolean stopped = true; /** Ignite grid configuration file. */ - private static String igniteConfigFile; + private static String igniteCfgFile; /** Cache name. */ private static String cacheName; @@ -74,9 +77,6 @@ public class IgniteSourceTask extends SourceTask { /** Local listener. */ private static TaskLocalListener locLsnr = new TaskLocalListener(); - /** Remote filter. */ - private static TaskRemoteFilter rmtLsnr; - /** User-defined filter. */ private static IgnitePredicate<CacheEvent> filter; @@ -100,51 +100,54 @@ public class IgniteSourceTask extends SourceTask { * @param props Task properties. */ @Override public void start(Map<String, String> props) { - // Each task has the same parameters -- avoid setting more than once. - if (cacheName != null) - return; - - cacheName = props.get(IgniteSourceConstants.CACHE_NAME); - igniteConfigFile = props.get(IgniteSourceConstants.CACHE_CFG_PATH); - topics = props.get(IgniteSourceConstants.TOPIC_NAMES).split("\\s*,\\s*"); - - if (props.containsKey(IgniteSourceConstants.INTL_BUF_SIZE)) - evtBufSize = Integer.parseInt(props.get(IgniteSourceConstants.INTL_BUF_SIZE)); - - if (props.containsKey(IgniteSourceConstants.INTL_BATCH_SIZE)) - evtBatchSize = Integer.parseInt(props.get(IgniteSourceConstants.INTL_BATCH_SIZE)); - - if (props.containsKey(IgniteSourceConstants.CACHE_FILTER_CLASS)) { - String filterCls = props.get(IgniteSourceConstants.CACHE_FILTER_CLASS); - if (filterCls != null && !filterCls.isEmpty()) { - try { - Class<? extends IgnitePredicate<CacheEvent>> clazz = - (Class<? extends IgnitePredicate<CacheEvent>>)Class.forName(filterCls); - - filter = clazz.newInstance(); - } - catch (Exception e) { - log.error("Failed to instantiate the provided filter! " + - "User-enabled filtering is ignored!", e); + synchronized (lock) { + // Each task has the same parameters -- avoid setting more than once. + // Nothing to do if the task has been already started. + if (!stopped) + return; + + cacheName = props.get(IgniteSourceConstants.CACHE_NAME); + igniteCfgFile = props.get(IgniteSourceConstants.CACHE_CFG_PATH); + topics = props.get(IgniteSourceConstants.TOPIC_NAMES).split("\\s*,\\s*"); + + if (props.containsKey(IgniteSourceConstants.INTL_BUF_SIZE)) + evtBufSize = Integer.parseInt(props.get(IgniteSourceConstants.INTL_BUF_SIZE)); + + if (props.containsKey(IgniteSourceConstants.INTL_BATCH_SIZE)) + evtBatchSize = Integer.parseInt(props.get(IgniteSourceConstants.INTL_BATCH_SIZE)); + + if (props.containsKey(IgniteSourceConstants.CACHE_FILTER_CLASS)) { + String filterCls = props.get(IgniteSourceConstants.CACHE_FILTER_CLASS); + if (filterCls != null && !filterCls.isEmpty()) { + try { + Class<? extends IgnitePredicate<CacheEvent>> clazz = + (Class<? extends IgnitePredicate<CacheEvent>>)Class.forName(filterCls); + + filter = clazz.newInstance(); + } + catch (Exception e) { + log.error("Failed to instantiate the provided filter! " + + "User-enabled filtering is ignored!", e); + } } } - } - rmtLsnr = new TaskRemoteFilter(cacheName); + TaskRemoteFilter rmtLsnr = new TaskRemoteFilter(cacheName); - try { - int[] evts = cacheEvents(props.get(IgniteSourceConstants.CACHE_EVENTS)); + try { + int[] evts = cacheEvents(props.get(IgniteSourceConstants.CACHE_EVENTS)); - rmtLsnrId = IgniteGrid.getIgnite().events(IgniteGrid.getIgnite().cluster().forCacheNodes(cacheName)) - .remoteListen(locLsnr, rmtLsnr, evts); - } - catch (Exception e) { - log.error("Failed to register event listener!", e); + rmtLsnrId = IgniteGrid.getIgnite().events(IgniteGrid.getIgnite().cluster().forCacheNodes(cacheName)) + .remoteListen(locLsnr, rmtLsnr, evts); + } + catch (Exception e) { + log.error("Failed to register event listener!", e); - throw new ConnectException(e); - } - finally { - stopped = false; + throw new ConnectException(e); + } + finally { + stopped = false; + } } } @@ -228,10 +231,19 @@ public class IgniteSourceTask extends SourceTask { } /** + * Used by unit test to avoid restart node and valid state of the <code>stopped</code> flag. + * + * @param stopped Stopped flag. + */ + protected static void setStopped(boolean stopped) { + IgniteSourceTask.stopped = stopped ; + } + + /** * Local listener buffering cache events to be further sent to Kafka. */ private static class TaskLocalListener implements IgniteBiPredicate<UUID, CacheEvent> { - + /** {@inheritDoc} */ @Override public boolean apply(UUID id, CacheEvent evt) { try { if (!evtBuf.offer(evt, 10, TimeUnit.MILLISECONDS)) @@ -249,19 +261,23 @@ public class IgniteSourceTask extends SourceTask { * Remote filter. */ private static class TaskRemoteFilter implements IgnitePredicate<CacheEvent> { + /** */ @IgniteInstanceResource Ignite ignite; /** Cache name. */ private final String cacheName; + /** + * @param cacheName Cache name. + */ TaskRemoteFilter(String cacheName) { this.cacheName = cacheName; } + /** {@inheritDoc} */ @Override public boolean apply(CacheEvent evt) { - - Affinity affinity = ignite.affinity(cacheName); + Affinity<Object> affinity = ignite.affinity(cacheName); if (affinity.isPrimary(ignite.cluster().localNode(), evt.key())) { // Process this event. Ignored on backups. @@ -281,11 +297,13 @@ public class IgniteSourceTask extends SourceTask { private static class IgniteGrid { /** Constructor. */ private IgniteGrid() { + // No-op. } /** Instance holder. */ private static class Holder { - private static final Ignite IGNITE = Ignition.start(igniteConfigFile); + /** */ + private static final Ignite IGNITE = Ignition.start(igniteCfgFile); } /** @@ -300,15 +318,25 @@ public class IgniteSourceTask extends SourceTask { /** Cache events available for listening. */ private enum CacheEvt { + /** */ CREATED(EventType.EVT_CACHE_ENTRY_CREATED), + /** */ DESTROYED(EventType.EVT_CACHE_ENTRY_DESTROYED), + /** */ PUT(EventType.EVT_CACHE_OBJECT_PUT), + /** */ READ(EventType.EVT_CACHE_OBJECT_READ), + /** */ REMOVED(EventType.EVT_CACHE_OBJECT_REMOVED), + /** */ LOCKED(EventType.EVT_CACHE_OBJECT_LOCKED), + /** */ UNLOCKED(EventType.EVT_CACHE_OBJECT_UNLOCKED), + /** */ SWAPPED(EventType.EVT_CACHE_OBJECT_SWAPPED), + /** */ UNSWAPPED(EventType.EVT_CACHE_OBJECT_UNSWAPPED), + /** */ EXPIRED(EventType.EVT_CACHE_OBJECT_EXPIRED); /** Internal Ignite event id. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d549daf7/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java ---------------------------------------------------------------------- diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java index 6e6d65d..1814c69 100644 --- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java +++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java @@ -93,12 +93,12 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest { for (String topic : TOPICS) kafkaBroker.createTopic(topic, PARTITIONS, REPLICATION_FACTOR); - WorkerConfig workerConfig = new StandaloneConfig(makeWorkerProps()); + WorkerConfig workerCfg = new StandaloneConfig(makeWorkerProps()); - OffsetBackingStore offsetBackingStore = mock(OffsetBackingStore.class); - offsetBackingStore.configure(anyObject(Map.class)); + OffsetBackingStore offBackingStore = mock(OffsetBackingStore.class); + offBackingStore.configure(anyObject(Map.class)); - worker = new Worker(workerConfig, offsetBackingStore); + worker = new Worker(workerCfg, offBackingStore); worker.start(); herder = new StandaloneHerder(worker); http://git-wip-us.apache.org/repos/asf/ignite/blob/d549daf7/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorMock.java ---------------------------------------------------------------------- diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorMock.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorMock.java index d983c67..0157a17 100644 --- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorMock.java +++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorMock.java @@ -23,7 +23,6 @@ import org.apache.kafka.connect.connector.Task; * Source connector mock for tests for using the task mock. */ public class IgniteSourceConnectorMock extends IgniteSourceConnector { - /** {@inheritDoc} */ @Override public Class<? extends Task> taskClass() { return IgniteSourceTaskMock.class; http://git-wip-us.apache.org/repos/asf/ignite/blob/d549daf7/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java ---------------------------------------------------------------------- diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java index 13b6887..87b11b9 100644 --- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java +++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java @@ -96,12 +96,12 @@ public class IgniteSourceConnectorTest extends GridCommonAbstractTest { @Override protected void beforeTest() throws Exception { kafkaBroker = new TestKafkaBroker(); - WorkerConfig workerConfig = new StandaloneConfig(makeWorkerProps()); + WorkerConfig workerCfg = new StandaloneConfig(makeWorkerProps()); - MemoryOffsetBackingStore offsetBackingStore = new MemoryOffsetBackingStore(); - offsetBackingStore.configure(workerConfig.originals()); + MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore(); + offBackingStore.configure(workerCfg.originals()); - worker = new Worker(workerConfig, offsetBackingStore); + worker = new Worker(workerCfg, offBackingStore); worker.start(); herder = new StandaloneHerder(worker); @@ -210,6 +210,7 @@ public class IgniteSourceConnectorTest extends GridCommonAbstractTest { * Sends messages to the grid. * * @return Map of key value messages. + * @throws IOException If failed. */ private Map<String, String> sendData() throws IOException { Map<String, String> keyValMap = new HashMap<>(); @@ -257,7 +258,7 @@ public class IgniteSourceConnectorTest extends GridCommonAbstractTest { long start = System.currentTimeMillis(); try { - while (false || (System.currentTimeMillis() - start) < 10000) { + while ((System.currentTimeMillis() - start) < 10000) { ConsumerRecords<String, CacheEvent> records = consumer.poll(10); for (ConsumerRecord<String, CacheEvent> record : records) { System.out.println("Event: offset = " + record.offset() + ", key = " + record.key() @@ -274,9 +275,9 @@ public class IgniteSourceConnectorTest extends GridCommonAbstractTest { consumer.close(); if (conditioned) - assertTrue(evtCnt == (EVENT_CNT * TOPICS.length) / 2); + assertEquals((EVENT_CNT * TOPICS.length) / 2, evtCnt); else - assertTrue(evtCnt == EVENT_CNT * TOPICS.length); + assertEquals(EVENT_CNT * TOPICS.length, evtCnt); } } @@ -306,6 +307,7 @@ public class IgniteSourceConnectorTest extends GridCommonAbstractTest { * Creates properties for Kafka Connect workers. * * @return Worker configurations. + * @throws IOException If failed. */ private Map<String, String> makeWorkerProps() throws IOException { Map<String, String> props = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/d549daf7/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTaskMock.java ---------------------------------------------------------------------- diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTaskMock.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTaskMock.java index 5237e98..4535ad5 100644 --- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTaskMock.java +++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTaskMock.java @@ -21,11 +21,11 @@ package org.apache.ignite.stream.kafka.connect; * Source task mock for tests. It avoids closing the grid from test to test. */ public class IgniteSourceTaskMock extends IgniteSourceTask { - /** {@inheritDoc} */ @Override public void stop() { + // Don't stop the grid for tests. stopRemoteListen(); - // don't stop the grid for tests. + setStopped(true); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d549daf7/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/TestCacheEventFilter.java ---------------------------------------------------------------------- diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/TestCacheEventFilter.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/TestCacheEventFilter.java index 8978db0..bf28a2b 100644 --- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/TestCacheEventFilter.java +++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/TestCacheEventFilter.java @@ -24,8 +24,8 @@ import org.apache.ignite.lang.IgnitePredicate; * Test user-defined filter. */ class TestCacheEventFilter implements IgnitePredicate<CacheEvent> { - - @Override public boolean apply(CacheEvent event) { - return ((String)event.key()).startsWith("conditioned_"); + /** {@inheritDoc} */ + @Override public boolean apply(CacheEvent evt) { + return ((String)evt.key()).startsWith("conditioned_"); } }
