Merge remote-tracking branch 'remotes/origin/master' into ignite-zk # Conflicts: # modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a5a54707 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a5a54707 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a5a54707 Branch: refs/heads/ignite-zk Commit: a5a5470730cbc10122b950b2aa9e52369d725626 Parents: 8ee69f5 89c82f5 Author: sboikov <[email protected]> Authored: Wed Nov 29 11:43:10 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Nov 29 11:43:10 2017 +0300 ---------------------------------------------------------------------- .../eviction/AbstractEvictionPolicyFactory.java | 4 +- .../cache/binary/BinaryMetadataTransport.java | 11 +- .../persistence/pagemem/PageMemoryImpl.java | 2 +- .../continuous/GridContinuousProcessor.java | 276 +++++++------ .../platform/client/ClientMessageParser.java | 97 +++-- .../processors/query/GridQueryProcessor.java | 7 +- .../schema/SchemaIndexCacheVisitorImpl.java | 153 ++++++- .../operation/SchemaIndexCreateOperation.java | 16 +- .../apache/ignite/internal/sql/SqlKeyword.java | 3 + .../ignite/internal/sql/SqlParserUtils.java | 8 +- .../sql/command/SqlCreateIndexCommand.java | 95 ++++- .../IgniteClientReconnectCacheTest.java | 13 +- .../sql/SqlParserCreateIndexSelfTest.java | 152 ++++--- .../query/h2/ddl/DdlStatementsProcessor.java | 6 +- .../cache/index/AbstractSchemaSelfTest.java | 7 +- .../DynamicIndexAbstractBasicSelfTest.java | 168 +++++++- .../DynamicIndexAbstractConcurrentSelfTest.java | 36 +- .../cache/index/SchemaExchangeSelfTest.java | 2 +- .../Common/TestLogger.cs | 74 ++++ .../Common/TestUtils.DotNetCore.cs | 46 ++- .../Client/ClientConnectionTest.cs | 4 +- .../Client/RawSocketTest.cs | 2 +- .../Log/CustomLoggerTest.cs | 7 - .../TestUtils.Common.cs | 3 - .../Apache.Ignite.Core.csproj | 5 +- .../Apache.Ignite.Core/IgniteConfiguration.cs | 7 +- .../dotnet/Apache.Ignite.Core/Ignition.cs | 2 +- .../Apache.Ignite.Core/Impl/Client/ClientOp.cs | 85 ++-- .../Impl/Client/ClientSocket.cs | 12 +- .../Apache.Ignite.Core/Impl/IgniteManager.cs | 8 + .../Apache.Ignite.Core/Impl/IgniteUtils.cs | 242 ----------- .../dotnet/Apache.Ignite.Core/Impl/Shell.cs | 2 + .../Impl/Unmanaged/DllLoader.cs | 176 -------- .../Impl/Unmanaged/Jni/DllLoader.cs | 210 ++++++++++ .../Impl/Unmanaged/Jni/Jvm.cs | 80 +--- .../Impl/Unmanaged/Jni/JvmDll.cs | 414 +++++++++++++++++++ .../Impl/Unmanaged/Jni/JvmInitArgs.cs | 33 ++ .../Impl/Unmanaged/Jni/JvmOption.cs | 34 ++ .../Apache.Ignite.Core/Impl/Unmanaged/Os.cs | 6 + modules/platforms/dotnet/Apache.Ignite.ndproj | 2 +- .../app/modules/branding/branding.service.js | 2 +- .../frontend/app/primitives/ui-grid/index.scss | 1 + modules/web-console/frontend/views/index.pug | 2 +- .../web-console/frontend/views/sql/sql.tpl.pug | 10 +- 44 files changed, 1648 insertions(+), 877 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a5a54707/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index bd9818a,571d654..4194622 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@@ -188,10 -176,10 +188,12 @@@ public class GridContinuousProcessor ex @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, StartRoutineDiscoveryMessage msg) { + assert discoProtoVer == 1 : discoProtoVer; + - if (!snd.id().equals(ctx.localNodeId()) && !ctx.isStopping()) - processStartRequest(snd, msg); + if (ctx.isStopping()) + return; + + processStartRequest(snd, msg); } }); @@@ -255,19 -200,10 +228,13 @@@ @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, StopRoutineDiscoveryMessage msg) { + if (discoProtoVer == 2) + routinesInfo.removeRoutine(msg.routineId); + - if (!snd.id().equals(ctx.localNodeId())) { - UUID routineId = msg.routineId(); - - unregisterRemote(routineId); - } + if (ctx.isStopping()) + return; - for (Map<UUID, LocalRoutineInfo> clientInfo : clientInfos.values()) { - if (clientInfo.remove(msg.routineId()) != null) - break; - } + processStopRequest(snd, msg); } }); @@@ -502,28 -426,14 +469,28 @@@ /** {@inheritDoc} */ @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) { if (log.isDebugEnabled()) { - log.info("onJoiningNodeDataReceived [joining=" + data.joiningNodeId() + + log.debug("onJoiningNodeDataReceived [joining=" + data.joiningNodeId() + - ", loc=" + ctx.localNodeId() + - ", data=" + data.joiningNodeData() + - ']'); + ", loc=" + ctx.localNodeId() + + ", data=" + data.joiningNodeData() + + ']'); } - if (data.hasJoiningNodeData()) - onDiscoDataReceived((DiscoveryData) data.joiningNodeData()); + if (discoProtoVer == 2) { + if (data.hasJoiningNodeData()) { + ContinuousRoutinesJoiningNodeDiscoveryData nodeData = (ContinuousRoutinesJoiningNodeDiscoveryData) + data.joiningNodeData(); + + for (ContinuousRoutineInfo routineInfo : nodeData.startedRoutines) { + routinesInfo.addRoutineInfo(routineInfo); + + startRoutine(routineInfo); + } + } + } + else { + if (data.hasJoiningNodeData()) + onDiscoDataReceived((DiscoveryData) data.joiningNodeData()); + } } /** {@inheritDoc} */ @@@ -1335,6 -1120,6 +1280,99 @@@ } /** ++ * @param snd Sender. ++ * @param msg Start request. ++ */ ++ private void processStartRequestV2(ClusterNode snd, StartRoutineDiscoveryMessageV2 msg) { ++ StartRequestDataV2 reqData = msg.startRequestData(); ++ ++ ContinuousRoutineInfo routineInfo = new ContinuousRoutineInfo(snd.id(), ++ msg.routineId(), ++ reqData.handlerBytes(), ++ reqData.nodeFilterBytes(), ++ reqData.bufferSize(), ++ reqData.interval(), ++ reqData.autoUnsubscribe()); ++ ++ routinesInfo.addRoutineInfo(routineInfo); ++ ++ Exception err = null; ++ ++ IgnitePredicate<ClusterNode> nodeFilter = null; ++ ++ if (reqData.nodeFilterBytes() != null) { ++ try { ++ if (ctx.config().isPeerClassLoadingEnabled() && reqData.className() != null) { ++ String clsName = reqData.className(); ++ GridDeploymentInfo depInfo = reqData.deploymentInfo(); ++ ++ GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), ++ clsName, ++ clsName, ++ depInfo.userVersion(), ++ snd.id(), ++ depInfo.classLoaderId(), ++ depInfo.participants(), ++ null); ++ ++ if (dep == null) ++ throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName); ++ ++ nodeFilter = U.unmarshal(marsh, reqData.nodeFilterBytes(), U.resolveClassLoader(dep.classLoader(), ctx.config())); ++ } ++ else ++ nodeFilter = U.unmarshal(marsh, reqData.nodeFilterBytes(), U.resolveClassLoader(ctx.config())); ++ ++ if (nodeFilter != null) ++ ctx.resource().injectGeneric(nodeFilter); ++ } ++ catch (Exception e) { ++ err = e; ++ ++ U.error(log, "Failed to unmarshal continuous routine filter [" + ++ "routineId=" + routineInfo.routineId + ++ ", srcNodeId=" + routineInfo.srcNodeId + ']', e); ++ } ++ } ++ ++ boolean register = err == null && (nodeFilter == null || nodeFilter.apply(ctx.discovery().localNode())); ++ ++ if (register) { ++ try { ++ GridContinuousHandler hnd = U.unmarshal(marsh, reqData.handlerBytes(), U.resolveClassLoader(ctx.config())); ++ ++ if (ctx.config().isPeerClassLoadingEnabled()) ++ hnd.p2pUnmarshal(snd.id(), ctx); ++ ++ if (msg.keepBinary()) { ++ assert hnd instanceof CacheContinuousQueryHandler : hnd; ++ ++ ((CacheContinuousQueryHandler)hnd).keepBinary(true); ++ } ++ ++ GridContinuousHandler hnd0 = hnd instanceof GridMessageListenHandler ? ++ new GridMessageListenHandler((GridMessageListenHandler)hnd) : ++ hnd; ++ ++ registerHandler(snd.id(), ++ msg.routineId, ++ hnd0, ++ reqData.bufferSize(), ++ reqData.interval(), ++ reqData.autoUnsubscribe(), ++ false); ++ } ++ catch (Exception e) { ++ err = e; ++ ++ U.error(log, "Failed to register continuous routine handler [" + ++ "routineId=" + routineInfo.routineId + ++ ", srcNodeId=" + routineInfo.srcNodeId + ']', e); ++ } ++ } ++ } ++ ++ /** * @param msg Message. */ private void processMessageAck(GridContinuousMessage msg) {
