Squashed commit of the following:
commit 52baf4996570f512c736e9333b3e395f97e0b6f5
Merge: 2fc690e64c eeebfca0bd
Author: sboikov <[email protected]>
Date: Fri Jan 12 10:53:20 2018 +0300
Merge remote-tracking branch 'remotes/origin/master' into ignite-zk-join
commit eeebfca0bddbf31b10a86a6725e2c27933fdb0ae
Author: Alexander Belyak <[email protected]>
Date: Fri Jan 12 10:37:20 2018 +0300
ignite-7195 correctly handle primitive array (addition for commit
5d66516d18b84878331918b79afe452d70bd7a42).
commit 04806d93e1919289308a8c1ac5d077e0b2304b62
Author: alexdel <[email protected]>
Date: Fri Jan 12 11:43:15 2018 +0700
IGNITE-7276 Web Console: Fixed updating of "Grant\Revoke admin" in Admin
panel.
commit 0252e90daa265f40f8ba6e87769b4f2598a180fa
Author: Alexey Kuznetsov <[email protected]>
Date: Fri Jan 12 11:26:13 2018 +0700
IGNITE-7295 Fixed GridClient logging.
commit 2e669acca7463c1c156014e0f3e628d9fb0ac0f3
Author: alexdel <[email protected]>
Date: Fri Jan 12 10:42:38 2018 +0700
IGNITE-7238 Web Console: On-focus button color behaviour fixed.
commit 6f39d4ae3dec77c3bf4823b9a4c7510bd7d47808
Author: Vasiliy Sisko <[email protected]>
Date: Fri Jan 12 10:30:40 2018 +0700
IGNITE-7224 Web console: Removed deprecated fields from configuration.
commit d8ce43ac062cc9ff5e600fe994dfcdfd4f50759a
Author: Ilya Borisov <[email protected]>
Date: Fri Jan 12 10:11:50 2018 +0700
IGNITE-7225 Web Console: Added detection of CSV separator based on browser
locale.
commit f6e103d8fa7992dd61f59653f2a98d49ab5477d1
Author: alexdel <[email protected]>
Date: Fri Jan 12 09:54:29 2018 +0700
IGNITE-7210 Web Console: Do not show connected clusters on sign-in page.
commit 2dce0b89d344c5100eabac53f4c1de289c9f24bd
Author: Pavel Tupitsyn <[email protected]>
Date: Thu Jan 11 17:56:36 2018 +0300
IGNITE-7390 .NET: Use InternalsVisibleTo for Core projects
Remove unnecessary compiler directives, add more tests to Core project
commit 2fc690e64c5a75607eb4a8542cf800a8290cda3c
Author: sboikov <[email protected]>
Date: Thu Jan 11 17:52:53 2018 +0300
zk
commit e361cb66ad7d0131ee7c0d744efe9e7ed5ba8240
Author: Vasiliy Sisko <[email protected]>
Date: Thu Jan 11 21:22:59 2018 +0700
IGNITE-6995 Web Console: Fixed code generation for near cache for server
and client node.
commit 79c9c7552311e4fd780837a0cd8163ae554b368d
Author: Pavel Tupitsyn <[email protected]>
Date: Thu Jan 11 17:16:57 2018 +0300
.NET: Add timeout to TestTxDeadlockDetection
commit a7f0422d77c9a713ab3912cc137b045f2ec732e6
Author: Alexandr Kuramshin <[email protected]>
Date: Thu Jan 11 17:05:17 2018 +0300
ignite-7135 IgniteCluster.startNodes() returns successful
ClusterStartNodeResult even though the remote process fails.
Fix windows startup scripts.
Add startNodes() Windows support through task scheduling.
Check successful node startup by reading the output log.
Signed-off-by: Andrey Gura <[email protected]>
commit a4b24465a8ed95c886f2b7e5bea195dc14a5335c
Author: Pavel Tupitsyn <[email protected]>
Date: Thu Jan 11 16:51:00 2018 +0300
.NET: Add timeout to TestTxDeadlockDetection
commit 6136ecd1a92cac26894c74a306b3df6d5646c524
Author: dkarachentsev <[email protected]>
Date: Thu Jan 11 16:48:43 2018 +0300
ignite-7340 Fix flaky
GridServiceProcessorMultiNodeConfigSelfTest#checkDeployOnEachNodeUpdateTopology
Signed-off-by: Andrey Gura <[email protected]>
commit 65e1c577906d41e85aa80cba6271e693aba32d3c
Author: dkarachentsev <[email protected]>
Date: Thu Jan 11 16:36:39 2018 +0300
ignite-7239 In case of not serializable cache update response, future on
node requester will never complete
Signed-off-by: Andrey Gura <[email protected]>
commit 11508d941ee6f7008538416fc1c7af71e602c9d1
Author: Alexey Rokhin <[email protected]>
Date: Thu Jan 11 16:29:20 2018 +0300
IGNITE-7281 .NET Core: make Services work through custom proxy
This closes #3328
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f53d8574
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f53d8574
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f53d8574
Branch: refs/heads/ignite-zk
Commit: f53d8574bea0a37314350b479d32494215975424
Parents: ec75bba
Author: sboikov <[email protected]>
Authored: Fri Jan 12 12:16:59 2018 +0300
Committer: sboikov <[email protected]>
Committed: Fri Jan 12 12:16:59 2018 +0300
----------------------------------------------------------------------
bin/ignite.bat | 2 +-
bin/include/parseargs.bat | 2 +-
.../apache/ignite/IgniteSystemProperties.java | 6 +
.../internal/client/impl/GridClientImpl.java | 23 +-
.../cache/CacheInvokeDirectResult.java | 16 +-
.../service/GridServiceProcessor.java | 8 +
.../util/tostring/GridToStringBuilder.java | 103 ++--
.../zk/internal/ZkBulkJoinContext.java | 50 ++
.../internal/ZkDiscoveryNodeJoinEventData.java | 44 +-
.../discovery/zk/internal/ZkIgnitePaths.java | 6 +-
.../zk/internal/ZkJoinEventDataForJoined.java | 42 +-
.../zk/internal/ZkJoinedNodeEvtData.java | 79 ++++
.../zk/internal/ZookeeperDiscoveryImpl.java | 469 +++++++++++--------
.../startup/cmdline/CommandLineTransformer.java | 6 +-
.../IgniteCacheFailedUpdateResponseTest.java | 310 ++++++++++++
.../tostring/GridToStringBuilderSelfTest.java | 56 +++
.../zk/internal/ZookeeperDiscoverySpiTest.java | 38 +-
.../testsuites/IgniteCacheTestSuite4.java | 2 +
.../Apache.Ignite.Core.Tests.DotNetCore.csproj | 15 +
.../Binary/BinaryBuilderSelfTest.cs | 2 -
.../Cache/CacheAbstractTest.cs | 4 -
.../Cache/CacheAbstractTransactionalTest.cs | 8 +-
.../Client/Cache/CreateCacheTest.cs | 4 -
.../Client/ClientConnectionTest.cs | 2 -
.../Compute/ComputeApiTest.cs | 8 -
.../Log/CustomLoggerTest.cs | 2 -
.../Services/ServiceProxyTest.cs | 16 +-
.../Services/ServicesTest.cs | 40 +-
.../TestUtils.Common.cs | 19 -
.../Apache.Ignite.Core.csproj | 4 +-
.../Impl/Services/ServiceMethodHelper.cs | 61 +++
.../Impl/Services/ServiceProxy.cs | 75 ---
.../Impl/Services/ServiceProxyFactory.cs | 68 +++
.../Impl/Services/ServiceProxyTypeGenerator.cs | 281 +++++++++++
.../Impl/Services/Services.cs | 8 +-
.../Properties/AssemblyInfo.cs | 4 +-
.../util/nodestart/StartNodeCallableImpl.java | 260 ++++++++--
modules/web-console/frontend/app/app.js | 2 +
.../app/components/grid-export/component.js | 11 +-
.../list-of-registered-users.controller.js | 6 +-
.../app/components/page-queries/controller.js | 15 +-
.../components/web-console-header/component.js | 2 +-
.../generator/ConfigurationGenerator.js | 44 +-
.../generator/JavaTransformer.service.js | 11 +-
.../generator/SpringTransformer.service.js | 12 +-
.../configuration/clusters/marshaller.pug | 67 +--
.../states/configuration/clusters/misc.pug | 2 +-
.../frontend/app/primitives/btn/index.scss | 2 +-
.../web-console/frontend/app/services/CSV.js | 22 +
49 files changed, 1803 insertions(+), 536 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f53d8574/bin/ignite.bat
----------------------------------------------------------------------
diff --git a/bin/ignite.bat b/bin/ignite.bat
index db686cc..f697dc0 100644
--- a/bin/ignite.bat
+++ b/bin/ignite.bat
@@ -133,7 +133,7 @@ set
RESTART_SUCCESS_OPT=-DIGNITE_SUCCESS_FILE=%RESTART_SUCCESS_FILE%
:: This is executed if -nojmx is not specified
::
if not "%NO_JMX%" == "1" (
- for /F "tokens=*" %%A in ('""!JAVA_HOME!\bin\java" -cp %CP%
org.apache.ignite.internal.util.portscanner.GridJmxPortFinder"') do (
+ for /F "usebackq tokens=*" %%A in (`"!JAVA_HOME!\bin\java.exe -cp %CP%
org.apache.ignite.internal.util.portscanner.GridJmxPortFinder"`) do (
set JMX_PORT=%%A
)
)
http://git-wip-us.apache.org/repos/asf/ignite/blob/f53d8574/bin/include/parseargs.bat
----------------------------------------------------------------------
diff --git a/bin/include/parseargs.bat b/bin/include/parseargs.bat
index f2151a7..0a584d1 100644
--- a/bin/include/parseargs.bat
+++ b/bin/include/parseargs.bat
@@ -39,7 +39,7 @@
:: )
:: in other scripts to parse common command lines parameters.
-set convertArgsCmd="!JAVA_HOME!\bin\java.exe" -cp %CP%
org.apache.ignite.startup.cmdline.CommandLineTransformer %*
+set convertArgsCmd=!JAVA_HOME!\bin\java.exe -Dfile.encoding=IBM866 -cp %CP%
org.apache.ignite.startup.cmdline.CommandLineTransformer %*
for /f "usebackq tokens=*" %%i in (`!convertArgsCmd!`) do set
reformattedArgs=%%i
for %%i in (%reformattedArgs%) do (
http://git-wip-us.apache.org/repos/asf/ignite/blob/f53d8574/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index ad4835a..7ffc13d 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -25,6 +25,7 @@ import java.util.Properties;
import javax.net.ssl.HostnameVerifier;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.internal.client.GridClient;
import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;
@@ -772,6 +773,11 @@ public final class IgniteSystemProperties {
public static final String IGNITE_DATA_STORAGE_FOLDER_BY_CONSISTENT_ID =
"IGNITE_DATA_STORAGE_FOLDER_BY_CONSISTENT_ID";
/**
+ * If this property is set to {@code true} enable logging in {@link
GridClient}.
+ */
+ public static final String IGNITE_GRID_CLIENT_LOG_ENABLED =
"IGNITE_GRID_CLIENT_LOG_ENABLED";
+
+ /**
* Enforces singleton.
*/
private IgniteSystemProperties() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/f53d8574/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientImpl.java
----------------------------------------------------------------------
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientImpl.java
index 19199c1..9b71ae4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientImpl.java
@@ -36,6 +36,7 @@ import java.util.logging.Logger;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLException;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.client.GridClient;
import org.apache.ignite.internal.client.GridClientCacheMode;
import org.apache.ignite.internal.client.GridClientClosedException;
@@ -79,18 +80,20 @@ public class GridClientImpl implements GridClient {
/** Logger. */
private static final Logger log =
Logger.getLogger(GridClientImpl.class.getName());
- /** */
+ /* Suppression logging if needed. */
static {
- boolean isLog4jUsed =
U.gridClassLoader().getResource("org/apache/log4j/Appender.class") != null;
+ if
(!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_GRID_CLIENT_LOG_ENABLED,
false)) {
+ boolean isLog4jUsed =
U.gridClassLoader().getResource("org/apache/log4j/Appender.class") != null;
+
+ try {
+ if (isLog4jUsed)
+ U.addLog4jNoOpLogger();
- try {
- if (isLog4jUsed)
- U.addLog4jNoOpLogger();
- else
U.addJavaNoOpLogger();
- }
- catch (IgniteCheckedException ignored) {
- // Our log4j warning suppression failed, leave it as is.
+ }
+ catch (IgniteCheckedException ignored) {
+ // If log warning suppression failed, leave it as is.
+ }
}
}
@@ -563,4 +566,4 @@ public class GridClientImpl implements GridClient {
@Override public String toString() {
return "GridClientImpl [id=" + id + ", closed=" + closed + ']';
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f53d8574/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
----------------------------------------------------------------------
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
index 05b3c86..17f304e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
@@ -105,8 +105,20 @@ public class CacheInvokeDirectResult implements Message {
public void prepareMarshal(GridCacheContext ctx) throws
IgniteCheckedException {
key.prepareMarshal(ctx.cacheObjectContext());
- if (err != null && errBytes == null)
- errBytes = U.marshal(ctx.marshaller(), err);
+ if (err != null && errBytes == null) {
+ try {
+ errBytes = U.marshal(ctx.marshaller(), err);
+ }
+ catch (IgniteCheckedException e) {
+ // Try send exception even if it's unable to marshal.
+ IgniteCheckedException exc = new
IgniteCheckedException(err.getMessage());
+
+ exc.setStackTrace(err.getStackTrace());
+ exc.addSuppressed(e);
+
+ errBytes = U.marshal(ctx.marshaller(), exc);
+ }
+ }
if (res != null)
res.prepareMarshal(ctx.cacheObjectContext());
http://git-wip-us.apache.org/repos/asf/ignite/blob/f53d8574/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 8581023..6df107b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -1289,6 +1289,14 @@ public class GridServiceProcessor extends
GridProcessorAdapter implements Ignite
* @param assigns Assignments.
*/
private void redeploy(GridServiceAssignments assigns) {
+ if (assigns.topologyVersion() < ctx.discovery().topologyVersion()) {
+ if (log.isDebugEnabled())
+ log.debug("Skip outdated assignment [assigns=" + assigns +
+ ", topVer=" + ctx.discovery().topologyVersion() + ']');
+
+ return;
+ }
+
String svcName = assigns.name();
Integer assignCnt = assigns.assigns().get(ctx.localNodeId());
http://git-wip-us.apache.org/repos/asf/ignite/blob/f53d8574/modules/core/src/main/java/org/apache/ignite/internal/util/tostring/GridToStringBuilder.java
----------------------------------------------------------------------
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/tostring/GridToStringBuilder.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/tostring/GridToStringBuilder.java
index a5f2c95..56eef1d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/tostring/GridToStringBuilder.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/tostring/GridToStringBuilder.java
@@ -1047,41 +1047,90 @@ public class GridToStringBuilder {
* @return String representation of an array.
*/
@SuppressWarnings({"ConstantConditions", "unchecked"})
- public static <T> String arrayToString(Class arrType, @Nullable Object
arr) {
+ public static <T> String arrayToString(Class arrType, Object arr) {
if (arr == null)
return "null";
- T[] array = (T[])arr;
-
- if (array.length > COLLECTION_LIMIT)
- arr = Arrays.copyOf(array, COLLECTION_LIMIT);
-
String res;
+ int more = 0;
- if (arrType.equals(byte[].class))
- res = Arrays.toString((byte[])arr);
- else if (arrType.equals(boolean[].class))
- res = Arrays.toString((boolean[])arr);
- else if (arrType.equals(short[].class))
- res = Arrays.toString((short[])arr);
- else if (arrType.equals(int[].class))
- res = Arrays.toString((int[])arr);
- else if (arrType.equals(long[].class))
- res = Arrays.toString((long[])arr);
- else if (arrType.equals(float[].class))
- res = Arrays.toString((float[])arr);
- else if (arrType.equals(double[].class))
- res = Arrays.toString((double[])arr);
- else if (arrType.equals(char[].class))
- res = Arrays.toString((char[])arr);
- else
- res = Arrays.toString((Object[])arr);
-
- if (array.length > COLLECTION_LIMIT) {
+ if (arrType.equals(byte[].class)) {
+ byte[] byteArr = (byte[])arr;
+ if (byteArr.length > COLLECTION_LIMIT) {
+ more = byteArr.length - COLLECTION_LIMIT;
+ byteArr = Arrays.copyOf(byteArr, COLLECTION_LIMIT);
+ }
+ res = Arrays.toString(byteArr);
+ }
+ else if (arrType.equals(boolean[].class)) {
+ boolean[] boolArr = (boolean[])arr;
+ if (boolArr.length > COLLECTION_LIMIT) {
+ more = boolArr.length - COLLECTION_LIMIT;
+ boolArr = Arrays.copyOf(boolArr, COLLECTION_LIMIT);
+ }
+ res = Arrays.toString(boolArr);
+ }
+ else if (arrType.equals(short[].class)) {
+ short[] shortArr = (short[])arr;
+ if (shortArr.length > COLLECTION_LIMIT) {
+ more = shortArr.length - COLLECTION_LIMIT;
+ shortArr = Arrays.copyOf(shortArr, COLLECTION_LIMIT);
+ }
+ res = Arrays.toString(shortArr);
+ }
+ else if (arrType.equals(int[].class)) {
+ int[] intArr = (int[])arr;
+ if (intArr.length > COLLECTION_LIMIT) {
+ more = intArr.length - COLLECTION_LIMIT;
+ intArr = Arrays.copyOf(intArr, COLLECTION_LIMIT);
+ }
+ res = Arrays.toString(intArr);
+ }
+ else if (arrType.equals(long[].class)) {
+ long[] longArr = (long[])arr;
+ if (longArr.length > COLLECTION_LIMIT) {
+ more = longArr.length - COLLECTION_LIMIT;
+ longArr = Arrays.copyOf(longArr, COLLECTION_LIMIT);
+ }
+ res = Arrays.toString(longArr);
+ }
+ else if (arrType.equals(float[].class)) {
+ float[] floatArr = (float[])arr;
+ if (floatArr.length > COLLECTION_LIMIT) {
+ more = floatArr.length - COLLECTION_LIMIT;
+ floatArr = Arrays.copyOf(floatArr, COLLECTION_LIMIT);
+ }
+ res = Arrays.toString(floatArr);
+ }
+ else if (arrType.equals(double[].class)) {
+ double[] doubleArr = (double[])arr;
+ if (doubleArr.length > COLLECTION_LIMIT) {
+ more = doubleArr.length - COLLECTION_LIMIT;
+ doubleArr = Arrays.copyOf(doubleArr, COLLECTION_LIMIT);
+ }
+ res = Arrays.toString(doubleArr);
+ }
+ else if (arrType.equals(char[].class)) {
+ char[] charArr = (char[])arr;
+ if (charArr.length > COLLECTION_LIMIT) {
+ more = charArr.length - COLLECTION_LIMIT;
+ charArr = Arrays.copyOf(charArr, COLLECTION_LIMIT);
+ }
+ res = Arrays.toString(charArr);
+ }
+ else {
+ Object[] objArr = (Object[])arr;
+ if (objArr.length > COLLECTION_LIMIT) {
+ more = objArr.length - COLLECTION_LIMIT;
+ objArr = Arrays.copyOf(objArr, COLLECTION_LIMIT);
+ }
+ res = Arrays.toString(objArr);
+ }
+ if (more > 0) {
StringBuilder resSB = new StringBuilder(res);
resSB.deleteCharAt(resSB.length() - 1);
- resSB.append("... and ").append(array.length -
COLLECTION_LIMIT).append(" more]");
+ resSB.append("... and ").append(more).append(" more]");
res = resSB.toString();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f53d8574/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java
----------------------------------------------------------------------
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java
new file mode 100644
index 0000000..a186aed
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.internal.util.typedef.T2;
+
+/**
+ *
+ */
+class ZkBulkJoinContext {
+ /** */
+ List<T2<ZkJoinedNodeEvtData, Map<Integer, Serializable>>> nodes;
+
+ /**
+ * @param nodeEvtData Node event data.
+ * @param discoData Discovery data for node.
+ */
+ void addJoinedNode(ZkJoinedNodeEvtData nodeEvtData, Map<Integer,
Serializable> discoData) {
+ if (nodes == null)
+ nodes = new ArrayList<>();
+
+ nodes.add(new T2<>(nodeEvtData, discoData));
+ }
+
+ /**
+ * @return Number of joined nodes.
+ */
+ int nodes() {
+ return nodes != null ? nodes.size() : 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f53d8574/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
----------------------------------------------------------------------
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
index ff75d22..e46d52d 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
@@ -17,7 +17,7 @@
package org.apache.ignite.spi.discovery.zk.internal;
-import java.util.UUID;
+import java.util.List;
/**
*
@@ -27,53 +27,27 @@ class ZkDiscoveryNodeJoinEventData extends
ZkDiscoveryEventData {
private static final long serialVersionUID = 0L;
/** */
- final long joinedInternalId;
-
- /** */
- final UUID nodeId;
-
- /** */
- final int joinDataPartCnt;
+ final List<ZkJoinedNodeEvtData> joinedNodes;
/** */
final int dataForJoinedPartCnt;
- /** */
- final int secSubjPartCnt;
-
- /** */
- final UUID joinDataPrefixId;
-
- /** */
- transient ZkJoiningNodeData joiningNodeData;
-
/**
* @param evtId Event ID.
* @param topVer Topology version.
- * @param nodeId Joined node ID.
- * @param joinedInternalId Joined node internal ID.
- * @param joinDataPrefixId Join data unique prefix.
- * @param joinDataPartCnt Join data part count.
+ * @param joinedNodes Joined nodes data.
* @param dataForJoinedPartCnt Data for joined part count.
- * @param secSubjPartCnt Security subject part count.
*/
- ZkDiscoveryNodeJoinEventData(long evtId,
+ ZkDiscoveryNodeJoinEventData(
+ long evtId,
long topVer,
- UUID nodeId,
- long joinedInternalId,
- UUID joinDataPrefixId,
- int joinDataPartCnt,
- int dataForJoinedPartCnt,
- int secSubjPartCnt)
+ List<ZkJoinedNodeEvtData> joinedNodes,
+ int dataForJoinedPartCnt)
{
super(evtId, ZK_EVT_NODE_JOIN, topVer);
- this.nodeId = nodeId;
- this.joinedInternalId = joinedInternalId;
- this.joinDataPrefixId = joinDataPrefixId;
- this.joinDataPartCnt = joinDataPartCnt;
+ this.joinedNodes = joinedNodes;
this.dataForJoinedPartCnt = dataForJoinedPartCnt;
- this.secSubjPartCnt = secSubjPartCnt;
}
/** {@inheritDoc} */
@@ -81,6 +55,6 @@ class ZkDiscoveryNodeJoinEventData extends
ZkDiscoveryEventData {
return "ZkDiscoveryNodeJoinEventData [" +
"evtId=" + eventId() +
", topVer=" + topologyVersion() +
- ", node=" + nodeId + ']';
+ ", nodes=" + joinedNodes + ']';
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f53d8574/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
index 44b247c..642183b 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -314,11 +314,11 @@ class ZkIgnitePaths {
}
/**
- * @param evtId Event ID.
+ * @param topVer Event topology version.
* @return Event zk path.
*/
- String joinEventSecuritySubjectPath(long evtId) {
- return evtsPath + "/s-" + evtId;
+ String joinEventSecuritySubjectPath(long topVer) {
+ return evtsPath + "/s-" + topVer;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/f53d8574/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
----------------------------------------------------------------------
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
index eb24f27..e4ae4ba0 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
@@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.zk.internal;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
+import org.jetbrains.annotations.Nullable;
/**
*
@@ -32,28 +33,51 @@ class ZkJoinEventDataForJoined implements Serializable {
private final List<ZookeeperClusterNode> top;
/** */
- private final Map<Integer, Serializable> discoData;
+ private final Map<Long, byte[]> discoData;
+
+ /** */
+ private final Map<Long, Long> dupDiscoData;
/**
* @param top Topology.
* @param discoData Discovery data.
*/
- ZkJoinEventDataForJoined(List<ZookeeperClusterNode> top, Map<Integer,
Serializable> discoData) {
+ ZkJoinEventDataForJoined(List<ZookeeperClusterNode> top, Map<Long, byte[]>
discoData, @Nullable Map<Long, Long> dupDiscoData) {
+ assert top != null;
+ assert discoData != null && !discoData.isEmpty();
+
this.top = top;
this.discoData = discoData;
+ this.dupDiscoData = dupDiscoData;
+ }
+
+ byte[] discoveryDataForNode(long nodeOrder) {
+ assert discoData != null;
+
+ byte[] dataBytes = discoData.get(nodeOrder);
+
+ if (dataBytes != null)
+ return dataBytes;
+
+ assert dupDiscoData != null;
+
+ Long dupDataNode = dupDiscoData.get(nodeOrder);
+
+ assert dupDataNode != null;
+
+ dataBytes = discoData.get(dupDataNode);
+
+ assert dataBytes != null;
+
+ return dataBytes;
}
/**
* @return Current topology.
*/
List<ZookeeperClusterNode> topology() {
- return top;
- }
+ assert top != null;
- /**
- * @return Discovery data.
- */
- Map<Integer, Serializable> discoveryData() {
- return discoData;
+ return top;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f53d8574/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java
----------------------------------------------------------------------
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java
new file mode 100644
index 0000000..8149afc
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+/**
+ *
+ */
+public class ZkJoinedNodeEvtData implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ final long topVer;
+
+ /** */
+ final long joinedInternalId;
+
+ /** */
+ final UUID nodeId;
+
+ /** */
+ final int joinDataPartCnt;
+
+ /** */
+ final int secSubjPartCnt;
+
+ /** */
+ final UUID joinDataPrefixId;
+
+ /** */
+ transient ZkJoiningNodeData joiningNodeData;
+
+ /**
+ * @param topVer Topology version for node join event.
+ * @param nodeId Joined node ID.
+ * @param joinedInternalId Joined node internal ID.
+ * @param joinDataPrefixId Join data unique prefix.
+ * @param joinDataPartCnt Join data part count.
+ * @param secSubjPartCnt Security subject part count.
+ */
+ ZkJoinedNodeEvtData(
+ long topVer,
+ UUID nodeId,
+ long joinedInternalId,
+ UUID joinDataPrefixId,
+ int joinDataPartCnt,
+ int secSubjPartCnt)
+ {
+ this.topVer = topVer;
+ this.nodeId = nodeId;
+ this.joinedInternalId = joinedInternalId;
+ this.joinDataPrefixId = joinDataPrefixId;
+ this.joinDataPartCnt = joinDataPartCnt;
+ this.secSubjPartCnt = secSubjPartCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "ZkJoinedNodeData [id=" + nodeId + ", order=" + topVer + ']';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f53d8574/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 75363e3..20dba12 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.zk.internal;
import java.io.ByteArrayInputStream;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
@@ -66,6 +67,7 @@ import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -850,19 +852,11 @@ public class ZookeeperDiscoveryImpl {
* @param zkClient Client.
* @param basePath Base path.
* @param partCnt Parts count.
- * @param checkExists If {@code true} checks path exists before calling
delete (this check added to avoid errors
- * in ZooKeeper log).
- * @throws Exception If failed.
*/
- private void deleteMultiplePartsAsync(ZookeeperClient zkClient, String
basePath, int partCnt, boolean checkExists)
- throws Exception
- {
+ private void deleteMultiplePartsAsync(ZookeeperClient zkClient, String
basePath, int partCnt) {
for (int i = 0; i < partCnt; i++) {
String path = multipartPathName(basePath, i);
- if (checkExists && !zkClient.exists(path))
- continue;
-
zkClient.deleteIfExistsAsync(path);
}
}
@@ -1619,47 +1613,143 @@ public class ZookeeperDiscoveryImpl {
return;
}
- for (Map.Entry<Long, String> e : alives.entrySet()) {
- Long internalId = e.getKey();
+ generateJoinEvents(curTop, alives, MAX_NEW_EVTS);
- if (!rtState.top.nodesByInternalId.containsKey(internalId)) {
- UUID rslvFutId =
rtState.evtsData.communicationErrorResolveFutureId();
+ if (failedNodes != null)
+ handleProcessedEventsOnNodesFail(failedNodes);
+ }
- if (rslvFutId != null) {
- if (log.isInfoEnabled()) {
- log.info("Delay alive nodes change process while
communication error resolve " +
- "is in progress [reqId=" + rslvFutId + ']');
- }
+ private void generateJoinEvents(TreeMap<Long, ZookeeperClusterNode> curTop,
+ TreeMap<Long, String> alives,
+ final int MAX_NEW_EVTS) throws Exception
+ {
+ ZkBulkJoinContext joinCtx = new ZkBulkJoinContext();
- break;
- }
+ for (Map.Entry<Long, String> e : alives.entrySet()) {
+ Long internalId = e.getKey();
- if (processJoinOnCoordinator(curTop, internalId,
e.getValue())) {
- newEvts++;
+ if (!rtState.top.nodesByInternalId.containsKey(internalId)) {
+ UUID rslvFutId =
rtState.evtsData.communicationErrorResolveFutureId();
- if (newEvts == MAX_NEW_EVTS) {
- saveAndProcessNewEvents();
+ if (rslvFutId != null) {
+ if (log.isInfoEnabled()) {
+ log.info("Delay alive nodes change process while
communication error resolve " +
+ "is in progress [reqId=" + rslvFutId + ']');
+ }
- if (log.isInfoEnabled()) {
- log.info("Delay alive nodes change process, max
event threshold reached [newEvts=" + newEvts +
- ", totalEvts=" + rtState.evtsData.evts.size()
+ ']');
- }
+ break;
+ }
- throttleNewEventsGeneration();
+ processJoinOnCoordinator(joinCtx, curTop, internalId,
e.getValue());
-
rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, rtState.watcher,
rtState.watcher);
+ if (joinCtx.nodes() == MAX_NEW_EVTS) {
+ generateBulkJoinEvent(curTop, joinCtx);
- return;
- }
+ if (log.isInfoEnabled()) {
+ log.info("Delay alive nodes change process, max event
threshold reached [" +
+ "newEvts=" + joinCtx.nodes() +
+ ", totalEvts=" + rtState.evtsData.evts.size() +
']');
+ }
+
+ throttleNewEventsGeneration();
+
+ rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir,
rtState.watcher, rtState.watcher);
+
+ return;
+ }
+ }
+ }
+
+ if (joinCtx.nodes() > 0)
+ generateBulkJoinEvent(curTop, joinCtx);
+ }
+
+ private void generateBulkJoinEvent(TreeMap<Long, ZookeeperClusterNode>
curTop, ZkBulkJoinContext joinCtx)
+ throws Exception
+ {
+ rtState.evtsData.evtIdGen++;
+
+ long evtId = rtState.evtsData.evtIdGen;
+
+ List<T2<ZkJoinedNodeEvtData, Map<Integer, Serializable>>> nodes =
joinCtx.nodes;
+
+ assert nodes != null && nodes.size() > 0;
+
+ int nodeCnt = nodes.size();
+
+ List<ZkJoinedNodeEvtData> joinedNodes = new ArrayList<>(nodeCnt);
+
+ Map<Long, byte[]> discoDataMap = U.newHashMap(nodeCnt);
+ Map<Long, Long> dupDiscoData = null;
+
+ for (int i = 0; i < nodeCnt; i++) {
+ T2<ZkJoinedNodeEvtData, Map<Integer, Serializable>> nodeEvtData =
nodes.get(i);
+
+ Map<Integer, Serializable> discoData = nodeEvtData.get2();
+
+ byte[] discoDataBytes = U.marshal(marsh, discoData);
+
+ Long dupDataNode = null;
+
+ for (Map.Entry<Long, byte[]> e : discoDataMap.entrySet()) {
+ if (Arrays.equals(discoDataBytes, e.getValue())) {
+ dupDataNode = e.getKey();
+
+ break;
}
}
+
+ long nodeTopVer = nodeEvtData.get1().topVer;
+
+ if (dupDataNode != null) {
+ if (dupDiscoData == null)
+ dupDiscoData = new HashMap<>();
+
+ Long old = dupDiscoData.put(nodeTopVer, dupDataNode);
+
+ assert old == null : old;
+ }
+ else
+ discoDataMap.put(nodeTopVer, discoDataBytes);
+
+ joinedNodes.add(nodeEvtData.get1());
}
- if (newEvts > 0)
- saveAndProcessNewEvents();
+ int overhead = 5;
- if (failedNodes != null)
- handleProcessedEventsOnNodesFail(failedNodes);
+ ZkJoinEventDataForJoined dataForJoined = new ZkJoinEventDataForJoined(
+ new ArrayList<>(curTop.values()),
+ discoDataMap,
+ dupDiscoData);
+
+ byte[] dataForJoinedBytes = marshalZip(dataForJoined);
+
+ long addDataStart = System.currentTimeMillis();
+
+ int dataForJoinedPartCnt =
saveData(zkPaths.joinEventDataPathForJoined(evtId),
+ dataForJoinedBytes,
+ overhead);
+
+ long addDataTime = System.currentTimeMillis() - addDataStart;
+
+ ZkDiscoveryNodeJoinEventData evtData = new
ZkDiscoveryNodeJoinEventData(
+ evtId,
+ rtState.evtsData.topVer,
+ joinedNodes,
+ dataForJoinedPartCnt);
+
+ rtState.evtsData.addEvent(curTop.values(), evtData);
+
+ if (log.isInfoEnabled()) {
+ log.info("Generated NODE_JOINED event [" +
+ "nodeCnt=" + nodeCnt +
+ ", dataForJoinedSize=" + dataForJoinedBytes.length +
+ ", dataForJoinedPartCnt=" + dataForJoinedPartCnt +
+ ", addDataTime=" + addDataTime +
+ ", evt=" + evtData + ']');
+ }
+
+ saveAndProcessNewEvents();
}
/**
@@ -1766,6 +1856,7 @@ public class ZookeeperDiscoveryImpl {
* @return {@code True} if new join event was added.
*/
private boolean processJoinOnCoordinator(
+ ZkBulkJoinContext joinCtx,
TreeMap<Long, ZookeeperClusterNode> curTop,
long internalId,
String aliveNodePath)
@@ -1786,7 +1877,9 @@ public class ZookeeperDiscoveryImpl {
assert nodeId.equals(joinedNode.id()) : joiningNodeData.node();
- generateNodeJoin(curTop,
+ addJoinedNode(
+ joinCtx,
+ curTop,
joiningNodeData,
internalId,
prefixId,
@@ -1985,7 +2078,8 @@ public class ZookeeperDiscoveryImpl {
* @param secSubjZipBytes Marshalled security subject.
* @throws Exception If failed.
*/
- private void generateNodeJoin(
+ private void addJoinedNode(
+ ZkBulkJoinContext joinCtx,
TreeMap<Long, ZookeeperClusterNode> curTop,
ZkJoiningNodeData joiningNodeData,
long internalId,
@@ -1998,13 +2092,10 @@ public class ZookeeperDiscoveryImpl {
UUID nodeId = joinedNode.id();
rtState.evtsData.topVer++;
- rtState.evtsData.evtIdGen++;
joinedNode.order(rtState.evtsData.topVer);
joinedNode.internalId(internalId);
- long evtId = rtState.evtsData.evtIdGen;
-
DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(nodeId);
joiningNodeBag.joiningNodeData(joiningNodeData.discoveryData());
@@ -2017,60 +2108,37 @@ public class ZookeeperDiscoveryImpl {
Map<Integer, Serializable> commonData = collectBag.commonData();
- ZkJoinEventDataForJoined dataForJoined = new ZkJoinEventDataForJoined(
- new ArrayList<>(curTop.values()),
- commonData);
-
Object old = curTop.put(joinedNode.order(), joinedNode);
assert old == null;
- long addDataStart = System.currentTimeMillis();
-
- byte[] dataForJoinedBytes = marshalZip(dataForJoined);
-
int overhead = 5;
- int dataForJoinedPartCnt =
saveData(zkPaths.joinEventDataPathForJoined(evtId),
- dataForJoinedBytes,
- overhead);
-
int secSubjPartCnt = 0;
if (secSubjZipBytes != null) {
- secSubjPartCnt =
saveData(zkPaths.joinEventSecuritySubjectPath(evtId), secSubjZipBytes,
overhead);
+ secSubjPartCnt =
saveData(zkPaths.joinEventSecuritySubjectPath(joinedNode.order()),
+ secSubjZipBytes,
+ overhead);
assert secSubjPartCnt > 0 : secSubjPartCnt;
setNodeSecuritySubject(joinedNode, secSubjZipBytes);
}
- long addDataTime = System.currentTimeMillis() - addDataStart;
-
- ZkDiscoveryNodeJoinEventData evtData = new
ZkDiscoveryNodeJoinEventData(
- evtId,
+ ZkJoinedNodeEvtData nodeEvtData = new ZkJoinedNodeEvtData(
rtState.evtsData.topVer,
joinedNode.id(),
joinedNode.internalId(),
prefixId,
joiningNodeData.partCount(),
- dataForJoinedPartCnt,
secSubjPartCnt);
- rtState.evtsData.onNodeJoin(joinedNode);
-
- evtData.joiningNodeData = joiningNodeData;
-
- rtState.evtsData.addEvent(dataForJoined.topology(), evtData);
+ nodeEvtData.joiningNodeData = joiningNodeData;
- evtData.addRemainingAck(joinedNode); // Topology for joined node does
not contain joined node.
+ joinCtx.addJoinedNode(nodeEvtData, commonData);
- if (log.isInfoEnabled()) {
- log.info("Generated NODE_JOINED event [evt=" + evtData +
- ", dataForJoinedSize=" + dataForJoinedBytes.length +
- ", dataForJoinedPartCnt=" + dataForJoinedPartCnt +
- ", addDataTime=" + addDataTime + ']');
- }
+ rtState.evtsData.onNodeJoin(joinedNode);
}
/**
@@ -2509,116 +2577,82 @@ public class ZookeeperDiscoveryImpl {
try {
for (ZkDiscoveryEventData evtData :
evts.tailMap(rtState.locNodeInfo.lastProcEvt, false).values()) {
- if (!rtState.joined) {
- if (evtData.eventType() !=
ZkDiscoveryEventData.ZK_EVT_NODE_JOIN)
- continue;
-
- ZkDiscoveryNodeJoinEventData evtData0 =
(ZkDiscoveryNodeJoinEventData)evtData;
-
- UUID joinedId = evtData0.nodeId;
-
- boolean locJoin = evtData0.joinedInternalId ==
rtState.internalOrder;
-
- if (locJoin) {
- assert locNode.id().equals(joinedId);
+ if (log.isDebugEnabled())
+ log.debug("New discovery event data [evt=" + evtData + ",
evtsHist=" + evts.size() + ']');
- processLocalJoin(evtsData, evtData0);
+ switch (evtData.eventType()) {
+ case ZkDiscoveryEventData.ZK_EVT_NODE_JOIN: {
+ evtProcessed = processBulkJoin(evtsData,
(ZkDiscoveryNodeJoinEventData)evtData);
- evtProcessed = true;
+ break;
}
- }
- else {
- if (log.isDebugEnabled())
- log.debug("New discovery event data [evt=" + evtData +
", evtsHist=" + evts.size() + ']');
-
- switch (evtData.eventType()) {
- case ZkDiscoveryEventData.ZK_EVT_NODE_JOIN: {
- ZkDiscoveryNodeJoinEventData evtData0 =
(ZkDiscoveryNodeJoinEventData)evtData;
-
- ZkJoiningNodeData joiningData;
-
- if (rtState.crd) {
- assert evtData0.joiningNodeData != null;
- joiningData = evtData0.joiningNodeData;
- }
- else {
- joiningData =
unmarshalJoinData(evtData0.nodeId, evtData0.joinDataPrefixId);
-
- DiscoveryDataBag dataBag = new
DiscoveryDataBag(evtData0.nodeId);
-
-
dataBag.joiningNodeData(joiningData.discoveryData());
+ case ZkDiscoveryEventData.ZK_EVT_NODE_FAILED: {
+ if (!rtState.joined)
+ break;
- exchange.onExchange(dataBag);
- }
+ evtProcessed = true;
- if (evtData0.secSubjPartCnt > 0 &&
joiningData.node().attribute(ATTR_SECURITY_SUBJECT_V2) == null)
- readAndInitSecuritySubject(joiningData.node(),
evtData0);
+ notifyNodeFail((ZkDiscoveryNodeFailEventData)evtData);
- notifyNodeJoin(evtData0, joiningData);
+ break;
+ }
+ case ZkDiscoveryEventData.ZK_EVT_CUSTOM_EVT: {
+ if (!rtState.joined)
break;
- }
-
- case ZkDiscoveryEventData.ZK_EVT_NODE_FAILED: {
-
notifyNodeFail((ZkDiscoveryNodeFailEventData)evtData);
- break;
- }
+ evtProcessed = true;
- case ZkDiscoveryEventData.ZK_EVT_CUSTOM_EVT: {
- ZkDiscoveryCustomEventData evtData0 =
(ZkDiscoveryCustomEventData)evtData;
+ ZkDiscoveryCustomEventData evtData0 =
(ZkDiscoveryCustomEventData)evtData;
- if (evtData0.ackEvent() &&
evtData0.topologyVersion() < locNode.order())
- break;
+ if (evtData0.ackEvent() && evtData0.topologyVersion()
< locNode.order())
+ break;
- DiscoverySpiCustomMessage msg;
+ DiscoverySpiCustomMessage msg;
- if (rtState.crd) {
- assert evtData0.resolvedMsg != null : evtData0;
+ if (rtState.crd) {
+ assert evtData0.resolvedMsg != null : evtData0;
- msg = evtData0.resolvedMsg;
- }
- else {
- if (evtData0.msg == null) {
- if (evtData0.ackEvent()) {
- String path =
zkPaths.ackEventDataPath(evtData0.origEvtId);
+ msg = evtData0.resolvedMsg;
+ }
+ else {
+ if (evtData0.msg == null) {
+ if (evtData0.ackEvent()) {
+ String path =
zkPaths.ackEventDataPath(evtData0.origEvtId);
- msg =
unmarshalZip(zkClient.getData(path));
- }
- else {
- assert evtData0.evtPath != null :
evtData0;
+ msg = unmarshalZip(zkClient.getData(path));
+ }
+ else {
+ assert evtData0.evtPath != null : evtData0;
- byte[] msgBytes =
readCustomEventData(zkClient,
- evtData0.evtPath,
- evtData0.sndNodeId);
+ byte[] msgBytes =
readCustomEventData(zkClient,
+ evtData0.evtPath,
+ evtData0.sndNodeId);
- msg = unmarshalZip(msgBytes);
- }
+ msg = unmarshalZip(msgBytes);
}
- else
- msg = evtData0.msg;
-
- evtData0.resolvedMsg = msg;
}
+ else
+ msg = evtData0.msg;
- if (msg instanceof ZkInternalMessage)
- processInternalMessage(evtData0,
(ZkInternalMessage)msg);
- else {
- notifyCustomEvent(evtData0, msg);
+ evtData0.resolvedMsg = msg;
+ }
- if (!evtData0.ackEvent())
- updateNodeInfo = true;
- }
+ if (msg instanceof ZkInternalMessage)
+ processInternalMessage(evtData0,
(ZkInternalMessage)msg);
+ else {
+ notifyCustomEvent(evtData0, msg);
- break;
+ if (!evtData0.ackEvent())
+ updateNodeInfo = true;
}
- default:
- assert false : "Invalid event: " + evtData;
+ break;
}
- evtProcessed = true;
+ default:
+ assert false : "Invalid event: " + evtData;
}
if (rtState.joined) {
@@ -2666,6 +2700,55 @@ public class ZookeeperDiscoveryImpl {
commErrFut.onTopologyChange(rtState.top); // This can add new
event, notify out of event process loop.
}
+ private boolean processBulkJoin(ZkDiscoveryEventsData evtsData,
ZkDiscoveryNodeJoinEventData evtData)
+ throws Exception
+ {
+ boolean evtProcessed = false;
+
+ for (int i = 0; i < evtData.joinedNodes.size(); i++) {
+ ZkJoinedNodeEvtData joinedEvtData = evtData.joinedNodes.get(i);
+
+ if (!rtState.joined) {
+ UUID joinedId = joinedEvtData.nodeId;
+
+ boolean locJoin = joinedEvtData.joinedInternalId ==
rtState.internalOrder;
+
+ if (locJoin) {
+ assert locNode.id().equals(joinedId);
+
+ processLocalJoin(evtsData, joinedEvtData, evtData);
+
+ evtProcessed = true;
+ }
+ }
+ else {
+ ZkJoiningNodeData joiningData;
+
+ if (rtState.crd) {
+ assert joinedEvtData.joiningNodeData != null;
+
+ joiningData = joinedEvtData.joiningNodeData;
+ }
+ else {
+ joiningData = unmarshalJoinData(joinedEvtData.nodeId,
joinedEvtData.joinDataPrefixId);
+
+ DiscoveryDataBag dataBag = new
DiscoveryDataBag(joinedEvtData.nodeId);
+
+ dataBag.joiningNodeData(joiningData.discoveryData());
+
+ exchange.onExchange(dataBag);
+ }
+
+ if (joinedEvtData.secSubjPartCnt > 0 &&
joiningData.node().attribute(ATTR_SECURITY_SUBJECT_V2) == null)
+ readAndInitSecuritySubject(joiningData.node(),
joinedEvtData);
+
+ notifyNodeJoin(joinedEvtData, joiningData);
+ }
+ }
+
+ return evtProcessed;
+ }
+
/**
* @param rtState Runtime state.
* @param updateNodeInfo {@code True} if need update processed events
without delay.
@@ -2745,14 +2828,14 @@ public class ZookeeperDiscoveryImpl {
/**
* @param node Node.
- * @param evtData Node join event data.
+ * @param joinedEvtData Joined node information.
* @throws Exception If failed.
*/
- private void readAndInitSecuritySubject(ZookeeperClusterNode node,
ZkDiscoveryNodeJoinEventData evtData) throws Exception {
- if (evtData.secSubjPartCnt > 0) {
+ private void readAndInitSecuritySubject(ZookeeperClusterNode node,
ZkJoinedNodeEvtData joinedEvtData) throws Exception {
+ if (joinedEvtData.secSubjPartCnt > 0) {
byte[] zipBytes = readMultipleParts(rtState.zkClient,
- zkPaths.joinEventSecuritySubjectPath(evtData.eventId()),
- evtData.secSubjPartCnt);
+ zkPaths.joinEventSecuritySubjectPath(joinedEvtData.topVer),
+ joinedEvtData.secSubjPartCnt);
setNodeSecuritySubject(node, zipBytes);
}
@@ -2764,7 +2847,9 @@ public class ZookeeperDiscoveryImpl {
* @throws Exception If failed.
*/
@SuppressWarnings("unchecked")
- private void processLocalJoin(ZkDiscoveryEventsData evtsData, final
ZkDiscoveryNodeJoinEventData evtData)
+ private void processLocalJoin(ZkDiscoveryEventsData evtsData,
+ ZkJoinedNodeEvtData joinedEvtData,
+ ZkDiscoveryNodeJoinEventData evtData)
throws Exception
{
synchronized (stateMux) {
@@ -2781,7 +2866,7 @@ public class ZookeeperDiscoveryImpl {
spi.getSpiContext().removeTimeoutObject(rtState.joinErrTo);
if (log.isInfoEnabled())
- log.info("Local join event data: " + evtData + ']');
+ log.info("Local join event data: " + joinedEvtData + ']');
String path =
zkPaths.joinEventDataPathForJoined(evtData.eventId());
@@ -2791,14 +2876,19 @@ public class ZookeeperDiscoveryImpl {
rtState.gridStartTime = evtsData.clusterStartTime;
- locNode.internalId(evtData.joinedInternalId);
- locNode.order(evtData.topologyVersion());
+ locNode.internalId(joinedEvtData.joinedInternalId);
+ locNode.order(joinedEvtData.topVer);
- readAndInitSecuritySubject(locNode, evtData);
+ readAndInitSecuritySubject(locNode, joinedEvtData);
+
+ byte[] discoDataBytes =
dataForJoined.discoveryDataForNode(locNode.order());
+
+ Map<Integer, Serializable> commonDiscoData =
+ marsh.unmarshal(discoDataBytes,
U.resolveClassLoader(spi.ignite().configuration()));
DiscoveryDataBag dataBag = new DiscoveryDataBag(locNode.id());
- dataBag.commonData(dataForJoined.discoveryData());
+ dataBag.commonData(commonDiscoData);
exchange.onExchange(dataBag);
@@ -2807,6 +2897,10 @@ public class ZookeeperDiscoveryImpl {
for (int i = 0; i < allNodes.size(); i++) {
ZookeeperClusterNode node = allNodes.get(i);
+ // Need filter since ZkJoinEventDataForJoined contains single
topology snapshot for all joined nodes.
+ if (node.order() >= locNode.order())
+ break;
+
node.setMetrics(new ClusterMetricsSnapshot());
rtState.top.addNode(node);
@@ -2817,7 +2911,7 @@ public class ZookeeperDiscoveryImpl {
final List<ClusterNode> topSnapshot =
rtState.top.topologySnapshot();
lsnr.onDiscovery(EVT_NODE_JOINED,
- evtData.topologyVersion(),
+ joinedEvtData.topVer,
locNode,
topSnapshot,
Collections.<Long, Collection<ClusterNode>>emptyMap(),
@@ -2825,7 +2919,7 @@ public class ZookeeperDiscoveryImpl {
if (rtState.prevJoined) {
lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED,
- evtData.topologyVersion(),
+ joinedEvtData.topVer,
locNode,
topSnapshot,
Collections.<Long, Collection<ClusterNode>>emptyMap(),
@@ -2839,8 +2933,6 @@ public class ZookeeperDiscoveryImpl {
joinFut.onDone();
- deleteDataForJoinedAsync(evtData);
-
if (locNode.isClient())
rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new
CheckClientsStatusCallback(rtState));
}
@@ -3302,15 +3394,15 @@ public class ZookeeperDiscoveryImpl {
}
/**
- * @param evtData Event data.
+ * @param joinedEvtData Event data.
* @param joiningData Joining node data.
*/
@SuppressWarnings("unchecked")
- private void notifyNodeJoin(final ZkDiscoveryNodeJoinEventData evtData,
ZkJoiningNodeData joiningData) {
+ private void notifyNodeJoin(ZkJoinedNodeEvtData joinedEvtData,
ZkJoiningNodeData joiningData) {
final ZookeeperClusterNode joinedNode = joiningData.node();
- joinedNode.order(evtData.topologyVersion());
- joinedNode.internalId(evtData.joinedInternalId);
+ joinedNode.order(joinedEvtData.topVer);
+ joinedNode.internalId(joinedEvtData.joinedInternalId);
joinedNode.setMetrics(new ClusterMetricsSnapshot());
@@ -3319,7 +3411,7 @@ public class ZookeeperDiscoveryImpl {
final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot();
lsnr.onDiscovery(EVT_NODE_JOINED,
- evtData.topologyVersion(),
+ joinedEvtData.topVer,
joinedNode,
topSnapshot,
Collections.<Long, Collection<ClusterNode>>emptyMap(),
@@ -3571,25 +3663,27 @@ public class ZookeeperDiscoveryImpl {
if (log.isDebugEnabled())
log.debug("All nodes processed node join [evtData=" + evtData +
']');
- deleteJoiningNodeData(evtData.nodeId, evtData.joinDataPrefixId,
evtData.joinDataPartCnt);
+ for (int i = 0; i < evtData.joinedNodes.size(); i++) {
+ ZkJoinedNodeEvtData joinedEvtData = evtData.joinedNodes.get(i);
- deleteDataForJoinedAsync(evtData);
+ deleteJoiningNodeData(joinedEvtData.nodeId,
joinedEvtData.joinDataPrefixId, joinedEvtData.joinDataPartCnt);
- if (evtData.secSubjPartCnt > 0) {
- deleteMultiplePartsAsync(rtState.zkClient,
- zkPaths.joinEventSecuritySubjectPath(evtData.eventId()),
- evtData.secSubjPartCnt,
- false);
+ if (joinedEvtData.secSubjPartCnt > 0) {
+ deleteMultiplePartsAsync(rtState.zkClient,
+ zkPaths.joinEventSecuritySubjectPath(evtData.eventId()),
+ joinedEvtData.secSubjPartCnt);
+ }
}
+
+ deleteDataForJoinedAsync(evtData);
}
/**
* @param nodeId Node ID.
* @param joinDataPrefixId Path prefix.
* @param partCnt Parts count.
- * @throws Exception If failed.
*/
- private void deleteJoiningNodeData(UUID nodeId, UUID joinDataPrefixId, int
partCnt) throws Exception {
+ private void deleteJoiningNodeData(UUID nodeId, UUID joinDataPrefixId, int
partCnt) {
String evtDataPath = zkPaths.joiningNodeDataPath(nodeId,
joinDataPrefixId);
if (log.isDebugEnabled())
@@ -3598,20 +3692,19 @@ public class ZookeeperDiscoveryImpl {
rtState.zkClient.deleteIfExistsAsync(evtDataPath);
if (partCnt > 1)
- deleteMultiplePartsAsync(rtState.zkClient, evtDataPath + ":",
partCnt, true);
+ deleteMultiplePartsAsync(rtState.zkClient, evtDataPath + ":",
partCnt);
}
/**
* @param evtData Event data.
- * @throws Exception If failed.
*/
- private void deleteDataForJoinedAsync(ZkDiscoveryNodeJoinEventData
evtData) throws Exception {
+ private void deleteDataForJoinedAsync(ZkDiscoveryNodeJoinEventData
evtData) {
String dataForJoinedPath =
zkPaths.joinEventDataPathForJoined(evtData.eventId());
if (log.isDebugEnabled())
log.debug("Delete data for joined node [path=" + dataForJoinedPath
+ ']');
- deleteMultiplePartsAsync(rtState.zkClient, dataForJoinedPath,
evtData.dataForJoinedPartCnt, true);
+ deleteMultiplePartsAsync(rtState.zkClient, dataForJoinedPath,
evtData.dataForJoinedPartCnt);
}
/**
@@ -3800,7 +3893,7 @@ public class ZookeeperDiscoveryImpl {
byte[] marshalZip(Object obj) throws IgniteCheckedException {
assert obj != null;
- return zip(marsh.marshal(obj));
+ return zip(U.marshal(marsh, obj));
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/f53d8574/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CommandLineTransformer.java
----------------------------------------------------------------------
diff --git
a/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CommandLineTransformer.java
b/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CommandLineTransformer.java
index be758fa..b98e8b9 100644
---
a/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CommandLineTransformer.java
+++
b/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CommandLineTransformer.java
@@ -18,6 +18,7 @@
package org.apache.ignite.startup.cmdline;
import java.io.PrintStream;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
@@ -84,8 +85,9 @@ public class CommandLineTransformer {
PrintStream ps = null;
try {
- // Intentionality configure output stream with UTF-8 encoding to
support non-ASCII named parameter values.
- ps = new PrintStream(System.out, true, "UTF-8");
+ String encoding = System.getProperty("file.encoding",
Charset.defaultCharset().name());
+
+ ps = new PrintStream(System.out, true, encoding);
ps.println(transform(args));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f53d8574/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheFailedUpdateResponseTest.java
----------------------------------------------------------------------
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheFailedUpdateResponseTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheFailedUpdateResponseTest.java
new file mode 100644
index 0000000..ebcff7c
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheFailedUpdateResponseTest.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.CachePartialUpdateException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ * Checks that no future hangs on non-srializable exceptions and values.
+ */
+public class IgniteCacheFailedUpdateResponseTest extends
GridCommonAbstractTest {
+ /** Atomic cache. */
+ private static final String ATOMIC_CACHE = "atomic";
+
+ /** Tx cache. */
+ private static final String TX_CACHE = "tx";
+
+ /** Atomic cache. */
+ private IgniteCache<Object, Object> atomicCache;
+
+ /** Tx cache. */
+ private IgniteCache<Object, Object> txCache;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ CacheConfiguration atomicCfg = new CacheConfiguration(ATOMIC_CACHE);
+ CacheConfiguration txCfg = new CacheConfiguration(TX_CACHE);
+
+ atomicCfg.setBackups(1);
+ txCfg.setBackups(1);
+
+ txCfg.setAtomicityMode(TRANSACTIONAL);
+
+ cfg.setCacheConfiguration(atomicCfg, txCfg);
+
+ cfg.setClientMode(igniteInstanceName.contains("client"));
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(LOCAL_IP_FINDER);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGrid("server-1");
+ startGrid("server-2");
+ startGrid("client");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ atomicCache = grid("client").cache(ATOMIC_CACHE);
+ txCache = grid("client").cache(TX_CACHE);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeAtomic() throws Exception {
+ testInvoke(atomicCache);
+ testInvokeAll(atomicCache);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeTx() throws Exception {
+ testInvoke(txCache);
+ testInvokeAll(txCache);
+
+ IgniteEx client = grid("client");
+
+ Callable<Object> clos = new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ testInvoke(txCache);
+ testInvokeAll(txCache);
+
+ return null;
+ }
+ };
+
+ doInTransaction(client, PESSIMISTIC, READ_COMMITTED, clos);
+ doInTransaction(client, PESSIMISTIC, REPEATABLE_READ, clos);
+ doInTransaction(client, PESSIMISTIC, SERIALIZABLE, clos);
+ doInTransaction(client, OPTIMISTIC, READ_COMMITTED, clos);
+ doInTransaction(client, OPTIMISTIC, REPEATABLE_READ, clos);
+ doInTransaction(client, OPTIMISTIC, SERIALIZABLE, clos);
+ }
+
+ /**
+ * @param cache Cache.
+ */
+ private void testInvoke(final IgniteCache<Object, Object> cache) throws
Exception {
+ Class<? extends Exception> exp = grid("client").transactions().tx() ==
null
+ ? EntryProcessorException.class
+ : NonSerializableException.class;
+
+ //noinspection ThrowableNotThrown
+ assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ cache.invoke("1", new UpdateProcessor());
+
+ return null;
+ }
+ }, exp, null);
+
+ if (ATOMIC_CACHE.equals(cache.getName())) {
+ //noinspection ThrowableNotThrown
+ assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ cache.invoke("1", new UpdateValueProcessor());
+
+ return null;
+ }
+ }, CachePartialUpdateException.class, null);
+ }
+ }
+
+ /**
+ * @param cache Cache.
+ */
+ private void testInvokeAll(final IgniteCache<Object, Object> cache) throws
Exception {
+ Map<Object, EntryProcessorResult<Object>> results =
cache.invokeAll(Collections.singleton("1"), new UpdateProcessor());
+
+ final EntryProcessorResult<Object> epRes = F.first(results.values());
+
+ assertNotNull(epRes);
+
+ // In transactions EP will be invoked locally.
+ Class<? extends Exception> exp = grid("client").transactions().tx() ==
null
+ ? EntryProcessorException.class
+ : NonSerializableException.class;
+
+ //noinspection ThrowableNotThrown
+ assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ epRes.get();
+
+ return null;
+ }
+ }, exp, null);
+
+ if (ATOMIC_CACHE.equals(cache.getName())) {
+ //noinspection ThrowableNotThrown
+ assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ cache.invokeAll(Collections.singleton("1"), new
UpdateValueProcessor());
+
+ return null;
+ }
+ }, CachePartialUpdateException.class, null);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class Value implements Externalizable, Binarylizable {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ *
+ */
+ public Value() {
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws
IOException {
+ throw new NotSerializableException("Test marshalling exception");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriter writer) throws
BinaryObjectException {
+ throw new BinaryObjectException("Test marshalling exception");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReader reader) throws
BinaryObjectException {
+ // No-op.
+ }
+ }
+
+ /**
+ *
+ */
+ private static class NonSerializableException extends
EntryProcessorException implements Externalizable, Binarylizable {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ *
+ */
+ public NonSerializableException() {
+ super();
+ }
+
+ /**
+ * @param msg Message.
+ */
+ NonSerializableException(String msg) {
+ super(msg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws
IOException {
+ throw new NotSerializableException("Test marshalling exception");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriter writer) throws
BinaryObjectException {
+ throw new BinaryObjectException("Test marshalling exception");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReader reader) throws
BinaryObjectException {
+ // No-op.
+ }
+ }
+
+ /**
+ *
+ */
+ private static class UpdateProcessor implements
CacheEntryProcessor<Object, Object, Object> {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public Object process(MutableEntry<Object, Object> entry,
+ Object... arguments) throws EntryProcessorException {
+ throw new NonSerializableException("Test exception");
+ }
+ }
+
+ /**
+ *
+ */
+ private static class UpdateValueProcessor implements
CacheEntryProcessor<Object, Object, Object> {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public Object process(MutableEntry<Object, Object> entry,
+ Object... arguments) throws EntryProcessorException {
+ return new Value();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 20_000;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f53d8574/modules/core/src/test/java/org/apache/ignite/internal/util/tostring/GridToStringBuilderSelfTest.java
----------------------------------------------------------------------
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/tostring/GridToStringBuilderSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/tostring/GridToStringBuilderSelfTest.java
index 04186a5..4ac05fb 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/tostring/GridToStringBuilderSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/tostring/GridToStringBuilderSelfTest.java
@@ -161,6 +161,62 @@ public class GridToStringBuilderSelfTest extends
GridCommonAbstractTest {
for (Object val : vals)
testArr(val, limit);
+ byte[] byteArr = new byte[1];
+ byteArr[0] = 1;
+ assertEquals(Arrays.toString(byteArr),
GridToStringBuilder.arrayToString(byteArr.getClass(), byteArr));
+ byteArr = Arrays.copyOf(byteArr, 101);
+ assertTrue("Can't find \"... and 1 more\" in overflowed array string!",
+ GridToStringBuilder.arrayToString(byteArr.getClass(),
byteArr).contains("... and 1 more"));
+
+ boolean[] boolArr = new boolean[1];
+ boolArr[0] = true;
+ assertEquals(Arrays.toString(boolArr),
GridToStringBuilder.arrayToString(boolArr.getClass(), boolArr));
+ boolArr = Arrays.copyOf(boolArr, 101);
+ assertTrue("Can't find \"... and 1 more\" in overflowed array string!",
+ GridToStringBuilder.arrayToString(boolArr.getClass(),
boolArr).contains("... and 1 more"));
+
+ short[] shortArr = new short[1];
+ shortArr[0] = 100;
+ assertEquals(Arrays.toString(shortArr),
GridToStringBuilder.arrayToString(shortArr.getClass(), shortArr));
+ shortArr = Arrays.copyOf(shortArr, 101);
+ assertTrue("Can't find \"... and 1 more\" in overflowed array string!",
+ GridToStringBuilder.arrayToString(shortArr.getClass(),
shortArr).contains("... and 1 more"));
+
+ int[] intArr = new int[1];
+ intArr[0] = 10000;
+ assertEquals(Arrays.toString(intArr),
GridToStringBuilder.arrayToString(intArr.getClass(), intArr));
+ intArr = Arrays.copyOf(intArr, 101);
+ assertTrue("Can't find \"... and 1 more\" in overflowed array string!",
+ GridToStringBuilder.arrayToString(intArr.getClass(),
intArr).contains("... and 1 more"));
+
+ long[] longArr = new long[1];
+ longArr[0] = 10000000;
+ assertEquals(Arrays.toString(longArr),
GridToStringBuilder.arrayToString(longArr.getClass(), longArr));
+ longArr = Arrays.copyOf(longArr, 101);
+ assertTrue("Can't find \"... and 1 more\" in overflowed array string!",
+ GridToStringBuilder.arrayToString(longArr.getClass(),
longArr).contains("... and 1 more"));
+
+ float[] floatArr = new float[1];
+ floatArr[0] = 1.f;
+ assertEquals(Arrays.toString(floatArr),
GridToStringBuilder.arrayToString(floatArr.getClass(), floatArr));
+ floatArr = Arrays.copyOf(floatArr, 101);
+ assertTrue("Can't find \"... and 1 more\" in overflowed array string!",
+ GridToStringBuilder.arrayToString(floatArr.getClass(),
floatArr).contains("... and 1 more"));
+
+ double[] doubleArr = new double[1];
+ doubleArr[0] = 1.;
+ assertEquals(Arrays.toString(doubleArr),
GridToStringBuilder.arrayToString(doubleArr.getClass(), doubleArr));
+ doubleArr = Arrays.copyOf(doubleArr, 101);
+ assertTrue("Can't find \"... and 1 more\" in overflowed array string!",
+ GridToStringBuilder.arrayToString(doubleArr.getClass(),
doubleArr).contains("... and 1 more"));
+
+ char[] charArr = new char[1];
+ charArr[0] = 'a';
+ assertEquals(Arrays.toString(charArr),
GridToStringBuilder.arrayToString(charArr.getClass(), charArr));
+ charArr = Arrays.copyOf(charArr, 101);
+ assertTrue("Can't find \"... and 1 more\" in overflowed array string!",
+ GridToStringBuilder.arrayToString(charArr.getClass(),
charArr).contains("... and 1 more"));
+
Map<String, String> strMap = new TreeMap<>();
List<String> strList = new ArrayList<>(limit+1);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f53d8574/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
----------------------------------------------------------------------
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
index 75ecb8c..273200a 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
@@ -380,8 +380,7 @@ public class ZookeeperDiscoverySpiTest extends
GridCommonAbstractTest {
throw new IgniteException("Failed to create directory for
test Zookeeper server: " + file.getAbsolutePath());
}
-
- specs.add(new InstanceSpec(file, -1, -1, -1, true, -1, 1000, -1));
+ specs.add(new InstanceSpec(file, -1, -1, -1, true, -1, 1000,
10_000));
}
return new TestingCluster(specs);
@@ -1471,6 +1470,41 @@ public class ZookeeperDiscoverySpiTest extends
GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testConcurrentStart() throws Exception {
+ final int NODES = 20;
+
+ for (int i = 0; i < 3; i++) {
+ info("Iteration: " + i);
+
+ final AtomicInteger idx = new AtomicInteger();
+
+ final CyclicBarrier b = new CyclicBarrier(NODES);
+
+ GridTestUtils.runMultiThreaded(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ b.await();
+
+ int threadIdx = idx.getAndIncrement();
+
+ startGrid(threadIdx);
+
+ return null;
+ }
+ }, NODES, "start-node");
+
+ waitForTopology(NODES);
+
+ stopAllGrids();
+
+ checkEventsConsistency();
+
+ evts.clear();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testConcurrentStartStop1() throws Exception {
concurrentStartStop(1);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f53d8574/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index ba7aa1b..7a4d4be 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -98,6 +98,7 @@ import
org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClass
import
org.apache.ignite.internal.processors.cache.distributed.CacheStartOnJoinTest;
import
org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutMultiNodeSelfTest;
import
org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutTest;
+import
org.apache.ignite.internal.processors.cache.distributed.IgniteCacheFailedUpdateResponseTest;
import
org.apache.ignite.internal.processors.cache.distributed.IgniteCachePrimarySyncTest;
import
org.apache.ignite.internal.processors.cache.distributed.IgniteCacheReadFromBackupTest;
import
org.apache.ignite.internal.processors.cache.distributed.IgniteCacheSingleGetMessageTest;
@@ -238,6 +239,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(IgniteCacheStartTest.class);
suite.addTestSuite(CacheDiscoveryDataConcurrentJoinTest.class);
suite.addTestSuite(IgniteClientCacheInitializationFailTest.class);
+ suite.addTestSuite(IgniteCacheFailedUpdateResponseTest.class);
suite.addTestSuite(GridCacheTxLoadFromStoreOnLockSelfTest.class);