http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java new file mode 100644 index 0000000..7e9587f --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.callback; + +/** + * Platform callback utility methods. Implemented in target platform. All methods in this class must be + * package-visible and invoked only through {@link PlatformCallbackGateway}. + */ +public class PlatformCallbackUtils { + /** + * Create cache store. + * + * @param envPtr Environment pointer. + * @param memPtr Memory pointer. + * @return Pointer. 
+ */ + static native long cacheStoreCreate(long envPtr, long memPtr); + + /** + * @param envPtr Environment pointer. + * @param objPtr Object pointer. + * @param memPtr Memory pointer. + * @param cb Callback. + * @return Result. + */ + static native int cacheStoreInvoke(long envPtr, long objPtr, long memPtr, Object cb); + + /** + * @param envPtr Environment pointer. + * @param objPtr Object pointer. + */ + static native void cacheStoreDestroy(long envPtr, long objPtr); + + /** + * Creates cache store session. + * + * @param envPtr Environment pointer. + * @param storePtr Store instance pointer. + * @return Session instance pointer. + */ + static native long cacheStoreSessionCreate(long envPtr, long storePtr); + + /** + * Creates cache entry filter and returns a pointer. + * + * @param envPtr Environment pointer. + * @param memPtr Memory pointer. + * @return Pointer. + */ + static native long cacheEntryFilterCreate(long envPtr, long memPtr); + + /** + * @param envPtr Environment pointer. + * @param objPtr Pointer. + * @param memPtr Memory pointer. + * @return Result. + */ + static native int cacheEntryFilterApply(long envPtr, long objPtr, long memPtr); + + /** + * @param envPtr Environment pointer. + * @param objPtr Pointer. + */ + static native void cacheEntryFilterDestroy(long envPtr, long objPtr); + + /** + * Invoke cache entry processor. + * + * @param envPtr Environment pointer. + * @param outMemPtr Output memory pointer. + * @param inMemPtr Input memory pointer. + */ + static native void cacheInvoke(long envPtr, long outMemPtr, long inMemPtr); + + /** + * Perform native task map. Do not throw exceptions, serializing them to the output stream instead. + * + * @param envPtr Environment pointer. + * @param taskPtr Task pointer. + * @param outMemPtr Output memory pointer (exists if topology changed, otherwise {@code 0}). + * @param inMemPtr Input memory pointer. 
+ */ + static native void computeTaskMap(long envPtr, long taskPtr, long outMemPtr, long inMemPtr); + + /** + * Perform native task job result notification. + * + * @param envPtr Environment pointer. + * @param taskPtr Task pointer. + * @param jobPtr Job pointer. + * @param memPtr Memory pointer (always zero for local job execution). + * @return Job result enum ordinal. + */ + static native int computeTaskJobResult(long envPtr, long taskPtr, long jobPtr, long memPtr); + + /** + * Perform native task reduce. + * + * @param envPtr Environment pointer. + * @param taskPtr Task pointer. + */ + static native void computeTaskReduce(long envPtr, long taskPtr); + + /** + * Complete task with native error. + * + * @param envPtr Environment pointer. + * @param taskPtr Task pointer. + * @param memPtr Memory pointer with exception data or {@code 0} in case of success. + */ + static native void computeTaskComplete(long envPtr, long taskPtr, long memPtr); + + /** + * Serialize native job. + * + * @param envPtr Environment pointer. + * @param jobPtr Job pointer. + * @param memPtr Memory pointer. + * @return {@code True} if serialization succeeded. + */ + static native int computeJobSerialize(long envPtr, long jobPtr, long memPtr); + + /** + * Create job in native platform. + * + * @param envPtr Environment pointer. + * @param memPtr Memory pointer. + * @return Pointer to job. + */ + static native long computeJobCreate(long envPtr, long memPtr); + + /** + * Execute native job on a node other than where it was created. + * + * @param envPtr Environment pointer. + * @param jobPtr Job pointer. + * @param cancel Cancel flag. + * @param memPtr Memory pointer to write result to for remote job execution or {@code 0} for local job execution. + */ + static native void computeJobExecute(long envPtr, long jobPtr, int cancel, long memPtr); + + /** + * Cancel the job. + * + * @param envPtr Environment pointer. + * @param jobPtr Job pointer. 
+ */ + static native void computeJobCancel(long envPtr, long jobPtr); + + /** + * Destroy the job. + * + * @param envPtr Environment pointer. + * @param ptr Pointer. + */ + static native void computeJobDestroy(long envPtr, long ptr); + + /** + * Invoke local callback. + * + * @param envPtr Environment pointer. + * @param cbPtr Callback pointer. + * @param memPtr Memory pointer. + */ + static native void continuousQueryListenerApply(long envPtr, long cbPtr, long memPtr); + + /** + * Create filter in native platform. + * + * @param envPtr Environment pointer. + * @param memPtr Memory pointer. + * @return Pointer to created filter. + */ + static native long continuousQueryFilterCreate(long envPtr, long memPtr); + + /** + * Invoke remote filter. + * + * @param envPtr Environment pointer. + * @param filterPtr Filter pointer. + * @param memPtr Memory pointer. + * @return Result. + */ + static native int continuousQueryFilterApply(long envPtr, long filterPtr, long memPtr); + + /** + * Release remote filter. + * + * @param envPtr Environment pointer. + * @param filterPtr Filter pointer. + */ + static native void continuousQueryFilterRelease(long envPtr, long filterPtr); + + /** + * Notify native data streamer about topology update. + * + * @param envPtr Environment pointer. + * @param ptr Data streamer native pointer. + * @param topVer Topology version. + * @param topSize Topology size. + */ + static native void dataStreamerTopologyUpdate(long envPtr, long ptr, long topVer, int topSize); + + /** + * Invoke stream receiver. + * + * @param envPtr Environment pointer. + * @param ptr Receiver native pointer. + * @param cache Cache object. + * @param memPtr Stream pointer. + * @param keepPortable Portable flag. + */ + static native void dataStreamerStreamReceiverInvoke(long envPtr, long ptr, Object cache, long memPtr, + boolean keepPortable); + + /** + * Notify future with byte result. + * + * @param envPtr Environment pointer. + * @param futPtr Future pointer. 
+ * @param res Result. + */ + static native void futureByteResult(long envPtr, long futPtr, int res); + + /** + * Notify future with boolean result. + * + * @param envPtr Environment pointer. + * @param futPtr Future pointer. + * @param res Result. + */ + static native void futureBoolResult(long envPtr, long futPtr, int res); + + /** + * Notify future with short result. + * + * @param envPtr Environment pointer. + * @param futPtr Future pointer. + * @param res Result. + */ + static native void futureShortResult(long envPtr, long futPtr, int res); + + /** + * Notify future with char result. + * + * @param envPtr Environment pointer. + * @param futPtr Future pointer. + * @param res Result. + */ + static native void futureCharResult(long envPtr, long futPtr, int res); + + /** + * Notify future with int result. + * + * @param envPtr Environment pointer. + * @param futPtr Future pointer. + * @param res Result. + */ + static native void futureIntResult(long envPtr, long futPtr, int res); + + /** + * Notify future with float result. + * + * @param envPtr Environment pointer. + * @param futPtr Future pointer. + * @param res Result. + */ + static native void futureFloatResult(long envPtr, long futPtr, float res); + + /** + * Notify future with long result. + * + * @param envPtr Environment pointer. + * @param futPtr Future pointer. + * @param res Result. + */ + static native void futureLongResult(long envPtr, long futPtr, long res); + + /** + * Notify future with double result. + * + * @param envPtr Environment pointer. + * @param futPtr Future pointer. + * @param res Result. + */ + static native void futureDoubleResult(long envPtr, long futPtr, double res); + + /** + * Notify future with object result. + * + * @param envPtr Environment pointer. + * @param futPtr Future pointer. + * @param memPtr Memory pointer. + */ + static native void futureObjectResult(long envPtr, long futPtr, long memPtr); + + /** + * Notify future with null result. 
+ * + * @param envPtr Environment pointer. + * @param futPtr Future pointer. + */ + static native void futureNullResult(long envPtr, long futPtr); + + /** + * Notify future with error. + * + * @param envPtr Environment pointer. + * @param futPtr Future pointer. + * @param memPtr Pointer to memory with error information. + */ + static native void futureError(long envPtr, long futPtr, long memPtr); + + /** + * Creates message filter and returns a pointer. + * + * @param envPtr Environment pointer. + * @param memPtr Memory pointer. + * @return Pointer. + */ + static native long messagingFilterCreate(long envPtr, long memPtr); + + /** + * @param envPtr Environment pointer. + * @param objPtr Pointer. + * @param memPtr Memory pointer. + * @return Result. + */ + static native int messagingFilterApply(long envPtr, long objPtr, long memPtr); + + /** + * @param envPtr Environment pointer. + * @param objPtr Pointer. + */ + static native void messagingFilterDestroy(long envPtr, long objPtr); + + /** + * Creates event filter and returns a pointer. + * + * @param envPtr Environment pointer. + * @param memPtr Memory pointer. + * @return Pointer. + */ + static native long eventFilterCreate(long envPtr, long memPtr); + + /** + * @param envPtr Environment pointer. + * @param objPtr Pointer. + * @param memPtr Memory pointer. + * @return Result. + */ + static native int eventFilterApply(long envPtr, long objPtr, long memPtr); + + /** + * @param envPtr Environment pointer. + * @param objPtr Pointer. + */ + static native void eventFilterDestroy(long envPtr, long objPtr); + + /** + * Sends node info to native target. + * + * @param envPtr Environment pointer. + * @param memPtr Ptr to a stream with serialized node. + */ + static native void nodeInfo(long envPtr, long memPtr); + + /** + * Kernal start callback. + * + * @param envPtr Environment pointer. + * @param memPtr Memory pointer. + */ + static native void onStart(long envPtr, long memPtr); + + /** + * Kernal stop callback. 
+ * + * @param envPtr Environment pointer. + */ + static native void onStop(long envPtr); + + /** + * Lifecycle event callback. + * + * @param envPtr Environment pointer. + * @param ptr Holder pointer. + * @param evt Event. + */ + static native void lifecycleEvent(long envPtr, long ptr, int evt); + + /** + * Re-allocate external memory chunk. + * + * @param envPtr Environment pointer. + * @param memPtr Cross-platform pointer. + * @param cap Capacity. + */ + static native void memoryReallocate(long envPtr, long memPtr, int cap); + + /** + * Initializes native service. + * + * @param envPtr Environment pointer. + * @param memPtr Stream pointer. + * @return Pointer to the native platform service. + */ + static native long serviceInit(long envPtr, long memPtr); + + /** + * Executes native service. + * + * @param envPtr Environment pointer. + * @param svcPtr Pointer to the service in the native platform. + * @param memPtr Stream pointer. + */ + static native void serviceExecute(long envPtr, long svcPtr, long memPtr); + + /** + * Cancels native service. + * + * @param envPtr Environment pointer. + * @param svcPtr Pointer to the service in the native platform. + * @param memPtr Stream pointer. + */ + static native void serviceCancel(long envPtr, long svcPtr, long memPtr); + + /** + * Invokes service method. + * + * @param envPtr Environment pointer. + * @param svcPtr Pointer to the service in the native platform. + * @param outMemPtr Output memory pointer. + * @param inMemPtr Input memory pointer. + */ + static native void serviceInvokeMethod(long envPtr, long svcPtr, long outMemPtr, long inMemPtr); + + /** + * Invokes cluster node filter. + * + * @param envPtr Environment pointer. + * @param memPtr Stream pointer. + * @return Result. + */ + static native int clusterNodeFilterApply(long envPtr, long memPtr); + + /** + * Private constructor. + */ + private PlatformCallbackUtils() { + // No-op. 
+ } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformAbstractMemory.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformAbstractMemory.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformAbstractMemory.java new file mode 100644 index 0000000..fbbabb7 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformAbstractMemory.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.memory; + +/** + * Interop memory chunk abstraction. Creates streams over the chunk and reads its data pointer, capacity and length through {@link PlatformMemoryUtils}. + */ +public abstract class PlatformAbstractMemory implements PlatformMemory { + /** Stream factory, chosen once when this class is initialized from {@link PlatformMemoryUtils#LITTLE_ENDIAN}. */ + private static final StreamFactory STREAM_FACTORY = PlatformMemoryUtils.LITTLE_ENDIAN ? + new LittleEndianStreamFactory() : new BigEndianStreamFactory(); + + /** Cross-platform memory pointer. */ + protected long memPtr; + + /** + * Constructor. 
+ * + * @param memPtr Cross-platform memory pointer. + */ + protected PlatformAbstractMemory(long memPtr) { + this.memPtr = memPtr; + } + + /** {@inheritDoc} */ + @Override public PlatformInputStream input() { + return STREAM_FACTORY.createInput(this); + } + + /** {@inheritDoc} */ + @Override public PlatformOutputStream output() { + return STREAM_FACTORY.createOutput(this); + } + + /** {@inheritDoc} */ + @Override public long pointer() { + return memPtr; + } + + /** {@inheritDoc} */ + @Override public long data() { + return PlatformMemoryUtils.data(memPtr); + } + + /** {@inheritDoc} */ + @Override public int capacity() { + return PlatformMemoryUtils.capacity(memPtr); + } + + /** {@inheritDoc} */ + @Override public int length() { + return PlatformMemoryUtils.length(memPtr); + } + + /** + * Factory of input and output streams over a memory chunk. + */ + private static interface StreamFactory { + /** + * Create input stream. + * + * @param mem Memory. + * @return Input stream. + */ + PlatformInputStreamImpl createInput(PlatformMemory mem); + + /** + * Create output stream. + * + * @param mem Memory. + * @return Output stream. + */ + PlatformOutputStreamImpl createOutput(PlatformMemory mem); + } + + /** + * Stream factory for LITTLE ENDIAN architecture. Creates the plain stream implementations. + */ + private static class LittleEndianStreamFactory implements StreamFactory { + /** {@inheritDoc} */ + @Override public PlatformInputStreamImpl createInput(PlatformMemory mem) { + return new PlatformInputStreamImpl(mem); + } + + /** {@inheritDoc} */ + @Override public PlatformOutputStreamImpl createOutput(PlatformMemory mem) { + return new PlatformOutputStreamImpl(mem); + } + } + + /** + * Stream factory for BIG ENDIAN architecture. Creates the byte-swapping stream implementations. 
+ */ + private static class BigEndianStreamFactory implements StreamFactory { + /** {@inheritDoc} */ + @Override public PlatformInputStreamImpl createInput(PlatformMemory mem) { + return new PlatformBigEndianInputStreamImpl(mem); + } + + /** {@inheritDoc} */ + @Override public PlatformOutputStreamImpl createOutput(PlatformMemory mem) { + return new PlatformBigEndianOutputStreamImpl(mem); + } + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianInputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianInputStreamImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianInputStreamImpl.java new file mode 100644 index 0000000..b029ee0 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianInputStreamImpl.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ignite.internal.processors.platform.memory; + +/** + * Interop input stream implementation working with BIG ENDIAN architecture. Every multi-byte value is byte-swapped after being read by the parent stream. Floating-point values are swapped through their raw bit patterns ({@code super.readInt()}/{@code super.readLong()} for scalars, {@link Float#floatToRawIntBits}/{@link Double#doubleToRawLongBits} for arrays): before the swap the value holds reversed bytes, which may happen to form a NaN pattern, and the canonicalizing {@code floatToIntBits}/{@code doubleToLongBits} would replace such bits and corrupt the result. + */ +public class PlatformBigEndianInputStreamImpl extends PlatformInputStreamImpl { + /** + * Constructor. + * + * @param mem Memory chunk. + */ + public PlatformBigEndianInputStreamImpl(PlatformMemory mem) { + super(mem); + } + + /** {@inheritDoc} */ + @Override public short readShort() { + return Short.reverseBytes(super.readShort()); + } + + /** {@inheritDoc} */ + @Override public short[] readShortArray(int cnt) { + short[] res = super.readShortArray(cnt); + + for (int i = 0; i < cnt; i++) + res[i] = Short.reverseBytes(res[i]); + + return res; + } + + /** {@inheritDoc} */ + @Override public char readChar() { + return Character.reverseBytes(super.readChar()); + } + + /** {@inheritDoc} */ + @Override public char[] readCharArray(int cnt) { + char[] res = super.readCharArray(cnt); + + for (int i = 0; i < cnt; i++) + res[i] = Character.reverseBytes(res[i]); + + return res; + } + + /** {@inheritDoc} */ + @Override public int readInt() { + return Integer.reverseBytes(super.readInt()); + } + + /** {@inheritDoc} */ + @Override public int readInt(int pos) { + return Integer.reverseBytes(super.readInt(pos)); + } + + /** {@inheritDoc} */ + @Override public int[] readIntArray(int cnt) { + int[] res = super.readIntArray(cnt); + + for (int i = 0; i < cnt; i++) + res[i] = Integer.reverseBytes(res[i]); + + return res; + } + + /** {@inheritDoc} */ + @Override public float readFloat() { + return Float.intBitsToFloat(Integer.reverseBytes(super.readInt())); + } + + /** {@inheritDoc} */ + @Override public float[] readFloatArray(int cnt) { + float[] res = super.readFloatArray(cnt); + + for (int i = 0; i < cnt; i++) + res[i] = Float.intBitsToFloat(Integer.reverseBytes(Float.floatToRawIntBits(res[i]))); + + return res; + } + + /** {@inheritDoc} */ + @Override public long readLong() { + return 
Long.reverseBytes(super.readLong()); + } + + /** {@inheritDoc} */ + @Override public long[] readLongArray(int cnt) { + long[] res = super.readLongArray(cnt); + + for (int i = 0; i < cnt; i++) + res[i] = Long.reverseBytes(res[i]); + + return res; + } + + /** {@inheritDoc} */ + @Override public double readDouble() { + return Double.longBitsToDouble(Long.reverseBytes(super.readLong())); + } + + /** {@inheritDoc} */ + @Override public double[] readDoubleArray(int cnt) { + double[] res = super.readDoubleArray(cnt); + + for (int i = 0; i < cnt; i++) + res[i] = Double.longBitsToDouble(Long.reverseBytes(Double.doubleToRawLongBits(res[i]))); + + return res; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java new file mode 100644 index 0000000..e1c1585 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.memory; + +import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.*; + +/** + * Interop output stream implementation working with BIG ENDIAN architecture. Every multi-byte value is byte-swapped before it is stored. Float and double array elements are converted with {@link Float#floatToRawIntBits} and {@link Double#doubleToRawLongBits} so that the bytes written are an exact swap of the in-memory bits, including NaN payloads that the canonicalizing conversions would discard. + */ +public class PlatformBigEndianOutputStreamImpl extends PlatformOutputStreamImpl { + /** + * Constructor. + * + * @param mem Underlying memory chunk. + */ + public PlatformBigEndianOutputStreamImpl(PlatformMemory mem) { + super(mem); + } + + /** {@inheritDoc} */ + @Override public void writeShort(short val) { + super.writeShort(Short.reverseBytes(val)); + } + + /** {@inheritDoc} */ + @Override public void writeShortArray(short[] val) { + int cnt = val.length << 1; + + ensureCapacity(pos + cnt); + + long startPos = data + pos; + + for (short item : val) { + UNSAFE.putShort(startPos, Short.reverseBytes(item)); + + startPos += 2; + } + + shift(cnt); + } + + /** {@inheritDoc} */ + @Override public void writeChar(char val) { + super.writeChar(Character.reverseBytes(val)); + } + + /** {@inheritDoc} */ + @Override public void writeCharArray(char[] val) { + int cnt = val.length << 1; + + ensureCapacity(pos + cnt); + + long startPos = data + pos; + + for (char item : val) { + UNSAFE.putChar(startPos, Character.reverseBytes(item)); + + startPos += 2; + } + + shift(cnt); + } + + /** {@inheritDoc} */ + @Override public void writeInt(int val) { + super.writeInt(Integer.reverseBytes(val)); + } + + /** {@inheritDoc} */ + @Override public void writeIntArray(int[] val) { + int cnt = val.length << 2; + + 
ensureCapacity(pos + cnt); + + long startPos = data + pos; + + for (int item : val) { + UNSAFE.putInt(startPos, Integer.reverseBytes(item)); + + startPos += 4; + } + + shift(cnt); + } + + /** {@inheritDoc} */ + @Override public void writeInt(int pos, int val) { + super.writeInt(pos, Integer.reverseBytes(val)); + } + + /** {@inheritDoc} */ + @Override public void writeFloatArray(float[] val) { + int cnt = val.length << 2; + + ensureCapacity(pos + cnt); + + long startPos = data + pos; + + for (float item : val) { + UNSAFE.putInt(startPos, Integer.reverseBytes(Float.floatToRawIntBits(item))); + + startPos += 4; + } + + shift(cnt); + } + + /** {@inheritDoc} */ + @Override public void writeLong(long val) { + super.writeLong(Long.reverseBytes(val)); + } + + /** {@inheritDoc} */ + @Override public void writeLongArray(long[] val) { + int cnt = val.length << 3; + + ensureCapacity(pos + cnt); + + long startPos = data + pos; + + for (long item : val) { + UNSAFE.putLong(startPos, Long.reverseBytes(item)); + + startPos += 8; + } + + shift(cnt); + } + + /** {@inheritDoc} */ + @Override public void writeDoubleArray(double[] val) { + int cnt = val.length << 3; + + ensureCapacity(pos + cnt); + + long startPos = data + pos; + + for (double item : val) { + UNSAFE.putLong(startPos, Long.reverseBytes(Double.doubleToRawLongBits(item))); + + startPos += 8; + } + + shift(cnt); + } +} + http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformExternalMemory.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformExternalMemory.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformExternalMemory.java new file mode 100644 index 0000000..0d47aff --- /dev/null +++ 
b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformExternalMemory.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.memory; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.platform.callback.*; +import org.jetbrains.annotations.*; + +/** + * Interop external memory chunk. Re-allocation is delegated to the native gateway; closing the chunk does nothing on the Java side. + */ +public class PlatformExternalMemory extends PlatformAbstractMemory { + /** Native gateway used for re-allocation; when {@code null} the chunk is read-only and cannot be re-allocated. */ + private final PlatformCallbackGateway gate; + + /** + * Constructor. + * + * @param gate Native gateway, or {@code null} for a read-only chunk. + * @param memPtr Memory pointer. + */ + public PlatformExternalMemory(@Nullable PlatformCallbackGateway gate, long memPtr) { + super(memPtr); + + this.gate = gate; + } + + /** {@inheritDoc} */ + @Override public void reallocate(int cap) { + if (gate == null) + throw new IgniteException("Failed to re-allocate external memory chunk because it is read-only."); + + gate.memoryReallocate(memPtr, cap); + } + + /** {@inheritDoc} */ + @Override public void close() { + // Do nothing, memory must be released by native platform. 
+ } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStream.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStream.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStream.java new file mode 100644 index 0000000..9273e29 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStream.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.memory; + +import org.apache.ignite.internal.portable.streams.*; + +/** + * Interop input stream. + */ +public interface PlatformInputStream extends PortableInputStream { + /** + * Synchronize input. Must be called before reading data from memory that was changed by another platform. 
+ */ + public void synchronize(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStreamImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStreamImpl.java new file mode 100644 index 0000000..68beaee --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStreamImpl.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.memory; + +import org.apache.ignite.*; + +import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.*; + +/** + * Interop input stream implementation. + */ +public class PlatformInputStreamImpl implements PlatformInputStream { + /** Underlying memory. */ + private final PlatformMemory mem; + + /** Real data pointer */ + private long data; + + /** Amount of available data. 
*/ + private int len; + + /** Current position. */ + private int pos; + + /** Heap-copied data. */ + private byte[] dataCopy; + + /** + * Constructor. + * + * @param mem Underlying memory chunk. + */ + public PlatformInputStreamImpl(PlatformMemory mem) { + this.mem = mem; + + data = mem.data(); + len = mem.length(); + } + + /** {@inheritDoc} */ + @Override public byte readByte() { + ensureEnoughData(1); + + return UNSAFE.getByte(data + pos++); + } + + /** {@inheritDoc} */ + @Override public byte[] readByteArray(int cnt) { + byte[] res = new byte[cnt]; + + copyAndShift(res, BYTE_ARR_OFF, cnt); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean readBoolean() { + return readByte() == 1; + } + + /** {@inheritDoc} */ + @Override public boolean[] readBooleanArray(int cnt) { + boolean[] res = new boolean[cnt]; + + copyAndShift(res, BOOLEAN_ARR_OFF, cnt); + + return res; + } + + /** {@inheritDoc} */ + @Override public short readShort() { + ensureEnoughData(2); + + short res = UNSAFE.getShort(data + pos); + + shift(2); + + return res; + } + + /** {@inheritDoc} */ + @Override public short[] readShortArray(int cnt) { + int len = cnt << 1; + + short[] res = new short[cnt]; + + copyAndShift(res, SHORT_ARR_OFF, len); + + return res; + } + + /** {@inheritDoc} */ + @Override public char readChar() { + ensureEnoughData(2); + + char res = UNSAFE.getChar(data + pos); + + shift(2); + + return res; + } + + /** {@inheritDoc} */ + @Override public char[] readCharArray(int cnt) { + int len = cnt << 1; + + char[] res = new char[cnt]; + + copyAndShift(res, CHAR_ARR_OFF, len); + + return res; + } + + /** {@inheritDoc} */ + @Override public int readInt() { + ensureEnoughData(4); + + int res = UNSAFE.getInt(data + pos); + + shift(4); + + return res; + } + + /** {@inheritDoc} */ + @Override public int readInt(int pos) { + int delta = pos + 4 - this.pos; + + if (delta > 0) + ensureEnoughData(delta); + + return UNSAFE.getInt(data + pos); + } + + /** {@inheritDoc} */ + 
@Override public int[] readIntArray(int cnt) { + int len = cnt << 2; + + int[] res = new int[cnt]; + + copyAndShift(res, INT_ARR_OFF, len); + + return res; + } + + /** {@inheritDoc} */ + @Override public float readFloat() { + ensureEnoughData(4); + + float res = UNSAFE.getFloat(data + pos); + + shift(4); + + return res; + } + + /** {@inheritDoc} */ + @Override public float[] readFloatArray(int cnt) { + int len = cnt << 2; + + float[] res = new float[cnt]; + + copyAndShift(res, FLOAT_ARR_OFF, len); + + return res; + } + + /** {@inheritDoc} */ + @Override public long readLong() { + ensureEnoughData(8); + + long res = UNSAFE.getLong(data + pos); + + shift(8); + + return res; + } + + /** {@inheritDoc} */ + @Override public long[] readLongArray(int cnt) { + int len = cnt << 3; + + long[] res = new long[cnt]; + + copyAndShift(res, LONG_ARR_OFF, len); + + return res; + } + + /** {@inheritDoc} */ + @Override public double readDouble() { + ensureEnoughData(8); + + double res = UNSAFE.getDouble(data + pos); + + shift(8); + + return res; + } + + /** {@inheritDoc} */ + @Override public double[] readDoubleArray(int cnt) { + int len = cnt << 3; + + double[] res = new double[cnt]; + + copyAndShift(res, DOUBLE_ARR_OFF, len); + + return res; + } + + /** {@inheritDoc} Copies at most {@code remaining()} bytes and returns the number of bytes copied. */ + @Override public int read(byte[] arr, int off, int len) { + if (len > remaining()) + len = remaining(); + + copyAndShift(arr, BYTE_ARR_OFF + off, len); + + return len; + } + + /** {@inheritDoc} */ + @Override public int remaining() { + return len - pos; + } + + /** {@inheritDoc} */ + @Override public int position() { + return pos; + } + + /** {@inheritDoc} Rejects negative positions as well as positions beyond the available length, because reads are unchecked accesses at {@code data + pos}. */ + @Override public void position(int pos) { + if (pos < 0 || pos > len) + throw new IgniteException("Position is out of bounds: " + pos); + else + this.pos = pos; + } + + /** {@inheritDoc} Always returns the heap copy produced by {@code arrayCopy()}. */ + @Override public byte[] array() { + return arrayCopy(); + } + + /** {@inheritDoc} The copy is created on the first call and the same array is returned afterwards. */ + @Override public byte[] arrayCopy() { + if (dataCopy == null) { + dataCopy = new byte[len]; 
+ + UNSAFE.copyMemory(null, data, dataCopy, BYTE_ARR_OFF, dataCopy.length); + } + + return dataCopy; + } + + /** {@inheritDoc} */ + @Override public long offheapPointer() { + return 0; + } + + /** {@inheritDoc} */ + @Override public boolean hasArray() { + return false; + } + + /** {@inheritDoc} Re-reads the data pointer and length from the underlying memory chunk. NOTE(review): a heap copy already cached by {@code arrayCopy()} is not refreshed here; confirm this is intended. */ + @Override public void synchronize() { + data = mem.data(); + len = mem.length(); + } + + /** + * Ensure there is enough data in the stream, throwing {@code IgniteException} otherwise. + * + * @param cnt Amount of bytes expected to be available. + */ + private void ensureEnoughData(int cnt) { + if (remaining() < cnt) + throw new IgniteException("Not enough data to read the value [position=" + pos + + ", requiredBytes=" + cnt + ", remainingBytes=" + remaining() + ']'); + } + + /** + * Copy required amount of data and shift position. + * + * @param target Target to copy data to. + * @param off Offset within the target. + * @param cnt Number of bytes to copy. + */ + private void copyAndShift(Object target, long off, int cnt) { + ensureEnoughData(cnt); + + UNSAFE.copyMemory(null, data + pos, target, off, cnt); + + shift(cnt); + } + + /** + * Shift position to the right. + * + * @param cnt Amount of bytes. + */ + private void shift(int cnt) { + pos += cnt; + + assert pos <= len; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemory.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemory.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemory.java new file mode 100644 index 0000000..9d8f94e --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemory.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.memory; + +/** + * Interop memory chunk. + */ +public interface PlatformMemory extends AutoCloseable { + /** + * Gets input stream. + * + * @return Input stream. + */ + public PlatformInputStream input(); + + /** + * Gets output stream. + * + * @return Output stream. + */ + public PlatformOutputStream output(); + + /** + * Gets pointer which can be passed between platforms. + * + * @return Pointer. + */ + public long pointer(); + + /** + * Gets data pointer. + * + * @return Data pointer. + */ + public long data(); + + /** + * Gets capacity of the memory chunk. + * + * @return Capacity. + */ + public int capacity(); + + /** + * Gets length of the data stored in the memory chunk. + * + * @return Length. + */ + public int length(); + + /** + * Reallocate memory chunk so that its capacity is at least {@code cap}. + * + * @param cap Minimum capacity. + */ + public void reallocate(int cap); + + /** + * Close memory releasing it. 
+ */ + @Override void close(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManager.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManager.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManager.java new file mode 100644 index 0000000..c2233a8 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManager.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.memory; + +/** + * Interop memory manager interface. + */ +public interface PlatformMemoryManager { + /** + * Allocates memory with a capacity chosen by the implementation. + * + * @return Memory. + */ + public PlatformMemory allocate(); + + /** + * Allocates memory having at least the given capacity. + * + * @param cap Minimum capacity. + * @return Memory. + */ + public PlatformMemory allocate(int cap); + + /** + * Gets memory from existing pointer. 
+ * + * @param memPtr Cross-platform memory pointer. + * @return Memory chunk for the given pointer. + */ + public PlatformMemory get(long memPtr); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManagerImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManagerImpl.java new file mode 100644 index 0000000..83388e0 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManagerImpl.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.memory; + +import org.apache.ignite.internal.processors.platform.callback.*; +import org.jetbrains.annotations.*; + +import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.*; + +/** + * Interop memory manager implementation. 
+ */ +public class PlatformMemoryManagerImpl implements PlatformMemoryManager { + /** Native gateway, passed to external memory chunks. May be {@code null}. */ + private final PlatformCallbackGateway gate; + + /** Default allocation capacity. */ + private final int dfltCap; + + /** Thread-local pool, created lazily by {@code pool()}. */ + private final ThreadLocal<PlatformMemoryPool> threadLocPool = new ThreadLocal<>(); + + /** + * Constructor. + * + * @param gate Native gateway. + * @param dfltCap Default memory chunk capacity. + */ + public PlatformMemoryManagerImpl(@Nullable PlatformCallbackGateway gate, int dfltCap) { + this.gate = gate; + this.dfltCap = dfltCap; + } + + /** {@inheritDoc} Uses the default capacity passed to the constructor. */ + @Override public PlatformMemory allocate() { + return allocate(dfltCap); + } + + /** {@inheritDoc} Delegates to the calling thread's memory pool. */ + @Override public PlatformMemory allocate(int cap) { + return pool().allocate(cap); + } + + /** {@inheritDoc} The wrapper is chosen from the chunk's flags: external, pooled (looked up in the calling thread's pool) or unpooled. */ + @Override public PlatformMemory get(long memPtr) { + int flags = flags(memPtr); + + return isExternal(flags) ? new PlatformExternalMemory(gate, memPtr) : + isPooled(flags) ? pool().get(memPtr) : new PlatformUnpooledMemory(memPtr); + } + + /** + * Gets or creates thread-local memory pool. + * + * @return Memory pool. 
+ */ + private PlatformMemoryPool pool() { + PlatformMemoryPool pool = threadLocPool.get(); + + if (pool == null) { + pool = new PlatformMemoryPool(); + + threadLocPool.set(pool); + } + + return pool; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryPool.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryPool.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryPool.java new file mode 100644 index 0000000..75db4b9 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryPool.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.memory; + +import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.*; + +/** + * Memory pool associated with a thread. + */ +public class PlatformMemoryPool { + /** Pool base pointer. 
*/ + private final long poolPtr; + + /** First pooled memory chunk. */ + private PlatformPooledMemory mem1; + + /** Second pooled memory chunk. */ + private PlatformPooledMemory mem2; + + /** Third pooled memory chunk. */ + private PlatformPooledMemory mem3; + + /** + * Constructor. Allocates the pool and registers a {@code sun.misc.Cleaner} for this object whose action releases the pool. + */ + public PlatformMemoryPool() { + poolPtr = allocatePool(); + + sun.misc.Cleaner.create(this, new CleanerRunnable(poolPtr)); + } + + /** + * Allocate memory chunk, optionally pooling it. + * + * @param cap Minimum capacity. + * @return Memory chunk. + */ + public PlatformMemory allocate(int cap) { + long memPtr = allocatePooled(poolPtr, cap); + + // memPtr == 0 means that we failed to acquire thread-local memory chunk, so fall back to unpooled memory. + return memPtr != 0 ? get(memPtr) : new PlatformUnpooledMemory(allocateUnpooled(cap)); + } + + /** + * Re-allocate existing pool memory chunk. + * + * @param memPtr Memory pointer. + * @param cap Minimum capacity. + */ + void reallocate(long memPtr, int cap) { + reallocatePooled(memPtr, cap); + } + + /** + * Release pooled memory chunk. + * + * @param memPtr Memory pointer. + */ + void release(long memPtr) { + releasePooled(memPtr); + } + + /** + * Get pooled memory chunk wrapper for the given pointer, creating and caching it on first use. + * + * @param memPtr Memory pointer. + * @return Memory chunk. + */ + public PlatformMemory get(long memPtr) { + long delta = memPtr - poolPtr; + + if (delta == POOL_HDR_OFF_MEM_1) { + if (mem1 == null) + mem1 = new PlatformPooledMemory(this, memPtr); + + return mem1; + } + else if (delta == POOL_HDR_OFF_MEM_2) { + if (mem2 == null) + mem2 = new PlatformPooledMemory(this, memPtr); + + return mem2; + } + else { + assert delta == POOL_HDR_OFF_MEM_3; + + if (mem3 == null) + mem3 = new PlatformPooledMemory(this, memPtr); + + return mem3; + } + } + + /** + * Cleaner runnable which releases the pool memory. + */ + private static class CleanerRunnable implements Runnable { + /** Pointer. */ + private final long poolPtr; + + /** + * Constructor. + * + * @param poolPtr Pointer. 
+ */ + private CleanerRunnable(long poolPtr) { + assert poolPtr != 0; + + this.poolPtr = poolPtr; + } + + /** {@inheritDoc} */ + @Override public void run() { + PlatformMemoryUtils.releasePool(poolPtr); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryUtils.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryUtils.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryUtils.java new file mode 100644 index 0000000..c5ca971 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryUtils.java @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.memory; + +import org.apache.ignite.internal.util.*; +import sun.misc.*; + +import java.nio.*; + +/** + * Utility methods for memory management. + */ +public class PlatformMemoryUtils { + /** Unsafe instance. 
*/ + public static final Unsafe UNSAFE = GridUnsafe.unsafe(); + + /** Array offset: boolean. */ + public static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class); + + /** Array offset: byte. */ + public static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class); + + /** Array offset: short. */ + public static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class); + + /** Array offset: char. */ + public static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class); + + /** Array offset: int. */ + public static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class); + + /** Array offset: float. */ + public static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class); + + /** Array offset: long. */ + public static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class); + + /** Array offset: double. */ + public static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class); + + /** Whether little endian is used on the platform. */ + public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN; + + /** Pool header length in bytes. */ + public static final int POOL_HDR_LEN = 64; + + /** Pool header offset: first memory chunk. */ + public static final int POOL_HDR_OFF_MEM_1 = 0; + + /** Pool header offset: second memory chunk. */ + public static final int POOL_HDR_OFF_MEM_2 = 20; + + /** Pool header offset: third memory chunk. */ + public static final int POOL_HDR_OFF_MEM_3 = 40; + + /** Memory chunk header length in bytes: data address (8), capacity (4), length (4), flags (4). */ + public static final int MEM_HDR_LEN = 20; + + /** Memory chunk header offset: capacity. */ + public static final int MEM_HDR_OFF_CAP = 8; + + /** Memory chunk header offset: length. */ + public static final int MEM_HDR_OFF_LEN = 12; + + /** Memory chunk header offset: flags. */ + public static final int MEM_HDR_OFF_FLAGS = 16; + + /** Flag: external (chunk is owned by the native platform). */ + public static final int FLAG_EXT = 0x1; + + /** Flag: pooled. 
*/ + public static final int FLAG_POOLED = 0x2; + + /** Flag: whether this pooled memory chunk is acquired. */ + public static final int FLAG_ACQUIRED = 0x4; + + /** --- COMMON METHODS. --- */ + + /** + * Gets data pointer for the given memory chunk (stored at the start of the chunk header). + * + * @param memPtr Memory pointer. + * @return Data pointer. + */ + public static long data(long memPtr) { + return UNSAFE.getLong(memPtr); + } + + /** + * Gets capacity for the given memory chunk. + * + * @param memPtr Memory pointer. + * @return Capacity. + */ + public static int capacity(long memPtr) { + return UNSAFE.getInt(memPtr + MEM_HDR_OFF_CAP); + } + + /** + * Sets capacity for the given memory chunk. Asserts that the chunk is not external. + * + * @param memPtr Memory pointer. + * @param cap Capacity. + */ + public static void capacity(long memPtr, int cap) { + assert !isExternal(memPtr) : "Attempt to update external memory chunk capacity: " + memPtr; + + UNSAFE.putInt(memPtr + MEM_HDR_OFF_CAP, cap); + } + + /** + * Gets length for the given memory chunk. + * + * @param memPtr Memory pointer. + * @return Length. + */ + public static int length(long memPtr) { + return UNSAFE.getInt(memPtr + MEM_HDR_OFF_LEN); + } + + /** + * Sets length for the given memory chunk. + * + * @param memPtr Memory pointer. + * @param len Length. + */ + public static void length(long memPtr, int len) { + UNSAFE.putInt(memPtr + MEM_HDR_OFF_LEN, len); + } + + /** + * Gets flags for the given memory chunk. + * + * @param memPtr Memory pointer. + * @return Flags. + */ + public static int flags(long memPtr) { + return UNSAFE.getInt(memPtr + MEM_HDR_OFF_FLAGS); + } + + /** + * Sets flags for the given memory chunk. Asserts that the chunk is not external. + * + * @param memPtr Memory pointer. + * @param flags Flags. + */ + public static void flags(long memPtr, int flags) { + assert !isExternal(memPtr) : "Attempt to update external memory chunk flags: " + memPtr; + + UNSAFE.putInt(memPtr + MEM_HDR_OFF_FLAGS, flags); + } + + /** + * Check whether this memory chunk is external. + * + * @param memPtr Memory pointer. 
+ * @return {@code True} if owned by native platform. + */ + public static boolean isExternal(long memPtr) { + return isExternal(flags(memPtr)); + } + + /** + * Check whether flags denote that this memory chunk is external. + * + * @param flags Flags. + * @return {@code True} if owned by native platform. + */ + public static boolean isExternal(int flags) { + return (flags & FLAG_EXT) == FLAG_EXT; + } + + /** + * Check whether this memory chunk is pooled. + * + * @param memPtr Memory pointer. + * @return {@code True} if pooled. + */ + public static boolean isPooled(long memPtr) { + return isPooled(flags(memPtr)); + } + + /** + * Check whether flags denote pooled memory chunk. + * + * @param flags Flags. + * @return {@code True} if pooled. + */ + public static boolean isPooled(int flags) { + return (flags & FLAG_POOLED) != 0; + } + + /** + * Check whether this memory chunk is pooled and acquired. + * + * @param memPtr Memory pointer. + * @return {@code True} if pooled and acquired. + */ + public static boolean isAcquired(long memPtr) { + return isAcquired(flags(memPtr)); + } + + /** + * Check whether flags denote pooled and acquired memory chunk. Asserts that the pooled flag is set. + * + * @param flags Flags. + * @return {@code True} if acquired. + */ + public static boolean isAcquired(int flags) { + assert isPooled(flags); + + return (flags & FLAG_ACQUIRED) != 0; + } + + /** --- UNPOOLED MEMORY MANAGEMENT. --- */ + + /** + * Allocate unpooled memory chunk: a header plus a separate data buffer, with length and flags set to zero. + * + * @param cap Minimum capacity. + * @return New memory pointer. + */ + public static long allocateUnpooled(int cap) { + assert cap > 0; + + long memPtr = UNSAFE.allocateMemory(MEM_HDR_LEN); + long dataPtr = UNSAFE.allocateMemory(cap); + + UNSAFE.putLong(memPtr, dataPtr); // Write address. + UNSAFE.putInt(memPtr + MEM_HDR_OFF_CAP, cap); // Write capacity. + UNSAFE.putInt(memPtr + MEM_HDR_OFF_LEN, 0); // Write length. + UNSAFE.putInt(memPtr + MEM_HDR_OFF_FLAGS, 0); // Write flags. 
+ + return memPtr; + } + + /** + * Reallocate unpooled memory chunk. The data buffer is resized to exactly {@code cap} and the header is updated. + * + * @param memPtr Memory pointer. + * @param cap Minimum capacity. + */ + public static void reallocateUnpooled(long memPtr, int cap) { + assert cap > 0; + + assert !isExternal(memPtr) : "Attempt to reallocate external memory chunk directly: " + memPtr; + assert !isPooled(memPtr) : "Attempt to reallocate pooled memory chunk directly: " + memPtr; + + long dataPtr = data(memPtr); + + long newDataPtr = UNSAFE.reallocateMemory(dataPtr, cap); + + if (dataPtr != newDataPtr) + UNSAFE.putLong(memPtr, newDataPtr); // Write new data address if needed. + + UNSAFE.putInt(memPtr + MEM_HDR_OFF_CAP, cap); // Write new capacity. + } + + /** + * Release unpooled memory chunk, freeing both its data buffer and its header. + * + * @param memPtr Memory pointer. + */ + public static void releaseUnpooled(long memPtr) { + assert !isExternal(memPtr) : "Attempt to release external memory chunk directly: " + memPtr; + assert !isPooled(memPtr) : "Attempt to release pooled memory chunk directly: " + memPtr; + + UNSAFE.freeMemory(data(memPtr)); + UNSAFE.freeMemory(memPtr); + } + + /** --- POOLED MEMORY MANAGEMENT. --- */ + + /** + * Allocate pool memory: a zero-filled pool header whose three chunk headers are flagged as pooled. + * + * @return Pool pointer. + */ + public static long allocatePool() { + long poolPtr = UNSAFE.allocateMemory(POOL_HDR_LEN); + + UNSAFE.setMemory(poolPtr, POOL_HDR_LEN, (byte)0); + + flags(poolPtr + POOL_HDR_OFF_MEM_1, FLAG_POOLED); + flags(poolPtr + POOL_HDR_OFF_MEM_2, FLAG_POOLED); + flags(poolPtr + POOL_HDR_OFF_MEM_3, FLAG_POOLED); + + return poolPtr; + } + + /** + * Release pool memory: frees the data buffers of the three pooled chunks (if allocated) and then the pool header. + * + * @param poolPtr Pool pointer. + */ + public static void releasePool(long poolPtr) { + // Clean predefined memory chunks. 
+ long mem = UNSAFE.getLong(poolPtr + POOL_HDR_OFF_MEM_1); + + if (mem != 0) + UNSAFE.freeMemory(mem); + + mem = UNSAFE.getLong(poolPtr + POOL_HDR_OFF_MEM_2); + + if (mem != 0) + UNSAFE.freeMemory(mem); + + mem = UNSAFE.getLong(poolPtr + POOL_HDR_OFF_MEM_3); + + if (mem != 0) + UNSAFE.freeMemory(mem); + + // Clean pool chunk. + UNSAFE.freeMemory(poolPtr); + } + + /** + * Allocate pooled memory chunk, taking the first of the three pool chunks that is not acquired. + * + * @param poolPtr Pool pointer. + * @param cap Capacity. + * @return Cross-platform memory pointer or {@code 0} in case there are no free memory chunks in the pool. + */ + public static long allocatePooled(long poolPtr, int cap) { + long memPtr1 = poolPtr + POOL_HDR_OFF_MEM_1; + + if (isAcquired(memPtr1)) { + long memPtr2 = poolPtr + POOL_HDR_OFF_MEM_2; + + if (isAcquired(memPtr2)) { + long memPtr3 = poolPtr + POOL_HDR_OFF_MEM_3; + + if (isAcquired(memPtr3)) + return 0L; + else { + allocatePooled0(memPtr3, cap); + + return memPtr3; + } + } + else { + allocatePooled0(memPtr2, cap); + + return memPtr2; + } + } + else { + allocatePooled0(memPtr1, cap); + + return memPtr1; + } + } + + /** + * Internal pooled memory chunk allocation routine. Allocates or grows the chunk's data buffer as needed and marks the chunk as acquired. + * + * @param memPtr Memory pointer. + * @param cap Capacity. + */ + private static void allocatePooled0(long memPtr, int cap) { + assert !isExternal(memPtr); + assert isPooled(memPtr); + assert !isAcquired(memPtr); + + long data = UNSAFE.getLong(memPtr); + + if (data == 0) { + // First allocation of the chunk. + data = UNSAFE.allocateMemory(cap); + + UNSAFE.putLong(memPtr, data); + UNSAFE.putInt(memPtr + MEM_HDR_OFF_CAP, cap); + } + else { + // Ensure that we have enough capacity. + int curCap = capacity(memPtr); + + if (cap > curCap) { + data = UNSAFE.reallocateMemory(data, cap); + + UNSAFE.putLong(memPtr, data); + UNSAFE.putInt(memPtr + MEM_HDR_OFF_CAP, cap); + } + } + + flags(memPtr, FLAG_POOLED | FLAG_ACQUIRED); + } + + /** + * Reallocate pooled memory chunk. + * + * @param memPtr Memory pointer. 
+ * @param cap Minimum capacity. + */ + public static void reallocatePooled(long memPtr, int cap) { + assert !isExternal(memPtr); + assert isPooled(memPtr); + assert isAcquired(memPtr); + + long data = UNSAFE.getLong(memPtr); + + assert data != 0; + + int curCap = capacity(memPtr); + + if (cap > curCap) { + data = UNSAFE.reallocateMemory(data, cap); + + UNSAFE.putLong(memPtr, data); + UNSAFE.putInt(memPtr + MEM_HDR_OFF_CAP, cap); + } + } + + /** + * Release pooled memory chunk by clearing its acquired flag; the data buffer is kept. The flag is cleared rather than toggled, so a repeated release leaves the chunk released. + * + * @param memPtr Memory pointer. + */ + public static void releasePooled(long memPtr) { + assert !isExternal(memPtr); + assert isPooled(memPtr); + assert isAcquired(memPtr); + + flags(memPtr, flags(memPtr) & ~FLAG_ACQUIRED); + } + + /** --- UTILITY STUFF. --- */ + + /** + * Reallocate arbitrary (pooled or unpooled, but not external) memory chunk. + * + * @param memPtr Memory pointer. + * @param cap Capacity. + */ + public static void reallocate(long memPtr, int cap) { + int flags = flags(memPtr); + + if (isPooled(flags)) + reallocatePooled(memPtr, cap); + else { + assert !isExternal(flags); + + reallocateUnpooled(memPtr, cap); + } + } + + /** + * Constructor. + */ + private PlatformMemoryUtils() { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStream.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStream.java new file mode 100644 index 0000000..eb2490a --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStream.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.memory; + +import org.apache.ignite.internal.portable.streams.*; + +/** + * Interop output stream. + */ +public interface PlatformOutputStream extends PortableOutputStream { + /** + * Synchronize output stream with underlying memory. + */ + public void synchronize(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java new file mode 100644 index 0000000..6c7c865 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.memory; + +import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.*; + +/** + * Interop output stream implementation which writes directly to the data pointer of the underlying memory chunk via {@code UNSAFE}. + */ +public class PlatformOutputStreamImpl implements PlatformOutputStream { + /** Underlying memory chunk. */ + protected final PlatformMemory mem; + + /** Data pointer, as returned by {@code mem.data()}. */ + protected long data; + + /** Capacity of the underlying memory chunk as known to this stream. */ + protected int cap; + + /** Current write position: byte offset from {@code data}. */ + protected int pos; + + /** + * Constructor. Captures the data pointer and capacity of the given memory chunk. + * + * @param mem Underlying memory chunk. + */ + public PlatformOutputStreamImpl(PlatformMemory mem) { + this.mem = mem; + + data = mem.data(); + cap = mem.capacity(); + } + + /** {@inheritDoc} */ + @Override public void writeByte(byte val) { + ensureCapacity(pos + 1); + + UNSAFE.putByte(data + pos++, val); + } + + /** {@inheritDoc} */ + @Override public void writeByteArray(byte[] val) { + copyAndShift(val, BYTE_ARR_OFF, val.length); + } + + /** {@inheritDoc} Written as a single byte: {@code 1} for {@code true}, {@code 0} for {@code false}. */ + @Override public void writeBoolean(boolean val) { + writeByte(val ? 
(byte) 1 : (byte) 0); + } + + /** {@inheritDoc} */ + @Override public void writeBooleanArray(boolean[] val) { + copyAndShift(val, BOOLEAN_ARR_OFF, val.length); + } + + /** {@inheritDoc} */ + @Override public void writeShort(short val) { + ensureCapacity(pos + 2); + + UNSAFE.putShort(data + pos, val); + + shift(2); + } + + /** {@inheritDoc} */ + @Override public void writeShortArray(short[] val) { + copyAndShift(val, SHORT_ARR_OFF, val.length << 1); + } + + /** {@inheritDoc} */ + @Override public void writeChar(char val) { + ensureCapacity(pos + 2); + + UNSAFE.putChar(data + pos, val); + + shift(2); + } + + /** {@inheritDoc} */ + @Override public void writeCharArray(char[] val) { + copyAndShift(val, CHAR_ARR_OFF, val.length << 1); + } + + /** {@inheritDoc} */ + @Override public void writeInt(int val) { + ensureCapacity(pos + 4); + + UNSAFE.putInt(data + pos, val); + + shift(4); + } + + /** {@inheritDoc} */ + @Override public void writeIntArray(int[] val) { + copyAndShift(val, INT_ARR_OFF, val.length << 2); + } + + /** {@inheritDoc} Writes at the absolute offset {@code pos} and does not move the current position. */ + @Override public void writeInt(int pos, int val) { + ensureCapacity(pos + 4); + + UNSAFE.putInt(data + pos, val); + } + + /** {@inheritDoc} Written as the {@code int} bits of the value. */ + @Override public void writeFloat(float val) { + writeInt(Float.floatToIntBits(val)); + } + + /** {@inheritDoc} */ + @Override public void writeFloatArray(float[] val) { + copyAndShift(val, FLOAT_ARR_OFF, val.length << 2); + } + + /** {@inheritDoc} */ + @Override public void writeLong(long val) { + ensureCapacity(pos + 8); + + UNSAFE.putLong(data + pos, val); + + shift(8); + } + + /** {@inheritDoc} */ + @Override public void writeLongArray(long[] val) { + copyAndShift(val, LONG_ARR_OFF, val.length << 3); + } + + /** {@inheritDoc} Written as the {@code long} bits of the value. */ + @Override public void writeDouble(double val) { + writeLong(Double.doubleToLongBits(val)); + } + + /** {@inheritDoc} */ + @Override public void writeDoubleArray(double[] val) { + copyAndShift(val, DOUBLE_ARR_OFF, val.length << 3); + } + + /** {@inheritDoc} */ + 
@Override public void write(byte[] arr, int off, int len) { + copyAndShift(arr, BYTE_ARR_OFF + off, len); /* off is added to the byte[] base offset constant. */ + } + + /** {@inheritDoc} */ + @Override public void write(long addr, int cnt) { + copyAndShift(null, addr, cnt); /* Null source object: addr is handed over as the source offset unchanged. */ + } + + /** {@inheritDoc} */ + @Override public int position() { + return pos; + } + + /** {@inheritDoc} */ + @Override public void position(int pos) { + ensureCapacity(pos); /* Reallocates when the requested position exceeds the tracked capacity. */ + + this.pos = pos; + } + + /** {@inheritDoc} */ + @Override public void close() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public byte[] array() { + assert false; /* Trips first when assertions are enabled; otherwise the exception below is thrown. */ + + throw new UnsupportedOperationException("Should not be called."); + } + + /** {@inheritDoc} */ + @Override public byte[] arrayCopy() { + assert false; + + throw new UnsupportedOperationException("Should not be called."); + } + + /** {@inheritDoc} */ + @Override public long offheapPointer() { + assert false; + + throw new UnsupportedOperationException("Should not be called."); + } + + /** {@inheritDoc} */ + @Override public boolean hasArray() { + assert false; + + throw new UnsupportedOperationException("Should not be called."); + } + + /** {@inheritDoc} */ + @Override public void synchronize() { + PlatformMemoryUtils.length(mem.pointer(), pos); /* NOTE(review): presumably stores pos as the chunk's data length; confirm in PlatformMemoryUtils. */ + } + + /** + * Ensure capacity. Does nothing unless {@code reqCap} exceeds the tracked capacity; otherwise reallocates the + * memory and refreshes the cached data address and capacity. + * + * @param reqCap Required byte count. + */ + protected void ensureCapacity(int reqCap) { + if (reqCap > cap) { + int newCap = cap << 1; /* Try doubling first. */ + + if (newCap < reqCap) /* Doubling is not enough: take the requested size. */ + newCap = reqCap; + + mem.reallocate(newCap); + + assert mem.capacity() >= newCap; + + data = mem.data(); /* Re-read after reallocation; callers must not use a data value read earlier. */ + cap = newCap; /* Tracks the requested size, which per the assert above may be below mem.capacity(). */ + } + } + + /** + * Shift position. + * + * @param cnt Byte count added to the current position. + */ + protected void shift(int cnt) { + pos += cnt; + } + + /** + * Copy source object to the stream shifting position afterwards. + * + * @param src Source object, or {@code null} (as passed by {@code write(long, int)}). + * @param off Source offset handed to the memory copy. + * @param len Length in bytes. 
+ */ + private void copyAndShift(Object src, long off, int len) { + ensureCapacity(pos + len); + + UNSAFE.copyMemory(src, off, null, data + pos, len); + + shift(len); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformPooledMemory.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformPooledMemory.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformPooledMemory.java new file mode 100644 index 0000000..98a9a58 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformPooledMemory.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.memory; + +import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.*; + +/** + * Interop memory chunk that belongs to a {@link PlatformMemoryPool} and is released back to it on close. + */ +public class PlatformPooledMemory extends PlatformAbstractMemory { + /** Pool this chunk was taken from. 
*/ + private final PlatformMemoryPool pool; + + /** + * Creates a chunk over an already acquired pooled pointer. + * + * @param pool Owning memory pool. + * @param memPtr Cross-platform memory pointer. + */ + public PlatformPooledMemory(PlatformMemoryPool pool, long memPtr) { + super(memPtr); + + // The pointer must denote a pooled chunk that is currently acquired. + assert isPooled(memPtr) && isAcquired(memPtr); + + this.pool = pool; + } + + /** + * {@inheritDoc} + * The chunk grows to the larger of {@code cap} and twice its current capacity. + */ + @Override public void reallocate(int cap) { + assert isAcquired(memPtr); + + // Grow by at least a factor of two to avoid excessive allocations. + int curCap = PlatformMemoryUtils.capacity(memPtr); + + pool.reallocate(memPtr, Math.max(cap, curCap << 1)); + } + + /** {@inheritDoc} */ + @Override public void close() { + assert isAcquired(memPtr); + + pool.release(memPtr); // Hand the chunk back to the owning pool. + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformUnpooledMemory.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformUnpooledMemory.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformUnpooledMemory.java new file mode 100644 index 0000000..a236dab --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformUnpooledMemory.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.memory; + +import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.*; + +/** + * Interop memory chunk that is reallocated and released directly, without a pool. + */ +public class PlatformUnpooledMemory extends PlatformAbstractMemory { + /** + * Creates a chunk over the given pointer. + * + * @param memPtr Cross-platform memory pointer. + */ + public PlatformUnpooledMemory(long memPtr) { + super(memPtr); + } + + /** + * {@inheritDoc} + * The chunk grows to the larger of {@code cap} and twice its current capacity. + */ + @Override public void reallocate(int cap) { + // Grow by at least a factor of two to avoid excessive allocations. + int curCap = PlatformMemoryUtils.capacity(memPtr); + + reallocateUnpooled(memPtr, Math.max(cap, curCap << 1)); + } + + /** {@inheritDoc} */ + @Override public void close() { + releaseUnpooled(memPtr); + } +} + http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformReaderBiClosure.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformReaderBiClosure.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformReaderBiClosure.java new file mode 100644 index 0000000..ea808d7 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformReaderBiClosure.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.utils; + +import org.apache.ignite.internal.portable.*; +import org.apache.ignite.lang.*; + +/** + * Reader bi-closure: produces an {@code IgniteBiTuple<T1, T2>} from a raw reader. + */ +public interface PlatformReaderBiClosure<T1, T2> { + /** + * Read a tuple from the reader. + * + * @param reader Reader to read from. + * @return Tuple with components of types {@code T1} and {@code T2}. + */ + IgniteBiTuple<T1, T2> read(PortableRawReaderEx reader); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformReaderClosure.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformReaderClosure.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformReaderClosure.java new file mode 100644 index 0000000..cd0523c --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformReaderClosure.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.utils; + +import org.apache.ignite.internal.portable.*; + +/** + * Reader closure: produces a value of type {@code T} from a raw reader. + */ +public interface PlatformReaderClosure<T> { + + /** + * Read a value from the reader. + * + * @param reader Reader to read from. + * @return Value of type {@code T}. + */ + T read(PortableRawReaderEx reader); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformWriterBiClosure.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformWriterBiClosure.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformWriterBiClosure.java new file mode 100644 index 0000000..89a73b0 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformWriterBiClosure.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.utils; + +import org.apache.ignite.internal.portable.*; + +/** + * Interop writer bi-closure: writes two values of types {@code T1} and {@code T2} to a raw writer. + */ +public interface PlatformWriterBiClosure<T1, T2> { + /** + * Write values. + * + * @param writer Writer to write to. + * @param val1 First value. + * @param val2 Second value. + */ + void write(PortableRawWriterEx writer, T1 val1, T2 val2); +}