diff --git a/README.md b/README.md index 804c3d1..9cbee7a 100644 --- a/README.md +++ b/README.md @@ -86,8 +86,8 @@ tutorial. ### Advanced * [Lucene Spatial Indexing](luceneSpatial/README.md) -* WAN Gateway -* Durable subscriptions +* [WAN Gateway](wan/README.md) +* [Durable Messaging for Subscriptions](durableMessaging/README.md) * Delta propagation * Network partition detection * D-lock diff --git a/durableMessaging/README.md b/durableMessaging/README.md new file mode 100644 index 0000000..f80a13d --- /dev/null +++ b/durableMessaging/README.md @@ -0,0 +1,51 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Geode Durable Messaging Example + +This example demonstrates Apache Geode's Durable Messaging feature. +Use durable messaging for subscriptions that you need maintained for your clients even when your clients are down or disconnected. +You can configure any of your event subscriptions as durable. Events for durable queries and subscriptions are saved in a queue when the client +is disconnected and played back when the client reconnects. Other queries and subscriptions are removed from the queue. + +The example performs the following tasks to demonstrate durable messaging: + +1. Create a client cache with durable messaging enabled +2. Register interest in all keys in the example region with durable messaging enabled +3. Close the client cache, simulating a disconnection +4. Start a second client, and do puts while the first client is down +5. Restart the first client, and observe that the create events in the durable queue are delivered. A simple cache listener is used to print output to the terminal as create events are received. If interested, see [Cache Listeners](listener/README.md) for more details on how cache listeners work. + +This example assumes you have installed Java and Geode. + +## Steps + +1. From the `geode-examples/durableMessaging` directory, build the example. + + $ ../gradlew build + +2. Next start a locator, start a server, and create a region. + + $ gfsh run --file=scripts/start.gfsh + +3. Run the example to demonstrate durable messaging. + + $ ../gradlew run + +4. Shut down the server. + + $ gfsh run --file=scripts/stop.gfsh diff --git a/durableMessaging/scripts/start.gfsh b/durableMessaging/scripts/start.gfsh new file mode 100644 index 0000000..4c845bc --- /dev/null +++ b/durableMessaging/scripts/start.gfsh @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +start locator --name=locator --bind-address=127.0.0.1 + +start server --name=server --locators=127.0.0.1[10334] --server-port=0 + +list members + +create region --name=example-region --type=REPLICATE +describe region --name=example-region diff --git a/durableMessaging/scripts/stop.gfsh b/durableMessaging/scripts/stop.gfsh new file mode 100644 index 0000000..7672d87 --- /dev/null +++ b/durableMessaging/scripts/stop.gfsh @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +connect --locator=127.0.0.1[10334] +shutdown --include-locators=true diff --git a/durableMessaging/src/main/java/org/apache/geode_examples/durableMessaging/Example.java b/durableMessaging/src/main/java/org/apache/geode_examples/durableMessaging/Example.java new file mode 100644 index 0000000..25b96b8 --- /dev/null +++ b/durableMessaging/src/main/java/org/apache/geode_examples/durableMessaging/Example.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode_examples.durableMessaging; + +import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID; +import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT; +import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL; + +import java.util.concurrent.CountDownLatch; + +import org.apache.geode.cache.EntryEvent; +import org.apache.geode.cache.InterestResultPolicy; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientCacheFactory; +import org.apache.geode.cache.client.ClientRegionFactory; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.util.CacheListenerAdapter; + +public class Example { + private static final int numEvents = 10; + private static final CountDownLatch waitForEventsLatch = new CountDownLatch(numEvents); + + public static void main(String[] args) throws Exception { + ClientCache clientCacheOne = createDurableClient(); + + final String regionName = "example-region"; + + // Create a local caching proxy region that matches the server region + ClientRegionFactory<Integer, String> clientOneRegionFactory = + clientCacheOne.createClientRegionFactory(ClientRegionShortcut.PROXY); + Region<Integer, String> exampleClientRegionOne = clientOneRegionFactory.create(regionName); + + // Register interest to create the durable client message queue + exampleClientRegionOne.registerInterestForAllKeys(InterestResultPolicy.DEFAULT, true); + + // Close the client cache with keepalive set to true so + // the durable client messages are preserved + // for the duration of the configured timeout. In practice, + // it is more likely the client would disconnect + // due to a temporary network issue, but for this example the cache is explicitly closed. + clientCacheOne.close(true); + + // Create a second client to do puts with while the first client is disconnected + ClientCache clientCacheTwo = new ClientCacheFactory().addPoolLocator("127.0.0.1", 10334) + .set("log-level", "WARN").create(); + + ClientRegionFactory<Integer, String> clientTwoRegionFactory = + clientCacheTwo.createClientRegionFactory(ClientRegionShortcut.PROXY); + Region<Integer, String> exampleClientRegionTwo = clientTwoRegionFactory.create(regionName); + + for (int i = 0; i < numEvents; ++i) { + exampleClientRegionTwo.put(i, "testValue" + i); + } + + // Close the second client and restart the durable client + clientCacheTwo.close(false); + + clientCacheOne = createDurableClient(); + + // Add an example cache listener so this client can react + // when the server sends this client's events from the + // durable message queue. This isn't required but helps + // illustrate that the events are delivered successfully. + clientOneRegionFactory = clientCacheOne.createClientRegionFactory(ClientRegionShortcut.PROXY); + exampleClientRegionOne = clientOneRegionFactory + .addCacheListener(new ExampleCacheListener<Integer, String>()).create(regionName); + + // Signal to the server that this client is ready to receive events. + // Events in this client's durable message queue + // will then be delivered and trigger our example cache listener. + clientCacheOne.readyForEvents(); + + // Use a count down latch to ensure that this client receives all queued events from the server + waitForEventsLatch.await(); + } + + private static ClientCache createDurableClient() { + return new ClientCacheFactory().addPoolLocator("127.0.0.1", 10334) + // Provide a unique identifier for this client's durable subscription message queue + .set(DURABLE_CLIENT_ID, "1") + // Provide a timeout in seconds for how long the server will wait for the client to + // reconnect. + // If this property isn't set explicitly, it defaults to 300 seconds. + .set(DURABLE_CLIENT_TIMEOUT, "200") + // This is required so the client can register interest for all keys on this durable client + .setPoolSubscriptionEnabled(true).set(LOG_LEVEL, "WARN").create(); + } + + public static class ExampleCacheListener<Integer, String> + extends CacheListenerAdapter<Integer, String> { + public ExampleCacheListener() {} + + @Override + public void afterCreate(EntryEvent<Integer, String> event) { + System.out.println( + "Received create for key " + event.getKey() + " after durable client reconnection"); + waitForEventsLatch.countDown(); + } + } +} diff --git a/gradle/rat.gradle b/gradle/rat.gradle index bc0c2ab..90d09db 100644 --- a/gradle/rat.gradle +++ b/gradle/rat.gradle @@ -63,6 +63,7 @@ rat { // working directories '**/locator/**', + '**/server/**', '**/server1/**', '**/server2/**', '**/locator-ln/**', diff --git a/settings.gradle b/settings.gradle index c283d8c..18a22a4 100644 --- a/settings.gradle +++ b/settings.gradle @@ -22,6 +22,7 @@ include 'queries' include 'lucene' include 'loader' include 'putall' +include 'durableMessaging' include 'cq' include 'clientSecurity' include 'functions'
With regards, Apache Git Services
