#[GG-10298]: corrected after review.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/39f6928d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/39f6928d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/39f6928d Branch: refs/heads/ignite-gg-10298 Commit: 39f6928defd2050aca05effaa590eedf628fddc8 Parents: 671fb68 Author: iveselovskiy <[email protected]> Authored: Tue Jun 30 14:04:42 2015 +0300 Committer: iveselovskiy <[email protected]> Committed: Tue Jun 30 14:04:42 2015 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 70 ++++++++++---------- .../plugin/extensions/communication/IoPool.java | 42 ++++++++++++ .../communication/IoPoolExtension.java | 50 -------------- 3 files changed, 78 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39f6928d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index a58cf73..b79c8b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -68,9 +68,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** Disconnect listeners. */ private final Collection<GridDisconnectListener> disconnectLsnrs = new ConcurrentLinkedQueue<>(); - /** Map of {@link IoPoolExtension}-s injected by Ignite plugins. */ - // TODO: This should not be concurrent map. Either HashMap or array. - private final ConcurrentMap<Byte, IoPoolExtension> ioPoolExtensionMap = new ConcurrentHashMap8<>(128); + /** Map of {@link IoPool}-s injected by Ignite plugins. */ + private final IoPool[] ioPools = new IoPool[128]; /** Public pool. */ private ExecutorService pubPool; @@ -270,30 +269,30 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ private void registerIoPoolExtensions() throws IgniteCheckedException { // Process custom IO messaging pool extensions: - final IoPoolExtension[] executorExtensions - = ctx.plugins().extensions(IoPoolExtension.class); + final IoPool[] executorExtensions + = ctx.plugins().extensions(IoPool.class); if (executorExtensions != null) { // Store it into the map and check for duplicates: - for (IoPoolExtension ex : executorExtensions) { + for (IoPool ex : executorExtensions) { final byte id = ex.id(); // 1. Check the pool id is non-negative: if (id < 0) throw new IgniteCheckedException("Failed to register IO executor pool because its Id is negative " + - "[id=" + id + ", pluginId=" + ex.pligunId() + ']'); + "[id=" + id + ']'); // 2. Check the pool id is in allowed range: if (isReservedGridIoPolicy(id)) throw new IgniteCheckedException("Failed to register IO executor pool because its Id in in the " + - "reserved range (0-31) [id=" + id + ", pluginId=" + ex.pligunId() + ']'); + "reserved range (0-31) [id=" + id + ']'); // 3. Check the pool for duplicates: - IoPoolExtension pushedOut = ioPoolExtensionMap.putIfAbsent(id, ex); - if (pushedOut != null) + if (ioPools[id] != null) throw new IgniteCheckedException("Failed to register IO executor pool because its " + - "Id as already used [id=" + ex.id() + ", pluginId=" - + ex.pligunId() + ", existingPoolPluginId=" + pushedOut.pligunId() + ']'); + "Id as already used [id=" + id + ']'); + + ioPools[id] = ex; } } } @@ -493,7 +492,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (log.isDebugEnabled()) log.debug(stopInfo()); - ioPoolExtensionMap.clear(); + Arrays.fill(ioPools, null); } /** @@ -586,17 +585,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } default: - // TODO: Assert. - if (plc < 0) - throw new IgniteException("Failed to process message with negative policy. [policy=" - + plc + ']'); - - // TODO: If from reserved range - ignore. - // TODO: If from not-reserved range and IoExtension exists - process. - // TODO: If from not-reserved range and IoExtension doesnt exist - ignore. + assert plc >= 0 : "Negative policy: " + plc; + if (isReservedGridIoPolicy(plc)) - throw new IgniteException("Failed to process message with policy of reserved range. [policy=" - + plc + ']'); + throw new IgniteCheckedException("Failed to process message with policy of reserved range. " + + "[policy=" + plc + ']'); if (msg.isOrdered()) processOrderedMessage(nodeId, msg, plc, msgC); @@ -618,7 +611,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param plc Policy. * @return Execution pool. */ - private Executor pool(byte plc) { + private Executor pool(byte plc) throws IgniteCheckedException { switch (plc) { case P2P_POOL: return p2pPool; @@ -642,21 +635,27 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa return marshCachePool; default: { + assert plc >= 0 : "Negative policy: " + plc; + if (isReservedGridIoPolicy(plc)) - throw new IgniteException("Failed to process message with policy of reserved range, [policy=" - + plc + ']'); + throw new IgniteCheckedException("Failed to process message with policy of reserved" + + " range (0-31), [policy=" + plc + ']'); - IoPoolExtension pool = ioPoolExtensionMap.get(plc); + IoPool pool = ioPools[plc]; - // TODO: Assert if (pool == null) - throw new IgniteException("Failed to process message because corresponding executor pool is not " - + " found. [id=" + plc + ']'); + throw new IgniteCheckedException("Failed to process message because no pool is registered " + + "for policy. [policy=" + plc + ']'); assert plc == pool.id(); - // TODO: Check for null and throw IgniteCheckedException. - return pool.executor(); + Executor ex = pool.executor(); + + if (ex == null) + throw new IgniteCheckedException("Failed to process message because corresponding executor " + + "is null. [id=" + plc + ']'); + + return ex; } } } @@ -718,7 +717,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa final GridIoMessage msg, byte plc, final IgniteRunnable msgC - ) { + ) throws IgniteCheckedException { Runnable c = new Runnable() { @Override public void run() { try { @@ -776,7 +775,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa final GridIoMessage msg, final byte plc, @Nullable final IgniteRunnable msgC - ) { + ) throws IgniteCheckedException { assert msg != null; long timeout = msg.timeout(); @@ -1451,6 +1450,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa for (GridCommunicationMessageSet msgSet : msgSets) unwindMessageSet(msgSet, lsnr); } + catch (IgniteCheckedException ice) { + throw new IgniteException(ice); + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39f6928d/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IoPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IoPool.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IoPool.java new file mode 100644 index 0000000..e87b82c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IoPool.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.plugin.extensions.communication; + +import org.apache.ignite.plugin.*; + +import java.util.concurrent.*; + +/** + * The interface of IO Messaging Pool Extension. + */ +public interface IoPool extends Extension { + /** + * Gets the numeric identifier of the pool. This identifier is to be taken from serialized + * message and used to find the appropriate executor pool to process it. + * + * @return The id. + */ + public byte id(); + + /** + * Gets the Executor for this Pool. Cannot be null. + * + * @return The executor. + */ + public Executor executor(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39f6928d/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IoPoolExtension.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IoPoolExtension.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IoPoolExtension.java deleted file mode 100644 index f452bfd..0000000 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IoPoolExtension.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.plugin.extensions.communication; - -import org.apache.ignite.plugin.*; - -import java.util.concurrent.*; - -/** - * The interface of IO Messaging Pool Extension. - */ -// TODO: IoPoolExtension -> IoPool -public interface IoPoolExtension extends Extension { - /** - * Gets the numeric identifier of the pool. This identifier is to be taken from serialized - * message and used to find the appropriate executor pool to process it. - * - * @return The id. - */ - public byte id(); - - /** - * Gets the Executor for this Pool. Cannot be null. - * - * @return The executor. - */ - public Executor executor(); - - // TODO: Remove. - /** - * Gets the Id of the plugin that injected this executor pool. - * @return The plugin Id. - */ - public String pligunId(); -}
