zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8ee69f50 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8ee69f50 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8ee69f50 Branch: refs/heads/ignite-zk Commit: 8ee69f5017912caba0ef0b249a39fc525d6b7a65 Parents: 287b717 Author: sboikov <[email protected]> Authored: Wed Nov 29 11:21:19 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Nov 29 11:21:19 2017 +0300 ---------------------------------------------------------------------- .../continuous/ContinuousRoutinesInfo.java | 30 +++- .../continuous/GridContinuousProcessor.java | 152 +++++++++++++---- .../continuous/StartRequestDataV2.java | 164 +++++++++++++++++++ .../StartRoutineDiscoveryMessageV2.java | 77 +++++++++ .../discovery/zk/internal/ZkIgnitePaths.java | 10 +- .../IgniteCacheEntryListenerAtomicTest.java | 2 +- .../zk/internal/ZookeeperClientTest.java | 35 ++++ .../ZookeeperDiscoverySpiBasicTest.java | 22 +++ 8 files changed, 454 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8ee69f50/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java index 0d9ee44..8977b15 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java @@ -33,30 +33,51 @@ class ContinuousRoutinesInfo { /** */ private Map<UUID, ContinuousRoutineInfo> startedRoutines = new HashMap<>(); + /** + * @param dataBag Discovery data bag. + */ void collectGridNodeData(DiscoveryDataBag dataBag) { if (!dataBag.commonDataCollectedFor(CONTINUOUS_PROC.ordinal())) dataBag.addGridCommonData(CONTINUOUS_PROC.ordinal(), new ContinuousRoutinesCommonDiscoveryData(new ArrayList<>(startedRoutines.values()))); } + /** + * @param dataBag Discovery data bag. + */ void collectJoiningNodeData(DiscoveryDataBag dataBag) { dataBag.addJoiningNodeData(CONTINUOUS_PROC.ordinal(), new ContinuousRoutinesJoiningNodeDiscoveryData(new ArrayList<>(startedRoutines.values()))); } + /** + * @param info Routine info. + */ void addRoutineInfo(ContinuousRoutineInfo info) { startedRoutines.put(info.routineId, info); } + /** + * @param routineId Routine ID. + * @return {@code True} if routine exists. + */ boolean routineExists(UUID routineId) { return startedRoutines.containsKey(routineId); } + /** + * @param routineId Routine ID. + */ void removeRoutine(UUID routineId) { startedRoutines.remove(routineId); } - void removeNodeRoutines(UUID nodeId) { + /** + * Removes all routines with autoUnsubscribe=false started by given node. + * + * @param nodeId Node ID. + */ + void onNodeFail(UUID nodeId) { for (Iterator<Map.Entry<UUID, ContinuousRoutineInfo>> it = startedRoutines.entrySet().iterator(); it.hasNext();) { Map.Entry<UUID, ContinuousRoutineInfo> e = it.next(); @@ -66,4 +87,11 @@ class ContinuousRoutinesInfo { it.remove(); } } + + /** + * + */ + void clear() { + startedRoutines.clear(); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ee69f50/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 2650659..bd9818a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -188,29 +188,27 @@ public class GridContinuousProcessor extends GridProcessorAdapter { @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, StartRoutineDiscoveryMessage msg) { - // TODO ZK - if (discoProtoVer == 2) { - StartRequestData reqData = msg.startRequestData(); - - try { - routinesInfo.addRoutineInfo(createRoutineInfo(snd.id(), - msg.routineId(), - reqData.handler(), - reqData.projectionPredicate(), - reqData.bufferSize(), - reqData.interval(), - reqData.autoUnsubscribe())); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to register continuous handler information: " + e); - } - } + assert discoProtoVer == 1 : discoProtoVer; if (!snd.id().equals(ctx.localNodeId()) && !ctx.isStopping()) processStartRequest(snd, msg); } }); + ctx.discovery().setCustomEventListener(StartRoutineDiscoveryMessageV2.class, + new CustomEventListener<StartRoutineDiscoveryMessageV2>() { + @Override public void onCustomEvent(AffinityTopologyVersion topVer, + ClusterNode snd, + StartRoutineDiscoveryMessageV2 msg) { + assert discoProtoVer == 2 : discoProtoVer; + + if (ctx.isStopping()) + return; + + processStartRequestV2(snd, msg); + } + }); + ctx.discovery().setCustomEventListener(StartRoutineAckDiscoveryMessage.class, new CustomEventListener<StartRoutineAckDiscoveryMessage>() { @Override public void onCustomEvent(AffinityTopologyVersion topVer, @@ -505,9 +503,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter { @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) { if (log.isDebugEnabled()) { log.info("onJoiningNodeDataReceived [joining=" + data.joiningNodeId() + - ", loc=" + ctx.localNodeId() + - ", data=" + data.joiningNodeData() + - ']'); + ", loc=" + ctx.localNodeId() + + ", data=" + data.joiningNodeData() + + ']'); } if (discoProtoVer == 2) { @@ -569,9 +567,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } } catch (IgniteCheckedException e) { - U.error(log, "Failed to unmarshal continuous routine filter, ignore routine [routineId=" + routineInfo.routineId + - ", srcNodeId=" + routineInfo.srcNodeId + ']', - e); + U.error(log, "Failed to unmarshal continuous routine filter, ignore routine [" + + "routineId=" + routineInfo.routineId + + ", srcNodeId=" + routineInfo.srcNodeId + ']', e); return; } @@ -584,9 +582,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } catch (IgniteCheckedException e) { U.error(log, "Failed to unmarshal continuous routine handler, ignore routine [" + - "routineId=" + routineInfo.routineId + - ", srcNodeId=" + routineInfo.srcNodeId + ']', - e); + "routineId=" + routineInfo.routineId + + ", srcNodeId=" + routineInfo.srcNodeId + ']', e); return; } @@ -597,13 +594,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter { hnd, routineInfo.bufSize, routineInfo.interval, - routineInfo.autoUnsubscribe, false); + routineInfo.autoUnsubscribe, + false); } catch (IgniteCheckedException e) { U.error(log, "Failed to register continuous routine handler, ignore routine [" + - "routineId=" + routineInfo.routineId + - ", srcNodeId=" + routineInfo.srcNodeId + ']', - e); + "routineId=" + routineInfo.routineId + + ", srcNodeId=" + routineInfo.srcNodeId + ']', e); } } } @@ -1139,6 +1136,99 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** + * @param snd Sender. + * @param msg Start request. + */ + private void processStartRequestV2(ClusterNode snd, StartRoutineDiscoveryMessageV2 msg) { + StartRequestDataV2 reqData = msg.startRequestData(); + + ContinuousRoutineInfo routineInfo = new ContinuousRoutineInfo(snd.id(), + msg.routineId(), + reqData.handlerBytes(), + reqData.nodeFilterBytes(), + reqData.bufferSize(), + reqData.interval(), + reqData.autoUnsubscribe()); + + routinesInfo.addRoutineInfo(routineInfo); + + Exception err = null; + + IgnitePredicate<ClusterNode> nodeFilter = null; + + if (reqData.nodeFilterBytes() != null) { + try { + if (ctx.config().isPeerClassLoadingEnabled() && reqData.className() != null) { + String clsName = reqData.className(); + GridDeploymentInfo depInfo = reqData.deploymentInfo(); + + GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), + clsName, + clsName, + depInfo.userVersion(), + snd.id(), + depInfo.classLoaderId(), + depInfo.participants(), + null); + + if (dep == null) + throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName); + + nodeFilter = U.unmarshal(marsh, reqData.nodeFilterBytes(), U.resolveClassLoader(dep.classLoader(), ctx.config())); + } + else + nodeFilter = U.unmarshal(marsh, reqData.nodeFilterBytes(), U.resolveClassLoader(ctx.config())); + + if (nodeFilter != null) + ctx.resource().injectGeneric(nodeFilter); + } + catch (Exception e) { + err = e; + + U.error(log, "Failed to unmarshal continuous routine filter [" + + "routineId=" + routineInfo.routineId + + ", srcNodeId=" + routineInfo.srcNodeId + ']', e); + } + } + + boolean register = err == null && (nodeFilter == null || nodeFilter.apply(ctx.discovery().localNode())); + + if (register) { + try { + GridContinuousHandler hnd = U.unmarshal(marsh, reqData.handlerBytes(), U.resolveClassLoader(ctx.config())); + + if (ctx.config().isPeerClassLoadingEnabled()) + hnd.p2pUnmarshal(snd.id(), ctx); + + if (msg.keepBinary()) { + assert hnd instanceof CacheContinuousQueryHandler : hnd; + + ((CacheContinuousQueryHandler)hnd).keepBinary(true); + } + + GridContinuousHandler hnd0 = hnd instanceof GridMessageListenHandler ? + new GridMessageListenHandler((GridMessageListenHandler)hnd) : + hnd; + + registerHandler(snd.id(), + msg.routineId, + hnd0, + reqData.bufferSize(), + reqData.interval(), + reqData.autoUnsubscribe(), + false); + } + catch (Exception e) { + err = e; + + U.error(log, "Failed to register continuous routine handler [" + + "routineId=" + routineInfo.routineId + + ", srcNodeId=" + routineInfo.srcNodeId + ']', e); + } + } + } + + /** * @param node Sender. * @param req Start request. */ @@ -1563,7 +1653,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { UUID nodeId = ((DiscoveryEvent)evt).eventNode().id(); if (discoProtoVer == 2) - routinesInfo.removeNodeRoutines(nodeId); + routinesInfo.onNodeFail(nodeId); clientInfos.remove(nodeId); http://git-wip-us.apache.org/repos/asf/ignite/blob/8ee69f50/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestDataV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestDataV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestDataV2.java new file mode 100644 index 0000000..c001616 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestDataV2.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import java.io.Serializable; +import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Start request data. + */ +class StartRequestDataV2 implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** Serialized node filter. */ + private byte[] nodeFilterBytes; + + /** Deployment class name. */ + private String clsName; + + /** Deployment info. */ + private GridDeploymentInfo depInfo; + + /** Serialized handler. */ + private byte[] hndBytes; + + /** Buffer size. */ + private int bufSize; + + /** Time interval. */ + private long interval; + + /** Automatic unsubscribe flag. */ + private boolean autoUnsubscribe; + + /** + * @param nodeFilterBytes Serialized node filter. + * @param hndBytes Serialized handler. + * @param bufSize Buffer size. + * @param interval Time interval. + * @param autoUnsubscribe Automatic unsubscribe flag. + */ + StartRequestDataV2( + byte[] nodeFilterBytes, + byte[] hndBytes, + int bufSize, + long interval, + boolean autoUnsubscribe) { + assert hndBytes != null; + assert bufSize > 0; + assert interval >= 0; + + this.nodeFilterBytes = nodeFilterBytes; + this.hndBytes = hndBytes; + this.bufSize = bufSize; + this.interval = interval; + this.autoUnsubscribe = autoUnsubscribe; + } + + /** + * @return Serialized node filter. + */ + public byte[] nodeFilterBytes() { + return nodeFilterBytes; + } + + /** + * @return Deployment class name. + */ + public String className() { + return clsName; + } + + /** + * @param clsName New deployment class name. + */ + public void className(String clsName) { + this.clsName = clsName; + } + + /** + * @return Deployment info. + */ + public GridDeploymentInfo deploymentInfo() { + return depInfo; + } + + /** + * @param depInfo New deployment info. + */ + public void deploymentInfo(GridDeploymentInfo depInfo) { + this.depInfo = depInfo; + } + + /** + * @return Handler. + */ + public byte[] handlerBytes() { + return hndBytes; + } + + /** + * @return Buffer size. + */ + public int bufferSize() { + return bufSize; + } + + /** + * @param bufSize New buffer size. + */ + public void bufferSize(int bufSize) { + this.bufSize = bufSize; + } + + /** + * @return Time interval. + */ + public long interval() { + return interval; + } + + /** + * @param interval New time interval. + */ + public void interval(long interval) { + this.interval = interval; + } + + /** + * @return Automatic unsubscribe flag. + */ + public boolean autoUnsubscribe() { + return autoUnsubscribe; + } + + /** + * @param autoUnsubscribe New automatic unsubscribe flag. + */ + public void autoUnsubscribe(boolean autoUnsubscribe) { + this.autoUnsubscribe = autoUnsubscribe; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(StartRequestDataV2.class, this); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8ee69f50/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java new file mode 100644 index 0000000..e9760a8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import java.util.UUID; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * + */ +public class StartRoutineDiscoveryMessageV2 extends AbstractContinuousMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private static final int KEEP_BINARY_FLAG = 0x01; + + /** */ + private final StartRequestDataV2 startReqData; + + /** Flags. */ + private int flags; + + /** + * @param routineId Routine id. + * @param startReqData Start request data. + * @param keepBinary Keep binary flag. + */ + public StartRoutineDiscoveryMessageV2(UUID routineId, StartRequestDataV2 startReqData, boolean keepBinary) { + super(routineId); + + this.startReqData = startReqData; + + if (keepBinary) + flags |= KEEP_BINARY_FLAG; + } + + /** + * @return Start request data. + */ + public StartRequestDataV2 startRequestData() { + return startReqData; + } + + /** + * @return {@code True} if keep binary flag was set on continuous handler. + */ + public boolean keepBinary() { + return (flags & KEEP_BINARY_FLAG) != 0; + } + + /** {@inheritDoc} */ + @Override public DiscoveryCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(StartRoutineDiscoveryMessageV2.class, this, "routineId", routineId()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8ee69f50/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java index 30138e5..535df93 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java @@ -27,19 +27,19 @@ class ZkIgnitePaths { private static final int UUID_LEN = 36; /** */ - private static final String JOIN_DATA_DIR = "joinData"; + private static final String JOIN_DATA_DIR = "jd"; /** */ - private static final String CUSTOM_EVTS_DIR = "customEvts"; + private static final String CUSTOM_EVTS_DIR = "c"; /** */ - private static final String CUSTOM_EVTS_ACKS_DIR = "customEvtsAcks"; + private static final String CUSTOM_EVTS_ACKS_DIR = "ca"; /** */ - private static final String ALIVE_NODES_DIR = "alive"; + private static final String ALIVE_NODES_DIR = "n"; /** */ - private static final String DISCO_EVENTS_PATH = "events"; + private static final String DISCO_EVENTS_PATH = "e"; /** */ final String basePath; http://git-wip-us.apache.org/repos/asf/ignite/blob/8ee69f50/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicTest.java index cddb446..d7d97a4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicTest.java @@ -30,7 +30,7 @@ import static org.apache.ignite.cache.CacheMode.PARTITIONED; public class IgniteCacheEntryListenerAtomicTest extends IgniteCacheEntryListenerAbstractTest { /** {@inheritDoc} */ @Override protected int gridCount() { - return 3; + return 1; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/8ee69f50/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java index 6330595..ec495cf 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java @@ -34,6 +34,10 @@ import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** @@ -50,6 +54,37 @@ public class ZookeeperClientTest extends GridCommonAbstractTest { super.afterTest(); } +// /** +// * @throws Exception If failed. +// */ +// public void testSaveLargeValue() throws Exception { +// startZK(1); +// +// final ZookeeperClient client = new ZookeeperClient(log, zkCluster.getConnectString(), 3000, null); +// +// ZooKeeper zk = client.zk(); +// +// int s = 1048526 + 1; +// // 1048517 11 1048528 +// // 1048519 9 1048528 +// // 1048520 8 1048528 +// +// String path = "/aaaaaaa"; +// +// while (true) { +// try { +// zk.create(path, new byte[s], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); +// +// info("Created: " + s + " " + path.length() + " " + (s + path.length())); +// +// break; +// } +// catch (KeeperException.ConnectionLossException e) { +// s -= 1; +// } +// } +// } + /** * @throws Exception If failed. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/8ee69f50/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index bbf2945..a46c678 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -110,6 +110,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** */ private ConcurrentHashMap<String, ZookeeperDiscoverySpi> spis = new ConcurrentHashMap<>(); + /** */ + private Map<String, Object> userAttrs; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { if (testSockNio) @@ -145,6 +148,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { cfg.setClientMode(client); + if (userAttrs != null) + cfg.setUserAttributes(userAttrs); + Map<IgnitePredicate<? extends Event>, int[]> lsnrs = new HashMap<>(); lsnrs.put(new IgnitePredicate<Event>() { @@ -1107,6 +1113,22 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testLargeUserAttribute() throws Exception { + userAttrs = new HashMap<>(); + + int[] attr = new int[1024 * 1024]; + + for (int i = 0; i < attr.length; i++) + attr[i] = i; + + userAttrs.put("testAttr", attr); + + startGrid(0); + } + + /** + * @throws Exception If failed. + */ public void testClientReconnectSessionExpire1() throws Exception { startGrid(0);
