Repository: camel Updated Branches: refs/heads/master da6aa1ecb -> b584b3d30
CAMEL-11835: cluster service : make a JGroups based cluster service Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b584b3d3 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b584b3d3 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b584b3d3 Branch: refs/heads/master Commit: b584b3d304127dcaacb6ee12972fded1ae5bdd2d Parents: da6aa1e Author: Andrea Tarocchi <ataro...@redhat.com> Authored: Fri Oct 6 19:39:59 2017 +0200 Committer: Andrea Tarocchi <ataro...@redhat.com> Committed: Fri Oct 6 19:40:11 2017 +0200 ---------------------------------------------------------------------- components/camel-jgroups/pom.xml | 5 + .../jgroups/ha/JGroupsLockClusterService.java | 58 +++++++ .../jgroups/ha/JGroupsLockClusterView.java | 159 +++++++++++++++++++ .../src/main/resources/locking.xml | 69 ++++++++ .../jgroups/ha/JGroupsLockMasterTest.java | 104 ++++++++++++ ...oupsLockClusterServiceAutoConfiguration.java | 51 ++++++ .../JGroupsLockClusterServiceConfiguration.java | 38 +++++ .../main/resources/META-INF/spring.factories | 3 +- 8 files changed, 486 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/b584b3d3/components/camel-jgroups/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-jgroups/pom.xml b/components/camel-jgroups/pom.xml index 1b0ab00..9d4bcad 100644 --- a/components/camel-jgroups/pom.xml +++ b/components/camel-jgroups/pom.xml @@ -78,6 +78,11 @@ <artifactId>log4j-slf4j-impl</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-master</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/camel/blob/b584b3d3/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/ha/JGroupsLockClusterService.java ---------------------------------------------------------------------- diff --git a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/ha/JGroupsLockClusterService.java b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/ha/JGroupsLockClusterService.java new file mode 100644 index 0000000..d560105 --- /dev/null +++ b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/ha/JGroupsLockClusterService.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jgroups.ha; + +import org.apache.camel.impl.ha.AbstractCamelClusterService; + +public class JGroupsLockClusterService extends AbstractCamelClusterService<JGroupsLockClusterView> { + + private static final String DEFAULT_JGROUPS_CONFIG = "locking.xml"; + private static final String DEFAULT_JGROUPS_CLUSTERNAME = "jgroups-master"; + + private String jgroupsConfig; + private String jgroupsClusterName; + + public JGroupsLockClusterService() { + this.jgroupsConfig = DEFAULT_JGROUPS_CONFIG; + this.jgroupsClusterName = DEFAULT_JGROUPS_CLUSTERNAME; + } + public JGroupsLockClusterService(String jgroupsConfig, String jgroupsClusterName) { + this.jgroupsConfig = jgroupsConfig; + this.jgroupsClusterName = jgroupsClusterName; + } + + @Override + protected JGroupsLockClusterView createView(String namespace) throws Exception { + return new JGroupsLockClusterView(this, namespace, jgroupsConfig, jgroupsClusterName); + } + + public String getJgroupsConfig() { + return jgroupsConfig; + } + + public void setJgroupsConfig(String jgroupsConfig) { + this.jgroupsConfig = jgroupsConfig; + } + + public String getJgroupsClusterName() { + return jgroupsClusterName; + } + + public void setJgroupsClusterName(String jgroupsClusterName) { + this.jgroupsClusterName = jgroupsClusterName; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/b584b3d3/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/ha/JGroupsLockClusterView.java ---------------------------------------------------------------------- diff --git a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/ha/JGroupsLockClusterView.java b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/ha/JGroupsLockClusterView.java new file mode 100644 index 0000000..117f2df --- /dev/null +++ b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/ha/JGroupsLockClusterView.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jgroups.ha; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.locks.Lock; + +import org.apache.camel.CamelContext; +import org.apache.camel.ha.CamelClusterMember; +import org.apache.camel.ha.CamelClusterService; +import org.apache.camel.impl.ha.AbstractCamelClusterView; +import org.apache.camel.util.ObjectHelper; +import org.jgroups.JChannel; +import org.jgroups.blocks.locking.LockService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class JGroupsLockClusterView extends AbstractCamelClusterView { + + private static final transient Logger LOG = LoggerFactory.getLogger(JGroupsLockClusterView.class); + private final CamelClusterMember localMember = new JGropusLocalMember(); + private String jgroupsConfig; + private String jgroupsClusterName; + private String lockName; + private JChannel channel; + private LockService lockService; + private Lock lock; + private ScheduledExecutorService executor; + private volatile boolean isMaster; + + protected JGroupsLockClusterView(CamelClusterService cluster, String namespace, String jgroupsConfig, String jgroupsClusterName) { + super(cluster, namespace); + lockName = namespace; + this.jgroupsConfig = jgroupsConfig; + this.jgroupsClusterName = jgroupsClusterName; + } + + @Override + public Optional<CamelClusterMember> getMaster() { + if (isMaster) { + return Optional.of(localMember); + } else { + return Optional.empty(); + } + } + + @Override + public CamelClusterMember getLocalMember() { + return localMember; + } + + @Override + public List<CamelClusterMember> getMembers() { + return new ArrayList<CamelClusterMember>() {{ add(localMember); }}; + } + + @Override + protected void doStart() throws Exception { + if (lock != null) { + lock.unlock(); + lock = null; + } + if (channel == null) { + channel = new JChannel(jgroupsConfig); + lockService = new LockService(channel); + } + channel.connect(jgroupsClusterName); + lock = lockService.getLock(lockName); + + // Camel context should be set at this stage. + final CamelContext context = ObjectHelper.notNull(getCamelContext(), "CamelContext"); + executor = context.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "JGroupsLockClusterView-" + getClusterService().getId() + "-" + lockName); + executor.execute(new Runnable() { + @Override + public void run() { + LOG.info("Attempting to become master acquiring the lock for group: " + lockName + " in JGroups cluster" + jgroupsClusterName + " with configuration: " + jgroupsConfig); + lock.lock(); + isMaster = true; + fireLeadershipChangedEvent(Optional.ofNullable(localMember)); + LOG.info("Became master by acquiring the lock for group: " + lockName + " in JGroups cluster" + jgroupsClusterName + " with configuration: " + jgroupsConfig); + } + }); + } + + @Override + protected void doStop() throws Exception { + shutdownExecutor(); + isMaster = false; + fireLeadershipChangedEvent(Optional.empty()); + clearLock(); + channel.disconnect(); + } + + @Override + protected void doShutdown() throws Exception { + shutdownExecutor(); + isMaster = false; + fireLeadershipChangedEvent(Optional.empty()); + clearLock(); + if (channel != null) { + channel.close(); + channel = null; + } + } + + private void clearLock() { + if (lock != null) { + lock.unlock(); + lock = null; + } + } + + private void shutdownExecutor() { + CamelContext context = getCamelContext(); + if (executor != null) { + if (context != null) { + context.getExecutorServiceManager().shutdown(executor); + } else { + executor.shutdown(); + } + executor = null; + } + } + + private final class JGropusLocalMember implements CamelClusterMember { + @Override + public boolean isLeader() { + return isMaster; + } + + @Override + public boolean isLocal() { + return true; + } + + @Override + public String getId() { + return getClusterService().getId(); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/b584b3d3/components/camel-jgroups/src/main/resources/locking.xml ---------------------------------------------------------------------- diff --git a/components/camel-jgroups/src/main/resources/locking.xml b/components/camel-jgroups/src/main/resources/locking.xml new file mode 100644 index 0000000..6711291 --- /dev/null +++ b/components/camel-jgroups/src/main/resources/locking.xml @@ -0,0 +1,69 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<config xmlns="urn:org:jgroups" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd"> + <UDP + mcast_port="${jgroups.udp.mcast_port:45588}" + ip_ttl="4" + tos="8" + ucast_recv_buf_size="5M" + ucast_send_buf_size="5M" + mcast_recv_buf_size="5M" + mcast_send_buf_size="5M" + max_bundle_size="64K" + enable_diagnostics="true" + thread_naming_pattern="cl" + + thread_pool.min_threads="0" + thread_pool.max_threads="20" + thread_pool.keep_alive_time="30000"/> + + <PING /> + <MERGE3 max_interval="30000" + min_interval="10000"/> + <FD_SOCK/> + <FD_ALL/> + <VERIFY_SUSPECT timeout="1500" /> + <BARRIER /> + <pbcast.NAKACK2 xmit_interval="500" + xmit_table_num_rows="100" + xmit_table_msgs_per_row="2000" + xmit_table_max_compaction_time="30000" + use_mcast_xmit="false" + discard_delivered_msgs="true"/> + <UNICAST3 xmit_interval="500" + xmit_table_num_rows="100" + xmit_table_msgs_per_row="2000" + xmit_table_max_compaction_time="60000" + conn_expiry_timeout="0"/> + <pbcast.STABLE desired_avg_gossip="50000" + max_bytes="4M"/> + <pbcast.GMS print_local_addr="true" join_timeout="2000" + view_bundling="true"/> + <UFC max_credits="2M" + min_threshold="0.4"/> + <MFC max_credits="2M" + min_threshold="0.4"/> + <FRAG2 frag_size="60K" /> + <RSVP resend_interval="2000" timeout="10000"/> + <pbcast.STATE_TRANSFER /> + <CENTRAL_LOCK/> +</config> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/b584b3d3/components/camel-jgroups/src/test/java/org/apache/camel/component/jgroups/ha/JGroupsLockMasterTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jgroups/src/test/java/org/apache/camel/component/jgroups/ha/JGroupsLockMasterTest.java b/components/camel-jgroups/src/test/java/org/apache/camel/component/jgroups/ha/JGroupsLockMasterTest.java new file mode 100644 index 0000000..4d972cf --- /dev/null +++ b/components/camel-jgroups/src/test/java/org/apache/camel/component/jgroups/ha/JGroupsLockMasterTest.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jgroups.ha; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultCamelContext; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class JGroupsLockMasterTest { + private static final Logger LOGGER = LoggerFactory.getLogger(JGroupsLockMasterTest.class); + private static final List<String> CLIENTS = IntStream.range(0, 3).mapToObj(Integer::toString).collect(Collectors.toList()); + private static final List<String> RESULTS = new ArrayList<>(); + private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(CLIENTS.size()); + private static final CountDownLatch LATCH = new CountDownLatch(CLIENTS.size()); + + // ************************************ + // Test + // ************************************ + + @Test + public void test() throws Exception { + for (String id : CLIENTS) { + SCHEDULER.submit(() -> run(id)); + } + + LATCH.await(1, TimeUnit.MINUTES); + SCHEDULER.shutdownNow(); + + Assert.assertEquals(CLIENTS.size(), RESULTS.size()); + Assert.assertTrue(RESULTS.containsAll(CLIENTS)); + } + + // ************************************ + // Run a Camel node + // ************************************ + + private static void run(String id) { + try { + int events = ThreadLocalRandom.current().nextInt(2, 6); + CountDownLatch contextLatch = new CountDownLatch(events); + + JGroupsLockClusterService service = new JGroupsLockClusterService(); + service.setId("node-" + id); + + DefaultCamelContext context = new DefaultCamelContext(); + context.disableJMX(); + context.setName("context-" + id); + context.addService(service); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("master:jgl:timer:master?delay=1s&period=1s") + .routeId("route-" + id) + .log("From ${routeId}") + .process(e -> contextLatch.countDown()); + } + }); + + // Start the context after some random time so the startup order + // changes for each test. + Thread.sleep(ThreadLocalRandom.current().nextInt(500)); + context.start(); + + contextLatch.await(); + + LOGGER.debug("Shutting down node {}", id); + RESULTS.add(id); + + context.stop(); + + LATCH.countDown(); + } catch (Exception e) { + LOGGER.warn("", e); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/b584b3d3/platforms/spring-boot/components-starter/camel-jgroups-starter/src/main/java/org/apache/camel/component/jgroups/springboot/ha/springboot/JGroupsLockClusterServiceAutoConfiguration.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-jgroups-starter/src/main/java/org/apache/camel/component/jgroups/springboot/ha/springboot/JGroupsLockClusterServiceAutoConfiguration.java b/platforms/spring-boot/components-starter/camel-jgroups-starter/src/main/java/org/apache/camel/component/jgroups/springboot/ha/springboot/JGroupsLockClusterServiceAutoConfiguration.java new file mode 100644 index 0000000..d41d82d --- /dev/null +++ b/platforms/spring-boot/components-starter/camel-jgroups-starter/src/main/java/org/apache/camel/component/jgroups/springboot/ha/springboot/JGroupsLockClusterServiceAutoConfiguration.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jgroups.springboot.ha.springboot; + +import org.apache.camel.component.jgroups.ha.JGroupsLockClusterService; +import org.apache.camel.ha.CamelClusterService; +import org.apache.camel.spring.boot.CamelAutoConfiguration; +import org.apache.camel.spring.boot.ha.ClusteredRouteControllerAutoConfiguration; +import org.apache.camel.util.IntrospectionSupport; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.boot.autoconfigure.AutoConfigureBefore; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Scope; + +@Configuration +@AutoConfigureBefore({ ClusteredRouteControllerAutoConfiguration.class, CamelAutoConfiguration.class }) +@EnableConfigurationProperties(JGroupsLockClusterServiceConfiguration.class) +public class JGroupsLockClusterServiceAutoConfiguration { + @Autowired + private JGroupsLockClusterServiceConfiguration configuration; + + @Bean(name = "jgroups-lock-cluster-service") + @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON) + public CamelClusterService zookeeperClusterService() throws Exception { + JGroupsLockClusterService service = new JGroupsLockClusterService(); + + IntrospectionSupport.setProperties( + service, + IntrospectionSupport.getNonNullProperties(configuration) + ); + + return service; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/b584b3d3/platforms/spring-boot/components-starter/camel-jgroups-starter/src/main/java/org/apache/camel/component/jgroups/springboot/ha/springboot/JGroupsLockClusterServiceConfiguration.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-jgroups-starter/src/main/java/org/apache/camel/component/jgroups/springboot/ha/springboot/JGroupsLockClusterServiceConfiguration.java b/platforms/spring-boot/components-starter/camel-jgroups-starter/src/main/java/org/apache/camel/component/jgroups/springboot/ha/springboot/JGroupsLockClusterServiceConfiguration.java new file mode 100644 index 0000000..d481311 --- /dev/null +++ b/platforms/spring-boot/components-starter/camel-jgroups-starter/src/main/java/org/apache/camel/component/jgroups/springboot/ha/springboot/JGroupsLockClusterServiceConfiguration.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jgroups.springboot.ha.springboot; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties(prefix = "camel.component.jgroups.lock.cluster.service") +public class JGroupsLockClusterServiceConfiguration { + /** + * Cluster Service ID + */ + private String id; + + /** + * JGrups configuration File name + */ + private String jgroupsConfig; + + /** + * JGroups Cluster name + */ + private String jgroupsClusterName; + +} http://git-wip-us.apache.org/repos/asf/camel/blob/b584b3d3/platforms/spring-boot/components-starter/camel-jgroups-starter/src/main/resources/META-INF/spring.factories ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-jgroups-starter/src/main/resources/META-INF/spring.factories b/platforms/spring-boot/components-starter/camel-jgroups-starter/src/main/resources/META-INF/spring.factories index 75296e1..b52840f 100644 --- a/platforms/spring-boot/components-starter/camel-jgroups-starter/src/main/resources/META-INF/spring.factories +++ b/platforms/spring-boot/components-starter/camel-jgroups-starter/src/main/resources/META-INF/spring.factories @@ -15,4 +15,5 @@ ## limitations under the License. ## --------------------------------------------------------------------------- org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ -org.apache.camel.component.jgroups.springboot.JGroupsComponentAutoConfiguration +org.apache.camel.component.jgroups.springboot.JGroupsComponentAutoConfiguration,\ +org.apache.camel.component.jgroups.ha.springboot.JGroupsLockClusterServiceAutoConfiguration