ignite-1.6 Fixed race in IgniteSourceTask initialization, fixed code style issues.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d549daf7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d549daf7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d549daf7

Branch: refs/heads/master
Commit: d549daf7b35496398844e47e4bf1bdacab6e814e
Parents: fa197d2
Author: sboikov <[email protected]>
Authored: Tue May 17 10:56:02 2016 +0300
Committer: sboikov <[email protected]>
Committed: Tue May 17 10:56:02 2016 +0300

----------------------------------------------------------------------
 .../stream/kafka/connect/IgniteSourceTask.java  | 122 ++++++++++++-------
 .../kafka/connect/IgniteSinkConnectorTest.java  |   8 +-
 .../connect/IgniteSourceConnectorMock.java      |   1 -
 .../connect/IgniteSourceConnectorTest.java      |  16 +--
 .../kafka/connect/IgniteSourceTaskMock.java     |   4 +-
 .../kafka/connect/TestCacheEventFilter.java     |   6 +-
 6 files changed, 93 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d549daf7/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java
----------------------------------------------------------------------
diff --git 
a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java
 
b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java
index 0d312ca..2f6a728 100644
--- 
a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java
+++ 
b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java
@@ -50,6 +50,9 @@ public class IgniteSourceTask extends SourceTask {
     /** Logger. */
     private static final Logger log = 
LoggerFactory.getLogger(IgniteSourceTask.class);
 
+    /** Tasks static monitor. */
+    private static final Object lock = new Object();
+
     /** Event buffer size. */
     private static int evtBufSize = 100000;
 
@@ -63,7 +66,7 @@ public class IgniteSourceTask extends SourceTask {
     private static volatile boolean stopped = true;
 
     /** Ignite grid configuration file. */
-    private static String igniteConfigFile;
+    private static String igniteCfgFile;
 
     /** Cache name. */
     private static String cacheName;
@@ -74,9 +77,6 @@ public class IgniteSourceTask extends SourceTask {
     /** Local listener. */
     private static TaskLocalListener locLsnr = new TaskLocalListener();
 
-    /** Remote filter. */
-    private static TaskRemoteFilter rmtLsnr;
-
     /** User-defined filter. */
     private static IgnitePredicate<CacheEvent> filter;
 
@@ -100,51 +100,54 @@ public class IgniteSourceTask extends SourceTask {
      * @param props Task properties.
      */
     @Override public void start(Map<String, String> props) {
-        // Each task has the same parameters -- avoid setting more than once.
-        if (cacheName != null)
-            return;
-
-        cacheName = props.get(IgniteSourceConstants.CACHE_NAME);
-        igniteConfigFile = props.get(IgniteSourceConstants.CACHE_CFG_PATH);
-        topics = 
props.get(IgniteSourceConstants.TOPIC_NAMES).split("\\s*,\\s*");
-
-        if (props.containsKey(IgniteSourceConstants.INTL_BUF_SIZE))
-            evtBufSize = 
Integer.parseInt(props.get(IgniteSourceConstants.INTL_BUF_SIZE));
-
-        if (props.containsKey(IgniteSourceConstants.INTL_BATCH_SIZE))
-            evtBatchSize = 
Integer.parseInt(props.get(IgniteSourceConstants.INTL_BATCH_SIZE));
-
-        if (props.containsKey(IgniteSourceConstants.CACHE_FILTER_CLASS)) {
-            String filterCls = 
props.get(IgniteSourceConstants.CACHE_FILTER_CLASS);
-            if (filterCls != null && !filterCls.isEmpty()) {
-                try {
-                    Class<? extends IgnitePredicate<CacheEvent>> clazz =
-                        (Class<? extends 
IgnitePredicate<CacheEvent>>)Class.forName(filterCls);
-
-                    filter = clazz.newInstance();
-                }
-                catch (Exception e) {
-                    log.error("Failed to instantiate the provided filter! " +
-                        "User-enabled filtering is ignored!", e);
+        synchronized (lock) {
+            // Each task has the same parameters -- avoid setting more than 
once.
+            // Nothing to do if the task has been already started.
+            if (!stopped)
+                return;
+
+            cacheName = props.get(IgniteSourceConstants.CACHE_NAME);
+            igniteCfgFile = props.get(IgniteSourceConstants.CACHE_CFG_PATH);
+            topics = 
props.get(IgniteSourceConstants.TOPIC_NAMES).split("\\s*,\\s*");
+
+            if (props.containsKey(IgniteSourceConstants.INTL_BUF_SIZE))
+                evtBufSize = 
Integer.parseInt(props.get(IgniteSourceConstants.INTL_BUF_SIZE));
+
+            if (props.containsKey(IgniteSourceConstants.INTL_BATCH_SIZE))
+                evtBatchSize = 
Integer.parseInt(props.get(IgniteSourceConstants.INTL_BATCH_SIZE));
+
+            if (props.containsKey(IgniteSourceConstants.CACHE_FILTER_CLASS)) {
+                String filterCls = 
props.get(IgniteSourceConstants.CACHE_FILTER_CLASS);
+                if (filterCls != null && !filterCls.isEmpty()) {
+                    try {
+                        Class<? extends IgnitePredicate<CacheEvent>> clazz =
+                            (Class<? extends 
IgnitePredicate<CacheEvent>>)Class.forName(filterCls);
+
+                        filter = clazz.newInstance();
+                    }
+                    catch (Exception e) {
+                        log.error("Failed to instantiate the provided filter! 
" +
+                            "User-enabled filtering is ignored!", e);
+                    }
                 }
             }
-        }
 
-        rmtLsnr = new TaskRemoteFilter(cacheName);
+            TaskRemoteFilter rmtLsnr = new TaskRemoteFilter(cacheName);
 
-        try {
-            int[] evts = 
cacheEvents(props.get(IgniteSourceConstants.CACHE_EVENTS));
+            try {
+                int[] evts = 
cacheEvents(props.get(IgniteSourceConstants.CACHE_EVENTS));
 
-            rmtLsnrId = 
IgniteGrid.getIgnite().events(IgniteGrid.getIgnite().cluster().forCacheNodes(cacheName))
-                .remoteListen(locLsnr, rmtLsnr, evts);
-        }
-        catch (Exception e) {
-            log.error("Failed to register event listener!", e);
+                rmtLsnrId = 
IgniteGrid.getIgnite().events(IgniteGrid.getIgnite().cluster().forCacheNodes(cacheName))
+                    .remoteListen(locLsnr, rmtLsnr, evts);
+            }
+            catch (Exception e) {
+                log.error("Failed to register event listener!", e);
 
-            throw new ConnectException(e);
-        }
-        finally {
-            stopped = false;
+                throw new ConnectException(e);
+            }
+            finally {
+                stopped = false;
+            }
         }
     }
 
@@ -228,10 +231,19 @@ public class IgniteSourceTask extends SourceTask {
     }
 
     /**
+     * Used by unit tests to keep the <code>stopped</code> flag in a valid state without restarting the node.
+     *
+     * @param stopped Stopped flag.
+     */
+    protected static void setStopped(boolean stopped) {
+        IgniteSourceTask.stopped = stopped ;
+    }
+
+    /**
      * Local listener buffering cache events to be further sent to Kafka.
      */
     private static class TaskLocalListener implements IgniteBiPredicate<UUID, 
CacheEvent> {
-
+        /** {@inheritDoc} */
         @Override public boolean apply(UUID id, CacheEvent evt) {
             try {
                 if (!evtBuf.offer(evt, 10, TimeUnit.MILLISECONDS))
@@ -249,19 +261,23 @@ public class IgniteSourceTask extends SourceTask {
      * Remote filter.
      */
     private static class TaskRemoteFilter implements 
IgnitePredicate<CacheEvent> {
+        /** */
         @IgniteInstanceResource
         Ignite ignite;
 
         /** Cache name. */
         private final String cacheName;
 
+        /**
+         * @param cacheName Cache name.
+         */
         TaskRemoteFilter(String cacheName) {
             this.cacheName = cacheName;
         }
 
+        /** {@inheritDoc} */
         @Override public boolean apply(CacheEvent evt) {
-
-            Affinity affinity = ignite.affinity(cacheName);
+            Affinity<Object> affinity = ignite.affinity(cacheName);
 
             if (affinity.isPrimary(ignite.cluster().localNode(), evt.key())) {
                 // Process this event. Ignored on backups.
@@ -281,11 +297,13 @@ public class IgniteSourceTask extends SourceTask {
     private static class IgniteGrid {
         /** Constructor. */
         private IgniteGrid() {
+            // No-op.
         }
 
         /** Instance holder. */
         private static class Holder {
-            private static final Ignite IGNITE = 
Ignition.start(igniteConfigFile);
+            /** */
+            private static final Ignite IGNITE = Ignition.start(igniteCfgFile);
         }
 
         /**
@@ -300,15 +318,25 @@ public class IgniteSourceTask extends SourceTask {
 
     /** Cache events available for listening. */
     private enum CacheEvt {
+        /** */
         CREATED(EventType.EVT_CACHE_ENTRY_CREATED),
+        /** */
         DESTROYED(EventType.EVT_CACHE_ENTRY_DESTROYED),
+        /** */
         PUT(EventType.EVT_CACHE_OBJECT_PUT),
+        /** */
         READ(EventType.EVT_CACHE_OBJECT_READ),
+        /** */
         REMOVED(EventType.EVT_CACHE_OBJECT_REMOVED),
+        /** */
         LOCKED(EventType.EVT_CACHE_OBJECT_LOCKED),
+        /** */
         UNLOCKED(EventType.EVT_CACHE_OBJECT_UNLOCKED),
+        /** */
         SWAPPED(EventType.EVT_CACHE_OBJECT_SWAPPED),
+        /** */
         UNSWAPPED(EventType.EVT_CACHE_OBJECT_UNSWAPPED),
+        /** */
         EXPIRED(EventType.EVT_CACHE_OBJECT_EXPIRED);
 
         /** Internal Ignite event id. */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d549daf7/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
----------------------------------------------------------------------
diff --git 
a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
 
b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
index 6e6d65d..1814c69 100644
--- 
a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
+++ 
b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
@@ -93,12 +93,12 @@ public class IgniteSinkConnectorTest extends 
GridCommonAbstractTest {
         for (String topic : TOPICS)
             kafkaBroker.createTopic(topic, PARTITIONS, REPLICATION_FACTOR);
 
-        WorkerConfig workerConfig = new StandaloneConfig(makeWorkerProps());
+        WorkerConfig workerCfg = new StandaloneConfig(makeWorkerProps());
 
-        OffsetBackingStore offsetBackingStore = mock(OffsetBackingStore.class);
-        offsetBackingStore.configure(anyObject(Map.class));
+        OffsetBackingStore offBackingStore = mock(OffsetBackingStore.class);
+        offBackingStore.configure(anyObject(Map.class));
 
-        worker = new Worker(workerConfig, offsetBackingStore);
+        worker = new Worker(workerCfg, offBackingStore);
         worker.start();
 
         herder = new StandaloneHerder(worker);

http://git-wip-us.apache.org/repos/asf/ignite/blob/d549daf7/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorMock.java
----------------------------------------------------------------------
diff --git 
a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorMock.java
 
b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorMock.java
index d983c67..0157a17 100644
--- 
a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorMock.java
+++ 
b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorMock.java
@@ -23,7 +23,6 @@ import org.apache.kafka.connect.connector.Task;
  * Source connector mock for tests for using the task mock.
  */
 public class IgniteSourceConnectorMock extends IgniteSourceConnector {
-
     /** {@inheritDoc} */
     @Override public Class<? extends Task> taskClass() {
         return IgniteSourceTaskMock.class;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d549daf7/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
----------------------------------------------------------------------
diff --git 
a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
 
b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
index 13b6887..87b11b9 100644
--- 
a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
+++ 
b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
@@ -96,12 +96,12 @@ public class IgniteSourceConnectorTest extends 
GridCommonAbstractTest {
     @Override protected void beforeTest() throws Exception {
         kafkaBroker = new TestKafkaBroker();
 
-        WorkerConfig workerConfig = new StandaloneConfig(makeWorkerProps());
+        WorkerConfig workerCfg = new StandaloneConfig(makeWorkerProps());
 
-        MemoryOffsetBackingStore offsetBackingStore = new 
MemoryOffsetBackingStore();
-        offsetBackingStore.configure(workerConfig.originals());
+        MemoryOffsetBackingStore offBackingStore = new 
MemoryOffsetBackingStore();
+        offBackingStore.configure(workerCfg.originals());
 
-        worker = new Worker(workerConfig, offsetBackingStore);
+        worker = new Worker(workerCfg, offBackingStore);
         worker.start();
 
         herder = new StandaloneHerder(worker);
@@ -210,6 +210,7 @@ public class IgniteSourceConnectorTest extends 
GridCommonAbstractTest {
      * Sends messages to the grid.
      *
      * @return Map of key value messages.
+     * @throws IOException If failed.
      */
     private Map<String, String> sendData() throws IOException {
         Map<String, String> keyValMap = new HashMap<>();
@@ -257,7 +258,7 @@ public class IgniteSourceConnectorTest extends 
GridCommonAbstractTest {
         long start = System.currentTimeMillis();
 
         try {
-            while (false || (System.currentTimeMillis() - start) < 10000) {
+            while ((System.currentTimeMillis() - start) < 10000) {
                 ConsumerRecords<String, CacheEvent> records = 
consumer.poll(10);
                 for (ConsumerRecord<String, CacheEvent> record : records) {
                     System.out.println("Event: offset = " + record.offset() + 
", key = " + record.key()
@@ -274,9 +275,9 @@ public class IgniteSourceConnectorTest extends 
GridCommonAbstractTest {
             consumer.close();
 
             if (conditioned)
-                assertTrue(evtCnt == (EVENT_CNT * TOPICS.length) / 2);
+                assertEquals((EVENT_CNT * TOPICS.length) / 2, evtCnt);
             else
-                assertTrue(evtCnt == EVENT_CNT * TOPICS.length);
+                assertEquals(EVENT_CNT * TOPICS.length, evtCnt);
         }
     }
 
@@ -306,6 +307,7 @@ public class IgniteSourceConnectorTest extends 
GridCommonAbstractTest {
      * Creates properties for Kafka Connect workers.
      *
      * @return Worker configurations.
+     * @throws IOException If failed.
      */
     private Map<String, String> makeWorkerProps() throws IOException {
         Map<String, String> props = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/d549daf7/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTaskMock.java
----------------------------------------------------------------------
diff --git 
a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTaskMock.java
 
b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTaskMock.java
index 5237e98..4535ad5 100644
--- 
a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTaskMock.java
+++ 
b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTaskMock.java
@@ -21,11 +21,11 @@ package org.apache.ignite.stream.kafka.connect;
  * Source task mock for tests. It avoids closing the grid from test to test.
  */
 public class IgniteSourceTaskMock extends IgniteSourceTask {
-
     /** {@inheritDoc} */
     @Override public void stop() {
+        // Don't stop the grid for tests.
         stopRemoteListen();
 
-        // don't stop the grid for tests.
+        setStopped(true);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d549daf7/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/TestCacheEventFilter.java
----------------------------------------------------------------------
diff --git 
a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/TestCacheEventFilter.java
 
b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/TestCacheEventFilter.java
index 8978db0..bf28a2b 100644
--- 
a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/TestCacheEventFilter.java
+++ 
b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/TestCacheEventFilter.java
@@ -24,8 +24,8 @@ import org.apache.ignite.lang.IgnitePredicate;
  * Test user-defined filter.
  */
 class TestCacheEventFilter implements IgnitePredicate<CacheEvent> {
-
-    @Override public boolean apply(CacheEvent event) {
-        return ((String)event.key()).startsWith("conditioned_");
+    /** {@inheritDoc} */
+    @Override public boolean apply(CacheEvent evt) {
+        return ((String)evt.key()).startsWith("conditioned_");
     }
 }

Reply via email to