Merge branch ignite-1282 ignite-950-new
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/76c73f3d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/76c73f3d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/76c73f3d Branch: refs/heads/ignite-1753-1282 Commit: 76c73f3da41f8daa1efc8c96f17691b98ce5b8f3 Parents: ed9d922 e4b128e Author: Alexey Goncharuk <[email protected]> Authored: Wed Nov 4 13:27:18 2015 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Wed Nov 4 13:27:18 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/binary/BinaryField.java | 7 + .../internal/portable/BinaryFieldImpl.java | 28 +- .../internal/portable/BinaryObjectImpl.java | 100 ++- .../portable/BinaryObjectOffheapImpl.java | 95 ++- .../internal/portable/BinaryReaderExImpl.java | 4 +- .../internal/portable/BinaryWriterExImpl.java | 4 +- .../internal/portable/PortablePrimitives.java | 44 ++ .../internal/portable/PortableSchema.java | 9 +- .../ignite/internal/portable/PortableUtils.java | 3 +- .../builder/BinaryObjectBuilderImpl.java | 37 +- .../CacheObjectPortableProcessorImpl.java | 4 + .../platform/utils/PlatformUtils.java | 2 - ...PlatformDotNetPortableTypeConfiguration.java | 25 - .../portable/BinaryFieldsAbstractSelfTest.java | 729 +++++++++++++++++++ .../portable/BinaryFieldsHeapSelfTest.java | 32 + .../portable/BinaryFieldsOffheapSelfTest.java | 61 ++ .../GridPortableMarshallerSelfTest.java | 49 +- .../PortableCompactOffsetsAbstractSelfTest.java | 201 +++++ .../PortableCompactOffsetsHeapSelfTest.java | 32 + .../PortableCompactOffsetsOffheapSelfTest.java | 61 ++ .../IgnitePortableObjectsTestSuite.java | 12 +- .../Interop/PlatformBenchmarkBase.cs | 3 +- .../Portable/PortableReadBenchmark.cs | 4 +- .../Portable/PortableWriteBenchmark.cs | 4 +- .../Portable/PortableApiSelfTest.cs | 4 +- .../Apache.Ignite.Core.csproj | 1 + .../Apache.Ignite.Core/Impl/Common/Fnv1Hash.cs | 21 +- .../Impl/Portable/IPortableTypeDescriptor.cs | 12 +- .../Impl/Portable/PortableBuilderImpl.cs | 135 ++-- .../Impl/Portable/PortableFullTypeDescriptor.cs | 38 +- .../Impl/Portable/PortableMarshaller.cs | 16 +- .../Impl/Portable/PortableObjectHeader.cs | 28 +- .../Impl/Portable/PortableObjectSchemaHolder.cs | 108 +++ .../Portable/PortableSurrogateTypeDescriptor.cs | 10 +- .../Impl/Portable/PortableUserObject.cs | 4 +- .../Impl/Portable/PortableUtils.cs | 24 - .../Impl/Portable/PortableWriterImpl.cs | 59 +- .../Structure/PortableStructureTracker.cs | 2 +- .../Portable/PortableConfiguration.cs | 11 - .../Portable/PortableTypeConfiguration.cs | 9 +- 40 files changed, 1736 insertions(+), 296 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/76c73f3d/modules/core/src/main/java/org/apache/ignite/binary/BinaryField.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/binary/BinaryField.java index c4be5bf,0000000..35aa191 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/binary/BinaryField.java +++ b/modules/core/src/main/java/org/apache/ignite/binary/BinaryField.java @@@ -1,39 -1,0 +1,46 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.binary; + +/** + * Binary object field. Can be used to speed object field lookup. + */ +public interface BinaryField { + /** ++ * Get field's name. ++ * ++ * @return Name. ++ */ ++ public String name(); ++ ++ /** + * Check whether field exists in the object. + * + * @param obj Object. + * @return {@code True} if exists. + */ + public boolean exists(BinaryObject obj); + + /** + * Get field's value from the given object. + * + * @param obj Object. + * @return Value. + */ + public <T> T value(BinaryObject obj); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/76c73f3d/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryFieldImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryFieldImpl.java index ac0b353,0000000..b8a25c1 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryFieldImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryFieldImpl.java @@@ -1,82 -1,0 +1,104 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable; + ++import org.apache.ignite.internal.util.tostring.GridToStringExclude; ++import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.binary.BinaryField; + +/** + * Implementation of portable field descriptor. + */ +public class BinaryFieldImpl implements BinaryField { + /** Well-known object schemas. */ ++ @GridToStringExclude + private final PortableSchemaRegistry schemas; + ++ /** Field name. */ ++ private final String fieldName; ++ + /** Pre-calculated field ID. */ + private final int fieldId; + + /** + * Constructor. + * + * @param schemas Schemas. ++ * @param fieldName Field name. + * @param fieldId Field ID. + */ - public BinaryFieldImpl(PortableSchemaRegistry schemas, int fieldId) { ++ public BinaryFieldImpl(PortableSchemaRegistry schemas, String fieldName, int fieldId) { ++ assert schemas != null; ++ assert fieldName != null; ++ assert fieldId != 0; ++ + this.schemas = schemas; ++ this.fieldName = fieldName; + this.fieldId = fieldId; + } + + /** {@inheritDoc} */ ++ @Override public String name() { ++ return fieldName; ++ } ++ ++ /** {@inheritDoc} */ + @Override public boolean exists(BinaryObject obj) { + BinaryObjectEx obj0 = (BinaryObjectEx)obj; + - return fieldOrder(obj0) != 0; ++ return fieldOrder(obj0) != PortableSchema.ORDER_NOT_FOUND; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <T> T value(BinaryObject obj) { + BinaryObjectEx obj0 = (BinaryObjectEx)obj; + + int order = fieldOrder(obj0); + - return order != 0 ? (T)obj0.fieldByOrder(order) : null; ++ return order != PortableSchema.ORDER_NOT_FOUND ? (T)obj0.fieldByOrder(order) : null; + } + + /** + * Get relative field offset. + * + * @param obj Object. + * @return Field offset. + */ + private int fieldOrder(BinaryObjectEx obj) { + int schemaId = obj.schemaId(); + + PortableSchema schema = schemas.schema(schemaId); + + if (schema == null) { + schema = obj.createSchema(); + + schemas.addSchema(schemaId, schema); + } + + assert schema != null; + + return schema.order(fieldId); + } ++ ++ /** {@inheritDoc} */ ++ @Override public String toString() { ++ return S.toString(BinaryFieldImpl.class, this); ++ } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/76c73f3d/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryObjectImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryObjectImpl.java index 992f92c,0000000..9b76604 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryObjectImpl.java @@@ -1,526 -1,0 +1,606 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.IgniteCodeGeneratingFail; +import org.apache.ignite.internal.portable.streams.PortableHeapInputStream; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.portable.CacheObjectPortableProcessorImpl; ++import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.binary.BinaryType; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.binary.BinaryField; +import org.jetbrains.annotations.Nullable; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; ++import java.math.BigDecimal; ++import java.math.BigInteger; +import java.nio.ByteBuffer; ++import java.sql.Timestamp; ++import java.util.Date; ++import java.util.UUID; + ++import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.BOOLEAN; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.BYTE; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.CHAR; ++import static org.apache.ignite.internal.portable.GridPortableMarshaller.DATE; ++import static org.apache.ignite.internal.portable.GridPortableMarshaller.DECIMAL; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.DOUBLE; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.FLOAT; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.INT; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.LONG; ++import static org.apache.ignite.internal.portable.GridPortableMarshaller.NULL; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.SHORT; ++import static org.apache.ignite.internal.portable.GridPortableMarshaller.STRING; ++import static org.apache.ignite.internal.portable.GridPortableMarshaller.TIMESTAMP; ++import static org.apache.ignite.internal.portable.GridPortableMarshaller.UUID; + +/** + * Portable object implementation. + */ +@IgniteCodeGeneratingFail // Fields arr and start should not be generated by MessageCodeGenerator. +public final class BinaryObjectImpl extends BinaryObjectEx implements Externalizable, + Message, CacheObject, KeyCacheObject { + /** */ + public static final byte TYPE_PORTABLE = 100; + + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @GridDirectTransient + private PortableContext ctx; + + /** */ + private byte[] arr; + + /** */ + private int start; + + /** */ + @GridDirectTransient + private Object obj; + + /** */ + @GridDirectTransient + private boolean detachAllowed; + + /** + * For {@link Externalizable}. + */ + public BinaryObjectImpl() { + // No-op. + } + + /** + * @param ctx Context. + * @param arr Array. + * @param start Start. + */ + public BinaryObjectImpl(PortableContext ctx, byte[] arr, int start) { + assert ctx != null; + assert arr != null; + + this.ctx = ctx; + this.arr = arr; + this.start = start; + } + + /** {@inheritDoc} */ + @Override public byte type() { + return TYPE_PORTABLE; + } + + /** {@inheritDoc} */ + @Override public boolean isPlatformType() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean internal() { + return false; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) { + Object obj0 = obj; + + if (obj0 == null || cpy) + obj0 = deserializeValue(); + + return (T)obj0; + } + + /** {@inheritDoc} */ + @Override public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException { + if (detached()) + return array(); + + int len = length(); + + byte[] arr0 = new byte[len]; + + U.arrayCopy(arr, start, arr0, 0, len); + + return arr0; + } + + /** {@inheritDoc} */ + @Override public CacheObject prepareForCache(CacheObjectContext ctx) { + if (detached()) + return this; + + return (BinaryObjectImpl)detach(); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(CacheObjectContext ctx, ClassLoader ldr) throws IgniteCheckedException { + this.ctx = ((CacheObjectPortableProcessorImpl)ctx.processor()).portableContext(); + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public int length() { + return PortablePrimitives.readInt(arr, start + GridPortableMarshaller.TOTAL_LEN_POS); + } + + /** + * @return Detached portable object. + */ + public BinaryObject detach() { + if (!detachAllowed || detached()) + return this; + + int len = length(); + + byte[] arr0 = new byte[len]; + + U.arrayCopy(arr, start, arr0, 0, len); + + return new BinaryObjectImpl(ctx, arr0, 0); + } + + /** + * @return Detached or not. + */ + public boolean detached() { + return start == 0 && length() == arr.length; + } + + /** - * @return {@code True} if detach is allowed. - */ - public boolean detachAllowed() { - return true; - } - - /** + * @param detachAllowed Detach allowed flag. + */ + public void detachAllowed(boolean detachAllowed) { + this.detachAllowed = detachAllowed; + } + + /** + * @return Context. + */ + public PortableContext context() { + return ctx; + } + + /** + * @param ctx Context. + */ + public void context(PortableContext ctx) { + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Override public byte[] array() { + return arr; + } + + /** {@inheritDoc} */ + @Override public int start() { + return start; + } + + /** {@inheritDoc} */ + @Override public long offheapAddress() { + return 0; + } + + /** {@inheritDoc} */ + @Override protected boolean hasArray() { + return true; + } + + /** {@inheritDoc} */ + @Override public int typeId() { + return PortablePrimitives.readInt(arr, start + GridPortableMarshaller.TYPE_ID_POS); + } + + /** {@inheritDoc} */ + @Nullable @Override public BinaryType metaData() throws BinaryObjectException { + if (ctx == null) + throw new BinaryObjectException("PortableContext is not set for the object."); + + return ctx.metaData(typeId()); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Nullable @Override public <F> F field(String fieldName) throws BinaryObjectException { + BinaryReaderExImpl reader = new BinaryReaderExImpl(ctx, arr, start, null); + + return (F)reader.unmarshalField(fieldName); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Nullable @Override public <F> F field(int fieldId) throws BinaryObjectException { + BinaryReaderExImpl reader = new BinaryReaderExImpl(ctx, arr, start, null); + + return (F)reader.unmarshalField(fieldId); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Nullable @Override protected <F> F fieldByOrder(int order) { + Object val; + + // Calculate field position. + int schemaOffset = PortablePrimitives.readInt(arr, start + GridPortableMarshaller.SCHEMA_OR_RAW_OFF_POS); + + short flags = PortablePrimitives.readShort(arr, start + GridPortableMarshaller.FLAGS_POS); + int fieldOffsetSize = PortableUtils.fieldOffsetSize(flags); + + int fieldOffsetPos = start + schemaOffset + order * (4 + fieldOffsetSize) + 4; + + int fieldPos; + + if (fieldOffsetSize == PortableUtils.OFFSET_1) - fieldPos = start + (int)PortablePrimitives.readByte(arr, fieldOffsetPos) & 0xFF; ++ fieldPos = start + ((int)PortablePrimitives.readByte(arr, fieldOffsetPos) & 0xFF); + else if (fieldOffsetSize == PortableUtils.OFFSET_2) - fieldPos = start + (int)PortablePrimitives.readShort(arr, fieldOffsetPos) & 0xFFFF; ++ fieldPos = start + ((int)PortablePrimitives.readShort(arr, fieldOffsetPos) & 0xFFFF); + else + fieldPos = start + PortablePrimitives.readInt(arr, fieldOffsetPos); + + // Read header and try performing fast lookup for well-known types (the most common types go first). + byte hdr = PortablePrimitives.readByte(arr, fieldPos); + + switch (hdr) { + case INT: + val = PortablePrimitives.readInt(arr, fieldPos + 1); + + break; + + case LONG: + val = PortablePrimitives.readLong(arr, fieldPos + 1); + + break; + + case BOOLEAN: + val = PortablePrimitives.readBoolean(arr, fieldPos + 1); + + break; + + case SHORT: + val = PortablePrimitives.readShort(arr, fieldPos + 1); + + break; + + case BYTE: + val = PortablePrimitives.readByte(arr, fieldPos + 1); + + break; + + case CHAR: + val = PortablePrimitives.readChar(arr, fieldPos + 1); + + break; + + case FLOAT: + val = PortablePrimitives.readFloat(arr, fieldPos + 1); + + break; + + case DOUBLE: + val = PortablePrimitives.readDouble(arr, fieldPos + 1); + + break; + ++ case STRING: { ++ boolean utf = PortablePrimitives.readBoolean(arr, fieldPos + 1); ++ ++ if (utf) { ++ int dataLen = PortablePrimitives.readInt(arr, fieldPos + 2); ++ ++ val = new String(arr, fieldPos + 6, dataLen, UTF_8); ++ } ++ else { ++ int dataLen = PortablePrimitives.readInt(arr, fieldPos + 2); ++ char[] data = PortablePrimitives.readCharArray(arr, fieldPos + 6, dataLen); ++ ++ val = String.valueOf(data); ++ } ++ ++ break; ++ } ++ ++ case DATE: { ++ long time = PortablePrimitives.readLong(arr, fieldPos + 1); ++ ++ val = new Date(time); ++ ++ break; ++ } ++ ++ case TIMESTAMP: { ++ long time = PortablePrimitives.readLong(arr, fieldPos + 1); ++ int nanos = PortablePrimitives.readInt(arr, fieldPos + 1 + 8); ++ ++ Timestamp ts = new Timestamp(time); ++ ++ ts.setNanos(ts.getNanos() + nanos); ++ ++ val = ts; ++ ++ break; ++ } ++ ++ case UUID: { ++ long most = PortablePrimitives.readLong(arr, fieldPos + 1); ++ long least = PortablePrimitives.readLong(arr, fieldPos + 1 + 8); ++ ++ val = new UUID(most, least); ++ ++ break; ++ } ++ ++ case DECIMAL: { ++ int scale = PortablePrimitives.readInt(arr, fieldPos + 1); ++ ++ int dataLen = PortablePrimitives.readInt(arr, fieldPos + 5); ++ byte[] data = PortablePrimitives.readByteArray(arr, fieldPos + 9, dataLen); ++ ++ BigInteger intVal = new BigInteger(data); ++ ++ if (scale < 0) { ++ scale &= 0x7FFFFFFF; ++ ++ intVal = intVal.negate(); ++ } ++ ++ val = new BigDecimal(intVal, scale); ++ ++ break; ++ } ++ ++ case NULL: ++ val = null; ++ ++ break; ++ + default: { + BinaryReaderExImpl reader = new BinaryReaderExImpl(ctx, arr, start, null); + + val = reader.unmarshalFieldByAbsolutePosition(fieldPos); + } + } + + return (F)val; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Nullable @Override protected <F> F field(PortableReaderContext rCtx, String fieldName) { + BinaryReaderExImpl reader = new BinaryReaderExImpl(ctx, + new PortableHeapInputStream(arr), + start, + null, + rCtx); + + return (F)reader.unmarshalField(fieldName); + } + + /** {@inheritDoc} */ + @Override public boolean hasField(String fieldName) { + BinaryReaderExImpl reader = new BinaryReaderExImpl(ctx, arr, start, null); + + return reader.hasField(fieldName); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Nullable @Override public <T> T deserialize() throws BinaryObjectException { + Object obj0 = obj; + + if (obj0 == null) + obj0 = deserializeValue(); + + return (T)obj0; + + } + + /** {@inheritDoc} */ + @Override public BinaryObject clone() throws CloneNotSupportedException { + return super.clone(); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return PortablePrimitives.readInt(arr, start + GridPortableMarshaller.HASH_CODE_POS); + } + + /** {@inheritDoc} */ + @Override protected int schemaId() { + return PortablePrimitives.readInt(arr, start + GridPortableMarshaller.SCHEMA_ID_POS); + } + + /** {@inheritDoc} */ + @Override protected PortableSchema createSchema() { + BinaryReaderExImpl reader = new BinaryReaderExImpl(ctx, arr, start, null); + + return reader.createSchema(); + } + + /** {@inheritDoc} */ + @Override public BinaryField fieldDescriptor(String fieldName) throws BinaryObjectException { ++ A.notNull(fieldName, "fieldName"); ++ + int typeId = typeId(); + + PortableSchemaRegistry schemaReg = ctx.schemaRegistry(typeId); + + int fieldId = ctx.userTypeIdMapper(typeId).fieldId(typeId, fieldName); + - return new BinaryFieldImpl(schemaReg, fieldId); ++ return new BinaryFieldImpl(schemaReg, fieldName, fieldId); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(ctx); + + if (detachAllowed) { + int len = length(); + + out.writeInt(len); + out.write(arr, start, len); + out.writeInt(0); + } + else { + out.writeInt(arr.length); + out.write(arr); + out.writeInt(start); + } + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + ctx = (PortableContext)in.readObject(); + + arr = new byte[in.readInt()]; + + in.readFully(arr); + + start = in.readInt(); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeByteArray("arr", + arr, + detachAllowed ? start : 0, + detachAllowed ? length() : arr.length)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeInt("start", detachAllowed ? 0 : start)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + arr = reader.readByteArray("arr"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + start = reader.readInt("start"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 113; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 3; + } + + /** + * Runs value deserialization regardless of whether obj already has the deserialized value. + * Will set obj if descriptor is configured to keep deserialized values. + */ + private Object deserializeValue() { + // TODO: IGNITE-1272 - Deserialize with proper class loader. + BinaryReaderExImpl reader = new BinaryReaderExImpl(ctx, arr, start, null); + + Object obj0 = reader.deserialize(); + + PortableClassDescriptor desc = reader.descriptor(); + + assert desc != null; + + if (desc.keepDeserialized()) + obj = obj0; + + return obj0; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/76c73f3d/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryObjectOffheapImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryObjectOffheapImpl.java index 5462b4a,0000000..139e5c9 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryObjectOffheapImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryObjectOffheapImpl.java @@@ -1,381 -1,0 +1,470 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; ++import java.math.BigDecimal; ++import java.math.BigInteger; +import java.nio.ByteBuffer; ++import java.sql.Timestamp; ++import java.util.Date; ++import java.util.UUID; ++ +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.portable.streams.PortableOffheapInputStream; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.util.GridUnsafe; ++import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.binary.BinaryType; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.binary.BinaryField; +import org.jetbrains.annotations.Nullable; +import sun.misc.Unsafe; + ++import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.BOOLEAN; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.BYTE; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.CHAR; ++import static org.apache.ignite.internal.portable.GridPortableMarshaller.DATE; ++import static org.apache.ignite.internal.portable.GridPortableMarshaller.DECIMAL; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.DOUBLE; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.FLOAT; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.INT; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.LONG; ++import static org.apache.ignite.internal.portable.GridPortableMarshaller.NULL; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.SHORT; ++import static org.apache.ignite.internal.portable.GridPortableMarshaller.STRING; ++import static org.apache.ignite.internal.portable.GridPortableMarshaller.TIMESTAMP; ++import static org.apache.ignite.internal.portable.GridPortableMarshaller.UUID; + +/** + * Portable object implementation over offheap memory + */ +public class BinaryObjectOffheapImpl extends BinaryObjectEx implements Externalizable, CacheObject { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private static final Unsafe UNSAFE = GridUnsafe.unsafe(); + + /** */ + private final PortableContext ctx; + + /** */ + private final long ptr; + + /** */ + private final int start; + + /** */ + private final int size; + + /** + * For {@link Externalizable} (not supported). + */ + public BinaryObjectOffheapImpl() { + throw new UnsupportedOperationException(); + } + + /** + * @param ctx Context. + * @param ptr Memory address. + * @param start Object start. + * @param size Memory size. + */ + public BinaryObjectOffheapImpl(PortableContext ctx, long ptr, int start, int size) { + this.ctx = ctx; + this.ptr = ptr; + this.start = start; + this.size = size; + } + + /** + * @return Heap-based copy. + */ + public BinaryObject heapCopy() { + return new BinaryObjectImpl(ctx, U.copyMemory(ptr, size), start); + } + + /** {@inheritDoc} */ + @Override public int typeId() { + return UNSAFE.getInt(ptr + start + GridPortableMarshaller.TYPE_ID_POS); + } + + /** {@inheritDoc} */ + @Override public int length() { + return UNSAFE.getInt(ptr + start + GridPortableMarshaller.TOTAL_LEN_POS); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return UNSAFE.getInt(ptr + start + GridPortableMarshaller.HASH_CODE_POS); + } + + /** {@inheritDoc} */ + @Override protected int schemaId() { + return UNSAFE.getInt(ptr + start + GridPortableMarshaller.SCHEMA_ID_POS); + } + + /** {@inheritDoc} */ + @Override protected PortableSchema createSchema() { + BinaryReaderExImpl reader = new BinaryReaderExImpl(ctx, + new PortableOffheapInputStream(ptr, size, false), + start, + null); + + return reader.createSchema(); + } + + /** {@inheritDoc} */ + @Override public BinaryField fieldDescriptor(String fieldName) throws BinaryObjectException { ++ A.notNull(fieldName, "fieldName"); ++ + int typeId = typeId(); + + PortableSchemaRegistry schemaReg = ctx.schemaRegistry(typeId); + + int fieldId = ctx.userTypeIdMapper(typeId).fieldId(typeId, fieldName); + - return new BinaryFieldImpl(schemaReg, fieldId); ++ return new BinaryFieldImpl(schemaReg, fieldName, fieldId); + } + + /** {@inheritDoc} */ + @Override public int start() { + return start; + } + + /** {@inheritDoc} */ + @Override public byte[] array() { + return null; + } + + /** {@inheritDoc} */ + @Override public long offheapAddress() { + return ptr; + } + + /** {@inheritDoc} */ + @Override protected boolean hasArray() { + return false; + } + + /** {@inheritDoc} */ + @Nullable @Override public BinaryType metaData() throws BinaryObjectException { + if (ctx == null) + throw new BinaryObjectException("PortableContext is not set for the object."); + + return ctx.metaData(typeId()); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Nullable @Override public <F> F field(String fieldName) throws BinaryObjectException { + BinaryReaderExImpl reader = new BinaryReaderExImpl(ctx, + new PortableOffheapInputStream(ptr, size, false), + start, + null); + + return (F)reader.unmarshalField(fieldName); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Nullable @Override public <F> F field(int fieldId) throws BinaryObjectException { + BinaryReaderExImpl reader = new BinaryReaderExImpl(ctx, + new PortableOffheapInputStream(ptr, size, false), + start, + null); + + return (F)reader.unmarshalField(fieldId); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Nullable @Override protected <F> F fieldByOrder(int order) { + Object val; + + // Calculate field position. + int schemaOffset = PortablePrimitives.readInt(ptr, start + GridPortableMarshaller.SCHEMA_OR_RAW_OFF_POS); + + short flags = PortablePrimitives.readShort(ptr, start + GridPortableMarshaller.FLAGS_POS); + int fieldOffsetSize = PortableUtils.fieldOffsetSize(flags); + + int fieldOffsetPos = start + schemaOffset + order * (4 + fieldOffsetSize) + 4; + + int fieldPos; + + if (fieldOffsetSize == PortableUtils.OFFSET_1) - fieldPos = start + (int)PortablePrimitives.readByte(ptr, fieldOffsetPos) & 0xFF; ++ fieldPos = start + ((int)PortablePrimitives.readByte(ptr, fieldOffsetPos) & 0xFF); + else if (fieldOffsetSize == PortableUtils.OFFSET_2) - fieldPos = start + (int)PortablePrimitives.readShort(ptr, fieldOffsetPos) & 0xFFFF; ++ fieldPos = start + ((int)PortablePrimitives.readShort(ptr, fieldOffsetPos) & 0xFFFF); + else + fieldPos = start + PortablePrimitives.readInt(ptr, fieldOffsetPos); + + // Read header and try performing fast lookup for well-known types (the most common types go first). + byte hdr = PortablePrimitives.readByte(ptr, fieldPos); + + switch (hdr) { + case INT: + val = PortablePrimitives.readInt(ptr, fieldPos + 1); + + break; + + case LONG: + val = PortablePrimitives.readLong(ptr, fieldPos + 1); + + break; + + case BOOLEAN: + val = PortablePrimitives.readBoolean(ptr, fieldPos + 1); + + break; + + case SHORT: + val = PortablePrimitives.readShort(ptr, fieldPos + 1); + + break; + + case BYTE: + val = PortablePrimitives.readByte(ptr, fieldPos + 1); + + break; + + case CHAR: + val = PortablePrimitives.readChar(ptr, fieldPos + 1); + + break; + + case FLOAT: + val = PortablePrimitives.readFloat(ptr, fieldPos + 1); + + break; + + case DOUBLE: + val = PortablePrimitives.readDouble(ptr, fieldPos + 1); + + break; + ++ case STRING: { ++ boolean utf = PortablePrimitives.readBoolean(ptr, fieldPos + 1); ++ ++ if (utf) { ++ int dataLen = PortablePrimitives.readInt(ptr, fieldPos + 2); ++ byte[] data = PortablePrimitives.readByteArray(ptr, fieldPos + 6, dataLen); ++ ++ val = new String(data, UTF_8); ++ } ++ else { ++ int dataLen = PortablePrimitives.readInt(ptr, fieldPos + 2); ++ char[] data = PortablePrimitives.readCharArray(ptr, fieldPos + 6, dataLen); ++ ++ val = String.valueOf(data); ++ } ++ ++ break; ++ } ++ ++ case DATE: { ++ long time = PortablePrimitives.readLong(ptr, fieldPos + 1); ++ ++ val = new Date(time); ++ ++ break; ++ } ++ ++ case TIMESTAMP: { ++ long time = PortablePrimitives.readLong(ptr, fieldPos + 1); ++ int nanos = PortablePrimitives.readInt(ptr, fieldPos + 1 + 8); ++ ++ Timestamp ts = new Timestamp(time); ++ ++ ts.setNanos(ts.getNanos() + nanos); ++ ++ val = ts; ++ ++ break; ++ } ++ ++ case UUID: { ++ long most = PortablePrimitives.readLong(ptr, fieldPos + 1); ++ long least = PortablePrimitives.readLong(ptr, fieldPos + 1 + 8); ++ ++ val = new UUID(most, least); ++ ++ break; ++ } ++ ++ case DECIMAL: { ++ int scale = PortablePrimitives.readInt(ptr, fieldPos + 1); ++ ++ int dataLen = PortablePrimitives.readInt(ptr, fieldPos + 5); ++ byte[] data = PortablePrimitives.readByteArray(ptr, fieldPos + 9, dataLen); ++ ++ BigInteger intVal = new BigInteger(data); ++ ++ if (scale < 0) { ++ scale &= 0x7FFFFFFF; ++ ++ intVal = intVal.negate(); ++ } ++ ++ val = new BigDecimal(intVal, scale); ++ ++ break; ++ } ++ ++ case NULL: ++ val = null; ++ ++ break; ++ + default: { + BinaryReaderExImpl reader = new BinaryReaderExImpl(ctx, + new PortableOffheapInputStream(ptr, size, false), + start, + null); + + val = reader.unmarshalFieldByAbsolutePosition(fieldPos); + } + } + + return (F)val; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Nullable @Override protected <F> F field(PortableReaderContext rCtx, String fieldName) { + BinaryReaderExImpl reader = new BinaryReaderExImpl(ctx, + new PortableOffheapInputStream(ptr, size, false), + start, + null, + rCtx); + + return (F)reader.unmarshalField(fieldName); + } + + /** {@inheritDoc} */ + @Override public boolean hasField(String fieldName) { + BinaryReaderExImpl reader = new BinaryReaderExImpl(ctx, + new PortableOffheapInputStream(ptr, size, false), + start, + null); + + return reader.hasField(fieldName); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Nullable @Override public <T> T deserialize() throws BinaryObjectException { + return (T)deserializeValue(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("CloneDoesntCallSuperClone") + @Override public BinaryObject clone() throws CloneNotSupportedException { + return heapCopy(); + } + + /** {@inheritDoc} */ + @Override public byte type() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean isPlatformType() { + return false; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) { + return (T)deserializeValue(); + } + + /** {@inheritDoc} */ + @Override public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public CacheObject prepareForCache(CacheObjectContext ctx) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(CacheObjectContext ctx, ClassLoader ldr) throws IgniteCheckedException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + throw new UnsupportedOperationException(); // To make sure it is not marshalled. + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + throw new UnsupportedOperationException(); // To make sure it is not marshalled. + } + + /** + * @return Deserialized value. + */ + private Object deserializeValue() { + // TODO: IGNITE-1272 - Deserialize with proper class loader. + BinaryReaderExImpl reader = new BinaryReaderExImpl( + ctx, + new PortableOffheapInputStream(ptr, size, false), + start, + null); + + return reader.deserialize(); + } +}
