This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 472049d IGNITE-14679 Schema serialization. (#349)
472049d is described below
commit 472049d6d37b59501fde65dbd14a5dcc2960e126
Author: Vladimir Ermakov <[email protected]>
AuthorDate: Thu Sep 30 11:36:45 2021 +0300
IGNITE-14679 Schema serialization. (#349)
---
.../ignite/internal/schema/VarlenNativeType.java | 7 +
.../schema/AbstractSchemaSerializer.java | 109 +++
.../schema/SchemaSerializer.java} | 44 +-
.../marshaller/schema/SchemaSerializerImpl.java | 743 +++++++++++++++++++++
.../schema/serializer/AbstractSerializerTest.java | 164 +++++
.../internal/table/distributed/TableManager.java | 12 +-
6 files changed, 1052 insertions(+), 27 deletions(-)
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/VarlenNativeType.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/VarlenNativeType.java
index 867ec04..32a4283 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/VarlenNativeType.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/VarlenNativeType.java
@@ -41,6 +41,13 @@ public class VarlenNativeType extends NativeType {
return super.mismatch(type) || len < ((VarlenNativeType)type).len;
}
+ /**
+ * @return Length of the type.
+ */
+ public int length() {
+ return len;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(VarlenNativeType.class.getSimpleName(), "name",
spec(), "len", len);
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/schema/AbstractSchemaSerializer.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/schema/AbstractSchemaSerializer.java
new file mode 100644
index 0000000..ccbf111
--- /dev/null
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/schema/AbstractSchemaSerializer.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.marshaller.schema;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+
+/**
+ * Schema serializer.
+ */
+public abstract class AbstractSchemaSerializer implements SchemaSerializer {
+ /** Schema serializer version. */
+ protected final short version;
+
+ /** Previous version serializer. */
+ protected final AbstractSchemaSerializer previous;
+
+ /**
+ * @param ver Serializer version.
+ * @param previous Previous version serializer.
+ */
+ protected AbstractSchemaSerializer(short ver, AbstractSchemaSerializer
previous) {
+ this.version = ver;
+ this.previous = previous;
+ }
+
+ /**
+ * @param ver Serializer version.
+ */
+ protected AbstractSchemaSerializer(short ver) {
+ this(ver, null);
+ }
+
+ /**
+ * @return Serializer version;
+ */
+ public short getVersion() {
+ return version;
+ }
+
+ /**
+ * Serialize SchemaDescriptor object to byte array.
+ *
+ * @param desc SchemaDescriptor object.
+ * @return SchemaDescriptor byte array representation.
+ */
+ public byte[] serialize(SchemaDescriptor desc) {
+ ByteBuffer buf = ByteBuffer.allocate(size(desc));
+
+ this.writeTo(desc, buf);
+
+ return buf.array();
+ }
+
+ /**
+ * Deserialize byte array to SchemaDescriptor object.
+ *
+ * @param bytes SchemaDescriptor byte array representation.
+ * @return SchemaDescriptor object.
+ */
+ public SchemaDescriptor deserialize(byte[] bytes) {
+ ByteBuffer buf = ByteBuffer.wrap(bytes);
+
+ short ver = readVersion(buf);
+
+ return getSerializerByVersion(ver).readFrom(buf);
+ }
+
+ /**
+ * Gets schema serializer by version.
+ *
+ * @param ver SchemaSerializer target version.
+ * @return SchemaSerializer object.
+ * @throws IllegalArgumentException If SchemaSerializer with right version
is not found.
+ */
+ private SchemaSerializer getSerializerByVersion(short ver) {
+ if (ver == this.version)
+ return this;
+ else if (this.previous == null)
+ throw new IllegalArgumentException("Unable to find schema
serializer with version " + ver);
+
+ return this.previous.getSerializerByVersion(ver);
+ }
+
+ /**
+ * Reads SchemaSerializer version from byte buffer.
+ *
+ * @param buf ByteBuffer object.
+ * @return SchemaSerializer version.
+ */
+ private short readVersion(ByteBuffer buf) {
+ return buf.getShort();
+ }
+}
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/VarlenNativeType.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/schema/SchemaSerializer.java
similarity index 50%
copy from
modules/schema/src/main/java/org/apache/ignite/internal/schema/VarlenNativeType.java
copy to
modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/schema/SchemaSerializer.java
index 867ec04..b623b28 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/VarlenNativeType.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/schema/SchemaSerializer.java
@@ -15,34 +15,34 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.schema;
+package org.apache.ignite.internal.schema.marshaller.schema;
-import org.apache.ignite.internal.tostring.S;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
/**
- * Variable-length native type.
+ * SchemaDescriptor (De)Serializer interface.
*/
-public class VarlenNativeType extends NativeType {
- /** Length of the type. */
- private final int len;
-
+public interface SchemaSerializer {
/**
- * @param typeSpec Type spec.
- * @param len Type length.
+ * Writes SchemaDescriptor object to byte buffer.
+ *
+ * @param desc SchemaDescriptor object.
+ * @param byteBuf ByteBuffer object with allocated byte array.
*/
- protected VarlenNativeType(NativeTypeSpec typeSpec, int len) {
- super(typeSpec);
-
- this.len = len;
- }
+ void writeTo(SchemaDescriptor desc, ByteBuffer byteBuf);
- /** {@inheritDoc} */
- @Override public boolean mismatch(NativeType type) {
- return super.mismatch(type) || len < ((VarlenNativeType)type).len;
- }
+ /**
+ * @param byteBuf Byte buffer with byte array.
+ * @return SchemaDescriptor object.
+ */
+ SchemaDescriptor readFrom(ByteBuffer byteBuf);
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(VarlenNativeType.class.getSimpleName(), "name",
spec(), "len", len);
- }
+ /**
+ * Calculates size in bytes of SchemaDescriptor object.
+ *
+ * @param desc SchemaDescriptor object.
+ * @return size in bytes.
+ */
+ int size(SchemaDescriptor desc);
}
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/schema/SchemaSerializerImpl.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/schema/SchemaSerializerImpl.java
new file mode 100644
index 0000000..df0ef8d
--- /dev/null
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/schema/SchemaSerializerImpl.java
@@ -0,0 +1,743 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.marshaller.schema;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.UUID;
+import org.apache.ignite.internal.schema.BitmaskNativeType;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.Columns;
+import org.apache.ignite.internal.schema.DecimalNativeType;
+import org.apache.ignite.internal.schema.InvalidTypeException;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.NumberNativeType;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.TemporalNativeType;
+import org.apache.ignite.internal.schema.VarlenNativeType;
+import org.apache.ignite.internal.schema.mapping.ColumnMapper;
+import org.apache.ignite.internal.schema.mapping.ColumnMapping;
+
+/**
+ * Serialize SchemaDescriptor object to byte array and vice versa.
+ */
+public class SchemaSerializerImpl extends AbstractSchemaSerializer {
+ /** Instance. */
+ public static final AbstractSchemaSerializer INSTANCE = new
SchemaSerializerImpl();
+
+ /** String array length. */
+ private static final int STRING_HEADER = 4;
+
+ /** Array length. */
+ private static final int ARRAY_HEADER_LENGTH = 4;
+
+ /** Byte. */
+ private static final int BYTE = 1;
+
+ /** Short. */
+ private static final int SHORT = 2;
+
+ /** Int. */
+ private static final int INT = 4;
+
+ /** Long. */
+ private static final int LONG = 8;
+
+ /** Float. */
+ private static final int FLOAT = 4;
+
+ /** Double. */
+ private static final int DOUBLE = 8;
+
+ /** Schema version. */
+ private static final short SCHEMA_VER = 1;
+
+ /**
+ * Default constructor.
+ */
+ public SchemaSerializerImpl() {
+ super(SCHEMA_VER);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeTo(SchemaDescriptor desc, ByteBuffer byteBuf) {
+ byteBuf.putShort(SCHEMA_VER);
+ byteBuf.putInt(desc.version());
+
+ appendColumns(desc.keyColumns(), byteBuf);
+ appendColumns(desc.valueColumns(), byteBuf);
+
+ Column[] affinityCols = desc.affinityColumns();
+
+ byteBuf.putInt(affinityCols.length);
+
+ for (Column column : affinityCols)
+ appendString(column.name(), byteBuf);
+
+ appendColumnMapping(desc.columnMapping(), desc.length(), byteBuf);
+ }
+
+ /** {@inheritDoc} */
+ @Override public SchemaDescriptor readFrom(ByteBuffer byteBuf) {
+ int ver = byteBuf.getInt();
+
+ Column[] keyCols = readColumns(byteBuf);
+ Column[] valCols = readColumns(byteBuf);
+
+ int affinityColsSize = byteBuf.getInt();
+
+ String[] affinityCols = new String[affinityColsSize];
+
+ for (int i = 0; i < affinityColsSize; i++)
+ affinityCols[i] = readString(byteBuf);
+
+ SchemaDescriptor descriptor = new SchemaDescriptor(ver, keyCols,
affinityCols, valCols);
+
+ ColumnMapper mapper = readColumnMapping(descriptor, byteBuf);
+
+ descriptor.columnMapping(mapper);
+
+ return descriptor;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size(SchemaDescriptor desc) {
+ return SHORT + //Assembler version
+ INT + //Descriptor version
+ getColumnsSize(desc.keyColumns()) +
+ getColumnsSize(desc.valueColumns()) +
+ ARRAY_HEADER_LENGTH + //Affinity columns length
+ getStringArraySize(desc.affinityColumns()) +
+ getColumnMappingSize(desc.columnMapping(), desc.length());
+ }
+
+ /**
+ * Gets column mapping size in bytes.
+ *
+ * @param len Column array length (both key and value columns).
+ * @return Size of column mapping.
+ */
+ private int getColumnMappingSize(ColumnMapper mapper, int len) {
+ int size = INT;
+
+ for (int i = 0; i < len; i++) {
+ if (mapper.map(i) != i) {
+ size += INT;
+ size += INT;
+
+ if (mapper.map(i) == -1)
+ size += getColumnSize(mapper.mappedColumn(i));
+ }
+ }
+
+ return size;
+ }
+
+ /**
+ * Gets column names array size in bytes.
+ *
+ * @param cols Column array.
+ * @return Size of an array with column names.
+ */
+ private int getStringArraySize(Column[] cols) {
+ int size = ARRAY_HEADER_LENGTH; //String array size header
+ for (Column column : cols)
+ size += getStringSize(column.name());
+
+ return size;
+ }
+
+ /**
+ * Gets columns array size in bytes.
+ *
+ * @param cols Column array.
+ * @return Size of column array, including column name and column native
type.
+ */
+ private int getColumnsSize(Columns cols) {
+ int size = ARRAY_HEADER_LENGTH; //cols array length
+
+ for (Column column : cols.columns())
+ size += getColumnSize(column);
+
+ return size;
+ }
+
+ /**
+ * Gets column size in bytes.
+ *
+ * @param col Column object.
+ * @return Column size in bytes.
+ */
+ private int getColumnSize(Column col) {
+ return INT + //Schema index
+ BYTE + //nullable flag
+ getStringSize(col.name()) +
+ getNativeTypeSize(col.type()) +
+ BYTE + getDefaultObjectSize(col.type(), col.defaultValue());
+ }
+
+ /**
+ * Gets default object size in bytes based on object native type.
+ *
+ * @param type Column native type.
+ * @param val Object.
+ * @return Object size in bytes.
+ */
+ private int getDefaultObjectSize(NativeType type, Object val) {
+ if (val == null)
+ return 0;
+
+ switch (type.spec()) {
+ case INT8:
+ return BYTE;
+
+ case INT16:
+ return SHORT;
+
+ case INT32:
+ return INT;
+
+ case INT64:
+ return LONG;
+
+ case FLOAT:
+ return FLOAT;
+
+ case DOUBLE:
+ return DOUBLE;
+
+ case DECIMAL:
+ return INT + INT +
((BigDecimal)val).unscaledValue().toByteArray().length;
+
+ case UUID:
+ return LONG + LONG;
+
+ case STRING:
+ return getStringSize(((String)val));
+
+ case BYTES:
+ return INT + ((byte[])val).length;
+
+ case BITMASK:
+ return INT + ((BitSet)val).toByteArray().length;
+
+ case NUMBER:
+ return INT + ((BigInteger)val).toByteArray().length;
+ }
+
+ return 0;
+ }
+
+ /**
+ * Gets native type size in bytes.
+ *
+ * @param type Native type.
+ * @return Native type size depending on NativeTypeSpec params.
+ */
+ private int getNativeTypeSize(NativeType type) {
+ int typeSize = 0;
+
+ switch (type.spec()) {
+ case STRING:
+ case BYTES:
+ case TIME:
+ case DATETIME:
+ case TIMESTAMP:
+ case NUMBER:
+ case BITMASK:
+ typeSize += INT; //For precision, len or bits
+
+ break;
+ case DECIMAL:
+ typeSize += INT; //For precision
+ typeSize += INT; //For scale
+
+ break;
+ default:
+ break;
+ }
+
+ return getStringSize(type.spec().name()) + //native type name
+ typeSize;
+ }
+
+ /**
+ * Gets string size in bytes.
+ *
+ * @param str String.
+ * @return Byte array size.
+ */
+ private int getStringSize(String str) {
+ return STRING_HEADER + //string byte array header
+ str.getBytes().length; // string byte array length
+ }
+
+ /**
+ * Appends column mapping to byte buffer.
+ *
+ * @param mapper ColumnMapper object.
+ * @param len Column array length (both key and value columns).
+ * @param buff Allocated ByteBuffer.
+ */
+ private void appendColumnMapping(ColumnMapper mapper, int len, ByteBuffer
buff) {
+ int mappingSize = 0;
+ for (int i = 0; i < len; i++) {
+ if (mapper.map(i) != i)
+ mappingSize += 1;
+ }
+
+ buff.putInt(mappingSize);
+
+ for (int i = 0; i < len; i++) {
+ if (mapper.map(i) != i) {
+ buff.putInt(i);
+ buff.putInt(mapper.map(i));
+
+ if (mapper.map(i) == -1)
+ appendColumn(mapper.mappedColumn(i), buff);
+ }
+ }
+ }
+
+ /**
+ * Appends column array to byte buffer.
+ *
+ * @param buf Byte buffer.
+ * @param cols Column array.
+ */
+ private void appendColumns(Columns cols, ByteBuffer buf) {
+ Column[] colArr = cols.columns();
+
+ buf.putInt(colArr.length);
+
+ for (Column column : colArr)
+ appendColumn(column, buf);
+ }
+
+ /**
+ * Appends column to byte buffer.
+ *
+ * @param buf Byte buffer.
+ * @param col Column.
+ */
+ private void appendColumn(Column col, ByteBuffer buf) {
+ buf.putInt(col.schemaIndex());
+ buf.put((byte)(col.nullable() ? 1 : 0));
+
+ appendString(col.name(), buf);
+ appendNativeType(buf, col.type());
+
+ appendDefaultValue(buf, col.type(), col.defaultValue());
+ }
+
+ /**
+ * Appends default value object to byte buffer based on native type.
+ *
+ * @param buf Allocated ByteBuffer.
+ * @param type Column native type.
+ * @param val Default object value.
+ */
+ private void appendDefaultValue(ByteBuffer buf, NativeType type, Object
val) {
+ boolean isPresent = val != null;
+
+ buf.put((byte)(isPresent ? 1 : 0));
+
+ if (!isPresent)
+ return;
+
+ switch (type.spec()) {
+ case INT8: {
+ buf.put((byte)val);
+
+ break;
+ }
+ case INT16: {
+ buf.putShort((short)val);
+
+ break;
+ }
+ case INT32: {
+ buf.putInt((int)val);
+
+ break;
+ }
+ case INT64: {
+ buf.putLong((long)val);
+
+ break;
+ }
+ case FLOAT: {
+ buf.putFloat((float)val);
+
+ break;
+ }
+ case DOUBLE: {
+ buf.putDouble((double)val);
+
+ break;
+ }
+ case DECIMAL: {
+ BigDecimal decimal = (BigDecimal)val;
+
+ buf.putInt(decimal.scale());
+ appendByteArray(decimal.unscaledValue().toByteArray(), buf);
+
+ break;
+ }
+ case UUID: {
+ UUID uuid = (UUID)val;
+
+ buf.putLong(uuid.getMostSignificantBits());
+ buf.putLong(uuid.getLeastSignificantBits());
+
+ break;
+ }
+ case STRING: {
+ appendString((String)val, buf);
+
+ break;
+ }
+ case BYTES: {
+ appendByteArray((byte[])val, buf);
+
+ break;
+ }
+ case BITMASK: {
+ BitSet bitSet = (BitSet)val;
+ appendByteArray(bitSet.toByteArray(), buf);
+
+ break;
+ }
+ case NUMBER: {
+ BigInteger bigInt = (BigInteger)val;
+
+ appendByteArray(bigInt.toByteArray(), buf);
+
+ break;
+ }
+ }
+ }
+
+ /**
+ * Appends native type to byte buffer.
+ *
+ * @param buf Byte buffer.
+ * @param type Native type.
+ */
+ private void appendNativeType(ByteBuffer buf, NativeType type) {
+ appendString(type.spec().name(), buf);
+
+ switch (type.spec()) {
+ case STRING:
+ case BYTES: {
+ int len = ((VarlenNativeType)type).length();
+
+ buf.putInt(len);
+
+ break;
+ }
+ case BITMASK: {
+ int bits = ((BitmaskNativeType)type).bits();
+
+ buf.putInt(bits);
+
+ break;
+ }
+ case DECIMAL: {
+ int precision = ((DecimalNativeType)type).precision();
+ int scale = ((DecimalNativeType)type).scale();
+
+ buf.putInt(precision);
+ buf.putInt(scale);
+
+ break;
+ }
+ case TIME:
+ case DATETIME:
+ case TIMESTAMP: {
+ int precision = ((TemporalNativeType)type).precision();
+
+ buf.putInt(precision);
+
+ break;
+ }
+ case NUMBER: {
+ int precision = ((NumberNativeType)type).precision();
+
+ buf.putInt(precision);
+
+ break;
+ }
+ default:
+ break;
+ }
+ }
+
+ /**
+ * Appends string byte representation to byte buffer.
+ *
+ * @param buf Byte buffer.
+ * @param str String.
+ */
+ private void appendString(String str, ByteBuffer buf) {
+ appendByteArray(str.getBytes(), buf);
+ }
+
+ /**
+ * Appends byte array to byte buffer.
+ *
+ * @param buf Byte buffer.
+ * @param bytes Byte array.
+ */
+ private void appendByteArray(byte[] bytes, ByteBuffer buf) {
+ buf.putInt(bytes.length);
+ buf.put(bytes);
+ }
+
+ /**
+ * Reads column mapping from byte buffer.
+ *
+ * @param desc SchemaDescriptor.
+ * @param buf Byte buffer.
+ * @return ColumnMapper object.
+ */
+ private ColumnMapper readColumnMapping(SchemaDescriptor desc, ByteBuffer
buf) {
+ int mappingSize = buf.getInt();
+
+ if (mappingSize == 0)
+ return ColumnMapping.identityMapping();
+
+ ColumnMapper mapper = ColumnMapping.createMapper(desc);
+
+ for (int i = 0; i < mappingSize; i++) {
+ int from = buf.getInt();
+ int to = buf.getInt();
+
+ if (to == -1) {
+ Column col = readColumn(buf);
+ mapper.add(col);
+ } else
+ mapper.add(from, to);
+ }
+
+ return mapper;
+ }
+
+ /**
+ * Reads column array from byte buffer.
+ *
+ * @param buf Byte buffer.
+ * @return Column array.
+ */
+ private Column[] readColumns(ByteBuffer buf) {
+ int size = buf.getInt();
+
+ Column[] colArr = new Column[size];
+
+ for (int i = 0; i < size; i++)
+ colArr[i] = readColumn(buf);
+
+ return colArr;
+ }
+
+ /**
+ * Reads column from byte buffer.
+ *
+ * @param buf Byte buffer.
+ * @return Column.
+ */
+ private Column readColumn(ByteBuffer buf) {
+ int schemaIdx = buf.getInt();
+ boolean nullable = buf.get() == 1;
+ String name = readString(buf);
+
+ NativeType nativeType = fromByteBuffer(buf);
+
+ Object object = readDefaultValue(buf, nativeType);
+
+ return new Column(name, nativeType, nullable, () ->
object).copy(schemaIdx);
+ }
+
+ /**
+ * Reads default value object or null.
+ *
+ * @param buf ByteBuffer.
+ * @param type Column native type.
+ * @return Column default value.
+ */
+ private Object readDefaultValue(ByteBuffer buf, NativeType type) {
+
+ boolean isPresent = buf.get() == 1;
+
+ if (!isPresent)
+ return null;
+
+ switch (type.spec()) {
+ case INT8:
+ return buf.get();
+
+ case INT16:
+ return buf.getShort();
+
+ case INT32:
+ return buf.getInt();
+
+ case INT64:
+ return buf.getLong();
+
+ case FLOAT:
+ return buf.getFloat();
+
+ case DOUBLE:
+ return buf.getDouble();
+
+ case DECIMAL: {
+ int scale = buf.getInt();
+ byte[] bytes = readByteArray(buf);
+
+ return new BigDecimal(new BigInteger(bytes), scale);
+ }
+ case UUID:
+ return new UUID(buf.getLong(), buf.getLong());
+
+ case STRING:
+ return readString(buf);
+
+ case BYTES:
+ return readByteArray(buf);
+
+ case BITMASK:
+ return BitSet.valueOf(readByteArray(buf));
+
+ case NUMBER:
+ return new BigInteger(readByteArray(buf));
+ }
+
+ return null;
+ }
+
+ /**
+ * Reads native type from byte buffer.
+ *
+ * @param buf Byte buffer.
+ * @return Native type.
+ */
+ private NativeType fromByteBuffer(ByteBuffer buf) {
+ String nativeTypeSpecName = readString(buf);
+
+ NativeTypeSpec spec = NativeTypeSpec.valueOf(nativeTypeSpecName);
+
+ switch (spec) {
+ case STRING:
+ int strLen = buf.getInt();
+
+ return NativeTypes.stringOf(strLen);
+
+ case BYTES:
+ int len = buf.getInt();
+
+ return NativeTypes.blobOf(len);
+
+ case BITMASK:
+ int bits = buf.getInt();
+
+ return NativeTypes.bitmaskOf(bits);
+
+ case DECIMAL: {
+ int precision = buf.getInt();
+ int scale = buf.getInt();
+
+ return NativeTypes.decimalOf(precision, scale);
+ }
+ case TIME: {
+ int precision = buf.getInt();
+
+ return NativeTypes.time(precision);
+ }
+ case DATETIME: {
+ int precision = buf.getInt();
+
+ return NativeTypes.datetime(precision);
+ }
+ case TIMESTAMP: {
+ int precision = buf.getInt();
+
+ return NativeTypes.timestamp(precision);
+ }
+ case NUMBER: {
+ int precision = buf.getInt();
+
+ return NativeTypes.numberOf(precision);
+ }
+ case INT8:
+ return NativeTypes.INT8;
+
+ case INT16:
+ return NativeTypes.INT16;
+
+ case INT32:
+ return NativeTypes.INT32;
+
+ case INT64:
+ return NativeTypes.INT64;
+
+ case FLOAT:
+ return NativeTypes.FLOAT;
+
+ case DOUBLE:
+ return NativeTypes.DOUBLE;
+
+ case UUID:
+ return NativeTypes.UUID;
+
+ case DATE:
+ return NativeTypes.DATE;
+ }
+
+ throw new InvalidTypeException("Unexpected type " + spec);
+ }
+
+ /**
+ * Reads string from byte buffer.
+ *
+ * @param buf Byte buffer.
+ * @return String.
+ */
+ private String readString(ByteBuffer buf) {
+ return new String(readByteArray(buf));
+ }
+
+ /**
+ * Reads byte array from byte buffer.
+ *
+ * @param buf Byte buffer.
+ * @return Byte array.
+ */
+ private byte[] readByteArray(ByteBuffer buf) {
+ int len = buf.getInt();
+ byte[] arr = new byte[len];
+
+ buf.get(arr);
+
+ return arr;
+ }
+}
diff --git
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/serializer/AbstractSerializerTest.java
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/serializer/AbstractSerializerTest.java
new file mode 100644
index 0000000..5d3fe56
--- /dev/null
+++
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/serializer/AbstractSerializerTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.serializer;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.BitSet;
+import java.util.UUID;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.mapping.ColumnMapper;
+import org.apache.ignite.internal.schema.mapping.ColumnMapping;
+import
org.apache.ignite.internal.schema.marshaller.schema.AbstractSchemaSerializer;
+import
org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * SchemaDescriptor (de)serializer test.
+ */
+public class AbstractSerializerTest {
+ /**
+ * (de)Serialize schema test.
+ */
+ @Test
+ public void schemaSerializeTest() {
+ AbstractSchemaSerializer assembler = SchemaSerializerImpl.INSTANCE;
+
+ SchemaDescriptor desc = new SchemaDescriptor(100500,
+ new Column[] {
+ new Column("A", NativeTypes.INT8, false),
+ new Column("B", NativeTypes.INT16, false),
+ new Column("C", NativeTypes.INT32, false),
+ new Column("D", NativeTypes.INT64, false),
+ new Column("E", NativeTypes.UUID, false),
+ new Column("F", NativeTypes.FLOAT, false),
+ new Column("G", NativeTypes.DOUBLE, false),
+ new Column("H", NativeTypes.DATE, false),
+ },
+ new Column[] {
+ new Column("A1", NativeTypes.stringOf(128), false),
+ new Column("B1", NativeTypes.numberOf(255), false),
+ new Column("C1", NativeTypes.decimalOf(128, 64), false),
+ new Column("D1", NativeTypes.bitmaskOf(256), false),
+ new Column("E1", NativeTypes.datetime(8), false),
+ new Column("F1", NativeTypes.time(8), false),
+ new Column("G1", NativeTypes.timestamp(8), true)
+ }
+ );
+
+ byte[] serialize = assembler.serialize(desc);
+
+ SchemaDescriptor deserialize = assembler.deserialize(serialize);
+
+ assertEquals(desc.version(), deserialize.version());
+
+ assertArrayEquals(desc.keyColumns().columns(),
deserialize.keyColumns().columns());
+ assertArrayEquals(desc.valueColumns().columns(),
deserialize.valueColumns().columns());
+ assertArrayEquals(desc.affinityColumns(),
deserialize.affinityColumns());
+ }
+
+ /**
+ * (de)Serialize default value test.
+ */
+ @Test
+ public void defaultValueSerializeTest() {
+ AbstractSchemaSerializer assembler = SchemaSerializerImpl.INSTANCE;
+
+ SchemaDescriptor desc = new SchemaDescriptor(100500,
+ new Column[] {
+ new Column("A", NativeTypes.INT8, false, () -> (byte)1),
+ new Column("B", NativeTypes.INT16, false, () -> (short)1),
+ new Column("C", NativeTypes.INT32, false, () -> 1),
+ new Column("D", NativeTypes.INT64, false, () -> 1L),
+ new Column("E", NativeTypes.UUID, false, () -> new
UUID(12,34)),
+ new Column("F", NativeTypes.FLOAT, false, () -> 1.0f),
+ new Column("G", NativeTypes.DOUBLE, false, () -> 1.0d),
+ new Column("H", NativeTypes.DATE, false),
+ },
+ new Column[] {
+ new Column("A1", NativeTypes.stringOf(128), false, () ->
"test"),
+ new Column("B1", NativeTypes.numberOf(255), false, () ->
BigInteger.TEN),
+ new Column("C1", NativeTypes.decimalOf(128, 64), false, () ->
BigDecimal.TEN),
+ new Column("D1", NativeTypes.bitmaskOf(256), false,
BitSet::new)
+ }
+ );
+
+ byte[] serialize = assembler.serialize(desc);
+
+ SchemaDescriptor deserialize = assembler.deserialize(serialize);
+
+ //key columns
+ assertEquals(deserialize.column("A").defaultValue(), (byte)1);
+ assertEquals(deserialize.column("B").defaultValue(), (short)1);
+ assertEquals(deserialize.column("C").defaultValue(), 1);
+ assertEquals(deserialize.column("D").defaultValue(), 1L);
+ assertEquals(deserialize.column("E").defaultValue(), new UUID(12,34));
+ assertEquals(deserialize.column("F").defaultValue(), 1.0f);
+ assertEquals(deserialize.column("G").defaultValue(), 1.0d);
+ assertNull(deserialize.column("H").defaultValue());
+
+ //value columns
+ assertEquals(deserialize.column("A1").defaultValue(), "test");
+ assertEquals(deserialize.column("B1").defaultValue(), BigInteger.TEN);
+ assertEquals(deserialize.column("C1").defaultValue(), BigDecimal.TEN);
+ assertEquals(deserialize.column("D1").defaultValue(), new BitSet());
+ }
+
+ /**
+ * (de)Serialize column mapping test.
+ */
+ @Test
+ public void columnMappingSerializeTest() {
+ AbstractSchemaSerializer assembler = SchemaSerializerImpl.INSTANCE;
+
+ SchemaDescriptor desc = new SchemaDescriptor(100500,
+ new Column[] {
+ new Column("A", NativeTypes.INT8, false, () -> (byte)1)
+ },
+ new Column[] {
+ new Column("A1", NativeTypes.stringOf(128), false, () ->
"test"),
+ new Column("B1", NativeTypes.numberOf(255), false, () ->
BigInteger.TEN)
+ }
+ );
+
+ ColumnMapper mapper = ColumnMapping.createMapper(desc);
+
+ mapper.add(0, 1);
+
+ Column c1 = new Column("C1", NativeTypes.stringOf(128), false, () ->
"brandNewColumn").copy(2);
+
+ mapper.add(c1);
+
+ desc.columnMapping(mapper);
+
+ byte[] serialize = assembler.serialize(desc);
+
+ SchemaDescriptor deserialize = assembler.deserialize(serialize);
+
+ ColumnMapper mapper1 = deserialize.columnMapping();
+
+ assertEquals(1, mapper1.map(0));
+ assertEquals(c1, mapper1.mappedColumn(2));
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index eb10ede..42dda6e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.metastorage.client.Entry;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaUtils;
+import
org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
import org.apache.ignite.internal.storage.rocksdb.RocksDbStorage;
import org.apache.ignite.internal.table.IgniteTablesInternal;
@@ -165,8 +166,9 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
@NotNull
ConfigurationNotificationEvent<SchemaView> schemasCtx) {
try {
((SchemaRegistryImpl)tables.get(ctx.newValue().name()).schemaView()).
-
onSchemaRegistered((SchemaDescriptor)ByteUtils.
-
fromBytes(schemasCtx.newValue().schema()));
+ onSchemaRegistered(
+
SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()))
+ );
fireEvent(TableEvent.ALTER, new
TableEventParameters(tablesById.get(tblId)), null);
}
@@ -198,7 +200,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
ctx.newValue().name(),
IgniteUuid.fromString(((ExtendedTableView)ctx.newValue()).id()),
(List<List<ClusterNode>>)ByteUtils.fromBytes(((ExtendedTableView)ctx.newValue()).assignments()),
-
(SchemaDescriptor)ByteUtils.fromBytes(((ExtendedTableView)ctx.newValue()).schemas().
+
SchemaSerializerImpl.INSTANCE.deserialize(((ExtendedTableView)ctx.newValue()).schemas().
get(String.valueOf(INITIAL_SCHEMA_VERSION)).schema())
);
@@ -452,7 +454,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
schemasCh -> schemasCh.create(
String.valueOf(INITIAL_SCHEMA_VERSION),
schemaCh -> schemaCh.changeSchema(
- ByteUtils.toBytes(
+
SchemaSerializerImpl.INSTANCE.serialize(
SchemaUtils.prepareSchemaDescriptor(
((ExtendedTableView)ch).schemas().size(),
ch
@@ -539,7 +541,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
tblCh
));
-
schemaCh.changeSchema(ByteUtils.toBytes(descriptor));
+
schemaCh.changeSchema(SchemaSerializerImpl.INSTANCE.serialize(descriptor));
}
)
));