Repository: ignite Updated Branches: refs/heads/ignite-zk 26d12c7c5 -> 854412230
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/85441223 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/85441223 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/85441223 Branch: refs/heads/ignite-zk Commit: 854412230dd8ca17c33e3b2d51c3f55b930c0c41 Parents: 26d12c7 Author: sboikov <[email protected]> Authored: Thu Dec 21 16:16:40 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Dec 21 16:36:12 2017 +0300 ---------------------------------------------------------------------- .../IgniteDiscoverySpiInternalListener.java | 4 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +- .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 11 +- .../zk/internal/ZookeeperDiscoveryImpl.java | 25 +- .../internal/DiscoverySpiTestListener.java | 3 +- .../ZookeeperDiscoverySpiBasicTest.java | 265 ++++++++++++++++++- 6 files changed, 294 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/85441223/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java index 1983ad3..24405f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.managers.discovery; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; @@ -26,9 +27,10 @@ import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; */ public interface IgniteDiscoverySpiInternalListener { /** + * @param locNode Local node. * @param log Log. */ - public void beforeJoin(IgniteLogger log); + public void beforeJoin(ClusterNode locNode, IgniteLogger log); /** * @param spi SPI instance. http://git-wip-us.apache.org/repos/asf/ignite/blob/85441223/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index ad8eca0..9d7dce3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1530,7 +1530,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { if (internalLsnr != null && msg instanceof TcpDiscoveryJoinRequestMessage) - internalLsnr.beforeJoin(log); + internalLsnr.beforeJoin(locNode, log); assert sock != null; assert msg != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/85441223/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index a422967..aaf761f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -125,6 +125,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery /** */ private boolean clientReconnectDisabled; + /** */ + private IgniteDiscoverySpiInternalListener internalLsnr; + public String getZkRootPath() { return zkRootPath; } @@ -393,7 +396,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery zkRootPath, locNode, lsnr, - exchange); + exchange, + internalLsnr); try { impl.joinTopology(); @@ -407,7 +411,10 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery /** {@inheritDoc} */ @Override public void setInternalListener(IgniteDiscoverySpiInternalListener lsnr) { - impl.internalLsnr = lsnr; + if (impl != null) + impl.internalLsnr = lsnr; + else + internalLsnr = lsnr; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/85441223/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index d66ea2c..847b345 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -65,6 +65,7 @@ import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteRunnable; @@ -183,6 +184,7 @@ public class ZookeeperDiscoveryImpl { * @param locNode Local node instance. * @param lsnr Discovery events listener. * @param exchange Discovery data exchange. + * @param internalLsnr Internal listener (used for testing only). */ public ZookeeperDiscoveryImpl( ZookeeperDiscoverySpi spi, @@ -191,7 +193,8 @@ public class ZookeeperDiscoveryImpl { String zkRootPath, ZookeeperClusterNode locNode, DiscoverySpiListener lsnr, - DiscoverySpiDataExchange exchange) { + DiscoverySpiDataExchange exchange, + IgniteDiscoverySpiInternalListener internalLsnr) { assert locNode.id() != null && locNode.isLocal() : locNode; zkRootPath = zkRootPath.trim(); @@ -221,6 +224,9 @@ public class ZookeeperDiscoveryImpl { evtsAckThreshold = 1; this.evtsAckThreshold = evtsAckThreshold; + + if (internalLsnr != null) + this.internalLsnr = internalLsnr; } /** @@ -658,6 +664,11 @@ public class ZookeeperDiscoveryImpl { U.warn(log, "Waiting for local join event [nodeId=" + locNode.id() + ", name=" + igniteInstanceName + ']'); } catch (Exception e) { + IgniteSpiException spiErr = X.cause(e, IgniteSpiException.class); + + if (spiErr != null) + throw spiErr; + throw new IgniteSpiException("Failed to join cluster", e); } } @@ -673,7 +684,7 @@ public class ZookeeperDiscoveryImpl { IgniteDiscoverySpiInternalListener internalLsnr = this.internalLsnr; if (internalLsnr != null) - internalLsnr.beforeJoin(log); + internalLsnr.beforeJoin(locNode, log); if (locNode.isClient() && reconnect) locNode.setAttributes(spi.getSpiContext().nodeAttributes()); @@ -935,11 +946,11 @@ public class ZookeeperDiscoveryImpl { joinDataPath, rtState); - spi.getSpiContext().addTimeoutObject(rtState.joinTimeoutObj); - zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new CheckCoordinatorCallback(rtState)); zkClient.getDataAsync(zkPaths.evtsPath, rtState.watcher, rtState.watcher); + + spi.getSpiContext().addTimeoutObject(rtState.joinTimeoutObj); } catch (IgniteCheckedException | ZookeeperClientFailedException e) { throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e); @@ -976,8 +987,8 @@ public class ZookeeperDiscoveryImpl { attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT_V2, U.marshal(marsh, subj)); locNode.setAttributes(attrs); - - } catch (IgniteException | IgniteCheckedException e) { + } + catch (Exception e) { throw new IgniteSpiException("Failed to authenticate local node (will shutdown local node).", e); } } @@ -1075,7 +1086,7 @@ public class ZookeeperDiscoveryImpl { /** {@inheritDoc} */ @Override public void onTimeout() { - if (rtState.joined) + if (rtState.errForClose != null || rtState.joined) return; synchronized (stateMux) { http://git-wip-us.apache.org/repos/asf/ignite/blob/85441223/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java b/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java index b79454c..44f975a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java @@ -25,6 +25,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; import org.apache.ignite.internal.util.typedef.internal.U; @@ -69,7 +70,7 @@ public class DiscoverySpiTestListener implements IgniteDiscoverySpiInternalListe } /** {@inheritDoc} */ - @Override public void beforeJoin(IgniteLogger log) { + @Override public void beforeJoin(ClusterNode locNode, IgniteLogger log) { try { CountDownLatch writeLatch0 = joinLatch; http://git-wip-us.apache.org/repos/asf/ignite/blob/85441223/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index bb590ca..b17d70c 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -63,15 +63,17 @@ import org.apache.ignite.events.EventType; import org.apache.ignite.internal.DiscoverySpiTestListener; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; import org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest; +import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteCallable; @@ -80,11 +82,17 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.logger.java.JavaLogger; +import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.plugin.security.SecurityCredentials; +import org.apache.ignite.plugin.security.SecurityPermission; +import org.apache.ignite.plugin.security.SecuritySubject; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.DiscoverySpi; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -101,6 +109,9 @@ import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_SUBJECT_V2; import static org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl.IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD; import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET; @@ -158,10 +169,13 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { private IgniteOutClosure<CommunicationProblemResolver> commProblemRslvr; /** */ + private IgniteOutClosure<DiscoverySpiNodeAuthenticator> auth; + + /** */ private String zkRootPath; /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + @Override protected IgniteConfiguration getConfiguration(final String igniteInstanceName) throws Exception { if (testSockNio) System.setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, ZkTestClientCnxnSocketNIO.class.getName()); @@ -177,6 +191,27 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { zkSpi.setSessionTimeout(sesTimeout > 0 ? sesTimeout : 10_000); + // Set authenticator for basic sanity tests. + if (auth != null) { + zkSpi.setAuthenticator(auth.apply()); + + zkSpi.setInternalListener(new IgniteDiscoverySpiInternalListener() { + @Override public void beforeJoin(ClusterNode locNode, IgniteLogger log) { + ZookeeperClusterNode locNode0 = (ZookeeperClusterNode)locNode; + + Map<String, Object> attrs = new HashMap<>(locNode0.getAttributes()); + + attrs.put(ATTR_SECURITY_CREDENTIALS, new SecurityCredentials(null, null, igniteInstanceName)); + + locNode0.setAttributes(attrs); + } + + @Override public boolean beforeSendCustomEvent(DiscoverySpi spi, IgniteLogger log, DiscoverySpiCustomMessage msg) { + return false; + } + }); + } + spis.put(igniteInstanceName, zkSpi); if (USE_TEST_CLUSTER) { @@ -475,11 +510,11 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { for (Ignite node : G.allGrids()) { ClusterNode locNode0 = node.cluster().localNode(); - assertEquals(locNode0.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME), + assertEquals(locNode0.attribute(ATTR_IGNITE_INSTANCE_NAME), locNode0.consistentId()); for (ClusterNode node0 : node.cluster().nodes()) { - assertEquals(node0.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME), + assertEquals(node0.attribute(ATTR_IGNITE_INSTANCE_NAME), node0.consistentId()); } } @@ -564,6 +599,138 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testLocalAuthenticationFails() throws Exception { + auth = ZkTestNodeAuthenticator.factory(getTestIgniteInstanceName(0)); + + Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + startGrid(0); + + return null; + } + }, IgniteCheckedException.class, null); + + IgniteSpiException spiErr = X.cause(err, IgniteSpiException.class); + + assertNotNull(spiErr); + assertTrue(spiErr.getMessage().contains("Authentication failed for local node")); + + startGrid(1); + startGrid(2); + + checkTestSecuritySubject(2); + } + + /** + * @throws Exception If failed. + */ + public void testAuthentication() throws Exception { + auth = ZkTestNodeAuthenticator.factory(getTestIgniteInstanceName(1), + getTestIgniteInstanceName(5)); + + startGrid(0); + + checkTestSecuritySubject(1); + + { + client = false; + checkStartFail(1); + + client = true; + checkStartFail(1); + + client = false; + } + + startGrid(2); + + checkTestSecuritySubject(2); + + stopGrid(2); + + checkTestSecuritySubject(1); + + startGrid(2); + + checkTestSecuritySubject(2); + + stopGrid(0); + + checkTestSecuritySubject(1); + + checkStartFail(1); + + client = false; + + startGrid(3); + + client = true; + + startGrid(4); + + client = false; + + startGrid(0); + + checkTestSecuritySubject(4); + + checkStartFail(1); + checkStartFail(5); + + client = true; + + checkStartFail(1); + checkStartFail(5); + } + + /** + * @param nodeIdx Node index. + */ + private void checkStartFail(final int nodeIdx) { + Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + startGrid(nodeIdx); + + return null; + } + }, IgniteCheckedException.class, null); + + IgniteSpiException spiErr = X.cause(err, IgniteSpiException.class); + + assertNotNull(spiErr); + assertTrue(spiErr.getMessage().contains("Authentication failed")); + } + + /** + * @throws Exception If failed. + */ + private void checkTestSecuritySubject(int expNodes) throws Exception { + waitForTopology(expNodes); + + List<Ignite> nodes = G.allGrids(); + + JdkMarshaller marsh = new JdkMarshaller(); + + for (Ignite ignite : nodes) { + Collection<ClusterNode> nodes0 = ignite.cluster().nodes(); + + assertEquals(nodes.size(), nodes0.size()); + + for (ClusterNode node : nodes0) { + byte[] secSubj = node.attribute(ATTR_SECURITY_SUBJECT_V2); + + assertNotNull(secSubj); + + ZkTestNodeAuthenticator.TestSecurityContext secCtx = marsh.unmarshal(secSubj, null); + + assertEquals(node.attribute(ATTR_IGNITE_INSTANCE_NAME), secCtx.nodeName); + } + } + } + + /** + * @throws Exception If failed. + */ public void testStopNode_1() throws Exception { startGrids(5); @@ -3001,6 +3168,96 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** * */ + static class ZkTestNodeAuthenticator implements DiscoverySpiNodeAuthenticator { + /** + * @param failAuthNodes Node names which should not pass authentication. + * @return Factory. + */ + static IgniteOutClosure<DiscoverySpiNodeAuthenticator> factory(final String...failAuthNodes) { + return new IgniteOutClosure<DiscoverySpiNodeAuthenticator>() { + @Override public DiscoverySpiNodeAuthenticator apply() { + return new ZkTestNodeAuthenticator(Arrays.asList(failAuthNodes)); + } + }; + } + + /** */ + private final Collection<String> failAuthNodes; + + /** + * @param failAuthNodes Node names which should not pass authentication. + */ + ZkTestNodeAuthenticator(Collection<String> failAuthNodes) { + this.failAuthNodes = failAuthNodes; + } + + /** {@inheritDoc} */ + @Override public SecurityContext authenticateNode(ClusterNode node, SecurityCredentials cred) { + assertNotNull(cred); + + String nodeName = node.attribute(ATTR_IGNITE_INSTANCE_NAME); + + assertEquals(nodeName, cred.getUserObject()); + + boolean auth = !failAuthNodes.contains(nodeName); + + System.out.println(Thread.currentThread().getName() + " authenticateNode [node=" + node.id() + ", res=" + auth + ']'); + + return auth ? new TestSecurityContext(nodeName) : null; + } + + /** {@inheritDoc} */ + @Override public boolean isGlobalNodeAuthentication() { + return false; + } + + /** + * + */ + private static class TestSecurityContext implements SecurityContext, Serializable { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** */ + final String nodeName; + + /** + * @param nodeName Authenticated node name. + */ + TestSecurityContext(String nodeName) { + this.nodeName = nodeName; + } + + /** {@inheritDoc} */ + @Override public SecuritySubject subject() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean taskOperationAllowed(String taskClsName, SecurityPermission perm) { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean cacheOperationAllowed(String cacheName, SecurityPermission perm) { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean serviceOperationAllowed(String srvcName, SecurityPermission perm) { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean systemOperationAllowed(SecurityPermission perm) { + return true; + } + } + } + + /** + * + */ static class NoOpCommunicationProblemResolver implements CommunicationProblemResolver { /** */ static final IgniteOutClosure<CommunicationProblemResolver> FACTORY = new IgniteOutClosure<CommunicationProblemResolver>() {
