Repository: ignite Updated Branches: refs/heads/ignite-3443 aebeffeaf -> 5f0d68cea
IGNITE-3665: Updated KafkaStreamer dependencies. - Fixes #957. Signed-off-by: shtykh_roman <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/843979db Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/843979db Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/843979db Branch: refs/heads/ignite-3443 Commit: 843979dbbdf65bc005d2cda36fe94b68eb8fd5e4 Parents: 39ec7d0 Author: shtykh_roman <[email protected]> Authored: Fri Sep 9 17:14:21 2016 +0900 Committer: shtykh_roman <[email protected]> Committed: Fri Sep 9 17:14:21 2016 +0900 ---------------------------------------------------------------------- .../stream/kafka/connect/IgniteSinkConnector.java | 9 +++++++++ .../stream/kafka/connect/IgniteSourceConnector.java | 9 +++++++++ .../kafka/connect/IgniteSinkConnectorTest.java | 15 ++++++++++----- .../kafka/connect/IgniteSourceConnectorTest.java | 14 ++++++++------ modules/osgi-karaf/src/main/resources/features.xml | 12 ++++++------ parent/pom.xml | 4 +--- 6 files changed, 43 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/843979db/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java ---------------------------------------------------------------------- diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java index 9385920..3fbfd9c 100644 --- a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java +++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.errors.ConnectException; @@ -34,6 +35,9 @@ public class IgniteSinkConnector extends SinkConnector { /** Sink properties. */ private Map<String, String> configProps; + /** Expected configurations. */ + private static final ConfigDef CONFIG_DEF = new ConfigDef(); + /** {@inheritDoc} */ @Override public String version() { return AppInfoParser.getVersion(); @@ -88,4 +92,9 @@ public class IgniteSinkConnector extends SinkConnector { @Override public void stop() { // No-op. } + + /** {@inheritDoc} */ + @Override public ConfigDef config() { + return CONFIG_DEF; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/843979db/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnector.java ---------------------------------------------------------------------- diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnector.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnector.java index 59e2ed0..986888e 100644 --- a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnector.java +++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnector.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.errors.ConnectException; @@ -36,6 +37,9 @@ public class IgniteSourceConnector extends SourceConnector { /** Source properties. */ private Map<String, String> configProps; + /** Expected configurations. */ + private static final ConfigDef CONFIG_DEF = new ConfigDef(); + /** {@inheritDoc} */ @Override public String version() { return AppInfoParser.getVersion(); @@ -78,4 +82,9 @@ public class IgniteSourceConnector extends SourceConnector { @Override public void stop() { // No-op. } + + /** {@inheritDoc} */ + @Override public ConfigDef config() { + return CONFIG_DEF; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/843979db/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java ---------------------------------------------------------------------- diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java index 1814c69..efa2fa2 100644 --- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java +++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java @@ -32,6 +32,7 @@ import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.stream.kafka.TestKafkaBroker; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; @@ -40,12 +41,12 @@ import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.runtime.standalone.StandaloneHerder; +import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.storage.OffsetBackingStore; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.FutureCallback; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT; -import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.mock; /** @@ -59,7 +60,7 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest { private static final String CACHE_NAME = "testCache"; /** Test topics. */ - private static final String[] TOPICS = {"test1", "test2"}; + private static final String[] TOPICS = {"sink-test1", "sink-test2"}; /** Kafka partition. */ private static final int PARTITIONS = 3; @@ -67,6 +68,9 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest { /** Kafka replication factor. */ private static final int REPLICATION_FACTOR = 1; + /** Worker id. */ + private static final String WORKER_ID = "workerId"; + /** Test Kafka broker. */ private TestKafkaBroker kafkaBroker; @@ -96,9 +100,9 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest { WorkerConfig workerCfg = new StandaloneConfig(makeWorkerProps()); OffsetBackingStore offBackingStore = mock(OffsetBackingStore.class); - offBackingStore.configure(anyObject(Map.class)); + offBackingStore.configure(workerCfg); - worker = new Worker(workerCfg, offBackingStore); + worker = new Worker(WORKER_ID, new SystemTime(), workerCfg, offBackingStore); worker.start(); herder = new StandaloneHerder(worker); @@ -211,7 +215,7 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest { private Map<String, String> makeSinkProps(String topics) { Map<String, String> props = new HashMap<>(); - props.put(ConnectorConfig.TOPICS_CONFIG, topics); + props.put(SinkConnector.TOPICS_CONFIG, topics); props.put(ConnectorConfig.TASKS_MAX_CONFIG, "2"); props.put(ConnectorConfig.NAME_CONFIG, "test-sink-connector"); props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, IgniteSinkConnector.class.getName()); @@ -239,6 +243,7 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest { props.put("key.converter.schemas.enable", "false"); props.put("value.converter.schemas.enable", "false"); props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker.getBrokerAddress()); + props.put("offset.storage.file.filename", "/tmp/connect.offsets"); // fast flushing for testing. props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10"); http://git-wip-us.apache.org/repos/asf/ignite/blob/843979db/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java ---------------------------------------------------------------------- diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java index 7cdb09c..a3ce10e 100644 --- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java +++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java @@ -26,13 +26,11 @@ import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.CacheEvent; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.stream.kafka.TestKafkaBroker; @@ -43,6 +41,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; @@ -68,7 +67,10 @@ public class IgniteSourceConnectorTest extends GridCommonAbstractTest { private static final String CACHE_NAME = "testCache"; /** Test topics created by connector. */ - private static final String[] TOPICS = {"test1", "test2"}; + private static final String[] TOPICS = {"src-test1", "src-test2"}; + + /** Worker id. */ + private static final String WORKER_ID = "workerId"; /** Test Kafka broker. */ private TestKafkaBroker kafkaBroker; @@ -104,9 +106,9 @@ public class IgniteSourceConnectorTest extends GridCommonAbstractTest { WorkerConfig workerCfg = new StandaloneConfig(makeWorkerProps()); MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore(); - offBackingStore.configure(workerCfg.originals()); + offBackingStore.configure(workerCfg); - worker = new Worker(workerCfg, offBackingStore); + worker = new Worker(WORKER_ID, new SystemTime(), workerCfg, offBackingStore); worker.start(); herder = new StandaloneHerder(worker); @@ -280,7 +282,6 @@ public class IgniteSourceConnectorTest extends GridCommonAbstractTest { } }, 20_000); - info("Waiting for unexpected records for 5 secs."); assertFalse(GridTestUtils.waitForCondition(new GridAbsPredicate() { @@ -345,6 +346,7 @@ public class IgniteSourceConnectorTest extends GridCommonAbstractTest { props.put("key.converter.schemas.enable", "false"); props.put("value.converter.schemas.enable", "false"); props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker.getBrokerAddress()); + props.put("offset.storage.file.filename", "/tmp/connect.offsets"); // fast flushing for testing. props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10"); http://git-wip-us.apache.org/repos/asf/ignite/blob/843979db/modules/osgi-karaf/src/main/resources/features.xml ---------------------------------------------------------------------- diff --git a/modules/osgi-karaf/src/main/resources/features.xml b/modules/osgi-karaf/src/main/resources/features.xml index 584429d..0f761f1 100644 --- a/modules/osgi-karaf/src/main/resources/features.xml +++ b/modules/osgi-karaf/src/main/resources/features.xml @@ -154,7 +154,7 @@ <feature name="ignite-kafka" version="${project.version}" description="Apache Ignite :: Kafka"> <details> - <![CDATA[The Apache Ignite Kafka module + dependencies. This module installs the Scala 2.1 library bundle.]]> + <![CDATA[The Apache Ignite Kafka module + dependencies. This module installs the Scala 2.11 library bundle.]]> </details> <feature prerequisite="true">wrap</feature> <bundle start="true" dependency="true">mvn:org.scala-lang/scala-library/${scala211.library.version}</bundle> @@ -190,14 +190,14 @@ </feature> <feature name="ignite-rest-http" version="${project.version}" description="Apache Ignite :: REST HTTP"> - <!-- NOTICE: XOM cannot be included by default due to an incompatible license; + <!-- NOTICE: XOM cannot be included by default due to an incompatible license; please review its license model and install the dependency manually if you agree. --> <details> - <![CDATA[The Apache Ignite REST HTTP module + dependencies. - + <![CDATA[The Apache Ignite REST HTTP module + dependencies. + Installing this feature will trigger the installation of the 'http' feature from the Apache Karaf distribution. - - NOTE: Before using this feature you must review the license of the XOM bundle and install it manually if you accept it: + + NOTE: Before using this feature you must review the license of the XOM bundle and install it manually if you accept it: install -s mvn:xom/xom/1.2.5]]> </details> <feature dependency="true">http</feature> http://git-wip-us.apache.org/repos/asf/ignite/blob/843979db/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index 246c36b..b7b5be2 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -86,9 +86,7 @@ <jsonlib.bundle.version>2.4_1</jsonlib.bundle.version> <jsonlib.version>2.4</jsonlib.version> <jtidy.version>r938</jtidy.version> - <kafka.bundle.version>0.9.0.0_1</kafka.bundle.version> - <kafka.clients.bundle.version>0.9.0.0_1</kafka.clients.bundle.version> - <kafka.version>0.9.0.0</kafka.version> + <kafka.version>0.10.0.1</kafka.version> <karaf.version>4.0.2</karaf.version> <lucene.bundle.version>3.5.0_1</lucene.bundle.version> <lucene.version>3.5.0</lucene.version>
