This is an automated email from the ASF dual-hosted git repository. nizhikov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
commit e5f9f9815b9404f20171922b5061e55a56a2240b Author: gkatzioura <[email protected]> AuthorDate: Sun Dec 29 12:28:49 2019 -0600 IGNITE-12262 Implement a Pub/Sub Data Streamer - Fixes #2. Signed-off-by: samaitra <[email protected]> --- modules/pub-sub/README.txt | 33 +++ modules/pub-sub/config/example-ignite.xml | 73 +++++ modules/pub-sub/licenses/apache-2.0.txt | 202 ++++++++++++++ modules/pub-sub/pom.xml | 102 +++++++ .../ignite/stream/pubsub/PubSubStreamer.java | 248 +++++++++++++++++ .../apache/ignite/stream/pubsub/package-info.java | 21 ++ .../ignite/stream/pubsub/MockPubSubServer.java | 147 ++++++++++ .../stream/pubsub/PubSubStreamerSelfTest.java | 310 +++++++++++++++++++++ .../stream/pubsub/PubSubStreamerTestSuite.java | 29 ++ .../pub-sub/src/test/resources/example-ignite.xml | 73 +++++ parent/pom.xml | 4 + pom.xml | 1 + 12 files changed, 1243 insertions(+) diff --git a/modules/pub-sub/README.txt b/modules/pub-sub/README.txt new file mode 100644 index 0000000..65e7754 --- /dev/null +++ b/modules/pub-sub/README.txt @@ -0,0 +1,33 @@ +Apache Ignite Pub/Sub Module +----------------------------------- + +Pub/Sub module is a streaming connector to inject Pub/Sub data into Ignite cache. + +Starting data transfer to Ignite can be done with the following steps. + +1. Import Ignite Pub/Sub Module in Maven Project + +If you are using Maven to manage dependencies of your project, you can add Pub/Sub module +dependency like this (replace '${ignite.version}' with actual Ignite version you are +interested in): + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 + http://maven.apache.org/xsd/maven-4.0.0.xsd"> + ... + <dependencies> + ... + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-pub-sub</artifactId> + <version>${ignite.version}</version> + </dependency> + ... + </dependencies> + ... +</project> + +2. Create an Ignite configuration file (see example-ignite.xml) and make sure it is accessible from the sink. + +3. Make sure your data input to the sink is specified. For example `input.addSink(igniteSinkObject)` diff --git a/modules/pub-sub/config/example-ignite.xml b/modules/pub-sub/config/example-ignite.xml new file mode 100644 index 0000000..d4f4dc1 --- /dev/null +++ b/modules/pub-sub/config/example-ignite.xml @@ -0,0 +1,73 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + Ignite configuration with all defaults and enabled events. + Used for testing IgniteSink running Ignite in a client mode. +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns:util="http://www.springframework.org/schema/util" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans + http://www.springframework.org/schema/beans/spring-beans.xsd + http://www.springframework.org/schema/util + http://www.springframework.org/schema/util/spring-util.xsd"> + <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"> + <!-- Enable client mode. --> + <property name="clientMode" value="false"/> + + <!-- Cache accessed from IgniteSink. --> + <property name="cacheConfiguration"> + <list> + <!-- Partitioned cache example configuration with configurations adjusted to server nodes'. --> + <bean class="org.apache.ignite.configuration.CacheConfiguration"> + <property name="atomicityMode" value="ATOMIC"/> + <property name="name" value="testCache"/> + </bean> + </list> + </property> + + <!-- Enable cache events. --> + <property name="includeEventTypes"> + <list> + <!-- Cache events (only EVT_CACHE_OBJECT_PUT for tests). --> + <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/> + <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/> + <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/> + + </list> + </property> + + <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. --> + <property name="discoverySpi"> + <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> + <property name="ipFinder"> + <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"> + <property name="addresses"> + <list> + <value>127.0.0.1:47500..47509</value> + </list> + </property> + </bean> + </property> + </bean> + </property> + </bean> +</beans> diff --git a/modules/pub-sub/licenses/apache-2.0.txt b/modules/pub-sub/licenses/apache-2.0.txt new file mode 100644 index 0000000..d645695 --- /dev/null +++ b/modules/pub-sub/licenses/apache-2.0.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/modules/pub-sub/pom.xml b/modules/pub-sub/pom.xml new file mode 100644 index 0000000..8ef0b07 --- /dev/null +++ b/modules/pub-sub/pom.xml @@ -0,0 +1,102 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + POM file. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-parent</artifactId> + <version>1</version> + <relativePath>../../parent</relativePath> + </parent> + + <artifactId>ignite-pub-sub</artifactId> + <version>2.9.0-SNAPSHOT</version> + <url>http://ignite.apache.org</url> + + <properties> + <pubsub.version>1.102.0</pubsub.version> + <ignite-core.version>2.9.0-SNAPSHOT</ignite-core.version> + <ignite-log4j.version>2.9.0-SNAPSHOT</ignite-log4j.version> + <ignite-spring.version>2.9.0-SNAPSHOT</ignite-spring.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${ignite-core.version}</version> + </dependency> + <dependency> + <groupId>com.google.cloud</groupId> + <artifactId>google-cloud-pubsub</artifactId> + <version>${pubsub.version}</version> + </dependency> + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${ignite-core.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-spring</artifactId> + <version>${ignite-spring.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-log4j</artifactId> + <version>${ignite-log4j.version}</version> + <scope>test</scope> + </dependency> + + <!-- + <dependency> + <groupId>org.easymock</groupId> + --> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>${mockito.version}</version> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <!-- Generate the OSGi MANIFEST.MF for this bundle. --> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + </plugin> + </plugins> + </build> + +</project> \ No newline at end of file diff --git a/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/PubSubStreamer.java b/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/PubSubStreamer.java new file mode 100644 index 0000000..13384d2 --- /dev/null +++ b/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/PubSubStreamer.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.pubsub; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.stream.StreamAdapter; + +import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub; +import com.google.cloud.pubsub.v1.stub.SubscriberStub; +import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.ReceivedMessage; + +import org.apache.ignite.stream.StreamMultipleTupleExtractor; +import org.apache.ignite.stream.StreamSingleTupleExtractor; + + +/** + * Server that subscribes to topic messages from Pub/Sub and streams its to key-value pairs into + * {@link IgniteDataStreamer} instance, using Google Cloud Pub/Sub sdk as a Pub/Sub client. + * <p> + * You must also provide a {@link StreamSingleTupleExtractor} or a {@link StreamMultipleTupleExtractor} to extract + * cache tuples out of the incoming message. + * <p> + */ +public class PubSubStreamer<K,V> extends StreamAdapter<PubsubMessage, K, V> { + /** Default max messages. */ + private static final int DFLT_MAX_MESSAGES = 10; + + /** Logger. */ + private IgniteLogger log; + + /** Polling tasks executor. */ + private ExecutorService executor; + + /** Topics. */ + private List<ProjectTopicName> topics; + + /** Number of threads. */ + private int threads; + + /** Pub/Sub subscriptionName. */ + private String subscriptionName; + + /** Pub/Sub subscriberStubSettings*/ + private SubscriberStubSettings subscriberStubSettings; + + /** Return policy on unavailable messages. */ + private boolean returnImmediately = false; + + /** Pub/Sub maxMessages. */ + private int maxMessages = DFLT_MAX_MESSAGES; + + /** Pub/Sub consumer tasks. */ + private final List<ConsumerTask> consumerTasks = new ArrayList<>(); + + /** + * Sets the topic names. + * + * @param topics Topic names. + */ + public void setTopic(List<ProjectTopicName> topics) { + this.topics = topics; + } + + /** + * Sets the threads. + * + * @param threads Number of threads. + */ + public void setThreads(int threads) { + this.threads = threads; + } + + /** + * Sets the consumer subscriptionName. + * + * @param subscriptionName Consumer subscription name. + */ + public void setSubscriptionName(String subscriptionName) { + this.subscriptionName = subscriptionName; + } + + /** + * Sets the consumer Settings + * @param subscriberStubSettings Pub/Sub subscriber settings + */ + public void setSubscriberStubSettings(SubscriberStubSettings subscriberStubSettings) { + this.subscriberStubSettings = subscriberStubSettings; + } + + /** + * Sets the return policy for receiving messages on Pub/Sub tasks on message availability. + * + * @param returnImmediately Pub/Sub waiting policy on message unavailability. + */ + public void setReturnImmediately(boolean returnImmediately) { + this.returnImmediately = returnImmediately; + } + + /** + * Sets the max number of Pub/Sub messages to fetch on pull request + * @param maxMessages Pub/Sub messages per pull request + */ + public void setMaxMessages(int maxMessages) { + this.maxMessages = maxMessages; + } + + /** + * Starts streamer. + * + * @throws IgniteException If failed. + */ + public void start() { + A.notNull(getStreamer(), "streamer"); + A.notNull(getIgnite(), "ignite"); + A.ensure(!(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null), + "tuple extractor missing"); + A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null, + "cannot provide both single and multiple tuple extractor"); + A.notNull(topics, "topics"); + A.notNull(subscriptionName, "Pub/Sub consumer config"); + A.ensure(threads > 0, "threads > 0"); + + log = getIgnite().log(); + + executor = Executors.newFixedThreadPool(threads); + + IntStream.range(0, threads).forEach(i -> consumerTasks.add(new ConsumerTask(subscriberStubSettings, subscriptionName, returnImmediately, maxMessages))); + + for (ConsumerTask task : consumerTasks) + executor.submit(task); + } + + /** + * Stops streamer. + */ + public void stop() { + for (ConsumerTask task : consumerTasks) + task.stop(); + + if (executor != null) { + executor.shutdown(); + + try { + if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) + if (log.isDebugEnabled()) + log.debug("Timed out waiting for consumer threads to shut down, exiting uncleanly."); + } + catch (InterruptedException ignored) { + if (log.isDebugEnabled()) + log.debug("Interrupted during shutdown, exiting uncleanly."); + } + } + } + + /** Polling task. */ + class ConsumerTask implements Callable<Void> { + /** Pub/Sub consumer */ + private final SubscriberStub subscriberStub; + + private final String subscriptionName; + + private final boolean returnImmediately; + + private final int maxMessages; + + /** Stopped. */ + private volatile boolean stopped; + + public ConsumerTask(SubscriberStubSettings subscriberStubSettings,String subscriptionName, boolean returnImmediately, int maxMessages) throws IgniteException { + try { + + this.subscriberStub = GrpcSubscriberStub.create(subscriberStubSettings); + this.subscriptionName = subscriptionName; + this.returnImmediately = returnImmediately; + this.maxMessages = maxMessages; + } catch (IOException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public Void call() throws Exception { + + try { + while (!stopped) { + PullRequest pullRequest = + PullRequest.newBuilder() + .setMaxMessages(maxMessages) + .setReturnImmediately(returnImmediately) // return immediately if messages are not available + .setSubscription(subscriptionName) + .build(); + + PullResponse pullResponse = subscriberStub.pullCallable().call(pullRequest); + + for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) { + addMessage(message.getMessage()); + } + } + } finally { + subscriberStub.close(); + } + + return null; + } + + /** Stops the polling task. */ + public void stop() { + stopped = true; + + if (subscriberStub!= null) { + subscriberStub.shutdown(); + } + } + } + +} diff --git a/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/package-info.java b/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/package-info.java new file mode 100644 index 0000000..9a1461f --- /dev/null +++ b/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains implementation of Pub/Sub Streamer. + */ +package org.apache.ignite.stream.pubsub; diff --git a/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java b/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java new file mode 100644 index 0000000..20fe767 --- /dev/null +++ b/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.pubsub; + +import com.google.api.core.ApiFutures; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.ReceivedMessage; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.logging.Logger; + +import org.jetbrains.annotations.NotNull; +import org.mockito.Mockito; + +import io.grpc.CallOptions; +import io.grpc.ClientCall; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Mock Pub/Sub Server + */ +class MockPubSubServer { + + /** Test topic. */ + public static final String TOPIC_NAME = "pagevisits"; + + private static final Logger LOGGER = Logger.getLogger(MockPubSubServer.class.getName()); + public static final String PROJECT = "test-project"; + private static final String LOCALHOST = "localhost"; + private static final int PORT = 8080; + public static final int MESSAGES_PER_REQUEST = 10; + + private final Map<String, Publisher> publishers = new HashMap<>(); + private final List<PubsubMessage> topicMessages = new ArrayList<>(); + private final Queue<PubsubMessage> blockingQueue = new LinkedBlockingDeque<>(); + + public SubscriberStubSettings createSubscriberStub() throws IOException { + CredentialsProvider credentialsProvider = NoCredentialsProvider.create(); + + ManagedChannel managedChannel = managedChannel(); + + FixedTransportChannelProvider transportChannel = FixedTransportChannelProvider.create(GrpcTransportChannel.create(managedChannel)); + SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder() + .setTransportChannelProvider(transportChannel) + .setCredentialsProvider(credentialsProvider) + .build(); + return subscriberStubSettings; + } + + @NotNull + private ManagedChannel managedChannel() { + ManagedChannel managedChannel = Mockito.mock(ManagedChannel.class); + when(managedChannel.newCall(any(MethodDescriptor.class),any(CallOptions.class))).thenAnswer((la) -> clientCall()); + return managedChannel; + } + + private ClientCall<PullRequest, PullResponse> clientCall() { + ClientCall<PullRequest, PullResponse> clientCall = Mockito.mock(ClientCall.class); + + doAnswer( + iom ->{ + Object[] arguments = iom.getArguments(); + ClientCall.Listener<PullResponse> listener = (ClientCall.Listener<PullResponse>) arguments[0]; + Metadata metadata = (Metadata) arguments[1]; + pullMessages(listener, metadata); + return null; + } + ).when(clientCall).start(any(ClientCall.Listener.class),any(Metadata.class)); + return clientCall; + } + + private void pullMessages(ClientCall.Listener<PullResponse> listener, Metadata metadata) { + PullResponse.Builder pullResponse = PullResponse.newBuilder(); + + for(int i = 0; i < MESSAGES_PER_REQUEST; i++) { + pullResponse.addReceivedMessages(ReceivedMessage.newBuilder().mergeMessage(blockingQueue.remove()).build()); + } + + listener.onMessage(pullResponse.build()); + listener.onClose(Status.OK, metadata); + } + + public Publisher getPublisher(String topicName) throws IOException { + publishers.putIfAbsent(topicName, createPublisher(topicName)); + return publishers.get(topicName); + } + + private Publisher createPublisher(String topic) { + Publisher publisher = mock(Publisher.class); + + when(publisher.publish(any(PubsubMessage.class))).thenAnswer( + (iom) -> { + PubsubMessage pubsubMessage = (PubsubMessage) iom.getArguments()[0]; + blockingQueue.add(pubsubMessage); + return ApiFutures.immediateFuture(UUID.randomUUID().toString()); + } + ); + return publisher; + } + + /** + * Obtains Pub/Sub address. + * + * @return Pub/Sub address. + */ + private String getPubSubAddress() { + return LOCALHOST+ ":" + PORT; + } + +} diff --git a/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/PubSubStreamerSelfTest.java b/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/PubSubStreamerSelfTest.java new file mode 100644 index 0000000..1ab67c4 --- /dev/null +++ b/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/PubSubStreamerSelfTest.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.pubsub; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +import java.util.concurrent.TimeoutException; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteIllegalStateException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.Ignition; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.events.CacheEvent; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.util.lang.GridMapEntry; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.stream.StreamMultipleTupleExtractor; +import org.apache.ignite.stream.StreamSingleTupleExtractor; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PubsubMessage; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT; +import static org.apache.ignite.stream.pubsub.MockPubSubServer.PROJECT; +import static org.apache.ignite.stream.pubsub.MockPubSubServer.TOPIC_NAME; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link PubSubStreamer}. + */ +public class PubSubStreamerSelfTest { + + private static final Logger LOGGER = Logger.getLogger(PubSubStreamerSelfTest.class.getName()); + + /** Cache name. */ + private static final String DEFAULT_CACHE_NAME = "testCache"; + + /** Ignite test configuration file. */ + private static final String GRID_CONF_FILE = "config/example-ignite.xml"; + + /** Subscription Name. */ + private static final String SUBSCRIPTION = "ignite_subscription"; + + /** Count. */ + private static final int CNT = 100; + + /** Messages per request. */ + private static final int MESSAGES_PER_REQUEST = 10; + + /** Topic message key prefix. */ + private static final String KEY_PREFIX = "192.168.2."; + + /** Topic message value URL. */ + private static final String VALUE_URL = ",www.example.com,"; + + + private static final String JSON_KEY = "key"; + private static final String JSON_VALUE = "value"; + + private Ignite ignite; + + private static MockPubSubServer mockPubSubServer = new MockPubSubServer(); + + @Before + public void beforeTest() throws InterruptedException { + this.ignite = Ignition.start(GRID_CONF_FILE); + IgniteCache<Integer,String> igniteCache = ignite.getOrCreateCache(defaultCacheConfiguration()); + } + + @After + public void afterTest() { + ignite.cache(DEFAULT_CACHE_NAME).clear(); + Ignition.stop(true); + } + + /** + * @return New cache configuration with modified defaults. + */ + public static CacheConfiguration<Integer,String> defaultCacheConfiguration() { + CacheConfiguration cfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + cfg.setAtomicityMode(ATOMIC); + cfg.setWriteSynchronizationMode(FULL_SYNC); + return cfg; + } + /** + * Tests Pub/Sub streamer. + * + * @throws TimeoutException If timed out. + * @throws InterruptedException If interrupted. + */ + @Test + public void testPubSubStreamer() throws Exception { + Map<String, String> keyValMap = produceStream(); + consumerStream(ProjectTopicName.of(MockPubSubServer.PROJECT, MockPubSubServer.TOPIC_NAME), keyValMap); + } + + /** + * Consumes Pub/Sub stream via Ignite. + * + * @param topic Topic name. + * @param keyValMap Expected key value map. + * @throws TimeoutException If timed out. + * @throws InterruptedException If interrupted. + */ + private void consumerStream(ProjectTopicName topic, Map<String, String> keyValMap) + throws InterruptedException, IOException { + PubSubStreamer<String, String> pubSubStmr = null; + + try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(DEFAULT_CACHE_NAME)) { + stmr.allowOverwrite(true); + stmr.autoFlushFrequency(MESSAGES_PER_REQUEST); + + // Configure Pub/Sub streamer. + pubSubStmr = new PubSubStreamer<>(); + + // Get the cache. + IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME); + + // Set Ignite instance. + pubSubStmr.setIgnite(ignite); + + // Set data streamer instance. + pubSubStmr.setStreamer(stmr); + + // Set the topic. + pubSubStmr.setTopic(Arrays.asList(topic)); + + // Set subscription name + pubSubStmr.setSubscriptionName(ProjectSubscriptionName.format(PROJECT, SUBSCRIPTION)); + + // Set the number of threads. + pubSubStmr.setThreads(4); + + // Set the SubScriber Stub settings. + pubSubStmr.setSubscriberStubSettings(mockPubSubServer.createSubscriberStub()); + + pubSubStmr.setSingleTupleExtractor(singleTupleExtractor()); + + // Start Pub/Sub streamer. + pubSubStmr.start(); + + final CountDownLatch latch = new CountDownLatch(CNT); + + IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() { + @IgniteInstanceResource + private Ignite ig; + + @LoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public boolean apply(UUID uuid, CacheEvent evt) { + latch.countDown(); + + if (log.isInfoEnabled()) { + IgniteEx igEx = (IgniteEx)ig; + + UUID nodeId = igEx.localNode().id(); + + log.info("Receive event=" + evt + ", nodeId=" + nodeId); + } + + return true; + } + }; + + ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT); + + // Checks all events successfully processed in 10 seconds. + assertTrue("Failed to wait latch completion, still wait " + latch.getCount() + " events", + latch.await(10, TimeUnit.SECONDS)); + + for (Map.Entry<String, String> entry : keyValMap.entrySet()) + assertEquals(entry.getValue(), cache.get(entry.getKey())); + } + finally { + if (pubSubStmr!= null) + pubSubStmr.stop(); + } + } + + /** + * Sends messages to Pub/Sub. + * + * @return Map of key value messages. + */ + private Map<String, String> produceStream() throws Exception { + List<Integer> subnet = new ArrayList<>(); + + for (int i = 1; i <= CNT; i++) + subnet.add(i); + + Collections.shuffle(subnet); + + List<PubsubMessage> messages = new ArrayList<>(); + + Map<String, String> keyValMap = new HashMap<>(); + + for (int evt = 0; evt < CNT; evt++) { + long runtime = System.currentTimeMillis(); + + String ip = KEY_PREFIX + subnet.get(evt); + + String msg = runtime + VALUE_URL + ip; + + JsonObject jsonObject = new JsonObject(); + jsonObject.addProperty(JSON_KEY, ip); + jsonObject.addProperty(JSON_VALUE, msg); + + ByteString byteString = ByteString.copyFrom(jsonObject.toString(), Charset.defaultCharset()); + + PubsubMessage pubsubMessage = PubsubMessage.newBuilder() + .setData(byteString) + .build(); + + messages.add(pubsubMessage); + String messageId = mockPubSubServer.getPublisher(TOPIC_NAME).publish(pubsubMessage).get(); + keyValMap.put(ip, msg); + + } + + return keyValMap; + } + + /** + * @return {@link StreamSingleTupleExtractor} for testing. + */ + private static StreamSingleTupleExtractor<PubsubMessage, String, String> singleTupleExtractor() { + return new StreamSingleTupleExtractor<PubsubMessage, String, String>() { + @Override public Map.Entry<String, String> extract(PubsubMessage msg) { + String dataStr = msg.getData().toStringUtf8(); + JsonElement jsonElement = new JsonParser().parse(dataStr).getAsJsonObject(); + JsonObject jsonObject = jsonElement.getAsJsonObject(); + return new GridMapEntry<>(jsonObject.get(JSON_KEY).getAsString(), jsonObject.get(JSON_VALUE).getAsString()); + } + }; + } + + /** + * @return {@link StreamMultipleTupleExtractor} for testing. + */ + private static StreamMultipleTupleExtractor<PubsubMessage, String, String> multipleTupleExtractor() { + return new StreamMultipleTupleExtractor<PubsubMessage, String, String>() { + @Override public Map<String, String> extract(PubsubMessage msg) { + String dataStr = msg.getData().toStringUtf8(); + JsonElement jsonElement = new JsonParser().parse(dataStr).getAsJsonObject(); + JsonArray jsonArray = jsonElement.getAsJsonArray(); + + final Map<String, String> answer = new HashMap<>(); + + for(int i=0;i<jsonArray.size();i++) { + JsonObject jsonObject = jsonArray.get(i) .getAsJsonObject(); + String key = jsonObject.get(JSON_KEY).getAsString(); + String value = jsonObject.get(JSON_VALUE).getAsString(); + + answer.put(key,value); + } + + return answer; + } + }; + } + +} diff --git a/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/PubSubStreamerTestSuite.java b/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/PubSubStreamerTestSuite.java new file mode 100644 index 0000000..fd51b01 --- /dev/null +++ b/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/PubSubStreamerTestSuite.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.pubsub; + +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +/** + * Pub/Sub streamer tests. + */ +@RunWith(Suite.class) [email protected]({PubSubStreamerSelfTest.class}) +public class PubSubStreamerTestSuite { +} diff --git a/modules/pub-sub/src/test/resources/example-ignite.xml b/modules/pub-sub/src/test/resources/example-ignite.xml new file mode 100644 index 0000000..f23a306 --- /dev/null +++ b/modules/pub-sub/src/test/resources/example-ignite.xml @@ -0,0 +1,73 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + Ignite configuration with all defaults and enabled p2p deployment and enabled events. + Used for testing IgniteSink running Ignite in a client mode. +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns:util="http://www.springframework.org/schema/util" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans + http://www.springframework.org/schema/beans/spring-beans.xsd + http://www.springframework.org/schema/util + http://www.springframework.org/schema/util/spring-util.xsd"> + <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"> + <!-- Enable peer class loading for remote events. --> + <property name="peerClassLoadingEnabled" value="true"/> + <!-- Enable client mode. --> + <property name="clientMode" value="true"/> + + <!-- Cache accessed from IgniteSink. --> + <property name="cacheConfiguration"> + <list> + <!-- Partitioned cache example configuration with configurations adjusted to server nodes'. --> + <bean class="org.apache.ignite.configuration.CacheConfiguration"> + <property name="atomicityMode" value="ATOMIC"/> + + <property name="name" value="testCache"/> + </bean> + </list> + </property> + + <!-- Enable cache events. --> + <property name="includeEventTypes"> + <list> + <!-- Cache events. --> + <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/> + </list> + </property> + + <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. --> + <property name="discoverySpi"> + <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> + <property name="ipFinder"> + <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"> + <property name="addresses"> + <list> + <value>127.0.0.1:47500</value> + </list> + </property> + </bean> + </property> + </bean> + </property> + </bean> +</beans> diff --git a/parent/pom.xml b/parent/pom.xml index fde57a5..077161c 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -517,6 +517,10 @@ <packages>org.apache.ignite.stream.rocketmq*</packages> </group> <group> + <title>Pub/Sub integration</title> + <packages>org.apache.ignite.stream.pubsub.*</packages> + </group> + <group> <title>Storm integration</title> <packages>org.apache.ignite.stream.storm*</packages> </group> diff --git a/pom.xml b/pom.xml index e5fb47f..bd0c46e 100644 --- a/pom.xml +++ b/pom.xml @@ -43,6 +43,7 @@ <modules> <module>modules/flink-ext</module> + <module>modules/pub-sub</module> </modules> <profiles>
