Repository: ignite Updated Branches: refs/heads/ignite-zk 00f41c195 -> b8979efad
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b8979efa Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b8979efa Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b8979efa Branch: refs/heads/ignite-zk Commit: b8979efad94ead5f80e87e2275f451d369754da2 Parents: 00f41c1 Author: sboikov <[email protected]> Authored: Wed Dec 20 15:03:49 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Dec 20 15:50:21 2017 +0300 ---------------------------------------------------------------------- .../cache/binary/BinaryMetadataTransport.java | 22 +- .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 17 +- .../discovery/zk/internal/ZkIgnitePaths.java | 3 + .../zk/internal/ZookeeperClusterNode.java | 19 ++ .../zk/internal/ZookeeperDiscoveryImpl.java | 214 ++++++++++++++++++- .../ZookeeperDiscoverySpiBasicTest.java | 25 +++ 6 files changed, 278 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b8979efa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java index def7caa..a272304 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java @@ -155,21 +155,29 @@ final class BinaryMetadataTransport { * @param metadata Metadata proposed for update. * @return Future to wait for update result on. */ - GridFutureAdapter<MetadataUpdateResult> requestMetadataUpdate(BinaryMetadata metadata) throws IgniteCheckedException { + GridFutureAdapter<MetadataUpdateResult> requestMetadataUpdate(BinaryMetadata metadata) { MetadataUpdateResultFuture resFut = new MetadataUpdateResultFuture(); if (log.isDebugEnabled()) log.debug("Requesting metadata update for " + metadata.typeId()); - synchronized (this) { - unlabeledFutures.add(resFut); + try { + synchronized (this) { + unlabeledFutures.add(resFut); - if (!stopping) - discoMgr.sendCustomEvent(new MetadataUpdateProposedMessage(metadata, ctx.localNodeId())); - else - resFut.onDone(MetadataUpdateResult.createUpdateDisabledResult()); + if (!stopping) + discoMgr.sendCustomEvent(new MetadataUpdateProposedMessage(metadata, ctx.localNodeId())); + else + resFut.onDone(MetadataUpdateResult.createUpdateDisabledResult()); + } + } + catch (Exception e) { + resFut.onDone(e); } + if (ctx.clientDisconnected()) + onDisconnected(); + return resFut; } http://git-wip-us.apache.org/repos/asf/ignite/blob/b8979efa/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index 14bb107..a422967 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -92,7 +92,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery /** */ @GridToStringExclude - private DiscoverySpiNodeAuthenticator auth; + DiscoverySpiNodeAuthenticator nodeAuth; /** */ @GridToStringExclude @@ -129,6 +129,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery return zkRootPath; } + @IgniteSpiConfiguration(optional = true) public ZookeeperDiscoverySpi setZkRootPath(String zkRootPath) { this.zkRootPath = zkRootPath; @@ -139,6 +140,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery return sesTimeout; } + @IgniteSpiConfiguration(optional = true) public ZookeeperDiscoverySpi setSessionTimeout(int sesTimeout) { this.sesTimeout = sesTimeout; @@ -149,6 +151,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery return zkConnectionString; } + @IgniteSpiConfiguration(optional = false) public ZookeeperDiscoverySpi setZkConnectionString(String zkConnectionString) { this.zkConnectionString = zkConnectionString; @@ -322,13 +325,19 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery /** {@inheritDoc} */ @Override public void disconnect() throws IgniteSpiException { - // TODO ZK + impl.stop(); } /** {@inheritDoc} */ @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) { - // TODO ZK - this.auth = auth; + this.nodeAuth = auth; + } + + /** + * @return Authenticator. + */ + public DiscoverySpiNodeAuthenticator getAuthenticator() { + return nodeAuth; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b8979efa/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java index 2a1d804..588a5ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java @@ -24,6 +24,9 @@ import java.util.UUID; */ class ZkIgnitePaths { /** */ + static final String PATH_SEPARATOR = "/"; + + /** */ private static final int UUID_LEN = 36; /** Directory to store joined node data. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b8979efa/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java index 859c105..75e5715 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.managers.discovery.IgniteClusterNode; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider; @@ -171,6 +172,24 @@ public class ZookeeperClusterNode implements IgniteClusterNode, Serializable, Co return (T)attrs.get(name); } + /** + * Sets node attributes. + * + * @param attrs Node attributes. + */ + void setAttributes(Map<String, Object> attrs) { + this.attrs = U.sealMap(attrs); + } + + /** + * Gets node attributes without filtering. + * + * @return Node attributes without filtering. + */ + Map<String, Object> getAttributes() { + return attrs; + } + /** {@inheritDoc} */ @Override public ClusterMetrics metrics() { if (metricsProvider != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/b8979efa/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index ee8b7b4..7ea544f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.BitSet; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -52,6 +53,7 @@ import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; +import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.util.GridIntList; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; @@ -62,6 +64,7 @@ import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.MarshallerUtils; import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.plugin.security.SecurityCredentials; import org.apache.ignite.spi.IgniteNodeValidationResult; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.IgniteSpiTimeoutObject; @@ -69,6 +72,7 @@ import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange; import org.apache.ignite.spi.discovery.DiscoverySpiListener; +import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi; import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.apache.zookeeper.AsyncCallback; @@ -183,10 +187,15 @@ public class ZookeeperDiscoveryImpl { DiscoverySpiDataExchange exchange) { assert locNode.id() != null && locNode.isLocal() : locNode; - MarshallerUtils.setNodeName(marsh, igniteInstanceName); + zkRootPath = zkRootPath.trim(); + + if (zkRootPath.endsWith(ZkIgnitePaths.PATH_SEPARATOR)) + zkRootPath = zkRootPath.substring(0, zkRootPath.length() - 1); ZkIgnitePaths.validatePath(zkRootPath); + MarshallerUtils.setNodeName(marsh, igniteInstanceName); + zkPaths = new ZkIgnitePaths(zkRootPath); this.spi = spi; @@ -430,7 +439,7 @@ public class ZookeeperDiscoveryImpl { try { locNode.onClientDisconnected(newId); - joinTopology0(rtState.joined); + joinTopology0(true, rtState.joined); } catch (Exception e) { U.error(log, "Failed to reconnect: " + e, e); @@ -627,7 +636,7 @@ public class ZookeeperDiscoveryImpl { * @throws InterruptedException If interrupted. */ public void joinTopology() throws InterruptedException { - joinTopology0(false); + joinTopology0(false, false); for (;;) { try { @@ -644,17 +653,72 @@ public class ZookeeperDiscoveryImpl { } } + private SecurityCredentials unmarshalCredentials(ZookeeperClusterNode node) throws IgniteCheckedException { + byte[] credBytes = (byte[])node.getAttributes().get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS); + + if (credBytes == null) + return null; + + return U.unmarshal(marsh, credBytes, null); + } + /** + * Marshalls credentials with discovery SPI marshaller (will replace attribute value). + * + * @param node Node to marshall credentials for. + * @throws IgniteSpiException If marshalling failed. + */ + private void marshalCredentials(ZookeeperClusterNode node) throws IgniteSpiException { + try { + // Use security-unsafe getter. + Map<String, Object> attrs0 = node.getAttributes(); + + Object creds = attrs0.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS); + + if (creds != null) { + Map<String, Object> attrs = new HashMap<>(attrs0); + + assert !(creds instanceof byte[]); + + attrs.put(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS, U.marshal(marsh, creds)); + + node.setAttributes(attrs); + } + } + catch (IgniteCheckedException e) { + throw new IgniteSpiException("Failed to marshal node security credentials: " + node.id(), e); + } + } + + /** + * @param reconnect {@code True} if client node reconnects. * @param prevJoined {@code True} if reconnect after already joined topology * in this case (need produce EVT_CLIENT_NODE_RECONNECTED event). * @throws InterruptedException If interrupted. */ - private void joinTopology0(boolean prevJoined) throws InterruptedException { + private void joinTopology0(boolean reconnect, boolean prevJoined) throws InterruptedException { IgniteDiscoverySpiInternalListener internalLsnr = this.internalLsnr; if (internalLsnr != null) internalLsnr.beforeJoin(log); + if (!locNode.isClient()) { + DiscoverySpiNodeAuthenticator nodeAuth = spi.getAuthenticator(); + + if (nodeAuth != null && nodeAuth.isGlobalNodeAuthentication()) { + SecurityCredentials locCred = (SecurityCredentials)locNode.getAttributes() + .get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS); + + localAuthentication(nodeAuth, locCred); + } + } + else { + if (reconnect) + locNode.setAttributes(spi.getSpiContext().nodeAttributes()); + } + + marshalCredentials(locNode); + rtState = new ZkRuntimeState(prevJoined); DiscoveryDataBag discoDataBag = new DiscoveryDataBag(locNode.id()); @@ -687,16 +751,50 @@ public class ZookeeperDiscoveryImpl { } /** + * Authenticate local node. + * + * @param nodeAuth Authenticator. + * @param locCred Local security credentials for authentication. + * @throws IgniteSpiException If any error occurs. + */ + private void localAuthentication(DiscoverySpiNodeAuthenticator nodeAuth, SecurityCredentials locCred){ + assert nodeAuth != null; + assert locCred != null; + + try { + SecurityContext subj = nodeAuth.authenticateNode(locNode, locCred); + + if (subj == null) + throw new IgniteSpiException("Authentication failed for local node."); + + if (!(subj instanceof Serializable)) + throw new IgniteSpiException("Authentication subject is not Serializable."); + + Map<String, Object> attrs = new HashMap<>(locNode.attributes()); + + attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT_V2, U.marshal(marsh, subj)); + + locNode.setAttributes(attrs); + + } catch (IgniteException | IgniteCheckedException e) { + throw new IgniteSpiException("Failed to authenticate local node (will shutdown local node).", e); + } + } + + /** * @throws InterruptedException If interrupted. */ private void initZkNodes() throws InterruptedException { try { - if (rtState.zkClient.exists(zkPaths.aliveNodesDir)) + ZookeeperClient client = rtState.zkClient; + + if (client.exists(zkPaths.aliveNodesDir)) return; // This path is created last, assume all others dirs are created. - List<String> dirs = new ArrayList<>(); + if (!client.exists(zkPaths.aliveNodesDir)) + createRootPathParents(zkPaths.clusterDir, client); - // TODO ZK: test create all parents? + List<String> dirs = new ArrayList<>(); dirs.add(zkPaths.clusterDir); dirs.add(zkPaths.evtsPath); @@ -707,14 +805,14 @@ public class ZookeeperDiscoveryImpl { dirs.add(zkPaths.aliveNodesDir); try { - rtState.zkClient.createAll(dirs, PERSISTENT); + client.createAll(dirs, PERSISTENT); } catch (KeeperException.NodeExistsException e) { if (log.isDebugEnabled()) log.debug("Failed to create nodes using bulk operation: " + e); for (String dir : dirs) - rtState.zkClient.createIfNeeded(dir, null, PERSISTENT); + client.createIfNeeded(dir, null, PERSISTENT); } } catch (ZookeeperClientFailedException e) { @@ -723,6 +821,32 @@ public class ZookeeperDiscoveryImpl { } /** + * @param rootDir Root directory. + * @param client Client. + * @throws ZookeeperClientFailedException If connection to zk was lost. + * @throws InterruptedException If interrupted. + */ + private void createRootPathParents(String rootDir, ZookeeperClient client) + throws ZookeeperClientFailedException, InterruptedException { + int startIdx = 0; + + for (;;) { + int separatorIdx = rootDir.indexOf(ZkIgnitePaths.PATH_SEPARATOR, startIdx); + + if (separatorIdx == -1) + break; + + if (separatorIdx > 0) { + String path = rootDir.substring(0, separatorIdx); + + client.createIfNeeded(path, null, CreateMode.PERSISTENT); + } + + startIdx = separatorIdx + 1; + } + } + + /** * @param zkClient Client. * @param basePath Base path. * @param partCnt Parts count. @@ -901,7 +1025,8 @@ public class ZookeeperDiscoveryImpl { /** * */ - private class CheckJoinStateTimeoutObject extends ZkAbstractWatcher implements IgniteSpiTimeoutObject, AsyncCallback.DataCallback { + private class CheckJoinStateTimeoutObject extends ZkAbstractWatcher + implements IgniteSpiTimeoutObject, AsyncCallback.DataCallback { /** */ private final IgniteUuid id = IgniteUuid.randomUuid(); @@ -970,6 +1095,9 @@ public class ZookeeperDiscoveryImpl { /** {@inheritDoc} */ @Override public void process0(WatchedEvent evt) { + if (rtState.closing || rtState.joined) + return; + if (evt.getType() == Event.EventType.NodeDataChanged) rtState.zkClient.getDataAsync(evt.getPath(), this, this); } @@ -1125,6 +1253,24 @@ public class ZookeeperDiscoveryImpl { if (log.isInfoEnabled()) log.info("Node is first cluster node [locId=" + locNode.id() + ']'); + DiscoverySpiNodeAuthenticator nodeAuth = spi.getAuthenticator(); + + if (nodeAuth != null && !nodeAuth.isGlobalNodeAuthentication()) { + try { + localAuthentication(nodeAuth, unmarshalCredentials(locNode)); + } + catch (Exception e) { + U.warn(log, "Local node authentication failed: " + e, e); + + rtState.onCloseStart(); + + joinFut.onDone(e); + + // Stop any further processing. + throw new ZookeeperClientFailedException("Local node authentication failed: " + e); + } + } + newClusterStarted(locInternalId); } @@ -1425,6 +1571,11 @@ public class ZookeeperDiscoveryImpl { return "Node with the same ID already exists: " + node0; } + String authErr = authenticateNode(node); + + if (authErr != null) + return null; + IgniteNodeValidationResult err = spi.getSpiContext().validateNode(node); if (err != null) { @@ -1436,6 +1587,47 @@ public class ZookeeperDiscoveryImpl { return null; } + @Nullable private String authenticateNode(ZookeeperClusterNode node) { + DiscoverySpiNodeAuthenticator nodeAuth = spi.getAuthenticator(); + + if (nodeAuth == null) + return null; + + SecurityCredentials cred; + + try { + cred = unmarshalCredentials(node); + } + catch (Exception e) { + U.error(log, "Failed to unmarshal node credentials: " + e, e); + + return "Failed to unmarshal node credentials"; + } + + SecurityContext subj = nodeAuth.authenticateNode(node, cred); + + if (subj == null) { + U.warn(log, "Authentication failed [nodeId=" + node.id() + + ", addrs=" + U.addressesAsString(node) + ']', + "Authentication failed [nodeId=" + U.id8(node.id()) + ", addrs=" + + U.addressesAsString(node) + ']'); + + return "Authentication failed"; + } + + if (!(subj instanceof Serializable)) { + U.warn(log, "Authentication subject is not Serializable [nodeId=" + node.id() + + ", addrs=" + U.addressesAsString(node) + ']', + "Authentication subject is not Serializable [nodeId=" + U.id8(node.id()) + + ", addrs=" + + U.addressesAsString(node) + ']'); + + return "Authentication subject is not serializable"; + } + + return null; + } + /** * @throws Exception If failed. */ @@ -2690,7 +2882,7 @@ public class ZookeeperDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Create ack event: " + path); - // TODO ZK: delete is previous exists? + // TODO ZK: delete if previous exists? rtState.zkClient.createIfNeeded( path, ackBytes, http://git-wip-us.apache.org/repos/asf/ignite/blob/b8979efa/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index b5e6791..bb590ca 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -157,6 +157,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** */ private IgniteOutClosure<CommunicationProblemResolver> commProblemRslvr; + /** */ + private String zkRootPath; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { if (testSockNio) @@ -180,6 +183,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { assert zkCluster != null; zkSpi.setZkConnectionString(zkCluster.getConnectString()); + + if (zkRootPath != null) + zkSpi.setZkRootPath(zkRootPath); } else zkSpi.setZkConnectionString("localhost:2181"); @@ -397,6 +403,25 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testZkRootNotExists() throws Exception { + zkRootPath = "/a/b/c"; + + for (int i = 0; i < 3; i++) { + reset(); + + startGridsMultiThreaded(5); + + waitForTopology(5); + + stopAllGrids(); + + checkEventsConsistency(); + } + } + + /** + * @throws Exception If failed. + */ public void testMetadataUpdate() throws Exception { startGrid(0);
