http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java index 54b3a78..e89a4c8 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java @@ -17,11 +17,21 @@ package org.apache.ignite.spi.communication.tcp; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CyclicBarrier; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.nio.GridCommunicationClient; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.spi.communication.CommunicationSpi; import org.apache.ignite.spi.communication.GridAbstractCommunicationSelfTest; @@ -85,6 +95,67 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica } } + /** + * + */ + public void testCheckConnection1() { + for (int i = 0; i < 100; i++) { + for (Map.Entry<UUID, CommunicationSpi<Message>> entry : spis.entrySet()) { + TcpCommunicationSpi spi = (TcpCommunicationSpi)entry.getValue(); + + List<ClusterNode> checkNodes = 
new ArrayList<>(nodes); + + assert checkNodes.size() > 1; + + IgniteFuture<BitSet> fut = spi.checkConnection(checkNodes); + + BitSet res = fut.get(); + + for (int n = 0; n < checkNodes.size(); n++) + assertTrue(res.get(n)); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testCheckConnection2() throws Exception { + final int THREADS = spis.size(); + + final CyclicBarrier b = new CyclicBarrier(THREADS); + + List<IgniteInternalFuture> futs = new ArrayList<>(); + + for (Map.Entry<UUID, CommunicationSpi<Message>> entry : spis.entrySet()) { + final TcpCommunicationSpi spi = (TcpCommunicationSpi)entry.getValue(); + + futs.add(GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + List<ClusterNode> checkNodes = new ArrayList<>(nodes); + + assert checkNodes.size() > 1; + + b.await(); + + for (int i = 0; i < 100; i++) { + IgniteFuture<BitSet> fut = spi.checkConnection(checkNodes); + + BitSet res = fut.get(); + + for (int n = 0; n < checkNodes.size(); n++) + assertTrue(res.get(n)); + } + + return null; + } + })); + } + + for (IgniteInternalFuture f : futs) + f.get(); + } + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { super.afterTest();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java index 54b48e5..9a45d2d 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java @@ -206,6 +206,11 @@ public class FilterDataForClientNodeDiscoveryTest extends GridCommonAbstractTest } /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer, DiscoCache discoCache) { return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java index ca05288..51dcb23 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java @@ -613,6 +613,16 @@ public class GridSpiTestContext implements IgniteSpiContext { return Collections.emptyMap(); } + /** {@inheritDoc} */ + @Override public boolean communicationFailureResolveSupported() { + return false; + } + + /** {@inheritDoc} */ + @Override public void resolveCommunicationFailure(ClusterNode node, Exception err) { + throw 
new UnsupportedOperationException(); + } + /** * @param cacheName Cache name. * @return Map representing cache. http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/testframework/config/GridTestProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/config/GridTestProperties.java b/modules/core/src/test/java/org/apache/ignite/testframework/config/GridTestProperties.java index 4507572..e2594ca 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/config/GridTestProperties.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/config/GridTestProperties.java @@ -83,6 +83,15 @@ public final class GridTestProperties { /** "True value" enables {@link BinaryBasicNameMapper} in {@link BinaryTypeConfiguration#getNameMapper()} */ public static final String BINARY_MARSHALLER_USE_SIMPLE_NAME_MAPPER = "binary.marshaller.use.simple.name.mapper"; + /** + * Name of class which provides static method preprocessConfiguration(IgniteConfiguration cfg) to + * alter {@link org.apache.ignite.configuration.IgniteConfiguration} before node is started. + * <p> + * Note: this pre-preprocessor is started only if test starts node using one of GridAbstractTest's startGrid + * method. + */ + public static final String IGNITE_CFG_PREPROCESSOR_CLS = "ignite.cfg.preprocessor.class"; + /** */ static { // Initialize IGNITE_HOME system property. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index c3b262c..f5784eb 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -46,6 +46,7 @@ import junit.framework.TestCase; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; @@ -84,6 +85,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.logger.NullLogger; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.marshaller.MarshallerContextTestImpl; @@ -92,6 +94,7 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.spi.checkpoint.sharedfs.SharedFsCheckpointSpi; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi; @@ -124,6 +127,7 @@ import static 
org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.internal.GridKernalState.DISCONNECTED; import static org.apache.ignite.testframework.config.GridTestProperties.BINARY_MARSHALLER_USE_SIMPLE_NAME_MAPPER; +import static org.apache.ignite.testframework.config.GridTestProperties.IGNITE_CFG_PREPROCESSOR_CLS; /** * Common abstract test for Ignite tests. @@ -203,13 +207,15 @@ public abstract class GridAbstractTest extends TestCase { if (BINARY_MARSHALLER) GridTestProperties.setProperty(GridTestProperties.MARSH_CLASS_NAME, BinaryMarshaller.class.getName()); - Thread timer = new Thread(new GridTestClockTimer(), "ignite-clock-for-tests"); + if (GridTestClockTimer.startTestTimer()) { + Thread timer = new Thread(new GridTestClockTimer(), "ignite-clock-for-tests"); - timer.setDaemon(true); + timer.setDaemon(true); - timer.setPriority(10); + timer.setPriority(10); - timer.start(); + timer.start(); + } } /** */ @@ -838,6 +844,7 @@ public abstract class GridAbstractTest extends TestCase { protected Ignite startGrid(String igniteInstanceName, GridSpringResourceContext ctx) throws Exception { return startGrid(igniteInstanceName, optimize(getConfiguration(igniteInstanceName)), ctx); } + /** * Starts new grid with given name. 
* @@ -852,12 +859,33 @@ public abstract class GridAbstractTest extends TestCase { startingIgniteInstanceName.set(igniteInstanceName); try { + String cfgProcClsName = System.getProperty(IGNITE_CFG_PREPROCESSOR_CLS); + + if (cfgProcClsName != null) { + try { + Class<?> cfgProc = Class.forName(cfgProcClsName); + + Method method = cfgProc.getMethod("preprocessConfiguration", IgniteConfiguration.class); + + if (!Modifier.isStatic(method.getModifiers())) + throw new Exception("Non-static pre-processor method in pre-processor class: " + cfgProcClsName); + + method.invoke(null, cfg); + } + catch (Exception e) { + log.error("Failed to pre-process IgniteConfiguration using pre-processor class: " + cfgProcClsName); + + throw new IgniteException(e); + } + } + Ignite node = IgnitionEx.start(cfg, ctx); IgniteConfiguration nodeCfg = node.configuration(); log.info("Node started with the following configuration [id=" + node.cluster().localNode().id() + ", marshaller=" + nodeCfg.getMarshaller() + + ", discovery=" + nodeCfg.getDiscoverySpi() + ", binaryCfg=" + nodeCfg.getBinaryConfiguration() + ", lateAff=" + nodeCfg.isLateAffinityAssignment() + "]"); @@ -967,6 +995,26 @@ public abstract class GridAbstractTest extends TestCase { if (cfg == null) cfg = optimize(getConfiguration(igniteInstanceName)); + if (locNode != null) { + DiscoverySpi discoverySpi = locNode.configuration().getDiscoverySpi(); + + if (discoverySpi != null && !(discoverySpi instanceof TcpDiscoverySpi)) { + try { + // Clone added to support ZookeeperDiscoverySpi. + Method m = discoverySpi.getClass().getDeclaredMethod("cloneSpiConfiguration"); + + m.setAccessible(true); + + cfg.setDiscoverySpi((DiscoverySpi) m.invoke(discoverySpi)); + + resetDiscovery = false; + } + catch (NoSuchMethodException e) { + // Ignore. 
+ } + } + } + return new IgniteProcessProxy(cfg, log, locNode, resetDiscovery); } @@ -1075,7 +1123,9 @@ public abstract class GridAbstractTest extends TestCase { for (Ignite g : srvs) stopGrid(g.name(), cancel, false); - assert G.allGrids().isEmpty(); + List<Ignite> nodes = G.allGrids(); + + assert nodes.isEmpty() : nodes; } finally { IgniteProcessProxy.killAll(); // In multi-JVM case. @@ -1177,6 +1227,14 @@ public abstract class GridAbstractTest extends TestCase { } /** + * @param nodeIdx Node index. + * @return Node ID. + */ + protected final UUID nodeId(int nodeIdx) { + return ignite(nodeIdx).cluster().localNode().id(); + } + + /** * Gets grid for given test. * * @return Grid for given test. @@ -1217,7 +1275,11 @@ public abstract class GridAbstractTest extends TestCase { * @throws Exception If failed. */ protected Ignite startGrid(String igniteInstanceName, String springCfgPath) throws Exception { - return startGrid(igniteInstanceName, loadConfiguration(springCfgPath)); + IgniteConfiguration cfg = loadConfiguration(springCfgPath); + + cfg.setGridLogger(getTestResources().getLogger()); + + return startGrid(igniteInstanceName, cfg); } /** @@ -2142,6 +2204,50 @@ public abstract class GridAbstractTest extends TestCase { } } } + /** + * @param expSize Expected nodes number. + * @throws Exception If failed. 
+ */ + protected void waitForTopology(final int expSize) throws Exception { + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + List<Ignite> nodes = G.allGrids(); + + if (nodes.size() != expSize) { + info("Wait all nodes [size=" + nodes.size() + ", exp=" + expSize + ']'); + + return false; + } + + for (Ignite node: nodes) { + try { + IgniteFuture<?> reconnectFut = node.cluster().clientReconnectFuture(); + + if (reconnectFut != null && !reconnectFut.isDone()) { + info("Wait for size on node, reconnect is in progress [node=" + node.name() + ']'); + + return false; + } + + int sizeOnNode = node.cluster().nodes().size(); + + if (sizeOnNode != expSize) { + info("Wait for size on node [node=" + node.name() + ", size=" + sizeOnNode + ", exp=" + expSize + ']'); + + return false; + } + } + catch (IgniteClientDisconnectedException e) { + info("Wait for size on node, node disconnected [node=" + node.name() + ']'); + + return false; + } + } + + return true; + } + }, 30_000)); + } /** * @param millis Time to sleep. @@ -2172,6 +2278,17 @@ public abstract class GridAbstractTest extends TestCase { } /** + * @return {@code True} if nodes use {@link TcpDiscoverySpi}. 
+ */ + protected static boolean tcpDiscovery() { + List<Ignite> nodes = G.allGrids(); + + assertFalse("There are no nodes", nodes.isEmpty()); + + return nodes.get(0).configuration().getDiscoverySpi() instanceof TcpDiscoverySpi; + } + + /** * */ private static interface WriteReplaceOwner { http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java index d7be576..2b3a19c 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java @@ -162,6 +162,8 @@ public class IgniteNodeRunner { cfg.setDiscoverySpi(disco); } + X.println("Configured discovery: " + cfg.getDiscoverySpi().getClass().getName()); + return cfg; } finally { http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java index 55fab8d..14eb296 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java @@ -19,6 +19,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; import org.apache.ignite.internal.ClusterNodeMetricsSelfTest; +import org.apache.ignite.internal.ClusterNodeMetricsUpdateTest; 
import org.apache.ignite.internal.GridAffinityNoCacheSelfTest; import org.apache.ignite.internal.GridAffinitySelfTest; import org.apache.ignite.internal.GridAlwaysFailoverSpiFailSelfTest; @@ -122,6 +123,7 @@ public class IgniteComputeGridTestSuite { suite.addTestSuite(GridAlwaysFailoverSpiFailSelfTest.class); suite.addTestSuite(GridTaskInstanceExecutionSelfTest.class); suite.addTestSuite(ClusterNodeMetricsSelfTest.class); + suite.addTestSuite(ClusterNodeMetricsUpdateTest.class); suite.addTestSuite(GridNonHistoryMetricsSelfTest.class); suite.addTestSuite(GridCancelledJobsMetricsSelfTest.class); suite.addTestSuite(GridCollisionJobsContextSelfTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryCancelSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryCancelSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryCancelSelfTest.java index b9ef1e4..e26b211 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryCancelSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryCancelSelfTest.java @@ -117,7 +117,7 @@ public class IgniteCacheDistributedQueryCancelSelfTest extends GridCommonAbstrac } for (Ignite g : G.allGrids()) - if (!g.configuration().getDiscoverySpi().isClientMode()) + if (!g.configuration().isClientMode()) stopGrid(g.name(), true); } }, 1); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractBasicSelfTest.java 
---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractBasicSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractBasicSelfTest.java index 97720d5..bd3b093 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractBasicSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractBasicSelfTest.java @@ -89,11 +89,14 @@ public abstract class DynamicIndexAbstractBasicSelfTest extends DynamicIndexAbst * @param mode Mode. * @param atomicityMode Atomicity mode. * @param near Near flag. + * @throws Exception If failed. */ private void initialize(CacheMode mode, CacheAtomicityMode atomicityMode, boolean near) - throws IgniteCheckedException { + throws Exception { createSqlCache(node(), cacheConfiguration(mode, atomicityMode, near)); + awaitPartitionMapExchange(); + grid(IDX_CLI_NEAR_ONLY).getOrCreateNearCache(CACHE_NAME, new NearCacheConfiguration<>()); assertNoIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java b/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java index a181068..5cad167 100644 --- a/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java +++ b/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java @@ -44,13 +44,7 @@ public class 
GridJtaTransactionManagerSelfTest extends GridCommonAbstractTest { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName). setCacheConfiguration(defaultCacheConfiguration().setCacheMode(PARTITIONED)); - cfg.getTransactionConfiguration().setTxManagerFactory(new Factory<TransactionManager>() { - private static final long serialVersionUID = 0L; - - @Override public TransactionManager create() { - return jotm.getTransactionManager(); - } - }); + cfg.getTransactionConfiguration().setTxManagerFactory(new TestTxManagerFactory()); return cfg; } @@ -205,4 +199,17 @@ public class GridJtaTransactionManagerSelfTest extends GridCommonAbstractTest { cache.removeAll(); } } + + /** + * + */ + static class TestTxManagerFactory implements Factory<TransactionManager> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public TransactionManager create() { + return jotm.getTransactionManager(); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/jta/GridPartitionedCacheJtaFactorySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/jta/GridPartitionedCacheJtaFactorySelfTest.java b/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/jta/GridPartitionedCacheJtaFactorySelfTest.java index f6fd5c7..14b7fae 100644 --- a/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/jta/GridPartitionedCacheJtaFactorySelfTest.java +++ b/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/jta/GridPartitionedCacheJtaFactorySelfTest.java @@ -30,12 +30,19 @@ public class GridPartitionedCacheJtaFactorySelfTest extends AbstractCacheJtaSelf @Override protected void configureJta(IgniteConfiguration cfg) { TransactionConfiguration txCfg = cfg.getTransactionConfiguration(); - 
txCfg.setTxManagerFactory(new Factory<TransactionManager>() { - private static final long serialVersionUID = 0L; + txCfg.setTxManagerFactory(new TestTxManagerFactory()); + } + + /** + * + */ + static class TestTxManagerFactory implements Factory<TransactionManager> { + /** */ + private static final long serialVersionUID = 0L; - @Override public TransactionManager create() { - return jotm.getTransactionManager(); - } - }); + /** {@inheritDoc} */ + @Override public TransactionManager create() { + return jotm.getTransactionManager(); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala index fce47a6..d87ea0a 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala @@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.query.QueryCursorEx import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata import org.apache.ignite.lang.IgniteUuid import org.apache.ignite.spark.impl._ +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode import org.apache.spark._ import org.apache.spark.rdd.RDD @@ -91,8 +92,14 @@ class IgniteRDD[K, V] ( override protected[spark] def getPreferredLocations(split: Partition): Seq[String] = { ensureCache() - ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index) + if (ic.ignite().configuration().getDiscoverySpi().isInstanceOf[TcpDiscoverySpi]) { + ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index) .map(_.asInstanceOf[TcpDiscoveryNode].socketAddresses()).flatten.map(_.getHostName).toList + } + else { + 
ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index) + .flatten(_.hostNames).toSeq + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/spring/src/test/java/org/apache/ignite/internal/GridFactorySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/test/java/org/apache/ignite/internal/GridFactorySelfTest.java b/modules/spring/src/test/java/org/apache/ignite/internal/GridFactorySelfTest.java index 75128fc..b453858 100644 --- a/modules/spring/src/test/java/org/apache/ignite/internal/GridFactorySelfTest.java +++ b/modules/spring/src/test/java/org/apache/ignite/internal/GridFactorySelfTest.java @@ -998,7 +998,8 @@ public class GridFactorySelfTest extends GridCommonAbstractTest { startGrid("1", c); - assert ((TcpDiscoverySpi)c.getDiscoverySpi()).started(); + if (tcpDiscovery()) + assert ((TcpDiscoverySpi)c.getDiscoverySpi()).started(); try { startGrid("2", c); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/spring/src/test/java/org/apache/ignite/p2p/GridP2PUserVersionChangeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/test/java/org/apache/ignite/p2p/GridP2PUserVersionChangeSelfTest.java b/modules/spring/src/test/java/org/apache/ignite/p2p/GridP2PUserVersionChangeSelfTest.java index b861e19..46da3cc 100644 --- a/modules/spring/src/test/java/org/apache/ignite/p2p/GridP2PUserVersionChangeSelfTest.java +++ b/modules/spring/src/test/java/org/apache/ignite/p2p/GridP2PUserVersionChangeSelfTest.java @@ -44,6 +44,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.testsuites.IgniteIgnore; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static 
org.apache.ignite.events.EventType.EVT_TASK_UNDEPLOYED; @@ -255,12 +256,12 @@ public class GridP2PUserVersionChangeSelfTest extends GridCommonAbstractTest { ignite2.events().localListen(new IgnitePredicate<Event>() { @Override public boolean apply(Event evt) { - if (evt.type() == EVT_NODE_LEFT) + if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) discoLatch.countDown(); return true; } - }, EVT_NODE_LEFT); + }, EVT_NODE_LEFT, EVT_NODE_FAILED); Integer res1 = (Integer)ignite1.compute().execute(task1, ignite2.cluster().localNode().id()); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/yardstick/pom-standalone.xml ---------------------------------------------------------------------- diff --git a/modules/yardstick/pom-standalone.xml b/modules/yardstick/pom-standalone.xml index 577a95e..6905d94 100644 --- a/modules/yardstick/pom-standalone.xml +++ b/modules/yardstick/pom-standalone.xml @@ -54,6 +54,12 @@ <dependency> <groupId>org.apache.ignite</groupId> + <artifactId>ignite-zookeeper</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> <artifactId>ignite-log4j</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/yardstick/pom.xml ---------------------------------------------------------------------- diff --git a/modules/yardstick/pom.xml b/modules/yardstick/pom.xml index 8cad24b..9923bb7 100644 --- a/modules/yardstick/pom.xml +++ b/modules/yardstick/pom.xml @@ -55,6 +55,12 @@ <dependency> <groupId>org.apache.ignite</groupId> + <artifactId>ignite-zookeeper</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> <artifactId>ignite-log4j</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/pom.xml 
---------------------------------------------------------------------- diff --git a/modules/zookeeper/pom.xml b/modules/zookeeper/pom.xml index c3c3679..2d47ece 100644 --- a/modules/zookeeper/pom.xml +++ b/modules/zookeeper/pom.xml @@ -49,6 +49,12 @@ <dependency> <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + <version>${curator.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> <artifactId>curator-x-discovery</artifactId> <version>${curator.version}</version> </dependency> @@ -109,6 +115,13 @@ <dependency> <groupId>org.apache.ignite</groupId> + <artifactId>ignite-indexing</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> <artifactId>ignite-log4j</artifactId> <version>${project.version}</version> <scope>test</scope> @@ -122,16 +135,43 @@ </dependency> <dependency> + <groupId>com.thoughtworks.xstream</groupId> + <artifactId>xstream</artifactId> + <version>1.4.8</version> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.apache.ignite</groupId> <artifactId>ignite-core</artifactId> <version>${project.version}</version> <type>test-jar</type> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-indexing</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> </dependencies> <build> <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + <!-- Generate the OSGi MANIFEST.MF for this bundle. 
--> <plugin> <groupId>org.apache.felix</groupId> http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java new file mode 100644 index 0000000..860c71c --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -0,0 +1,557 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.discovery.zk; + +import java.io.IOException; +import java.io.Serializable; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.curator.utils.PathUtils; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.spi.IgniteSpiAdapter; +import org.apache.ignite.spi.IgniteSpiConfiguration; +import org.apache.ignite.spi.IgniteSpiContext; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport; +import org.apache.ignite.spi.communication.CommunicationSpi; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider; +import org.apache.ignite.spi.discovery.DiscoverySpi; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange; +import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport; +import org.apache.ignite.spi.discovery.DiscoverySpiListener; +import org.apache.ignite.spi.discovery.DiscoverySpiMutableCustomMessageSupport; +import 
org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; +import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport; +import org.apache.ignite.spi.discovery.zk.internal.ZookeeperClusterNode; +import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONSISTENT_ID_BY_HOST_WITHOUT_PORT; +import static org.apache.ignite.IgniteSystemProperties.getBoolean; + +/** + * Zookeeper Discovery Spi; most operations delegate to {@link ZookeeperDiscoveryImpl} created in {@link #spiStart}. + */ +@IgniteSpiMultipleInstancesSupport(true) +@DiscoverySpiOrderSupport(true) +@DiscoverySpiHistorySupport(true) +@DiscoverySpiMutableCustomMessageSupport(false) +public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, IgniteDiscoverySpi { + /** Default base path in ZooKeeper for znodes created by SPI. */ + public static final String DFLT_ROOT_PATH = "/apacheIgnite"; + + /** Default cluster join timeout ({@code 0} means wait forever). */ + public static final long DFLT_JOIN_TIMEOUT = 0; + + /** Base path in ZooKeeper for znodes created by SPI. */ + @GridToStringInclude + private String zkRootPath = DFLT_ROOT_PATH; + + /** ZooKeeper connection string. */ + @GridToStringInclude + private String zkConnectionString; + + /** Cluster join timeout. */ + private long joinTimeout = DFLT_JOIN_TIMEOUT; + + /** ZooKeeper session timeout; if left {@code 0}, {@link #spiStart} takes it from the failure detection timeout. */ + @GridToStringInclude + private long sesTimeout; + + /** Client reconnect disabled flag. */ + private boolean clientReconnectDisabled; + + /** Discovery listener. */ + @GridToStringExclude + private DiscoverySpiListener lsnr; + + /** Discovery data exchange. */ + @GridToStringExclude + private DiscoverySpiDataExchange exchange; + + /** Node authenticator. */ + @GridToStringExclude + private DiscoverySpiNodeAuthenticator nodeAuth; + + /** Metrics provider. */ + @GridToStringExclude + private DiscoveryMetricsProvider metricsProvider; + + /** Discovery implementation, created in {@link #spiStart}; {@code null} before that. */ + @GridToStringExclude + private ZookeeperDiscoveryImpl impl; + + /** Local node attributes. */ + @GridToStringExclude + private Map<String, Object> locNodeAttrs; + + /** Local node version. */ + @GridToStringExclude + private IgniteProductVersion locNodeVer; + + /** Local node consistent ID, lazily initialized by {@link #consistentId()}. */ + @GridToStringExclude + private Serializable consistentId; + + /** Local node addresses.
*/ + private IgniteBiTuple<Collection<String>, Collection<String>> addrs; + + /** Logger. */ + @LoggerResource + @GridToStringExclude + private IgniteLogger log; + + /** Internal listener kept here while the discovery implementation is not created yet. */ + private IgniteDiscoverySpiInternalListener internalLsnr; + + /** + * @return Base path in ZooKeeper for znodes created by SPI. + */ + public String getZkRootPath() { + return zkRootPath; + } + + /** + * @param zkRootPath Base path in ZooKeeper for znodes created by SPI. + * @return {@code this} for chaining. + */ + @IgniteSpiConfiguration(optional = true) + public ZookeeperDiscoverySpi setZkRootPath(String zkRootPath) { + this.zkRootPath = zkRootPath; + + return this; + } + + /** + * @return ZooKeeper session timeout. + */ + public long getSessionTimeout() { + return sesTimeout; + } + + /** + * @param sesTimeout ZooKeeper session timeout. + * @return {@code this} for chaining. + */ + @IgniteSpiConfiguration(optional = true) + public ZookeeperDiscoverySpi setSessionTimeout(long sesTimeout) { + this.sesTimeout = sesTimeout; + + return this; + } + + /** + * @return Cluster join timeout. + */ + public long getJoinTimeout() { + return joinTimeout; + } + + /** + * @param joinTimeout Cluster join timeout ({@code 0} means wait forever). + * @return {@code this} for chaining. + */ + @IgniteSpiConfiguration(optional = true) + public ZookeeperDiscoverySpi setJoinTimeout(long joinTimeout) { + this.joinTimeout = joinTimeout; + + return this; + } + + /** + * @return ZooKeeper connection string. + */ + public String getZkConnectionString() { + return zkConnectionString; + } + + /** + * @param zkConnectionString ZooKeeper connection string. + * @return {@code this} for chaining. + */ + @IgniteSpiConfiguration(optional = false) + public ZookeeperDiscoverySpi setZkConnectionString(String zkConnectionString) { + this.zkConnectionString = zkConnectionString; + + return this; + } + + /** + * If {@code true} client does not try to reconnect. + * + * @return Client reconnect disabled flag.
+ */ + public boolean isClientReconnectDisabled() { + return clientReconnectDisabled; + } + + /** + * Sets client reconnect disabled flag. + * + * @param clientReconnectDisabled Client reconnect disabled flag. + * @return {@code this} for chaining. + */ + @IgniteSpiConfiguration(optional = true) + public ZookeeperDiscoverySpi setClientReconnectDisabled(boolean clientReconnectDisabled) { + this.clientReconnectDisabled = clientReconnectDisabled; + + return this; + } + + /** {@inheritDoc} */ + @Override public boolean clientReconnectSupported() { + return !clientReconnectDisabled; + } + + /** {@inheritDoc} */ + @Override public void clientReconnect() { + impl.reconnect(); + } + + /** {@inheritDoc} */ + @Override public boolean knownNode(UUID nodeId) { + return impl.knownNode(nodeId); + } + + /** {@inheritDoc} */ + @Override public boolean supportsCommunicationFailureResolve() { + return true; + } + + /** {@inheritDoc} */ + @Override public void resolveCommunicationFailure(ClusterNode node, Exception err) { + impl.resolveCommunicationError(node, err); + } + + /** {@inheritDoc} */ + @Nullable @Override public Serializable consistentId() throws IgniteSpiException { + if (consistentId == null) { + consistentId = ignite.configuration().getConsistentId(); + + if (consistentId == null) { + initAddresses(); + + final List<String> sortedAddrs = new ArrayList<>(addrs.get1()); + + Collections.sort(sortedAddrs); + + if (getBoolean(IGNITE_CONSISTENT_ID_BY_HOST_WITHOUT_PORT)) + consistentId = U.consistentId(sortedAddrs); + else { + Integer commPort = null; + + if (locNodeAttrs != null) { + commPort = (Integer)locNodeAttrs.get( + TcpCommunicationSpi.class.getSimpleName() + "."
+ TcpCommunicationSpi.ATTR_PORT); + } + else { + CommunicationSpi commSpi = ignite.configuration().getCommunicationSpi(); + + if (commSpi instanceof TcpCommunicationSpi) { + commPort = ((TcpCommunicationSpi)commSpi).boundPort(); + + if (commPort == -1) + commPort = null; + } + } + + if (commPort == null) { + U.warn(log, "Can not initialize default consistentId, TcpCommunicationSpi port is not initialized."); + + consistentId = ignite.configuration().getNodeId(); + } + else + consistentId = U.consistentId(sortedAddrs, commPort); + } + } + } + + return consistentId; + } + + /** + * Resolves the local host and local addresses on first call and caches the result in {@link #addrs}. + */ + private void initAddresses() { + if (addrs == null) { + String locHost = ignite != null ? ignite.configuration().getLocalHost() : null; + + InetAddress locAddr; + + try { + locAddr = U.resolveLocalHost(locHost); + } + catch (IOException e) { + throw new IgniteSpiException("Unknown local address: " + locHost, e); + } + + try { + addrs = U.resolveLocalAddresses(locAddr); + } + catch (Exception e) { + throw new IgniteSpiException("Failed to resolve local host to set of external addresses: " + locHost, + e); + } + } + } + + /** {@inheritDoc} */ + @Override public Collection<ClusterNode> getRemoteNodes() { + return impl.remoteNodes(); + } + + /** {@inheritDoc} */ + @Override public ClusterNode getLocalNode() { + return impl != null ?
impl.localNode() : null; + } + + /** {@inheritDoc} */ + @Nullable @Override public ClusterNode getNode(UUID nodeId) { + return impl.node(nodeId); + } + + /** {@inheritDoc} */ + @Override public boolean pingNode(UUID nodeId) { + return impl.pingNode(nodeId); + } + + /** {@inheritDoc} */ + @Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) { + assert locNodeAttrs == null; + assert locNodeVer == null; + + if (log.isDebugEnabled()) { + log.debug("Node attributes to set: " + attrs); + log.debug("Node version to set: " + ver); + } + + locNodeAttrs = attrs; + locNodeVer = ver; + } + + /** {@inheritDoc} */ + @Override public void setListener(@Nullable DiscoverySpiListener lsnr) { + this.lsnr = lsnr; + } + + /** {@inheritDoc} */ + @Override public void setDataExchange(DiscoverySpiDataExchange exchange) { + this.exchange = exchange; + } + + /** {@inheritDoc} */ + @Override public void setMetricsProvider(DiscoveryMetricsProvider metricsProvider) { + this.metricsProvider = metricsProvider; + } + + /** {@inheritDoc} */ + @Override public void disconnect() throws IgniteSpiException { + impl.stop(); + } + + /** {@inheritDoc} */ + @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) { + this.nodeAuth = auth; + } + + /** + * @return Authenticator.
+ */ + public DiscoverySpiNodeAuthenticator getAuthenticator() { + return nodeAuth; + } + + /** {@inheritDoc} */ + @Override public long getGridStartTime() { + return impl.gridStartTime(); + } + + /** {@inheritDoc} */ + @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) { + IgniteDiscoverySpiInternalListener internalLsnr = impl.internalLsnr; + + if (internalLsnr != null) { + if (!internalLsnr.beforeSendCustomEvent(this, log, msg)) + return; + } + + impl.sendCustomMessage(msg); + } + + /** {@inheritDoc} */ + @Override public void failNode(UUID nodeId, @Nullable String warning) { + impl.failNode(nodeId, warning); + } + + /** {@inheritDoc} */ + @Override public boolean isClientMode() throws IllegalStateException { + return impl.localNode().isClient(); + } + + /** {@inheritDoc} */ + @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { + super.onContextInitialized0(spiCtx); + } + + /** {@inheritDoc} */ + @Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException { + if (sesTimeout == 0) + sesTimeout = ignite.configuration().getFailureDetectionTimeout().intValue(); + + assertParameter(sesTimeout > 0, "sessionTimeout > 0"); + + A.notNullOrEmpty(zkConnectionString, "zkConnectionString can not be empty"); + + A.notNullOrEmpty(zkRootPath, "zkRootPath can not be empty"); + + zkRootPath = zkRootPath.trim(); + + if (zkRootPath.endsWith("/")) + zkRootPath = zkRootPath.substring(0, zkRootPath.length() - 1); + + try { + PathUtils.validatePath(zkRootPath); + } + catch (IllegalArgumentException e) { + throw new IgniteSpiException("zkRootPath is invalid: " + zkRootPath, e); + } + + ZookeeperClusterNode locNode = initLocalNode(); + + if (log.isInfoEnabled()) { + log.info("Start Zookeeper discovery [zkConnectionString=" + zkConnectionString + + ", sessionTimeout=" + sesTimeout + + ", zkRootPath=" + zkRootPath + ']'); + } + + impl = new ZookeeperDiscoveryImpl( + this,
igniteInstanceName, + log, + zkRootPath, + locNode, + lsnr, + exchange, + internalLsnr); + + try { + impl.startJoinAndWait(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteSpiException("Failed to join cluster, thread was interrupted", e); + } + } + + /** {@inheritDoc} */ + @Override public void setInternalListener(IgniteDiscoverySpiInternalListener lsnr) { + if (impl != null) + impl.internalLsnr = lsnr; + else + internalLsnr = lsnr; + } + + /** {@inheritDoc} */ + @Override public void simulateNodeFailure() { + impl.simulateNodeFailure(); + } + + /** {@inheritDoc} */ + @Override public void spiStop() throws IgniteSpiException { + if (impl != null) + impl.stop(); + } + + /** + * @return Local node instance. + */ + private ZookeeperClusterNode initLocalNode() { + assert ignite != null; + + initAddresses(); + + ZookeeperClusterNode locNode = new ZookeeperClusterNode( + ignite.configuration().getNodeId(), + addrs.get1(), + addrs.get2(), + locNodeVer, + locNodeAttrs, + consistentId(), + sesTimeout, + ignite.configuration().isClientMode(), + metricsProvider); + + locNode.local(true); + + DiscoverySpiListener lsnr = this.lsnr; + + if (lsnr != null) + lsnr.onLocalNodeInitialized(locNode); + + if (log.isDebugEnabled()) + log.debug("Local node initialized: " + locNode); + + if (metricsProvider != null) { + locNode.setMetrics(metricsProvider.metrics()); + locNode.setCacheMetrics(metricsProvider.cacheMetrics()); + } + + return locNode; + } + + /** + * Used in tests (called via reflection). + * + * @return Copy of SPI.
+ */ + private ZookeeperDiscoverySpi cloneSpiConfiguration() { + ZookeeperDiscoverySpi spi = new ZookeeperDiscoverySpi(); + + spi.setZkRootPath(zkRootPath); + spi.setZkConnectionString(zkConnectionString); + spi.setSessionTimeout(sesTimeout); + spi.setJoinTimeout(joinTimeout); + spi.setClientReconnectDisabled(clientReconnectDisabled); + + return spi; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ZookeeperDiscoverySpi.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java new file mode 100644 index 0000000..b80a9dd --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import org.apache.ignite.internal.util.GridSpinBusyLock; + +/** + * Base class for ZooKeeper callbacks: processing is guarded by the busy lock of the discovery impl. + */ +abstract class ZkAbstractCallabck { + /** Runtime state. */ + final ZkRuntimeState rtState; + + /** Discovery impl. */ + private final ZookeeperDiscoveryImpl impl; + + /** Busy lock taken from the discovery impl. */ + private final GridSpinBusyLock busyLock; + + /** + * @param rtState Runtime state. + * @param impl Discovery impl. + */ + ZkAbstractCallabck(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) { + this.rtState = rtState; + this.impl = impl; + + busyLock = impl.busyLock; + } + + /** + * @return {@code True} if processing can start: no close error is set in runtime state and the busy lock was entered. + */ + final boolean onProcessStart() { + boolean start = rtState.errForClose == null && busyLock.enterBusy(); + + if (!start) { + assert rtState.errForClose != null; + + onStartFailed(); + + return false; + } + + return true; + } + + /** + * Called by {@link #onProcessStart()} when processing can not be started; does nothing by default. + */ + void onStartFailed() { + // No-op. + } + + /** + * Leaves the busy lock entered by {@link #onProcessStart()}. + */ + final void onProcessEnd() { + busyLock.leaveBusy(); + } + + /** + * @param e Error. + */ + final void onProcessError(Throwable e) { + impl.onFatalError(busyLock, e); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java new file mode 100644 index 0000000..2292e35 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.List; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.data.Stat; + +/** + * Base class for children callbacks: runs {@link #processResult0} between the start, end and error hooks of the parent class. + */ +abstract class ZkAbstractChildrenCallback extends ZkAbstractCallabck implements AsyncCallback.Children2Callback { + /** + * @param rtState Runtime state. + * @param impl Discovery impl. + */ + ZkAbstractChildrenCallback(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) { + super(rtState, impl); + } + + /** {@inheritDoc} */ + @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { + if (!onProcessStart()) + return; + + try { + processResult0(rc, path, ctx, children, stat); + + onProcessEnd(); + } + catch (Throwable e) { + onProcessError(e); + } + } + + /** + * @param rc Result code, as received by {@code processResult}. + * @param path Path, as received by {@code processResult}. + * @param ctx Context object, as received by {@code processResult}. + * @param children Children names, as received by {@code processResult}. + * @param stat Stat, as received by {@code processResult}. + * @throws Exception If failed.
+ */ + abstract void processResult0(int rc, String path, Object ctx, List<String> children, Stat stat) + throws Exception; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java new file mode 100644 index 0000000..9098d05 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; + +/** + * + */ +abstract class ZkAbstractWatcher extends ZkAbstractCallabck implements Watcher { + /** + * @param rtState Runtime state. + * @param impl Discovery impl.
+ */ + ZkAbstractWatcher(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) { + super(rtState, impl); + } + + /** {@inheritDoc} */ + @Override public final void process(WatchedEvent evt) { + if (!onProcessStart()) + return; + + try { + process0(evt); + + onProcessEnd(); + } + catch (Throwable e) { + onProcessError(e); + } + } + + /** + * @param evt Event received by {@link #process(WatchedEvent)}. + * @throws Exception If failed; anything thrown here is passed to {@code onProcessError}. + */ + protected abstract void process0(WatchedEvent evt) throws Exception; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java new file mode 100644 index 0000000..d824377 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.io.Serializable; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Zk Alive Node Data. + */ +public class ZkAliveNodeData implements Serializable { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Last processed event (judging by the name, its ID - TODO confirm); {@code -1} initially. */ + long lastProcEvt = -1; + + /** Need-update flag; transient, so it is not serialized. */ + transient boolean needUpdate; + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ZkAliveNodeData.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java new file mode 100644 index 0000000..a186aed --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.ignite.internal.util.typedef.T2; + +/** + * Accumulates joined nodes together with their discovery data. + */ +class ZkBulkJoinContext { + /** Pairs of node event data and node discovery data; {@code null} until the first node is added. */ + List<T2<ZkJoinedNodeEvtData, Map<Integer, Serializable>>> nodes; + + /** + * @param nodeEvtData Node event data. + * @param discoData Discovery data for node. + */ + void addJoinedNode(ZkJoinedNodeEvtData nodeEvtData, Map<Integer, Serializable> discoData) { + if (nodes == null) + nodes = new ArrayList<>(); + + nodes.add(new T2<>(nodeEvtData, discoData)); + } + + /** + * @return Number of joined nodes. + */ + int nodes() { + return nodes != null ? nodes.size() : 0; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java new file mode 100644 index 0000000..7e2ea7b --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import org.apache.ignite.cluster.ClusterNode; + +/** + * Zk Cluster Nodes: the same set of nodes indexed by order, by internal ID and by node ID. + */ +public class ZkClusterNodes { + /** Nodes by order. */ + final ConcurrentSkipListMap<Long, ZookeeperClusterNode> nodesByOrder = new ConcurrentSkipListMap<>(); + + /** Nodes by internal ID. */ + final ConcurrentSkipListMap<Long, ZookeeperClusterNode> nodesByInternalId = new ConcurrentSkipListMap<>(); + + /** Nodes by node ID. */ + final ConcurrentHashMap<UUID, ZookeeperClusterNode> nodesById = new ConcurrentHashMap<>(); + + /** + * @return Remote nodes (all known nodes except local ones), as a new list. + */ + public Collection<ClusterNode> remoteNodes() { + List<ClusterNode> nodes = new ArrayList<>(); + + for (ClusterNode node : nodesById.values()) { + if (!node.isLocal()) + nodes.add(node); + } + + return nodes; + } + + /** + * @return Current nodes in topology, copied from the by-order map (ascending node order). + */ + @SuppressWarnings("unchecked") + List<ClusterNode> topologySnapshot() { + return new ArrayList<>((Collection)nodesByOrder.values()); + } + + /** + * @param node New node; must have an ID and a positive order and must not be registered yet.
+ */ + void addNode(ZookeeperClusterNode node) { + assert node.id() != null : node; + assert node.order() > 0 : node; + + ZookeeperClusterNode old = nodesById.put(node.id(), node); + + assert old == null : old; + + old = nodesByOrder.put(node.order(), node); + + assert old == null : old; + + old = nodesByInternalId.put(node.internalId(), node); + + assert old == null : old; + } + + /** + * @param internalId Node internal ID. + * @return Removed node. + */ + ZookeeperClusterNode removeNode(long internalId) { + ZookeeperClusterNode node = nodesByInternalId.remove(internalId); + + assert node != null : internalId; + assert node.order() > 0 : node; + + Object rvmd = nodesByOrder.remove(node.order()); + + assert rvmd != null; + + rvmd = nodesById.remove(node.id()); + + assert rvmd != null; + + return node; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java new file mode 100644 index 0000000..9c21f13 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.io.Serializable; +import java.util.BitSet; + +/** + * Serializable holder of either a communication state or the error that prevented getting it. + */ +class ZkCommunicationErrorNodeState implements Serializable { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Communication state; may be {@code null} if {@link #err} is set. */ + final BitSet commState; + + /** Error if failed to get communication state; may be {@code null} if {@link #commState} is set. */ + final Exception err; + + /** + * @param commState Communication state. + * @param err Error if failed to get communication state. + */ + ZkCommunicationErrorNodeState(BitSet commState, Exception err) { + assert commState != null || err != null; + + this.commState = commState; + this.err = err; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java new file mode 100644 index 0000000..accda6e --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.BitSet; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.IgniteSpiTimeoutObject; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.jboss.netty.util.internal.ConcurrentHashMap; +import org.jetbrains.annotations.Nullable; + +/** + * Future is created on each node when either connection error occurs or resolve communication error request + * received. 
+ */ +class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implements IgniteSpiTimeoutObject, Runnable { + /** */ + private final ZookeeperDiscoveryImpl impl; + + /** */ + private final IgniteLogger log; + + /** */ + private final Map<Long, GridFutureAdapter<Boolean>> nodeFuts = new ConcurrentHashMap<>(); + + /** */ + private final long endTime; + + /** */ + private final IgniteUuid id; + + /** */ + private State state; + + /** */ + private long resolveTopVer; + + /** */ + private Set<Long> resFailedNodes; + + /** */ + private Exception resErr; + + /** */ + private ZkDistributedCollectDataFuture collectResFut; + + /** + * @param impl Discovery impl. + * @param timeout Timeout to wait before initiating resolve process. + * @return Future. + */ + static ZkCommunicationErrorProcessFuture createOnCommunicationError(ZookeeperDiscoveryImpl impl, long timeout) { + return new ZkCommunicationErrorProcessFuture(impl, State.WAIT_TIMEOUT, timeout); + } + + /** + * @param impl Discovery impl. + * @return Future. + */ + static ZkCommunicationErrorProcessFuture createOnStartResolveRequest(ZookeeperDiscoveryImpl impl) { + return new ZkCommunicationErrorProcessFuture(impl, State.RESOLVE_STARTED, 0); + } + + /** + * @param impl Discovery implementation. + * @param state Initial state. + * @param timeout Wait timeout before initiating communication errors resolve. + */ + private ZkCommunicationErrorProcessFuture(ZookeeperDiscoveryImpl impl, State state, long timeout) { + assert state != State.DONE; + + this.impl = impl; + this.log = impl.log(); + + if (state == State.WAIT_TIMEOUT) { + assert timeout > 0 : timeout; + + id = IgniteUuid.fromUuid(impl.localNode().id()); + endTime = System.currentTimeMillis() + timeout; + } + else { + id = null; + endTime = 0; + } + + this.state = state; + } + + /** {@inheritDoc} */ + @Nullable @Override public IgniteLogger logger() { + return log; + } + + /** + * @param collectResFut Collect nodes' communication status future. 
+ */ + void nodeResultCollectFuture(ZkDistributedCollectDataFuture collectResFut) { + assert this.collectResFut == null : collectResFut; + + this.collectResFut = collectResFut; + } + + /** + * @param top Topology. + * @throws Exception If failed. + */ + void onTopologyChange(ZkClusterNodes top) throws Exception { + for (Map.Entry<Long, GridFutureAdapter<Boolean>> e : nodeFuts.entrySet()) { + if (!top.nodesByOrder.containsKey(e.getKey())) + e.getValue().onDone(false); + } + + if (collectResFut != null) + collectResFut.onTopologyChange(top); + } + + /** + * @param rtState Runtime state. + * @param futPath Future path. + * @param nodes Nodes to ping. + */ + void checkConnection(final ZkRuntimeState rtState, final String futPath, List<ClusterNode> nodes) { + final TcpCommunicationSpi spi = (TcpCommunicationSpi)impl.spi.ignite().configuration().getCommunicationSpi(); + + IgniteFuture<BitSet> fut = spi.checkConnection(nodes); + + fut.listen(new IgniteInClosure<IgniteFuture<BitSet>>() { + @Override public void apply(final IgniteFuture<BitSet> fut) { + // Future completed either from NIO thread or timeout worker, save result from another thread. + impl.runInWorkerThread(new ZkRunnable(rtState, impl) { + @Override public void run0() throws Exception { + BitSet commState = null; + Exception err = null; + + try { + commState = fut.get(); + } + catch (Exception e) { + err = e; + } + + ZkCommunicationErrorNodeState state = new ZkCommunicationErrorNodeState(commState, err); + + ZkDistributedCollectDataFuture.saveNodeResult(futPath, + rtState.zkClient, + impl.localNode().order(), + impl.marshalZip(state)); + } + + @Override void onStartFailed() { + onError(rtState.errForClose); + } + }); + + } + }); + } + + /** + * + */ + void scheduleCheckOnTimeout() { + synchronized (this) { + if (state == State.WAIT_TIMEOUT) + impl.spi.getSpiContext().addTimeoutObject(this); + } + } + + /** + * @param topVer Topology version. 
+ * @return {@code False} if future was already completed and need create another future instance. + */ + boolean onStartResolveRequest(long topVer) { + synchronized (this) { + if (state == State.DONE) + return false; + + if (state == State.WAIT_TIMEOUT) + impl.spi.getSpiContext().removeTimeoutObject(this); + + assert resolveTopVer == 0 : resolveTopVer; + + resolveTopVer = topVer; + + state = State.RESOLVE_STARTED; + } + + return true; + } + + /** + * @param err Error. + */ + void onError(Exception err) { + assert err != null; + + Map<Long, GridFutureAdapter<Boolean>> futs; + + synchronized (this) { + if (state == State.DONE) { + assert resErr != null; + + return; + } + + state = State.DONE; + + resErr = err; + + futs = nodeFuts; // nodeFuts should not be modified after state changed to DONE. + } + + for (Map.Entry<Long, GridFutureAdapter<Boolean>> e : futs.entrySet()) + e.getValue().onDone(err); + + onDone(err); + } + + /** + * @param failedNodes Node failed as result of resolve process. + */ + void onFinishResolve(Set<Long> failedNodes) { + Map<Long, GridFutureAdapter<Boolean>> futs; + + synchronized (this) { + if (state == State.DONE) { + assert resErr != null; + + return; + } + + assert state == State.RESOLVE_STARTED : state; + + state = State.DONE; + + resFailedNodes = failedNodes; + + futs = nodeFuts; // nodeFuts should not be modified after state changed to DONE. + } + + for (Map.Entry<Long, GridFutureAdapter<Boolean>> e : futs.entrySet()) { + Boolean res = !F.contains(resFailedNodes, e.getKey()); + + e.getValue().onDone(res); + } + + onDone(); + } + + /** + * @param node Node. + * @return Future finished when communication error resolve is done or {@code null} if another + * resolve process should be started. 
+ */ + @Nullable IgniteInternalFuture<Boolean> nodeStatusFuture(ClusterNode node) { + GridFutureAdapter<Boolean> fut; + + synchronized (this) { + if (state == State.DONE) { + if (resolveTopVer != 0 && node.order() <= resolveTopVer) { + Boolean res = !F.contains(resFailedNodes, node.order()); + + return new GridFinishedFuture<>(res); + } + else + return null; + } + + fut = nodeFuts.get(node.order()); + + if (fut == null) + nodeFuts.put(node.order(), fut = new GridFutureAdapter<>()); + } + + if (impl.node(node.order()) == null) + fut.onDone(false); + + return fut; + } + + /** {@inheritDoc} */ + @Override public void run() { + // Run from zk discovery worker pool after timeout. + if (needProcessTimeout()) { + try { + UUID reqId = UUID.randomUUID(); + + if (log.isInfoEnabled()) { + log.info("Initiate cluster-wide communication error resolve process [reqId=" + reqId + + ", errNodes=" + nodeFuts.size() + ']'); + } + + impl.sendCustomMessage(new ZkCommunicationErrorResolveStartMessage(reqId)); + } + catch (Exception e) { + Collection<GridFutureAdapter<Boolean>> futs; + + synchronized (this) { + if (state != State.WAIT_TIMEOUT) + return; + + state = State.DONE; + resErr = e; + + futs = nodeFuts.values(); // nodeFuts should not be modified after state changed to DONE. + } + + for (GridFutureAdapter<Boolean> fut : futs) + fut.onDone(e); + + onDone(e); + } + } + } + + /** + * @return {@code True} if need initiate resolve process after timeout expired. 
+ */ + private boolean needProcessTimeout() { + synchronized (this) { + if (state != State.WAIT_TIMEOUT) + return false; + + for (GridFutureAdapter<Boolean> fut : nodeFuts.values()) { + if (!fut.isDone()) + return true; + } + + state = State.DONE; + } + + onDone(null, null); + + return false; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return id; + } + + /** {@inheritDoc} */ + @Override public long endTime() { + return endTime; + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + if (needProcessTimeout()) + impl.runInWorkerThread(this); + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) { + if (super.onDone(res, err)) { + impl.clearCommunicationErrorProcessFuture(this); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ZkCommunicationErrorProcessFuture.class, this); + } + + /** + * + */ + enum State { + /** */ + DONE, + + /** */ + WAIT_TIMEOUT, + + /** */ + RESOLVE_STARTED + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java new file mode 100644 index 0000000..9b7476c --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.UUID; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +class ZkCommunicationErrorResolveFinishMessage implements DiscoverySpiCustomMessage, ZkInternalMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + final UUID futId; + + /** */ + final long topVer; + + /** */ + transient ZkCommunicationErrorResolveResult res; + + /** + * @param futId Future ID. + * @param topVer Topology version when resolve process finished. 
+ */ + ZkCommunicationErrorResolveFinishMessage(UUID futId, long topVer) { + this.futId = futId; + this.topVer = topVer; + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoverySpiCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ZkCommunicationErrorResolveFinishMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java new file mode 100644 index 0000000..23495aa --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.spi.discovery.zk.internal;

import java.io.Serializable;
import org.apache.ignite.internal.util.GridLongList;
import org.jetbrains.annotations.Nullable;

/**
 * Serializable holder for the outcome of a communication error resolve process: the list of killed
 * nodes and an error.
 */
class ZkCommunicationErrorResolveResult implements Serializable {
    /** Serial version UID. */
    private static final long serialVersionUID = 0L;

    /** Killed nodes, may be {@code null}. */
    final GridLongList killedNodes;

    /** Error. */
    final Exception err;

    /**
     * Stores both values as passed, without copying or validation.
     *
     * @param killedNodes Killed nodes.
     * @param err Error.
     */
    ZkCommunicationErrorResolveResult(@Nullable GridLongList killedNodes, Exception err) {
        this.err = err;
        this.killedNodes = killedNodes;
    }
}