Platforms: WIP on better interfaces.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/66d46eca Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/66d46eca Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/66d46eca Branch: refs/heads/ignite-1093-2 Commit: 66d46ecacd0a6babce6e0a722c580d6646eeb9cb Parents: 39da853 Author: vozerov-gridgain <[email protected]> Authored: Tue Sep 1 11:23:50 2015 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Tue Sep 1 11:23:50 2015 +0300 ---------------------------------------------------------------------- .../processors/platform/PlatformContext.java | 13 +- .../cluster/PlatformClusterNodeFilter.java | 28 +++++ .../datastreamer/PlatformStreamReceiver.java | 27 +++++ .../cluster/PlatformClusterNodeFilter.java | 79 ------------ .../cluster/PlatformClusterNodeFilterImpl.java | 78 ++++++++++++ .../datastreamer/PlatformStreamReceiver.java | 120 ------------------- .../PlatformStreamReceiverImpl.java | 119 ++++++++++++++++++ 7 files changed, 259 insertions(+), 205 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/66d46eca/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java index bff0fc8..4c70360 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.processors.platform; -import java.util.Collection; -import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cluster.ClusterMetrics; @@ -33,15 +31,18 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinu import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter; import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQuery; import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway; +import org.apache.ignite.internal.processors.platform.cluster.PlatformClusterNodeFilter; import org.apache.ignite.internal.processors.platform.compute.PlatformJob; +import org.apache.ignite.internal.processors.platform.datastreamer.PlatformStreamReceiver; import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManager; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; -import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.stream.StreamReceiver; import org.jetbrains.annotations.Nullable; +import java.util.Collection; +import java.util.UUID; + /** * Platform context. Acts as an entry point for platform operations. */ @@ -266,7 +267,7 @@ public interface PlatformContext { * @param keepPortable Keep portable flag. * @return Stream receiver. */ - public StreamReceiver createStreamReceiver(Object rcv, long ptr, boolean keepPortable); + public PlatformStreamReceiver createStreamReceiver(Object rcv, long ptr, boolean keepPortable); /** * Create cluster node filter. @@ -274,5 +275,5 @@ public interface PlatformContext { * @param filter Native filter. * @return Cluster node filter. */ - public IgnitePredicate<ClusterNode> createClusterNodeFilter(Object filter); + public PlatformClusterNodeFilter createClusterNodeFilter(Object filter); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/66d46eca/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilter.java new file mode 100644 index 0000000..fd550fb --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilter.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cluster; + +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.lang.IgnitePredicate; + +/** + * Platform cluster node filter marker interface. + */ +public interface PlatformClusterNodeFilter extends IgnitePredicate<ClusterNode> { + // No-op. +} http://git-wip-us.apache.org/repos/asf/ignite/blob/66d46eca/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java new file mode 100644 index 0000000..9108920 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.datastreamer; + +import org.apache.ignite.stream.StreamReceiver; + +/** + * Platform data streamer filter marker interface. + */ +public interface PlatformStreamReceiver extends StreamReceiver<Object, Object> { + // No-op. +} http://git-wip-us.apache.org/repos/asf/ignite/blob/66d46eca/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilter.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilter.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilter.java deleted file mode 100644 index eb203cd..0000000 --- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilter.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.platform.cluster; - -import org.apache.ignite.Ignite; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.portable.PortableRawWriterEx; -import org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate; -import org.apache.ignite.internal.processors.platform.PlatformContext; -import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; -import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; -import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; -import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.resources.IgniteInstanceResource; - -/** - * Interop cluster node filter. - */ -public class PlatformClusterNodeFilter extends PlatformAbstractPredicate implements IgnitePredicate<ClusterNode> { - /** */ - private static final long serialVersionUID = 0L; - - /** - * {@link java.io.Externalizable} support. - */ - public PlatformClusterNodeFilter() { - // No-op. - } - - /** - * Constructor. - * - * @param pred .Net portable predicate. - * @param ctx Kernal context. - */ - public PlatformClusterNodeFilter(Object pred, PlatformContext ctx) { - super(pred, 0, ctx); - } - - /** {@inheritDoc} */ - @Override public boolean apply(ClusterNode clusterNode) { - try (PlatformMemory mem = ctx.memory().allocate()) { - PlatformOutputStream out = mem.output(); - - PortableRawWriterEx writer = ctx.writer(out); - - writer.writeObject(pred); - ctx.writeNode(writer, clusterNode); - - out.synchronize(); - - return ctx.gateway().clusterNodeFilterApply(mem.pointer()) != 0; - } - } - - /** - * @param ignite Ignite instance. - */ - @SuppressWarnings("UnusedDeclaration") - @IgniteInstanceResource - public void setIgniteInstance(Ignite ignite) { - ctx = PlatformUtils.platformContext(ignite); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/66d46eca/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilterImpl.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilterImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilterImpl.java new file mode 100644 index 0000000..5ba9a85 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilterImpl.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cluster; + +import org.apache.ignite.Ignite; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; +import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; +import org.apache.ignite.resources.IgniteInstanceResource; + +/** + * Interop cluster node filter. + */ +public class PlatformClusterNodeFilterImpl extends PlatformAbstractPredicate implements PlatformClusterNodeFilter { + /** */ + private static final long serialVersionUID = 0L; + + /** + * {@link java.io.Externalizable} support. + */ + public PlatformClusterNodeFilterImpl() { + // No-op. + } + + /** + * Constructor. + * + * @param pred .Net portable predicate. + * @param ctx Kernal context. + */ + public PlatformClusterNodeFilterImpl(Object pred, PlatformContext ctx) { + super(pred, 0, ctx); + } + + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode clusterNode) { + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = ctx.writer(out); + + writer.writeObject(pred); + ctx.writeNode(writer, clusterNode); + + out.synchronize(); + + return ctx.gateway().clusterNodeFilterApply(mem.pointer()) != 0; + } + } + + /** + * @param ignite Ignite instance. + */ + @SuppressWarnings("UnusedDeclaration") + @IgniteInstanceResource + public void setIgniteInstance(Ignite ignite) { + ctx = PlatformUtils.platformContext(ignite); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/66d46eca/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java deleted file mode 100644 index 851216a..0000000 --- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.platform.datastreamer; - -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.Collection; -import java.util.Map; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.portable.PortableRawWriterEx; -import org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate; -import org.apache.ignite.internal.processors.platform.PlatformContext; -import org.apache.ignite.internal.processors.platform.cache.PlatformCache; -import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; -import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; -import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; -import org.apache.ignite.resources.IgniteInstanceResource; -import org.apache.ignite.stream.StreamReceiver; - -/** - * Interop receiver. - */ -public class PlatformStreamReceiver<K, V> extends PlatformAbstractPredicate implements StreamReceiver<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private boolean keepPortable; - - /** - * Constructor. - */ - public PlatformStreamReceiver() - { - super(); - } - - /** - * Constructor. - * - * @param pred .Net portable receiver. - * @param ptr Pointer to receiver in the native platform. - * @param ctx Kernal context. - */ - public PlatformStreamReceiver(Object pred, long ptr, boolean keepPortable, PlatformContext ctx) { - super(pred, ptr, ctx); - - assert pred != null; - - this.keepPortable = keepPortable; - } - - /** {@inheritDoc} */ - @Override public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> collection) - throws IgniteException { - assert ctx != null; - - try (PlatformMemory mem = ctx.memory().allocate()) { - PlatformOutputStream out = mem.output(); - - PortableRawWriterEx writer = ctx.writer(out); - - writer.writeObject(pred); - - writer.writeInt(collection.size()); - - for (Map.Entry<K, V> e : collection) { - writer.writeObject(e.getKey()); - writer.writeObject(e.getValue()); - } - - out.synchronize(); - - ctx.gateway().dataStreamerStreamReceiverInvoke(ptr, - new PlatformCache(ctx, cache, keepPortable), mem.pointer(), keepPortable); - } - } - - /** - * @param ignite Ignite instance. - */ - @SuppressWarnings("UnusedDeclaration") - @IgniteInstanceResource - public void setIgniteInstance(Ignite ignite) { - ctx = PlatformUtils.platformContext(ignite); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - super.writeExternal(out); - - out.writeBoolean(keepPortable); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - super.readExternal(in); - - keepPortable = in.readBoolean(); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/66d46eca/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java new file mode 100644 index 0000000..92250c0 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.datastreamer; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.cache.PlatformCache; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; +import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; +import org.apache.ignite.resources.IgniteInstanceResource; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Collection; +import java.util.Map; + +/** + * Interop receiver. + */ +public class PlatformStreamReceiverImpl extends PlatformAbstractPredicate implements PlatformStreamReceiver { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private boolean keepPortable; + + /** + * Constructor. + */ + public PlatformStreamReceiverImpl() + { + super(); + } + + /** + * Constructor. + * + * @param pred .Net portable receiver. + * @param ptr Pointer to receiver in the native platform. + * @param ctx Kernal context. + */ + public PlatformStreamReceiverImpl(Object pred, long ptr, boolean keepPortable, PlatformContext ctx) { + super(pred, ptr, ctx); + + assert pred != null; + + this.keepPortable = keepPortable; + } + + /** {@inheritDoc} */ + @Override public void receive(IgniteCache<Object, Object> cache, Collection<Map.Entry<Object, Object>> collection) + throws IgniteException { + assert ctx != null; + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = ctx.writer(out); + + writer.writeObject(pred); + + writer.writeInt(collection.size()); + + for (Map.Entry<Object, Object> e : collection) { + writer.writeObject(e.getKey()); + writer.writeObject(e.getValue()); + } + + out.synchronize(); + + ctx.gateway().dataStreamerStreamReceiverInvoke(ptr, new PlatformCache(ctx, cache, keepPortable), + mem.pointer(), keepPortable); + } + } + + /** + * @param ignite Ignite instance. + */ + @SuppressWarnings("UnusedDeclaration") + @IgniteInstanceResource + public void setIgniteInstance(Ignite ignite) { + ctx = PlatformUtils.platformContext(ignite); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + super.writeExternal(out); + + out.writeBoolean(keepPortable); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + + keepPortable = in.readBoolean(); + } +} \ No newline at end of file
